summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py25
1 files changed, 24 insertions, 1 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py

index e74e0d2e91..dccae56608 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -78,7 +78,7 @@ from synapse.storage.database import ( from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator -from synapse.types import PersistedEventPosition, RoomStreamToken +from synapse.types import PersistedEventPosition, RoomStreamToken, StrSequence from synapse.util.caches.descriptors import cached from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.cancellation import cancellable @@ -1185,6 +1185,29 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): return None + + async def rough_get_last_pos(self, room_ids: StrSequence) -> Dict[str, int]: + def rough_get_last_pos_Txn( + txn: LoggingTransaction, + ) -> Dict[str, int]: + clause, args = make_in_list_sql_clause(self.database_engine, "room_id", room_ids) + sql = f""" + SELECT room_id, MAX(stream_ordering) FROM events + WHERE {clause} + GROUP BY room_id + """ + + txn.execute(sql, (args,)) + + return { + room_id: stream_ordering for room_id, stream_ordering in txn + } + + return await self.db_pool.runInteraction( + "rough_get_last_pos", + rough_get_last_pos_Txn, + ) + async def get_last_event_pos_in_room_before_stream_ordering( self, room_id: str,