summary refs log tree commit diff
path: root/synapse/storage/databases/main/cache.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/cache.py')
-rw-r--r--synapse/storage/databases/main/cache.py54
1 files changed, 35 insertions, 19 deletions
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 12e9a42382..db6ce83a2b 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -33,7 +33,7 @@ from synapse.storage.database import (
 )
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.util.caches.descriptors import _CachedFunction
+from synapse.util.caches.descriptors import CachedFunction
 from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
@@ -223,15 +223,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         # process triggering the invalidation is responsible for clearing any external
         # cached objects.
         self._invalidate_local_get_event_cache(event_id)
-        self.have_seen_event.invalidate((room_id, event_id))
 
-        self.get_latest_event_ids_in_room.invalidate((room_id,))
-
-        self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
+        self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
+        self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
+        self._attempt_to_invalidate_cache(
+            "get_unread_event_push_actions_by_room_for_user", (room_id,)
+        )
 
         # The `_get_membership_from_event_id` is immutable, except for the
         # case where we look up an event *before* persisting it.
-        self._get_membership_from_event_id.invalidate((event_id,))
+        self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
@@ -240,19 +241,26 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             self._invalidate_local_get_event_cache(redacts)
             # Caches which might leak edits must be invalidated for the event being
             # redacted.
-            self.get_relations_for_event.invalidate((redacts,))
-            self.get_applicable_edit.invalidate((redacts,))
+            self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
+            self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))
 
         if etype == EventTypes.Member:
             self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
-            self.get_invited_rooms_for_local_user.invalidate((state_key,))
+            self._attempt_to_invalidate_cache(
+                "get_invited_rooms_for_local_user", (state_key,)
+            )
 
         if relates_to:
-            self.get_relations_for_event.invalidate((relates_to,))
-            self.get_aggregation_groups_for_event.invalidate((relates_to,))
-            self.get_applicable_edit.invalidate((relates_to,))
-            self.get_thread_summary.invalidate((relates_to,))
-            self.get_thread_participated.invalidate((relates_to,))
+            self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
+            self._attempt_to_invalidate_cache(
+                "get_aggregation_groups_for_event", (relates_to,)
+            )
+            self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
+            self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
+            self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
+            self._attempt_to_invalidate_cache(
+                "get_mutual_event_relations_for_rel_type", (relates_to,)
+            )
 
     async def invalidate_cache_and_stream(
         self, cache_name: str, keys: Tuple[Any, ...]
@@ -269,9 +277,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             return
 
         cache_func.invalidate(keys)
-        await self.db_pool.runInteraction(
-            "invalidate_cache_and_stream",
-            self._send_invalidation_to_replication,
+        await self.send_invalidation_to_replication(
             cache_func.__name__,
             keys,
         )
@@ -279,7 +285,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
     def _invalidate_cache_and_stream(
         self,
         txn: LoggingTransaction,
-        cache_func: _CachedFunction,
+        cache_func: CachedFunction,
         keys: Tuple[Any, ...],
     ) -> None:
         """Invalidates the cache and adds it to the cache stream so slaves
@@ -293,7 +299,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
 
     def _invalidate_all_cache_and_stream(
-        self, txn: LoggingTransaction, cache_func: _CachedFunction
+        self, txn: LoggingTransaction, cache_func: CachedFunction
     ) -> None:
         """Invalidates the entire cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
@@ -334,6 +340,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 txn, CURRENT_STATE_CACHE_NAME, [room_id]
             )
 
+    async def send_invalidation_to_replication(
+        self, cache_name: str, keys: Optional[Collection[Any]]
+    ) -> None:
+        await self.db_pool.runInteraction(
+            "send_invalidation_to_replication",
+            self._send_invalidation_to_replication,
+            cache_name,
+            keys,
+        )
+
     def _send_invalidation_to_replication(
         self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]]
     ) -> None: