summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py54
1 files changed, 42 insertions, 12 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 60a9f341b5..1a4d394eda 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
+from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging import issue9533_logger
 from synapse.logging.context import current_context
@@ -56,6 +57,7 @@ from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     DeviceListUpdates,
     JsonDict,
+    JsonMapping,
     MutableStateMap,
     Requester,
     RoomStreamToken,
@@ -268,6 +270,7 @@ class SyncHandler:
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
         self._device_handler = hs.get_device_handler()
+        self._task_scheduler = hs.get_task_scheduler()
 
         self.should_calculate_push_rules = hs.config.push.enable_push
 
@@ -360,13 +363,36 @@ class SyncHandler:
         # (since we now know that the device has received them)
         if since_token is not None:
             since_stream_id = since_token.to_device_key
+            # Fast path: delete a limited number of to-device messages up front.
+            # We do this to avoid the overhead of scheduling a task for every
+            # sync.
+            device_deletion_limit = 100
             deleted = await self.store.delete_messages_for_device(
-                sync_config.user.to_string(), sync_config.device_id, since_stream_id
+                sync_config.user.to_string(),
+                sync_config.device_id,
+                since_stream_id,
+                limit=device_deletion_limit,
             )
             logger.debug(
                 "Deleted %d to-device messages up to %d", deleted, since_stream_id
             )
 
+            # If we hit the limit, schedule a background task to delete the rest.
+            if deleted >= device_deletion_limit:
+                await self._task_scheduler.schedule_task(
+                    DELETE_DEVICE_MSGS_TASK_NAME,
+                    resource_id=sync_config.device_id,
+                    params={
+                        "user_id": sync_config.user.to_string(),
+                        "device_id": sync_config.device_id,
+                        "up_to_stream_id": since_stream_id,
+                    },
+                )
+                logger.debug(
+                    "Deletion of to-device messages up to %d scheduled",
+                    since_stream_id,
+                )
+
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
@@ -1768,19 +1794,23 @@ class SyncHandler:
             )
 
             if push_rules_changed:
-                global_account_data = dict(global_account_data)
-                global_account_data[
-                    AccountDataTypes.PUSH_RULES
-                ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
+                global_account_data = {
+                    AccountDataTypes.PUSH_RULES: await self._push_rules_handler.push_rules_for_user(
+                        sync_config.user
+                    ),
+                    **global_account_data,
+                }
         else:
             all_global_account_data = await self.store.get_global_account_data_for_user(
                 user_id
             )
 
-            global_account_data = dict(all_global_account_data)
-            global_account_data[
-                AccountDataTypes.PUSH_RULES
-            ] = await self._push_rules_handler.push_rules_for_user(sync_config.user)
+            global_account_data = {
+                AccountDataTypes.PUSH_RULES: await self._push_rules_handler.push_rules_for_user(
+                    sync_config.user
+                ),
+                **all_global_account_data,
+            }
 
         account_data_for_user = (
             await sync_config.filter_collection.filter_global_account_data(
@@ -1884,7 +1914,7 @@ class SyncHandler:
             blocks_all_rooms
             or sync_result_builder.sync_config.filter_collection.blocks_all_room_account_data()
         ):
-            account_data_by_room: Mapping[str, Mapping[str, JsonDict]] = {}
+            account_data_by_room: Mapping[str, Mapping[str, JsonMapping]] = {}
         elif since_token and not sync_result_builder.full_state:
             account_data_by_room = (
                 await self.store.get_updated_room_account_data_for_user(
@@ -2324,8 +2354,8 @@ class SyncHandler:
         sync_result_builder: "SyncResultBuilder",
         room_builder: "RoomSyncResultBuilder",
         ephemeral: List[JsonDict],
-        tags: Optional[Mapping[str, Mapping[str, Any]]],
-        account_data: Mapping[str, JsonDict],
+        tags: Optional[Mapping[str, JsonMapping]],
+        account_data: Mapping[str, JsonMapping],
         always_include: bool = False,
     ) -> None:
         """Populates the `joined` and `archived` section of `sync_result_builder`