summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/10703.bugfix1
-rw-r--r--synapse/storage/databases/main/events_worker.py29
2 files changed, 24 insertions, 6 deletions
diff --git a/changelog.d/10703.bugfix b/changelog.d/10703.bugfix
new file mode 100644
index 0000000000..a5a4ecf8ee
--- /dev/null
+++ b/changelog.d/10703.bugfix
@@ -0,0 +1 @@
+Fix a regression introduced in v1.41.0 which affected the performance of concurrent fetches of large sets of events, in extreme cases causing the process to hang.
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 375463e4e9..9501f00f3b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -520,16 +520,26 @@ class EventsWorkerStore(SQLBaseStore):
         # We now look up if we're already fetching some of the events in the DB,
         # if so we wait for those lookups to finish instead of pulling the same
         # events out of the DB multiple times.
-        already_fetching: Dict[str, defer.Deferred] = {}
+        #
+        # Note: we might get the same `ObservableDeferred` back for multiple
+        # events we're already fetching, so we deduplicate the deferreds to
+# avoid extraneous work (if we don't do this we can end up in an n^2 mode
+        # when we wait on the same Deferred N times, then try and merge the
+        # same dict into itself N times).
+        already_fetching_ids: Set[str] = set()
+        already_fetching_deferreds: Set[
+            ObservableDeferred[Dict[str, _EventCacheEntry]]
+        ] = set()
 
         for event_id in missing_events_ids:
             deferred = self._current_event_fetches.get(event_id)
             if deferred is not None:
                 # We're already pulling the event out of the DB. Add the deferred
                 # to the collection of deferreds to wait on.
-                already_fetching[event_id] = deferred.observe()
+                already_fetching_ids.add(event_id)
+                already_fetching_deferreds.add(deferred)
 
-        missing_events_ids.difference_update(already_fetching)
+        missing_events_ids.difference_update(already_fetching_ids)
 
         if missing_events_ids:
             log_ctx = current_context()
@@ -569,18 +579,25 @@ class EventsWorkerStore(SQLBaseStore):
             with PreserveLoggingContext():
                 fetching_deferred.callback(missing_events)
 
-        if already_fetching:
+        if already_fetching_deferreds:
             # Wait for the other event requests to finish and add their results
             # to ours.
             results = await make_deferred_yieldable(
                 defer.gatherResults(
-                    already_fetching.values(),
+                    (d.observe() for d in already_fetching_deferreds),
                     consumeErrors=True,
                 )
             ).addErrback(unwrapFirstError)
 
             for result in results:
-                event_entry_map.update(result)
+                # We filter out events that we haven't asked for as we might get
+                # a *lot* of superfluous events back, and there is no point
+                # going through and inserting them all (which can take time).
+                event_entry_map.update(
+                    (event_id, entry)
+                    for event_id, entry in result.items()
+                    if event_id in already_fetching_ids
+                )
 
         if not allow_rejected:
             event_entry_map = {