diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60b4d95cd7..f131c0e8e0 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -500,12 +500,27 @@ class SyncHandler:
async def _load_filtered_recents(
self,
room_id: str,
+ sync_result_builder: "SyncResultBuilder",
sync_config: SyncConfig,
- now_token: StreamToken,
+ upto_token: StreamToken,
since_token: Optional[StreamToken] = None,
potential_recents: Optional[List[EventBase]] = None,
newly_joined_room: bool = False,
) -> TimelineBatch:
+ """Create a timeline batch for the room
+
+ Args:
+ room_id
+ sync_result_builder
+ sync_config
+ upto_token: The token up to which we should fetch (more) events.
+ If `potential_recents` is non-empty then this is the *start* of
+ the list.
+ since_token
+ potential_recents: If non-empty, the events between the since token
+ and current token to send down to clients.
+ newly_joined_room
+ """
with Measure(self.clock, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
@@ -521,6 +536,20 @@ class SyncHandler:
else:
limited = False
+ # Check if there is a gap; if so, we need to mark this as
+ # limited and recalculate which events to send down.
+ gap_token = await self.store.get_timeline_gaps(
+ room_id,
+ since_token.room_key if since_token else None,
+ sync_result_builder.now_token.room_key,
+ )
+ if gap_token:
+ # There's a gap, so we need to ignore the passed-in
+ # `potential_recents` and reset `upto_token` to match.
+ potential_recents = None
+ upto_token = sync_result_builder.now_token
+ limited = True
+
log_kv({"limited": limited})
if potential_recents:
@@ -559,10 +588,10 @@ class SyncHandler:
recents = []
if not limited or block_all_timeline:
- prev_batch_token = now_token
+ prev_batch_token = upto_token
if recents:
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(
+ prev_batch_token = upto_token.copy_and_replace(
StreamKeyType.ROOM, room_key
)
@@ -573,11 +602,15 @@ class SyncHandler:
filtering_factor = 2
load_limit = max(timeline_limit * filtering_factor, 10)
max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
+ room_key = upto_token.room_key
end_key = room_key
since_key = None
- if since_token and not newly_joined_room:
+ if since_token and gap_token:
+ # If there is a gap then we only need to include events
+ # after it.
+ since_key = gap_token
+ elif since_token and not newly_joined_room:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
@@ -647,7 +680,7 @@ class SyncHandler:
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
+ prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
@@ -662,7 +695,9 @@ class SyncHandler:
return TimelineBatch(
events=recents,
prev_batch=prev_batch_token,
- limited=limited or newly_joined_room,
+ # Also mark as limited if this is a new room or there has been
+ # a gap (to force the client to paginate the gap).
+ limited=limited or newly_joined_room or gap_token is not None,
bundled_aggregations=bundled_aggregations,
)
@@ -2397,8 +2432,9 @@ class SyncHandler:
batch = await self._load_filtered_recents(
room_id,
+ sync_result_builder,
sync_config,
- now_token=upto_token,
+ upto_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
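
For reviewers unfamiliar with this path, the sketch below distils the gap-handling control flow added above. It is not Synapse code: `Store.get_timeline_gaps` is a stand-in for the real store method, and `StreamToken`, `TimelineBatch`, and `load_recents` are simplified, hypothetical versions kept just small enough to run on their own.

# A minimal, runnable sketch of the gap-handling flow (NOT Synapse code:
# the store method and token/batch types below are simplified stand-ins).

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class StreamToken:
    room_key: int


@dataclass
class TimelineBatch:
    events: List[str]
    prev_batch: StreamToken
    limited: bool


class Store:
    """Stand-in for the storage layer; `gaps` are stream positions where
    events are known to be missing."""

    def __init__(self, gaps: List[int]):
        self._gaps = gaps

    def get_timeline_gaps(
        self, since_key: Optional[int], now_key: int
    ) -> Optional[int]:
        # Return the most recent gap between the two keys, if any.
        hits = [
            g
            for g in self._gaps
            if (since_key is None or g > since_key) and g <= now_key
        ]
        return max(hits) if hits else None


def load_recents(
    store: Store,
    now_token: StreamToken,
    upto_token: StreamToken,
    since_token: Optional[StreamToken],
    potential_recents: Optional[List[str]],
) -> TimelineBatch:
    limited = False
    gap_token = store.get_timeline_gaps(
        since_token.room_key if since_token else None, now_token.room_key
    )
    if gap_token is not None:
        # A gap invalidates the precomputed events: drop them, restart the
        # fetch window from the current token, and force `limited` so the
        # client knows it must paginate to fill the gap.
        potential_recents = None
        upto_token = now_token
        limited = True

    events = potential_recents or []
    prev_batch = upto_token  # simplified: real code rewinds to before events[0]
    return TimelineBatch(events=events, prev_batch=prev_batch, limited=limited)


# Example: a gap at position 5 sits between since (3) and now (10), so the
# precomputed events are discarded and the batch comes back limited.
store = Store(gaps=[5])
batch = load_recents(
    store,
    now_token=StreamToken(room_key=10),
    upto_token=StreamToken(room_key=8),
    since_token=StreamToken(room_key=3),
    potential_recents=["$event1", "$event2"],
)
assert batch.limited and batch.events == [] and batch.prev_batch.room_key == 10

The key effect the example demonstrates: once a gap is detected between the since and now tokens, the precomputed events are dropped, `prev_batch` snaps to the current token, and the batch is forced to be limited so the client backfills the missing events.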