summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-07-10 11:58:42 +0100
committerGitHub <noreply@github.com>2024-07-10 11:58:42 +0100
commit4ca13ce0dd6d1dc931cfde7e06191200ca0ec066 (patch)
tree1efce62ab2279a8881926e32275ecbb40f4725d7 /synapse/handlers
parentMerge branch 'release-v1.111' into develop (diff)
downloadsynapse-4ca13ce0dd6d1dc931cfde7e06191200ca0ec066.tar.xz
Handle to-device extensions to Sliding Sync (#17416)
Implements MSC3885

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/sliding_sync.py103
1 files changed, 102 insertions, 1 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index bb81ca9d97..818b13621c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -542,11 +542,15 @@ class SlidingSyncHandler:
 
             rooms[room_id] = room_sync_result
 
+        extensions = await self.get_extensions_response(
+            sync_config=sync_config, to_token=to_token
+        )
+
         return SlidingSyncResult(
             next_pos=to_token,
             lists=lists,
             rooms=rooms,
-            extensions={},
+            extensions=extensions,
         )
 
     async def get_sync_room_ids_for_user(
@@ -1445,3 +1449,100 @@ class SlidingSyncHandler:
             notification_count=0,
             highlight_count=0,
         )
+
+    async def get_extensions_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        to_token: StreamToken,
+    ) -> SlidingSyncResult.Extensions:
+        """Handle extension requests.
+
+        Args:
+            sync_config: Sync configuration
+            to_token: The point in the stream to sync up to.
+        """
+
+        if sync_config.extensions is None:
+            return SlidingSyncResult.Extensions()
+
+        to_device_response = None
+        if sync_config.extensions.to_device:
+            to_device_response = await self.get_to_device_extensions_response(
+                sync_config=sync_config,
+                to_device_request=sync_config.extensions.to_device,
+                to_token=to_token,
+            )
+
+        return SlidingSyncResult.Extensions(to_device=to_device_response)
+
+    async def get_to_device_extensions_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension,
+        to_token: StreamToken,
+    ) -> SlidingSyncResult.Extensions.ToDeviceExtension:
+        """Handle to-device extension (MSC3885)
+
+        Args:
+            sync_config: Sync configuration
+            to_device_request: The to-device extension from the request
+            to_token: The point in the stream to sync up to.
+        """
+
+        user_id = sync_config.user.to_string()
+        device_id = sync_config.device_id
+
+        # Check that this request has a valid device ID (not all requests have
+        # to belong to a device, and so device_id is None), and that the
+        # extension is enabled.
+        if device_id is None or not to_device_request.enabled:
+            return SlidingSyncResult.Extensions.ToDeviceExtension(
+                next_batch=f"{to_token.to_device_key}",
+                events=[],
+            )
+
+        since_stream_id = 0
+        if to_device_request.since is not None:
+            # We've already validated this is an int.
+            since_stream_id = int(to_device_request.since)
+
+            if to_token.to_device_key < since_stream_id:
+                # The since token is ahead of our current token, so we return an
+                # empty response.
+                logger.warning(
+                    "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r",
+                    since_stream_id,
+                    to_token.to_device_key,
+                )
+                return SlidingSyncResult.Extensions.ToDeviceExtension(
+                    next_batch=to_device_request.since,
+                    events=[],
+                )
+
+            # Delete everything before the given since token, as we know the
+            # device must have received them.
+            deleted = await self.store.delete_messages_for_device(
+                user_id=user_id,
+                device_id=device_id,
+                up_to_stream_id=since_stream_id,
+            )
+
+            logger.debug(
+                "Deleted %d to-device messages up to %d for %s",
+                deleted,
+                since_stream_id,
+                user_id,
+            )
+
+        messages, stream_id = await self.store.get_messages_for_device(
+            user_id=user_id,
+            device_id=device_id,
+            from_stream_id=since_stream_id,
+            to_stream_id=to_token.to_device_key,
+            limit=min(to_device_request.limit, 100),  # Limit to at most 100 events
+        )
+
+        return SlidingSyncResult.Extensions.ToDeviceExtension(
+            next_batch=f"{stream_id}",
+            events=messages,
+        )