summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/ratelimiting.py4
-rw-r--r--synapse/handlers/sync.py276
-rw-r--r--synapse/rest/client/sync.py2
3 files changed, 159 insertions, 123 deletions
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py

index a73626bc86..a99a9e09fc 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py
@@ -316,6 +316,10 @@ class Ratelimiter: ) if not allowed: + # We pause for a bit here to stop clients from "tight-looping" on + # retrying their request. + await self.clock.sleep(0.5) + raise LimitExceededError( limiter_name=self._limiter_name, retry_after_ms=int(1000 * (time_allowed - time_now_s)), diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2bd1b8de88..d3d40e8682 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py
@@ -135,7 +135,6 @@ class SyncConfig: user: UserID filter_collection: FilterCollection is_guest: bool - request_key: SyncRequestKey device_id: Optional[str] @@ -328,6 +327,7 @@ class SyncHandler: requester: Requester, sync_config: SyncConfig, sync_version: SyncVersion, + request_key: SyncRequestKey, since_token: Optional[StreamToken] = None, timeout: int = 0, full_state: bool = False, @@ -340,10 +340,10 @@ class SyncHandler: requester: The user requesting the sync response. sync_config: Config/info necessary to process the sync request. sync_version: Determines what kind of sync response to generate. + request_key: The key to use for caching the response. since_token: The point in the stream to sync from. timeout: How long to wait for new data to arrive before giving up. full_state: Whether to return the full state for each room. - Returns: When `SyncVersion.SYNC_V2`, returns a full `SyncResult`. """ @@ -354,7 +354,7 @@ class SyncHandler: await self.auth_blocking.check_auth_blocking(requester=requester) res = await self.response_cache.wrap( - sync_config.request_key, + request_key, self._wait_for_sync_for_user, sync_config, sync_version, @@ -1569,12 +1569,158 @@ class SyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + sync_result_builder = await self.get_sync_result_builder( + sync_config, + since_token, + full_state, + ) + + logger.debug( + "Calculating sync response for %r between %s and %s", + sync_config.user, + sync_result_builder.since_token, + sync_result_builder.now_token, + ) + + logger.debug("Fetching account data") + + # Global account data is included if it is not filtered out. + if not sync_config.filter_collection.blocks_all_global_account_data(): + await self._generate_sync_entry_for_account_data(sync_result_builder) + + # Presence data is included if the server has it enabled and not filtered out. + include_presence_data = bool( + self.hs_config.server.presence_enabled + and not sync_config.filter_collection.blocks_all_presence() + ) + # Device list updates are sent if a since token is provided. + include_device_list_updates = bool(since_token and since_token.device_list_key) + + # If we do not care about the rooms or things which depend on the room + # data (namely presence and device list updates), then we can skip + # this process completely. + device_lists = DeviceListUpdates() + if ( + not sync_result_builder.sync_config.filter_collection.blocks_all_rooms() + or include_presence_data + or include_device_list_updates + ): + logger.debug("Fetching room data") + + # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which + # is used in calculate_user_changes below. + ( + newly_joined_rooms, + newly_left_rooms, + ) = await self._generate_sync_entry_for_rooms(sync_result_builder) + + # Work out which users have joined or left rooms we're in. We use this + # to build the presence and device_list parts of the sync response in + # `_generate_sync_entry_for_presence` and + # `_generate_sync_entry_for_device_list` respectively. + if include_presence_data or include_device_list_updates: + # This uses the sync_result_builder.joined which is set in + # `_generate_sync_entry_for_rooms`, if that didn't find any joined + # rooms for some reason it is a no-op. + ( + newly_joined_or_invited_or_knocked_users, + newly_left_users, + ) = sync_result_builder.calculate_user_changes() + + if include_presence_data: + logger.debug("Fetching presence data") + await self._generate_sync_entry_for_presence( + sync_result_builder, + newly_joined_rooms, + newly_joined_or_invited_or_knocked_users, + ) + + if include_device_list_updates: + device_lists = await self._generate_sync_entry_for_device_list( + sync_result_builder, + newly_joined_rooms=newly_joined_rooms, + newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, + newly_left_rooms=newly_left_rooms, + newly_left_users=newly_left_users, + ) + + logger.debug("Fetching to-device data") + await self._generate_sync_entry_for_to_device(sync_result_builder) + + logger.debug("Fetching OTK data") + device_id = sync_config.device_id + one_time_keys_count: JsonMapping = {} + unused_fallback_key_types: List[str] = [] + if device_id: + # TODO: We should have a way to let clients differentiate between the states of: + # * no change in OTK count since the provided since token + # * the server has zero OTKs left for this device + # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 + one_time_keys_count = await self.store.count_e2e_one_time_keys( + user_id, device_id + ) + unused_fallback_key_types = list( + await self.store.get_e2e_unused_fallback_key_types(user_id, device_id) + ) + + num_events = 0 + + # debug for https://github.com/matrix-org/synapse/issues/9424 + for joined_room in sync_result_builder.joined: + num_events += len(joined_room.timeline.events) + + log_kv( + { + "joined_rooms_in_result": len(sync_result_builder.joined), + "events_in_result": num_events, + } + ) + + logger.debug("Sync response calculation complete") + return SyncResult( + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + knocked=sync_result_builder.knocked, + archived=sync_result_builder.archived, + to_device=sync_result_builder.to_device, + device_lists=device_lists, + device_one_time_keys_count=one_time_keys_count, + device_unused_fallback_key_types=unused_fallback_key_types, + next_batch=sync_result_builder.now_token, + ) + + async def get_sync_result_builder( + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + full_state: bool = False, + ) -> "SyncResultBuilder": + """ + Assemble a `SyncResultBuilder` with all of the initial context to + start building up the sync response: + + - Membership changes between the last sync and the current sync. + - Joined room IDs (minus any rooms to exclude). + - Rooms that became fully-stated/un-partial stated since the last sync. + + Args: + sync_config: Config/info necessary to process the sync request. + since_token: The point in the stream to sync from. + full_state: Whether to return the full state for each room. + + Returns: + `SyncResultBuilder` ready to start generating parts of the sync response. + """ + user_id = sync_config.user.to_string() + # Note: we get the users room list *before* we get the current token, this # avoids checking back in history if rooms are joined after the token is fetched. token_before_rooms = self.event_sources.get_current_token() mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) - # NB: The now_token gets changed by some of the generate_sync_* methods, + # NB: The `now_token` gets changed by some of the `generate_sync_*` methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` @@ -1675,13 +1821,6 @@ class SyncHandler: if room_id not in mutable_rooms_to_exclude ) - logger.debug( - "Calculating sync response for %r between %s and %s", - sync_config.user, - since_token, - now_token, - ) - sync_result_builder = SyncResultBuilder( sync_config, full_state, @@ -1693,114 +1832,7 @@ class SyncHandler: membership_change_events=membership_change_events, ) - logger.debug("Fetching account data") - - # Global account data is included if it is not filtered out. - if not sync_config.filter_collection.blocks_all_global_account_data(): - await self._generate_sync_entry_for_account_data(sync_result_builder) - - # Presence data is included if the server has it enabled and not filtered out. - include_presence_data = bool( - self.hs_config.server.presence_enabled - and not sync_config.filter_collection.blocks_all_presence() - ) - # Device list updates are sent if a since token is provided. - include_device_list_updates = bool(since_token and since_token.device_list_key) - - # If we do not care about the rooms or things which depend on the room - # data (namely presence and device list updates), then we can skip - # this process completely. - device_lists = DeviceListUpdates() - if ( - not sync_result_builder.sync_config.filter_collection.blocks_all_rooms() - or include_presence_data - or include_device_list_updates - ): - logger.debug("Fetching room data") - - # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which - # is used in calculate_user_changes below. - ( - newly_joined_rooms, - newly_left_rooms, - ) = await self._generate_sync_entry_for_rooms(sync_result_builder) - - # Work out which users have joined or left rooms we're in. We use this - # to build the presence and device_list parts of the sync response in - # `_generate_sync_entry_for_presence` and - # `_generate_sync_entry_for_device_list` respectively. - if include_presence_data or include_device_list_updates: - # This uses the sync_result_builder.joined which is set in - # `_generate_sync_entry_for_rooms`, if that didn't find any joined - # rooms for some reason it is a no-op. - ( - newly_joined_or_invited_or_knocked_users, - newly_left_users, - ) = sync_result_builder.calculate_user_changes() - - if include_presence_data: - logger.debug("Fetching presence data") - await self._generate_sync_entry_for_presence( - sync_result_builder, - newly_joined_rooms, - newly_joined_or_invited_or_knocked_users, - ) - - if include_device_list_updates: - device_lists = await self._generate_sync_entry_for_device_list( - sync_result_builder, - newly_joined_rooms=newly_joined_rooms, - newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, - newly_left_rooms=newly_left_rooms, - newly_left_users=newly_left_users, - ) - - logger.debug("Fetching to-device data") - await self._generate_sync_entry_for_to_device(sync_result_builder) - - logger.debug("Fetching OTK data") - device_id = sync_config.device_id - one_time_keys_count: JsonMapping = {} - unused_fallback_key_types: List[str] = [] - if device_id: - # TODO: We should have a way to let clients differentiate between the states of: - # * no change in OTK count since the provided since token - # * the server has zero OTKs left for this device - # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 - one_time_keys_count = await self.store.count_e2e_one_time_keys( - user_id, device_id - ) - unused_fallback_key_types = list( - await self.store.get_e2e_unused_fallback_key_types(user_id, device_id) - ) - - num_events = 0 - - # debug for https://github.com/matrix-org/synapse/issues/9424 - for joined_room in sync_result_builder.joined: - num_events += len(joined_room.timeline.events) - - log_kv( - { - "joined_rooms_in_result": len(sync_result_builder.joined), - "events_in_result": num_events, - } - ) - - logger.debug("Sync response calculation complete") - return SyncResult( - presence=sync_result_builder.presence, - account_data=sync_result_builder.account_data, - joined=sync_result_builder.joined, - invited=sync_result_builder.invited, - knocked=sync_result_builder.knocked, - archived=sync_result_builder.archived, - to_device=sync_result_builder.to_device, - device_lists=device_lists, - device_one_time_keys_count=one_time_keys_count, - device_unused_fallback_key_types=unused_fallback_key_types, - next_batch=sync_result_builder.now_token, - ) + return sync_result_builder @measure_func("_generate_sync_entry_for_device_list") async def _generate_sync_entry_for_device_list( @@ -1850,7 +1882,7 @@ class SyncHandler: users_that_have_changed = set() - joined_rooms = sync_result_builder.joined_room_ids + joined_room_ids = sync_result_builder.joined_room_ids # Step 1a, check for changes in devices of users we share a room # with @@ -1875,7 +1907,7 @@ class SyncHandler: # or if the changed user is the syncing user (as we always # want to include device list updates of their own devices). if user_id == changed_user_id or any( - rid in joined_rooms for rid in entries + rid in joined_room_ids for rid in entries ): users_that_have_changed.add(changed_user_id) else: @@ -1909,7 +1941,7 @@ class SyncHandler: # Remove any users that we still share a room with. left_users_rooms = await self.store.get_rooms_for_users(newly_left_users) for user_id, entries in left_users_rooms.items(): - if any(rid in joined_rooms for rid in entries): + if any(rid in joined_room_ids for rid in entries): newly_left_users.discard(user_id) return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index d0713536e1..4a57eaf930 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py
@@ -210,7 +210,6 @@ class SyncRestServlet(RestServlet): user=user, filter_collection=filter_collection, is_guest=requester.is_guest, - request_key=request_key, device_id=device_id, ) @@ -234,6 +233,7 @@ class SyncRestServlet(RestServlet): requester, sync_config, SyncVersion.SYNC_V2, + request_key, since_token=since_token, timeout=timeout, full_state=full_state,