summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py52
-rw-r--r--synapse/storage/databases/main/events.py74
-rw-r--r--synapse/storage/databases/main/stream.py47
-rw-r--r--synapse/storage/schema/main/delta/82/05gaps.sql25
4 files changed, 165 insertions, 33 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60b4d95cd7..f131c0e8e0 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -500,12 +500,27 @@ class SyncHandler:
     async def _load_filtered_recents(
         self,
         room_id: str,
+        sync_result_builder: "SyncResultBuilder",
         sync_config: SyncConfig,
-        now_token: StreamToken,
+        upto_token: StreamToken,
         since_token: Optional[StreamToken] = None,
         potential_recents: Optional[List[EventBase]] = None,
         newly_joined_room: bool = False,
     ) -> TimelineBatch:
+        """Create a timeline batch for the room
+
+        Args:
+            room_id
+            sync_result_builder
+            sync_config
+            upto_token: The token up to which we should fetch (more) events.
+                If `potential_results` is non-empty then this is *start* of
+                the the list.
+            since_token
+            potential_recents: If non-empty, the events between the since token
+                and current token to send down to clients.
+            newly_joined_room
+        """
         with Measure(self.clock, "load_filtered_recents"):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = (
@@ -521,6 +536,20 @@ class SyncHandler:
             else:
                 limited = False
 
+            # Check if there is a gap, if so we need to mark this as limited and
+            # recalculate which events to send down.
+            gap_token = await self.store.get_timeline_gaps(
+                room_id,
+                since_token.room_key if since_token else None,
+                sync_result_builder.now_token.room_key,
+            )
+            if gap_token:
+                # There's a gap, so we need to ignore the passed in
+                # `potential_recents`, and reset `upto_token` to match.
+                potential_recents = None
+                upto_token = sync_result_builder.now_token
+                limited = True
+
             log_kv({"limited": limited})
 
             if potential_recents:
@@ -559,10 +588,10 @@ class SyncHandler:
                 recents = []
 
             if not limited or block_all_timeline:
-                prev_batch_token = now_token
+                prev_batch_token = upto_token
                 if recents:
                     room_key = recents[0].internal_metadata.before
-                    prev_batch_token = now_token.copy_and_replace(
+                    prev_batch_token = upto_token.copy_and_replace(
                         StreamKeyType.ROOM, room_key
                     )
 
@@ -573,11 +602,15 @@ class SyncHandler:
             filtering_factor = 2
             load_limit = max(timeline_limit * filtering_factor, 10)
             max_repeat = 5  # Only try a few times per room, otherwise
-            room_key = now_token.room_key
+            room_key = upto_token.room_key
             end_key = room_key
 
             since_key = None
-            if since_token and not newly_joined_room:
+            if since_token and gap_token:
+                # If there is a gap then we need to only include events after
+                # it.
+                since_key = gap_token
+            elif since_token and not newly_joined_room:
                 since_key = since_token.room_key
 
             while limited and len(recents) < timeline_limit and max_repeat:
@@ -647,7 +680,7 @@ class SyncHandler:
                 recents = recents[-timeline_limit:]
                 room_key = recents[0].internal_metadata.before
 
-            prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
+            prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
 
         # Don't bother to bundle aggregations if the timeline is unlimited,
         # as clients will have all the necessary information.
@@ -662,7 +695,9 @@ class SyncHandler:
         return TimelineBatch(
             events=recents,
             prev_batch=prev_batch_token,
-            limited=limited or newly_joined_room,
+            # Also mark as limited if this is a new room or there has been a gap
+            # (to force client to paginate the gap).
+            limited=limited or newly_joined_room or gap_token is not None,
             bundled_aggregations=bundled_aggregations,
         )
 
@@ -2397,8 +2432,9 @@ class SyncHandler:
 
             batch = await self._load_filtered_recents(
                 room_id,
+                sync_result_builder,
                 sync_config,
-                now_token=upto_token,
+                upto_token=upto_token,
                 since_token=since_token,
                 potential_recents=events,
                 newly_joined_room=newly_joined,
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef6766b5e0..3c1492e3ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2267,35 +2267,59 @@ class PersistEventsStore:
 
         Forward extremities are handled when we first start persisting the events.
         """
-        # From the events passed in, add all of the prev events as backwards extremities.
-        # Ignore any events that are already backwards extrems or outliers.
-        query = (
-            "INSERT INTO event_backward_extremities (event_id, room_id)"
-            " SELECT ?, ? WHERE NOT EXISTS ("
-            "   SELECT 1 FROM event_backward_extremities"
-            "   WHERE event_id = ? AND room_id = ?"
-            " )"
-            # 1. Don't add an event as a extremity again if we already persisted it
-            # as a non-outlier.
-            # 2. Don't add an outlier as an extremity if it has no prev_events
-            " AND NOT EXISTS ("
-            "   SELECT 1 FROM events"
-            "   LEFT JOIN event_edges edge"
-            "   ON edge.event_id = events.event_id"
-            "   WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
-            " )"
+
+        room_id = events[0].room_id
+
+        potential_backwards_extremities = {
+            e_id
+            for ev in events
+            for e_id in ev.prev_event_ids()
+            if not ev.internal_metadata.is_outlier()
+        }
+
+        if not potential_backwards_extremities:
+            return
+
+        existing_events_outliers = self.db_pool.simple_select_many_txn(
+            txn,
+            table="events",
+            column="event_id",
+            iterable=potential_backwards_extremities,
+            keyvalues={"outlier": False},
+            retcols=("event_id",),
         )
 
-        txn.execute_batch(
-            query,
-            [
-                (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
-                for ev in events
-                for e_id in ev.prev_event_ids()
-                if not ev.internal_metadata.is_outlier()
-            ],
+        potential_backwards_extremities.difference_update(
+            e for e, in existing_events_outliers
         )
 
+        if potential_backwards_extremities:
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="event_backward_extremities",
+                key_names=("room_id", "event_id"),
+                key_values=[(room_id, ev) for ev in potential_backwards_extremities],
+                value_names=(),
+                value_values=(),
+            )
+
+            # Record the stream orderings where we have new gaps.
+            gap_events = [
+                (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
+                for ev in events
+                if any(
+                    e_id in potential_backwards_extremities
+                    for e_id in ev.prev_event_ids()
+                )
+            ]
+
+            self.db_pool.simple_insert_many_txn(
+                txn,
+                table="timeline_gaps",
+                keys=("room_id", "instance_name", "stream_ordering"),
+                values=gap_events,
+            )
+
         # Delete all these events that we've already fetched and now know that their
         # prev events are the new backwards extremeties.
         query = (
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             retcol="instance_name",
             desc="get_name_from_instance_id",
         )
+
+    async def get_timeline_gaps(
+        self,
+        room_id: str,
+        from_token: Optional[RoomStreamToken],
+        to_token: RoomStreamToken,
+    ) -> Optional[RoomStreamToken]:
+        """Check if there is a gap, and return a token that marks the position
+        of the gap in the stream.
+        """
+
+        sql = """
+            SELECT instance_name, stream_ordering
+            FROM timeline_gaps
+            WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
+            ORDER BY stream_ordering
+        """
+
+        rows = await self.db_pool.execute(
+            "get_timeline_gaps",
+            None,
+            sql,
+            room_id,
+            from_token.stream if from_token else 0,
+            to_token.get_max_stream_pos(),
+        )
+
+        if not rows:
+            return None
+
+        positions = [
+            PersistedEventPosition(instance_name, stream_ordering)
+            for instance_name, stream_ordering in rows
+        ]
+        if from_token:
+            positions = [p for p in positions if p.persisted_after(from_token)]
+
+        positions = [p for p in positions if not p.persisted_after(to_token)]
+
+        if positions:
+            # We return a stream token that ensures the event *at* the position
+            # of the gap is included (as the gap is *before* the persisted
+            # event).
+            last_position = positions[-1]
+            return RoomStreamToken(stream=last_position.stream - 1)
+
+        return None
diff --git a/synapse/storage/schema/main/delta/82/05gaps.sql b/synapse/storage/schema/main/delta/82/05gaps.sql
new file mode 100644
index 0000000000..6813b488ca
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/05gaps.sql
@@ -0,0 +1,25 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Records when we see a "gap in the timeline", due to missing events over
+-- federation. We record this so that we can tell clients there is a gap (by
+-- marking the timeline section of a sync request as limited).
+CREATE TABLE IF NOT EXISTS timeline_gaps (
+    room_id TEXT NOT NULL,
+    instance_name TEXT NOT NULL,
+    stream_ordering BIGINT NOT NULL
+);
+
+CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);