diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-07-19 13:25:29 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-19 11:25:29 +0000 |
commit | 2ee0b6ef4b78bada535beb30301cf0e01cbb7d81 (patch) | |
tree | 763ff882ab3a25d50bdf9b5168b162261f0666c3 /synapse/storage/databases | |
parent | Increase batch size of `bulk_get_push_rules` and `_get_joined_profiles_from_e... (diff) | |
download | synapse-2ee0b6ef4b78bada535beb30301cf0e01cbb7d81.tar.xz |
Safe async event cache (#13308)
Fix race conditions in the async cache invalidation logic, by separating the async & local invalidation calls and ensuring any async call i executed first. Signed off by Nick @ Beeper (@Fizzadar).
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r-- | synapse/storage/databases/main/censor_events.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/events.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/events_worker.py | 48 | ||||
-rw-r--r-- | synapse/storage/databases/main/monthly_active_users.py | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 2 |
5 files changed, 45 insertions, 14 deletions
diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py index fd3fc298b3..58177ecec1 100644 --- a/synapse/storage/databases/main/censor_events.py +++ b/synapse/storage/databases/main/censor_events.py @@ -194,7 +194,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase # changed its content in the database. We can't call # self._invalidate_cache_and_stream because self.get_event_cache isn't of the # right type. - txn.call_after(self._get_event_cache.invalidate, (event.event_id,)) + self.invalidate_get_event_cache_after_txn(txn, event.event_id) # Send that invalidation to replication so that other workers also invalidate # the event cache. self._send_invalidation_to_replication( diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index fa2266ba20..156e1bd5ab 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1293,7 +1293,7 @@ class PersistEventsStore: depth_updates: Dict[str, int] = {} for event, context in events_and_contexts: # Remove the any existing cache entries for the event_ids - txn.call_after(self.store._invalidate_get_event_cache, event.event_id) + self.store.invalidate_get_event_cache_after_txn(txn, event.event_id) # Then update the `stream_ordering` position to mark the latest # event as the front of the room. This should not be done for # backfilled events because backfilled events have negative @@ -1675,7 +1675,7 @@ class PersistEventsStore: (cache_entry.event.event_id,), cache_entry ) - txn.call_after(prefill) + txn.async_call_after(prefill) def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None: """Invalidate the caches for the redacted event. @@ -1684,7 +1684,7 @@ class PersistEventsStore: _invalidate_caches_for_event. """ assert event.redacts is not None - txn.call_after(self.store._invalidate_get_event_cache, event.redacts) + self.store.invalidate_get_event_cache_after_txn(txn, event.redacts) txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,)) txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,)) diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py index f3935bfead..4435373146 100644 --- a/synapse/storage/databases/main/events_worker.py +++ b/synapse/storage/databases/main/events_worker.py @@ -712,17 +712,41 @@ class EventsWorkerStore(SQLBaseStore): return event_entry_map - async def _invalidate_get_event_cache(self, event_id: str) -> None: - # First we invalidate the asynchronous cache instance. This may include - # out-of-process caches such as Redis/memcache. Once complete we can - # invalidate any in memory cache. The ordering is important here to - # ensure we don't pull in any remote invalid value after we invalidate - # the in-memory cache. + def invalidate_get_event_cache_after_txn( + self, txn: LoggingTransaction, event_id: str + ) -> None: + """ + Prepares a database transaction to invalidate the get event cache for a given + event ID when executed successfully. This is achieved by attaching two callbacks + to the transaction, one to invalidate the async cache and one for the in memory + sync cache (importantly called in that order). + + Arguments: + txn: the database transaction to attach the callbacks to + event_id: the event ID to be invalidated from caches + """ + + txn.async_call_after(self._invalidate_async_get_event_cache, event_id) + txn.call_after(self._invalidate_local_get_event_cache, event_id) + + async def _invalidate_async_get_event_cache(self, event_id: str) -> None: + """ + Invalidates an event in the asyncronous get event cache, which may be remote. + + Arguments: + event_id: the event ID to invalidate + """ + await self._get_event_cache.invalidate((event_id,)) - self._event_ref.pop(event_id, None) - self._current_event_fetches.pop(event_id, None) def _invalidate_local_get_event_cache(self, event_id: str) -> None: + """ + Invalidates an event in local in-memory get event caches. + + Arguments: + event_id: the event ID to invalidate + """ + self._get_event_cache.invalidate_local((event_id,)) self._event_ref.pop(event_id, None) self._current_event_fetches.pop(event_id, None) @@ -958,7 +982,13 @@ class EventsWorkerStore(SQLBaseStore): } row_dict = self.db_pool.new_transaction( - conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch + conn, + "do_fetch", + [], + [], + [], + self._fetch_event_rows, + events_to_fetch, ) # We only want to resolve deferreds from the main thread diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py index 9a63f953fb..efd136a864 100644 --- a/synapse/storage/databases/main/monthly_active_users.py +++ b/synapse/storage/databases/main/monthly_active_users.py @@ -66,6 +66,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore): "initialise_mau_threepids", [], [], + [], self._initialise_reserved_users, hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value], ) diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 6d42276503..f6822707e4 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -304,7 +304,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): self._invalidate_cache_and_stream( txn, self.have_seen_event, (room_id, event_id) ) - txn.call_after(self._invalidate_get_event_cache, event_id) + self.invalidate_get_event_cache_after_txn(txn, event_id) logger.info("[purge] done") |