summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/events.py74
-rw-r--r--synapse/storage/databases/main/stream.py47
2 files changed, 96 insertions, 25 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef6766b5e0..3c1492e3ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2267,35 +2267,59 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
-        # From the events passed in, add all of the prev events as backwards extremities.
-        # Ignore any events that are already backwards extrems or outliers.
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            "   SELECT 1 FROM event_backward_extremities"
-            "   WHERE event_id = ? AND room_id = ?"
-            " )"
-            # 1. Don't add an event as a extremity again if we already persisted it
-            # as a non-outlier.
-            # 2. Don't add an outlier as an extremity if it has no prev_events
-            " AND NOT EXISTS ("
-            "   SELECT 1 FROM events"
-            "   LEFT JOIN event_edges edge"
-            "   ON edge.event_id = events.event_id"
-            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
-            " )"
+
+        room_id = events[0].room_id
+
+        potential_backwards_extremities = {
+            e_id
+            for ev in events
+            for e_id in ev.prev_event_ids()
+            if not ev.internal_metadata.is_outlier()
+        }
+
+        if not potential_backwards_extremities:
+            return
+
+        existing_events_outliers = self.db_pool.simple_select_many_txn(
+            txn,
+            table="events",
+            column="event_id",
+            iterable=potential_backwards_extremities,
+            keyvalues={"outlier": False},
+            retcols=("event_id",),
         )
 
-        txn.execute_batch(
-            query,
-            [
-                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
-                for ev in events
-                for e_id in ev.prev_event_ids()
-                if not ev.internal_metadata.is_outlier()
-            ],
+        potential_backwards_extremities.difference_update(
+            e for e, in existing_events_outliers
         )
 
+        if potential_backwards_extremities:
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="event_backward_extremities",
+                key_names=("room_id", "event_id"),
+                key_values=[(room_id, ev) for ev in potential_backwards_extremities],
+                value_names=(),
+                value_values=(),
+            )
+
+            # Record the stream orderings where we have new gaps.
+            gap_events = [
+                (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
+                for ev in events
+                if any(
+                    e_id in potential_backwards_extremities
+                    for e_id in ev.prev_event_ids()
+                )
+            ]
+
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="timeline_gaps",
+                keys=("room_id", "instance_name", "stream_ordering"),
+                values=gap_events,
+            )
+
         # Delete all these events that we've already fetched and now know that their
         # prev events are the new backwards extremeties.
         query = (
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
+
+    async def get_timeline_gaps(
+        self,
+        room_id: str,
+        from_token: Optional[RoomStreamToken],
+        to_token: RoomStreamToken,
+    ) -> Optional[RoomStreamToken]:
+        """Check if there is a gap, and return a token that marks the position
+        of the gap in the stream.
+        """
+
+        sql = """
+            SELECT instance_name, stream_ordering
+            FROM timeline_gaps
+            WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
+            ORDER BY stream_ordering
+        """
+
+        rows = await self.db_pool.execute(
+            "get_timeline_gaps",
+            None,
+            sql,
+            room_id,
+            from_token.stream if from_token else 0,
+            to_token.get_max_stream_pos(),
+        )
+
+        if not rows:
+            return None
+
+        positions = [
+            PersistedEventPosition(instance_name, stream_ordering)
+            for instance_name, stream_ordering in rows
+        ]
+        if from_token:
+            positions = [p for p in positions if p.persisted_after(from_token)]
+
+        positions = [p for p in positions if not p.persisted_after(to_token)]
+
+        if positions:
+            # We return a stream token that ensures the event *at* the position
+            # of the gap is included (as the gap is *before* the persisted
+            # event).
+            last_position = positions[-1]
+            return RoomStreamToken(stream=last_position.stream - 1)
+
+        return None