diff options
author | Eric Eastwood <eric.eastwood@beta.gouv.fr> | 2024-05-16 05:36:54 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-16 11:36:54 +0100 |
commit | d2d48cce85556753f8443d72aafe697c477c217b (patch) | |
tree | 845b22679ca35f5d3f49c2353f6f2e3577ffa52a /synapse | |
parent | Fix request path for `federation_whitelist_endpoint_enabled` option in docume... (diff) | |
download | synapse-d2d48cce85556753f8443d72aafe697c477c217b.tar.xz |
Refactor Sync handler to be able to return different sync responses (`SyncVersion`) (#17200)
Refactor Sync handler to be able to be able to return different sync responses (`SyncVersion`). Preparation to be able support sync v2 and a new Sliding Sync `/sync/e2ee` endpoint which returns a subset of sync v2. Split upon request: https://github.com/element-hq/synapse/pull/17167#discussion_r1601497279 Split from https://github.com/element-hq/synapse/pull/17167 where we will add `SyncVersion.E2EE_SYNC` and a new type of sync response.
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/sync.py | 65 | ||||
-rw-r--r-- | synapse/rest/client/sync.py | 2 |
2 files changed, 60 insertions, 7 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0bef58351c..53fe2a6a53 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -20,6 +20,7 @@ # import itertools import logging +from enum import Enum from typing import ( TYPE_CHECKING, AbstractSet, @@ -112,6 +113,23 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100 SyncRequestKey = Tuple[Any, ...] +class SyncVersion(Enum): + """ + Enum for specifying the version of sync request. This is used to key which type of + sync response that we are generating. + + This is different than the `sync_type` you might see used in other code below; which + specifies the sub-type sync request (e.g. initial_sync, full_state_sync, + incremental_sync) and is really only relevant for the `/sync` v2 endpoint. + """ + + # These string values are semantically significant because they are used in the the + # metrics + + # Traditional `/sync` endpoint + SYNC_V2 = "sync_v2" + + @attr.s(slots=True, frozen=True, auto_attribs=True) class SyncConfig: user: UserID @@ -309,6 +327,7 @@ class SyncHandler: self, requester: Requester, sync_config: SyncConfig, + sync_version: SyncVersion, since_token: Optional[StreamToken] = None, timeout: int = 0, full_state: bool = False, @@ -316,6 +335,17 @@ class SyncHandler: """Get the sync for a client if we have new data for it now. Otherwise wait for new data to arrive on the server. If the timeout expires, then return an empty sync result. + + Args: + requester: The user requesting the sync response. + sync_config: Config/info necessary to process the sync request. + sync_version: Determines what kind of sync response to generate. + since_token: The point in the stream to sync from. + timeout: How long to wait for new data to arrive before giving up. + full_state: Whether to return the full state for each room. + + Returns: + When `SyncVersion.SYNC_V2`, returns a full `SyncResult`. """ # If the user is not part of the mau group, then check that limits have # not been exceeded (if not part of the group by this point, almost certain @@ -327,6 +357,7 @@ class SyncHandler: sync_config.request_key, self._wait_for_sync_for_user, sync_config, + sync_version, since_token, timeout, full_state, @@ -338,6 +369,7 @@ class SyncHandler: async def _wait_for_sync_for_user( self, sync_config: SyncConfig, + sync_version: SyncVersion, since_token: Optional[StreamToken], timeout: int, full_state: bool, @@ -363,9 +395,11 @@ class SyncHandler: else: sync_type = "incremental_sync" + sync_label = f"{sync_version}:{sync_type}" + context = current_context() if context: - context.tag = sync_type + context.tag = sync_label # if we have a since token, delete any to-device messages before that token # (since we now know that the device has received them) @@ -384,14 +418,16 @@ class SyncHandler: # we are going to return immediately, so don't bother calling # notifier.wait_for_events. result: SyncResult = await self.current_sync_for_user( - sync_config, since_token, full_state=full_state + sync_config, sync_version, since_token, full_state=full_state ) else: # Otherwise, we wait for something to happen and report it to the user. async def current_sync_callback( before_token: StreamToken, after_token: StreamToken ) -> SyncResult: - return await self.current_sync_for_user(sync_config, since_token) + return await self.current_sync_for_user( + sync_config, sync_version, since_token + ) result = await self.notifier.wait_for_events( sync_config.user.to_string(), @@ -416,13 +452,14 @@ class SyncHandler: lazy_loaded = "true" else: lazy_loaded = "false" - non_empty_sync_counter.labels(sync_type, lazy_loaded).inc() + non_empty_sync_counter.labels(sync_label, lazy_loaded).inc() return result async def current_sync_for_user( self, sync_config: SyncConfig, + sync_version: SyncVersion, since_token: Optional[StreamToken] = None, full_state: bool = False, ) -> SyncResult: @@ -431,12 +468,26 @@ class SyncHandler: This is a wrapper around `generate_sync_result` which starts an open tracing span to track the sync. See `generate_sync_result` for the next part of your indoctrination. + + Args: + sync_config: Config/info necessary to process the sync request. + sync_version: Determines what kind of sync response to generate. + since_token: The point in the stream to sync from.p. + full_state: Whether to return the full state for each room. + Returns: + When `SyncVersion.SYNC_V2`, returns a full `SyncResult`. """ with start_active_span("sync.current_sync_for_user"): log_kv({"since_token": since_token}) - sync_result = await self.generate_sync_result( - sync_config, since_token, full_state - ) + # Go through the `/sync` v2 path + if sync_version == SyncVersion.SYNC_V2: + sync_result: SyncResult = await self.generate_sync_result( + sync_config, since_token, full_state + ) + else: + raise Exception( + f"Unknown sync_version (this is a Synapse problem): {sync_version}" + ) set_tag(SynapseTags.SYNC_RESULT, bool(sync_result)) return sync_result diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index d19aaf0e22..d0713536e1 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -40,6 +40,7 @@ from synapse.handlers.sync import ( KnockedSyncResult, SyncConfig, SyncResult, + SyncVersion, ) from synapse.http.server import HttpServer from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string @@ -232,6 +233,7 @@ class SyncRestServlet(RestServlet): sync_result = await self.sync_handler.wait_for_sync_for_user( requester, sync_config, + SyncVersion.SYNC_V2, since_token=since_token, timeout=timeout, full_state=full_state, |