diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7bd42f635f..2f1bc5a015 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,7 +40,6 @@ from synapse.api.filtering import FilterCollection
from synapse.api.presence import UserPresenceState
from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
from synapse.events import EventBase
-from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
from synapse.handlers.relations import BundledAggregations
from synapse.logging import issue9533_logger
from synapse.logging.context import current_context
@@ -58,6 +57,7 @@ from synapse.types import (
DeviceListUpdates,
JsonDict,
JsonMapping,
+ MultiWriterStreamToken,
MutableStateMap,
Requester,
RoomStreamToken,
@@ -363,36 +363,15 @@ class SyncHandler:
# (since we now know that the device has received them)
if since_token is not None:
since_stream_id = since_token.to_device_key
- # Fast path: delete a limited number of to-device messages up front.
- # We do this to avoid the overhead of scheduling a task for every
- # sync.
- device_deletion_limit = 100
deleted = await self.store.delete_messages_for_device(
sync_config.user.to_string(),
sync_config.device_id,
since_stream_id,
- limit=device_deletion_limit,
)
logger.debug(
"Deleted %d to-device messages up to %d", deleted, since_stream_id
)
- # If we hit the limit, schedule a background task to delete the rest.
- if deleted >= device_deletion_limit:
- await self._task_scheduler.schedule_task(
- DELETE_DEVICE_MSGS_TASK_NAME,
- resource_id=sync_config.device_id,
- params={
- "user_id": sync_config.user.to_string(),
- "device_id": sync_config.device_id,
- "up_to_stream_id": since_stream_id,
- },
- )
- logger.debug(
- "Deletion of to-device messages up to %d scheduled",
- since_stream_id,
- )
-
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
@@ -499,7 +478,11 @@ class SyncHandler:
event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
ephemeral_by_room.setdefault(room_id, []).append(event_copy)
- receipt_key = since_token.receipt_key if since_token else 0
+ receipt_key = (
+ since_token.receipt_key
+ if since_token
+ else MultiWriterStreamToken(stream=0)
+ )
receipt_source = self.event_sources.sources.receipt
receipts, receipt_key = await receipt_source.get_new_events(
@@ -522,12 +505,27 @@ class SyncHandler:
async def _load_filtered_recents(
self,
room_id: str,
+ sync_result_builder: "SyncResultBuilder",
sync_config: SyncConfig,
- now_token: StreamToken,
+ upto_token: StreamToken,
since_token: Optional[StreamToken] = None,
potential_recents: Optional[List[EventBase]] = None,
newly_joined_room: bool = False,
) -> TimelineBatch:
+ """Create a timeline batch for the room
+
+ Args:
+ room_id
+ sync_result_builder
+ sync_config
+ upto_token: The token up to which we should fetch (more) events.
+ If `potential_results` is non-empty then this is *start* of
+ the the list.
+ since_token
+ potential_recents: If non-empty, the events between the since token
+ and current token to send down to clients.
+ newly_joined_room
+ """
with Measure(self.clock, "load_filtered_recents"):
timeline_limit = sync_config.filter_collection.timeline_limit()
block_all_timeline = (
@@ -543,6 +541,20 @@ class SyncHandler:
else:
limited = False
+ # Check if there is a gap, if so we need to mark this as limited and
+ # recalculate which events to send down.
+ gap_token = await self.store.get_timeline_gaps(
+ room_id,
+ since_token.room_key if since_token else None,
+ sync_result_builder.now_token.room_key,
+ )
+ if gap_token:
+ # There's a gap, so we need to ignore the passed in
+ # `potential_recents`, and reset `upto_token` to match.
+ potential_recents = None
+ upto_token = sync_result_builder.now_token
+ limited = True
+
log_kv({"limited": limited})
if potential_recents:
@@ -581,10 +593,10 @@ class SyncHandler:
recents = []
if not limited or block_all_timeline:
- prev_batch_token = now_token
+ prev_batch_token = upto_token
if recents:
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(
+ prev_batch_token = upto_token.copy_and_replace(
StreamKeyType.ROOM, room_key
)
@@ -595,11 +607,15 @@ class SyncHandler:
filtering_factor = 2
load_limit = max(timeline_limit * filtering_factor, 10)
max_repeat = 5 # Only try a few times per room, otherwise
- room_key = now_token.room_key
+ room_key = upto_token.room_key
end_key = room_key
since_key = None
- if since_token and not newly_joined_room:
+ if since_token and gap_token:
+ # If there is a gap then we need to only include events after
+ # it.
+ since_key = gap_token
+ elif since_token and not newly_joined_room:
since_key = since_token.room_key
while limited and len(recents) < timeline_limit and max_repeat:
@@ -669,7 +685,7 @@ class SyncHandler:
recents = recents[-timeline_limit:]
room_key = recents[0].internal_metadata.before
- prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
+ prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
# Don't bother to bundle aggregations if the timeline is unlimited,
# as clients will have all the necessary information.
@@ -684,7 +700,9 @@ class SyncHandler:
return TimelineBatch(
events=recents,
prev_batch=prev_batch_token,
- limited=limited or newly_joined_room,
+ # Also mark as limited if this is a new room or there has been a gap
+ # (to force client to paginate the gap).
+ limited=limited or newly_joined_room or gap_token is not None,
bundled_aggregations=bundled_aggregations,
)
@@ -1499,7 +1517,7 @@ class SyncHandler:
# Presence data is included if the server has it enabled and not filtered out.
include_presence_data = bool(
- self.hs_config.server.use_presence
+ self.hs_config.server.presence_enabled
and not sync_config.filter_collection.blocks_all_presence()
)
# Device list updates are sent if a since token is provided.
@@ -2333,7 +2351,7 @@ class SyncHandler:
continue
leave_token = now_token.copy_and_replace(
- StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
+ StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
)
room_entries.append(
RoomSyncResultBuilder(
@@ -2419,8 +2437,9 @@ class SyncHandler:
batch = await self._load_filtered_recents(
room_id,
+ sync_result_builder,
sync_config,
- now_token=upto_token,
+ upto_token=upto_token,
since_token=since_token,
potential_recents=events,
newly_joined_room=newly_joined,
|