summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-05-23 12:06:16 -0500
committerGitHub <noreply@github.com>2024-05-23 12:06:16 -0500
commitc97251d5ba53b905036b3181afaa9c792777d1ff (patch)
treeb129b1a1517b1857f3ab4ad2463fb3b80d4a5708 /synapse/handlers
parentLog exceptions when failing to auto-join new user according to the `auto_join... (diff)
downloadsynapse-c97251d5ba53b905036b3181afaa9c792777d1ff.tar.xz
Add Sliding Sync `/sync/e2ee` endpoint for To-Device messages (#17167)
This is being introduced as part of Sliding Sync but doesn't have any sliding window component. It's just a way to get E2EE events without having to sit through a big initial sync  (`/sync` v2). And we can avoid encryption events being backed up by the main sync response or vice-versa.

Part of some Sliding Sync simplification/experimentation. See [this discussion](https://github.com/element-hq/synapse/pull/17167#discussion_r1610495866) for why it may not be as useful as we thought.

Based on:

 - https://github.com/matrix-org/matrix-spec-proposals/pull/3575
 - https://github.com/matrix-org/matrix-spec-proposals/pull/3885
 - https://github.com/matrix-org/matrix-spec-proposals/pull/3884
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/sync.py247
1 files changed, 237 insertions, 10 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b7917a99d6..ac5bddd52f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,11 +28,14 @@ from typing import (
     Dict,
     FrozenSet,
     List,
+    Literal,
     Mapping,
     Optional,
     Sequence,
     Set,
     Tuple,
+    Union,
+    overload,
 )
 
 import attr
@@ -128,6 +131,8 @@ class SyncVersion(Enum):
 
     # Traditional `/sync` endpoint
     SYNC_V2 = "sync_v2"
+    # Part of MSC3575 Sliding Sync
+    E2EE_SYNC = "e2ee_sync"
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -280,6 +285,26 @@ class SyncResult:
         )
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class E2eeSyncResult:
+    """
+    Attributes:
+        next_batch: Token for the next sync
+        to_device: List of direct messages for the device.
+        device_lists: List of user_ids whose devices have changed
+        device_one_time_keys_count: Dict of algorithm to count for one time keys
+            for this device
+        device_unused_fallback_key_types: List of key types that have an unused fallback
+            key
+    """
+
+    next_batch: StreamToken
+    to_device: List[JsonDict]
+    device_lists: DeviceListUpdates
+    device_one_time_keys_count: JsonMapping
+    device_unused_fallback_key_types: List[str]
+
+
 class SyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs_config = hs.config
@@ -322,6 +347,31 @@ class SyncHandler:
 
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
+    @overload
+    async def wait_for_sync_for_user(
+        self,
+        requester: Requester,
+        sync_config: SyncConfig,
+        sync_version: Literal[SyncVersion.SYNC_V2],
+        request_key: SyncRequestKey,
+        since_token: Optional[StreamToken] = None,
+        timeout: int = 0,
+        full_state: bool = False,
+    ) -> SyncResult: ...
+
+    @overload
+    async def wait_for_sync_for_user(
+        self,
+        requester: Requester,
+        sync_config: SyncConfig,
+        sync_version: Literal[SyncVersion.E2EE_SYNC],
+        request_key: SyncRequestKey,
+        since_token: Optional[StreamToken] = None,
+        timeout: int = 0,
+        full_state: bool = False,
+    ) -> E2eeSyncResult: ...
+
+    @overload
     async def wait_for_sync_for_user(
         self,
         requester: Requester,
@@ -331,7 +381,18 @@ class SyncHandler:
         since_token: Optional[StreamToken] = None,
         timeout: int = 0,
         full_state: bool = False,
-    ) -> SyncResult:
+    ) -> Union[SyncResult, E2eeSyncResult]: ...
+
+    async def wait_for_sync_for_user(
+        self,
+        requester: Requester,
+        sync_config: SyncConfig,
+        sync_version: SyncVersion,
+        request_key: SyncRequestKey,
+        since_token: Optional[StreamToken] = None,
+        timeout: int = 0,
+        full_state: bool = False,
+    ) -> Union[SyncResult, E2eeSyncResult]:
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
@@ -344,8 +405,10 @@ class SyncHandler:
             since_token: The point in the stream to sync from.
             timeout: How long to wait for new data to arrive before giving up.
             full_state: Whether to return the full state for each room.
+
         Returns:
             When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
+            When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
         """
         # If the user is not part of the mau group, then check that limits have
         # not been exceeded (if not part of the group by this point, almost certain
@@ -366,6 +429,29 @@ class SyncHandler:
         logger.debug("Returning sync response for %s", user_id)
         return res
 
+    @overload
+    async def _wait_for_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        sync_version: Literal[SyncVersion.SYNC_V2],
+        since_token: Optional[StreamToken],
+        timeout: int,
+        full_state: bool,
+        cache_context: ResponseCacheContext[SyncRequestKey],
+    ) -> SyncResult: ...
+
+    @overload
+    async def _wait_for_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        sync_version: Literal[SyncVersion.E2EE_SYNC],
+        since_token: Optional[StreamToken],
+        timeout: int,
+        full_state: bool,
+        cache_context: ResponseCacheContext[SyncRequestKey],
+    ) -> E2eeSyncResult: ...
+
+    @overload
     async def _wait_for_sync_for_user(
         self,
         sync_config: SyncConfig,
@@ -374,7 +460,17 @@ class SyncHandler:
         timeout: int,
         full_state: bool,
         cache_context: ResponseCacheContext[SyncRequestKey],
-    ) -> SyncResult:
+    ) -> Union[SyncResult, E2eeSyncResult]: ...
+
+    async def _wait_for_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        sync_version: SyncVersion,
+        since_token: Optional[StreamToken],
+        timeout: int,
+        full_state: bool,
+        cache_context: ResponseCacheContext[SyncRequestKey],
+    ) -> Union[SyncResult, E2eeSyncResult]:
         """The start of the machinery that produces a /sync response.
 
         See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
@@ -417,14 +513,16 @@ class SyncHandler:
         if timeout == 0 or since_token is None or full_state:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
-            result: SyncResult = await self.current_sync_for_user(
-                sync_config, sync_version, since_token, full_state=full_state
+            result: Union[SyncResult, E2eeSyncResult] = (
+                await self.current_sync_for_user(
+                    sync_config, sync_version, since_token, full_state=full_state
+                )
             )
         else:
             # Otherwise, we wait for something to happen and report it to the user.
             async def current_sync_callback(
                 before_token: StreamToken, after_token: StreamToken
-            ) -> SyncResult:
+            ) -> Union[SyncResult, E2eeSyncResult]:
                 return await self.current_sync_for_user(
                     sync_config, sync_version, since_token
                 )
@@ -456,14 +554,43 @@ class SyncHandler:
 
         return result
 
+    @overload
+    async def current_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        sync_version: Literal[SyncVersion.SYNC_V2],
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> SyncResult: ...
+
+    @overload
+    async def current_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        sync_version: Literal[SyncVersion.E2EE_SYNC],
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> E2eeSyncResult: ...
+
+    @overload
     async def current_sync_for_user(
         self,
         sync_config: SyncConfig,
         sync_version: SyncVersion,
         since_token: Optional[StreamToken] = None,
         full_state: bool = False,
-    ) -> SyncResult:
-        """Generates the response body of a sync result, represented as a SyncResult.
+    ) -> Union[SyncResult, E2eeSyncResult]: ...
+
+    async def current_sync_for_user(
+        self,
+        sync_config: SyncConfig,
+        sync_version: SyncVersion,
+        since_token: Optional[StreamToken] = None,
+        full_state: bool = False,
+    ) -> Union[SyncResult, E2eeSyncResult]:
+        """
+        Generates the response body of a sync result, represented as a
+        `SyncResult`/`E2eeSyncResult`.
 
         This is a wrapper around `generate_sync_result` which starts an open tracing
         span to track the sync. See `generate_sync_result` for the next part of your
@@ -474,15 +601,25 @@ class SyncHandler:
             sync_version: Determines what kind of sync response to generate.
             since_token: The point in the stream to sync from.p.
             full_state: Whether to return the full state for each room.
+
         Returns:
             When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
+            When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
         """
         with start_active_span("sync.current_sync_for_user"):
             log_kv({"since_token": since_token})
+
             # Go through the `/sync` v2 path
             if sync_version == SyncVersion.SYNC_V2:
-                sync_result: SyncResult = await self.generate_sync_result(
-                    sync_config, since_token, full_state
+                sync_result: Union[SyncResult, E2eeSyncResult] = (
+                    await self.generate_sync_result(
+                        sync_config, since_token, full_state
+                    )
+                )
+            # Go through the MSC3575 Sliding Sync `/sync/e2ee` path
+            elif sync_version == SyncVersion.E2EE_SYNC:
+                sync_result = await self.generate_e2ee_sync_result(
+                    sync_config, since_token
                 )
             else:
                 raise Exception(
@@ -1691,6 +1828,96 @@ class SyncHandler:
             next_batch=sync_result_builder.now_token,
         )
 
+    async def generate_e2ee_sync_result(
+        self,
+        sync_config: SyncConfig,
+        since_token: Optional[StreamToken] = None,
+    ) -> E2eeSyncResult:
+        """
+        Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result.
+
+        This is represented by a `E2eeSyncResult` struct, which is built from small
+        pieces using a `SyncResultBuilder`. The `sync_result_builder` is passed as a
+        mutable ("inout") parameter to various helper functions. These retrieve and
+        process the data which forms the sync body, often writing to the
+        `sync_result_builder` to store their output.
+
+        At the end, we transfer data from the `sync_result_builder` to a new `E2eeSyncResult`
+        instance to signify that the sync calculation is complete.
+        """
+        user_id = sync_config.user.to_string()
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
+
+        sync_result_builder = await self.get_sync_result_builder(
+            sync_config,
+            since_token,
+            full_state=False,
+        )
+
+        # 1. Calculate `to_device` events
+        await self._generate_sync_entry_for_to_device(sync_result_builder)
+
+        # 2. Calculate `device_lists`
+        # Device list updates are sent if a since token is provided.
+        device_lists = DeviceListUpdates()
+        include_device_list_updates = bool(since_token and since_token.device_list_key)
+        if include_device_list_updates:
+            # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
+            # is used in calculate_user_changes below.
+            #
+            # TODO: Running `_generate_sync_entry_for_rooms()` is a lot of work just to
+            # figure out the membership changes/derived info needed for
+            # `_generate_sync_entry_for_device_list()`. In the future, we should try to
+            # refactor this away.
+            (
+                newly_joined_rooms,
+                newly_left_rooms,
+            ) = await self._generate_sync_entry_for_rooms(sync_result_builder)
+
+            # This uses the sync_result_builder.joined which is set in
+            # `_generate_sync_entry_for_rooms`, if that didn't find any joined
+            # rooms for some reason it is a no-op.
+            (
+                newly_joined_or_invited_or_knocked_users,
+                newly_left_users,
+            ) = sync_result_builder.calculate_user_changes()
+
+            device_lists = await self._generate_sync_entry_for_device_list(
+                sync_result_builder,
+                newly_joined_rooms=newly_joined_rooms,
+                newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+                newly_left_rooms=newly_left_rooms,
+                newly_left_users=newly_left_users,
+            )
+
+        # 3. Calculate `device_one_time_keys_count` and `device_unused_fallback_key_types`
+        device_id = sync_config.device_id
+        one_time_keys_count: JsonMapping = {}
+        unused_fallback_key_types: List[str] = []
+        if device_id:
+            # TODO: We should have a way to let clients differentiate between the states of:
+            #   * no change in OTK count since the provided since token
+            #   * the server has zero OTKs left for this device
+            #  Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
+            one_time_keys_count = await self.store.count_e2e_one_time_keys(
+                user_id, device_id
+            )
+            unused_fallback_key_types = list(
+                await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
+            )
+
+        return E2eeSyncResult(
+            to_device=sync_result_builder.to_device,
+            device_lists=device_lists,
+            device_one_time_keys_count=one_time_keys_count,
+            device_unused_fallback_key_types=unused_fallback_key_types,
+            next_batch=sync_result_builder.now_token,
+        )
+
     async def get_sync_result_builder(
         self,
         sync_config: SyncConfig,
@@ -1889,7 +2116,7 @@ class SyncHandler:
         users_that_have_changed = (
             await self._device_handler.get_device_changes_in_shared_rooms(
                 user_id,
-                sync_result_builder.joined_room_ids,
+                joined_room_ids,
                 from_token=since_token,
                 now_token=sync_result_builder.now_token,
             )