diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b7917a99d6..ac5bddd52f 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,11 +28,14 @@ from typing import (
Dict,
FrozenSet,
List,
+ Literal,
Mapping,
Optional,
Sequence,
Set,
Tuple,
+ Union,
+ overload,
)
import attr
@@ -128,6 +131,8 @@ class SyncVersion(Enum):
# Traditional `/sync` endpoint
SYNC_V2 = "sync_v2"
+ # Part of MSC3575 Sliding Sync
+ E2EE_SYNC = "e2ee_sync"
@attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -280,6 +285,26 @@ class SyncResult:
)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class E2eeSyncResult:
+ """
+ Attributes:
+ next_batch: Token for the next sync
+ to_device: List of direct messages for the device.
+ device_lists: List of user_ids whose devices have changed
+ device_one_time_keys_count: Dict of algorithm to count for one time keys
+ for this device
+ device_unused_fallback_key_types: List of key types that have an unused fallback
+ key
+ """
+
+ next_batch: StreamToken
+ to_device: List[JsonDict]
+ device_lists: DeviceListUpdates
+ device_one_time_keys_count: JsonMapping
+ device_unused_fallback_key_types: List[str]
+
+
class SyncHandler:
def __init__(self, hs: "HomeServer"):
self.hs_config = hs.config
@@ -322,6 +347,31 @@ class SyncHandler:
self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
+ @overload
+ async def wait_for_sync_for_user(
+ self,
+ requester: Requester,
+ sync_config: SyncConfig,
+ sync_version: Literal[SyncVersion.SYNC_V2],
+ request_key: SyncRequestKey,
+ since_token: Optional[StreamToken] = None,
+ timeout: int = 0,
+ full_state: bool = False,
+ ) -> SyncResult: ...
+
+ @overload
+ async def wait_for_sync_for_user(
+ self,
+ requester: Requester,
+ sync_config: SyncConfig,
+ sync_version: Literal[SyncVersion.E2EE_SYNC],
+ request_key: SyncRequestKey,
+ since_token: Optional[StreamToken] = None,
+ timeout: int = 0,
+ full_state: bool = False,
+ ) -> E2eeSyncResult: ...
+
+ @overload
async def wait_for_sync_for_user(
self,
requester: Requester,
@@ -331,7 +381,18 @@ class SyncHandler:
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
- ) -> SyncResult:
+ ) -> Union[SyncResult, E2eeSyncResult]: ...
+
+ async def wait_for_sync_for_user(
+ self,
+ requester: Requester,
+ sync_config: SyncConfig,
+ sync_version: SyncVersion,
+ request_key: SyncRequestKey,
+ since_token: Optional[StreamToken] = None,
+ timeout: int = 0,
+ full_state: bool = False,
+ ) -> Union[SyncResult, E2eeSyncResult]:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
@@ -344,8 +405,10 @@ class SyncHandler:
since_token: The point in the stream to sync from.
timeout: How long to wait for new data to arrive before giving up.
full_state: Whether to return the full state for each room.
+
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
+ When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@@ -366,6 +429,29 @@ class SyncHandler:
logger.debug("Returning sync response for %s", user_id)
return res
+ @overload
+ async def _wait_for_sync_for_user(
+ self,
+ sync_config: SyncConfig,
+ sync_version: Literal[SyncVersion.SYNC_V2],
+ since_token: Optional[StreamToken],
+ timeout: int,
+ full_state: bool,
+ cache_context: ResponseCacheContext[SyncRequestKey],
+ ) -> SyncResult: ...
+
+ @overload
+ async def _wait_for_sync_for_user(
+ self,
+ sync_config: SyncConfig,
+ sync_version: Literal[SyncVersion.E2EE_SYNC],
+ since_token: Optional[StreamToken],
+ timeout: int,
+ full_state: bool,
+ cache_context: ResponseCacheContext[SyncRequestKey],
+ ) -> E2eeSyncResult: ...
+
+ @overload
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
@@ -374,7 +460,17 @@ class SyncHandler:
timeout: int,
full_state: bool,
cache_context: ResponseCacheContext[SyncRequestKey],
- ) -> SyncResult:
+ ) -> Union[SyncResult, E2eeSyncResult]: ...
+
+ async def _wait_for_sync_for_user(
+ self,
+ sync_config: SyncConfig,
+ sync_version: SyncVersion,
+ since_token: Optional[StreamToken],
+ timeout: int,
+ full_state: bool,
+ cache_context: ResponseCacheContext[SyncRequestKey],
+ ) -> Union[SyncResult, E2eeSyncResult]:
"""The start of the machinery that produces a /sync response.
See https://spec.matrix.org/v1.1/client-server-api/#syncing for full details.
@@ -417,14 +513,16 @@ class SyncHandler:
if timeout == 0 or since_token is None or full_state:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
- result: SyncResult = await self.current_sync_for_user(
- sync_config, sync_version, since_token, full_state=full_state
+ result: Union[SyncResult, E2eeSyncResult] = (
+ await self.current_sync_for_user(
+ sync_config, sync_version, since_token, full_state=full_state
+ )
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
- ) -> SyncResult:
+ ) -> Union[SyncResult, E2eeSyncResult]:
return await self.current_sync_for_user(
sync_config, sync_version, since_token
)
@@ -456,14 +554,43 @@ class SyncHandler:
return result
+ @overload
+ async def current_sync_for_user(
+ self,
+ sync_config: SyncConfig,
+ sync_version: Literal[SyncVersion.SYNC_V2],
+ since_token: Optional[StreamToken] = None,
+ full_state: bool = False,
+ ) -> SyncResult: ...
+
+ @overload
+ async def current_sync_for_user(
+ self,
+ sync_config: SyncConfig,
+ sync_version: Literal[SyncVersion.E2EE_SYNC],
+ since_token: Optional[StreamToken] = None,
+ full_state: bool = False,
+ ) -> E2eeSyncResult: ...
+
+ @overload
async def current_sync_for_user(
self,
sync_config: SyncConfig,
sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
- ) -> SyncResult:
- """Generates the response body of a sync result, represented as a SyncResult.
+ ) -> Union[SyncResult, E2eeSyncResult]: ...
+
+ async def current_sync_for_user(
+ self,
+ sync_config: SyncConfig,
+ sync_version: SyncVersion,
+ since_token: Optional[StreamToken] = None,
+ full_state: bool = False,
+ ) -> Union[SyncResult, E2eeSyncResult]:
+ """
+ Generates the response body of a sync result, represented as a
+ `SyncResult`/`E2eeSyncResult`.
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
@@ -474,15 +601,25 @@ class SyncHandler:
sync_version: Determines what kind of sync response to generate.
since_token: The point in the stream to sync from.p.
full_state: Whether to return the full state for each room.
+
Returns:
When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
+ When `SyncVersion.E2EE_SYNC`, returns a `E2eeSyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
+
# Go through the `/sync` v2 path
if sync_version == SyncVersion.SYNC_V2:
- sync_result: SyncResult = await self.generate_sync_result(
- sync_config, since_token, full_state
+ sync_result: Union[SyncResult, E2eeSyncResult] = (
+ await self.generate_sync_result(
+ sync_config, since_token, full_state
+ )
+ )
+ # Go through the MSC3575 Sliding Sync `/sync/e2ee` path
+ elif sync_version == SyncVersion.E2EE_SYNC:
+ sync_result = await self.generate_e2ee_sync_result(
+ sync_config, since_token
)
else:
raise Exception(
@@ -1691,6 +1828,96 @@ class SyncHandler:
next_batch=sync_result_builder.now_token,
)
+ async def generate_e2ee_sync_result(
+ self,
+ sync_config: SyncConfig,
+ since_token: Optional[StreamToken] = None,
+ ) -> E2eeSyncResult:
+ """
+ Generates the response body of a MSC3575 Sliding Sync `/sync/e2ee` result.
+
+ This is represented by a `E2eeSyncResult` struct, which is built from small
+ pieces using a `SyncResultBuilder`. The `sync_result_builder` is passed as a
+ mutable ("inout") parameter to various helper functions. These retrieve and
+ process the data which forms the sync body, often writing to the
+ `sync_result_builder` to store their output.
+
+ At the end, we transfer data from the `sync_result_builder` to a new `E2eeSyncResult`
+ instance to signify that the sync calculation is complete.
+ """
+ user_id = sync_config.user.to_string()
+ app_service = self.store.get_app_service_by_user_id(user_id)
+ if app_service:
+ # We no longer support AS users using /sync directly.
+ # See https://github.com/matrix-org/matrix-doc/issues/1144
+ raise NotImplementedError()
+
+ sync_result_builder = await self.get_sync_result_builder(
+ sync_config,
+ since_token,
+ full_state=False,
+ )
+
+ # 1. Calculate `to_device` events
+ await self._generate_sync_entry_for_to_device(sync_result_builder)
+
+ # 2. Calculate `device_lists`
+ # Device list updates are sent if a since token is provided.
+ device_lists = DeviceListUpdates()
+ include_device_list_updates = bool(since_token and since_token.device_list_key)
+ if include_device_list_updates:
+ # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which
+ # is used in calculate_user_changes below.
+ #
+ # TODO: Running `_generate_sync_entry_for_rooms()` is a lot of work just to
+ # figure out the membership changes/derived info needed for
+ # `_generate_sync_entry_for_device_list()`. In the future, we should try to
+ # refactor this away.
+ (
+ newly_joined_rooms,
+ newly_left_rooms,
+ ) = await self._generate_sync_entry_for_rooms(sync_result_builder)
+
+ # This uses the sync_result_builder.joined which is set in
+ # `_generate_sync_entry_for_rooms`, if that didn't find any joined
+ # rooms for some reason it is a no-op.
+ (
+ newly_joined_or_invited_or_knocked_users,
+ newly_left_users,
+ ) = sync_result_builder.calculate_user_changes()
+
+ device_lists = await self._generate_sync_entry_for_device_list(
+ sync_result_builder,
+ newly_joined_rooms=newly_joined_rooms,
+ newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users,
+ newly_left_rooms=newly_left_rooms,
+ newly_left_users=newly_left_users,
+ )
+
+ # 3. Calculate `device_one_time_keys_count` and `device_unused_fallback_key_types`
+ device_id = sync_config.device_id
+ one_time_keys_count: JsonMapping = {}
+ unused_fallback_key_types: List[str] = []
+ if device_id:
+ # TODO: We should have a way to let clients differentiate between the states of:
+ # * no change in OTK count since the provided since token
+ # * the server has zero OTKs left for this device
+ # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
+ one_time_keys_count = await self.store.count_e2e_one_time_keys(
+ user_id, device_id
+ )
+ unused_fallback_key_types = list(
+ await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
+ )
+
+ return E2eeSyncResult(
+ to_device=sync_result_builder.to_device,
+ device_lists=device_lists,
+ device_one_time_keys_count=one_time_keys_count,
+ device_unused_fallback_key_types=unused_fallback_key_types,
+ next_batch=sync_result_builder.now_token,
+ )
+
async def get_sync_result_builder(
self,
sync_config: SyncConfig,
@@ -1889,7 +2116,7 @@ class SyncHandler:
users_that_have_changed = (
await self._device_handler.get_device_changes_in_shared_rooms(
user_id,
- sync_result_builder.joined_room_ids,
+ joined_room_ids,
from_token=since_token,
now_token=sync_result_builder.now_token,
)
|