summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/_base.py4
-rw-r--r--synapse/storage/databases/main/cache.py36
-rw-r--r--synapse/storage/databases/main/events.py15
3 files changed, 53 insertions, 2 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py

index e14d711c76..7251e72e3a 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py
@@ -86,7 +86,9 @@ class SQLBaseStore(metaclass=ABCMeta): """ def _invalidate_state_caches( - self, room_id: str, members_changed: Collection[str] + self, + room_id: str, + members_changed: Collection[str], ) -> None: """Invalidates caches that are based on the current state, but does not stream invalidations down replication. diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 707d18de78..f364464c23 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py
@@ -219,6 +219,11 @@ class CacheInvalidationWorkerStore(SQLBaseStore): room_id = row.keys[0] members_changed = set(row.keys[1:]) self._invalidate_state_caches(room_id, members_changed) + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] + room_id, token + ) + for user_id in members_changed: + self._membership_stream_cache.entity_has_changed(user_id, token) # type: ignore[attr-defined] elif row.cache_func == PURGE_HISTORY_CACHE_NAME: if row.keys is None: raise Exception( @@ -236,6 +241,35 @@ class CacheInvalidationWorkerStore(SQLBaseStore): room_id = row.keys[0] self._invalidate_caches_for_room_events(room_id) self._invalidate_caches_for_room(room_id) + self._curr_state_delta_stream_cache.entity_has_changed( # type: ignore[attr-defined] + room_id, token + ) + # Note: This code is commented out to improve cache performance. + # While uncommenting would provide complete correctness, our + # automatic forgotten room purge logic (see + # `forgotten_room_retention_period`) means this would frequently + # clear the entire cache (effectively) and probably have a noticable + # impact on the cache hit ratio. + # + # Not updating the cache here is safe because: + # + # 1. `_membership_stream_cache` is only used to indicate the + # *absence* of changes, i.e. "nothing has changed between tokens + # X and Y and so return early and don't query the database". + # 2. `_membership_stream_cache` is used when we query data from + # `current_state_delta_stream` and `room_memberships` but since + # nothing new is written to the database for those tables when + # purging/deleting a room (only deleting rows), there is nothing + # changed to care about. + # + # At worst, the cache might indicate a change at token X, at which + # point, we will query the database and discover nothing is there. + # + # Ideally, we would make it so that we could clear the cache on a + # more granular level but that's a bit complex and fiddly to do with + # room membership. + # + # self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] else: self._attempt_to_invalidate_cache(row.cache_func, row.keys) @@ -275,6 +309,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): self._attempt_to_invalidate_cache( "get_sliding_sync_rooms_for_user", None ) + self._membership_stream_cache.entity_has_changed(data.state_key, token) # type: ignore[attr-defined] elif data.type == EventTypes.RoomEncryption: self._attempt_to_invalidate_cache( "get_room_encryption", (data.room_id,) @@ -291,6 +326,7 @@ class CacheInvalidationWorkerStore(SQLBaseStore): # Similar to the above, but the entire caches are invalidated. This is # unfortunate for the membership caches, but should recover quickly. self._curr_state_delta_stream_cache.entity_has_changed(data.room_id, token) # type: ignore[attr-defined] + self._membership_stream_cache.all_entities_changed(token) # type: ignore[attr-defined] self._attempt_to_invalidate_cache("get_rooms_for_user", None) self._attempt_to_invalidate_cache("get_room_type", (data.room_id,)) self._attempt_to_invalidate_cache("get_room_encryption", (data.room_id,)) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index dd6ac909e9..a23aaf5096 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py
@@ -1605,7 +1605,13 @@ class PersistEventsStore: room_id delta_state: Deltas that are going to be used to update the `current_state_events` table. Changes to the current state of the room. - stream_id: TODO + stream_id: This is expected to be the minimum `stream_ordering` for the + batch of events that we are persisting; which means we do not end up in a + situation where workers see events before the `current_state_delta` updates. + FIXME: However, this function also gets called with next upcoming + `stream_ordering` when we re-sync the state of a partial stated room (see + `update_current_state(...)`) which may be "correct" but it would be good to + nail down what exactly is the expected value here. sliding_sync_table_changes: Changes to the `sliding_sync_membership_snapshots` and `sliding_sync_joined_rooms` tables derived from the given `delta_state` (see @@ -1908,6 +1914,13 @@ class PersistEventsStore: stream_id, ) + for user_id in members_to_cache_bust: + txn.call_after( + self.store._membership_stream_cache.entity_has_changed, + user_id, + stream_id, + ) + # Invalidate the various caches self.store._invalidate_state_caches_and_stream( txn, room_id, members_to_cache_bust