diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index bb81ca9d97..818b13621c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -542,11 +542,15 @@ class SlidingSyncHandler:
rooms[room_id] = room_sync_result
+ extensions = await self.get_extensions_response(
+ sync_config=sync_config, to_token=to_token
+ )
+
return SlidingSyncResult(
next_pos=to_token,
lists=lists,
rooms=rooms,
- extensions={},
+ extensions=extensions,
)
async def get_sync_room_ids_for_user(
@@ -1445,3 +1449,100 @@ class SlidingSyncHandler:
notification_count=0,
highlight_count=0,
)
+
+ async def get_extensions_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ to_token: StreamToken,
+ ) -> SlidingSyncResult.Extensions:
+ """Handle extension requests.
+
+ Args:
+ sync_config: Sync configuration
+ to_token: The point in the stream to sync up to.
+ """
+
+ if sync_config.extensions is None:
+ return SlidingSyncResult.Extensions()
+
+ to_device_response = None
+ if sync_config.extensions.to_device:
+ to_device_response = await self.get_to_device_extensions_response(
+ sync_config=sync_config,
+ to_device_request=sync_config.extensions.to_device,
+ to_token=to_token,
+ )
+
+ return SlidingSyncResult.Extensions(to_device=to_device_response)
+
+ async def get_to_device_extensions_response(
+ self,
+ sync_config: SlidingSyncConfig,
+ to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension,
+ to_token: StreamToken,
+ ) -> SlidingSyncResult.Extensions.ToDeviceExtension:
+ """Handle to-device extension (MSC3885)
+
+ Args:
+ sync_config: Sync configuration
+ to_device_request: The to-device extension from the request
+ to_token: The point in the stream to sync up to.
+ """
+
+ user_id = sync_config.user.to_string()
+ device_id = sync_config.device_id
+
+ # Check that this request has a valid device ID (not all requests have
+ # to belong to a device, and so device_id is None), and that the
+ # extension is enabled.
+ if device_id is None or not to_device_request.enabled:
+ return SlidingSyncResult.Extensions.ToDeviceExtension(
+ next_batch=f"{to_token.to_device_key}",
+ events=[],
+ )
+
+ since_stream_id = 0
+ if to_device_request.since is not None:
+ # We've already validated this is an int.
+ since_stream_id = int(to_device_request.since)
+
+ if to_token.to_device_key < since_stream_id:
+ # The since token is ahead of our current token, so we return an
+ # empty response.
+ logger.warning(
+ "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r",
+ since_stream_id,
+ to_token.to_device_key,
+ )
+ return SlidingSyncResult.Extensions.ToDeviceExtension(
+ next_batch=to_device_request.since,
+ events=[],
+ )
+
+ # Delete everything before the given since token, as we know the
+ # device must have received them.
+ deleted = await self.store.delete_messages_for_device(
+ user_id=user_id,
+ device_id=device_id,
+ up_to_stream_id=since_stream_id,
+ )
+
+ logger.debug(
+ "Deleted %d to-device messages up to %d for %s",
+ deleted,
+ since_stream_id,
+ user_id,
+ )
+
+ messages, stream_id = await self.store.get_messages_for_device(
+ user_id=user_id,
+ device_id=device_id,
+ from_stream_id=since_stream_id,
+ to_stream_id=to_token.to_device_key,
+ limit=min(to_device_request.limit, 100), # Limit to at most 100 events
+ )
+
+ return SlidingSyncResult.Extensions.ToDeviceExtension(
+ next_batch=f"{stream_id}",
+ events=messages,
+ )
|