summary refs log tree commit diff
path: root/synapse/storage/databases/main/stream.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/stream.py')
-rw-r--r--synapse/storage/databases/main/stream.py47
1 files changed, 47 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
+
+    async def get_timeline_gaps(
+        self,
+        room_id: str,
+        from_token: Optional[RoomStreamToken],
+        to_token: RoomStreamToken,
+    ) -> Optional[RoomStreamToken]:
+        """Check if there is a gap, and return a token that marks the position
+        of the gap in the stream.
+        """
+
+        sql = """
+            SELECT instance_name, stream_ordering
+            FROM timeline_gaps
+            WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
+            ORDER BY stream_ordering
+        """
+
+        rows = await self.db_pool.execute(
+            "get_timeline_gaps",
+            None,
+            sql,
+            room_id,
+            from_token.stream if from_token else 0,
+            to_token.get_max_stream_pos(),
+        )
+
+        if not rows:
+            return None
+
+        positions = [
+            PersistedEventPosition(instance_name, stream_ordering)
+            for instance_name, stream_ordering in rows
+        ]
+        if from_token:
+            positions = [p for p in positions if p.persisted_after(from_token)]
+
+        positions = [p for p in positions if not p.persisted_after(to_token)]
+
+        if positions:
+            # We return a stream token that ensures the event *at* the position
+            # of the gap is included (as the gap is *before* the persisted
+            # event).
+            last_position = positions[-1]
+            return RoomStreamToken(stream=last_position.stream - 1)
+
+        return None