summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-10-19 15:04:18 +0100
committerGitHub <noreply@github.com>2023-10-19 15:04:18 +0100
commite9069c9f919685606506f04527332e83fbfa44d9 (patch)
tree5fdc1d3576a6298aa28f43a6ac83bf7cf27625da /synapse
parentAvoid sending massive replication updates when purging a room. (#16510) (diff)
downloadsynapse-e9069c9f919685606506f04527332e83fbfa44d9.tar.xz
Mark sync as limited if there is a gap in the timeline (#16485)
This splits thinsg into two queries, but most of the time we won't have
new event backwards extremities so this shouldn't actually add an extra
RTT for the majority of cases.

Note this removes the check for events with no prev events, but that was
part of MSC2716 work that has since been removed.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py52
-rw-r--r--synapse/storage/databases/main/events.py74
-rw-r--r--synapse/storage/databases/main/stream.py47
-rw-r--r--synapse/storage/schema/main/delta/82/05gaps.sql25
4 files changed, 165 insertions, 33 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60b4d95cd7..f131c0e8e0 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -500,12 +500,27 @@ class SyncHandler:
     async def _load_filtered_recents(
         self,
         room_id: str,
+        sync_result_builder: "SyncResultBuilder",
         sync_config: SyncConfig,
-        now_token: StreamToken,
+        upto_token: StreamToken,
         since_token: Optional[StreamToken] = None,
         potential_recents: Optional[List[EventBase]] = None,
         newly_joined_room: bool = False,
     ) -> TimelineBatch:
+        """Create a timeline batch for the room
+
+        Args:
+            room_id
+            sync_result_builder
+            sync_config
+            upto_token: The token up to which we should fetch (more) events.
+                If `potential_results` is non-empty then this is *start* of
+                the the list.
+            since_token
+            potential_recents: If non-empty, the events between the since token
+                and current token to send down to clients.
+            newly_joined_room
+        """
         with Measure(self.clock, "load_filtered_recents"):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = (
@@ -521,6 +536,20 @@ class SyncHandler:
             else:
                 limited = False
 
+            # Check if there is a gap, if so we need to mark this as limited and
+            # recalculate which events to send down.
+            gap_token = await self.store.get_timeline_gaps(
+                room_id,
+                since_token.room_key if since_token else None,
+                sync_result_builder.now_token.room_key,
+            )
+            if gap_token:
+                # There's a gap, so we need to ignore the passed in
+                # `potential_recents`, and reset `upto_token` to match.
+                potential_recents = None
+                upto_token = sync_result_builder.now_token
+                limited = True
+
             log_kv({"limited": limited})
 
             if potential_recents:
@@ -559,10 +588,10 @@ class SyncHandler:
                 recents = []
 
             if not limited or block_all_timeline:
-                prev_batch_token = now_token
+                prev_batch_token = upto_token
                 if recents:
                     room_key = recents[0].internal_metadata.before
-                    prev_batch_token = now_token.copy_and_replace(
+                    prev_batch_token = upto_token.copy_and_replace(
                         StreamKeyType.ROOM, room_key
                     )
 
@@ -573,11 +602,15 @@ class SyncHandler:
             filtering_factor = 2
             load_limit = max(timeline_limit * filtering_factor, 10)
             max_repeat = 5  # Only try a few times per room, otherwise
-            room_key = now_token.room_key
+            room_key = upto_token.room_key
             end_key = room_key
 
             since_key = None
-            if since_token and not newly_joined_room:
+            if since_token and gap_token:
+                # If there is a gap then we need to only include events after
+                # it.
+                since_key = gap_token
+            elif since_token and not newly_joined_room:
                 since_key = since_token.room_key
 
             while limited and len(recents) < timeline_limit and max_repeat:
@@ -647,7 +680,7 @@ class SyncHandler:
                 recents = recents[-timeline_limit:]
                 room_key = recents[0].internal_metadata.before
 
-            prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
+            prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
 
         # Don't bother to bundle aggregations if the timeline is unlimited,
         # as clients will have all the necessary information.
@@ -662,7 +695,9 @@ class SyncHandler:
         return TimelineBatch(
             events=recents,
             prev_batch=prev_batch_token,
-            limited=limited or newly_joined_room,
+            # Also mark as limited if this is a new room or there has been a gap
+            # (to force client to paginate the gap).
+            limited=limited or newly_joined_room or gap_token is not None,
             bundled_aggregations=bundled_aggregations,
         )
 
@@ -2397,8 +2432,9 @@ class SyncHandler:
 
             batch = await self._load_filtered_recents(
                 room_id,
+                sync_result_builder,
                 sync_config,
-                now_token=upto_token,
+                upto_token=upto_token,
                 since_token=since_token,
                 potential_recents=events,
                 newly_joined_room=newly_joined,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef6766b5e0..3c1492e3ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2267,35 +2267,59 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
-        # From the events passed in, add all of the prev events as backwards extremities.
-        # Ignore any events that are already backwards extrems or outliers.
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            "   SELECT 1 FROM event_backward_extremities"
-            "   WHERE event_id = ? AND room_id = ?"
-            " )"
-            # 1. Don't add an event as a extremity again if we already persisted it
-            # as a non-outlier.
-            # 2. Don't add an outlier as an extremity if it has no prev_events
-            " AND NOT EXISTS ("
-            "   SELECT 1 FROM events"
-            "   LEFT JOIN event_edges edge"
-            "   ON edge.event_id = events.event_id"
-            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
-            " )"
+
+        room_id = events[0].room_id
+
+        potential_backwards_extremities = {
+            e_id
+            for ev in events
+            for e_id in ev.prev_event_ids()
+            if not ev.internal_metadata.is_outlier()
+        }
+
+        if not potential_backwards_extremities:
+            return
+
+        existing_events_outliers = self.db_pool.simple_select_many_txn(
+            txn,
+            table="events",
+            column="event_id",
+            iterable=potential_backwards_extremities,
+            keyvalues={"outlier": False},
+            retcols=("event_id",),
         )
 
-        txn.execute_batch(
-            query,
-            [
-                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
-                for ev in events
-                for e_id in ev.prev_event_ids()
-                if not ev.internal_metadata.is_outlier()
-            ],
+        potential_backwards_extremities.difference_update(
+            e for e, in existing_events_outliers
         )
 
+        if potential_backwards_extremities:
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="event_backward_extremities",
+                key_names=("room_id", "event_id"),
+                key_values=[(room_id, ev) for ev in potential_backwards_extremities],
+                value_names=(),
+                value_values=(),
+            )
+
+            # Record the stream orderings where we have new gaps.
+            gap_events = [
+                (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
+                for ev in events
+                if any(
+                    e_id in potential_backwards_extremities
+                    for e_id in ev.prev_event_ids()
+                )
+            ]
+
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="timeline_gaps",
+                keys=("room_id", "instance_name", "stream_ordering"),
+                values=gap_events,
+            )
+
         # Delete all these events that we've already fetched and now know that their
         # prev events are the new backwards extremeties.
         query = (
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
+
+    async def get_timeline_gaps(
+        self,
+        room_id: str,
+        from_token: Optional[RoomStreamToken],
+        to_token: RoomStreamToken,
+    ) -> Optional[RoomStreamToken]:
+        """Check if there is a gap, and return a token that marks the position
+        of the gap in the stream.
+        """
+
+        sql = """
+            SELECT instance_name, stream_ordering
+            FROM timeline_gaps
+            WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
+            ORDER BY stream_ordering
+        """
+
+        rows = await self.db_pool.execute(
+            "get_timeline_gaps",
+            None,
+            sql,
+            room_id,
+            from_token.stream if from_token else 0,
+            to_token.get_max_stream_pos(),
+        )
+
+        if not rows:
+            return None
+
+        positions = [
+            PersistedEventPosition(instance_name, stream_ordering)
+            for instance_name, stream_ordering in rows
+        ]
+        if from_token:
+            positions = [p for p in positions if p.persisted_after(from_token)]
+
+        positions = [p for p in positions if not p.persisted_after(to_token)]
+
+        if positions:
+            # We return a stream token that ensures the event *at* the position
+            # of the gap is included (as the gap is *before* the persisted
+            # event).
+            last_position = positions[-1]
+            return RoomStreamToken(stream=last_position.stream - 1)
+
+        return None
diff --git a/synapse/storage/schema/main/delta/82/05gaps.sql b/synapse/storage/schema/main/delta/82/05gaps.sql
new file mode 100644
index 0000000000..6813b488ca
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/05gaps.sql
@@ -0,0 +1,25 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Records when we see a "gap in the timeline", due to missing events over
+-- federation. We record this so that we can tell clients there is a gap (by
+-- marking the timeline section of a sync request as limited).
+CREATE TABLE IF NOT EXISTS timeline_gaps (
+    room_id TEXT NOT NULL,
+    instance_name TEXT NOT NULL,
+    stream_ordering BIGINT NOT NULL
+);
+
+CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);