diff options
author | Erik Johnston <erik@matrix.org> | 2024-07-10 15:54:46 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2024-07-10 15:54:46 +0100 |
commit | a9c2e97ed4cd82fea0fcd210ee87155c3821ab93 (patch) | |
tree | 36d6c575e2aad94ce5ffab15a4d7196093ca476b | |
parent | Fixup (diff) | |
download | synapse-a9c2e97ed4cd82fea0fcd210ee87155c3821ab93.tar.xz |
Fixup
-rw-r--r-- | synapse/handlers/sliding_sync.py | 34 | ||||
-rw-r--r-- | synapse/storage/databases/main/cache.py | 3 | ||||
-rw-r--r-- | synapse/storage/databases/main/stream.py | 23 |
3 files changed, 52 insertions, 8 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 4d56ba7b26..cd0ba7d1f6 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1012,22 +1012,40 @@ class SlidingSyncHandler: A sorted list of room IDs by `stream_ordering` along with membership information. """ - self.store._events_stream_cache._entity_to_key - - last_activity_in_room_map = {} - to_fetch = [] + # Assemble a map of room ID to the `stream_ordering` of the last activity that the + # user should see in the room (<= `to_token`) + last_activity_in_room_map: Dict[str, int] = {} for room_id, room_for_user in sync_room_map.items(): + # If they are fully-joined to the room, let's find the latest activity + # at/before the `to_token`. if room_for_user.membership == Membership.JOIN: stream_pos = self.store._events_stream_cache._entity_to_key.get(room_id) if stream_pos is not None: last_activity_in_room_map[room_id] = stream_pos - else: - to_fetch.append(room_id) + continue + + last_event_result = await self.store.get_rough_stream_ordering_for_room( + room_id + ) + + # If the room has no events at/before the `to_token`, this is probably a + # mistake in the code that generates the `sync_room_map` since that should + # only give us rooms that the user had membership in during the token range. + assert last_event_result is not None + + _, event_pos = last_event_result + + last_activity_in_room_map[room_id] = event_pos.stream else: + # Otherwise, if the user has left/been invited/knocked/been banned from + # a room, they shouldn't see anything past that point. + # + # FIXME: It's possible that people should see beyond this point in + # invited/knocked cases if for example the room has + # `invite`/`world_readable` history visibility, see + # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream - last_activity_in_room_map.update(await self.store.rough_get_last_pos(to_fetch)) - return sorted( sync_room_map.values(), # Sort by the last activity (stream_ordering) in the room diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index 2d6b75e47e..0550ab73da 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore): if not backfilled: self._events_stream_cache.entity_has_changed(room_id, stream_ordering) # type: ignore[attr-defined] + self._attempt_to_invalidate_cache( + "get_rough_stream_ordering_for_room", (room_id,) + ) if redacts: self._invalidate_local_get_event_cache(redacts) # type: ignore[attr-defined] diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 5ab0fbc5d9..d6801accab 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -68,6 +68,7 @@ from synapse.api.filtering import Filter from synapse.events import EventBase from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.logging.opentracing import trace +from synapse.replication.tcp.streams.events import EventsStream from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( DatabasePool, @@ -1214,6 +1215,28 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return results + @cached(max_entries=10000) + async def get_rough_stream_ordering_for_room( + self, + room_id: str, + ) -> Optional[int]: + def get_rough_stream_ordering_for_room_txn( + txn: LoggingTransaction, + ) -> Dict[str, int]: + sql = f""" + SELECT DISTINCT ON (room_id) room_id, stream_ordering FROM events + WHERE room_id = ? AND stream_ordering IS NOT NULL + ORDER BY room_id, stream_ordering DESC + """ + + txn.execute(sql, (room_id,)) + + return {room_id: stream_ordering for room_id, stream_ordering in txn} + + return await self.db_pool.runInteraction( + "get_rough_stream_ordering_for_room", get_rough_stream_ordering_for_room_txn + ) + async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str, |