summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_worker.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/events_worker.py')
-rw-r--r--synapse/storage/databases/main/events_worker.py75
1 files changed, 68 insertions, 7 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py

index c547ba7afd..8cf7ae4acd 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py
@@ -603,7 +603,11 @@ class EventsWorkerStore(SQLBaseStore): Returns: map from event id to result """ - event_entry_map = await self._get_events_from_cache( + # Shortcut: check if we have any events in the *in memory* cache - this function + # may be called repeatedly for the same event so at this point we cannot reach + # out to any external cache for performance reasons. The external cache is + # checked later on in the `get_missing_events_from_cache_or_db` function below. + event_entry_map = self._get_events_from_local_cache( event_ids, ) @@ -635,7 +639,9 @@ class EventsWorkerStore(SQLBaseStore): if missing_events_ids: - async def get_missing_events_from_db() -> Dict[str, EventCacheEntry]: + async def get_missing_events_from_cache_or_db() -> Dict[ + str, EventCacheEntry + ]: """Fetches the events in `missing_event_ids` from the database. Also creates entries in `self._current_event_fetches` to allow @@ -660,10 +666,18 @@ class EventsWorkerStore(SQLBaseStore): # the events have been redacted, and if so pulling the redaction event # out of the database to check it. # + missing_events = {} try: - missing_events = await self._get_events_from_db( + # Try to fetch from any external cache. We already checked the + # in-memory cache above. + missing_events = await self._get_events_from_external_cache( missing_events_ids, ) + # Now actually fetch any remaining events from the DB + db_missing_events = await self._get_events_from_db( + missing_events_ids - missing_events.keys(), + ) + missing_events.update(db_missing_events) except Exception as e: with PreserveLoggingContext(): fetching_deferred.errback(e) @@ -682,7 +696,7 @@ class EventsWorkerStore(SQLBaseStore): # cancellations, since multiple `_get_events_from_cache_or_db` calls can # reuse the same fetch. missing_events: Dict[str, EventCacheEntry] = await delay_cancellation( - get_missing_events_from_db() + get_missing_events_from_cache_or_db() ) event_entry_map.update(missing_events) @@ -757,7 +771,54 @@ class EventsWorkerStore(SQLBaseStore): async def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True ) -> Dict[str, EventCacheEntry]: - """Fetch events from the caches. + """Fetch events from the caches, both in memory and any external. + + May return rejected events. + + Args: + events: list of event_ids to fetch + update_metrics: Whether to update the cache hit ratio metrics + """ + event_map = self._get_events_from_local_cache( + events, update_metrics=update_metrics + ) + + missing_event_ids = (e for e in events if e not in event_map) + event_map.update( + await self._get_events_from_external_cache( + events=missing_event_ids, + update_metrics=update_metrics, + ) + ) + + return event_map + + async def _get_events_from_external_cache( + self, events: Iterable[str], update_metrics: bool = True + ) -> Dict[str, EventCacheEntry]: + """Fetch events from any configured external cache. + + May return rejected events. + + Args: + events: list of event_ids to fetch + update_metrics: Whether to update the cache hit ratio metrics + """ + event_map = {} + + for event_id in events: + ret = await self._get_event_cache.get_external( + (event_id,), None, update_metrics=update_metrics + ) + if ret: + event_map[event_id] = ret + + return event_map + + def _get_events_from_local_cache( + self, events: Iterable[str], update_metrics: bool = True + ) -> Dict[str, EventCacheEntry]: + """Fetch events from the local, in memory, caches. May return rejected events. @@ -769,7 +830,7 @@ class EventsWorkerStore(SQLBaseStore): for event_id in events: # First check if it's in the event cache - ret = await self._get_event_cache.get( + ret = self._get_event_cache.get_local( (event_id,), None, update_metrics=update_metrics ) if ret: @@ -791,7 +852,7 @@ class EventsWorkerStore(SQLBaseStore): # We add the entry back into the cache as we want to keep # recently queried events in the cache. - await self._get_event_cache.set((event_id,), cache_entry) + self._get_event_cache.set_local((event_id,), cache_entry) return event_map