diff options
author | Erik Johnston <erik@matrix.org> | 2022-05-17 10:34:27 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-05-17 10:34:27 +0100 |
commit | fcf951d5dc7ca8c4cb18aa9c1f5ccb005df3610a (patch) | |
tree | 261694bc915a44abc55dc062ffb4ba5eb959f9a7 /synapse/storage/databases | |
parent | Tidy up and type-hint the database engine modules (#12734) (diff) | |
download | synapse-fcf951d5dc7ca8c4cb18aa9c1f5ccb005df3610a.tar.xz |
Track in memory events using weakrefs (#10533)
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 35 |
1 files changed, 33 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index a4a604a499..5b22d6b452 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -14,6 +14,7 @@ import logging import threading +import weakref from enum import Enum, auto from typing import ( TYPE_CHECKING, @@ -23,6 +24,7 @@ from typing import ( Dict, Iterable, List, + MutableMapping, Optional, Set, Tuple, @@ -248,6 +250,12 @@ class EventsWorkerStore(SQLBaseStore): str, ObservableDeferred[Dict[str, EventCacheEntry]] ] = {} + # We keep track of the events we have currently loaded in memory so that + # we can reuse them even if they've been evicted from the cache. We only + # track events that don't need redacting in here (as then we don't need + # to track redaction status). + self._event_ref: MutableMapping[str, EventBase] = weakref.WeakValueDictionary() + self._event_fetch_lock = threading.Condition() self._event_fetch_list: List[ Tuple[Iterable[str], "defer.Deferred[Dict[str, _EventRow]]"] @@ -723,6 +731,8 @@ class EventsWorkerStore(SQLBaseStore): def _invalidate_get_event_cache(self, event_id: str) -> None: self._get_event_cache.invalidate((event_id,)) + self._event_ref.pop(event_id, None) + self._current_event_fetches.pop(event_id, None) def _get_events_from_cache( self, events: Iterable[str], update_metrics: bool = True @@ -738,13 +748,30 @@ class EventsWorkerStore(SQLBaseStore): event_map = {} for event_id in events: + # First check if it's in the event cache ret = self._get_event_cache.get( (event_id,), None, update_metrics=update_metrics ) - if not ret: + if ret: + event_map[event_id] = ret continue - event_map[event_id] = ret + # Otherwise check if we still have the event in memory. + event = self._event_ref.get(event_id) + if event: + # Reconstruct an event cache entry + + cache_entry = EventCacheEntry( + event=event, + # We don't cache weakrefs to redacted events, so we know + # this is None. + redacted_event=None, + ) + event_map[event_id] = cache_entry + + # We add the entry back into the cache as we want to keep + # recently queried events in the cache. + self._get_event_cache.set((event_id,), cache_entry) return event_map @@ -1124,6 +1151,10 @@ class EventsWorkerStore(SQLBaseStore): self._get_event_cache.set((event_id,), cache_entry) result_map[event_id] = cache_entry + if not redacted_event: + # We only cache references to unredacted events. + self._event_ref[event_id] = original_ev + return result_map async def _enqueue_events(self, events: Collection[str]) -> Dict[str, _EventRow]: |