diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 501578bd3a..a0493d33db 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -1023,6 +1023,7 @@ class SlidingSyncHandler:
# Assemble a map of room ID to the `stream_ordering` of the last activity that the
# user should see in the room (<= `to_token`)
last_activity_in_room_map: Dict[str, int] = {}
+ to_fetch = []
for room_id, room_for_user in sync_room_map.items():
# If they are fully-joined to the room, let's find the latest activity
# at/before the `to_token`.
@@ -1032,14 +1033,7 @@ class SlidingSyncHandler:
last_activity_in_room_map[room_id] = stream_pos
continue
- stream = await self.store.get_rough_stream_ordering_for_room(room_id)
-
- # If the room has no events at/before the `to_token`, this is probably a
- # mistake in the code that generates the `sync_room_map` since that should
- # only give us rooms that the user had membership in during the token range.
- assert stream is not None
-
- last_activity_in_room_map[room_id] = stream
+ to_fetch.append(room_id)
else:
# Otherwise, if the user has left/been invited/knocked/been banned from
# a room, they shouldn't see anything past that point.
@@ -1050,6 +1044,10 @@ class SlidingSyncHandler:
# https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
+ for room_id, stream_pos in await self.store.rough_get_last_pos(to_fetch):
+ if stream_pos is not None:
+ last_activity_in_room_map[room_id] = stream_pos
+
return sorted(
sync_room_map.values(),
# Sort by the last activity (stream_ordering) in the room
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 7a4b74e0a7..f7e1a1ad1e 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -80,7 +80,7 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
from synapse.util.iterutils import batch_iter
@@ -1187,7 +1187,12 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
- async def rough_get_last_pos(self, room_ids: StrSequence) -> Dict[str, int]:
+ @cachedList(
+ cached_method_name="get_rough_stream_ordering_for_room", list_name="room_ids",
+ )
+ async def rough_get_last_pos(
+ self, room_ids: StrSequence
+ ) -> Dict[str, Optional[int]]:
def rough_get_last_pos_txn(
txn: LoggingTransaction,
batch: StrSequence,
|