Diffstat (limited to 'synapse/handlers/sync.py')
 synapse/handlers/sync.py | 85 ++++++++++++++++++++++++++-----------------
 1 file changed, 52 insertions(+), 33 deletions(-)
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 7bd42f635f..2f1bc5a015 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,7 +40,6 @@ from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
-from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging import issue9533_logger
 from synapse.logging.context import current_context
@@ -58,6 +57,7 @@ from synapse.types import (
     DeviceListUpdates,
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     MutableStateMap,
     Requester,
     RoomStreamToken,
@@ -363,36 +363,15 @@ class SyncHandler:
         # (since we now know that the device has received them)
         if since_token is not None:
             since_stream_id = since_token.to_device_key
-            # Fast path: delete a limited number of to-device messages up front.
-            # We do this to avoid the overhead of scheduling a task for every
-            # sync.
-            device_deletion_limit = 100
             deleted = await self.store.delete_messages_for_device(
                 sync_config.user.to_string(),
                 sync_config.device_id,
                 since_stream_id,
-                limit=device_deletion_limit,
             )
             logger.debug(
                 "Deleted %d to-device messages up to %d", deleted, since_stream_id
             )
 
-            # If we hit the limit, schedule a background task to delete the rest.
-            if deleted >= device_deletion_limit:
-                await self._task_scheduler.schedule_task(
-                    DELETE_DEVICE_MSGS_TASK_NAME,
-                    resource_id=sync_config.device_id,
-                    params={
-                        "user_id": sync_config.user.to_string(),
-                        "device_id": sync_config.device_id,
-                        "up_to_stream_id": since_stream_id,
-                    },
-                )
-                logger.debug(
-                    "Deletion of to-device messages up to %d scheduled",
-                    since_stream_id,
-                )
-
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
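
The deletion above relies on the delivery contract spelled out at the top of the hunk: presenting a since token acknowledges every to-device message at or below its `to_device_key`. A toy, self-contained sketch of that contract (an in-memory inbox, not Synapse's storage layer, which performs this as a database query):

```python
# Toy model of acknowledged-message deletion (illustrative only). A device
# presenting a since-token has, by the sync contract, already received every
# to-device message whose stream id is at or below the token's to_device_key.
from typing import Dict, List, Tuple

Inbox = Dict[Tuple[str, str], List[Tuple[int, dict]]]  # (user, device) -> [(stream_id, msg)]

def delete_messages_up_to(inbox: Inbox, user_id: str, device_id: str,
                          up_to_stream_id: int) -> int:
    """Drop acknowledged messages; return how many were deleted."""
    key = (user_id, device_id)
    pending = inbox.get(key, [])
    kept = [(s, m) for (s, m) in pending if s > up_to_stream_id]
    inbox[key] = kept
    return len(pending) - len(kept)

inbox: Inbox = {("@alice:example.org", "DEV1"): [(1, {"type": "m.room_key"}), (2, {})]}
assert delete_messages_up_to(inbox, "@alice:example.org", "DEV1", 1) == 1
```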
@@ -499,7 +478,11 @@ class SyncHandler:
                 event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
-            receipt_key = since_token.receipt_key if since_token else 0
+            receipt_key = (
+                since_token.receipt_key
+                if since_token
+                else MultiWriterStreamToken(stream=0)
+            )
 
             receipt_source = self.event_sources.sources.receipt
             receipts, receipt_key = await receipt_source.get_new_events(
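
`MultiWriterStreamToken(stream=0)` replaces the bare `0` because a receipt stream position is no longer a single integer once several workers may write receipts. A simplified sketch of the idea; the real `MultiWriterStreamToken` lives in `synapse.types` and carries more machinery:

```python
# Simplified sketch, not Synapse's actual MultiWriterStreamToken: the token
# records a floor position every writer has reached, plus per-writer
# positions that may run ahead of that floor.
from dataclasses import dataclass, field
from typing import Dict

@dataclass(frozen=True)
class MultiWriterToken:
    stream: int                                   # floor reached by all writers
    instance_map: Dict[str, int] = field(default_factory=dict)

    def get_for_writer(self, writer: str) -> int:
        # A writer absent from the map is only known to have reached the floor.
        return self.instance_map.get(writer, self.stream)

# The stand-in for "no receipts seen yet", valid for every writer:
start = MultiWriterToken(stream=0)
assert start.get_for_writer("worker1") == 0
```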
@@ -522,12 +505,27 @@ class SyncHandler:
     async def _load_filtered_recents(
         self,
         room_id: str,
+        sync_result_builder: "SyncResultBuilder",
         sync_config: SyncConfig,
-        now_token: StreamToken,
+        upto_token: StreamToken,
         since_token: Optional[StreamToken] = None,
         potential_recents: Optional[List[EventBase]] = None,
         newly_joined_room: bool = False,
     ) -> TimelineBatch:
+        """Create a timeline batch for the room
+
+        Args:
+            room_id
+            sync_result_builder
+            sync_config
+            upto_token: The token up to which we should fetch (more) events.
+                If `potential_recents` is non-empty then this is the *start*
+                of the list.
+            since_token
+            potential_recents: If non-empty, the events between the since token
+                and current token to send down to clients.
+            newly_joined_room
+        """
         with Measure(self.clock, "load_filtered_recents"):
             timeline_limit = sync_config.filter_collection.timeline_limit()
             block_all_timeline = (
@@ -543,6 +541,20 @@ class SyncHandler:
             else:
                 limited = False
 
+            # Check if there is a gap; if so, we need to mark this as limited
+            # and recalculate which events to send down.
+            gap_token = await self.store.get_timeline_gaps(
+                room_id,
+                since_token.room_key if since_token else None,
+                sync_result_builder.now_token.room_key,
+            )
+            if gap_token:
+                # There's a gap, so we need to ignore the passed in
+                # `potential_recents`, and reset `upto_token` to match.
+                potential_recents = None
+                upto_token = sync_result_builder.now_token
+                limited = True
+
             log_kv({"limited": limited})
 
             if potential_recents:
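
How a "gap" can be detected is easiest to see in a toy model. The sketch below assumes, purely for illustration, that a room's events would occupy contiguous stream positions, so a missing position marks events the server received late or out of order; Synapse's real `get_timeline_gaps` answers the same question with a database query.

```python
# Toy gap finder (illustrative assumption: a room's events sit at contiguous
# stream positions). Returns the position just past the most recent hole
# between from_pos (exclusive) and to_pos (inclusive), or None if contiguous.
from typing import Iterable, Optional

def find_latest_gap(positions: Iterable[int], from_pos: int, to_pos: int) -> Optional[int]:
    gap_end: Optional[int] = None
    prev = from_pos
    for pos in sorted(p for p in positions if from_pos < p <= to_pos):
        if pos > prev + 1:
            gap_end = pos  # everything in (prev, pos) is missing
        prev = pos
    return gap_end

# Positions 3 and 4 are missing, so the latest gap ends at 5: events from 5
# onwards are safe to send, and the client must backfill the rest.
assert find_latest_gap([1, 2, 5, 6], from_pos=0, to_pos=6) == 5
```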
@@ -581,10 +593,10 @@ class SyncHandler:
                 recents = []
 
             if not limited or block_all_timeline:
-                prev_batch_token = now_token
+                prev_batch_token = upto_token
                 if recents:
                     room_key = recents[0].internal_metadata.before
-                    prev_batch_token = now_token.copy_and_replace(
+                    prev_batch_token = upto_token.copy_and_replace(
                         StreamKeyType.ROOM, room_key
                     )
 
@@ -595,11 +607,15 @@ class SyncHandler:
             filtering_factor = 2
             load_limit = max(timeline_limit * filtering_factor, 10)
             max_repeat = 5  # Only try a few times per room, otherwise give up.
-            room_key = now_token.room_key
+            room_key = upto_token.room_key
             end_key = room_key
 
             since_key = None
-            if since_token and not newly_joined_room:
+            if since_token and gap_token:
+                # If there is a gap then we only need to include events
+                # after it.
+                since_key = gap_token
+            elif since_token and not newly_joined_room:
                 since_key = since_token.room_key
 
             while limited and len(recents) < timeline_limit and max_repeat:
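
The surrounding loop is an over-fetch-and-filter pattern: visibility rules and the sync filter can drop events after they are loaded, so each pass loads up to `filtering_factor` times the timeline limit and retries a bounded number of times rather than scanning the whole room. A stripped-down sketch of the same shape, with cursor handling elided and `load_page`/`keep` as hypothetical stand-ins for the storage fetch and the filter checks:

```python
# Stripped-down sketch of the over-fetch-and-filter loop above; `load_page`
# and `keep` are hypothetical stand-ins, not Synapse APIs.
from typing import Callable, List, Tuple

def load_filtered(load_page: Callable[[int], Tuple[List[dict], bool]],
                  keep: Callable[[dict], bool],
                  timeline_limit: int) -> Tuple[List[dict], bool]:
    filtering_factor = 2
    load_limit = max(timeline_limit * filtering_factor, 10)
    max_repeat = 5  # bounded retries keep heavily filtered rooms cheap
    recents: List[dict] = []
    limited = True
    while limited and len(recents) < timeline_limit and max_repeat:
        events, limited = load_page(load_limit)   # page further into history
        recents = [e for e in events if keep(e)] + recents
        max_repeat -= 1
    return recents[-timeline_limit:], limited
```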
@@ -669,7 +685,7 @@ class SyncHandler:
                 recents = recents[-timeline_limit:]
                 room_key = recents[0].internal_metadata.before
 
-            prev_batch_token = now_token.copy_and_replace(StreamKeyType.ROOM, room_key)
+            prev_batch_token = upto_token.copy_and_replace(StreamKeyType.ROOM, room_key)
 
         # Don't bother to bundle aggregations if the timeline is unlimited,
         # as clients will have all the necessary information.
@@ -684,7 +700,9 @@ class SyncHandler:
         return TimelineBatch(
             events=recents,
             prev_batch=prev_batch_token,
-            limited=limited or newly_joined_room,
+            # Also mark as limited if this is a new room or there has been a gap
+            # (to force the client to paginate the gap).
+            limited=limited or newly_joined_room or gap_token is not None,
             bundled_aggregations=bundled_aggregations,
         )
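
On the wire, `limited` plus `prev_batch` is what lets a client recover the skipped span. A hand-written example of a room's timeline section in a sync response (the token value is made up):

```python
# Hand-written illustration of the client-visible effect (token is made up):
# when a gap was found, the timeline comes back `limited`, and the client
# pages backwards from `prev_batch` to fetch what the batch skipped.
timeline_section = {
    "events": ["<only events after the gap>"],
    "limited": True,        # now also set when get_timeline_gaps found a gap
    "prev_batch": "s72594_4483_1934",
}
# The client then backfills with the standard pagination endpoint:
#   GET /_matrix/client/v3/rooms/{roomId}/messages?from=s72594_4483_1934&dir=b
```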
 
@@ -1499,7 +1517,7 @@ class SyncHandler:
 
         # Presence data is included if the server has it enabled and not filtered out.
         include_presence_data = bool(
-            self.hs_config.server.use_presence
+            self.hs_config.server.presence_enabled
             and not sync_config.filter_collection.blocks_all_presence()
         )
         # Device list updates are sent if a since token is provided.
@@ -2333,7 +2351,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    StreamKeyType.ROOM, RoomStreamToken(None, event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(
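
The `RoomStreamToken` call above switches to keyword construction. A sketch of why that shape is safer, using a simplified stand-in (Python 3.10+ for dataclass `kw_only`; Synapse's real class is attrs-based in `synapse.types`):

```python
# Simplified stand-in for the keyword-only construction style. Naming the
# field rules out transposed positional arguments and drops the explicit
# None for the unused topological part.
from dataclasses import dataclass
from typing import Optional

@dataclass(frozen=True, kw_only=True)
class StreamPosition:
    topological: Optional[int] = None  # only set for historic/backfill tokens
    stream: int = 0                    # live stream ordering

leave_position = StreamPosition(stream=12345)      # reads unambiguously
# StreamPosition(None, 12345) would raise TypeError: no positional arguments.
```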
@@ -2419,8 +2437,9 @@ class SyncHandler:
 
             batch = await self._load_filtered_recents(
                 room_id,
+                sync_result_builder,
                 sync_config,
-                now_token=upto_token,
+                upto_token=upto_token,
                 since_token=since_token,
                 potential_recents=events,
                 newly_joined_room=newly_joined,
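
Finally, the call site now threads `sync_result_builder` down to `_load_filtered_recents`. A minimal sketch, with hypothetical names, of the reason: the helper needs the sync-wide `now_token` to run the gap check and, when a gap is found, to reset its upper bound.

```python
# Minimal sketch with hypothetical names: the helper needs the sync-wide
# position from the builder, not just the per-room upto_token, because a
# detected gap forces a reset to the sync-wide "now".
from dataclasses import dataclass

@dataclass
class Builder:            # stand-in for SyncResultBuilder
    now_token: str        # upper bound shared by every room in this sync

def effective_upto(builder: Builder, upto_token: str, gap_found: bool) -> str:
    return builder.now_token if gap_found else upto_token

assert effective_upto(Builder(now_token="s105"), "s100", gap_found=True) == "s105"
```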