diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0bef58351c..53fe2a6a53 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -20,6 +20,7 @@
#
import itertools
import logging
+from enum import Enum
from typing import (
TYPE_CHECKING,
AbstractSet,
@@ -112,6 +113,23 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
SyncRequestKey = Tuple[Any, ...]
+class SyncVersion(Enum):
+ """
+ Enum for specifying the version of sync request. This is used to key which type of
+ sync response that we are generating.
+
+ This is different than the `sync_type` you might see used in other code below; which
+ specifies the sub-type sync request (e.g. initial_sync, full_state_sync,
+ incremental_sync) and is really only relevant for the `/sync` v2 endpoint.
+ """
+
+ # These string values are semantically significant because they are used in the the
+ # metrics
+
+ # Traditional `/sync` endpoint
+ SYNC_V2 = "sync_v2"
+
+
@attr.s(slots=True, frozen=True, auto_attribs=True)
class SyncConfig:
user: UserID
@@ -309,6 +327,7 @@ class SyncHandler:
self,
requester: Requester,
sync_config: SyncConfig,
+ sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
timeout: int = 0,
full_state: bool = False,
@@ -316,6 +335,17 @@ class SyncHandler:
"""Get the sync for a client if we have new data for it now. Otherwise
wait for new data to arrive on the server. If the timeout expires, then
return an empty sync result.
+
+ Args:
+ requester: The user requesting the sync response.
+ sync_config: Config/info necessary to process the sync request.
+ sync_version: Determines what kind of sync response to generate.
+ since_token: The point in the stream to sync from.
+ timeout: How long to wait for new data to arrive before giving up.
+ full_state: Whether to return the full state for each room.
+
+ Returns:
+ When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
# If the user is not part of the mau group, then check that limits have
# not been exceeded (if not part of the group by this point, almost certain
@@ -327,6 +357,7 @@ class SyncHandler:
sync_config.request_key,
self._wait_for_sync_for_user,
sync_config,
+ sync_version,
since_token,
timeout,
full_state,
@@ -338,6 +369,7 @@ class SyncHandler:
async def _wait_for_sync_for_user(
self,
sync_config: SyncConfig,
+ sync_version: SyncVersion,
since_token: Optional[StreamToken],
timeout: int,
full_state: bool,
@@ -363,9 +395,11 @@ class SyncHandler:
else:
sync_type = "incremental_sync"
+ sync_label = f"{sync_version}:{sync_type}"
+
context = current_context()
if context:
- context.tag = sync_type
+ context.tag = sync_label
# if we have a since token, delete any to-device messages before that token
# (since we now know that the device has received them)
@@ -384,14 +418,16 @@ class SyncHandler:
# we are going to return immediately, so don't bother calling
# notifier.wait_for_events.
result: SyncResult = await self.current_sync_for_user(
- sync_config, since_token, full_state=full_state
+ sync_config, sync_version, since_token, full_state=full_state
)
else:
# Otherwise, we wait for something to happen and report it to the user.
async def current_sync_callback(
before_token: StreamToken, after_token: StreamToken
) -> SyncResult:
- return await self.current_sync_for_user(sync_config, since_token)
+ return await self.current_sync_for_user(
+ sync_config, sync_version, since_token
+ )
result = await self.notifier.wait_for_events(
sync_config.user.to_string(),
@@ -416,13 +452,14 @@ class SyncHandler:
lazy_loaded = "true"
else:
lazy_loaded = "false"
- non_empty_sync_counter.labels(sync_type, lazy_loaded).inc()
+ non_empty_sync_counter.labels(sync_label, lazy_loaded).inc()
return result
async def current_sync_for_user(
self,
sync_config: SyncConfig,
+ sync_version: SyncVersion,
since_token: Optional[StreamToken] = None,
full_state: bool = False,
) -> SyncResult:
@@ -431,12 +468,26 @@ class SyncHandler:
This is a wrapper around `generate_sync_result` which starts an open tracing
span to track the sync. See `generate_sync_result` for the next part of your
indoctrination.
+
+ Args:
+ sync_config: Config/info necessary to process the sync request.
+ sync_version: Determines what kind of sync response to generate.
+ since_token: The point in the stream to sync from.p.
+ full_state: Whether to return the full state for each room.
+ Returns:
+ When `SyncVersion.SYNC_V2`, returns a full `SyncResult`.
"""
with start_active_span("sync.current_sync_for_user"):
log_kv({"since_token": since_token})
- sync_result = await self.generate_sync_result(
- sync_config, since_token, full_state
- )
+ # Go through the `/sync` v2 path
+ if sync_version == SyncVersion.SYNC_V2:
+ sync_result: SyncResult = await self.generate_sync_result(
+ sync_config, since_token, full_state
+ )
+ else:
+ raise Exception(
+ f"Unknown sync_version (this is a Synapse problem): {sync_version}"
+ )
set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
return sync_result
|