diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index a65299bd22..ceed40f90d 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -24,6 +24,7 @@ from synapse.api.constants import AccountDataTypes, EventTypes, Membership
from synapse.api.filtering import FilterCollection
from synapse.events import EventBase
from synapse.logging.context import current_context
+from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
from synapse.push.clientformat import format_push_rules_for_user
from synapse.storage.roommember import MemberSummary
from synapse.storage.state import StateFilter
@@ -81,7 +82,7 @@ class SyncConfig:
filter_collection = attr.ib(type=FilterCollection)
is_guest = attr.ib(type=bool)
request_key = attr.ib(type=Tuple[Any, ...])
- device_id = attr.ib(type=str)
+ device_id = attr.ib(type=Optional[str])
@attr.s(slots=True, frozen=True)
@@ -252,13 +253,13 @@ class SyncHandler:
self.storage = hs.get_storage()
self.state_store = self.storage.state
- # ExpiringCache((User, Device)) -> LruCache(state_key => event_id)
+ # ExpiringCache((User, Device)) -> LruCache(user_id => event_id)
self.lazy_loaded_members_cache = ExpiringCache(
"lazy_loaded_members_cache",
self.clock,
max_len=0,
expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE,
- )
+ ) # type: ExpiringCache[Tuple[str, Optional[str]], LruCache[str, str]]
async def wait_for_sync_for_user(
self,
@@ -341,7 +342,14 @@ class SyncHandler:
full_state: bool = False,
) -> SyncResult:
"""Get the sync for client needed to match what the server has now."""
- return await self.generate_sync_result(sync_config, since_token, full_state)
+ with start_active_span("current_sync_for_user"):
+ log_kv({"since_token": since_token})
+ sync_result = await self.generate_sync_result(
+ sync_config, since_token, full_state
+ )
+
+ set_tag(SynapseTags.SYNC_RESULT, bool(sync_result))
+ return sync_result
async def push_rules_for_user(self, user: UserID) -> JsonDict:
user_id = user.to_string()
@@ -724,8 +732,12 @@ class SyncHandler:
return summary
- def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache:
- cache = self.lazy_loaded_members_cache.get(cache_key)
+ def get_lazy_loaded_members_cache(
+ self, cache_key: Tuple[str, Optional[str]]
+ ) -> LruCache[str, str]:
+ cache = self.lazy_loaded_members_cache.get(
+ cache_key
+ ) # type: Optional[LruCache[str, str]]
if cache is None:
logger.debug("creating LruCache for %r", cache_key)
cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE)
@@ -963,6 +975,7 @@ class SyncHandler:
# to query up to a given point.
# Always use the `now_token` in `SyncResultBuilder`
now_token = self.event_sources.get_current_token()
+ log_kv({"now_token": now_token})
logger.debug(
"Calculating sync response for %r between %s and %s",
@@ -1224,6 +1237,13 @@ class SyncHandler:
user_id, device_id, since_stream_id, now_token.to_device_key
)
+ for message in messages:
+ # We pop here as we shouldn't be sending the message ID down
+ # `/sync`
+ message_id = message.pop("message_id", None)
+ if message_id:
+ set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id)
+
logger.debug(
"Returning %d to-device messages between %d and %d (current token: %d)",
len(messages),
@@ -1980,8 +2000,10 @@ class SyncHandler:
logger.info("User joined room after current token: %s", room_id)
- extrems = await self.store.get_forward_extremeties_for_room(
- room_id, event_pos.stream
+ extrems = (
+ await self.store.get_forward_extremities_for_room_at_stream_ordering(
+ room_id, event_pos.stream
+ )
)
users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
if user_id in users_in_room:
|