From 1bf143699c0ac8dd53111bfca4628f126d65210d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 28 Aug 2023 11:03:23 -0400 Subject: Combine logic about not overriding BUSY presence. (#16170) Simplify some of the presence code by reducing duplicated code between worker & non-worker modes. The main change is to push some of the logic from `user_syncing` into `set_state`. This is done by passing whether the user is setting the presence via a `/sync` with a new `is_sync` flag to `set_state`. If this is `true` some additional logic is performed: * Don't override `busy` presence. * Update the `last_user_sync_ts`. * Never update the status message. --- synapse/handlers/presence.py | 155 ++++++++++++++++++------------------------- 1 file changed, 63 insertions(+), 92 deletions(-) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index e8e9db4b91..c395dcdb43 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -151,15 +151,13 @@ class BasePresenceHandler(abc.ABC): self._federation_queue = PresenceFederationQueue(hs, self) - self._busy_presence_enabled = hs.config.experimental.msc3026_enabled - self.VALID_PRESENCE: Tuple[str, ...] = ( PresenceState.ONLINE, PresenceState.UNAVAILABLE, PresenceState.OFFLINE, ) - if self._busy_presence_enabled: + if hs.config.experimental.msc3026_enabled: self.VALID_PRESENCE += (PresenceState.BUSY,) active_presence = self.store.take_presence_startup_info() @@ -255,17 +253,19 @@ class BasePresenceHandler(abc.ABC): self, target_user: UserID, state: JsonDict, - ignore_status_msg: bool = False, force_notify: bool = False, + is_sync: bool = False, ) -> None: """Set the presence state of the user. Args: target_user: The ID of the user to set the presence state of. state: The presence state as a JSON dictionary. - ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. - If False, the user's current status will be updated. force_notify: Whether to force notification of the update to clients. + is_sync: True if this update was from a sync, which results in + *not* overriding a previously set BUSY status, updating the + user's last_user_sync_ts, and ignoring the "status_msg" field of + the `state` dict. """ @abc.abstractmethod @@ -491,23 +491,18 @@ class WorkerPresenceHandler(BasePresenceHandler): if not affect_presence or not self._presence_enabled: return _NullContextManager() - prev_state = await self.current_state_for_user(user_id) - if prev_state.state != PresenceState.BUSY: - # We set state here but pass ignore_status_msg = True as we don't want to - # cause the status message to be cleared. - # Note that this causes last_active_ts to be incremented which is not - # what the spec wants: see comment in the BasePresenceHandler version - # of this function. - await self.set_state( - UserID.from_string(user_id), - {"presence": presence_state}, - ignore_status_msg=True, - ) + # Note that this causes last_active_ts to be incremented which is not + # what the spec wants. + await self.set_state( + UserID.from_string(user_id), + state={"presence": presence_state}, + is_sync=True, + ) curr_sync = self._user_to_num_current_syncs.get(user_id, 0) self._user_to_num_current_syncs[user_id] = curr_sync + 1 - # If we went from no in flight sync to some, notify replication + # If this is the first in-flight sync, notify replication if self._user_to_num_current_syncs[user_id] == 1: self.mark_as_coming_online(user_id) @@ -518,7 +513,7 @@ class WorkerPresenceHandler(BasePresenceHandler): if user_id in self._user_to_num_current_syncs: self._user_to_num_current_syncs[user_id] -= 1 - # If we went from one in flight sync to non, notify replication + # If there are no more in-flight syncs, notify replication if self._user_to_num_current_syncs[user_id] == 0: self.mark_as_going_offline(user_id) @@ -598,17 +593,19 @@ class WorkerPresenceHandler(BasePresenceHandler): self, target_user: UserID, state: JsonDict, - ignore_status_msg: bool = False, force_notify: bool = False, + is_sync: bool = False, ) -> None: """Set the presence state of the user. Args: target_user: The ID of the user to set the presence state of. state: The presence state as a JSON dictionary. - ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. - If False, the user's current status will be updated. force_notify: Whether to force notification of the update to clients. + is_sync: True if this update was from a sync, which results in + *not* overriding a previously set BUSY status, updating the + user's last_user_sync_ts, and ignoring the "status_msg" field of + the `state` dict. """ presence = state["presence"] @@ -626,8 +623,8 @@ class WorkerPresenceHandler(BasePresenceHandler): instance_name=self._presence_writer_instance, user_id=user_id, state=state, - ignore_status_msg=ignore_status_msg, force_notify=force_notify, + is_sync=is_sync, ) async def bump_presence_active_time(self, user: UserID) -> None: @@ -992,45 +989,13 @@ class PresenceHandler(BasePresenceHandler): curr_sync = self.user_to_num_current_syncs.get(user_id, 0) self.user_to_num_current_syncs[user_id] = curr_sync + 1 - prev_state = await self.current_state_for_user(user_id) - - # If they're busy then they don't stop being busy just by syncing, - # so just update the last sync time. - if prev_state.state != PresenceState.BUSY: - # XXX: We set_state separately here and just update the last_active_ts above - # This keeps the logic as similar as possible between the worker and single - # process modes. Using set_state will actually cause last_active_ts to be - # updated always, which is not what the spec calls for, but synapse has done - # this for... forever, I think. - await self.set_state( - UserID.from_string(user_id), - {"presence": presence_state}, - ignore_status_msg=True, - ) - # Retrieve the new state for the logic below. This should come from the - # in-memory cache. - prev_state = await self.current_state_for_user(user_id) - - # To keep the single process behaviour consistent with worker mode, run the - # same logic as `update_external_syncs_row`, even though it looks weird. - if prev_state.state == PresenceState.OFFLINE: - await self._update_states( - [ - prev_state.copy_and_replace( - state=PresenceState.ONLINE, - last_active_ts=self.clock.time_msec(), - last_user_sync_ts=self.clock.time_msec(), - ) - ] - ) - # otherwise, set the new presence state & update the last sync time, - # but don't update last_active_ts as this isn't an indication that - # they've been active (even though it's probably been updated by - # set_state above) - else: - await self._update_states( - [prev_state.copy_and_replace(last_user_sync_ts=self.clock.time_msec())] - ) + # Note that this causes last_active_ts to be incremented which is not + # what the spec wants. + await self.set_state( + UserID.from_string(user_id), + state={"presence": presence_state}, + is_sync=True, + ) async def _end() -> None: try: @@ -1080,32 +1045,27 @@ class PresenceHandler(BasePresenceHandler): process_id, set() ) - updates = [] + # USER_SYNC is sent when a user starts or stops syncing on a remote + # process. (But only for the initial and last device.) + # + # When a user *starts* syncing it also calls set_state(...) which + # will update the state, last_active_ts, and last_user_sync_ts. + # Simply ensure the user is tracked as syncing in this case. + # + # When a user *stops* syncing, update the last_user_sync_ts and mark + # them as no longer syncing. Note this doesn't quite match the + # monolith behaviour, which updates last_user_sync_ts at the end of + # every sync, not just the last in-flight sync. if is_syncing and user_id not in process_presence: - if prev_state.state == PresenceState.OFFLINE: - updates.append( - prev_state.copy_and_replace( - state=PresenceState.ONLINE, - last_active_ts=sync_time_msec, - last_user_sync_ts=sync_time_msec, - ) - ) - else: - updates.append( - prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) - ) process_presence.add(user_id) - elif user_id in process_presence: - updates.append( - prev_state.copy_and_replace(last_user_sync_ts=sync_time_msec) + elif not is_syncing and user_id in process_presence: + new_state = prev_state.copy_and_replace( + last_user_sync_ts=sync_time_msec ) + await self._update_states([new_state]) - if not is_syncing: process_presence.discard(user_id) - if updates: - await self._update_states(updates) - self.external_process_last_updated_ms[process_id] = self.clock.time_msec() async def update_external_syncs_clear(self, process_id: str) -> None: @@ -1204,17 +1164,19 @@ class PresenceHandler(BasePresenceHandler): self, target_user: UserID, state: JsonDict, - ignore_status_msg: bool = False, force_notify: bool = False, + is_sync: bool = False, ) -> None: """Set the presence state of the user. Args: target_user: The ID of the user to set the presence state of. state: The presence state as a JSON dictionary. - ignore_status_msg: True to ignore the "status_msg" field of the `state` dict. - If False, the user's current status will be updated. force_notify: Whether to force notification of the update to clients. + is_sync: True if this update was from a sync, which results in + *not* overriding a previously set BUSY status, updating the + user's last_user_sync_ts, and ignoring the "status_msg" field of + the `state` dict. """ status_msg = state.get("status_msg", None) presence = state["presence"] @@ -1227,18 +1189,27 @@ class PresenceHandler(BasePresenceHandler): return user_id = target_user.to_string() + now = self.clock.time_msec() prev_state = await self.current_state_for_user(user_id) + # Syncs do not override a previous presence of busy. + # + # TODO: This is a hack for lack of multi-device support. Unfortunately + # removing this requires coordination with clients. + if prev_state.state == PresenceState.BUSY and is_sync: + presence = PresenceState.BUSY + new_fields = {"state": presence} - if not ignore_status_msg: - new_fields["status_msg"] = status_msg + if presence == PresenceState.ONLINE or presence == PresenceState.BUSY: + new_fields["last_active_ts"] = now - if presence == PresenceState.ONLINE or ( - presence == PresenceState.BUSY and self._busy_presence_enabled - ): - new_fields["last_active_ts"] = self.clock.time_msec() + if is_sync: + new_fields["last_user_sync_ts"] = now + else: + # Syncs do not override the status message. + new_fields["status_msg"] = status_msg await self._update_states( [prev_state.copy_and_replace(**new_fields)], force_notify=force_notify -- cgit 1.5.1 From 40901af5e096cb10ab69141875b071b4ea4ed1e0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 28 Aug 2023 13:08:49 -0400 Subject: Pass the device ID around in the presence handler (#16171) Refactoring to pass the device ID (in addition to the user ID) through the presence handler (specifically the `user_syncing`, `set_state`, and `bump_presence_active_time` methods and their replication versions). --- changelog.d/16171.misc | 1 + synapse/handlers/events.py | 1 + synapse/handlers/message.py | 9 ++++--- synapse/handlers/presence.py | 46 +++++++++++++++++++++++++++++------- synapse/replication/http/presence.py | 11 +++++---- synapse/rest/client/presence.py | 2 +- synapse/rest/client/read_marker.py | 4 +++- synapse/rest/client/receipts.py | 4 +++- synapse/rest/client/room.py | 4 +++- synapse/rest/client/sync.py | 1 + tests/handlers/test_presence.py | 38 ++++++++++++++++++++--------- 11 files changed, 91 insertions(+), 30 deletions(-) create mode 100644 changelog.d/16171.misc (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/16171.misc b/changelog.d/16171.misc new file mode 100644 index 0000000000..4d709cb56e --- /dev/null +++ b/changelog.d/16171.misc @@ -0,0 +1 @@ +Track per-device information in the presence code. diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 33359f6ed7..d12803bf0f 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -67,6 +67,7 @@ class EventStreamHandler: context = await presence_handler.user_syncing( requester.user.to_string(), + requester.device_id, affect_presence=affect_presence, presence_state=PresenceState.ONLINE, ) diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 3184bfb047..4a15c76a7b 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -1921,7 +1921,10 @@ class EventCreationHandler: # We don't want to block sending messages on any presence code. This # matters as sometimes presence code can take a while. run_as_background_process( - "bump_presence_active_time", self._bump_active_time, requester.user + "bump_presence_active_time", + self._bump_active_time, + requester.user, + requester.device_id, ) async def _notify() -> None: @@ -1958,10 +1961,10 @@ class EventCreationHandler: logger.info("maybe_kick_guest_users %r", current_state) await self.hs.get_room_member_handler().kick_guest_users(current_state) - async def _bump_active_time(self, user: UserID) -> None: + async def _bump_active_time(self, user: UserID, device_id: Optional[str]) -> None: try: presence = self.hs.get_presence_handler() - await presence.bump_presence_active_time(user) + await presence.bump_presence_active_time(user, device_id) except Exception: logger.exception("Error bumping presence active time") diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c395dcdb43..50c68c86ce 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -165,7 +165,11 @@ class BasePresenceHandler(abc.ABC): @abc.abstractmethod async def user_syncing( - self, user_id: str, affect_presence: bool, presence_state: str + self, + user_id: str, + device_id: Optional[str], + affect_presence: bool, + presence_state: str, ) -> ContextManager[None]: """Returns a context manager that should surround any stream requests from the user. @@ -176,6 +180,7 @@ class BasePresenceHandler(abc.ABC): Args: user_id: the user that is starting a sync + device_id: the user's device that is starting a sync affect_presence: If false this function will be a no-op. Useful for streams that are not associated with an actual client that is being used by a user. @@ -252,6 +257,7 @@ class BasePresenceHandler(abc.ABC): async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, @@ -260,6 +266,7 @@ class BasePresenceHandler(abc.ABC): Args: target_user: The ID of the user to set the presence state of. + device_id: the device that the user is setting the presence state of. state: The presence state as a JSON dictionary. force_notify: Whether to force notification of the update to clients. is_sync: True if this update was from a sync, which results in @@ -269,7 +276,9 @@ class BasePresenceHandler(abc.ABC): """ @abc.abstractmethod - async def bump_presence_active_time(self, user: UserID) -> None: + async def bump_presence_active_time( + self, user: UserID, device_id: Optional[str] + ) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -381,7 +390,9 @@ class BasePresenceHandler(abc.ABC): # We set force_notify=True here so that this presence update is guaranteed to # increment the presence stream ID (which resending the current user's presence # otherwise would not do). - await self.set_state(UserID.from_string(user_id), state, force_notify=True) + await self.set_state( + UserID.from_string(user_id), None, state, force_notify=True + ) async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool: raise NotImplementedError( @@ -481,7 +492,11 @@ class WorkerPresenceHandler(BasePresenceHandler): self.send_user_sync(user_id, False, last_sync_ms) async def user_syncing( - self, user_id: str, affect_presence: bool, presence_state: str + self, + user_id: str, + device_id: Optional[str], + affect_presence: bool, + presence_state: str, ) -> ContextManager[None]: """Record that a user is syncing. @@ -495,6 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler): # what the spec wants. await self.set_state( UserID.from_string(user_id), + device_id, state={"presence": presence_state}, is_sync=True, ) @@ -592,6 +608,7 @@ class WorkerPresenceHandler(BasePresenceHandler): async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, @@ -600,6 +617,7 @@ class WorkerPresenceHandler(BasePresenceHandler): Args: target_user: The ID of the user to set the presence state of. + device_id: the device that the user is setting the presence state of. state: The presence state as a JSON dictionary. force_notify: Whether to force notification of the update to clients. is_sync: True if this update was from a sync, which results in @@ -622,12 +640,15 @@ class WorkerPresenceHandler(BasePresenceHandler): await self._set_state_client( instance_name=self._presence_writer_instance, user_id=user_id, + device_id=device_id, state=state, force_notify=force_notify, is_sync=is_sync, ) - async def bump_presence_active_time(self, user: UserID) -> None: + async def bump_presence_active_time( + self, user: UserID, device_id: Optional[str] + ) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -638,7 +659,9 @@ class WorkerPresenceHandler(BasePresenceHandler): # Proxy request to instance that writes presence user_id = user.to_string() await self._bump_active_client( - instance_name=self._presence_writer_instance, user_id=user_id + instance_name=self._presence_writer_instance, + user_id=user_id, + device_id=device_id, ) @@ -943,7 +966,9 @@ class PresenceHandler(BasePresenceHandler): return await self._update_states(changes) - async def bump_presence_active_time(self, user: UserID) -> None: + async def bump_presence_active_time( + self, user: UserID, device_id: Optional[str] + ) -> None: """We've seen the user do something that indicates they're interacting with the app. """ @@ -966,6 +991,7 @@ class PresenceHandler(BasePresenceHandler): async def user_syncing( self, user_id: str, + device_id: Optional[str], affect_presence: bool = True, presence_state: str = PresenceState.ONLINE, ) -> ContextManager[None]: @@ -977,7 +1003,8 @@ class PresenceHandler(BasePresenceHandler): when users disconnect/reconnect. Args: - user_id + user_id: the user that is starting a sync + device_id: the user's device that is starting a sync affect_presence: If false this function will be a no-op. Useful for streams that are not associated with an actual client that is being used by a user. @@ -993,6 +1020,7 @@ class PresenceHandler(BasePresenceHandler): # what the spec wants. await self.set_state( UserID.from_string(user_id), + device_id, state={"presence": presence_state}, is_sync=True, ) @@ -1163,6 +1191,7 @@ class PresenceHandler(BasePresenceHandler): async def set_state( self, target_user: UserID, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, @@ -1171,6 +1200,7 @@ class PresenceHandler(BasePresenceHandler): Args: target_user: The ID of the user to set the presence state of. + device_id: the device that the user is setting the presence state of. state: The presence state as a JSON dictionary. force_notify: Whether to force notification of the update to clients. is_sync: True if this update was from a sync, which results in diff --git a/synapse/replication/http/presence.py b/synapse/replication/http/presence.py index a24fb9310b..6c9e79fb07 100644 --- a/synapse/replication/http/presence.py +++ b/synapse/replication/http/presence.py @@ -13,7 +13,7 @@ # limitations under the License. import logging -from typing import TYPE_CHECKING, Tuple +from typing import TYPE_CHECKING, Optional, Tuple from twisted.web.server import Request @@ -51,14 +51,14 @@ class ReplicationBumpPresenceActiveTime(ReplicationEndpoint): self._presence_handler = hs.get_presence_handler() @staticmethod - async def _serialize_payload(user_id: str) -> JsonDict: # type: ignore[override] - return {} + async def _serialize_payload(user_id: str, device_id: Optional[str]) -> JsonDict: # type: ignore[override] + return {"device_id": device_id} async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict, user_id: str ) -> Tuple[int, JsonDict]: await self._presence_handler.bump_presence_active_time( - UserID.from_string(user_id) + UserID.from_string(user_id), content.get("device_id") ) return (200, {}) @@ -95,11 +95,13 @@ class ReplicationPresenceSetState(ReplicationEndpoint): @staticmethod async def _serialize_payload( # type: ignore[override] user_id: str, + device_id: Optional[str], state: JsonDict, force_notify: bool = False, is_sync: bool = False, ) -> JsonDict: return { + "device_id": device_id, "state": state, "force_notify": force_notify, "is_sync": is_sync, @@ -110,6 +112,7 @@ class ReplicationPresenceSetState(ReplicationEndpoint): ) -> Tuple[int, JsonDict]: await self._presence_handler.set_state( UserID.from_string(user_id), + content.get("device_id"), content["state"], content["force_notify"], content.get("is_sync", False), diff --git a/synapse/rest/client/presence.py b/synapse/rest/client/presence.py index 8e193330f8..d578faa969 100644 --- a/synapse/rest/client/presence.py +++ b/synapse/rest/client/presence.py @@ -97,7 +97,7 @@ class PresenceStatusRestServlet(RestServlet): raise SynapseError(400, "Unable to parse state") if self._use_presence: - await self.presence_handler.set_state(user, state) + await self.presence_handler.set_state(user, requester.device_id, state) return 200, {} diff --git a/synapse/rest/client/read_marker.py b/synapse/rest/client/read_marker.py index 4f96e51eeb..1707e51972 100644 --- a/synapse/rest/client/read_marker.py +++ b/synapse/rest/client/read_marker.py @@ -52,7 +52,9 @@ class ReadMarkerRestServlet(RestServlet): ) -> Tuple[int, JsonDict]: requester = await self.auth.get_user_by_req(request) - await self.presence_handler.bump_presence_active_time(requester.user) + await self.presence_handler.bump_presence_active_time( + requester.user, requester.device_id + ) body = parse_json_object_from_request(request) diff --git a/synapse/rest/client/receipts.py b/synapse/rest/client/receipts.py index 316e7b9982..869a374459 100644 --- a/synapse/rest/client/receipts.py +++ b/synapse/rest/client/receipts.py @@ -94,7 +94,9 @@ class ReceiptRestServlet(RestServlet): Codes.INVALID_PARAM, ) - await self.presence_handler.bump_presence_active_time(requester.user) + await self.presence_handler.bump_presence_active_time( + requester.user, requester.device_id + ) if receipt_type == ReceiptTypes.FULLY_READ: await self.read_marker_handler.received_client_read_marker( diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py index dc498001e4..553938ce9d 100644 --- a/synapse/rest/client/room.py +++ b/synapse/rest/client/room.py @@ -1229,7 +1229,9 @@ class RoomTypingRestServlet(RestServlet): content = parse_json_object_from_request(request) - await self.presence_handler.bump_presence_active_time(requester.user) + await self.presence_handler.bump_presence_active_time( + requester.user, requester.device_id + ) # Limit timeout to stop people from setting silly typing timeouts. timeout = min(content.get("timeout", 30000), 120000) diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py index d7854ed4fd..42bdd3bb10 100644 --- a/synapse/rest/client/sync.py +++ b/synapse/rest/client/sync.py @@ -205,6 +205,7 @@ class SyncRestServlet(RestServlet): context = await self.presence_handler.user_syncing( user.to_string(), + requester.device_id, affect_presence=affect_presence, presence_state=set_presence, ) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index a3fdcf7f93..a987267308 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -524,6 +524,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase): def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.user_id = f"@test:{self.hs.config.server.server_name}" + self.device_id = "dev-1" # Move the reactor to the initial time. self.reactor.advance(1000) @@ -608,7 +609,10 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase): self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000 / 2) self.get_success( presence_handler.user_syncing( - self.user_id, sync_state != PresenceState.OFFLINE, sync_state + self.user_id, + self.device_id, + sync_state != PresenceState.OFFLINE, + sync_state, ) ) @@ -632,6 +636,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase): class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): user_id = "@test:server" user_id_obj = UserID.from_string(user_id) + device_id = "dev-1" def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: self.presence_handler = hs.get_presence_handler() @@ -652,7 +657,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): self.get_success( worker_presence_handler.user_syncing( - self.user_id, True, PresenceState.ONLINE + self.user_id, self.device_id, True, PresenceState.ONLINE ), by=0.1, ) @@ -708,7 +713,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # Mark user as offline self.get_success( self.presence_handler.set_state( - self.user_id_obj, {"presence": PresenceState.OFFLINE} + self.user_id_obj, self.device_id, {"presence": PresenceState.OFFLINE} ) ) @@ -740,7 +745,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # Mark user as online again self.get_success( self.presence_handler.set_state( - self.user_id_obj, {"presence": PresenceState.ONLINE} + self.user_id_obj, self.device_id, {"presence": PresenceState.ONLINE} ) ) @@ -769,7 +774,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): self.get_success( self.presence_handler.user_syncing( - self.user_id, False, PresenceState.ONLINE + self.user_id, self.device_id, False, PresenceState.ONLINE ) ) @@ -786,7 +791,9 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg) self.get_success( - self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing( + self.user_id, self.device_id, True, PresenceState.ONLINE + ) ) state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) @@ -800,7 +807,9 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): self._set_presencestate_with_status_msg(PresenceState.UNAVAILABLE, status_msg) self.get_success( - self.presence_handler.user_syncing(self.user_id, True, PresenceState.ONLINE) + self.presence_handler.user_syncing( + self.user_id, self.device_id, True, PresenceState.ONLINE + ) ) state = self.get_success(self.presence_handler.get_state(self.user_id_obj)) @@ -838,7 +847,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # /presence/*. self.get_success( worker_to_sync_against.get_presence_handler().user_syncing( - self.user_id, True, PresenceState.ONLINE + self.user_id, self.device_id, True, PresenceState.ONLINE ), by=0.1, ) @@ -875,6 +884,7 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): self.get_success( self.presence_handler.set_state( self.user_id_obj, + self.device_id, {"presence": state, "status_msg": status_msg}, ) ) @@ -1116,7 +1126,9 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): # Mark test2 as online, test will be offline with a last_active of 0 self.get_success( self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test2:server"), + "dev-1", + {"presence": PresenceState.ONLINE}, ) ) self.reactor.pump([0]) # Wait for presence updates to be handled @@ -1163,7 +1175,9 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): # Mark test as online self.get_success( self.presence_handler.set_state( - UserID.from_string("@test:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test:server"), + "dev-1", + {"presence": PresenceState.ONLINE}, ) ) @@ -1171,7 +1185,9 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): # Note we don't join them to the room yet self.get_success( self.presence_handler.set_state( - UserID.from_string("@test2:server"), {"presence": PresenceState.ONLINE} + UserID.from_string("@test2:server"), + "dev-1", + {"presence": PresenceState.ONLINE}, ) ) -- cgit 1.5.1 From e9235d92f2a3cde489a4d24303e7868a93f3fb4d Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 29 Aug 2023 11:44:07 -0400 Subject: Track currently syncing users by device for presence (#16172) Refactoring to use both the user ID & the device ID when tracking the currently syncing users in the presence handler. This is done both locally and over replication. Note that the device ID is discarded but will be used in a future change. --- changelog.d/16172.misc | 1 + synapse/handlers/presence.py | 155 +++++++++++++++++++++++------------- synapse/replication/tcp/commands.py | 17 +++- synapse/replication/tcp/handler.py | 19 +++-- 4 files changed, 129 insertions(+), 63 deletions(-) create mode 100644 changelog.d/16172.misc (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/16172.misc b/changelog.d/16172.misc new file mode 100644 index 0000000000..4d709cb56e --- /dev/null +++ b/changelog.d/16172.misc @@ -0,0 +1 @@ +Track per-device information in the presence code. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 50c68c86ce..2f841863ae 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -23,6 +23,7 @@ The methods that define policy are: """ import abc import contextlib +import itertools import logging from bisect import bisect from contextlib import contextmanager @@ -188,15 +189,17 @@ class BasePresenceHandler(abc.ABC): """ @abc.abstractmethod - def get_currently_syncing_users_for_replication(self) -> Iterable[str]: - """Get an iterable of syncing users on this worker, to send to the presence handler + def get_currently_syncing_users_for_replication( + self, + ) -> Iterable[Tuple[str, Optional[str]]]: + """Get an iterable of syncing users and devices on this worker, to send to the presence handler This is called when a replication connection is established. It should return - a list of user ids, which are then sent as USER_SYNC commands to inform the - process handling presence about those users. + a list of tuples of user ID & device ID, which are then sent as USER_SYNC commands + to inform the process handling presence about those users/devices. Returns: - An iterable of user_id strings. + An iterable of tuples of user ID and device ID. """ async def get_state(self, target_user: UserID) -> UserPresenceState: @@ -284,7 +287,12 @@ class BasePresenceHandler(abc.ABC): """ async def update_external_syncs_row( # noqa: B027 (no-op by design) - self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + self, + process_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + sync_time_msec: int, ) -> None: """Update the syncing users for an external process as a delta. @@ -295,6 +303,7 @@ class BasePresenceHandler(abc.ABC): syncing against. This allows synapse to process updates as user start and stop syncing against a given process. user_id: The user who has started or stopped syncing + device_id: The user's device that has started or stopped syncing is_syncing: Whether or not the user is now syncing sync_time_msec: Time in ms when the user was last syncing """ @@ -425,16 +434,18 @@ class WorkerPresenceHandler(BasePresenceHandler): hs.config.worker.writers.presence, ) - # The number of ongoing syncs on this process, by user id. + # The number of ongoing syncs on this process, by (user ID, device ID). # Empty if _presence_enabled is false. - self._user_to_num_current_syncs: Dict[str, int] = {} + self._user_device_to_num_current_syncs: Dict[ + Tuple[str, Optional[str]], int + ] = {} self.notifier = hs.get_notifier() self.instance_id = hs.get_instance_id() - # user_id -> last_sync_ms. Lists the users that have stopped syncing but - # we haven't notified the presence writer of that yet - self.users_going_offline: Dict[str, int] = {} + # (user_id, device_id) -> last_sync_ms. Lists the devices that have stopped + # syncing but we haven't notified the presence writer of that yet + self._user_devices_going_offline: Dict[Tuple[str, Optional[str]], int] = {} self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs) self._set_state_client = ReplicationPresenceSetState.make_client(hs) @@ -457,39 +468,47 @@ class WorkerPresenceHandler(BasePresenceHandler): ClearUserSyncsCommand(self.instance_id) ) - def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None: + def send_user_sync( + self, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + last_sync_ms: int, + ) -> None: if self._presence_enabled: self.hs.get_replication_command_handler().send_user_sync( - self.instance_id, user_id, is_syncing, last_sync_ms + self.instance_id, user_id, device_id, is_syncing, last_sync_ms ) - def mark_as_coming_online(self, user_id: str) -> None: + def mark_as_coming_online(self, user_id: str, device_id: Optional[str]) -> None: """A user has started syncing. Send a UserSync to the presence writer, unless they had recently stopped syncing. """ - going_offline = self.users_going_offline.pop(user_id, None) + going_offline = self._user_devices_going_offline.pop((user_id, device_id), None) if not going_offline: # Safe to skip because we haven't yet told the presence writer they # were offline - self.send_user_sync(user_id, True, self.clock.time_msec()) + self.send_user_sync(user_id, device_id, True, self.clock.time_msec()) - def mark_as_going_offline(self, user_id: str) -> None: + def mark_as_going_offline(self, user_id: str, device_id: Optional[str]) -> None: """A user has stopped syncing. We wait before notifying the presence writer as its likely they'll come back soon. This allows us to avoid sending a stopped syncing immediately followed by a started syncing notification to the presence writer """ - self.users_going_offline[user_id] = self.clock.time_msec() + self._user_devices_going_offline[(user_id, device_id)] = self.clock.time_msec() def send_stop_syncing(self) -> None: """Check if there are any users who have stopped syncing a while ago and haven't come back yet. If there are poke the presence writer about them. """ now = self.clock.time_msec() - for user_id, last_sync_ms in list(self.users_going_offline.items()): + for (user_id, device_id), last_sync_ms in list( + self._user_devices_going_offline.items() + ): if now - last_sync_ms > UPDATE_SYNCING_USERS_MS: - self.users_going_offline.pop(user_id, None) - self.send_user_sync(user_id, False, last_sync_ms) + self._user_devices_going_offline.pop((user_id, device_id), None) + self.send_user_sync(user_id, device_id, False, last_sync_ms) async def user_syncing( self, @@ -515,23 +534,23 @@ class WorkerPresenceHandler(BasePresenceHandler): is_sync=True, ) - curr_sync = self._user_to_num_current_syncs.get(user_id, 0) - self._user_to_num_current_syncs[user_id] = curr_sync + 1 + curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0) + self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1 # If this is the first in-flight sync, notify replication - if self._user_to_num_current_syncs[user_id] == 1: - self.mark_as_coming_online(user_id) + if self._user_device_to_num_current_syncs[(user_id, device_id)] == 1: + self.mark_as_coming_online(user_id, device_id) def _end() -> None: # We check that the user_id is in user_to_num_current_syncs because # user_to_num_current_syncs may have been cleared if we are # shutting down. - if user_id in self._user_to_num_current_syncs: - self._user_to_num_current_syncs[user_id] -= 1 + if (user_id, device_id) in self._user_device_to_num_current_syncs: + self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1 # If there are no more in-flight syncs, notify replication - if self._user_to_num_current_syncs[user_id] == 0: - self.mark_as_going_offline(user_id) + if self._user_device_to_num_current_syncs[(user_id, device_id)] == 0: + self.mark_as_going_offline(user_id, device_id) @contextlib.contextmanager def _user_syncing() -> Generator[None, None, None]: @@ -598,10 +617,12 @@ class WorkerPresenceHandler(BasePresenceHandler): # If this is a federation sender, notify about presence updates. await self.maybe_send_presence_to_interested_destinations(state_to_notify) - def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + def get_currently_syncing_users_for_replication( + self, + ) -> Iterable[Tuple[str, Optional[str]]]: return [ - user_id - for user_id, count in self._user_to_num_current_syncs.items() + user_id_device_id + for user_id_device_id, count in self._user_device_to_num_current_syncs.items() if count > 0 ] @@ -723,17 +744,23 @@ class PresenceHandler(BasePresenceHandler): # Keeps track of the number of *ongoing* syncs on this process. While # this is non zero a user will never go offline. - self.user_to_num_current_syncs: Dict[str, int] = {} + self._user_device_to_num_current_syncs: Dict[ + Tuple[str, Optional[str]], int + ] = {} # Keeps track of the number of *ongoing* syncs on other processes. + # # While any sync is ongoing on another process the user will never # go offline. + # # Each process has a unique identifier and an update frequency. If # no update is received from that process within the update period then # we assume that all the sync requests on that process have stopped. - # Stored as a dict from process_id to set of user_id, and a dict of - # process_id to millisecond timestamp last updated. - self.external_process_to_current_syncs: Dict[str, Set[str]] = {} + # Stored as a dict from process_id to set of (user_id, device_id), and + # a dict of process_id to millisecond timestamp last updated. + self.external_process_to_current_syncs: Dict[ + str, Set[Tuple[str, Optional[str]]] + ] = {} self.external_process_last_updated_ms: Dict[str, int] = {} self.external_sync_linearizer = Linearizer(name="external_sync_linearizer") @@ -938,7 +965,10 @@ class PresenceHandler(BasePresenceHandler): # that were syncing on that process to see if they need to be timed # out. users_to_check.update( - self.external_process_to_current_syncs.pop(process_id, ()) + user_id + for user_id, device_id in self.external_process_to_current_syncs.pop( + process_id, () + ) ) self.external_process_last_updated_ms.pop(process_id) @@ -951,11 +981,15 @@ class PresenceHandler(BasePresenceHandler): syncing_user_ids = { user_id - for user_id, count in self.user_to_num_current_syncs.items() + for (user_id, _), count in self._user_device_to_num_current_syncs.items() if count } - for user_ids in self.external_process_to_current_syncs.values(): - syncing_user_ids.update(user_ids) + syncing_user_ids.update( + user_id + for user_id, _ in itertools.chain( + *self.external_process_to_current_syncs.values() + ) + ) changes = handle_timeouts( states, @@ -1013,8 +1047,8 @@ class PresenceHandler(BasePresenceHandler): if not affect_presence or not self._presence_enabled: return _NullContextManager() - curr_sync = self.user_to_num_current_syncs.get(user_id, 0) - self.user_to_num_current_syncs[user_id] = curr_sync + 1 + curr_sync = self._user_device_to_num_current_syncs.get((user_id, device_id), 0) + self._user_device_to_num_current_syncs[(user_id, device_id)] = curr_sync + 1 # Note that this causes last_active_ts to be incremented which is not # what the spec wants. @@ -1027,7 +1061,7 @@ class PresenceHandler(BasePresenceHandler): async def _end() -> None: try: - self.user_to_num_current_syncs[user_id] -= 1 + self._user_device_to_num_current_syncs[(user_id, device_id)] -= 1 prev_state = await self.current_state_for_user(user_id) await self._update_states( @@ -1049,12 +1083,19 @@ class PresenceHandler(BasePresenceHandler): return _user_syncing() - def get_currently_syncing_users_for_replication(self) -> Iterable[str]: + def get_currently_syncing_users_for_replication( + self, + ) -> Iterable[Tuple[str, Optional[str]]]: # since we are the process handling presence, there is nothing to do here. return [] async def update_external_syncs_row( - self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int + self, + process_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + sync_time_msec: int, ) -> None: """Update the syncing users for an external process as a delta. @@ -1063,6 +1104,7 @@ class PresenceHandler(BasePresenceHandler): syncing against. This allows synapse to process updates as user start and stop syncing against a given process. user_id: The user who has started or stopped syncing + device_id: The user's device that has started or stopped syncing is_syncing: Whether or not the user is now syncing sync_time_msec: Time in ms when the user was last syncing """ @@ -1073,26 +1115,27 @@ class PresenceHandler(BasePresenceHandler): process_id, set() ) - # USER_SYNC is sent when a user starts or stops syncing on a remote - # process. (But only for the initial and last device.) + # USER_SYNC is sent when a user's device starts or stops syncing on + # a remote # process. (But only for the initial and last sync for that + # device.) # - # When a user *starts* syncing it also calls set_state(...) which + # When a device *starts* syncing it also calls set_state(...) which # will update the state, last_active_ts, and last_user_sync_ts. - # Simply ensure the user is tracked as syncing in this case. + # Simply ensure the user & device is tracked as syncing in this case. # - # When a user *stops* syncing, update the last_user_sync_ts and mark + # When a device *stops* syncing, update the last_user_sync_ts and mark # them as no longer syncing. Note this doesn't quite match the # monolith behaviour, which updates last_user_sync_ts at the end of # every sync, not just the last in-flight sync. - if is_syncing and user_id not in process_presence: - process_presence.add(user_id) - elif not is_syncing and user_id in process_presence: + if is_syncing and (user_id, device_id) not in process_presence: + process_presence.add((user_id, device_id)) + elif not is_syncing and (user_id, device_id) in process_presence: new_state = prev_state.copy_and_replace( last_user_sync_ts=sync_time_msec ) await self._update_states([new_state]) - process_presence.discard(user_id) + process_presence.discard((user_id, device_id)) self.external_process_last_updated_ms[process_id] = self.clock.time_msec() @@ -1106,7 +1149,9 @@ class PresenceHandler(BasePresenceHandler): process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) - prev_states = await self.current_state_for_users(process_presence) + prev_states = await self.current_state_for_users( + {user_id for user_id, device_id in process_presence} + ) time_now_ms = self.clock.time_msec() await self._update_states( diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 58a871c6d9..e616b5e1c8 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -267,27 +267,38 @@ class UserSyncCommand(Command): NAME = "USER_SYNC" def __init__( - self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int + self, + instance_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + last_sync_ms: int, ): self.instance_id = instance_id self.user_id = user_id + self.device_id = device_id self.is_syncing = is_syncing self.last_sync_ms = last_sync_ms @classmethod def from_line(cls: Type["UserSyncCommand"], line: str) -> "UserSyncCommand": - instance_id, user_id, state, last_sync_ms = line.split(" ", 3) + device_id: Optional[str] + instance_id, user_id, device_id, state, last_sync_ms = line.split(" ", 4) + + if device_id == "None": + device_id = None if state not in ("start", "end"): raise Exception("Invalid USER_SYNC state %r" % (state,)) - return cls(instance_id, user_id, state == "start", int(last_sync_ms)) + return cls(instance_id, user_id, device_id, state == "start", int(last_sync_ms)) def to_line(self) -> str: return " ".join( ( self.instance_id, self.user_id, + str(self.device_id), "start" if self.is_syncing else "end", str(self.last_sync_ms), ) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 92c5a55acc..d9045d7b73 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -428,7 +428,11 @@ class ReplicationCommandHandler: if self._is_presence_writer: return self._presence_handler.update_external_syncs_row( - cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms + cmd.instance_id, + cmd.user_id, + cmd.device_id, + cmd.is_syncing, + cmd.last_sync_ms, ) else: return None @@ -699,9 +703,9 @@ class ReplicationCommandHandler: ) now = self._clock.time_msec() - for user_id in currently_syncing: + for user_id, device_id in currently_syncing: connection.send_command( - UserSyncCommand(self._instance_id, user_id, True, now) + UserSyncCommand(self._instance_id, user_id, device_id, True, now) ) def lost_connection(self, connection: IReplicationConnection) -> None: @@ -753,11 +757,16 @@ class ReplicationCommandHandler: self.send_command(FederationAckCommand(self._instance_name, token)) def send_user_sync( - self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int + self, + instance_id: str, + user_id: str, + device_id: Optional[str], + is_syncing: bool, + last_sync_ms: int, ) -> None: """Poke the master that a user has started/stopped syncing.""" self.send_command( - UserSyncCommand(instance_id, user_id, is_syncing, last_sync_ms) + UserSyncCommand(instance_id, user_id, device_id, is_syncing, last_sync_ms) ) def send_user_ip( -- cgit 1.5.1 From d35bed8369514fe727b4fe1afb68f48cc8b2655a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 4 Sep 2023 17:14:09 +0100 Subject: Don't wake up destination transaction queue if they're not due for retry. (#16223) --- changelog.d/16223.feature | 1 + synapse/federation/send_queue.py | 12 +-- synapse/federation/sender/__init__.py | 86 +++++++++++++++------- synapse/federation/sender/per_destination_queue.py | 6 +- synapse/handlers/device.py | 26 +++---- synapse/handlers/devicemessage.py | 7 +- synapse/handlers/presence.py | 16 ++-- synapse/handlers/typing.py | 14 +++- synapse/module_api/__init__.py | 2 +- synapse/replication/tcp/client.py | 8 +- synapse/storage/databases/main/transactions.py | 26 ++++++- synapse/util/retryutils.py | 25 +++++++ tests/federation/test_federation_sender.py | 27 ++++--- tests/handlers/test_presence.py | 60 ++++++++++++--- tests/handlers/test_typing.py | 2 - 15 files changed, 228 insertions(+), 90 deletions(-) create mode 100644 changelog.d/16223.feature (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/16223.feature b/changelog.d/16223.feature new file mode 100644 index 0000000000..a52d66658b --- /dev/null +++ b/changelog.d/16223.feature @@ -0,0 +1 @@ +Improve resource usage when sending data to a large number of remote hosts that are marked as "down". diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index fb448f2155..6520795635 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -49,7 +49,7 @@ from synapse.api.presence import UserPresenceState from synapse.federation.sender import AbstractFederationSender, FederationSender from synapse.metrics import LaterGauge from synapse.replication.tcp.streams.federation import FederationStream -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.util.metrics import Measure from .units import Edu @@ -229,7 +229,7 @@ class FederationRemoteSendQueue(AbstractFederationSender): """ # nothing to do here: the replication listener will handle it. - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """As per FederationSender @@ -245,7 +245,9 @@ class FederationRemoteSendQueue(AbstractFederationSender): self.notifier.on_new_replication_data() - def send_device_messages(self, destination: str, immediate: bool = True) -> None: + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: """As per FederationSender""" # We don't need to replicate this as it gets sent down a different # stream. @@ -463,7 +465,7 @@ class ParsedFederationStreamData: edus: Dict[str, List[Edu]] -def process_rows_for_federation( +async def process_rows_for_federation( transaction_queue: FederationSender, rows: List[FederationStream.FederationStreamRow], ) -> None: @@ -496,7 +498,7 @@ def process_rows_for_federation( parsed_row.add_to_buffer(buff) for state, destinations in buff.presence_destinations: - transaction_queue.send_presence_to_destinations( + await transaction_queue.send_presence_to_destinations( states=[state], destinations=destinations ) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 97abbdee18..fb20fd8a10 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -147,7 +147,10 @@ from twisted.internet import defer import synapse.metrics from synapse.api.presence import UserPresenceState from synapse.events import EventBase -from synapse.federation.sender.per_destination_queue import PerDestinationQueue +from synapse.federation.sender.per_destination_queue import ( + CATCHUP_RETRY_INTERVAL, + PerDestinationQueue, +) from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu from synapse.logging.context import make_deferred_yieldable, run_in_background @@ -161,9 +164,10 @@ from synapse.metrics.background_process_metrics import ( run_as_background_process, wrap_as_background_process, ) -from synapse.types import JsonDict, ReadReceipt, RoomStreamToken +from synapse.types import JsonDict, ReadReceipt, RoomStreamToken, StrCollection from synapse.util import Clock from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter if TYPE_CHECKING: from synapse.events.presence_router import PresenceRouter @@ -213,7 +217,7 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. @@ -242,9 +246,11 @@ class AbstractFederationSender(metaclass=abc.ABCMeta): raise NotImplementedError() @abc.abstractmethod - def send_device_messages(self, destination: str, immediate: bool = True) -> None: + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: """Tells the sender that a new device message is ready to be sent to the - destination. The `immediate` flag specifies whether the messages should + destinations. The `immediate` flag specifies whether the messages should be tried to be sent immediately, or whether it can be delayed for a short while (to aid performance). """ @@ -716,6 +722,13 @@ class FederationSender(AbstractFederationSender): pdu.internal_metadata.stream_ordering, ) + destinations = await filter_destinations_by_retry_limiter( + destinations, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: self._get_per_destination_queue(destination).send_pdu(pdu) @@ -763,12 +776,20 @@ class FederationSender(AbstractFederationSender): domains_set = await self._storage_controllers.state.get_current_hosts_in_room_or_partial_state_approximation( room_id ) - domains = [ + domains: StrCollection = [ d for d in domains_set if not self.is_mine_server_name(d) and self._federation_shard_config.should_handle(self._instance_name, d) ] + + domains = await filter_destinations_by_retry_limiter( + domains, + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + if not domains: return @@ -816,7 +837,7 @@ class FederationSender(AbstractFederationSender): for queue in queues: queue.flush_read_receipts_for_room(room_id) - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Iterable[UserPresenceState], destinations: Iterable[str] ) -> None: """Send the given presence states to the given destinations. @@ -831,13 +852,20 @@ class FederationSender(AbstractFederationSender): for state in states: assert self.is_mine_id(state.user_id) + destinations = await filter_destinations_by_retry_limiter( + [ + d + for d in destinations + if self._federation_shard_config.should_handle(self._instance_name, d) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) + for destination in destinations: if self.is_mine_server_name(destination): continue - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - continue self._get_per_destination_queue(destination).send_presence( states, start_loop=False @@ -896,21 +924,29 @@ class FederationSender(AbstractFederationSender): else: queue.send_edu(edu) - def send_device_messages(self, destination: str, immediate: bool = True) -> None: - if self.is_mine_server_name(destination): - logger.warning("Not sending device update to ourselves") - return - - if not self._federation_shard_config.should_handle( - self._instance_name, destination - ): - return + async def send_device_messages( + self, destinations: StrCollection, immediate: bool = True + ) -> None: + destinations = await filter_destinations_by_retry_limiter( + [ + destination + for destination in destinations + if self._federation_shard_config.should_handle( + self._instance_name, destination + ) + and not self.is_mine_server_name(destination) + ], + clock=self.clock, + store=self.store, + retry_due_within_ms=CATCHUP_RETRY_INTERVAL, + ) - if immediate: - self._get_per_destination_queue(destination).attempt_new_transaction() - else: - self._get_per_destination_queue(destination).mark_new_data() - self._destination_wakeup_queue.add_to_queue(destination) + for destination in destinations: + if immediate: + self._get_per_destination_queue(destination).attempt_new_transaction() + else: + self._get_per_destination_queue(destination).mark_new_data() + self._destination_wakeup_queue.add_to_queue(destination) def wake_destination(self, destination: str) -> None: """Called when we want to retry sending transactions to a remote. diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 31c5c2b7de..9105ba664c 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -59,6 +59,10 @@ sent_edus_by_type = Counter( ) +# If the retry interval is larger than this then we enter "catchup" mode +CATCHUP_RETRY_INTERVAL = 60 * 60 * 1000 + + class PerDestinationQueue: """ Manages the per-destination transmission queues. @@ -370,7 +374,7 @@ class PerDestinationQueue: ), ) - if e.retry_interval > 60 * 60 * 1000: + if e.retry_interval > CATCHUP_RETRY_INTERVAL: # we won't retry for another hour! # (this suggests a significant outage) # We drop pending EDUs because otherwise they will diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 5ae427d52c..763f56dfc1 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -836,17 +836,16 @@ class DeviceHandler(DeviceWorkerHandler): user_id, hosts, ) - for host in hosts: - self.federation_sender.send_device_messages( - host, immediate=False - ) - # TODO: when called, this isn't in a logging context. - # This leads to log spam, sentry event spam, and massive - # memory usage. - # See https://github.com/matrix-org/synapse/issues/12552. - # log_kv( - # {"message": "sent device update to host", "host": host} - # ) + await self.federation_sender.send_device_messages( + hosts, immediate=False + ) + # TODO: when called, this isn't in a logging context. + # This leads to log spam, sentry event spam, and massive + # memory usage. + # See https://github.com/matrix-org/synapse/issues/12552. + # log_kv( + # {"message": "sent device update to host", "host": host} + # ) if current_stream_id != stream_id: # Clear the set of hosts we've already sent to as we're @@ -951,8 +950,9 @@ class DeviceHandler(DeviceWorkerHandler): # Notify things that device lists need to be sent out. self.notifier.notify_replication() - for host in potentially_changed_hosts: - self.federation_sender.send_device_messages(host, immediate=False) + await self.federation_sender.send_device_messages( + potentially_changed_hosts, immediate=False + ) def _update_device_from_client_ips( diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py index 798c7039f9..1c79f7a61e 100644 --- a/synapse/handlers/devicemessage.py +++ b/synapse/handlers/devicemessage.py @@ -302,10 +302,9 @@ class DeviceMessageHandler: ) if self.federation_sender: - for destination in remote_messages.keys(): - # Enqueue a new federation transaction to send the new - # device messages to each remote destination. - self.federation_sender.send_device_messages(destination) + # Enqueue a new federation transaction to send the new + # device messages to each remote destination. + await self.federation_sender.send_device_messages(remote_messages.keys()) async def get_events_for_dehydrated_device( self, diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 2f841863ae..f31e18328b 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -354,7 +354,9 @@ class BasePresenceHandler(abc.ABC): ) for destination, host_states in hosts_to_states.items(): - self._federation.send_presence_to_destinations(host_states, [destination]) + await self._federation.send_presence_to_destinations( + host_states, [destination] + ) async def send_full_presence_to_users(self, user_ids: StrCollection) -> None: """ @@ -936,7 +938,7 @@ class PresenceHandler(BasePresenceHandler): ) for destination, states in hosts_to_states.items(): - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( states, [destination] ) @@ -1508,7 +1510,7 @@ class PresenceHandler(BasePresenceHandler): or state.status_msg is not None ] - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=newly_joined_remote_hosts, states=states, ) @@ -1519,7 +1521,7 @@ class PresenceHandler(BasePresenceHandler): prev_remote_hosts or newly_joined_remote_hosts ): local_states = await self.current_state_for_users(newly_joined_local_users) - self._federation_queue.send_presence_to_destinations( + await self._federation_queue.send_presence_to_destinations( destinations=prev_remote_hosts | newly_joined_remote_hosts, states=list(local_states.values()), ) @@ -2182,7 +2184,7 @@ class PresenceFederationQueue: index = bisect(self._queue, (clear_before,)) self._queue = self._queue[index:] - def send_presence_to_destinations( + async def send_presence_to_destinations( self, states: Collection[UserPresenceState], destinations: StrCollection ) -> None: """Send the presence states to the given destinations. @@ -2202,7 +2204,7 @@ class PresenceFederationQueue: return if self._federation: - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states, destinations=destinations, ) @@ -2325,7 +2327,7 @@ class PresenceFederationQueue: for host, user_ids in hosts_to_users.items(): states = await self._presence_handler.current_state_for_users(user_ids) - self._federation.send_presence_to_destinations( + await self._federation.send_presence_to_destinations( states=states.values(), destinations=[host], ) diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index 7aeae5319c..4b4227003d 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -26,9 +26,10 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.replication.tcp.streams import TypingStream from synapse.streams import EventSource -from synapse.types import JsonDict, Requester, StreamKeyType, UserID +from synapse.types import JsonDict, Requester, StrCollection, StreamKeyType, UserID from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.metrics import Measure +from synapse.util.retryutils import filter_destinations_by_retry_limiter from synapse.util.wheel_timer import WheelTimer if TYPE_CHECKING: @@ -150,8 +151,15 @@ class FollowerTypingHandler: now=now, obj=member, then=now + FEDERATION_PING_INTERVAL ) - hosts = await self._storage_controllers.state.get_current_hosts_in_room( - member.room_id + hosts: StrCollection = ( + await self._storage_controllers.state.get_current_hosts_in_room( + member.room_id + ) + ) + hosts = await filter_destinations_by_retry_limiter( + hosts, + clock=self.clock, + store=self.store, ) for domain in hosts: if not self.is_mine_server_name(domain): diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 9ad8e038ae..2f00a7ba20 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -1180,7 +1180,7 @@ class ModuleApi: # Send to remote destinations. destination = UserID.from_string(user).domain - presence_handler.get_federation_queue().send_presence_to_destinations( + await presence_handler.get_federation_queue().send_presence_to_destinations( presence_events, [destination] ) diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py index 3b88dc68ea..51285e6d33 100644 --- a/synapse/replication/tcp/client.py +++ b/synapse/replication/tcp/client.py @@ -422,7 +422,7 @@ class FederationSenderHandler: # The federation stream contains things that we want to send out, e.g. # presence, typing, etc. if stream_name == "federation": - send_queue.process_rows_for_federation(self.federation_sender, rows) + await send_queue.process_rows_for_federation(self.federation_sender, rows) await self.update_token(token) # ... and when new receipts happen @@ -439,16 +439,14 @@ class FederationSenderHandler: for row in rows if not row.entity.startswith("@") and not row.is_signature } - for host in hosts: - self.federation_sender.send_device_messages(host, immediate=False) + await self.federation_sender.send_device_messages(hosts, immediate=False) elif stream_name == ToDeviceStream.NAME: # The to_device stream includes stuff to be pushed to both local # clients and remote servers, so we ignore entities that start with # '@' (since they'll be local users rather than destinations). hosts = {row.entity for row in rows if not row.entity.startswith("@")} - for host in hosts: - self.federation_sender.send_device_messages(host) + await self.federation_sender.send_device_messages(hosts) async def _on_new_receipts( self, rows: Iterable[ReceiptsStream.ReceiptsStreamRow] diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py index 860bbf7c0f..efd21b5bfc 100644 --- a/synapse/storage/databases/main/transactions.py +++ b/synapse/storage/databases/main/transactions.py @@ -14,7 +14,7 @@ import logging from enum import Enum -from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, cast import attr from canonicaljson import encode_canonical_json @@ -28,8 +28,8 @@ from synapse.storage.database import ( LoggingTransaction, ) from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore -from synapse.types import JsonDict -from synapse.util.caches.descriptors import cached +from synapse.types import JsonDict, StrCollection +from synapse.util.caches.descriptors import cached, cachedList if TYPE_CHECKING: from synapse.server import HomeServer @@ -205,6 +205,26 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore): else: return None + @cachedList( + cached_method_name="get_destination_retry_timings", list_name="destinations" + ) + async def get_destination_retry_timings_batch( + self, destinations: StrCollection + ) -> Dict[str, Optional[DestinationRetryTimings]]: + rows = await self.db_pool.simple_select_many_batch( + table="destinations", + iterable=destinations, + column="destination", + retcols=("destination", "failure_ts", "retry_last_ts", "retry_interval"), + desc="get_destination_retry_timings_batch", + ) + + return { + row.pop("destination"): DestinationRetryTimings(**row) + for row in rows + if row["retry_last_ts"] and row["failure_ts"] and row["retry_interval"] + } + async def set_destination_retry_timings( self, destination: str, diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 9d2065372c..0e1f907667 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Any, Optional, Type from synapse.api.errors import CodeMessageException from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import DataStore +from synapse.types import StrCollection from synapse.util import Clock if TYPE_CHECKING: @@ -116,6 +117,30 @@ async def get_retry_limiter( ) +async def filter_destinations_by_retry_limiter( + destinations: StrCollection, + clock: Clock, + store: DataStore, + retry_due_within_ms: int = 0, +) -> StrCollection: + """Filter down the list of destinations to only those that will are either + alive or due for a retry (within `retry_due_within_ms`) + """ + if not destinations: + return destinations + + retry_timings = await store.get_destination_retry_timings_batch(destinations) + + now = int(clock.time_msec()) + + return [ + destination + for destination, timings in retry_timings.items() + if timings is None + or timings.retry_last_ts + timings.retry_interval <= now + retry_due_within_ms + ] + + class RetryDestinationLimiter: def __init__( self, diff --git a/tests/federation/test_federation_sender.py b/tests/federation/test_federation_sender.py index 7bd3d06859..caf04b54cb 100644 --- a/tests/federation/test_federation_sender.py +++ b/tests/federation/test_federation_sender.py @@ -75,7 +75,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): thread_id=None, data={"ts": 1234}, ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + self.get_success(sender.send_read_receipt(receipt)) self.pump() @@ -111,6 +111,9 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): # * The same room / user on multiple threads. # * A different user in the same room. sender = self.hs.get_federation_sender() + # Hack so that we have a txn in-flight so we batch up read receipts + # below + sender.wake_destination("host2") for user, thread in ( ("alice", None), ("alice", "thread"), @@ -125,9 +128,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): thread_id=thread, data={"ts": 1234}, ) - self.successResultOf( - defer.ensureDeferred(sender.send_read_receipt(receipt)) - ) + defer.ensureDeferred(sender.send_read_receipt(receipt)) self.pump() @@ -191,7 +192,7 @@ class FederationSenderReceiptsTestCases(HomeserverTestCase): thread_id=None, data={"ts": 1234}, ) - self.successResultOf(defer.ensureDeferred(sender.send_read_receipt(receipt))) + self.get_success(sender.send_read_receipt(receipt)) self.pump() @@ -342,7 +343,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): self.reactor.advance(1) # a second call should produce no new device EDUs - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) self.assertEqual(self.edus, []) # a second device @@ -550,7 +553,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. @@ -601,7 +606,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. @@ -656,7 +663,9 @@ class FederationSenderDevicesTestCases(HomeserverTestCase): # recover the server mock_send_txn.side_effect = self.record_transaction - self.hs.get_federation_sender().send_device_messages("host2") + self.get_success( + self.hs.get_federation_sender().send_device_messages(["host2"]) + ) # We queue up device list updates to be sent over federation, so we # advance to clear the queue. diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index a987267308..88a16193a3 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -909,8 +909,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) @@ -946,11 +952,17 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) now_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) rows, upto_token, limited = self.get_success( self.queue.get_replication_rows("master", prev_token, now_token, 10) @@ -989,8 +1001,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) self.reactor.advance(10 * 60 * 1000) @@ -1005,8 +1023,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) @@ -1033,11 +1057,17 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) self.reactor.advance(2 * 60 * 1000) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) self.reactor.advance(4 * 60 * 1000) @@ -1053,8 +1083,14 @@ class PresenceFederationQueueTestCase(unittest.HomeserverTestCase): prev_token = self.queue.get_current_token(self.instance_name) - self.queue.send_presence_to_destinations((state1, state2), ("dest1", "dest2")) - self.queue.send_presence_to_destinations((state3,), ("dest3",)) + self.get_success( + self.queue.send_presence_to_destinations( + (state1, state2), ("dest1", "dest2") + ) + ) + self.get_success( + self.queue.send_presence_to_destinations((state3,), ("dest3",)) + ) now_token = self.queue.get_current_token(self.instance_name) diff --git a/tests/handlers/test_typing.py b/tests/handlers/test_typing.py index 43c513b157..95106ec8f3 100644 --- a/tests/handlers/test_typing.py +++ b/tests/handlers/test_typing.py @@ -120,8 +120,6 @@ class TypingNotificationsTestCase(unittest.HomeserverTestCase): self.datastore = hs.get_datastores().main - self.datastore.get_destination_retry_timings = AsyncMock(return_value=None) - self.datastore.get_device_updates_by_remote = AsyncMock( # type: ignore[method-assign] return_value=(0, []) ) -- cgit 1.5.1