diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index e14d711c76..7251e72e3a 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -86,7 +86,9 @@ class SQLBaseStore(metaclass=ABCMeta):
"""
def _invalidate_state_caches(
- self, room_id: str, members_changed: Collection[str]
+ self,
+ room_id: str,
+ members_changed: Collection[str],
) -> None:
"""Invalidates caches that are based on the current state, but does
not stream invalidations down replication.
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 707d18de78..f364464c23 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -219,6 +219,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
room_id = row.keys[0]
members_changed = set(row.keys[1:])
self._invalidate_state_caches(room_id, members_changed)
+ self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
+ room_id, token
+ )
+ for user_id in members_changed:
+ self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined]
elif row.cache_func == PURGE_HISTORY_CACHE_NAME:
if row.keys is None:
raise Exception(
@@ -236,6 +241,35 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
room_id = row.keys[0]
self._invalidate_caches_for_room_events(room_id)
self._invalidate_caches_for_room(room_id)
+ self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined]
+ room_id, token
+ )
+ # Note: This code is commented out to improve cache performance.
+ # While uncommenting would provide complete correctness, our
+ # automatic forgotten room purge logic (see
+ # `forgotten_room_retention_period`) means this would frequently
+ # clear the entire cache (effectively) and probably have a noticable
+ # impact on the cache hit ratio.
+ #
+ # Not updating the cache here is safe because:
+ #
+ # 1. `_membership_stream_cache` is only used to indicate the
+ # *absence* of changes, i.e. "nothing has changed between tokens
+ # X and Y and so return early and don't query the database".
+ # 2. `_membership_stream_cache` is used when we query data from
+ # `current_state_delta_stream` and `room_memberships` but since
+ # nothing new is written to the database for those tables when
+ # purging/deleting a room (only deleting rows), there is nothing
+ # changed to care about.
+ #
+ # At worst, the cache might indicate a change at token X, at which
+ # point, we will query the database and discover nothing is there.
+ #
+ # Ideally, we would make it so that we could clear the cache on a
+ # more granular level but that's a bit complex and fiddly to do with
+ # room membership.
+ #
+ # self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
else:
self._attempt_to_invalidate_cache(row.cache_func, row.keys)
@@ -275,6 +309,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
self._attempt_to_invalidate_cache(
"get_sliding_sync_rooms_for_user", None
)
+ self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined]
elif data.type == EventTypes.RoomEncryption:
self._attempt_to_invalidate_cache(
"get_room_encryption", (data.room_id,)
@@ -291,6 +326,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
# Similar to the above, but the entire caches are invalidated. This is
# unfortunate for the membership caches, but should recover quickly.
self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined]
+ self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined]
self._attempt_to_invalidate_cache("get_rooms_for_user", None)
self._attempt_to_invalidate_cache("get_room_type", (data.room_id,))
self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,))
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index dd6ac909e9..a23aaf5096 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -1605,7 +1605,13 @@ class PersistEventsStore:
room_id
delta_state: Deltas that are going to be used to update the
`current_state_events` table. Changes to the current state of the room.
- stream_id: TODO
+ stream_id: This is expected to be the minimum `stream_ordering` for the
+ batch of events that we are persisting; which means we do not end up in a
+ situation where workers see events before the `current_state_delta` updates.
+ FIXME: However, this function also gets called with next upcoming
+ `stream_ordering` when we re-sync the state of a partial stated room (see
+ `update_current_state(...)`) which may be "correct" but it would be good to
+ nail down what exactly is the expected value here.
sliding_sync_table_changes: Changes to the
`sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables
derived from the given `delta_state` (see
@@ -1908,6 +1914,13 @@ class PersistEventsStore:
stream_id,
)
+ for user_id in members_to_cache_bust:
+ txn.call_after(
+ self.store._membership_stream_cache.entity_has_changed,
+ user_id,
+ stream_id,
+ )
+
# Invalidate the various caches
self.store._invalidate_state_caches_and_stream(
txn, room_id, members_to_cache_bust
|