summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-05-16 05:36:54 -0500
committerGitHub <noreply@github.com>2024-05-16 11:36:54 +0100
commitd2d48cce85556753f8443d72aafe697c477c217b (patch)
tree845b22679ca35f5d3f49c2353f6f2e3577ffa52a /synapse
parentFix request path for `federation_whitelist_endpoint_enabled` option in docume... (diff)
downloadsynapse-d2d48cce85556753f8443d72aafe697c477c217b.tar.xz
Refactor Sync handler to be able to return different sync responses (`SyncVersion`) (#17200)
Refactor Sync handler to be able to be able to return different sync
responses (`SyncVersion`). Preparation to be able support sync v2 and a
new Sliding Sync `/sync/e2ee` endpoint which returns a subset of sync
v2.

Split upon request:
https://github.com/element-hq/synapse/pull/17167#discussion_r1601497279

Split from https://github.com/element-hq/synapse/pull/17167 where we
will add `SyncVersion.E2EE_SYNC` and a new type of sync response.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/sync.py65
-rw-r--r--synapse/rest/client/sync.py2
2 files changed, 60 insertions, 7 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0bef58351c..53fe2a6a53 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@
 #
 import itertools
 import logging
+from enum import Enum
 from typing import (
     TYPE_CHECKING,
     AbstractSet,
@@ -112,6 +113,23 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
 SyncRequestKey = Tuple[Any, ...]
 
 
+class SyncVersion(Enum):
+    """
+    Enum for specifying the version of sync request. This is used to key which type of
+    sync response that we are generating.
+
+    This is different than the `sync_type` you might see used in other code below; which
+    specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
+    incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
+    """
+
+    # These string values are semantically significant because they are used in the the
+    # metrics
+
+    # Traditional `/sync` endpoint
+    SYNC_V2 = "sync_v2"
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class SyncConfig:
     user: UserID
@@ -309,6 +327,7 @@ class SyncHandler:
         self,
         requester: Requester,
         sync_config: SyncConfig,
+        sync_version: SyncVersion,
         since_token: Optional[StreamToken] = None,
         timeout: int = 0,
         full_state: bool = False,
@@ -316,6 +335,17 @@ class SyncHandler:
         """Get the sync for a client if we have new data for it now. Otherwise
         wait for new data to arrive on the server. If the timeout expires, then
         return an empty sync result.
+
+        Args:
+            requester: The user requesting the sync response.
+            sync_config: Config/info necessary to process the sync request.
+            sync_version: Determines what kind of sync response to generate.
+            since_token: The point in the stream to sync from.
+            timeout: How long to wait for new data to arrive before giving up.
+            full_state: Whether to return the full state for each room.
+
+        Returns:
+            When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
         """
         # If the user is not part of the mau group, then check that limits have
         # not been exceeded (if not part of the group by this point, almost certain
@@ -327,6 +357,7 @@ class SyncHandler:
             sync_config.request_key,
             self._wait_for_sync_for_user,
             sync_config,
+            sync_version,
             since_token,
             timeout,
             full_state,
@@ -338,6 +369,7 @@ class SyncHandler:
     async def _wait_for_sync_for_user(
         self,
         sync_config: SyncConfig,
+        sync_version: SyncVersion,
         since_token: Optional[StreamToken],
         timeout: int,
         full_state: bool,
@@ -363,9 +395,11 @@ class SyncHandler:
         else:
             sync_type = "incremental_sync"
 
+        sync_label = f"{sync_version}:{sync_type}"
+
         context = current_context()
         if context:
-            context.tag = sync_type
+            context.tag = sync_label
 
         # if we have a since token, delete any to-device messages before that token
         # (since we now know that the device has received them)
@@ -384,14 +418,16 @@ class SyncHandler:
             # we are going to return immediately, so don't bother calling
             # notifier.wait_for_events.
             result: SyncResult = await self.current_sync_for_user(
-                sync_config, since_token, full_state=full_state
+                sync_config, sync_version, since_token, full_state=full_state
             )
         else:
             # Otherwise, we wait for something to happen and report it to the user.
             async def current_sync_callback(
                 before_token: StreamToken, after_token: StreamToken
             ) -> SyncResult:
-                return await self.current_sync_for_user(sync_config, since_token)
+                return await self.current_sync_for_user(
+                    sync_config, sync_version, since_token
+                )
 
             result = await self.notifier.wait_for_events(
                 sync_config.user.to_string(),
@@ -416,13 +452,14 @@ class SyncHandler:
                 lazy_loaded = "true"
             else:
                 lazy_loaded = "false"
-            non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+            non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()
 
         return result
 
     async def current_sync_for_user(
         self,
         sync_config: SyncConfig,
+        sync_version: SyncVersion,
         since_token: Optional[StreamToken] = None,
         full_state: bool = False,
     ) -> SyncResult:
@@ -431,12 +468,26 @@ class SyncHandler:
         This is a wrapper around `generate_sync_result` which starts an open tracing
         span to track the sync. See `generate_sync_result` for the next part of your
         indoctrination.
+
+        Args:
+            sync_config: Config/info necessary to process the sync request.
+            sync_version: Determines what kind of sync response to generate.
+            since_token: The point in the stream to sync from.p.
+            full_state: Whether to return the full state for each room.
+        Returns:
+            When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
         """
         with start_active_span("sync.current_sync_for_user"):
             log_kv({"since_token": since_token})
-            sync_result = await self.generate_sync_result(
-                sync_config, since_token, full_state
-            )
+            # Go through the `/sync` v2 path
+            if sync_version == SyncVersion.SYNC_V2:
+                sync_result: SyncResult = await self.generate_sync_result(
+                    sync_config, since_token, full_state
+                )
+            else:
+                raise Exception(
+                    f"Unknown sync_version (this is a Synapse problem): {sync_version}"
+                )
 
             set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
             return sync_result
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index d19aaf0e22..d0713536e1 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -40,6 +40,7 @@ from synapse.handlers.sync import (
     KnockedSyncResult,
     SyncConfig,
     SyncResult,
+    SyncVersion,
 )
 from synapse.http.server import HttpServer
 from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
@@ -232,6 +233,7 @@ class SyncRestServlet(RestServlet):
             sync_result = await self.sync_handler.wait_for_sync_for_user(
                 requester,
                 sync_config,
+                SyncVersion.SYNC_V2,
                 since_token=since_token,
                 timeout=timeout,
                 full_state=full_state,