diff --git a/changelog.d/16485.bugfix b/changelog.d/16485.bugfix
new file mode 100644
index 0000000000..3cd7e1877f
--- /dev/null
+++ b/changelog.d/16485.bugfix
@@ -0,0 +1 @@
+Fix long-standing bug where `/sync` incorrectly did not mark a room as `limited` in a sync response when there were missing remote events.
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60b4d95cd7..f131c0e8e0 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -500,12 +500,27 @@ class SyncHandler:
async def _load_filtered_recents(
self,
room_id: str,
+ sync_result_builder: "SyncResultBuilder",
sync_config: SyncConfig,
- now_token: StreamToken,
+ upto_token: StreamToken,
since_token: Optional[StreamToken] = None,
potential_recents: Optional[List[EventBase]] = None,
newly_joined_room: bool = False,
) -> TimelineBatch:
+ """Create a timeline batch for the room
+
+ Args:
+ room_id
+ sync_result_builder
+ sync_config
+ upto_token: The token up to which we should fetch (more) events.
+ If `potential_recents` is non-empty then this is the *start*
+ of the list.
+ since_token
+ potential_recents: If non-empty, the events between the since token
+ and current token to send down to clients.
+ newly_joined_room
+ """
with Measure(self.clock, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
@@ -521,6 +536,20 @@ class SyncHandler:
else:
limited = False
+ # Check if there is a gap; if so, we need to mark this as limited and
+ # recalculate which events to send down.
+ gap_token = await self.store.get_timeline_gaps(
+ room_id,
+ since_token.room_key if since_token else None,
+ sync_result_builder.now_token.room_key,
+ )
+ if gap_token:
+ # There's a gap, so we need to ignore the passed-in
+ # `potential_recents` and reset `upto_token` to match.
+ potential_recents = None
+ upto_token = sync_result_builder.now_token
+ limited = True
+
log_kv({"limited": limited})
if potential_recents:
@@ -559,10 +588,10 @@ class SyncHandler:
recents = []
if not limited or block_all_timeline:
- prev_batch_token = now_token
+ prev_batch_token = upto_token
if recents:
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(
+ prev_batch_token = upto_token.copy_and_replace(
StreamKeyType.ROOM, room_key
)
@@ -573,11 +602,15 @@ class SyncHandler:
filtering_factor = 2
load_limit = max(timeline_limit * filtering_factor, 10)
max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
+ room_key = upto_token.room_key
end_key = room_key
since_key = None
- if since_token and not newly_joined_room:
+ if since_token and gap_token:
+ # If there is a gap then we need to include only the events
+ # after it.
+ since_key = gap_token
+ elif since_token and not newly_joined_room:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
@@ -647,7 +680,7 @@ class SyncHandler:
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
+ prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
@@ -662,7 +695,9 @@ class SyncHandler:
return TimelineBatch(
events=recents,
prev_batch=prev_batch_token,
- limited=limited or newly_joined_room,
+ # Also mark as limited if this is a new room or there has been a gap
+ # (to force the client to paginate over the gap).
+ limited=limited or newly_joined_room or gap_token is not None,
bundled_aggregations=bundled_aggregations,
)
@@ -2397,8 +2432,9 @@ class SyncHandler:
batch = await self._load_filtered_recents(
room_id,
+ sync_result_builder,
sync_config,
- now_token=upto_token,
+ upto_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
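
For context on why the `limited` flag matters: when a room's timeline comes back limited, a client cannot assume the returned events are contiguous with its previous sync and must paginate backwards from `prev_batch` to fill the hole. A rough sketch of that client-side handling follows, using the standard Matrix client-server `/messages` endpoint; the homeserver URL, access token, room ID, and the `fill_gap` helper itself are placeholders, not part of this patch.

    import requests

    def fill_gap(homeserver: str, token: str, room_id: str, sync_room: dict) -> list:
        """Return a room's timeline events, backfilling one page if limited."""
        timeline = sync_room["timeline"]
        events = list(timeline["events"])
        if not timeline.get("limited"):
            # No gap: the events are contiguous with the previous sync.
            return events

        # Paginate backwards from prev_batch; a real client would keep going
        # until it reaches events it has already seen.
        resp = requests.get(
            f"{homeserver}/_matrix/client/v3/rooms/{room_id}/messages",
            params={"from": timeline["prev_batch"], "dir": "b", "limit": 100},
            headers={"Authorization": f"Bearer {token}"},
        )
        resp.raise_for_status()
        # With dir=b the chunk is newest-first; reverse before prepending.
        return list(reversed(resp.json()["chunk"])) + events
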
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index ef6766b5e0..3c1492e3ad 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -2267,35 +2267,59 @@ class PersistEventsStore:
Forward extremities are handled when we first start persisting the events.
"""
- # From the events passed in, add all of the prev events as backwards extremities.
- # Ignore any events that are already backwards extrems or outliers.
- query = (
- "INSERT INTO event_backward_extremities (event_id, room_id)"
- " SELECT ?, ? WHERE NOT EXISTS ("
- " SELECT 1 FROM event_backward_extremities"
- " WHERE event_id = ? AND room_id = ?"
- " )"
- # 1. Don't add an event as a extremity again if we already persisted it
- # as a non-outlier.
- # 2. Don't add an outlier as an extremity if it has no prev_events
- " AND NOT EXISTS ("
- " SELECT 1 FROM events"
- " LEFT JOIN event_edges edge"
- " ON edge.event_id = events.event_id"
- " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)"
- " )"
+
+ room_id = events[0].room_id
+
+ potential_backwards_extremities = {
+ e_id
+ for ev in events
+ for e_id in ev.prev_event_ids()
+ if not ev.internal_metadata.is_outlier()
+ }
+
+ if not potential_backwards_extremities:
+ return
+
+ existing_events_outliers = self.db_pool.simple_select_many_txn(
+ txn,
+ table="events",
+ column="event_id",
+ iterable=potential_backwards_extremities,
+ keyvalues={"outlier": False},
+ retcols=("event_id",),
)
- txn.execute_batch(
- query,
- [
- (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id)
- for ev in events
- for e_id in ev.prev_event_ids()
- if not ev.internal_metadata.is_outlier()
- ],
+ potential_backwards_extremities.difference_update(
+ e for e, in existing_events_outliers
)
+ if potential_backwards_extremities:
+ self.db_pool.simple_upsert_many_txn(
+ txn,
+ table="event_backward_extremities",
+ key_names=("room_id", "event_id"),
+ key_values=[(room_id, ev) for ev in potential_backwards_extremities],
+ value_names=(),
+ value_values=(),
+ )
+
+ # Record the stream orderings where we have new gaps.
+ gap_events = [
+ (room_id, self._instance_name, ev.internal_metadata.stream_ordering)
+ for ev in events
+ if any(
+ e_id in potential_backwards_extremities
+ for e_id in ev.prev_event_ids()
+ )
+ ]
+
+ self.db_pool.simple_insert_many_txn(
+ txn,
+ table="timeline_gaps",
+ keys=("room_id", "instance_name", "stream_ordering"),
+ values=gap_events,
+ )
+
# Delete all these events that we've already fetched and now know that their
# prev events are the new backwards extremities.
query = (
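
The rewritten persistence step replaces the old batched conditional INSERT with explicit set arithmetic in Python. A standalone sketch of that logic, with plain tuples standing in for `EventBase` and the database helpers (the `find_gaps` name and tuple layout are illustrative, not Synapse APIs):

    from typing import List, Set, Tuple

    def find_gaps(
        events: List[Tuple[str, List[str], bool, int]],
        persisted_non_outliers: Set[str],
    ) -> Tuple[Set[str], List[int]]:
        """events: (event_id, prev_event_ids, is_outlier, stream_ordering)."""
        # Prev events referenced by non-outliers are candidate backward
        # extremities...
        candidates = {
            prev_id
            for _, prev_ids, is_outlier, _ in events
            if not is_outlier
            for prev_id in prev_ids
        }
        # ...unless we have already persisted them as full (non-outlier) events.
        extremities = candidates - persisted_non_outliers

        # Any event pointing at a still-missing prev event sits just after a
        # gap, so record its stream ordering (one row per such event).
        gaps = [
            stream_ordering
            for _, prev_ids, _, stream_ordering in events
            if any(p in extremities for p in prev_ids)
        ]
        return extremities, gaps
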
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index ea06e4eee0..872df6bda1 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -1616,3 +1616,50 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
retcol="instance_name",
desc="get_name_from_instance_id",
)
+
+ async def get_timeline_gaps(
+ self,
+ room_id: str,
+ from_token: Optional[RoomStreamToken],
+ to_token: RoomStreamToken,
+ ) -> Optional[RoomStreamToken]:
+ """Check if there is a gap, and return a token that marks the position
+ of the gap in the stream.
+ """
+
+ sql = """
+ SELECT instance_name, stream_ordering
+ FROM timeline_gaps
+ WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?
+ ORDER BY stream_ordering
+ """
+
+ rows = await self.db_pool.execute(
+ "get_timeline_gaps",
+ None,
+ sql,
+ room_id,
+ from_token.stream if from_token else 0,
+ to_token.get_max_stream_pos(),
+ )
+
+ if not rows:
+ return None
+
+ positions = [
+ PersistedEventPosition(instance_name, stream_ordering)
+ for instance_name, stream_ordering in rows
+ ]
+ if from_token:
+ positions = [p for p in positions if p.persisted_after(from_token)]
+
+ positions = [p for p in positions if not p.persisted_after(to_token)]
+
+ if positions:
+ # We return a stream token that ensures the event *at* the position
+ # of the gap is included (as the gap is *before* the persisted
+ # event).
+ last_position = positions[-1]
+ return RoomStreamToken(stream=last_position.stream - 1)
+
+ return None
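
One subtlety is the `stream - 1` adjustment: stream positions act as exclusive lower bounds (compare the `? < stream_ordering` condition in the SQL above), so returning the position just before the gap event guarantees that the event *at* the gap is included when the token is later used as `since_key`. A toy illustration with plain integers standing in for `RoomStreamToken`:

    def events_after(events_by_stream: dict, from_stream: int) -> list:
        # Mirrors the "? < stream_ordering" semantics: strictly after the token.
        return [ev for s, ev in sorted(events_by_stream.items()) if s > from_stream]

    events = {10: "A", 11: "B (first event after the gap)", 12: "C"}
    gap_event_stream = 11          # the gap sits just before this event
    token = gap_event_stream - 1   # what get_timeline_gaps hands back

    assert events_after(events, token) == ["B (first event after the gap)", "C"]
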
diff --git a/synapse/storage/schema/main/delta/82/05gaps.sql b/synapse/storage/schema/main/delta/82/05gaps.sql
new file mode 100644
index 0000000000..6813b488ca
--- /dev/null
+++ b/synapse/storage/schema/main/delta/82/05gaps.sql
@@ -0,0 +1,25 @@
+/* Copyright 2023 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Records when we see a "gap in the timeline", due to missing events over
+-- federation. We record this so that we can tell clients there is a gap (by
+-- marking the timeline section of a sync request as limited).
+CREATE TABLE IF NOT EXISTS timeline_gaps (
+ room_id TEXT NOT NULL,
+ instance_name TEXT NOT NULL,
+ stream_ordering BIGINT NOT NULL
+);
+
+CREATE INDEX timeline_gaps_room_id ON timeline_gaps(room_id, stream_ordering);
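
To make the round trip concrete, here is a toy demonstration of the new table and the same range query that `get_timeline_gaps` issues, using `sqlite3` in place of Synapse's database pool; all rows below are invented sample data.

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE timeline_gaps ("
        " room_id TEXT NOT NULL,"
        " instance_name TEXT NOT NULL,"
        " stream_ordering BIGINT NOT NULL)"
    )
    conn.execute(
        "CREATE INDEX timeline_gaps_room_id"
        " ON timeline_gaps(room_id, stream_ordering)"
    )

    # Pretend two gaps were recorded while persisting federated events.
    conn.executemany(
        "INSERT INTO timeline_gaps VALUES (?, ?, ?)",
        [("!room:example.org", "master", 105), ("!room:example.org", "master", 230)],
    )

    # The same range query as above: gaps strictly after the since token (100),
    # up to and including the current token (200).
    rows = conn.execute(
        "SELECT instance_name, stream_ordering FROM timeline_gaps"
        " WHERE room_id = ? AND ? < stream_ordering AND stream_ordering <= ?"
        " ORDER BY stream_ordering",
        ("!room:example.org", 100, 200),
    ).fetchall()
    assert rows == [("master", 105)]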