summary refs log tree commit diff
path: root/synapse/storage/databases/main/events_worker.py
diff options
context:
space:
mode:
authorNick Mills-Barrett <nick@beeper.com>2022-07-19 13:25:29 +0200
committerGitHub <noreply@github.com>2022-07-19 11:25:29 +0000
commit2ee0b6ef4b78bada535beb30301cf0e01cbb7d81 (patch)
tree763ff882ab3a25d50bdf9b5168b162261f0666c3 /synapse/storage/databases/main/events_worker.py
parentIncrease batch size of `bulk_get_push_rules` and `_get_joined_profiles_from_e... (diff)
downloadsynapse-2ee0b6ef4b78bada535beb30301cf0e01cbb7d81.tar.xz
Safe async event cache (#13308)
Fix race conditions in the async cache invalidation logic, by separating
the async & local invalidation calls and ensuring any async call i
executed first.

Signed off by Nick @ Beeper (@Fizzadar).
Diffstat (limited to 'synapse/storage/databases/main/events_worker.py')
-rw-r--r--synapse/storage/databases/main/events_worker.py48
1 files changed, 39 insertions, 9 deletions
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index f3935bfead..4435373146 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -712,17 +712,41 @@ class EventsWorkerStore(SQLBaseStore):
 
         return event_entry_map
 
-    async def _invalidate_get_event_cache(self, event_id: str) -> None:
-        # First we invalidate the asynchronous cache instance. This may include
-        # out-of-process caches such as Redis/memcache. Once complete we can
-        # invalidate any in memory cache. The ordering is important here to
-        # ensure we don't pull in any remote invalid value after we invalidate
-        # the in-memory cache.
+    def invalidate_get_event_cache_after_txn(
+        self, txn: LoggingTransaction, event_id: str
+    ) -> None:
+        """
+        Prepares a database transaction to invalidate the get event cache for a given
+        event ID when executed successfully. This is achieved by attaching two callbacks
+        to the transaction, one to invalidate the async cache and one for the in memory
+        sync cache (importantly called in that order).
+
+        Arguments:
+            txn: the database transaction to attach the callbacks to
+            event_id: the event ID to be invalidated from caches
+        """
+
+        txn.async_call_after(self._invalidate_async_get_event_cache, event_id)
+        txn.call_after(self._invalidate_local_get_event_cache, event_id)
+
+    async def _invalidate_async_get_event_cache(self, event_id: str) -> None:
+        """
+        Invalidates an event in the asyncronous get event cache, which may be remote.
+
+        Arguments:
+            event_id: the event ID to invalidate
+        """
+
         await self._get_event_cache.invalidate((event_id,))
-        self._event_ref.pop(event_id, None)
-        self._current_event_fetches.pop(event_id, None)
 
     def _invalidate_local_get_event_cache(self, event_id: str) -> None:
+        """
+        Invalidates an event in local in-memory get event caches.
+
+        Arguments:
+            event_id: the event ID to invalidate
+        """
+
         self._get_event_cache.invalidate_local((event_id,))
         self._event_ref.pop(event_id, None)
         self._current_event_fetches.pop(event_id, None)
@@ -958,7 +982,13 @@ class EventsWorkerStore(SQLBaseStore):
                 }
 
                 row_dict = self.db_pool.new_transaction(
-                    conn, "do_fetch", [], [], self._fetch_event_rows, events_to_fetch
+                    conn,
+                    "do_fetch",
+                    [],
+                    [],
+                    [],
+                    self._fetch_event_rows,
+                    events_to_fetch,
                 )
 
                 # We only want to resolve deferreds from the main thread