From 28a948f04f1e04cbcbd68c53a78aa2ada3a791a1 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 16 May 2024 11:54:46 -0500 Subject: Removed `request_key` from the `SyncConfig` (moved outside as its own function parameter) (#17201) Removed `request_key` from the `SyncConfig` (moved outside as its own function parameter) so it doesn't have to flow into `_generate_sync_entry_for_xxx` methods. This way we can separate the concerns of caching from generating the response and reuse the `_generate_sync_entry_for_xxx` functions as we see fit. Plus caching doesn't really have anything to do with the config of sync. Split from https://github.com/element-hq/synapse/pull/17167 Spawning from https://github.com/element-hq/synapse/pull/17167#discussion_r1601497279 --- synapse/handlers/sync.py | 6 +++--- synapse/rest/client/sync.py | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 2bd1b8de88..40e42af1f3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -135,7 +135,6 @@ class SyncConfig: user: UserID filter_collection: FilterCollection is_guest: bool - request_key: SyncRequestKey device_id: Optional[str] @@ -328,6 +327,7 @@ class SyncHandler: requester: Requester, sync_config: SyncConfig, sync_version: SyncVersion, + request_key: SyncRequestKey, since_token: Optional[StreamToken] = None, timeout: int = 0, full_state: bool = False, @@ -340,10 +340,10 @@ class SyncHandler: requester: The user requesting the sync response. sync_config: Config/info necessary to process the sync request. sync_version: Determines what kind of sync response to generate. + request_key: The key to use for caching the response. since_token: The point in the stream to sync from. timeout: How long to wait for new data to arrive before giving up. full_state: Whether to return the full state for each room. - Returns: When `SyncVersion.SYNC_V2`, returns a full `SyncResult`. """ @@ -354,7 +354,7 @@ class SyncHandler: await self.auth_blocking.check_auth_blocking(requester=requester) res = await self.response_cache.wrap( - sync_config.request_key, + request_key, self._wait_for_sync_for_user, sync_config, sync_version, diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index d0713536e1..4a57eaf930 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -210,7 +210,6 @@ class SyncRestServlet(RestServlet): user=user, filter_collection=filter_collection, is_guest=requester.is_guest, - request_key=request_key, device_id=device_id, ) @@ -234,6 +233,7 @@ class SyncRestServlet(RestServlet): requester, sync_config, SyncVersion.SYNC_V2, + request_key, since_token=since_token, timeout=timeout, full_state=full_state, -- cgit 1.5.1 From 52a649580f34b4f36dfa21abcd05dad27e28bd1a Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 16 May 2024 11:55:51 -0500 Subject: Rename to be obvious: `joined_rooms` -> `joined_room_ids` (#17203) Split out from https://github.com/element-hq/synapse/pull/17167 --- changelog.d/17203.misc | 1 + synapse/handlers/sync.py | 4 ++-- 2 files changed, 3 insertions(+), 2 deletions(-) create mode 100644 changelog.d/17203.misc (limited to 'synapse') diff --git a/changelog.d/17203.misc b/changelog.d/17203.misc new file mode 100644 index 0000000000..142300b1f2 --- /dev/null +++ b/changelog.d/17203.misc @@ -0,0 +1 @@ +Rename to be obvious: `joined_rooms` -> `joined_room_ids`. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 40e42af1f3..6d4373008c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1850,7 +1850,7 @@ class SyncHandler: users_that_have_changed = set() - joined_rooms = sync_result_builder.joined_room_ids + joined_room_ids = sync_result_builder.joined_room_ids # Step 1a, check for changes in devices of users we share a room # with @@ -1909,7 +1909,7 @@ class SyncHandler: # Remove any users that we still share a room with. left_users_rooms = await self.store.get_rooms_for_users(newly_left_users) for user_id, entries in left_users_rooms.items(): - if any(rid in joined_rooms for rid in entries): + if any(rid in joined_room_ids for rid in entries): newly_left_users.discard(user_id) return DeviceListUpdates(changed=users_that_have_changed, left=newly_left_users) -- cgit 1.5.1 From fe07995e691a8f6d84dde4de990f8f53634ec5b5 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 16 May 2024 12:27:38 -0500 Subject: Fix `joined_rooms`/`joined_room_ids` usage (#17208) This change was introduced in https://github.com/element-hq/synapse/pull/17203 But then https://github.com/element-hq/synapse/pull/17207 was reverted which brought back usage `joined_rooms` that needed to be updated. Wasn't caught because `develop` wasn't up to date before merging. --- changelog.d/17208.misc | 1 + synapse/handlers/sync.py | 2 +- 2 files changed, 2 insertions(+), 1 deletion(-) create mode 100644 changelog.d/17208.misc (limited to 'synapse') diff --git a/changelog.d/17208.misc b/changelog.d/17208.misc new file mode 100644 index 0000000000..142300b1f2 --- /dev/null +++ b/changelog.d/17208.misc @@ -0,0 +1 @@ +Rename to be obvious: `joined_rooms` -> `joined_room_ids`. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6d4373008c..6634b3887e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1875,7 +1875,7 @@ class SyncHandler: # or if the changed user is the syncing user (as we always # want to include device list updates of their own devices). if user_id == changed_user_id or any( - rid in joined_rooms for rid in entries + rid in joined_room_ids for rid in entries ): users_that_have_changed.add(changed_user_id) else: -- cgit 1.5.1 From c856ae47247579446bbe1a1adc1564158e5e0643 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 16 May 2024 13:05:31 -0500 Subject: Refactor `SyncResultBuilder` assembly to its own function (#17202) We will re-use `get_sync_result_builder(...)` in https://github.com/element-hq/synapse/pull/17167 Split out from https://github.com/element-hq/synapse/pull/17167 --- changelog.d/17202.misc | 1 + synapse/handlers/sync.py | 264 ++++++++++++++++++++++++++--------------------- 2 files changed, 149 insertions(+), 116 deletions(-) create mode 100644 changelog.d/17202.misc (limited to 'synapse') diff --git a/changelog.d/17202.misc b/changelog.d/17202.misc new file mode 100644 index 0000000000..4a558c8bcf --- /dev/null +++ b/changelog.d/17202.misc @@ -0,0 +1 @@ +Refactor `SyncResultBuilder` assembly to its own function. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 6634b3887e..d3d40e8682 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1569,12 +1569,158 @@ class SyncHandler: # See https://github.com/matrix-org/matrix-doc/issues/1144 raise NotImplementedError() + sync_result_builder = await self.get_sync_result_builder( + sync_config, + since_token, + full_state, + ) + + logger.debug( + "Calculating sync response for %r between %s and %s", + sync_config.user, + sync_result_builder.since_token, + sync_result_builder.now_token, + ) + + logger.debug("Fetching account data") + + # Global account data is included if it is not filtered out. + if not sync_config.filter_collection.blocks_all_global_account_data(): + await self._generate_sync_entry_for_account_data(sync_result_builder) + + # Presence data is included if the server has it enabled and not filtered out. + include_presence_data = bool( + self.hs_config.server.presence_enabled + and not sync_config.filter_collection.blocks_all_presence() + ) + # Device list updates are sent if a since token is provided. + include_device_list_updates = bool(since_token and since_token.device_list_key) + + # If we do not care about the rooms or things which depend on the room + # data (namely presence and device list updates), then we can skip + # this process completely. + device_lists = DeviceListUpdates() + if ( + not sync_result_builder.sync_config.filter_collection.blocks_all_rooms() + or include_presence_data + or include_device_list_updates + ): + logger.debug("Fetching room data") + + # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which + # is used in calculate_user_changes below. + ( + newly_joined_rooms, + newly_left_rooms, + ) = await self._generate_sync_entry_for_rooms(sync_result_builder) + + # Work out which users have joined or left rooms we're in. We use this + # to build the presence and device_list parts of the sync response in + # `_generate_sync_entry_for_presence` and + # `_generate_sync_entry_for_device_list` respectively. + if include_presence_data or include_device_list_updates: + # This uses the sync_result_builder.joined which is set in + # `_generate_sync_entry_for_rooms`, if that didn't find any joined + # rooms for some reason it is a no-op. + ( + newly_joined_or_invited_or_knocked_users, + newly_left_users, + ) = sync_result_builder.calculate_user_changes() + + if include_presence_data: + logger.debug("Fetching presence data") + await self._generate_sync_entry_for_presence( + sync_result_builder, + newly_joined_rooms, + newly_joined_or_invited_or_knocked_users, + ) + + if include_device_list_updates: + device_lists = await self._generate_sync_entry_for_device_list( + sync_result_builder, + newly_joined_rooms=newly_joined_rooms, + newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, + newly_left_rooms=newly_left_rooms, + newly_left_users=newly_left_users, + ) + + logger.debug("Fetching to-device data") + await self._generate_sync_entry_for_to_device(sync_result_builder) + + logger.debug("Fetching OTK data") + device_id = sync_config.device_id + one_time_keys_count: JsonMapping = {} + unused_fallback_key_types: List[str] = [] + if device_id: + # TODO: We should have a way to let clients differentiate between the states of: + # * no change in OTK count since the provided since token + # * the server has zero OTKs left for this device + # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 + one_time_keys_count = await self.store.count_e2e_one_time_keys( + user_id, device_id + ) + unused_fallback_key_types = list( + await self.store.get_e2e_unused_fallback_key_types(user_id, device_id) + ) + + num_events = 0 + + # debug for https://github.com/matrix-org/synapse/issues/9424 + for joined_room in sync_result_builder.joined: + num_events += len(joined_room.timeline.events) + + log_kv( + { + "joined_rooms_in_result": len(sync_result_builder.joined), + "events_in_result": num_events, + } + ) + + logger.debug("Sync response calculation complete") + return SyncResult( + presence=sync_result_builder.presence, + account_data=sync_result_builder.account_data, + joined=sync_result_builder.joined, + invited=sync_result_builder.invited, + knocked=sync_result_builder.knocked, + archived=sync_result_builder.archived, + to_device=sync_result_builder.to_device, + device_lists=device_lists, + device_one_time_keys_count=one_time_keys_count, + device_unused_fallback_key_types=unused_fallback_key_types, + next_batch=sync_result_builder.now_token, + ) + + async def get_sync_result_builder( + self, + sync_config: SyncConfig, + since_token: Optional[StreamToken] = None, + full_state: bool = False, + ) -> "SyncResultBuilder": + """ + Assemble a `SyncResultBuilder` with all of the initial context to + start building up the sync response: + + - Membership changes between the last sync and the current sync. + - Joined room IDs (minus any rooms to exclude). + - Rooms that became fully-stated/un-partial stated since the last sync. + + Args: + sync_config: Config/info necessary to process the sync request. + since_token: The point in the stream to sync from. + full_state: Whether to return the full state for each room. + + Returns: + `SyncResultBuilder` ready to start generating parts of the sync response. + """ + user_id = sync_config.user.to_string() + # Note: we get the users room list *before* we get the current token, this # avoids checking back in history if rooms are joined after the token is fetched. token_before_rooms = self.event_sources.get_current_token() mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id)) - # NB: The now_token gets changed by some of the generate_sync_* methods, + # NB: The `now_token` gets changed by some of the `generate_sync_*` methods, # this is due to some of the underlying streams not supporting the ability # to query up to a given point. # Always use the `now_token` in `SyncResultBuilder` @@ -1675,13 +1821,6 @@ class SyncHandler: if room_id not in mutable_rooms_to_exclude ) - logger.debug( - "Calculating sync response for %r between %s and %s", - sync_config.user, - since_token, - now_token, - ) - sync_result_builder = SyncResultBuilder( sync_config, full_state, @@ -1693,114 +1832,7 @@ class SyncHandler: membership_change_events=membership_change_events, ) - logger.debug("Fetching account data") - - # Global account data is included if it is not filtered out. - if not sync_config.filter_collection.blocks_all_global_account_data(): - await self._generate_sync_entry_for_account_data(sync_result_builder) - - # Presence data is included if the server has it enabled and not filtered out. - include_presence_data = bool( - self.hs_config.server.presence_enabled - and not sync_config.filter_collection.blocks_all_presence() - ) - # Device list updates are sent if a since token is provided. - include_device_list_updates = bool(since_token and since_token.device_list_key) - - # If we do not care about the rooms or things which depend on the room - # data (namely presence and device list updates), then we can skip - # this process completely. - device_lists = DeviceListUpdates() - if ( - not sync_result_builder.sync_config.filter_collection.blocks_all_rooms() - or include_presence_data - or include_device_list_updates - ): - logger.debug("Fetching room data") - - # Note that _generate_sync_entry_for_rooms sets sync_result_builder.joined, which - # is used in calculate_user_changes below. - ( - newly_joined_rooms, - newly_left_rooms, - ) = await self._generate_sync_entry_for_rooms(sync_result_builder) - - # Work out which users have joined or left rooms we're in. We use this - # to build the presence and device_list parts of the sync response in - # `_generate_sync_entry_for_presence` and - # `_generate_sync_entry_for_device_list` respectively. - if include_presence_data or include_device_list_updates: - # This uses the sync_result_builder.joined which is set in - # `_generate_sync_entry_for_rooms`, if that didn't find any joined - # rooms for some reason it is a no-op. - ( - newly_joined_or_invited_or_knocked_users, - newly_left_users, - ) = sync_result_builder.calculate_user_changes() - - if include_presence_data: - logger.debug("Fetching presence data") - await self._generate_sync_entry_for_presence( - sync_result_builder, - newly_joined_rooms, - newly_joined_or_invited_or_knocked_users, - ) - - if include_device_list_updates: - device_lists = await self._generate_sync_entry_for_device_list( - sync_result_builder, - newly_joined_rooms=newly_joined_rooms, - newly_joined_or_invited_or_knocked_users=newly_joined_or_invited_or_knocked_users, - newly_left_rooms=newly_left_rooms, - newly_left_users=newly_left_users, - ) - - logger.debug("Fetching to-device data") - await self._generate_sync_entry_for_to_device(sync_result_builder) - - logger.debug("Fetching OTK data") - device_id = sync_config.device_id - one_time_keys_count: JsonMapping = {} - unused_fallback_key_types: List[str] = [] - if device_id: - # TODO: We should have a way to let clients differentiate between the states of: - # * no change in OTK count since the provided since token - # * the server has zero OTKs left for this device - # Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298 - one_time_keys_count = await self.store.count_e2e_one_time_keys( - user_id, device_id - ) - unused_fallback_key_types = list( - await self.store.get_e2e_unused_fallback_key_types(user_id, device_id) - ) - - num_events = 0 - - # debug for https://github.com/matrix-org/synapse/issues/9424 - for joined_room in sync_result_builder.joined: - num_events += len(joined_room.timeline.events) - - log_kv( - { - "joined_rooms_in_result": len(sync_result_builder.joined), - "events_in_result": num_events, - } - ) - - logger.debug("Sync response calculation complete") - return SyncResult( - presence=sync_result_builder.presence, - account_data=sync_result_builder.account_data, - joined=sync_result_builder.joined, - invited=sync_result_builder.invited, - knocked=sync_result_builder.knocked, - archived=sync_result_builder.archived, - to_device=sync_result_builder.to_device, - device_lists=device_lists, - device_one_time_keys_count=one_time_keys_count, - device_unused_fallback_key_types=unused_fallback_key_types, - next_batch=sync_result_builder.now_token, - ) + return sync_result_builder @measure_func("_generate_sync_entry_for_device_list") async def _generate_sync_entry_for_device_list( -- cgit 1.5.1 From 52af16c56175160512420d8654ac558a1e5af541 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 18 May 2024 12:03:30 +0100 Subject: Add a short sleep if the request is rate-limited (#17210) This helps prevent clients from "tight-looping" retrying their request. --- changelog.d/17210.misc | 1 + synapse/api/ratelimiting.py | 4 ++++ tests/api/test_ratelimiting.py | 5 +++-- tests/handlers/test_federation.py | 1 + tests/handlers/test_room_member.py | 4 ++++ tests/unittest.py | 4 ++-- 6 files changed, 15 insertions(+), 4 deletions(-) create mode 100644 changelog.d/17210.misc (limited to 'synapse') diff --git a/changelog.d/17210.misc b/changelog.d/17210.misc new file mode 100644 index 0000000000..2059ebea7b --- /dev/null +++ b/changelog.d/17210.misc @@ -0,0 +1 @@ +Add a short pause when rate-limiting a request. diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py index a73626bc86..a99a9e09fc 100644 --- a/synapse/api/ratelimiting.py +++ b/synapse/api/ratelimiting.py @@ -316,6 +316,10 @@ class Ratelimiter: ) if not allowed: + # We pause for a bit here to stop clients from "tight-looping" on + # retrying their request. + await self.clock.sleep(0.5) + raise LimitExceededError( limiter_name=self._limiter_name, retry_after_ms=int(1000 * (time_allowed - time_now_s)), diff --git a/tests/api/test_ratelimiting.py b/tests/api/test_ratelimiting.py index a24638c9ef..a59e168db1 100644 --- a/tests/api/test_ratelimiting.py +++ b/tests/api/test_ratelimiting.py @@ -116,8 +116,9 @@ class TestRatelimiter(unittest.HomeserverTestCase): # Should raise with self.assertRaises(LimitExceededError) as context: self.get_success_or_raise( - limiter.ratelimit(None, key="test_id", _time_now_s=5) + limiter.ratelimit(None, key="test_id", _time_now_s=5), by=0.5 ) + self.assertEqual(context.exception.retry_after_ms, 5000) # Shouldn't raise @@ -192,7 +193,7 @@ class TestRatelimiter(unittest.HomeserverTestCase): # Second attempt, 1s later, will fail with self.assertRaises(LimitExceededError) as context: self.get_success_or_raise( - limiter.ratelimit(None, key=("test_id",), _time_now_s=1) + limiter.ratelimit(None, key=("test_id",), _time_now_s=1), by=0.5 ) self.assertEqual(context.exception.retry_after_ms, 9000) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index b819b60c5d..3fe5b0a1b4 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -483,6 +483,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): event.room_version, ), exc=LimitExceededError, + by=0.5, ) def _build_and_send_join_event( diff --git a/tests/handlers/test_room_member.py b/tests/handlers/test_room_member.py index 3e28117e2c..df43ce581c 100644 --- a/tests/handlers/test_room_member.py +++ b/tests/handlers/test_room_member.py @@ -70,6 +70,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase): action=Membership.JOIN, ), LimitExceededError, + by=0.5, ) @override_config({"rc_joins_per_room": {"per_second": 0, "burst_count": 2}}) @@ -206,6 +207,7 @@ class TestJoinsLimitedByPerRoomRateLimiter(FederatingHomeserverTestCase): remote_room_hosts=[self.OTHER_SERVER_NAME], ), LimitExceededError, + by=0.5, ) # TODO: test that remote joins to a room are rate limited. @@ -273,6 +275,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCa action=Membership.JOIN, ), LimitExceededError, + by=0.5, ) # Try to join as Chris on the original worker. Should get denied because Alice @@ -285,6 +288,7 @@ class TestReplicatedJoinsLimitedByPerRoomRateLimiter(BaseMultiWorkerStreamTestCa action=Membership.JOIN, ), LimitExceededError, + by=0.5, ) diff --git a/tests/unittest.py b/tests/unittest.py index e6aad9ed40..18963b9e32 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -637,13 +637,13 @@ class HomeserverTestCase(TestCase): return self.successResultOf(deferred) def get_failure( - self, d: Awaitable[Any], exc: Type[_ExcType] + self, d: Awaitable[Any], exc: Type[_ExcType], by: float = 0.0 ) -> _TypedFailure[_ExcType]: """ Run a Deferred and get a Failure from it. The failure must be of the type `exc`. """ deferred: Deferred[Any] = ensureDeferred(d) # type: ignore[arg-type] - self.pump() + self.pump(by) return self.failureResultOf(deferred, exc) def get_success_or_raise(self, d: Awaitable[TV], by: float = 0.0) -> TV: -- cgit 1.5.1