diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 12e9a42382..a58668a380 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -33,7 +33,7 @@ from synapse.storage.database import (
)
from synapse.storage.engines import PostgresEngine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.util.caches.descriptors import _CachedFunction
+from synapse.util.caches.descriptors import CachedFunction
from synapse.util.iterutils import batch_iter
if TYPE_CHECKING:
@@ -205,6 +205,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self.get_rooms_for_user_with_stream_ordering.invalidate(
(data.state_key,)
)
+ self.get_rooms_for_user.invalidate((data.state_key,))
else:
raise Exception("Unknown events stream row type %s" % (row.type,))
@@ -223,15 +224,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# process triggering the invalidation is responsible for clearing any external
# cached objects.
self._invalidate_local_get_event_cache(event_id)
- self.have_seen_event.invalidate((room_id, event_id))
- self.get_latest_event_ids_in_room.invalidate((room_id,))
-
- self.get_unread_event_push_actions_by_room_for_user.invalidate((room_id,))
+ self._attempt_to_invalidate_cache("have_seen_event", (room_id, event_id))
+ self._attempt_to_invalidate_cache("get_latest_event_ids_in_room", (room_id,))
+ self._attempt_to_invalidate_cache(
+ "get_unread_event_push_actions_by_room_for_user", (room_id,)
+ )
# The `_get_membership_from_event_id` is immutable, except for the
# case where we look up an event *before* persisting it.
- self._get_membership_from_event_id.invalidate((event_id,))
+ self._attempt_to_invalidate_cache("_get_membership_from_event_id", (event_id,))
if not backfilled:
self._events_stream_cache.entity_has_changed(room_id, stream_ordering)
@@ -240,19 +242,31 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._invalidate_local_get_event_cache(redacts)
# Caches which might leak edits must be invalidated for the event being
# redacted.
- self.get_relations_for_event.invalidate((redacts,))
- self.get_applicable_edit.invalidate((redacts,))
+ self._attempt_to_invalidate_cache("get_relations_for_event", (redacts,))
+ self._attempt_to_invalidate_cache("get_applicable_edit", (redacts,))
+ self._attempt_to_invalidate_cache("get_thread_id", (redacts,))
+ self._attempt_to_invalidate_cache("get_thread_id_for_receipts", (redacts,))
if etype == EventTypes.Member:
self._membership_stream_cache.entity_has_changed(state_key, stream_ordering)
- self.get_invited_rooms_for_local_user.invalidate((state_key,))
+ self._attempt_to_invalidate_cache(
+ "get_invited_rooms_for_local_user", (state_key,)
+ )
+ self._attempt_to_invalidate_cache(
+ "get_rooms_for_user_with_stream_ordering", (state_key,)
+ )
+ self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
if relates_to:
- self.get_relations_for_event.invalidate((relates_to,))
- self.get_aggregation_groups_for_event.invalidate((relates_to,))
- self.get_applicable_edit.invalidate((relates_to,))
- self.get_thread_summary.invalidate((relates_to,))
- self.get_thread_participated.invalidate((relates_to,))
+ self._attempt_to_invalidate_cache("get_relations_for_event", (relates_to,))
+ self._attempt_to_invalidate_cache("get_references_for_event", (relates_to,))
+ self._attempt_to_invalidate_cache(
+ "get_aggregation_groups_for_event", (relates_to,)
+ )
+ self._attempt_to_invalidate_cache("get_applicable_edit", (relates_to,))
+ self._attempt_to_invalidate_cache("get_thread_summary", (relates_to,))
+ self._attempt_to_invalidate_cache("get_thread_participated", (relates_to,))
+ self._attempt_to_invalidate_cache("get_threads", (room_id,))
async def invalidate_cache_and_stream(
self, cache_name: str, keys: Tuple[Any, ...]
@@ -269,9 +283,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
return
cache_func.invalidate(keys)
- await self.db_pool.runInteraction(
- "invalidate_cache_and_stream",
- self._send_invalidation_to_replication,
+ await self.send_invalidation_to_replication(
cache_func.__name__,
keys,
)
@@ -279,7 +291,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
def _invalidate_cache_and_stream(
self,
txn: LoggingTransaction,
- cache_func: _CachedFunction,
+ cache_func: CachedFunction,
keys: Tuple[Any, ...],
) -> None:
"""Invalidates the cache and adds it to the cache stream so slaves
@@ -293,7 +305,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._send_invalidation_to_replication(txn, cache_func.__name__, keys)
def _invalidate_all_cache_and_stream(
- self, txn: LoggingTransaction, cache_func: _CachedFunction
+ self, txn: LoggingTransaction, cache_func: CachedFunction
) -> None:
"""Invalidates the entire cache and adds it to the cache stream so slaves
will know to invalidate their caches.
@@ -334,6 +346,16 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
txn, CURRENT_STATE_CACHE_NAME, [room_id]
)
+ async def send_invalidation_to_replication(
+ self, cache_name: str, keys: Optional[Collection[Any]]
+ ) -> None:
+ await self.db_pool.runInteraction(
+ "send_invalidation_to_replication",
+ self._send_invalidation_to_replication,
+ cache_name,
+ keys,
+ )
+
def _send_invalidation_to_replication(
self, txn: LoggingTransaction, cache_name: str, keys: Optional[Iterable[Any]]
) -> None:
|