summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-10-19 15:04:18 +0100
committerGitHub <noreply@github.com>2023-10-19 15:04:18 +0100
commite9069c9f919685606506f04527332e83fbfa44d9 (patch)
tree5fdc1d3576a6298aa28f43a6ac83bf7cf27625da /synapse/storage
parentAvoid sending massive replication updates when purging a room. (#16510) (diff)
downloadsynapse-e9069c9f919685606506f04527332e83fbfa44d9.tar.xz
Mark sync as limited if there is a gap in the timeline (#16485)
This splits thinsg into two queries, but most of the time we won't have
new event backwards extremities so this shouldn't actually add an extra
RTT for the majority of cases.

Note this removes the check for events with no prev events, but that was
part of MSC2716 work that has since been removed.
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/events.py74
-rw-r--r--synapse/storage/databases/main/stream.py47
-rw-r--r--synapse/storage/schema/main/delta/82/05gaps.sql25
3 files changed, 121 insertions, 25 deletions
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef6766b5e0..3c1492e3ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2267,35 +2267,59 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
-        # From the events passed in, add all of the prev events as backwards extremities.
-        # Ignore any events that are already backwards extrems or outliers.
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            "   SELECT 1 FROM event_backward_extremities"
-            "   WHERE event_id = ? AND room_id = ?"
-            " )"
-            # 1. Don't add an event as a extremity again if we already persisted it
-            # as a non-outlier.
-            # 2. Don't add an outlier as an extremity if it has no prev_events
-            " AND NOT EXISTS ("
-            "   SELECT 1 FROM events"
-            "   LEFT JOIN event_edges edge"
-            "   ON edge.event_id = events.event_id"
-            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
-            " )"
+
+        room_id = events[0].room_id
+
+        potential_backwards_extremities = {
+            e_id
+            for ev in events
+            for e_id in ev.prev_event_ids()
+            if not ev.internal_metadata.is_outlier()
+        }
+
+        if not potential_backwards_extremities:
+            return
+
+        existing_events_outliers = self.db_pool.simple_select_many_txn(
+            txn,
+            table="events",
+            column="event_id",
+            iterable=potential_backwards_extremities,
+            keyvalues={"outlier": False},
+            retcols=("event_id",),
         )
 
-        txn.execute_batch(
-            query,
-            [
-                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
-                for ev in events
-                for e_id in ev.prev_event_ids()
-                if not ev.internal_metadata.is_outlier()
-            ],
+        potential_backwards_extremities.difference_update(
+            e for e, in existing_events_outliers
         )
 
+        if potential_backwards_extremities:
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="event_backward_extremities",
+                key_names=("room_id", "event_id"),
+                key_values=[(room_id, ev) for ev in potential_backwards_extremities],
+                value_names=(),
+                value_values=(),
+            )
+
+            # Record the stream orderings where we have new gaps.
+            gap_events = [
+                (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
+                for ev in events
+                if any(
+                    e_id in potential_backwards_extremities
+                    for e_id in ev.prev_event_ids()
+                )
+            ]
+
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="timeline_gaps",
+                keys=("room_id", "instance_name", "stream_ordering"),
+                values=gap_events,
+            )
+
         # Delete all these events that we've already fetched and now know that their
         # prev events are the new backwards extremeties.
         query = (
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
+
+    async def get_timeline_gaps(
+        self,
+        room_id: str,
+        from_token: Optional[RoomStreamToken],
+        to_token: RoomStreamToken,
+    ) -> Optional[RoomStreamToken]:
+        """Check if there is a gap, and return a token that marks the position
+        of the gap in the stream.
+        """
+
+        sql = """
+            SELECT instance_name, stream_ordering
+            FROM timeline_gaps
+            WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
+            ORDER BY stream_ordering
+        """
+
+        rows = await self.db_pool.execute(
+            "get_timeline_gaps",
+            None,
+            sql,
+            room_id,
+            from_token.stream if from_token else 0,
+            to_token.get_max_stream_pos(),
+        )
+
+        if not rows:
+            return None
+
+        positions = [
+            PersistedEventPosition(instance_name, stream_ordering)
+            for instance_name, stream_ordering in rows
+        ]
+        if from_token:
+            positions = [p for p in positions if p.persisted_after(from_token)]
+
+        positions = [p for p in positions if not p.persisted_after(to_token)]
+
+        if positions:
+            # We return a stream token that ensures the event *at* the position
+            # of the gap is included (as the gap is *before* the persisted
+            # event).
+            last_position = positions[-1]
+            return RoomStreamToken(stream=last_position.stream - 1)
+
+        return None
diff --git a/synapse/storage/schema/main/delta/82/05gaps.sql b/synapse/storage/schema/main/delta/82/05gaps.sql
new file mode 100644
index 0000000000..6813b488ca
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/05gaps.sql
@@ -0,0 +1,25 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Records when we see a "gap in the timeline", due to missing events over
+-- federation. We record this so that we can tell clients there is a gap (by
+-- marking the timeline section of a sync request as limited).
+CREATE TABLE IF NOT EXISTS timeline_gaps (
+    room_id TEXT NOT NULL,
+    instance_name TEXT NOT NULL,
+    stream_ordering BIGINT NOT NULL
+);
+
+CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);