diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 459436e304..94a7efee73 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1524,7 +1524,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
# majority of rooms will have a latest token from before the min stream
# pos.
- def bulk_get_max_event_pos_txn(
+ def bulk_get_max_event_pos_fallback_txn(
txn: LoggingTransaction, batched_room_ids: StrCollection
) -> Dict[str, int]:
clause, args = make_in_list_sql_clause(
@@ -1547,11 +1547,37 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
txn.execute(sql, [max_pos] + args)
return {row[0]: row[1] for row in txn}
+ # It's easier to look at the `sliding_sync_joined_rooms` table and avoid all of
+ # the joins and sub-queries.
+ def bulk_get_max_event_pos_from_sliding_sync_tables_txn(
+ txn: LoggingTransaction, batched_room_ids: StrCollection
+ ) -> Dict[str, int]:
+ clause, args = make_in_list_sql_clause(
+ self.database_engine, "room_id", batched_room_ids
+ )
+ sql = f"""
+ SELECT room_id, event_stream_ordering
+ FROM sliding_sync_joined_rooms
+ WHERE {clause}
+ ORDER BY event_stream_ordering DESC
+ """
+ txn.execute(sql, args)
+ return {row[0]: row[1] for row in txn}
+
recheck_rooms: Set[str] = set()
for batched in batch_iter(room_ids, 1000):
- batch_results = await self.db_pool.runInteraction(
- "_bulk_get_max_event_pos", bulk_get_max_event_pos_txn, batched
- )
+ if await self.have_finished_sliding_sync_background_jobs():
+ batch_results = await self.db_pool.runInteraction(
+ "bulk_get_max_event_pos_from_sliding_sync_tables_txn",
+ bulk_get_max_event_pos_from_sliding_sync_tables_txn,
+ batched,
+ )
+ else:
+ batch_results = await self.db_pool.runInteraction(
+ "bulk_get_max_event_pos_fallback_txn",
+ bulk_get_max_event_pos_fallback_txn,
+ batched,
+ )
for room_id, stream_ordering in batch_results.items():
if stream_ordering <= now_token.stream:
results.update(batch_results)
|