summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2021-08-27 10:15:50 +0100
committerGitHub <noreply@github.com>2021-08-27 09:15:50 +0000
commitc4fa4f37cbc734f9cd6354a5f2661efc30d73cac (patch)
treecf10c0705a4d3f89fd6ed42b71c10bd230f96a94 /synapse/storage/databases
parentSplit `FederationHandler` in half (#10692) (diff)
downloadsynapse-c4fa4f37cbc734f9cd6354a5f2661efc30d73cac.tar.xz
Fix perf of fetching the same events many times. (#10703)
The code to deduplicate repeated fetches of the same set of events was
N^2 (over the number of events requested), which could lead to a process
being completely wedged.

The main fix is to deduplicate the returned deferreds so we only await
on a deferred once rather than many times. Seperately, when handling the
returned events from the defrered we only add the events we care about
to the event map to be returned (so that we don't pay the price of
inserting extraneous events into the dict).
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/events_worker.py29
1 files changed, 23 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 375463e4e9..9501f00f3b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -520,16 +520,26 @@ class EventsWorkerStore(SQLBaseStore):
         # We now look up if we're already fetching some of the events in the DB,
         # if so we wait for those lookups to finish instead of pulling the same
         # events out of the DB multiple times.
-        already_fetching: Dict[str, defer.Deferred] = {}
+        #
+        # Note: we might get the same `ObservableDeferred` back for multiple
+        # events we're already fetching, so we deduplicate the deferreds to
+        # avoid extraneous work (if we don't do this we can end up in a n^2 mode
+        # when we wait on the same Deferred N times, then try and merge the
+        # same dict into itself N times).
+        already_fetching_ids: Set[str] = set()
+        already_fetching_deferreds: Set[
+            ObservableDeferred[Dict[str, _EventCacheEntry]]
+        ] = set()
 
         for event_id in missing_events_ids:
             deferred = self._current_event_fetches.get(event_id)
             if deferred is not None:
                 # We're already pulling the event out of the DB. Add the deferred
                 # to the collection of deferreds to wait on.
-                already_fetching[event_id] = deferred.observe()
+                already_fetching_ids.add(event_id)
+                already_fetching_deferreds.add(deferred)
 
-        missing_events_ids.difference_update(already_fetching)
+        missing_events_ids.difference_update(already_fetching_ids)
 
         if missing_events_ids:
             log_ctx = current_context()
@@ -569,18 +579,25 @@ class EventsWorkerStore(SQLBaseStore):
             with PreserveLoggingContext():
                 fetching_deferred.callback(missing_events)
 
-        if already_fetching:
+        if already_fetching_deferreds:
             # Wait for the other event requests to finish and add their results
             # to ours.
             results = await make_deferred_yieldable(
                 defer.gatherResults(
-                    already_fetching.values(),
+                    (d.observe() for d in already_fetching_deferreds),
                     consumeErrors=True,
                 )
             ).addErrback(unwrapFirstError)
 
             for result in results:
-                event_entry_map.update(result)
+                # We filter out events that we haven't asked for as we might get
+                # a *lot* of superfluous events back, and there is no point
+                # going through and inserting them all (which can take time).
+                event_entry_map.update(
+                    (event_id, entry)
+                    for event_id, entry in result.items()
+                    if event_id in already_fetching_ids
+                )
 
         if not allow_rejected:
             event_entry_map = {