From ea75346f6af8c182a42d1ca29119a10361693a7b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 5 Sep 2023 09:58:51 -0400 Subject: Track presence state per-device and combine to a user state. (#16066) Tracks presence on an individual per-device basis and combine the per-device state into a per-user state. This should help in situations where a user has multiple devices with conflicting status (e.g. one is syncing with unavailable and one is syncing with online). The tie-breaking is done by priority: BUSY > ONLINE > UNAVAILABLE > OFFLINE --- synapse/handlers/presence.py | 279 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 232 insertions(+), 47 deletions(-) (limited to 'synapse/handlers/presence.py') diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f31e18328b..80190838b7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -13,13 +13,56 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""This module is responsible for keeping track of presence status of local +""" +This module is responsible for keeping track of presence status of local and remote users. The methods that define policy are: - PresenceHandler._update_states - PresenceHandler._handle_timeouts - should_notify + +# Tracking local presence + +For local users, presence is tracked on a per-device basis. When a user has multiple +devices the user presence state is derived by coalescing the presence from each +device: + + BUSY > ONLINE > UNAVAILABLE > OFFLINE + +The time that each device was last active and last synced is tracked in order to +automatically downgrade a device's presence state: + + A device may move from ONLINE -> UNAVAILABLE, if it has not been active for + a period of time. + + A device may go from any state -> OFFLINE, if it is not active and has not + synced for a period of time. + +The timeouts are handled using a wheel timer, which has coarse buckets. Timings +do not need to be exact. + +Generally a device's presence state is updated whenever a user syncs (via the +set_presence parameter), when the presence API is called, or if "pro-active" +events occur, including: + +* Sending an event, receipt, read marker. +* Updating typing status. + +The busy state has special status that it cannot is not downgraded by a call to +sync with a lower priority state *and* it takes a long period of time to transition +to offline. + +# Persisting (and restoring) presence + +For all users, presence is persisted on a per-user basis. Data is kept in-memory +and persisted periodically. When Synapse starts each worker loads the current +presence state and then tracks the presence stream to keep itself up-to-date. + +When restoring presence for local users a pseudo-device is created to match the +user state; this device follows the normal timeout logic (see above) and will +automatically be replaced with any information from currently available devices. + """ import abc import contextlib @@ -30,6 +73,7 @@ from contextlib import contextmanager from types import TracebackType from typing import ( TYPE_CHECKING, + AbstractSet, Any, Callable, Collection, @@ -49,7 +93,7 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError -from synapse.api.presence import UserPresenceState +from synapse.api.presence import UserDevicePresenceState, UserPresenceState from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background @@ -162,6 +206,7 @@ class BasePresenceHandler(abc.ABC): self.VALID_PRESENCE += (PresenceState.BUSY,) active_presence = self.store.take_presence_startup_info() + # The combined status across all user devices. self.user_to_current_state = {state.user_id: state for state in active_presence} @abc.abstractmethod @@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler): lambda: len(self.user_to_current_state), ) + # The per-device presence state, maps user to devices to per-device presence state. + self._user_to_device_to_current_state: Dict[ + str, Dict[Optional[str], UserDevicePresenceState] + ] = {} + now = self.clock.time_msec() if self._presence_enabled: for state in self.user_to_current_state.values(): + # Create a psuedo-device to properly handle time outs. This will + # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT. + pseudo_device_id = None + self._user_to_device_to_current_state[state.user_id] = { + pseudo_device_id: UserDevicePresenceState( + user_id=state.user_id, + device_id=pseudo_device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) + } + self.wheel_timer.insert( now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER ) @@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler): # Keeps track of the number of *ongoing* syncs on other processes. # - # While any sync is ongoing on another process the user will never + # While any sync is ongoing on another process the user's device will never # go offline. # # Each process has a unique identifier and an update frequency. If @@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler): timers_fired_counter.inc(len(states)) - syncing_user_ids = { - user_id - for (user_id, _), count in self._user_device_to_num_current_syncs.items() + # Set of user ID & device IDs which are currently syncing. + syncing_user_devices = { + user_id_device_id + for user_id_device_id, count in self._user_device_to_num_current_syncs.items() if count } - syncing_user_ids.update( - user_id - for user_id, _ in itertools.chain( - *self.external_process_to_current_syncs.values() - ) + syncing_user_devices.update( + itertools.chain(*self.external_process_to_current_syncs.values()) ) changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - syncing_user_ids=syncing_user_ids, + syncing_user_devices=syncing_user_devices, + user_to_devices=self._user_to_device_to_current_state, now=now, ) @@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler): bump_active_time_counter.inc() - prev_state = await self.current_state_for_user(user_id) + now = self.clock.time_msec() + + # Update the device information & mark the device as online if it was + # unavailable. + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, + UserDevicePresenceState.default(user_id, device_id), + ) + device_state.last_active_ts = now + if device_state.state == PresenceState.UNAVAILABLE: + device_state.state = PresenceState.ONLINE - new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()} - if prev_state.state == PresenceState.UNAVAILABLE: - new_fields["state"] = PresenceState.ONLINE + # Update the user state, this will always update last_active_ts and + # might update the presence state. + prev_state = await self.current_state_for_user(user_id) + new_fields: Dict[str, Any] = { + "last_active_ts": now, + "state": _combine_device_states(devices.values()), + } await self._update_states([prev_state.copy_and_replace(**new_fields)]) @@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler): if is_syncing and (user_id, device_id) not in process_presence: process_presence.add((user_id, device_id)) elif not is_syncing and (user_id, device_id) in process_presence: + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + device_state.last_sync_ts = sync_time_msec + new_state = prev_state.copy_and_replace( last_user_sync_ts=sync_time_msec ) @@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler): process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) - prev_states = await self.current_state_for_users( - {user_id for user_id, device_id in process_presence} - ) + time_now_ms = self.clock.time_msec() + # Mark each device as having a last sync time. + updated_users = set() + for user_id, device_id in process_presence: + device_state = self._user_to_device_to_current_state.setdefault( + user_id, {} + ).setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + + device_state.last_sync_ts = time_now_ms + updated_users.add(user_id) + + # Update each user (and insert into the appropriate timers to check if + # they've gone offline). + prev_states = await self.current_state_for_users(updated_users) await self._update_states( [ prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) @@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler): if prev_state.state == PresenceState.BUSY and is_sync: presence = PresenceState.BUSY + # Update the device specific information. + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, + UserDevicePresenceState.default(user_id, device_id), + ) + device_state.state = presence + device_state.last_active_ts = now + if is_sync: + device_state.last_sync_ts = now + + # Based on the state of each user's device calculate the new presence state. + presence = _combine_device_states(devices.values()) + new_fields = {"state": presence} if presence == PresenceState.ONLINE or presence == PresenceState.BUSY: @@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): def handle_timeouts( user_states: List[UserPresenceState], is_mine_fn: Callable[[str], bool], - syncing_user_ids: Set[str], + syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]], + user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]], now: int, ) -> List[UserPresenceState]: """Checks the presence of users that have timed out and updates as @@ -1882,7 +1993,8 @@ def handle_timeouts( Args: user_states: List of UserPresenceState's to check. is_mine_fn: Function that returns if a user_id is ours - syncing_user_ids: Set of user_ids with active syncs. + syncing_user_devices: A set of (user ID, device ID) tuples with active syncs.. + user_to_devices: A map of user ID to device ID to UserDevicePresenceState. now: Current time in ms. Returns: @@ -1891,9 +2003,16 @@ def handle_timeouts( changes = {} # Actual changes we need to notify people about for state in user_states: - is_mine = is_mine_fn(state.user_id) - - new_state = handle_timeout(state, is_mine, syncing_user_ids, now) + user_id = state.user_id + is_mine = is_mine_fn(user_id) + + new_state = handle_timeout( + state, + is_mine, + syncing_user_devices, + user_to_devices.get(user_id, {}), + now, + ) if new_state: changes[state.user_id] = new_state @@ -1901,14 +2020,19 @@ def handle_timeouts( def handle_timeout( - state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int + state: UserPresenceState, + is_mine: bool, + syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]], + user_devices: Dict[Optional[str], UserDevicePresenceState], + now: int, ) -> Optional[UserPresenceState]: """Checks the presence of the user to see if any of the timers have elapsed Args: - state + state: UserPresenceState to check. is_mine: Whether the user is ours - syncing_user_ids: Set of user_ids with active syncs. + syncing_user_devices: A set of (user ID, device ID) tuples with active syncs.. + user_devices: A map of device ID to UserDevicePresenceState. now: Current time in ms. Returns: @@ -1919,34 +2043,55 @@ def handle_timeout( return None changed = False - user_id = state.user_id if is_mine: - if state.state == PresenceState.ONLINE: - if now - state.last_active_ts > IDLE_TIMER: - # Currently online, but last activity ages ago so auto - # idle - state = state.copy_and_replace(state=PresenceState.UNAVAILABLE) - changed = True - elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: - # So that we send down a notification that we've - # stopped updating. + # Check per-device whether the device should be considered idle or offline + # due to timeouts. + device_changed = False + offline_devices = [] + for device_id, device_state in user_devices.items(): + if device_state.state == PresenceState.ONLINE: + if now - device_state.last_active_ts > IDLE_TIMER: + # Currently online, but last activity ages ago so auto + # idle + device_state.state = PresenceState.UNAVAILABLE + device_changed = True + + # If there are have been no sync for a while (and none ongoing), + # set presence to offline. + if (state.user_id, device_id) not in syncing_device_ids: + # If the user has done something recently but hasn't synced, + # don't set them as offline. + sync_or_active = max( + device_state.last_sync_ts, device_state.last_active_ts + ) + + if now - sync_or_active > SYNC_ONLINE_TIMEOUT: + # Mark the device as going offline. + offline_devices.append(device_id) + device_changed = True + + # Offline devices are not needed and do not add information. + for device_id in offline_devices: + user_devices.pop(device_id) + + # If the presence state of the devices changed, then (maybe) update + # the user's overall presence state. + if device_changed: + new_presence = _combine_device_states(user_devices.values()) + if new_presence != state.state: + state = state.copy_and_replace(state=new_presence) changed = True + if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: + # So that we send down a notification that we've + # stopped updating. + changed = True + if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: # Need to send ping to other servers to ensure they don't # timeout and set us to offline changed = True - - # If there are have been no sync for a while (and none ongoing), - # set presence to offline - if user_id not in syncing_user_ids: - # If the user has done something recently but hasn't synced, - # don't set them as offline. - sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) - if now - sync_or_active > SYNC_ONLINE_TIMEOUT: - state = state.copy_and_replace(state=PresenceState.OFFLINE) - changed = True else: # We expect to be poked occasionally by the other side. # This is to protect against forgetful/buggy servers, so that @@ -2036,6 +2181,46 @@ def handle_update( return new_state, persist_and_notify, federation_ping +PRESENCE_BY_PRIORITY = { + PresenceState.BUSY: 4, + PresenceState.ONLINE: 3, + PresenceState.UNAVAILABLE: 2, + PresenceState.OFFLINE: 1, +} + + +def _combine_device_states( + device_states: Iterable[UserDevicePresenceState], +) -> str: + """ + Find the device to use presence information from. + + Orders devices by priority, then last_active_ts. + + Args: + device_states: An iterable of device presence states + + Return: + The combined presence state. + """ + + # Based on (all) the user's devices calculate the new presence state. + presence = PresenceState.OFFLINE + last_active_ts = -1 + + # Find the device to use the presence state of based on the presence priority, + # but tie-break with how recently the device has been seen. + for device_state in device_states: + if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > ( + PRESENCE_BY_PRIORITY[presence], + last_active_ts, + ): + presence = device_state.state + last_active_ts = device_state.last_active_ts + + return presence + + async def get_interested_parties( store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState] ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]: -- cgit 1.5.1 From 8b5013dcbc5db16f0f771898da493e812be6fc8a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 5 Sep 2023 10:39:38 -0400 Subject: Time out busy presence status & test multi-device busy (#16174) Add a (long) timeout to when a "busy" device is considered not online. This does *not* match MSC3026, but is a reasonable thing for an implementation to do. Expands tests for the (unstable) busy presence with multiple devices. --- changelog.d/16174.bugfix | 1 + synapse/handlers/presence.py | 19 +++++++- tests/handlers/test_presence.py | 104 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 120 insertions(+), 4 deletions(-) create mode 100644 changelog.d/16174.bugfix (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/16174.bugfix b/changelog.d/16174.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16174.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 80190838b7..a4b05b72e7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -155,6 +155,8 @@ LAST_ACTIVE_GRANULARITY = 60 * 1000 # How long to wait until a new /events or /sync request before assuming # the client has gone. SYNC_ONLINE_TIMEOUT = 30 * 1000 +# Busy status waits longer, but does eventually go offline. +BUSY_ONLINE_TIMEOUT = 60 * 60 * 1000 # How long to wait before marking the user as idle. Compared against last active IDLE_TIMER = 5 * 60 * 1000 @@ -2066,7 +2068,15 @@ def handle_timeout( device_state.last_sync_ts, device_state.last_active_ts ) - if now - sync_or_active > SYNC_ONLINE_TIMEOUT: + # Implementations aren't meant to timeout a device with a busy + # state, but it needs to timeout *eventually* or else the user + # will be stuck in that state. + online_timeout = ( + BUSY_ONLINE_TIMEOUT + if device_state.state == PresenceState.BUSY + else SYNC_ONLINE_TIMEOUT + ) + if now - sync_or_active > online_timeout: # Mark the device as going offline. offline_devices.append(device_id) device_changed = True @@ -2166,6 +2176,13 @@ def handle_update( new_state = new_state.copy_and_replace(last_federation_update_ts=now) federation_ping = True + if new_state.state == PresenceState.BUSY: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + BUSY_ONLINE_TIMEOUT, + ) + else: wheel_timer.insert( now=now, diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 914415740a..638787b029 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -26,6 +26,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events.builder import EventBuilder from synapse.federation.sender import FederationSender from synapse.handlers.presence import ( + BUSY_ONLINE_TIMEOUT, EXTERNAL_PROCESS_EXPIRY, FEDERATION_PING_INTERVAL, FEDERATION_TIMEOUT, @@ -912,6 +913,13 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): for cases in [ # If both devices have the same state, online should eventually idle. # Otherwise, the state doesn't change. + ( + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.ONLINE, @@ -933,7 +941,29 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.OFFLINE, PresenceState.OFFLINE, ), - # If the second device has a "lower" state it should fallback to it. + # If the second device has a "lower" state it should fallback to it, + # except for "busy" which overrides. + ( + PresenceState.BUSY, + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.UNAVAILABLE, @@ -956,6 +986,27 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.UNAVAILABLE, ), # If the second device has a "higher" state it should override. + ( + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.UNAVAILABLE, PresenceState.ONLINE, @@ -1114,6 +1165,12 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): for workers in (False, True) for cases in [ # If both devices have the same state, nothing exciting should happen. + ( + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.ONLINE, @@ -1132,7 +1189,26 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.OFFLINE, PresenceState.OFFLINE, ), - # If the second device has a "lower" state it should fallback to it. + # If the second device has a "lower" state it should fallback to it, + # except for "busy" which overrides. + ( + PresenceState.BUSY, + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.UNAVAILABLE, @@ -1152,6 +1228,24 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.OFFLINE, ), # If the second device has a "higher" state it should override. + ( + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.UNAVAILABLE, PresenceState.ONLINE, @@ -1266,7 +1360,11 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # 8. Advance such that the second device should be discarded (the sync timeout), # then pump so _handle_timeouts function to called. - self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000) + if dev_1_state == PresenceState.BUSY or dev_2_state == PresenceState.BUSY: + timeout = BUSY_ONLINE_TIMEOUT + else: + timeout = SYNC_ONLINE_TIMEOUT + self.reactor.advance(timeout / 1000) self.reactor.pump([5]) # 9. There are no more devices, should be offline. -- cgit 1.5.1 From 4f1840a88ad3a93244fc23149c56245704eab824 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 6 Sep 2023 09:30:53 +0200 Subject: Delete device messages asynchronously and in staged batches (#16240) --- changelog.d/16240.misc | 1 + synapse/handlers/device.py | 48 ++++++++++++++++++++++ synapse/handlers/presence.py | 4 +- synapse/handlers/sync.py | 16 ++++++-- synapse/storage/databases/main/deviceinbox.py | 26 +++++++++--- synapse/storage/databases/main/devices.py | 8 ---- synapse/storage/databases/main/receipts.py | 6 +-- synapse/storage/engines/_base.py | 6 +++ synapse/storage/engines/postgres.py | 4 ++ synapse/storage/engines/sqlite.py | 4 ++ .../schema/main/delta/48/group_unique_indexes.py | 4 +- synapse/util/task_scheduler.py | 17 ++++---- tests/handlers/test_device.py | 47 +++++++++++++++++++++ 13 files changed, 154 insertions(+), 37 deletions(-) create mode 100644 changelog.d/16240.misc (limited to 'synapse/handlers/presence.py') diff --git a/changelog.d/16240.misc b/changelog.d/16240.misc new file mode 100644 index 0000000000..4f266c1fb0 --- /dev/null +++ b/changelog.d/16240.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 763f56dfc1..9e52af5f13 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.types import ( JsonDict, + JsonMapping, + ScheduledTask, StrCollection, StreamKeyType, StreamToken, + TaskStatus, UserID, get_domain_from_id, get_verify_key_from_cross_signing_key, @@ -62,6 +65,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages" MAX_DEVICE_DISPLAY_NAME_LEN = 100 DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 @@ -78,6 +82,7 @@ class DeviceWorkerHandler: self._appservice_handler = hs.get_application_service_handler() self._state_storage = hs.get_storage_controllers().state self._auth_handler = hs.get_auth_handler() + self._event_sources = hs.get_event_sources() self.server_name = hs.hostname self._msc3852_enabled = hs.config.experimental.msc3852_enabled self._query_appservices_for_keys = ( @@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler): self._account_data_handler = hs.get_account_data_handler() self._storage_controllers = hs.get_storage_controllers() self.db_pool = hs.get_datastores().main.db_pool + self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListUpdater(hs, self) @@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler): self._delete_stale_devices, ) + self._task_scheduler.register_action( + self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME + ) + def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler): user_id: The user to delete devices from. device_ids: The list of device IDs to delete """ + to_device_stream_id = self._event_sources.get_current_token().to_device_key try: await self.store.delete_devices(user_id, device_ids) @@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler): f"org.matrix.msc3890.local_notification_settings.{device_id}", ) + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=device_id, + params={ + "user_id": user_id, + "device_id": device_id, + "up_to_stream_id": to_device_stream_id, + }, + ) + # Pushers are deleted after `delete_access_tokens_for_user` is called so that # modules using `on_logged_out` hook can use them if needed. await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) await self.notify_device_update(user_id, device_ids) + DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + + async def _delete_device_messages( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" + assert task.params is not None + user_id = task.params["user_id"] + device_id = task.params["device_id"] + up_to_stream_id = task.params["up_to_stream_id"] + + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) + + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + else: + # There is probably still device messages to be deleted, let's keep the task active and it will be run + # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). + return TaskStatus.ACTIVE, None, None + async def update_device(self, user_id: str, device_id: str, content: dict) -> None: """Update the given device diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a4b05b72e7..375c7d0901 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -183,6 +183,7 @@ class BasePresenceHandler(abc.ABC): writer""" def __init__(self, hs: "HomeServer"): + self.hs = hs self.clock = hs.get_clock() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() @@ -473,8 +474,6 @@ class _NullContextManager(ContextManager[None]): class WorkerPresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs - self._presence_writer_instance = hs.config.worker.writers.presence[0] # Route presence EDUs to the right worker @@ -738,7 +737,6 @@ class WorkerPresenceHandler(BasePresenceHandler): class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs self.wheel_timer: WheelTimer[str] = WheelTimer() self.notifier = hs.get_notifier() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 60a9f341b5..0ccd7d250c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase +from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME from synapse.handlers.relations import BundledAggregations from synapse.logging import issue9533_logger from synapse.logging.context import current_context @@ -268,6 +269,7 @@ class SyncHandler: self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state self._device_handler = hs.get_device_handler() + self._task_scheduler = hs.get_task_scheduler() self.should_calculate_push_rules = hs.config.push.enable_push @@ -360,11 +362,19 @@ class SyncHandler: # (since we now know that the device has received them) if since_token is not None: since_stream_id = since_token.to_device_key - deleted = await self.store.delete_messages_for_device( - sync_config.user.to_string(), sync_config.device_id, since_stream_id + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=sync_config.device_id, + params={ + "user_id": sync_config.user.to_string(), + "device_id": sync_config.device_id, + "up_to_stream_id": since_stream_id, + }, ) logger.debug( - "Deleted %d to-device messages up to %d", deleted, since_stream_id + "Deletion of to-device messages up to %d scheduled", + since_stream_id, ) if timeout == 0 or since_token is None or full_state: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 271cdf923c..744e98c6d0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore): @trace async def delete_messages_for_device( - self, user_id: str, device_id: Optional[str], up_to_stream_id: int + self, + user_id: str, + device_id: Optional[str], + up_to_stream_id: int, + limit: int, ) -> int: """ Args: user_id: The recipient user_id. device_id: The recipient device_id. up_to_stream_id: Where to delete messages up to. + limit: maximum number of messages to delete Returns: The number of messages deleted. @@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 + ROW_ID_NAME = self.database_engine.row_id_name + def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: - sql = ( - "DELETE FROM device_inbox" - " WHERE user_id = ? AND device_id = ?" - " AND stream_id <= ?" - ) + sql = f""" + DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( + SELECT {ROW_ID_NAME} FROM device_inbox + WHERE user_id = ? AND device_id = ? AND stream_id <= ? + LIMIT {limit} + ) + """ txn.execute(sql, (user_id, device_id, up_to_stream_id)) return txn.rowcount @@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": f"deleted {count} messages for device", "count": count}) + # In this case we don't know if we hit the limit or the delete is complete + # so let's not update the cache. + if count == limit: + return count + # Update the cache, ensuring that we only ever increase the value updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index fa69a4a298..7208fc8b33 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1766,14 +1766,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): keyvalues={"user_id": user_id, "hidden": False}, ) - self.db_pool.simple_delete_many_txn( - txn, - table="device_inbox", - column="device_id", - values=device_ids, - keyvalues={"user_id": user_id}, - ) - self.db_pool.simple_delete_many_txn( txn, table="device_auth_providers", diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 5ee5c7ad9f..e4d10ff250 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: - if isinstance(self.database_engine, PostgresEngine): - ROW_ID_NAME = "ctid" - else: - ROW_ID_NAME = "rowid" - + ROW_ID_NAME = self.database_engine.row_id_name # Identify any duplicate receipts arising from # https://github.com/matrix-org/synapse/issues/14406. # The following query takes less than a minute on matrix.org. diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 0b5b3bf03e..b1a2418cbd 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM """Gets a string giving the server version. For example: '3.22.0'""" ... + @property + @abc.abstractmethod + def row_id_name(self) -> str: + """Gets the literal name representing a row id for this engine.""" + ... + @abc.abstractmethod def in_transaction(self, conn: ConnectionType) -> bool: """Whether the connection is currently in a transaction.""" diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 05a72dc554..6309363217 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -211,6 +211,10 @@ class PostgresEngine( else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + @property + def row_id_name(self) -> str: + return "ctid" + def in_transaction(self, conn: psycopg2.extensions.connection) -> bool: return conn.status != psycopg2.extensions.STATUS_READY diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ca8c59297c..802069e1e1 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]): """Gets a string giving the server version. For example: '3.22.0'.""" return "%i.%i.%i" % sqlite3.sqlite_version_info + @property + def row_id_name(self) -> str: + return "rowid" + def in_transaction(self, conn: sqlite3.Connection) -> bool: return conn.in_transaction diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py index ad2da4c8af..622686d28f 100644 --- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py @@ -14,7 +14,7 @@ from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements FIX_INDEXES = """ @@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" + rowid = database_engine.row_id_name # remove duplicates from group_users & group_invites tables cur.execute( diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 9e89aeb748..9b2581e51a 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -77,6 +77,7 @@ class TaskScheduler: LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs def __init__(self, hs: "HomeServer"): + self._hs = hs self._store = hs.get_datastores().main self._clock = hs.get_clock() self._running_tasks: Set[str] = set() @@ -97,8 +98,6 @@ class TaskScheduler: "handle_scheduled_tasks", self._handle_scheduled_tasks, ) - else: - self.replication_client = hs.get_replication_command_handler() def register_action( self, @@ -133,7 +132,7 @@ class TaskScheduler: params: Optional[JsonMapping] = None, ) -> str: """Schedule a new potentially resumable task. A function matching the specified - `action` should have been previously registered with `register_action`. + `action` should have be registered with `register_action` before the task is run. Args: action: the name of a previously registered action @@ -149,11 +148,6 @@ class TaskScheduler: Returns: The id of the scheduled task """ - if action not in self._actions: - raise Exception( - f"No function associated with action {action} of the scheduled task" - ) - status = TaskStatus.SCHEDULED if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() @@ -175,7 +169,7 @@ class TaskScheduler: if self._run_background_tasks: await self._launch_task(task) else: - self.replication_client.send_new_active_task(task.id) + self._hs.get_replication_command_handler().send_new_active_task(task.id) return task.id @@ -315,7 +309,10 @@ class TaskScheduler: """ assert self._run_background_tasks - assert task.action in self._actions + if task.action not in self._actions: + raise Exception( + f"No function associated with action {task.action} of the scheduled task {task.id}" + ) function = self._actions[task.action] async def wrapper() -> None: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 55a4f95ef3..9659a4a355 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -30,6 +30,7 @@ from synapse.server import HomeServer from synapse.storage.databases.main.appservice import _make_exclusive_regex from synapse.types import JsonDict, create_requester from synapse.util import Clock +from synapse.util.task_scheduler import TaskScheduler from tests import unittest from tests.unittest import override_config @@ -49,6 +50,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): assert isinstance(handler, DeviceHandler) self.handler = handler self.store = hs.get_datastores().main + self.device_message_handler = hs.get_device_message_handler() return hs def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -211,6 +213,51 @@ class DeviceTestCase(unittest.HomeserverTestCase): ) self.assertIsNone(res) + def test_delete_device_and_big_device_inbox(self) -> None: + """Check that deleting a big device inbox is staged and batched asynchronously.""" + DEVICE_ID = "abc" + sender = "@sender:" + self.hs.hostname + receiver = "@receiver:" + self.hs.hostname + self._record_user(sender, DEVICE_ID, DEVICE_ID) + self._record_user(receiver, DEVICE_ID, DEVICE_ID) + + # queue a bunch of messages in the inbox + requester = create_requester(sender, device_id=DEVICE_ID) + for i in range(0, DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10): + self.get_success( + self.device_message_handler.send_device_message( + requester, "message_type", {receiver: {"*": {"val": i}}} + ) + ) + + # delete the device + self.get_success(self.handler.delete_devices(receiver, [DEVICE_ID])) + + # messages should be deleted up to DEVICE_MSGS_DELETE_BATCH_LIMIT straight away + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(10, len(res)) + + # wait for the task scheduler to do a second delete pass + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + + # remaining messages should now be deleted + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(0, len(res)) + def test_update_device(self) -> None: self._record_users() -- cgit 1.5.1