summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-07-19 13:25:29 +0200
committerGitHub <noreply@github.com>2022-07-19 11:25:29 +0000
commit2ee0b6ef4b78bada535beb30301cf0e01cbb7d81 (patch)
tree763ff882ab3a25d50bdf9b5168b162261f0666c3 /synapse/storage/databases
parentIncrease batch size of `bulk_get_push_rules` and `_get_joined_profiles_from_e... (diff)
downloadsynapse-2ee0b6ef4b78bada535beb30301cf0e01cbb7d81.tar.xz
Safe async event cache (#13308)
Fix race conditions in the async cache invalidation logic, by separating
the async & local invalidation calls and ensuring any async call i
executed first.

Signed off by Nick @ Beeper (@Fizzadar).
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/censor_events.py2
-rw-r--r--synapse/storage/databases/main/events.py6
-rw-r--r--synapse/storage/databases/main/events_worker.py48
-rw-r--r--synapse/storage/databases/main/monthly_active_users.py1
-rw-r--r--synapse/storage/databases/main/purge_events.py2
5 files changed, 45 insertions, 14 deletions
diff --git a/synapse/storage/databases/main/censor_events.py b/synapse/storage/databases/main/censor_events.py
index fd3fc298b3..58177ecec1 100644
--- a/synapse/storage/databases/main/censor_events.py
+++ b/synapse/storage/databases/main/censor_events.py
@@ -194,7 +194,7 @@ class CensorEventsStore(EventsWorkerStore, CacheInvalidationWorkerStore, SQLBase
             # changed its content in the database. We can't call
             # self._invalidate_cache_and_stream because self.get_event_cache isn't of the
             # right type.
-            txn.call_after(self._get_event_cache.invalidate, (event.event_id,))
+            self.invalidate_get_event_cache_after_txn(txn, event.event_id)
             # Send that invalidation to replication so that other workers also invalidate
             # the event cache.
             self._send_invalidation_to_replication(
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index fa2266ba20..156e1bd5ab 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1293,7 +1293,7 @@ class PersistEventsStore:
         depth_updates: Dict[str, int] = {}
         for event, context in events_and_contexts:
             # Remove the any existing cache entries for the event_ids
-            txn.call_after(self.store._invalidate_get_event_cache, event.event_id)
+            self.store.invalidate_get_event_cache_after_txn(txn, event.event_id)
             # Then update the `stream_ordering` position to mark the latest
             # event as the front of the room. This should not be done for
             # backfilled events because backfilled events have negative
@@ -1675,7 +1675,7 @@ class PersistEventsStore:
                     (cache_entry.event.event_id,), cache_entry
                 )
 
-        txn.call_after(prefill)
+        txn.async_call_after(prefill)
 
     def _store_redaction(self, txn: LoggingTransaction, event: EventBase) -> None:
         """Invalidate the caches for the redacted event.
@@ -1684,7 +1684,7 @@ class PersistEventsStore:
         _invalidate_caches_for_event.
         """
         assert event.redacts is not None
-        txn.call_after(self.store._invalidate_get_event_cache, event.redacts)
+        self.store.invalidate_get_event_cache_after_txn(txn, event.redacts)
         txn.call_after(self.store.get_relations_for_event.invalidate, (event.redacts,))
         txn.call_after(self.store.get_applicable_edit.invalidate, (event.redacts,))
 
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index f3935bfead..4435373146 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -712,17 +712,41 @@ class EventsWorkerStore(SQLBaseStore):
 
         return event_entry_map
 
-    async def _invalidate_get_event_cache(self, event_id: str) -> None:
-        # First we invalidate the asynchronous cache instance. This may include
-        # out-of-process caches such as Redis/memcache. Once complete we can
-        # invalidate any in memory cache. The ordering is important here to
-        # ensure we don't pull in any remote invalid value after we invalidate
-        # the in-memory cache.
+    def invalidate_get_event_cache_after_txn(
+        self, txn: LoggingTransaction, event_id: str
+    ) -> None:
+        """
+        Prepares a database transaction to invalidate the get event cache for a given
+        event ID when executed successfully. This is achieved by attaching two callbacks
+        to the transaction, one to invalidate the async cache and one for the in memory
+        sync cache (importantly called in that order).
+
+        Arguments:
+            txn: the database transaction to attach the callbacks to
+            event_id: the event ID to be invalidated from caches
+        """
+
+        txn.async_call_after(self._invalidate_async_get_event_cache, event_id)
+        txn.call_after(self._invalidate_local_get_event_cache, event_id)
+
+    async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
+        """
+        Invalidates an event in the asyncronous get event cache, which may be remote.
+
+        Arguments:
+            event_id: the event ID to invalidate
+        """
+
         await self._get_event_cache.invalidate((event_id,))
-        self._event_ref.pop(event_id, None)
-        self._current_event_fetches.pop(event_id, None)
 
     def _invalidate_local_get_event_cache(self, event_id: str) -> None:
+        """
+        Invalidates an event in local in-memory get event caches.
+
+        Arguments:
+            event_id: the event ID to invalidate
+        """
+
         self._get_event_cache.invalidate_local((event_id,))
         self._event_ref.pop(event_id, None)
         self._current_event_fetches.pop(event_id, None)
@@ -958,7 +982,13 @@ class EventsWorkerStore(SQLBaseStore):
                 }
 
                 row_dict = self.db_pool.new_transaction(
-                    conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch
+                    conn,
+                    "do_fetch",
+                    [],
+                    [],
+                    [],
+                    self._fetch_event_rows,
+                    events_to_fetch,
                 )
 
                 # We only want to resolve deferreds from the main thread
diff --git a/synapse/storage/databases/main/monthly_active_users.py b/synapse/storage/databases/main/monthly_active_users.py
index 9a63f953fb..efd136a864 100644
--- a/synapse/storage/databases/main/monthly_active_users.py
+++ b/synapse/storage/databases/main/monthly_active_users.py
@@ -66,6 +66,7 @@ class MonthlyActiveUsersWorkerStore(RegistrationWorkerStore):
                 "initialise_mau_threepids",
                 [],
                 [],
+                [],
                 self._initialise_reserved_users,
                 hs.config.server.mau_limits_reserved_threepids[: self._max_mau_value],
             )
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 6d42276503..f6822707e4 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -304,7 +304,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
                 self._invalidate_cache_and_stream(
                     txn, self.have_seen_event, (room_id, event_id)
                 )
-                txn.call_after(self._invalidate_get_event_cache, event_id)
+                self.invalidate_get_event_cache_after_txn(txn, event_id)
 
         logger.info("[purge] done")