summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2023-10-23 09:12:42 -0400
committerGitHub <noreply@github.com>2023-10-23 09:12:42 -0400
commit91e65700bdb19ff14f3a8b995f340d5a019b0495 (patch)
tree1e03f4659b36c21fbb47dcc6aab3426c88e74f59 /synapse/storage/databases/main/stream.py
parentDocumentation. (diff)
parentMention how to redirect the Jaeger traces to a specific Jaeger instance (#16531) (diff)
downloadsynapse-github/clokep/db-upgrades.tar.xz
Merge branch 'develop' into clokep/db-upgrades github/clokep/db-upgrades clokep/db-upgrades
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py

index ea06e4eee0..872df6bda1 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): retcol="instance_name", desc="get_name_from_instance_id", ) + + async def get_timeline_gaps( + self, + room_id: str, + from_token: Optional[RoomStreamToken], + to_token: RoomStreamToken, + ) -> Optional[RoomStreamToken]: + """Check if there is a gap, and return a token that marks the position + of the gap in the stream. + """ + + sql = """ + SELECT instance_name, stream_ordering + FROM timeline_gaps + WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ? + ORDER BY stream_ordering + """ + + rows = await self.db_pool.execute( + "get_timeline_gaps", + None, + sql, + room_id, + from_token.stream if from_token else 0, + to_token.get_max_stream_pos(), + ) + + if not rows: + return None + + positions = [ + PersistedEventPosition(instance_name, stream_ordering) + for instance_name, stream_ordering in rows + ] + if from_token: + positions = [p for p in positions if p.persisted_after(from_token)] + + positions = [p for p in positions if not p.persisted_after(to_token)] + + if positions: + # We return a stream token that ensures the event *at* the position + # of the gap is included (as the gap is *before* the persisted + # event). + last_position = positions[-1] + return RoomStreamToken(stream=last_position.stream - 1) + + return None