summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorSean Quah <8349537+squahtx@users.noreply.github.com>2022-04-25 19:39:17 +0100
committerGitHub <noreply@github.com>2022-04-25 19:39:17 +0100
commit8a87b4435a736cd42454cad7e57b65ec911f01fa (patch)
tree196227670bdc7ec3383463ab5fa8370069b1f0c4 /synapse
parentCorrect typo in user_admin_api.md device deletion JSON (#12533) (diff)
downloadsynapse-8a87b4435a736cd42454cad7e57b65ec911f01fa.tar.xz
Handle cancellation in `EventsWorkerStore._get_events_from_cache_or_db` (#12529)
Multiple calls to `EventsWorkerStore._get_events_from_cache_or_db` can
reuse the same database fetch, which is initiated by the first call.
Ensure that cancelling the first call doesn't cancel the other calls
sharing the same database fetch.

Signed-off-by: Sean Quah <seanq@element.io>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/databases/main/events_worker.py83
1 files changed, 49 insertions, 34 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6d6e146ff1..c31fc00eaa 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -75,7 +75,7 @@ from synapse.storage.util.id_generators import (
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import JsonDict, get_domain_from_id
 from synapse.util import unwrapFirstError
-from synapse.util.async_helpers import ObservableDeferred
+from synapse.util.async_helpers import ObservableDeferred, delay_cancellation
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
@@ -640,42 +640,57 @@ class EventsWorkerStore(SQLBaseStore):
         missing_events_ids.difference_update(already_fetching_ids)
 
         if missing_events_ids:
-            log_ctx = current_context()
-            log_ctx.record_event_fetch(len(missing_events_ids))
-
-            # Add entries to `self._current_event_fetches` for each event we're
-            # going to pull from the DB. We use a single deferred that resolves
-            # to all the events we pulled from the DB (this will result in this
-            # function returning more events than requested, but that can happen
-            # already due to `_get_events_from_db`).
-            fetching_deferred: ObservableDeferred[
-                Dict[str, EventCacheEntry]
-            ] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
-            for event_id in missing_events_ids:
-                self._current_event_fetches[event_id] = fetching_deferred
-
-            # Note that _get_events_from_db is also responsible for turning db rows
-            # into FrozenEvents (via _get_event_from_row), which involves seeing if
-            # the events have been redacted, and if so pulling the redaction event out
-            # of the database to check it.
-            #
-            try:
-                missing_events = await self._get_events_from_db(
-                    missing_events_ids,
-                )
 
-                event_entry_map.update(missing_events)
-            except Exception as e:
-                with PreserveLoggingContext():
-                    fetching_deferred.errback(e)
-                raise e
-            finally:
-                # Ensure that we mark these events as no longer being fetched.
+            async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]:
+                """Fetches the events in `missing_event_ids` from the database.
+
+                Also creates entries in `self._current_event_fetches` to allow
+                concurrent `_get_events_from_cache_or_db` calls to reuse the same fetch.
+                """
+                log_ctx = current_context()
+                log_ctx.record_event_fetch(len(missing_events_ids))
+
+                # Add entries to `self._current_event_fetches` for each event we're
+                # going to pull from the DB. We use a single deferred that resolves
+                # to all the events we pulled from the DB (this will result in this
+                # function returning more events than requested, but that can happen
+                # already due to `_get_events_from_db`).
+                fetching_deferred: ObservableDeferred[
+                    Dict[str, EventCacheEntry]
+                ] = ObservableDeferred(defer.Deferred(), consumeErrors=True)
                 for event_id in missing_events_ids:
-                    self._current_event_fetches.pop(event_id, None)
+                    self._current_event_fetches[event_id] = fetching_deferred
 
-            with PreserveLoggingContext():
-                fetching_deferred.callback(missing_events)
+                # Note that _get_events_from_db is also responsible for turning db rows
+                # into FrozenEvents (via _get_event_from_row), which involves seeing if
+                # the events have been redacted, and if so pulling the redaction event
+                # out of the database to check it.
+                #
+                try:
+                    missing_events = await self._get_events_from_db(
+                        missing_events_ids,
+                    )
+                except Exception as e:
+                    with PreserveLoggingContext():
+                        fetching_deferred.errback(e)
+                    raise e
+                finally:
+                    # Ensure that we mark these events as no longer being fetched.
+                    for event_id in missing_events_ids:
+                        self._current_event_fetches.pop(event_id, None)
+
+                with PreserveLoggingContext():
+                    fetching_deferred.callback(missing_events)
+
+                return missing_events
+
+            # We must allow the database fetch to complete in the presence of
+            # cancellations, since multiple `_get_events_from_cache_or_db` calls can
+            # reuse the same fetch.
+            missing_events: Dict[str, EventCacheEntry] = await delay_cancellation(
+                get_missing_events_from_db()
+            )
+            event_entry_map.update(missing_events)
 
         if already_fetching_deferreds:
             # Wait for the other event requests to finish and add their results