diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-07-19 13:25:29 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-19 11:25:29 +0000 |
commit | 2ee0b6ef4b78bada535beb30301cf0e01cbb7d81 (patch) | |
tree | 763ff882ab3a25d50bdf9b5168b162261f0666c3 /synapse/storage/databases/main/events_worker.py | |
parent | Increase batch size of `bulk_get_push_rules` and `_get_joined_profiles_from_e... (diff) | |
download | synapse-2ee0b6ef4b78bada535beb30301cf0e01cbb7d81.tar.xz |
Safe async event cache (#13308)
Fix race conditions in the async cache invalidation logic, by separating the async & local invalidation calls and ensuring any async call i executed first. Signed off by Nick @ Beeper (@Fizzadar).
Diffstat (limited to 'synapse/storage/databases/main/events_worker.py')
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 48 |
1 files changed, 39 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f3935bfead..4435373146 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -712,17 +712,41 @@ class EventsWorkerStore(SQLBaseStore): return event_entry_map - async def _invalidate_get_event_cache(self, event_id: str) -> None: - # First we invalidate the asynchronous cache instance. This may include - # out-of-process caches such as Redis/memcache. Once complete we can - # invalidate any in memory cache. The ordering is important here to - # ensure we don't pull in any remote invalid value after we invalidate - # the in-memory cache. + def invalidate_get_event_cache_after_txn( + self, txn: LoggingTransaction, event_id: str + ) -> None: + """ + Prepares a database transaction to invalidate the get event cache for a given + event ID when executed successfully. This is achieved by attaching two callbacks + to the transaction, one to invalidate the async cache and one for the in memory + sync cache (importantly called in that order). + + Arguments: + txn: the database transaction to attach the callbacks to + event_id: the event ID to be invalidated from caches + """ + + txn.async_call_after(self._invalidate_async_get_event_cache, event_id) + txn.call_after(self._invalidate_local_get_event_cache, event_id) + + async def _invalidate_async_get_event_cache(self, event_id: str) -> None: + """ + Invalidates an event in the asyncronous get event cache, which may be remote. + + Arguments: + event_id: the event ID to invalidate + """ + await self._get_event_cache.invalidate((event_id,)) - self._event_ref.pop(event_id, None) - self._current_event_fetches.pop(event_id, None) def _invalidate_local_get_event_cache(self, event_id: str) -> None: + """ + Invalidates an event in local in-memory get event caches. + + Arguments: + event_id: the event ID to invalidate + """ + self._get_event_cache.invalidate_local((event_id,)) self._event_ref.pop(event_id, None) self._current_event_fetches.pop(event_id, None) @@ -958,7 +982,13 @@ class EventsWorkerStore(SQLBaseStore): } row_dict = self.db_pool.new_transaction( - conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch + conn, + "do_fetch", + [], + [], + [], + self._fetch_event_rows, + events_to_fetch, ) # We only want to resolve deferreds from the main thread |