diff options
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 40 |
1 files changed, 34 insertions, 6 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 63d8350530..d28fc65df9 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -67,7 +67,7 @@ from synapse.storage.database import ( make_in_list_sql_clause, ) from synapse.storage.databases.main.events_worker import EventsWorkerStore -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.util.id_generators import MultiWriterIdGenerator from synapse.types import PersistedEventPosition, RoomStreamToken from synapse.util.caches.descriptors import cached @@ -944,12 +944,40 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): room_id stream_key """ - sql = ( - "SELECT coalesce(MIN(topological_ordering), 0) FROM events" - " WHERE room_id = ? AND stream_ordering >= ?" - ) + if isinstance(self.database_engine, PostgresEngine): + min_function = "LEAST" + elif isinstance(self.database_engine, Sqlite3Engine): + min_function = "MIN" + else: + raise RuntimeError(f"Unknown database engine {self.database_engine}") + + # This query used to be + # SELECT COALESCE(MIN(topological_ordering), 0) FROM events + # WHERE room_id = ? and events.stream_ordering >= {stream_key} + # which returns 0 if the stream_key is newer than any event in + # the room. That's not wrong, but it seems to interact oddly with backfill, + # requiring a second call to /messages to actually backfill from a remote + # homeserver. + # + # Instead, rollback the stream ordering to that after the most recent event in + # this room. + sql = f""" + WITH fallback(max_stream_ordering) AS ( + SELECT MAX(stream_ordering) + FROM events + WHERE room_id = ? + ) + SELECT COALESCE(MIN(topological_ordering), 0) FROM events + WHERE + room_id = ? + AND events.stream_ordering >= {min_function}( + ?, + (SELECT max_stream_ordering FROM fallback) + ) + """ + row = await self.db_pool.execute( - "get_current_topological_token", None, sql, room_id, stream_key + "get_current_topological_token", None, sql, room_id, room_id, stream_key ) return row[0][0] if row else 0 |