summary refs log tree commit diff
path: root/synapse/handlers/sync.py
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-04-13 16:10:07 +0100
committerAndrew Morgan <andrew@amorgan.xyz>2021-04-13 16:10:07 +0100
commit77866a5f5f9e805ebe599a5dc7d19152b68a0f01 (patch)
treee81016df9807f97c1c8f88b82ad9c73cbaa94586 /synapse/handlers/sync.py
parentRevert "Patch to temporarily drop cross-user m.key_share_requests (#8675)" (diff)
parentMerge branch 'erikj/fix_stalled_catchup' into matrix-org-hotfixes (diff)
downloadsynapse-77866a5f5f9e805ebe599a5dc7d19152b68a0f01.tar.xz
Merge branch 'matrix-org-hotfixes' of github.com:matrix-org/synapse into matrix-org-hotfixes
Diffstat (limited to 'synapse/handlers/sync.py')
-rw-r--r--synapse/handlers/sync.py38
1 files changed, 30 insertions, 8 deletions
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py

index a65299bd22..ceed40f90d 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -24,6 +24,7 @@ from synapse.api.constants import AccountDataTypes, EventTypes, Membership from synapse.api.filtering import FilterCollection from synapse.events import EventBase from synapse.logging.context import current_context +from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter @@ -81,7 +82,7 @@ class SyncConfig: filter_collection = attr.ib(type=FilterCollection) is_guest = attr.ib(type=bool) request_key = attr.ib(type=Tuple[Any, ...]) - device_id = attr.ib(type=str) + device_id = attr.ib(type=Optional[str]) @attr.s(slots=True, frozen=True) @@ -252,13 +253,13 @@ class SyncHandler: self.storage = hs.get_storage() self.state_store = self.storage.state - # ExpiringCache((User, Device)) -> LruCache(state_key => event_id) + # ExpiringCache((User, Device)) -> LruCache(user_id => event_id) self.lazy_loaded_members_cache = ExpiringCache( "lazy_loaded_members_cache", self.clock, max_len=0, expiry_ms=LAZY_LOADED_MEMBERS_CACHE_MAX_AGE, - ) + ) # type: ExpiringCache[Tuple[str, Optional[str]], LruCache[str, str]] async def wait_for_sync_for_user( self, @@ -341,7 +342,14 @@ class SyncHandler: full_state: bool = False, ) -> SyncResult: """Get the sync for client needed to match what the server has now.""" - return await self.generate_sync_result(sync_config, since_token, full_state) + with start_active_span("current_sync_for_user"): + log_kv({"since_token": since_token}) + sync_result = await self.generate_sync_result( + sync_config, since_token, full_state + ) + + set_tag(SynapseTags.SYNC_RESULT, bool(sync_result)) + return sync_result async def push_rules_for_user(self, user: UserID) -> JsonDict: user_id = user.to_string() @@ -724,8 +732,12 @@ class SyncHandler: return summary - def get_lazy_loaded_members_cache(self, cache_key: Tuple[str, str]) -> LruCache: - cache = self.lazy_loaded_members_cache.get(cache_key) + def get_lazy_loaded_members_cache( + self, cache_key: Tuple[str, Optional[str]] + ) -> LruCache[str, str]: + cache = self.lazy_loaded_members_cache.get( + cache_key + ) # type: Optional[LruCache[str, str]] if cache is None: logger.debug("creating LruCache for %r", cache_key) cache = LruCache(LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE) @@ -963,6 +975,7 @@ class SyncHandler: # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` now_token = self.event_sources.get_current_token() + log_kv({"now_token": now_token}) logger.debug( "Calculating sync response for %r between %s and %s", @@ -1224,6 +1237,13 @@ class SyncHandler: user_id, device_id, since_stream_id, now_token.to_device_key ) + for message in messages: + # We pop here as we shouldn't be sending the message ID down + # `/sync` + message_id = message.pop("message_id", None) + if message_id: + set_tag(SynapseTags.TO_DEVICE_MESSAGE_ID, message_id) + logger.debug( "Returning %d to-device messages between %d and %d (current token: %d)", len(messages), @@ -1980,8 +2000,10 @@ class SyncHandler: logger.info("User joined room after current token: %s", room_id) - extrems = await self.store.get_forward_extremeties_for_room( - room_id, event_pos.stream + extrems = ( + await self.store.get_forward_extremities_for_room_at_stream_ordering( + room_id, event_pos.stream + ) ) users_in_room = await self.state.get_current_users_in_room(room_id, extrems) if user_id in users_in_room: