diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e74e0d2e91..dccae56608 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -78,7 +78,7 @@ from synapse.storage.database import (
from synapse.storage.databases.main.events_worker import EventsWorkerStore
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken
+from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence
from synapse.util.caches.descriptors import cached
from synapse.util.caches.stream_change_cache import StreamChangeCache
from synapse.util.cancellation import cancellable
@@ -1185,6 +1185,29 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
return None
+
+ async def rough_get_last_pos(self, room_ids: StrSequence) -> Dict[str, int]:
+ def rough_get_last_pos_Txn(
+ txn: LoggingTransaction,
+ ) -> Dict[str, int]:
+ clause, args = make_in_list_sql_clause(self.database_engine, "room_id", room_ids)
+ sql = f"""
+ SELECT room_id, MAX(stream_ordering) FROM events
+ WHERE {clause}
+ GROUP BY room_id
+ """
+
+ txn.execute(sql, (args,))
+
+ return {
+ room_id: stream_ordering for room_id, stream_ordering in txn
+ }
+
+ return await self.db_pool.runInteraction(
+ "rough_get_last_pos",
+ rough_get_last_pos_Txn,
+ )
+
async def get_last_event_pos_in_room_before_stream_ordering(
self,
room_id: str,
|