From 91f627b9a7c1912f3de20e758dc7f940781382b5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Jul 2024 16:27:11 +0100 Subject: FIXUP --- synapse/handlers/sliding_sync.py | 14 ++++++-------- synapse/storage/databases/main/stream.py | 9 +++++++-- 2 files changed, 13 insertions(+), 10 deletions(-) diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py index 501578bd3a..a0493d33db 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py @@ -1023,6 +1023,7 @@ class SlidingSyncHandler: # Assemble a map of room ID to the `stream_ordering` of the last activity that the # user should see in the room (<= `to_token`) last_activity_in_room_map: Dict[str, int] = {} + to_fetch = [] for room_id, room_for_user in sync_room_map.items(): # If they are fully-joined to the room, let's find the latest activity # at/before the `to_token`. @@ -1032,14 +1033,7 @@ class SlidingSyncHandler: last_activity_in_room_map[room_id] = stream_pos continue - stream = await self.store.get_rough_stream_ordering_for_room(room_id) - - # If the room has no events at/before the `to_token`, this is probably a - # mistake in the code that generates the `sync_room_map` since that should - # only give us rooms that the user had membership in during the token range. - assert stream is not None - - last_activity_in_room_map[room_id] = stream + to_fetch.append(room_id) else: # Otherwise, if the user has left/been invited/knocked/been banned from # a room, they shouldn't see anything past that point. @@ -1050,6 +1044,10 @@ class SlidingSyncHandler: # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream + for room_id, stream_pos in await self.store.rough_get_last_pos(to_fetch): + if stream_pos is not None: + last_activity_in_room_map[room_id] = stream_pos + return sorted( sync_room_map.values(), # Sort by the last activity (stream_ordering) in the room diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 7a4b74e0a7..f7e1a1ad1e 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -80,7 +80,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence -from synapse.util.caches.descriptors import cached +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable from synapse.util.iterutils import batch_iter @@ -1187,7 +1187,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None - async def rough_get_last_pos(self, room_ids: StrSequence) -> Dict[str, int]: + @cachedList( + cached_method_name="get_rough_stream_ordering_for_room", list_name="room_ids", + ) + async def rough_get_last_pos( + self, room_ids: StrSequence + ) -> Dict[str, Optional[int]]: def rough_get_last_pos_txn( txn: LoggingTransaction, batch: StrSequence, -- cgit 1.4.1