diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef6766b5e0..3c1492e3ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2267,35 +2267,59 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
- # From the events passed in, add all of the prev events as backwards extremities.
- # Ignore any events that are already backwards extrems or outliers.
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- # 1. Don't add an event as a extremity again if we already persisted it
- # as a non-outlier.
- # 2. Don't add an outlier as an extremity if it has no prev_events
- " AND NOT EXISTS ("
- " SELECT 1 FROM events"
- " LEFT JOIN event_edges edge"
- " ON edge.event_id = events.event_id"
- " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
- " )"
+
+ room_id = events[0].room_id
+
+ potential_backwards_extremities = {
+ e_id
+ for ev in events
+ for e_id in ev.prev_event_ids()
+ if not ev.internal_metadata.is_outlier()
+ }
+
+ if not potential_backwards_extremities:
+ return
+
+ existing_events_outliers = self.db_pool.simple_select_many_txn(
+ txn,
+ table="events",
+ column="event_id",
+ iterable=potential_backwards_extremities,
+ keyvalues={"outlier": False},
+ retcols=("event_id",),
)
- txn.execute_batch(
- query,
- [
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
- for ev in events
- for e_id in ev.prev_event_ids()
- if not ev.internal_metadata.is_outlier()
- ],
+ potential_backwards_extremities.difference_update(
+ e for e, in existing_events_outliers
)
+ if potential_backwards_extremities:
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="event_backward_extremities",
+ key_names=("room_id", "event_id"),
+ key_values=[(room_id, ev) for ev in potential_backwards_extremities],
+ value_names=(),
+ value_values=(),
+ )
+
+ # Record the stream orderings where we have new gaps.
+ gap_events = [
+ (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
+ for ev in events
+ if any(
+ e_id in potential_backwards_extremities
+ for e_id in ev.prev_event_ids()
+ )
+ ]
+
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="timeline_gaps",
+ keys=("room_id", "instance_name", "stream_ordering"),
+ values=gap_events,
+ )
+
# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremeties.
query = (
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
retcol="instance_name",
desc="get_name_from_instance_id",
)
+
+ async def get_timeline_gaps(
+ self,
+ room_id: str,
+ from_token: Optional[RoomStreamToken],
+ to_token: RoomStreamToken,
+ ) -> Optional[RoomStreamToken]:
+ """Check if there is a gap, and return a token that marks the position
+ of the gap in the stream.
+ """
+
+ sql = """
+ SELECT instance_name, stream_ordering
+ FROM timeline_gaps
+ WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
+ ORDER BY stream_ordering
+ """
+
+ rows = await self.db_pool.execute(
+ "get_timeline_gaps",
+ None,
+ sql,
+ room_id,
+ from_token.stream if from_token else 0,
+ to_token.get_max_stream_pos(),
+ )
+
+ if not rows:
+ return None
+
+ positions = [
+ PersistedEventPosition(instance_name, stream_ordering)
+ for instance_name, stream_ordering in rows
+ ]
+ if from_token:
+ positions = [p for p in positions if p.persisted_after(from_token)]
+
+ positions = [p for p in positions if not p.persisted_after(to_token)]
+
+ if positions:
+ # We return a stream token that ensures the event *at* the position
+ # of the gap is included (as the gap is *before* the persisted
+ # event).
+ last_position = positions[-1]
+ return RoomStreamToken(stream=last_position.stream - 1)
+
+ return None
diff --git a/synapse/storage/schema/main/delta/82/05gaps.sql b/synapse/storage/schema/main/delta/82/05gaps.sql
new file mode 100644
index 0000000000..6813b488ca
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/05gaps.sql
@@ -0,0 +1,25 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Records when we see a "gap in the timeline", due to missing events over
+-- federation. We record this so that we can tell clients there is a gap (by
+-- marking the timeline section of a sync request as limited).
+CREATE TABLE IF NOT EXISTS timeline_gaps (
+ room_id TEXT NOT NULL,
+ instance_name TEXT NOT NULL,
+ stream_ordering BIGINT NOT NULL
+);
+
+CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);
|