diff options
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r-- | synapse/storage/databases/main/stream.py | 47 |
1 files changed, 47 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index ea06e4eee0..872df6bda1 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcol="instance_name", desc="get_name_from_instance_id", ) + + async def get_timeline_gaps( + self, + room_id: str, + from_token: Optional[RoomStreamToken], + to_token: RoomStreamToken, + ) -> Optional[RoomStreamToken]: + """Check if there is a gap, and return a token that marks the position + of the gap in the stream. + """ + + sql = """ + SELECT instance_name, stream_ordering + FROM timeline_gaps + WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ? + ORDER BY stream_ordering + """ + + rows = await self.db_pool.execute( + "get_timeline_gaps", + None, + sql, + room_id, + from_token.stream if from_token else 0, + to_token.get_max_stream_pos(), + ) + + if not rows: + return None + + positions = [ + PersistedEventPosition(instance_name, stream_ordering) + for instance_name, stream_ordering in rows + ] + if from_token: + positions = [p for p in positions if p.persisted_after(from_token)] + + positions = [p for p in positions if not p.persisted_after(to_token)] + + if positions: + # We return a stream token that ensures the event *at* the position + # of the gap is included (as the gap is *before* the persisted + # event). + last_position = positions[-1] + return RoomStreamToken(stream=last_position.stream - 1) + + return None |