From 757010905ea85333672289a0ac124d41bd923bb3 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Tue, 5 Sep 2023 11:14:14 +0000 Subject: Bump twisted from 22.10.0 to 23.8.0 (#16235) * Bump twisted from 22.10.0 to 23.8.0 Bumps [twisted](https://github.com/twisted/twisted) from 22.10.0 to 23.8.0. - [Release notes](https://github.com/twisted/twisted/releases) - [Changelog](https://github.com/twisted/twisted/blob/trunk/NEWS.rst) - [Commits](https://github.com/twisted/twisted/compare/twisted-22.10.0...twisted-23.8.0) --- updated-dependencies: - dependency-name: twisted dependency-type: direct:production update-type: version-update:semver-major ... Signed-off-by: dependabot[bot] * Fix types * Fix lint * Newsfile --------- Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: Erik Johnston --- synapse/handlers/initial_sync.py | 8 ++------ synapse/logging/context.py | 4 ++-- synapse/util/gai_resolver.py | 2 +- 3 files changed, 5 insertions(+), 9 deletions(-) (limited to 'synapse') diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index b3be7a86f0..5dc76ef588 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -13,7 +13,7 @@ # limitations under the License. 
import logging -from typing import TYPE_CHECKING, List, Optional, Tuple, cast +from typing import TYPE_CHECKING, List, Optional, Tuple from synapse.api.constants import ( AccountDataTypes, @@ -23,7 +23,6 @@ from synapse.api.constants import ( Membership, ) from synapse.api.errors import SynapseError -from synapse.events import EventBase from synapse.events.utils import SerializeEventConfig from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state @@ -35,7 +34,6 @@ from synapse.types import ( JsonDict, Requester, RoomStreamToken, - StateMap, StreamKeyType, StreamToken, UserID, @@ -199,9 +197,7 @@ class InitialSyncHandler: deferred_room_state = run_in_background( self._state_storage_controller.get_state_for_events, [event.event_id], - ).addCallback( - lambda states: cast(StateMap[EventBase], states[event.event_id]) - ) + ).addCallback(lambda states: states[event.event_id]) (messages, token), current_state = await make_deferred_yieldable( gather_results( diff --git a/synapse/logging/context.py b/synapse/logging/context.py index 64c6ae4512..bf7e311026 100644 --- a/synapse/logging/context.py +++ b/synapse/logging/context.py @@ -728,7 +728,7 @@ async def _unwrap_awaitable(awaitable: Awaitable[R]) -> R: @overload -def preserve_fn( # type: ignore[misc] +def preserve_fn( f: Callable[P, Awaitable[R]], ) -> Callable[P, "defer.Deferred[R]"]: # The `type: ignore[misc]` above suppresses @@ -756,7 +756,7 @@ def preserve_fn( @overload -def run_in_background( # type: ignore[misc] +def run_in_background( f: Callable[P, Awaitable[R]], *args: P.args, **kwargs: P.kwargs ) -> "defer.Deferred[R]": # The `type: ignore[misc]` above suppresses diff --git a/synapse/util/gai_resolver.py b/synapse/util/gai_resolver.py index 214eb17fbc..fecf829ade 100644 --- a/synapse/util/gai_resolver.py +++ b/synapse/util/gai_resolver.py @@ -136,7 +136,7 @@ class GAIResolver: # The types on IHostnameResolver is incorrect in Twisted, see # 
https://twistedmatrix.com/trac/ticket/10276 - def resolveHostName( # type: ignore[override] + def resolveHostName( self, resolutionReceiver: IResolutionReceiver, hostName: str, -- cgit 1.5.1 From ea75346f6af8c182a42d1ca29119a10361693a7b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 5 Sep 2023 09:58:51 -0400 Subject: Track presence state per-device and combine to a user state. (#16066) Tracks presence on an individual per-device basis and combine the per-device state into a per-user state. This should help in situations where a user has multiple devices with conflicting status (e.g. one is syncing with unavailable and one is syncing with online). The tie-breaking is done by priority: BUSY > ONLINE > UNAVAILABLE > OFFLINE --- changelog.d/16066.bugfix | 1 + changelog.d/16170.bugfix | 1 + changelog.d/16170.misc | 1 - changelog.d/16171.bugfix | 1 + changelog.d/16171.misc | 1 - changelog.d/16172.bugfix | 1 + changelog.d/16172.misc | 1 - synapse/api/presence.py | 43 +++- synapse/handlers/presence.py | 279 ++++++++++++++++++---- tests/handlers/test_presence.py | 500 +++++++++++++++++++++++++++++++++++++++- 10 files changed, 765 insertions(+), 64 deletions(-) create mode 100644 changelog.d/16066.bugfix create mode 100644 changelog.d/16170.bugfix delete mode 100644 changelog.d/16170.misc create mode 100644 changelog.d/16171.bugfix delete mode 100644 changelog.d/16171.misc create mode 100644 changelog.d/16172.bugfix delete mode 100644 changelog.d/16172.misc (limited to 'synapse') diff --git a/changelog.d/16066.bugfix b/changelog.d/16066.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16066.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. 
diff --git a/changelog.d/16170.bugfix b/changelog.d/16170.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16170.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16170.misc b/changelog.d/16170.misc deleted file mode 100644 index c950b54367..0000000000 --- a/changelog.d/16170.misc +++ /dev/null @@ -1 +0,0 @@ -Simplify presence code when using workers. diff --git a/changelog.d/16171.bugfix b/changelog.d/16171.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16171.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16171.misc b/changelog.d/16171.misc deleted file mode 100644 index 4d709cb56e..0000000000 --- a/changelog.d/16171.misc +++ /dev/null @@ -1 +0,0 @@ -Track per-device information in the presence code. diff --git a/changelog.d/16172.bugfix b/changelog.d/16172.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16172.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/changelog.d/16172.misc b/changelog.d/16172.misc deleted file mode 100644 index 4d709cb56e..0000000000 --- a/changelog.d/16172.misc +++ /dev/null @@ -1 +0,0 @@ -Track per-device information in the presence code. diff --git a/synapse/api/presence.py b/synapse/api/presence.py index b80aa83cb3..b78f419994 100644 --- a/synapse/api/presence.py +++ b/synapse/api/presence.py @@ -20,18 +20,53 @@ from synapse.api.constants import PresenceState from synapse.types import JsonDict +@attr.s(slots=True, auto_attribs=True) +class UserDevicePresenceState: + """ + Represents the current presence state of a user's device. + + user_id: The user ID. + device_id: The user's device ID. + state: The presence state, see PresenceState. 
+ last_active_ts: Time in msec that the device last interacted with server. + last_sync_ts: Time in msec that the device last *completed* a sync + (or event stream). + """ + + user_id: str + device_id: Optional[str] + state: str + last_active_ts: int + last_sync_ts: int + + @classmethod + def default( + cls, user_id: str, device_id: Optional[str] + ) -> "UserDevicePresenceState": + """Returns a default presence state.""" + return cls( + user_id=user_id, + device_id=device_id, + state=PresenceState.OFFLINE, + last_active_ts=0, + last_sync_ts=0, + ) + + @attr.s(slots=True, frozen=True, auto_attribs=True) class UserPresenceState: """Represents the current presence state of the user. - user_id - last_active: Time in msec that the user last interacted with server. - last_federation_update: Time in msec since either a) we sent a presence + user_id: The user ID. + state: The presence state, see PresenceState. + last_active_ts: Time in msec that the user last interacted with server. + last_federation_update_ts: Time in msec since either a) we sent a presence update to other servers or b) we received a presence update, depending on if is a local user or not. - last_user_sync: Time in msec that the user last *completed* a sync + last_user_sync_ts: Time in msec that the user last *completed* a sync (or event stream). status_msg: User set status message. + currently_active: True if the user is currently syncing. """ user_id: str diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index f31e18328b..80190838b7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -13,13 +13,56 @@ # See the License for the specific language governing permissions and # limitations under the License. -"""This module is responsible for keeping track of presence status of local +""" +This module is responsible for keeping track of presence status of local and remote users. 
The methods that define policy are: - PresenceHandler._update_states - PresenceHandler._handle_timeouts - should_notify + +# Tracking local presence + +For local users, presence is tracked on a per-device basis. When a user has multiple +devices the user presence state is derived by coalescing the presence from each +device: + + BUSY > ONLINE > UNAVAILABLE > OFFLINE + +The time that each device was last active and last synced is tracked in order to +automatically downgrade a device's presence state: + + A device may move from ONLINE -> UNAVAILABLE, if it has not been active for + a period of time. + + A device may go from any state -> OFFLINE, if it is not active and has not + synced for a period of time. + +The timeouts are handled using a wheel timer, which has coarse buckets. Timings +do not need to be exact. + +Generally a device's presence state is updated whenever a user syncs (via the +set_presence parameter), when the presence API is called, or if "pro-active" +events occur, including: + +* Sending an event, receipt, read marker. +* Updating typing status. + +The busy state has special status in that it is not downgraded by a call to +sync with a lower priority state *and* it takes a long period of time to transition +to offline. + +# Persisting (and restoring) presence + +For all users, presence is persisted on a per-user basis. Data is kept in-memory +and persisted periodically. When Synapse starts each worker loads the current +presence state and then tracks the presence stream to keep itself up-to-date. + +When restoring presence for local users a pseudo-device is created to match the +user state; this device follows the normal timeout logic (see above) and will +automatically be replaced with any information from currently available devices. 
+ """ import abc import contextlib @@ -30,6 +73,7 @@ from contextlib import contextmanager from types import TracebackType from typing import ( TYPE_CHECKING, + AbstractSet, Any, Callable, Collection, @@ -49,7 +93,7 @@ from prometheus_client import Counter import synapse.metrics from synapse.api.constants import EduTypes, EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError -from synapse.api.presence import UserPresenceState +from synapse.api.presence import UserDevicePresenceState, UserPresenceState from synapse.appservice import ApplicationService from synapse.events.presence_router import PresenceRouter from synapse.logging.context import run_in_background @@ -162,6 +206,7 @@ class BasePresenceHandler(abc.ABC): self.VALID_PRESENCE += (PresenceState.BUSY,) active_presence = self.store.take_presence_startup_info() + # The combined status across all user devices. self.user_to_current_state = {state.user_id: state for state in active_presence} @abc.abstractmethod @@ -708,9 +753,27 @@ class PresenceHandler(BasePresenceHandler): lambda: len(self.user_to_current_state), ) + # The per-device presence state, maps user to devices to per-device presence state. + self._user_to_device_to_current_state: Dict[ + str, Dict[Optional[str], UserDevicePresenceState] + ] = {} + now = self.clock.time_msec() if self._presence_enabled: for state in self.user_to_current_state.values(): + # Create a psuedo-device to properly handle time outs. This will + # be overridden by any "real" devices within SYNC_ONLINE_TIMEOUT. 
+ pseudo_device_id = None + self._user_to_device_to_current_state[state.user_id] = { + pseudo_device_id: UserDevicePresenceState( + user_id=state.user_id, + device_id=pseudo_device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) + } + self.wheel_timer.insert( now=now, obj=state.user_id, then=state.last_active_ts + IDLE_TIMER ) @@ -752,7 +815,7 @@ class PresenceHandler(BasePresenceHandler): # Keeps track of the number of *ongoing* syncs on other processes. # - # While any sync is ongoing on another process the user will never + # While any sync is ongoing on another process the user's device will never # go offline. # # Each process has a unique identifier and an update frequency. If @@ -981,22 +1044,21 @@ class PresenceHandler(BasePresenceHandler): timers_fired_counter.inc(len(states)) - syncing_user_ids = { - user_id - for (user_id, _), count in self._user_device_to_num_current_syncs.items() + # Set of user ID & device IDs which are currently syncing. + syncing_user_devices = { + user_id_device_id + for user_id_device_id, count in self._user_device_to_num_current_syncs.items() if count } - syncing_user_ids.update( - user_id - for user_id, _ in itertools.chain( - *self.external_process_to_current_syncs.values() - ) + syncing_user_devices.update( + itertools.chain(*self.external_process_to_current_syncs.values()) ) changes = handle_timeouts( states, is_mine_fn=self.is_mine_id, - syncing_user_ids=syncing_user_ids, + syncing_user_devices=syncing_user_devices, + user_to_devices=self._user_to_device_to_current_state, now=now, ) @@ -1016,11 +1078,26 @@ class PresenceHandler(BasePresenceHandler): bump_active_time_counter.inc() - prev_state = await self.current_state_for_user(user_id) + now = self.clock.time_msec() + + # Update the device information & mark the device as online if it was + # unavailable. 
+ devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, + UserDevicePresenceState.default(user_id, device_id), + ) + device_state.last_active_ts = now + if device_state.state == PresenceState.UNAVAILABLE: + device_state.state = PresenceState.ONLINE - new_fields: Dict[str, Any] = {"last_active_ts": self.clock.time_msec()} - if prev_state.state == PresenceState.UNAVAILABLE: - new_fields["state"] = PresenceState.ONLINE + # Update the user state, this will always update last_active_ts and + # might update the presence state. + prev_state = await self.current_state_for_user(user_id) + new_fields: Dict[str, Any] = { + "last_active_ts": now, + "state": _combine_device_states(devices.values()), + } await self._update_states([prev_state.copy_and_replace(**new_fields)]) @@ -1132,6 +1209,12 @@ class PresenceHandler(BasePresenceHandler): if is_syncing and (user_id, device_id) not in process_presence: process_presence.add((user_id, device_id)) elif not is_syncing and (user_id, device_id) in process_presence: + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + device_state.last_sync_ts = sync_time_msec + new_state = prev_state.copy_and_replace( last_user_sync_ts=sync_time_msec ) @@ -1151,11 +1234,24 @@ class PresenceHandler(BasePresenceHandler): process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) - prev_states = await self.current_state_for_users( - {user_id for user_id, device_id in process_presence} - ) + time_now_ms = self.clock.time_msec() + # Mark each device as having a last sync time. 
+ updated_users = set() + for user_id, device_id in process_presence: + device_state = self._user_to_device_to_current_state.setdefault( + user_id, {} + ).setdefault( + device_id, UserDevicePresenceState.default(user_id, device_id) + ) + + device_state.last_sync_ts = time_now_ms + updated_users.add(user_id) + + # Update each user (and insert into the appropriate timers to check if + # they've gone offline). + prev_states = await self.current_state_for_users(updated_users) await self._update_states( [ prev_state.copy_and_replace(last_user_sync_ts=time_now_ms) @@ -1277,6 +1373,20 @@ class PresenceHandler(BasePresenceHandler): if prev_state.state == PresenceState.BUSY and is_sync: presence = PresenceState.BUSY + # Update the device specific information. + devices = self._user_to_device_to_current_state.setdefault(user_id, {}) + device_state = devices.setdefault( + device_id, + UserDevicePresenceState.default(user_id, device_id), + ) + device_state.state = presence + device_state.last_active_ts = now + if is_sync: + device_state.last_sync_ts = now + + # Based on the state of each user's device calculate the new presence state. + presence = _combine_device_states(devices.values()) + new_fields = {"state": presence} if presence == PresenceState.ONLINE or presence == PresenceState.BUSY: @@ -1873,7 +1983,8 @@ class PresenceEventSource(EventSource[int, UserPresenceState]): def handle_timeouts( user_states: List[UserPresenceState], is_mine_fn: Callable[[str], bool], - syncing_user_ids: Set[str], + syncing_user_devices: AbstractSet[Tuple[str, Optional[str]]], + user_to_devices: Dict[str, Dict[Optional[str], UserDevicePresenceState]], now: int, ) -> List[UserPresenceState]: """Checks the presence of users that have timed out and updates as @@ -1882,7 +1993,8 @@ def handle_timeouts( Args: user_states: List of UserPresenceState's to check. is_mine_fn: Function that returns if a user_id is ours - syncing_user_ids: Set of user_ids with active syncs. 
+ syncing_user_devices: A set of (user ID, device ID) tuples with active syncs. + user_to_devices: A map of user ID to device ID to UserDevicePresenceState. now: Current time in ms. Returns: @@ -1891,9 +2003,16 @@ def handle_timeouts( changes = {} # Actual changes we need to notify people about for state in user_states: - is_mine = is_mine_fn(state.user_id) - - new_state = handle_timeout(state, is_mine, syncing_user_ids, now) + user_id = state.user_id + is_mine = is_mine_fn(user_id) + + new_state = handle_timeout( + state, + is_mine, + syncing_user_devices, + user_to_devices.get(user_id, {}), + now, + ) if new_state: changes[state.user_id] = new_state @@ -1901,14 +2020,19 @@ def handle_timeouts( def handle_timeout( - state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int + state: UserPresenceState, + is_mine: bool, + syncing_device_ids: AbstractSet[Tuple[str, Optional[str]]], + user_devices: Dict[Optional[str], UserDevicePresenceState], + now: int, ) -> Optional[UserPresenceState]: """Checks the presence of the user to see if any of the timers have elapsed Args: - state + state: UserPresenceState to check. is_mine: Whether the user is ours - syncing_user_ids: Set of user_ids with active syncs. + syncing_device_ids: A set of (user ID, device ID) tuples with active syncs. + user_devices: A map of device ID to UserDevicePresenceState. now: Current time in ms. Returns: @@ -1919,34 +2043,55 @@ def handle_timeout( return None changed = False - user_id = state.user_id if is_mine: - if state.state == PresenceState.ONLINE: - if now - state.last_active_ts > IDLE_TIMER: - # Currently online, but last activity ages ago so auto - # idle - state = state.copy_and_replace(state=PresenceState.UNAVAILABLE) - changed = True - elif now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: - # So that we send down a notification that we've - # stopped updating. + # Check per-device whether the device should be considered idle or offline + # due to timeouts. 
+ device_changed = False + offline_devices = [] + for device_id, device_state in user_devices.items(): + if device_state.state == PresenceState.ONLINE: + if now - device_state.last_active_ts > IDLE_TIMER: + # Currently online, but last activity ages ago so auto + # idle + device_state.state = PresenceState.UNAVAILABLE + device_changed = True + + # If there has been no sync for a while (and none ongoing), + # set presence to offline. + if (state.user_id, device_id) not in syncing_device_ids: + # If the user has done something recently but hasn't synced, + # don't set them as offline. + sync_or_active = max( + device_state.last_sync_ts, device_state.last_active_ts + ) + + if now - sync_or_active > SYNC_ONLINE_TIMEOUT: + # Mark the device as going offline. + offline_devices.append(device_id) + device_changed = True + + # Offline devices are not needed and do not add information. + for device_id in offline_devices: + user_devices.pop(device_id) + + # If the presence state of the devices changed, then (maybe) update + # the user's overall presence state. + if device_changed: + new_presence = _combine_device_states(user_devices.values()) + if new_presence != state.state: + state = state.copy_and_replace(state=new_presence) changed = True + if now - state.last_active_ts > LAST_ACTIVE_GRANULARITY: + # So that we send down a notification that we've + # stopped updating. + changed = True + if now - state.last_federation_update_ts > FEDERATION_PING_INTERVAL: # Need to send ping to other servers to ensure they don't # timeout and set us to offline changed = True - - # If there are have been no sync for a while (and none ongoing), - # set presence to offline - if user_id not in syncing_user_ids: - # If the user has done something recently but hasn't synced, - # don't set them as offline. 
- sync_or_active = max(state.last_user_sync_ts, state.last_active_ts) - if now - sync_or_active > SYNC_ONLINE_TIMEOUT: - state = state.copy_and_replace(state=PresenceState.OFFLINE) - changed = True else: # We expect to be poked occasionally by the other side. # This is to protect against forgetful/buggy servers, so that @@ -2036,6 +2181,46 @@ def handle_update( return new_state, persist_and_notify, federation_ping +PRESENCE_BY_PRIORITY = { + PresenceState.BUSY: 4, + PresenceState.ONLINE: 3, + PresenceState.UNAVAILABLE: 2, + PresenceState.OFFLINE: 1, +} + + +def _combine_device_states( + device_states: Iterable[UserDevicePresenceState], +) -> str: + """ + Find the device to use presence information from. + + Orders devices by priority, then last_active_ts. + + Args: + device_states: An iterable of device presence states + + Return: + The combined presence state. + """ + + # Based on (all) the user's devices calculate the new presence state. + presence = PresenceState.OFFLINE + last_active_ts = -1 + + # Find the device to use the presence state of based on the presence priority, + # but tie-break with how recently the device has been seen. 
+ for device_state in device_states: + if (PRESENCE_BY_PRIORITY[device_state.state], device_state.last_active_ts) > ( + PRESENCE_BY_PRIORITY[presence], + last_active_ts, + ): + presence = device_state.state + last_active_ts = device_state.last_active_ts + + return presence + + async def get_interested_parties( store: DataStore, presence_router: PresenceRouter, states: List[UserPresenceState] ) -> Tuple[Dict[str, List[UserPresenceState]], Dict[str, List[UserPresenceState]]]: diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 88a16193a3..914415740a 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -21,7 +21,7 @@ from signedjson.key import generate_signing_key from twisted.test.proto_helpers import MemoryReactor from synapse.api.constants import EventTypes, Membership, PresenceState -from synapse.api.presence import UserPresenceState +from synapse.api.presence import UserDevicePresenceState, UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events.builder import EventBuilder from synapse.federation.sender import FederationSender @@ -352,6 +352,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_idle_timer(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" 
now = 5000000 @@ -362,8 +363,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) assert new_state is not None @@ -376,6 +390,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): presence state into unavailable. """ user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -386,8 +401,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) assert new_state is not None @@ -396,6 +424,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_sync_timeout(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" 
now = 5000000 @@ -406,8 +435,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) assert new_state is not None @@ -416,6 +458,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_sync_online(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" now = 5000000 @@ -426,9 +469,20 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now - SYNC_ONLINE_TIMEOUT - 1, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) new_state = handle_timeout( - state, is_mine=True, syncing_user_ids={user_id}, now=now + state, + is_mine=True, + syncing_device_ids={(user_id, device_id)}, + user_devices={device_id: device_state}, + now=now, ) self.assertIsNotNone(new_state) @@ -438,6 +492,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_federation_ping(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" 
now = 5000000 @@ -449,14 +504,28 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_federation_update_ts=now - FEDERATION_PING_INTERVAL - 1, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) self.assertEqual(state, new_state) def test_no_timeout(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" now = 5000000 state = UserPresenceState.default(user_id) @@ -466,8 +535,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_user_sync_ts=now, last_federation_update_ts=now, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNone(new_state) @@ -485,8 +567,9 @@ class PresenceTimeoutTestCase(unittest.TestCase): status_msg=status_msg, ) + # Note that this is a remote user so we do not have their device information. new_state = handle_timeout( - state, is_mine=False, syncing_user_ids=set(), now=now + state, is_mine=False, syncing_device_ids=set(), user_devices={}, now=now ) self.assertIsNotNone(new_state) @@ -496,6 +579,7 @@ class PresenceTimeoutTestCase(unittest.TestCase): def test_last_active(self) -> None: user_id = "@foo:bar" + device_id = "dev-1" status_msg = "I'm here!" 
now = 5000000 @@ -507,8 +591,21 @@ class PresenceTimeoutTestCase(unittest.TestCase): last_federation_update_ts=now, status_msg=status_msg, ) + device_state = UserDevicePresenceState( + user_id=user_id, + device_id=device_id, + state=state.state, + last_active_ts=state.last_active_ts, + last_sync_ts=state.last_user_sync_ts, + ) - new_state = handle_timeout(state, is_mine=True, syncing_user_ids=set(), now=now) + new_state = handle_timeout( + state, + is_mine=True, + syncing_device_ids=set(), + user_devices={device_id: device_state}, + now=now, + ) self.assertIsNotNone(new_state) self.assertEqual(state, new_state) @@ -579,7 +676,7 @@ class PresenceHandlerInitTestCase(unittest.HomeserverTestCase): [ (PresenceState.BUSY, PresenceState.BUSY), (PresenceState.ONLINE, PresenceState.ONLINE), - (PresenceState.UNAVAILABLE, PresenceState.UNAVAILABLE), + (PresenceState.UNAVAILABLE, PresenceState.ONLINE), # Offline syncs don't update the state. (PresenceState.OFFLINE, PresenceState.ONLINE), ] @@ -800,6 +897,389 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # we should now be online self.assertEqual(state.state, PresenceState.ONLINE) + @parameterized.expand( + # A list of tuples of 5 strings and a boolean: + # + # * The presence state of device 1. + # * The presence state of device 2. + # * The expected user presence state after both devices have synced. + # * The expected user presence state after device 1 has idled. + # * The expected user presence state after device 2 has idled. + # * True to use workers, False a monolith. + [ + (*cases, workers) + for workers in (False, True) + for cases in [ + # If both devices have the same state, online should eventually idle. + # Otherwise, the state doesn't change. 
+ ( + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + ), + # If the second device has a "lower" state it should fallback to it. + ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.ONLINE, + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + # If the second device has a "higher" state it should override. + ( + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ] + ], + name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[5] else 'monolith'}", + ) + @unittest.override_config({"experimental_features": {"msc3026_enabled": True}}) + def test_set_presence_from_syncing_multi_device( + self, + dev_1_state: str, + dev_2_state: str, + expected_state_1: str, + expected_state_2: str, + expected_state_3: str, + test_with_workers: bool, + ) -> None: + """ + Test the behaviour of multiple devices syncing at the same time. 
+ + Roughly the user's presence state should be set to the "highest" priority + of all the devices. When a device then goes offline its state should be + discarded and the next highest should win. + + Note that these tests use the idle timer (and don't close the syncs), it + is unlikely that a *single* sync would last this long, but is close enough + to continually syncing with that current state. + """ + user_id = f"@test:{self.hs.config.server.server_name}" + + # By default, we call /sync against the main process. + worker_presence_handler = self.presence_handler + if test_with_workers: + # Create a worker and use it to handle /sync traffic instead. + # This is used to test that presence changes get replicated from workers + # to the main process correctly. + worker_to_sync_against = self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "synchrotron"} + ) + worker_presence_handler = worker_to_sync_against.get_presence_handler() + + # 1. Sync with the first device. + self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-1", + affect_presence=dev_1_state != PresenceState.OFFLINE, + presence_state=dev_1_state, + ), + by=0.01, + ) + + # 2. Wait half the idle timer. + self.reactor.advance(IDLE_TIMER / 1000 / 2) + self.reactor.pump([0.1]) + + # 3. Sync with the second device. + self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-2", + affect_presence=dev_2_state != PresenceState.OFFLINE, + presence_state=dev_2_state, + ), + by=0.01, + ) + + # 4. Assert the expected presence state. 
+ state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + + # When testing with workers, make another random sync (with any *different* + # user) to keep the process information from expiring. + # + # This is due to EXTERNAL_PROCESS_EXPIRY being equivalent to IDLE_TIMER. + if test_with_workers: + with self.get_success( + worker_presence_handler.user_syncing( + f"@other-user:{self.hs.config.server.server_name}", + "dev-3", + affect_presence=True, + presence_state=PresenceState.ONLINE, + ), + by=0.01, + ): + pass + + # 5. Advance such that the first device should be discarded (the idle timer), + # then pump so _handle_timeouts function to called. + self.reactor.advance(IDLE_TIMER / 1000 / 2) + self.reactor.pump([0.01]) + + # 6. Assert the expected presence state. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + + # 7. Advance such that the second device should be discarded (half the idle timer), + # then pump so _handle_timeouts function to called. + self.reactor.advance(IDLE_TIMER / 1000 / 2) + self.reactor.pump([0.1]) + + # 8. The devices are still "syncing" (the sync context managers were never + # closed), so might idle. 
+ state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_3) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_3) + + @parameterized.expand( + # A list of tuples of 4 strings: + # + # * The presence state of device 1. + # * The presence state of device 2. + # * The expected user presence state after both devices have synced. + # * The expected user presence state after device 1 has stopped syncing. + # * True to use workers, False a monolith. + [ + (*cases, workers) + for workers in (False, True) + for cases in [ + # If both devices have the same state, nothing exciting should happen. + ( + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + PresenceState.OFFLINE, + ), + # If the second device has a "lower" state it should fallback to it. + ( + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.UNAVAILABLE, + ), + ( + PresenceState.ONLINE, + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.OFFLINE, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.OFFLINE, + ), + # If the second device has a "higher" state it should override. 
+ ( + PresenceState.UNAVAILABLE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + ), + ( + PresenceState.OFFLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + PresenceState.ONLINE, + ), + ( + PresenceState.OFFLINE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + PresenceState.UNAVAILABLE, + ), + ] + ], + name_func=lambda testcase_func, param_num, params: f"{testcase_func.__name__}_{param_num}_{'workers' if params.args[4] else 'monolith'}", + ) + @unittest.override_config({"experimental_features": {"msc3026_enabled": True}}) + def test_set_presence_from_non_syncing_multi_device( + self, + dev_1_state: str, + dev_2_state: str, + expected_state_1: str, + expected_state_2: str, + test_with_workers: bool, + ) -> None: + """ + Test the behaviour of multiple devices syncing at the same time. + + Roughly the user's presence state should be set to the "highest" priority + of all the devices. When a device then goes offline its state should be + discarded and the next highest should win. + + Note that these tests use the idle timer (and don't close the syncs), it + is unlikely that a *single* sync would last this long, but is close enough + to continually syncing with that current state. + """ + user_id = f"@test:{self.hs.config.server.server_name}" + + # By default, we call /sync against the main process. + worker_presence_handler = self.presence_handler + if test_with_workers: + # Create a worker and use it to handle /sync traffic instead. + # This is used to test that presence changes get replicated from workers + # to the main process correctly. + worker_to_sync_against = self.make_worker_hs( + "synapse.app.generic_worker", {"worker_name": "synchrotron"} + ) + worker_presence_handler = worker_to_sync_against.get_presence_handler() + + # 1. Sync with the first device. 
+ sync_1 = self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-1", + affect_presence=dev_1_state != PresenceState.OFFLINE, + presence_state=dev_1_state, + ), + by=0.1, + ) + + # 2. Sync with the second device. + sync_2 = self.get_success( + worker_presence_handler.user_syncing( + user_id, + "dev-2", + affect_presence=dev_2_state != PresenceState.OFFLINE, + presence_state=dev_2_state, + ), + by=0.1, + ) + + # 3. Assert the expected presence state. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_1) + + # 4. Disconnect the first device. + with sync_1: + pass + + # 5. Advance such that the first device should be discarded (the sync timeout), + # then pump so _handle_timeouts function to called. + self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000) + self.reactor.pump([5]) + + # 6. Assert the expected presence state. + state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, expected_state_2) + + # 7. Disconnect the second device. + with sync_2: + pass + + # 8. Advance such that the second device should be discarded (the sync timeout), + # then pump so _handle_timeouts function to called. + self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000) + self.reactor.pump([5]) + + # 9. There are no more devices, should be offline. 
+ state = self.get_success( + self.presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, PresenceState.OFFLINE) + if test_with_workers: + state = self.get_success( + worker_presence_handler.get_state(UserID.from_string(user_id)) + ) + self.assertEqual(state.state, PresenceState.OFFLINE) + def test_set_presence_from_syncing_keeps_status(self) -> None: """Test that presence set by syncing retains status message""" status_msg = "I'm here!" -- cgit 1.5.1 From 8b5013dcbc5db16f0f771898da493e812be6fc8a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 5 Sep 2023 10:39:38 -0400 Subject: Time out busy presence status & test multi-device busy (#16174) Add a (long) timeout to when a "busy" device is considered not online. This does *not* match MSC3026, but is a reasonable thing for an implementation to do. Expands tests for the (unstable) busy presence with multiple devices. --- changelog.d/16174.bugfix | 1 + synapse/handlers/presence.py | 19 +++++++- tests/handlers/test_presence.py | 104 ++++++++++++++++++++++++++++++++++++++-- 3 files changed, 120 insertions(+), 4 deletions(-) create mode 100644 changelog.d/16174.bugfix (limited to 'synapse') diff --git a/changelog.d/16174.bugfix b/changelog.d/16174.bugfix new file mode 100644 index 0000000000..83649cf42a --- /dev/null +++ b/changelog.d/16174.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where multi-device accounts could cause high load due to presence. diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index 80190838b7..a4b05b72e7 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -155,6 +155,8 @@ LAST_ACTIVE_GRANULARITY = 60 * 1000 # How long to wait until a new /events or /sync request before assuming # the client has gone. SYNC_ONLINE_TIMEOUT = 30 * 1000 +# Busy status waits longer, but does eventually go offline. +BUSY_ONLINE_TIMEOUT = 60 * 60 * 1000 # How long to wait before marking the user as idle. 
Compared against last active IDLE_TIMER = 5 * 60 * 1000 @@ -2066,7 +2068,15 @@ def handle_timeout( device_state.last_sync_ts, device_state.last_active_ts ) - if now - sync_or_active > SYNC_ONLINE_TIMEOUT: + # Implementations aren't meant to timeout a device with a busy + # state, but it needs to timeout *eventually* or else the user + # will be stuck in that state. + online_timeout = ( + BUSY_ONLINE_TIMEOUT + if device_state.state == PresenceState.BUSY + else SYNC_ONLINE_TIMEOUT + ) + if now - sync_or_active > online_timeout: # Mark the device as going offline. offline_devices.append(device_id) device_changed = True @@ -2166,6 +2176,13 @@ def handle_update( new_state = new_state.copy_and_replace(last_federation_update_ts=now) federation_ping = True + if new_state.state == PresenceState.BUSY: + wheel_timer.insert( + now=now, + obj=user_id, + then=new_state.last_user_sync_ts + BUSY_ONLINE_TIMEOUT, + ) + else: wheel_timer.insert( now=now, diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 914415740a..638787b029 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -26,6 +26,7 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events.builder import EventBuilder from synapse.federation.sender import FederationSender from synapse.handlers.presence import ( + BUSY_ONLINE_TIMEOUT, EXTERNAL_PROCESS_EXPIRY, FEDERATION_PING_INTERVAL, FEDERATION_TIMEOUT, @@ -912,6 +913,13 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): for cases in [ # If both devices have the same state, online should eventually idle. # Otherwise, the state doesn't change. 
+ ( + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.ONLINE, @@ -933,7 +941,29 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.OFFLINE, PresenceState.OFFLINE, ), - # If the second device has a "lower" state it should fallback to it. + # If the second device has a "lower" state it should fallback to it, + # except for "busy" which overrides. + ( + PresenceState.BUSY, + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.UNAVAILABLE, @@ -956,6 +986,27 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.UNAVAILABLE, ), # If the second device has a "higher" state it should override. + ( + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.UNAVAILABLE, PresenceState.ONLINE, @@ -1114,6 +1165,12 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): for workers in (False, True) for cases in [ # If both devices have the same state, nothing exciting should happen. 
+ ( + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.ONLINE, @@ -1132,7 +1189,26 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.OFFLINE, PresenceState.OFFLINE, ), - # If the second device has a "lower" state it should fallback to it. + # If the second device has a "lower" state it should fallback to it, + # except for "busy" which overrides. + ( + PresenceState.BUSY, + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.BUSY, + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.ONLINE, PresenceState.UNAVAILABLE, @@ -1152,6 +1228,24 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): PresenceState.OFFLINE, ), # If the second device has a "higher" state it should override. + ( + PresenceState.ONLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.UNAVAILABLE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), + ( + PresenceState.OFFLINE, + PresenceState.BUSY, + PresenceState.BUSY, + PresenceState.BUSY, + ), ( PresenceState.UNAVAILABLE, PresenceState.ONLINE, @@ -1266,7 +1360,11 @@ class PresenceHandlerTestCase(BaseMultiWorkerStreamTestCase): # 8. Advance such that the second device should be discarded (the sync timeout), # then pump so _handle_timeouts function to called. - self.reactor.advance(SYNC_ONLINE_TIMEOUT / 1000) + if dev_1_state == PresenceState.BUSY or dev_2_state == PresenceState.BUSY: + timeout = BUSY_ONLINE_TIMEOUT + else: + timeout = SYNC_ONLINE_TIMEOUT + self.reactor.advance(timeout / 1000) self.reactor.pump([5]) # 9. There are no more devices, should be offline. 
-- cgit 1.5.1 From c9cec2daed00406b5337a8ce7064e3394ceaf656 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 5 Sep 2023 20:27:41 +0100 Subject: Fix bug where we kept re-requesting a remote server's key repeatedly. (#16257) * Correctly handle multiple rows per server/key * Newsfile --- changelog.d/16257.bugfix | 1 + synapse/storage/databases/main/keys.py | 17 +++++++++++------ 2 files changed, 12 insertions(+), 6 deletions(-) create mode 100644 changelog.d/16257.bugfix (limited to 'synapse') diff --git a/changelog.d/16257.bugfix b/changelog.d/16257.bugfix new file mode 100644 index 0000000000..28a5319749 --- /dev/null +++ b/changelog.d/16257.bugfix @@ -0,0 +1 @@ +Fix long-standing bug where we kept re-requesting a remote server's key repeatedly, potentially causing delays in receiving events over federation. diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index a3b4744855..57aa4921e1 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -221,12 +221,17 @@ class KeyStore(CacheInvalidationWorkerStore): """Processes a batch of keys to fetch, and adds the result to `keys`.""" # batch_iter always returns tuples so it's safe to do len(batch) - sql = """ - SELECT server_name, key_id, key_json, ts_valid_until_ms - FROM server_keys_json WHERE 1=0 - """ + " OR (server_name=? AND key_id=?)" * len( - batch - ) + where_clause = " OR (server_name=? AND key_id=?)" * len(batch) + + # `server_keys_json` can have multiple entries per server (one per + # remote server we fetched from, if using perspectives). Order by + # `ts_added_ms` so the most recently fetched one always wins. 
+ sql = f""" + SELECT server_name, key_id, key_json, ts_valid_until_ms + FROM server_keys_json WHERE 1=0 + {where_clause} + ORDER BY ts_added_ms + """ txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) -- cgit 1.5.1 From b1d71c687ae55ce67e4cfc82c475e61f959dfeb0 Mon Sep 17 00:00:00 2001 From: Travis Ralston Date: Tue, 5 Sep 2023 13:45:39 -0600 Subject: Add MSC4040 `matrix-fed` service lookups (#16137) --- changelog.d/16137.feature | 1 + scripts-dev/federation_client.py | 12 + synapse/http/federation/matrix_federation_agent.py | 29 +- .../federation/test_matrix_federation_agent.py | 323 +++++++++++++++++++-- 4 files changed, 331 insertions(+), 34 deletions(-) create mode 100644 changelog.d/16137.feature (limited to 'synapse') diff --git a/changelog.d/16137.feature b/changelog.d/16137.feature new file mode 100644 index 0000000000..bba6f161cd --- /dev/null +++ b/changelog.d/16137.feature @@ -0,0 +1 @@ +Support resolving homeservers using `matrix-fed` DNS SRV records from [MSC4040](https://github.com/matrix-org/matrix-spec-proposals/pull/4040). 
diff --git a/scripts-dev/federation_client.py b/scripts-dev/federation_client.py index 5ad334b4d8..e8baeac5e2 100755 --- a/scripts-dev/federation_client.py +++ b/scripts-dev/federation_client.py @@ -329,6 +329,17 @@ class MatrixConnectionAdapter(HTTPAdapter): raise ValueError("Invalid host:port '%s'" % (server_name,)) return out[0], port, out[0] + # Look up SRV for Matrix 1.8 `matrix-fed` service first + try: + srv = srvlookup.lookup("matrix-fed", "tcp", server_name)[0] + print( + f"SRV lookup on _matrix-fed._tcp.{server_name} gave {srv}", + file=sys.stderr, + ) + return srv.host, srv.port, server_name + except Exception: + pass + # Fall back to deprecated `matrix` service try: srv = srvlookup.lookup("matrix", "tcp", server_name)[0] print( @@ -337,6 +348,7 @@ class MatrixConnectionAdapter(HTTPAdapter): ) return srv.host, srv.port, server_name except Exception: + # Fall even further back to just port 8448 return server_name, 8448, server_name @staticmethod diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 91a24efcd0..a3a396bb37 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -399,15 +399,34 @@ class MatrixHostnameEndpoint: if port or _is_ip_literal(host): return [Server(host, port or 8448)] + # Check _matrix-fed._tcp SRV record. logger.debug("Looking up SRV record for %s", host.decode(errors="replace")) + server_list = await self._srv_resolver.resolve_service( + b"_matrix-fed._tcp." + host + ) + + if server_list: + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Got %s from SRV lookup for %s", + ", ".join(map(str, server_list)), + host.decode(errors="replace"), + ) + return server_list + + # No _matrix-fed._tcp SRV record, fallback to legacy _matrix._tcp SRV record. 
+ logger.debug( + "Looking up deprecated SRV record for %s", host.decode(errors="replace") + ) server_list = await self._srv_resolver.resolve_service(b"_matrix._tcp." + host) if server_list: - logger.debug( - "Got %s from SRV lookup for %s", - ", ".join(map(str, server_list)), - host.decode(errors="replace"), - ) + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Got %s from deprecated SRV lookup for %s", + ", ".join(map(str, server_list)), + host.decode(errors="replace"), + ) return server_list # No SRV records, so we fallback to host and 8448 diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 0d17f2fe5b..9f63fa6fa8 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -15,7 +15,7 @@ import base64 import logging import os from typing import Generator, List, Optional, cast -from unittest.mock import AsyncMock, patch +from unittest.mock import AsyncMock, call, patch import treq from netaddr import IPSet @@ -651,9 +651,9 @@ class MatrixFederationAgentTests(unittest.TestCase): # .well-known request fails. self.reactor.pump((0.4,)) - # now there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv1" + # now there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv1"), call(b"_matrix._tcp.testserv1")] ) # we should fall back to a direct connection @@ -737,9 +737,9 @@ class MatrixFederationAgentTests(unittest.TestCase): # .well-known request fails. 
self.reactor.pump((0.4,)) - # now there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + # now there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # we should fall back to a direct connection @@ -788,9 +788,12 @@ class MatrixFederationAgentTests(unittest.TestCase): content=b'{ "m.server": "target-server" }', ) - # there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.target-server" + # there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.target-server"), + call(b"_matrix._tcp.target-server"), + ] ) # now we should get a connection to the target server @@ -878,9 +881,12 @@ class MatrixFederationAgentTests(unittest.TestCase): self.reactor.pump((0.1,)) - # there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.target-server" + # there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.target-server"), + call(b"_matrix._tcp.target-server"), + ] ) # now we should get a connection to the target server @@ -942,9 +948,9 @@ class MatrixFederationAgentTests(unittest.TestCase): client_factory, expected_sni=b"testserv", content=b"NOT JSON" ) - # now there should be a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + # now there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # we should fall back to a direct connection @@ -1016,14 +1022,14 @@ class MatrixFederationAgentTests(unittest.TestCase): # there should be no requests self.assertEqual(len(http_proto.requests), 0) - # and there should be a SRV lookup instead - 
self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + # and there should be two SRV lookups instead + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) def test_get_hostname_srv(self) -> None: """ - Test the behaviour when there is a single SRV record + Test the behaviour when there is a single SRV record for _matrix-fed. """ self.agent = self._make_agent() @@ -1039,7 +1045,51 @@ class MatrixFederationAgentTests(unittest.TestCase): # the request for a .well-known will have failed with a DNS lookup error. self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + b"_matrix-fed._tcp.testserv" + ) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # make a test server, and wire up the client + http_server = self._make_connection(client_factory, expected_sni=b"testserv") + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"]) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_get_hostname_srv_legacy(self) -> None: + """ + Test the behaviour when there is a single SRV record for _matrix. + """ + self.agent = self._make_agent() + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. 
+ self.mock_resolver.resolve_service.side_effect = [ + [], + [Server(host=b"srvtarget", port=8443)], + ] + self.reactor.lookups["srvtarget"] = "1.2.3.4" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + # the request for a .well-known will have failed with a DNS lookup error. + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # Make sure treq is trying to connect @@ -1065,7 +1115,7 @@ class MatrixFederationAgentTests(unittest.TestCase): def test_get_well_known_srv(self) -> None: """Test the behaviour when the .well-known redirects to a place where there - is a SRV. + is a _matrix-fed SRV record. """ self.agent = self._make_agent() @@ -1096,7 +1146,72 @@ class MatrixFederationAgentTests(unittest.TestCase): # there should be a SRV lookup self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.target-server" + b"_matrix-fed._tcp.target-server" + ) + + # now we should get a connection to the target of the SRV record + self.assertEqual(len(clients), 2) + (host, port, client_factory, _timeout, _bindAddress) = clients[1] + self.assertEqual(host, "5.6.7.8") + self.assertEqual(port, 8443) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, expected_sni=b"target-server" + ) + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual( + request.requestHeaders.getRawHeaders(b"host"), [b"target-server"] + ) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_get_well_known_srv_legacy(self) -> None: + """Test the behaviour when the .well-known redirects to a place where there + is a _matrix SRV record. 
+ """ + self.agent = self._make_agent() + + self.reactor.lookups["testserv"] = "1.2.3.4" + self.reactor.lookups["srvtarget"] = "5.6.7.8" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + # there should be an attempt to connect on port 443 for the .well-known + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 443) + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. + self.mock_resolver.resolve_service.side_effect = [ + [], + [Server(host=b"srvtarget", port=8443)], + ] + + self._handle_well_known_connection( + client_factory, + expected_sni=b"testserv", + content=b'{ "m.server": "target-server" }', + ) + + # there should be two SRV lookups + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.target-server"), + call(b"_matrix._tcp.target-server"), + ] ) # now we should get a connection to the target of the SRV record @@ -1158,8 +1273,11 @@ class MatrixFederationAgentTests(unittest.TestCase): self.reactor.pump((0.4,)) # now there should have been a SRV lookup - self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.xn--bcher-kva.com" + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.xn--bcher-kva.com"), + call(b"_matrix._tcp.xn--bcher-kva.com"), + ] ) # We should fall back to port 8448 @@ -1188,7 +1306,7 @@ class MatrixFederationAgentTests(unittest.TestCase): self.successResultOf(test_d) def test_idna_srv_target(self) -> None: - """test the behaviour when the target of a SRV record has idna chars""" + """test the behaviour when the target of a _matrix-fed SRV record has idna chars""" self.agent = self._make_agent() self.mock_resolver.resolve_service.return_value = [ @@ -1204,7 +1322,57 @@ class 
MatrixFederationAgentTests(unittest.TestCase): self.assertNoResult(test_d) self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.xn--bcher-kva.com" + b"_matrix-fed._tcp.xn--bcher-kva.com" + ) + + # Make sure treq is trying to connect + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients[0] + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # make a test server, and wire up the client + http_server = self._make_connection( + client_factory, expected_sni=b"xn--bcher-kva.com" + ) + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual( + request.requestHeaders.getRawHeaders(b"host"), [b"xn--bcher-kva.com"] + ) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_idna_srv_target_legacy(self) -> None: + """test the behaviour when the target of a _matrix SRV record has idna chars""" + self.agent = self._make_agent() + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. 
+ self.mock_resolver.resolve_service.side_effect = [ + [], + [Server(host=b"xn--trget-3qa.com", port=8443)], + ] # târget.com + self.reactor.lookups["xn--trget-3qa.com"] = "1.2.3.4" + + test_d = self._make_get_request( + b"matrix-federation://xn--bcher-kva.com/foo/bar" + ) + + # Nothing happened yet + self.assertNoResult(test_d) + + self.mock_resolver.resolve_service.assert_has_calls( + [ + call(b"_matrix-fed._tcp.xn--bcher-kva.com"), + call(b"_matrix._tcp.xn--bcher-kva.com"), + ] ) # Make sure treq is trying to connect @@ -1394,7 +1562,7 @@ class MatrixFederationAgentTests(unittest.TestCase): self.assertIsNone(r.delegated_server) def test_srv_fallbacks(self) -> None: - """Test that other SRV results are tried if the first one fails.""" + """Test that other SRV results are tried if the first one fails for _matrix-fed SRV.""" self.agent = self._make_agent() self.mock_resolver.resolve_service.return_value = [ @@ -1409,7 +1577,67 @@ class MatrixFederationAgentTests(unittest.TestCase): self.assertNoResult(test_d) self.mock_resolver.resolve_service.assert_called_once_with( - b"_matrix._tcp.testserv" + b"_matrix-fed._tcp.testserv" + ) + + # We should see an attempt to connect to the first server + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # Fonx the connection + client_factory.clientConnectionFailed(None, Exception("nope")) + + # There's a 300ms delay in HostnameEndpoint + self.reactor.pump((0.4,)) + + # Hasn't failed yet + self.assertNoResult(test_d) + + # We shouldnow see an attempt to connect to the second server + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8444) + + # make a test server, and wire up the client + http_server = 
self._make_connection(client_factory, expected_sni=b"testserv") + + self.assertEqual(len(http_server.requests), 1) + request = http_server.requests[0] + self.assertEqual(request.method, b"GET") + self.assertEqual(request.path, b"/foo/bar") + self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"]) + + # finish the request + request.finish() + self.reactor.pump((0.1,)) + self.successResultOf(test_d) + + def test_srv_fallbacks_legacy(self) -> None: + """Test that other SRV results are tried if the first one fails for _matrix SRV.""" + self.agent = self._make_agent() + + # Return no entries for the _matrix-fed lookup, and a response for _matrix. + self.mock_resolver.resolve_service.side_effect = [ + [], + [ + Server(host=b"target.com", port=8443), + Server(host=b"target.com", port=8444), + ], + ] + self.reactor.lookups["target.com"] = "1.2.3.4" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + self.mock_resolver.resolve_service.assert_has_calls( + [call(b"_matrix-fed._tcp.testserv"), call(b"_matrix._tcp.testserv")] ) # We should see an attempt to connect to the first server @@ -1449,6 +1677,43 @@ class MatrixFederationAgentTests(unittest.TestCase): self.reactor.pump((0.1,)) self.successResultOf(test_d) + def test_srv_no_fallback_to_legacy(self) -> None: + """Test that _matrix SRV results are not tried if the _matrix-fed one fails.""" + self.agent = self._make_agent() + + # Return a failing entry for _matrix-fed. + self.mock_resolver.resolve_service.side_effect = [ + [Server(host=b"target.com", port=8443)], + [], + ] + self.reactor.lookups["target.com"] = "1.2.3.4" + + test_d = self._make_get_request(b"matrix-federation://testserv/foo/bar") + + # Nothing happened yet + self.assertNoResult(test_d) + + # Only the _matrix-fed is checked, _matrix is ignored. 
+ self.mock_resolver.resolve_service.assert_called_once_with( + b"_matrix-fed._tcp.testserv" + ) + + # We should see an attempt to connect to the first server + clients = self.reactor.tcpClients + self.assertEqual(len(clients), 1) + (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0) + self.assertEqual(host, "1.2.3.4") + self.assertEqual(port, 8443) + + # Fonx the connection + client_factory.clientConnectionFailed(None, Exception("nope")) + + # There's a 300ms delay in HostnameEndpoint + self.reactor.pump((0.4,)) + + # Failed to resolve a server. + self.assertFailure(test_d, Exception) + class TestCachePeriodFromHeaders(unittest.TestCase): def test_cache_control(self) -> None: -- cgit 1.5.1 From 1e571cd66437ea2455c203dafb94c20ba48cdcc1 Mon Sep 17 00:00:00 2001 From: Will Hunt Date: Tue, 5 Sep 2023 20:46:57 +0100 Subject: Fix appservices being unable to handle to_device messages for multiple users (#16251) --- changelog.d/16251.bugfix | 1 + synapse/storage/databases/main/deviceinbox.py | 2 +- tests/handlers/test_appservice.py | 125 ++++++++++++++++++++++++++ 3 files changed, 127 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16251.bugfix (limited to 'synapse') diff --git a/changelog.d/16251.bugfix b/changelog.d/16251.bugfix new file mode 100644 index 0000000000..6d3157c7aa --- /dev/null +++ b/changelog.d/16251.bugfix @@ -0,0 +1 @@ +Fix a long-standing bug where appservices using MSC2409 to receive to_device messages, would only get messages for one user. 
\ No newline at end of file diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index b471fcb064..271cdf923c 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -349,7 +349,7 @@ class DeviceInboxWorkerStore(SQLBaseStore): table="devices", column="user_id", iterable=user_ids_to_query, - keyvalues={"user_id": user_id, "hidden": False}, + keyvalues={"hidden": False}, retcols=("device_id",), ) diff --git a/tests/handlers/test_appservice.py b/tests/handlers/test_appservice.py index 46d022092e..a7e6cdd66a 100644 --- a/tests/handlers/test_appservice.py +++ b/tests/handlers/test_appservice.py @@ -422,6 +422,18 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): "exclusive_as_user", "password", self.exclusive_as_user_device_id ) + self.exclusive_as_user_2_device_id = "exclusive_as_device_2" + self.exclusive_as_user_2 = self.register_user("exclusive_as_user_2", "password") + self.exclusive_as_user_2_token = self.login( + "exclusive_as_user_2", "password", self.exclusive_as_user_2_device_id + ) + + self.exclusive_as_user_3_device_id = "exclusive_as_device_3" + self.exclusive_as_user_3 = self.register_user("exclusive_as_user_3", "password") + self.exclusive_as_user_3_token = self.login( + "exclusive_as_user_3", "password", self.exclusive_as_user_3_device_id + ) + def _notify_interested_services(self) -> None: # This is normally set in `notify_interested_services` but we need to call the # internal async version so the reactor gets pushed to completion. 
@@ -849,6 +861,119 @@ class ApplicationServicesHandlerSendEventsTestCase(unittest.HomeserverTestCase): for count in service_id_to_message_count.values(): self.assertEqual(count, number_of_messages) + @unittest.override_config( + {"experimental_features": {"msc2409_to_device_messages_enabled": True}} + ) + def test_application_services_receive_local_to_device_for_many_users(self) -> None: + """ + Test that when a user sends a to-device message to many users + in an application service's user namespace, the + application service will receive all of them. + """ + interested_appservice = self._register_application_service( + namespaces={ + ApplicationService.NS_USERS: [ + { + "regex": "@exclusive_as_user:.+", + "exclusive": True, + }, + { + "regex": "@exclusive_as_user_2:.+", + "exclusive": True, + }, + { + "regex": "@exclusive_as_user_3:.+", + "exclusive": True, + }, + ], + }, + ) + + # Have local_user send a to-device message to exclusive_as_users + message_content = {"some_key": "some really interesting value"} + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.room_key_request/3", + content={ + "messages": { + self.exclusive_as_user: { + self.exclusive_as_user_device_id: message_content + }, + self.exclusive_as_user_2: { + self.exclusive_as_user_2_device_id: message_content + }, + self.exclusive_as_user_3: { + self.exclusive_as_user_3_device_id: message_content + }, + } + }, + access_token=self.local_user_token, + ) + self.assertEqual(chan.code, 200, chan.result) + + # Have exclusive_as_user send a to-device message to local_user + for user_token in [ + self.exclusive_as_user_token, + self.exclusive_as_user_2_token, + self.exclusive_as_user_3_token, + ]: + chan = self.make_request( + "PUT", + "/_matrix/client/r0/sendToDevice/m.room_key_request/4", + content={ + "messages": { + self.local_user: {self.local_user_device_id: message_content} + } + }, + access_token=user_token, + ) + self.assertEqual(chan.code, 200, chan.result) + + # Check if our 
application service - that is interested in exclusive_as_user - received + # the to-device message as part of an AS transaction. + # Only the local_user -> exclusive_as_user to-device message should have been forwarded to the AS. + # + # The uninterested application service should not have been notified at all. + self.send_mock.assert_called_once() + ( + service, + _events, + _ephemeral, + to_device_messages, + _otks, + _fbks, + _device_list_summary, + ) = self.send_mock.call_args[0] + + # Assert that this was the same to-device message that local_user sent + self.assertEqual(service, interested_appservice) + + # Assert expected number of messages + self.assertEqual(len(to_device_messages), 3) + + for device_msg in to_device_messages: + self.assertEqual(device_msg["type"], "m.room_key_request") + self.assertEqual(device_msg["sender"], self.local_user) + self.assertEqual(device_msg["content"], message_content) + + self.assertEqual(to_device_messages[0]["to_user_id"], self.exclusive_as_user) + self.assertEqual( + to_device_messages[0]["to_device_id"], + self.exclusive_as_user_device_id, + ) + + self.assertEqual(to_device_messages[1]["to_user_id"], self.exclusive_as_user_2) + self.assertEqual( + to_device_messages[1]["to_device_id"], + self.exclusive_as_user_2_device_id, + ) + + self.assertEqual(to_device_messages[2]["to_user_id"], self.exclusive_as_user_3) + self.assertEqual( + to_device_messages[2]["to_device_id"], + self.exclusive_as_user_3_device_id, + ) + def _register_application_service( self, namespaces: Optional[Dict[str, Iterable[Dict]]] = None, -- cgit 1.5.1 From 4f1840a88ad3a93244fc23149c56245704eab824 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Wed, 6 Sep 2023 09:30:53 +0200 Subject: Delete device messages asynchronously and in staged batches (#16240) --- changelog.d/16240.misc | 1 + synapse/handlers/device.py | 48 ++++++++++++++++++++++ synapse/handlers/presence.py | 4 +- synapse/handlers/sync.py | 16 ++++++-- 
synapse/storage/databases/main/deviceinbox.py | 26 +++++++++--- synapse/storage/databases/main/devices.py | 8 ---- synapse/storage/databases/main/receipts.py | 6 +-- synapse/storage/engines/_base.py | 6 +++ synapse/storage/engines/postgres.py | 4 ++ synapse/storage/engines/sqlite.py | 4 ++ .../schema/main/delta/48/group_unique_indexes.py | 4 +- synapse/util/task_scheduler.py | 17 ++++---- tests/handlers/test_device.py | 47 +++++++++++++++++++++ 13 files changed, 154 insertions(+), 37 deletions(-) create mode 100644 changelog.d/16240.misc (limited to 'synapse') diff --git a/changelog.d/16240.misc b/changelog.d/16240.misc new file mode 100644 index 0000000000..4f266c1fb0 --- /dev/null +++ b/changelog.d/16240.misc @@ -0,0 +1 @@ +Delete device messages asynchronously and in staged batches using the task scheduler. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 763f56dfc1..9e52af5f13 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -43,9 +43,12 @@ from synapse.metrics.background_process_metrics import ( ) from synapse.types import ( JsonDict, + JsonMapping, + ScheduledTask, StrCollection, StreamKeyType, StreamToken, + TaskStatus, UserID, get_domain_from_id, get_verify_key_from_cross_signing_key, @@ -62,6 +65,7 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) +DELETE_DEVICE_MSGS_TASK_NAME = "delete_device_messages" MAX_DEVICE_DISPLAY_NAME_LEN = 100 DELETE_STALE_DEVICES_INTERVAL_MS = 24 * 60 * 60 * 1000 @@ -78,6 +82,7 @@ class DeviceWorkerHandler: self._appservice_handler = hs.get_application_service_handler() self._state_storage = hs.get_storage_controllers().state self._auth_handler = hs.get_auth_handler() + self._event_sources = hs.get_event_sources() self.server_name = hs.hostname self._msc3852_enabled = hs.config.experimental.msc3852_enabled self._query_appservices_for_keys = ( @@ -386,6 +391,7 @@ class DeviceHandler(DeviceWorkerHandler): self._account_data_handler = hs.get_account_data_handler() 
self._storage_controllers = hs.get_storage_controllers() self.db_pool = hs.get_datastores().main.db_pool + self._task_scheduler = hs.get_task_scheduler() self.device_list_updater = DeviceListUpdater(hs, self) @@ -419,6 +425,10 @@ class DeviceHandler(DeviceWorkerHandler): self._delete_stale_devices, ) + self._task_scheduler.register_action( + self._delete_device_messages, DELETE_DEVICE_MSGS_TASK_NAME + ) + def _check_device_name_length(self, name: Optional[str]) -> None: """ Checks whether a device name is longer than the maximum allowed length. @@ -530,6 +540,7 @@ class DeviceHandler(DeviceWorkerHandler): user_id: The user to delete devices from. device_ids: The list of device IDs to delete """ + to_device_stream_id = self._event_sources.get_current_token().to_device_key try: await self.store.delete_devices(user_id, device_ids) @@ -559,12 +570,49 @@ class DeviceHandler(DeviceWorkerHandler): f"org.matrix.msc3890.local_notification_settings.{device_id}", ) + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=device_id, + params={ + "user_id": user_id, + "device_id": device_id, + "up_to_stream_id": to_device_stream_id, + }, + ) + # Pushers are deleted after `delete_access_tokens_for_user` is called so that # modules using `on_logged_out` hook can use them if needed. 
await self.hs.get_pusherpool().remove_pushers_by_devices(user_id, device_ids) await self.notify_device_update(user_id, device_ids) + DEVICE_MSGS_DELETE_BATCH_LIMIT = 100 + + async def _delete_device_messages( + self, + task: ScheduledTask, + ) -> Tuple[TaskStatus, Optional[JsonMapping], Optional[str]]: + """Scheduler task to delete device messages in batch of `DEVICE_MSGS_DELETE_BATCH_LIMIT`.""" + assert task.params is not None + user_id = task.params["user_id"] + device_id = task.params["device_id"] + up_to_stream_id = task.params["up_to_stream_id"] + + res = await self.store.delete_messages_for_device( + user_id=user_id, + device_id=device_id, + up_to_stream_id=up_to_stream_id, + limit=DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT, + ) + + if res < DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT: + return TaskStatus.COMPLETE, None, None + else: + # There is probably still device messages to be deleted, let's keep the task active and it will be run + # again in a subsequent scheduler loop run (probably the next one, if not too many tasks are running). 
+ return TaskStatus.ACTIVE, None, None + async def update_device(self, user_id: str, device_id: str, content: dict) -> None: """Update the given device diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index a4b05b72e7..375c7d0901 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -183,6 +183,7 @@ class BasePresenceHandler(abc.ABC): writer""" def __init__(self, hs: "HomeServer"): + self.hs = hs self.clock = hs.get_clock() self.store = hs.get_datastores().main self._storage_controllers = hs.get_storage_controllers() @@ -473,8 +474,6 @@ class _NullContextManager(ContextManager[None]): class WorkerPresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs - self._presence_writer_instance = hs.config.worker.writers.presence[0] # Route presence EDUs to the right worker @@ -738,7 +737,6 @@ class WorkerPresenceHandler(BasePresenceHandler): class PresenceHandler(BasePresenceHandler): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.hs = hs self.wheel_timer: WheelTimer[str] = WheelTimer() self.notifier = hs.get_notifier() diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 60a9f341b5..0ccd7d250c 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -40,6 +40,7 @@ from synapse.api.filtering import FilterCollection from synapse.api.presence import UserPresenceState from synapse.api.room_versions import KNOWN_ROOM_VERSIONS from synapse.events import EventBase +from synapse.handlers.device import DELETE_DEVICE_MSGS_TASK_NAME from synapse.handlers.relations import BundledAggregations from synapse.logging import issue9533_logger from synapse.logging.context import current_context @@ -268,6 +269,7 @@ class SyncHandler: self._storage_controllers = hs.get_storage_controllers() self._state_storage_controller = self._storage_controllers.state self._device_handler = hs.get_device_handler() + self._task_scheduler = 
hs.get_task_scheduler() self.should_calculate_push_rules = hs.config.push.enable_push @@ -360,11 +362,19 @@ class SyncHandler: # (since we now know that the device has received them) if since_token is not None: since_stream_id = since_token.to_device_key - deleted = await self.store.delete_messages_for_device( - sync_config.user.to_string(), sync_config.device_id, since_stream_id + # Delete device messages asynchronously and in batches using the task scheduler + await self._task_scheduler.schedule_task( + DELETE_DEVICE_MSGS_TASK_NAME, + resource_id=sync_config.device_id, + params={ + "user_id": sync_config.user.to_string(), + "device_id": sync_config.device_id, + "up_to_stream_id": since_stream_id, + }, ) logger.debug( - "Deleted %d to-device messages up to %d", deleted, since_stream_id + "Deletion of to-device messages up to %d scheduled", + since_stream_id, ) if timeout == 0 or since_token is None or full_state: diff --git a/synapse/storage/databases/main/deviceinbox.py b/synapse/storage/databases/main/deviceinbox.py index 271cdf923c..744e98c6d0 100644 --- a/synapse/storage/databases/main/deviceinbox.py +++ b/synapse/storage/databases/main/deviceinbox.py @@ -445,13 +445,18 @@ class DeviceInboxWorkerStore(SQLBaseStore): @trace async def delete_messages_for_device( - self, user_id: str, device_id: Optional[str], up_to_stream_id: int + self, + user_id: str, + device_id: Optional[str], + up_to_stream_id: int, + limit: int, ) -> int: """ Args: user_id: The recipient user_id. device_id: The recipient device_id. up_to_stream_id: Where to delete messages up to. + limit: maximum number of messages to delete Returns: The number of messages deleted. @@ -472,12 +477,16 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": "No changes in cache since last check"}) return 0 + ROW_ID_NAME = self.database_engine.row_id_name + def delete_messages_for_device_txn(txn: LoggingTransaction) -> int: - sql = ( - "DELETE FROM device_inbox" - " WHERE user_id = ? 
AND device_id = ?" - " AND stream_id <= ?" - ) + sql = f""" + DELETE FROM device_inbox WHERE {ROW_ID_NAME} IN ( + SELECT {ROW_ID_NAME} FROM device_inbox + WHERE user_id = ? AND device_id = ? AND stream_id <= ? + LIMIT {limit} + ) + """ txn.execute(sql, (user_id, device_id, up_to_stream_id)) return txn.rowcount @@ -487,6 +496,11 @@ class DeviceInboxWorkerStore(SQLBaseStore): log_kv({"message": f"deleted {count} messages for device", "count": count}) + # In this case we don't know if we hit the limit or the delete is complete + # so let's not update the cache. + if count == limit: + return count + # Update the cache, ensuring that we only ever increase the value updated_last_deleted_stream_id = self._last_device_delete_cache.get( (user_id, device_id), 0 diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index fa69a4a298..7208fc8b33 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1766,14 +1766,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): keyvalues={"user_id": user_id, "hidden": False}, ) - self.db_pool.simple_delete_many_txn( - txn, - table="device_inbox", - column="device_id", - values=device_ids, - keyvalues={"user_id": user_id}, - ) - self.db_pool.simple_delete_many_txn( txn, table="device_auth_providers", diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py index 5ee5c7ad9f..e4d10ff250 100644 --- a/synapse/storage/databases/main/receipts.py +++ b/synapse/storage/databases/main/receipts.py @@ -939,11 +939,7 @@ class ReceiptsBackgroundUpdateStore(SQLBaseStore): receipts.""" def _remote_duplicate_receipts_txn(txn: LoggingTransaction) -> None: - if isinstance(self.database_engine, PostgresEngine): - ROW_ID_NAME = "ctid" - else: - ROW_ID_NAME = "rowid" - + ROW_ID_NAME = self.database_engine.row_id_name # Identify any duplicate receipts arising from # 
https://github.com/matrix-org/synapse/issues/14406. # The following query takes less than a minute on matrix.org. diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py index 0b5b3bf03e..b1a2418cbd 100644 --- a/synapse/storage/engines/_base.py +++ b/synapse/storage/engines/_base.py @@ -100,6 +100,12 @@ class BaseDatabaseEngine(Generic[ConnectionType, CursorType], metaclass=abc.ABCM """Gets a string giving the server version. For example: '3.22.0'""" ... + @property + @abc.abstractmethod + def row_id_name(self) -> str: + """Gets the literal name representing a row id for this engine.""" + ... + @abc.abstractmethod def in_transaction(self, conn: ConnectionType) -> bool: """Whether the connection is currently in a transaction.""" diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py index 05a72dc554..6309363217 100644 --- a/synapse/storage/engines/postgres.py +++ b/synapse/storage/engines/postgres.py @@ -211,6 +211,10 @@ class PostgresEngine( else: return "%i.%i.%i" % (numver / 10000, (numver % 10000) / 100, numver % 100) + @property + def row_id_name(self) -> str: + return "ctid" + def in_transaction(self, conn: psycopg2.extensions.connection) -> bool: return conn.status != psycopg2.extensions.STATUS_READY diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py index ca8c59297c..802069e1e1 100644 --- a/synapse/storage/engines/sqlite.py +++ b/synapse/storage/engines/sqlite.py @@ -123,6 +123,10 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection, sqlite3.Cursor]): """Gets a string giving the server version. 
For example: '3.22.0'.""" return "%i.%i.%i" % sqlite3.sqlite_version_info + @property + def row_id_name(self) -> str: + return "rowid" + def in_transaction(self, conn: sqlite3.Connection) -> bool: return conn.in_transaction diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py index ad2da4c8af..622686d28f 100644 --- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py @@ -14,7 +14,7 @@ from synapse.storage.database import LoggingTransaction -from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements FIX_INDEXES = """ @@ -37,7 +37,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: - rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" + rowid = database_engine.row_id_name # remove duplicates from group_users & group_invites tables cur.execute( diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 9e89aeb748..9b2581e51a 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -77,6 +77,7 @@ class TaskScheduler: LAST_UPDATE_BEFORE_WARNING_MS = 24 * 60 * 60 * 1000 # 24hrs def __init__(self, hs: "HomeServer"): + self._hs = hs self._store = hs.get_datastores().main self._clock = hs.get_clock() self._running_tasks: Set[str] = set() @@ -97,8 +98,6 @@ class TaskScheduler: "handle_scheduled_tasks", self._handle_scheduled_tasks, ) - else: - self.replication_client = hs.get_replication_command_handler() def register_action( self, @@ -133,7 +132,7 @@ class TaskScheduler: params: Optional[JsonMapping] = None, ) -> str: """Schedule a new potentially resumable task. 
A function matching the specified - `action` should have been previously registered with `register_action`. + `action` should have be registered with `register_action` before the task is run. Args: action: the name of a previously registered action @@ -149,11 +148,6 @@ class TaskScheduler: Returns: The id of the scheduled task """ - if action not in self._actions: - raise Exception( - f"No function associated with action {action} of the scheduled task" - ) - status = TaskStatus.SCHEDULED if timestamp is None or timestamp < self._clock.time_msec(): timestamp = self._clock.time_msec() @@ -175,7 +169,7 @@ class TaskScheduler: if self._run_background_tasks: await self._launch_task(task) else: - self.replication_client.send_new_active_task(task.id) + self._hs.get_replication_command_handler().send_new_active_task(task.id) return task.id @@ -315,7 +309,10 @@ class TaskScheduler: """ assert self._run_background_tasks - assert task.action in self._actions + if task.action not in self._actions: + raise Exception( + f"No function associated with action {task.action} of the scheduled task {task.id}" + ) function = self._actions[task.action] async def wrapper() -> None: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 55a4f95ef3..9659a4a355 100644 --- a/tests/handlers/test_device.py +++ b/tests/handlers/test_device.py @@ -30,6 +30,7 @@ from synapse.server import HomeServer from synapse.storage.databases.main.appservice import _make_exclusive_regex from synapse.types import JsonDict, create_requester from synapse.util import Clock +from synapse.util.task_scheduler import TaskScheduler from tests import unittest from tests.unittest import override_config @@ -49,6 +50,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): assert isinstance(handler, DeviceHandler) self.handler = handler self.store = hs.get_datastores().main + self.device_message_handler = hs.get_device_message_handler() return hs def prepare(self, reactor: MemoryReactor, clock: 
Clock, hs: HomeServer) -> None: @@ -211,6 +213,51 @@ class DeviceTestCase(unittest.HomeserverTestCase): ) self.assertIsNone(res) + def test_delete_device_and_big_device_inbox(self) -> None: + """Check that deleting a big device inbox is staged and batched asynchronously.""" + DEVICE_ID = "abc" + sender = "@sender:" + self.hs.hostname + receiver = "@receiver:" + self.hs.hostname + self._record_user(sender, DEVICE_ID, DEVICE_ID) + self._record_user(receiver, DEVICE_ID, DEVICE_ID) + + # queue a bunch of messages in the inbox + requester = create_requester(sender, device_id=DEVICE_ID) + for i in range(0, DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10): + self.get_success( + self.device_message_handler.send_device_message( + requester, "message_type", {receiver: {"*": {"val": i}}} + ) + ) + + # delete the device + self.get_success(self.handler.delete_devices(receiver, [DEVICE_ID])) + + # messages should be deleted up to DEVICE_MSGS_DELETE_BATCH_LIMIT straight away + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(10, len(res)) + + # wait for the task scheduler to do a second delete pass + self.reactor.advance(TaskScheduler.SCHEDULE_INTERVAL_MS / 1000) + + # remaining messages should now be deleted + res = self.get_success( + self.store.db_pool.simple_select_list( + table="device_inbox", + keyvalues={"user_id": receiver}, + retcols=("user_id", "device_id", "stream_id"), + desc="get_device_id_from_device_inbox", + ) + ) + self.assertEqual(0, len(res)) + def test_update_device(self) -> None: self._record_users() -- cgit 1.5.1 From 698f6fa2508dbff1a4353d57da60be5d13bbd61d Mon Sep 17 00:00:00 2001 From: reivilibre Date: Wed, 6 Sep 2023 10:50:07 +0000 Subject: Allow modules to delete rooms. 
(#15997) * Allow user_id to be optional for room deletion * Add module API method to delete a room * Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) * Don't worry about the case block=True && requester_user_id is None --------- Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/15997.misc | 1 + synapse/handlers/pagination.py | 12 ++++++++++-- synapse/handlers/room.py | 10 +++++++++- synapse/module_api/__init__.py | 13 +++++++++++++ .../callbacks/third_party_event_rules_callbacks.py | 11 ++++++++--- 5 files changed, 41 insertions(+), 6 deletions(-) create mode 100644 changelog.d/15997.misc (limited to 'synapse') diff --git a/changelog.d/15997.misc b/changelog.d/15997.misc new file mode 100644 index 0000000000..94768c3cb8 --- /dev/null +++ b/changelog.d/15997.misc @@ -0,0 +1 @@ +Allow modules to delete rooms. \ No newline at end of file diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index e5ac9096cc..19cf5a2b43 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -713,7 +713,7 @@ class PaginationHandler: self, delete_id: str, room_id: str, - requester_user_id: str, + requester_user_id: Optional[str], new_room_user_id: Optional[str] = None, new_room_name: Optional[str] = None, message: Optional[str] = None, @@ -732,6 +732,10 @@ class PaginationHandler: requester_user_id: User who requested the action. Will be recorded as putting the room on the blocking list. + If None, the action was not manually requested but instead + triggered automatically, e.g. through a Synapse module + or some other policy. + MUST NOT be None if block=True. 
new_room_user_id: If set, a new room will be created with this user ID as the creator and admin, and all users in the old room will be @@ -818,7 +822,7 @@ class PaginationHandler: def start_shutdown_and_purge_room( self, room_id: str, - requester_user_id: str, + requester_user_id: Optional[str], new_room_user_id: Optional[str] = None, new_room_name: Optional[str] = None, message: Optional[str] = None, @@ -833,6 +837,10 @@ class PaginationHandler: requester_user_id: User who requested the action and put the room on the blocking list. + If None, the action was not manually requested but instead + triggered automatically, e.g. through a Synapse module + or some other policy. + MUST NOT be None if block=True. new_room_user_id: If set, a new room will be created with this user ID as the creator and admin, and all users in the old room will be diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 0513e28aab..7a762c8511 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -1787,7 +1787,7 @@ class RoomShutdownHandler: async def shutdown_room( self, room_id: str, - requester_user_id: str, + requester_user_id: Optional[str], new_room_user_id: Optional[str] = None, new_room_name: Optional[str] = None, message: Optional[str] = None, @@ -1811,6 +1811,10 @@ class RoomShutdownHandler: requester_user_id: User who requested the action and put the room on the blocking list. + If None, the action was not manually requested but instead + triggered automatically, e.g. through a Synapse module + or some other policy. + MUST NOT be None if block=True. new_room_user_id: If set, a new room will be created with this user ID as the creator and admin, and all users in the old room will be @@ -1863,6 +1867,10 @@ class RoomShutdownHandler: # Action the block first (even if the room doesn't exist yet) if block: + if requester_user_id is None: + raise ValueError( + "shutdown_room: block=True not allowed when requester_user_id is None." 
+ ) # This will work even if the room is already blocked, but that is # desirable in case the first attempt at blocking the room failed below. await self.store.block_room(room_id, requester_user_id) diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py index 2f00a7ba20..d6efe10a28 100644 --- a/synapse/module_api/__init__.py +++ b/synapse/module_api/__init__.py @@ -1730,6 +1730,19 @@ class ModuleApi: room_alias_str = room_alias.to_string() if room_alias else None return room_id, room_alias_str + async def delete_room(self, room_id: str) -> None: + """ + Schedules the deletion of a room from Synapse's database. + + If the room is already being deleted, this method does nothing. + This method does not wait for the room to be deleted. + + Added in Synapse v1.89.0. + """ + # Future extensions to this method might want to e.g. allow use of `force_purge`. + # TODO In the future we should make sure this is persistent. + self._hs.get_pagination_handler().start_shutdown_and_purge_room(room_id, None) + async def set_displayname( self, user_id: UserID, diff --git a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py index 911f37ba42..ecaeef3511 100644 --- a/synapse/module_api/callbacks/third_party_event_rules_callbacks.py +++ b/synapse/module_api/callbacks/third_party_event_rules_callbacks.py @@ -40,7 +40,7 @@ CHECK_VISIBILITY_CAN_BE_MODIFIED_CALLBACK = Callable[ [str, StateMap[EventBase], str], Awaitable[bool] ] ON_NEW_EVENT_CALLBACK = Callable[[EventBase, StateMap[EventBase]], Awaitable] -CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[str, str], Awaitable[bool]] +CHECK_CAN_SHUTDOWN_ROOM_CALLBACK = Callable[[Optional[str], str], Awaitable[bool]] CHECK_CAN_DEACTIVATE_USER_CALLBACK = Callable[[str, bool], Awaitable[bool]] ON_PROFILE_UPDATE_CALLBACK = Callable[[str, ProfileInfo, bool, bool], Awaitable] ON_USER_DEACTIVATION_STATUS_CHANGED_CALLBACK = Callable[[str, bool, bool], 
Awaitable] @@ -429,12 +429,17 @@ class ThirdPartyEventRulesModuleApiCallbacks: "Failed to run module API callback %s: %s", callback, e ) - async def check_can_shutdown_room(self, user_id: str, room_id: str) -> bool: + async def check_can_shutdown_room( + self, user_id: Optional[str], room_id: str + ) -> bool: """Intercept requests to shutdown a room. If `False` is returned, the room must not be shut down. Args: - requester: The ID of the user requesting the shutdown. + user_id: The ID of the user requesting the shutdown. + If no user ID is supplied, then the room is being shut down through + some mechanism other than a user's request, e.g. through a module's + request. room_id: The ID of the room. """ for callback in self._check_can_shutdown_room_callbacks: -- cgit 1.5.1 From e937e2111a45d0cb3ecc973f95dafafecb6e9c36 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Wed, 6 Sep 2023 13:01:10 +0000 Subject: Add the ability to use `G` (GiB) and `T` (TiB) suffixes in configuration options that refer to numbers of bytes. (#16219) * Add more suffixes to `parse_size` * Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) --------- Signed-off-by: Olivier Wilkinson (reivilibre) --- changelog.d/16219.feature | 1 + docs/usage/configuration/config_documentation.md | 4 +++- synapse/config/_base.py | 7 ++++--- 3 files changed, 8 insertions(+), 4 deletions(-) create mode 100644 changelog.d/16219.feature (limited to 'synapse') diff --git a/changelog.d/16219.feature b/changelog.d/16219.feature new file mode 100644 index 0000000000..c789f2abb7 --- /dev/null +++ b/changelog.d/16219.feature @@ -0,0 +1 @@ +Add the ability to use `G` (GiB) and `T` (TiB) suffixes in configuration options that refer to numbers of bytes. 
\ No newline at end of file diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 0b1725816e..97fd1beb39 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -25,8 +25,10 @@ messages from the database after 5 minutes, rather than 5 months. In addition, configuration options referring to size use the following suffixes: -* `M` = MiB, or 1,048,576 bytes * `K` = KiB, or 1024 bytes +* `M` = MiB, or 1,048,576 bytes +* `G` = GiB, or 1,073,741,824 bytes +* `T` = TiB, or 1,099,511,627,776 bytes For example, setting `max_avatar_size: 10M` means that Synapse will not accept files larger than 10,485,760 bytes for a user avatar. diff --git a/synapse/config/_base.py b/synapse/config/_base.py index 69a8318127..58856839e1 100644 --- a/synapse/config/_base.py +++ b/synapse/config/_base.py @@ -179,8 +179,9 @@ class Config: If an integer is provided it is treated as bytes and is unchanged. - String byte sizes can have a suffix of 'K' or `M`, representing kibibytes and - mebibytes respectively. No suffix is understood as a plain byte count. + String byte sizes can have a suffix of 'K', `M`, `G` or `T`, + representing kibibytes, mebibytes, gibibytes and tebibytes respectively. + No suffix is understood as a plain byte count. 
Raises: TypeError, if given something other than an integer or a string @@ -189,7 +190,7 @@ class Config: if type(value) is int: # noqa: E721 return value elif isinstance(value, str): - sizes = {"K": 1024, "M": 1024 * 1024} + sizes = {"K": 1024, "M": 1024 * 1024, "G": 1024**3, "T": 1024**4} size = 1 suffix = value[-1] if suffix in sizes: -- cgit 1.5.1 From fe69e7f617199f51eb97f510a0a934fdcf02fbad Mon Sep 17 00:00:00 2001 From: Aurélien Grimpard Date: Wed, 6 Sep 2023 20:32:24 +0200 Subject: Handle "registration_enabled" parameter for CAS (#16262) Similar to OIDC, CAS providers can now disable registration such that only existing users are able to login via SSO. --- changelog.d/16262.feature | 1 + docs/usage/configuration/config_documentation.md | 7 +++++++ synapse/config/cas.py | 3 +++ synapse/handlers/cas.py | 2 ++ tests/handlers/test_cas.py | 17 +++++++++++++++++ 5 files changed, 30 insertions(+) create mode 100644 changelog.d/16262.feature (limited to 'synapse') diff --git a/changelog.d/16262.feature b/changelog.d/16262.feature new file mode 100644 index 0000000000..7c8e7e349b --- /dev/null +++ b/changelog.d/16262.feature @@ -0,0 +1 @@ +Add the ability to enable/disable registrations when in the CAS flow. Contributed by Aurélien Grimpard. diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md index 97fd1beb39..42df53d52b 100644 --- a/docs/usage/configuration/config_documentation.md +++ b/docs/usage/configuration/config_documentation.md @@ -3430,6 +3430,12 @@ Has the following sub-options: and the values must match the given value. Alternately if the given value is `None` then any value is allowed (the attribute just must exist). All of the listed attributes must match for the login to be permitted. +* `enable_registration`: set to 'false' to disable automatic registration of new + users. 
This allows the CAS SSO flow to be limited to sign in only, rather than + automatically registering users that have a valid SSO login but do not have + a pre-registered account. Defaults to true. + + *Added in Synapse 1.93.0.* Example configuration: ```yaml @@ -3441,6 +3447,7 @@ cas_config: required_attributes: userGroup: "staff" department: None + enable_registration: true ``` --- ### `sso` diff --git a/synapse/config/cas.py b/synapse/config/cas.py index 6e2d9addbf..bbc8f43073 100644 --- a/synapse/config/cas.py +++ b/synapse/config/cas.py @@ -57,6 +57,8 @@ class CasConfig(Config): required_attributes ) + self.cas_enable_registration = cas_config.get("enable_registration", True) + self.idp_name = cas_config.get("idp_name", "CAS") self.idp_icon = cas_config.get("idp_icon") self.idp_brand = cas_config.get("idp_brand") @@ -67,6 +69,7 @@ class CasConfig(Config): self.cas_protocol_version = None self.cas_displayname_attribute = None self.cas_required_attributes = [] + self.cas_enable_registration = False # CAS uses a legacy required attributes mapping, not the one provided by diff --git a/synapse/handlers/cas.py b/synapse/handlers/cas.py index a850545453..b5b8b9bd35 100644 --- a/synapse/handlers/cas.py +++ b/synapse/handlers/cas.py @@ -70,6 +70,7 @@ class CasHandler: self._cas_protocol_version = hs.config.cas.cas_protocol_version self._cas_displayname_attribute = hs.config.cas.cas_displayname_attribute self._cas_required_attributes = hs.config.cas.cas_required_attributes + self._cas_enable_registration = hs.config.cas.cas_enable_registration self._http_client = hs.get_proxied_http_client() @@ -395,4 +396,5 @@ class CasHandler: client_redirect_url, cas_response_to_user_attributes, grandfather_existing_users, + registration_enabled=self._cas_enable_registration, ) diff --git a/tests/handlers/test_cas.py b/tests/handlers/test_cas.py index 8582b1cd1e..13e2cd153a 100644 --- a/tests/handlers/test_cas.py +++ b/tests/handlers/test_cas.py @@ -197,6 +197,23 @@ class 
CasHandlerTestCase(HomeserverTestCase): auth_provider_session_id=None, ) + @override_config({"cas_config": {"enable_registration": False}}) + def test_map_cas_user_does_not_register_new_user(self) -> None: + """Ensures new users are not registered if the enabled registration flag is disabled.""" + + # stub out the auth handler + auth_handler = self.hs.get_auth_handler() + auth_handler.complete_sso_login = AsyncMock() # type: ignore[method-assign] + + cas_response = CasResponse("test_user", {}) + request = _mock_request() + self.get_success( + self.handler._handle_cas_response(request, cas_response, "redirect_uri", "") + ) + + # check that the auth handler was not called as expected + auth_handler.complete_sso_login.assert_not_called() + def _mock_request() -> Mock: """Returns a mock which will stand in as a SynapseRequest""" -- cgit 1.5.1 From 13e9cad537a16108b0cb544ccdc24e7dc2ca33ae Mon Sep 17 00:00:00 2001 From: Marcel Date: Wed, 6 Sep 2023 21:19:17 +0200 Subject: Send the opentracing span information to appservices (#16227) --- changelog.d/16227.feature | 1 + synapse/appservice/api.py | 32 ++++++++++++++++++++++++-------- tests/appservice/test_api.py | 18 ++++++++++++------ 3 files changed, 37 insertions(+), 14 deletions(-) create mode 100644 changelog.d/16227.feature (limited to 'synapse') diff --git a/changelog.d/16227.feature b/changelog.d/16227.feature new file mode 100644 index 0000000000..510062b622 --- /dev/null +++ b/changelog.d/16227.feature @@ -0,0 +1 @@ +Add span information to requests sent to appservices. Contributed by MTRNord. 
\ No newline at end of file diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index de7a94bf26..b1523be208 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -40,6 +40,7 @@ from synapse.appservice import ( from synapse.events import EventBase from synapse.events.utils import SerializeEventConfig, serialize_event from synapse.http.client import SimpleHttpClient, is_unknown_endpoint +from synapse.logging import opentracing from synapse.types import DeviceListUpdates, JsonDict, ThirdPartyInstanceID from synapse.util.caches.response_cache import ResponseCache @@ -125,6 +126,17 @@ class ApplicationServiceApi(SimpleHttpClient): hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS ) + def _get_headers(self, service: "ApplicationService") -> Dict[bytes, List[bytes]]: + """This makes sure we have always the auth header and opentracing headers set.""" + + # This is also ensured before in the functions. However this is needed to please + # the typechecks. 
+ assert service.hs_token is not None + + headers = {b"Authorization": [b"Bearer " + service.hs_token.encode("ascii")]} + opentracing.inject_header_dict(headers, check_destination=False) + return headers + async def query_user(self, service: "ApplicationService", user_id: str) -> bool: if service.url is None: return False @@ -136,10 +148,11 @@ class ApplicationServiceApi(SimpleHttpClient): args = None if self.config.use_appservice_legacy_authorization: args = {"access_token": service.hs_token} + response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}", args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if response is not None: # just an empty json object return True @@ -162,10 +175,11 @@ class ApplicationServiceApi(SimpleHttpClient): args = None if self.config.use_appservice_legacy_authorization: args = {"access_token": service.hs_token} + response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/rooms/{urllib.parse.quote(alias)}", args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if response is not None: # just an empty json object return True @@ -203,10 +217,11 @@ class ApplicationServiceApi(SimpleHttpClient): **fields, b"access_token": service.hs_token, } + response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/{kind}/{urllib.parse.quote(protocol)}", args=args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if not isinstance(response, list): logger.warning( @@ -243,10 +258,11 @@ class ApplicationServiceApi(SimpleHttpClient): args = None if self.config.use_appservice_legacy_authorization: args = {"access_token": service.hs_token} + info = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/protocol/{urllib.parse.quote(protocol)}", args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + 
headers=self._get_headers(service), ) if not _is_valid_3pe_metadata(info): @@ -283,7 +299,7 @@ class ApplicationServiceApi(SimpleHttpClient): await self.post_json_get_json( uri=f"{service.url}{APP_SERVICE_PREFIX}/ping", post_json={"transaction_id": txn_id}, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) async def push_bulk( @@ -364,7 +380,7 @@ class ApplicationServiceApi(SimpleHttpClient): f"{service.url}{APP_SERVICE_PREFIX}/transactions/{urllib.parse.quote(str(txn_id))}", json_body=body, args=args, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -437,7 +453,7 @@ class ApplicationServiceApi(SimpleHttpClient): response = await self.post_json_get_json( uri, body, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) except HttpResponseException as e: # The appservice doesn't support this endpoint. @@ -498,7 +514,7 @@ class ApplicationServiceApi(SimpleHttpClient): response = await self.post_json_get_json( uri, query, - headers={"Authorization": [f"Bearer {service.hs_token}"]}, + headers=self._get_headers(service), ) except HttpResponseException as e: # The appservice doesn't support this endpoint. diff --git a/tests/appservice/test_api.py b/tests/appservice/test_api.py index 75fb5fae6b..366b6fd5f0 100644 --- a/tests/appservice/test_api.py +++ b/tests/appservice/test_api.py @@ -76,7 +76,7 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]], ) -> List[JsonDict]: # Ensure the access token is passed as a header. - if not headers or not headers.get("Authorization"): + if not headers or not headers.get(b"Authorization"): raise RuntimeError("Access token not provided") # ... 
and not as a query param if b"access_token" in args: @@ -84,7 +84,9 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): "Access token should not be passed as a query param." ) - self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) + self.assertEqual( + headers.get(b"Authorization"), [f"Bearer {TOKEN}".encode()] + ) self.request_url = url if url == URL_USER: return SUCCESS_RESULT_USER @@ -152,11 +154,13 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): # Ensure the access token is passed as a both a query param and in the headers. if not args.get(b"access_token"): raise RuntimeError("Access token should be provided in query params.") - if not headers or not headers.get("Authorization"): + if not headers or not headers.get(b"Authorization"): raise RuntimeError("Access token should be provided in auth headers.") self.assertEqual(args.get(b"access_token"), TOKEN) - self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) + self.assertEqual( + headers.get(b"Authorization"), [f"Bearer {TOKEN}".encode()] + ) self.request_url = url if url == URL_USER: return SUCCESS_RESULT_USER @@ -208,10 +212,12 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]], ) -> JsonDict: # Ensure the access token is passed as both a header and query arg. - if not headers.get("Authorization"): + if not headers.get(b"Authorization"): raise RuntimeError("Access token not provided") - self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) + self.assertEqual( + headers.get(b"Authorization"), [f"Bearer {TOKEN}".encode()] + ) return RESPONSE # We assign to a method, which mypy doesn't like. 
-- cgit 1.5.1 From 8940d1b28ecbaf9185459e2af62169ecf39a96f5 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Sep 2023 10:26:07 +0100 Subject: Add `/notifications` endpoint to workers (#16265) --- changelog.d/16265.feature | 1 + docker/configure_workers_and_start.py | 1 + docs/workers.md | 1 + synapse/rest/__init__.py | 2 +- synapse/rest/client/notifications.py | 2 + .../storage/databases/main/event_push_actions.py | 72 +++++++++++----------- 6 files changed, 42 insertions(+), 37 deletions(-) create mode 100644 changelog.d/16265.feature (limited to 'synapse') diff --git a/changelog.d/16265.feature b/changelog.d/16265.feature new file mode 100644 index 0000000000..3ffa16dbcb --- /dev/null +++ b/changelog.d/16265.feature @@ -0,0 +1 @@ +Allow `/notifications` endpoint to be routed to workers. diff --git a/docker/configure_workers_and_start.py b/docker/configure_workers_and_start.py index 400a7515aa..62952e6b26 100755 --- a/docker/configure_workers_and_start.py +++ b/docker/configure_workers_and_start.py @@ -183,6 +183,7 @@ WORKERS_CONFIG: Dict[str, Dict[str, Any]] = { "^/_matrix/client/(r0|v3|unstable)/password_policy$", "^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$", "^/_matrix/client/(r0|v3|unstable)/capabilities$", + "^/_matrix/client/(r0|v3|unstable)/notifications$", ], "shared_extra_conf": {}, "worker_extra_conf": "", diff --git a/docs/workers.md b/docs/workers.md index 24bd22724e..dc76b073de 100644 --- a/docs/workers.md +++ b/docs/workers.md @@ -246,6 +246,7 @@ information. 
^/_matrix/client/(r0|v3|unstable)/user/.*/filter(/|$) ^/_matrix/client/(api/v1|r0|v3|unstable)/directory/room/.*$ ^/_matrix/client/(r0|v3|unstable)/capabilities$ + ^/_matrix/client/(r0|v3|unstable)/notifications$ # Encryption requests ^/_matrix/client/(r0|v3|unstable)/keys/query$ diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index df0845edb2..1be9c47c61 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -123,7 +123,7 @@ class ClientRestResource(JsonResource): if is_main_process: report_event.register_servlets(hs, client_resource) openid.register_servlets(hs, client_resource) - notifications.register_servlets(hs, client_resource) + notifications.register_servlets(hs, client_resource) devices.register_servlets(hs, client_resource) if is_main_process: thirdparty.register_servlets(hs, client_resource) diff --git a/synapse/rest/client/notifications.py b/synapse/rest/client/notifications.py index ea10042569..e7fe1332e7 100644 --- a/synapse/rest/client/notifications.py +++ b/synapse/rest/client/notifications.py @@ -36,6 +36,8 @@ logger = logging.getLogger(__name__) class NotificationsServlet(RestServlet): PATTERNS = client_patterns("/notifications$") + CATEGORY = "Client API requests" + def __init__(self, hs: "HomeServer"): super().__init__() self.store = hs.get_datastores().main diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 07bda7d6be..b958a39aeb 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -1740,42 +1740,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas # We sleep to ensure that we don't overwhelm the DB. 
await self._clock.sleep(1.0) - -class EventPushActionsStore(EventPushActionsWorkerStore): - EPA_HIGHLIGHT_INDEX = "epa_highlight_index" - - def __init__( - self, - database: DatabasePool, - db_conn: LoggingDatabaseConnection, - hs: "HomeServer", - ): - super().__init__(database, db_conn, hs) - - self.db_pool.updates.register_background_index_update( - self.EPA_HIGHLIGHT_INDEX, - index_name="event_push_actions_u_highlight", - table="event_push_actions", - columns=["user_id", "stream_ordering"], - ) - - self.db_pool.updates.register_background_index_update( - "event_push_actions_highlights_index", - index_name="event_push_actions_highlights_index", - table="event_push_actions", - columns=["user_id", "room_id", "topological_ordering", "stream_ordering"], - where_clause="highlight=1", - ) - - # Add index to make deleting old push actions faster. - self.db_pool.updates.register_background_index_update( - "event_push_actions_stream_highlight_index", - index_name="event_push_actions_stream_highlight_index", - table="event_push_actions", - columns=["highlight", "stream_ordering"], - where_clause="highlight=0", - ) - async def get_push_actions_for_user( self, user_id: str, @@ -1834,6 +1798,42 @@ class EventPushActionsStore(EventPushActionsWorkerStore): ] +class EventPushActionsStore(EventPushActionsWorkerStore): + EPA_HIGHLIGHT_INDEX = "epa_highlight_index" + + def __init__( + self, + database: DatabasePool, + db_conn: LoggingDatabaseConnection, + hs: "HomeServer", + ): + super().__init__(database, db_conn, hs) + + self.db_pool.updates.register_background_index_update( + self.EPA_HIGHLIGHT_INDEX, + index_name="event_push_actions_u_highlight", + table="event_push_actions", + columns=["user_id", "stream_ordering"], + ) + + self.db_pool.updates.register_background_index_update( + "event_push_actions_highlights_index", + index_name="event_push_actions_highlights_index", + table="event_push_actions", + columns=["user_id", "room_id", "topological_ordering", "stream_ordering"], + 
where_clause="highlight=1", + ) + + # Add index to make deleting old push actions faster. + self.db_pool.updates.register_background_index_update( + "event_push_actions_stream_highlight_index", + index_name="event_push_actions_stream_highlight_index", + table="event_push_actions", + columns=["highlight", "stream_ordering"], + where_clause="highlight=0", + ) + + def _action_has_highlight(actions: Collection[Union[Mapping, str]]) -> bool: for action in actions: if not isinstance(action, dict): -- cgit 1.5.1 From 1cd410a7833984ef69a7dcecf8997f4c45d609cd Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 7 Sep 2023 13:45:43 +0100 Subject: Recheck if remote device is cached before requesting it (#16252) This fixes a bug where we could get stuck re-requesting the device over replication again and again. --- changelog.d/16252.bugfix | 1 + synapse/handlers/device.py | 21 +++++++++++++++------ synapse/replication/http/devices.py | 4 ++-- synapse/storage/databases/main/devices.py | 26 +++++++++++++++++--------- 4 files changed, 35 insertions(+), 17 deletions(-) create mode 100644 changelog.d/16252.bugfix (limited to 'synapse') diff --git a/changelog.d/16252.bugfix b/changelog.d/16252.bugfix new file mode 100644 index 0000000000..881bc00e61 --- /dev/null +++ b/changelog.d/16252.bugfix @@ -0,0 +1 @@ +Fix bug when using workers where Synapse could end up re-requesting the same remote device repeatedly. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9e52af5f13..9356ae998e 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -1030,7 +1030,7 @@ class DeviceListWorkerUpdater: async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True - ) -> Dict[str, Optional[JsonDict]]: + ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. 
@@ -1059,6 +1059,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): self._notifier = hs.get_notifier() self._remote_edu_linearizer = Linearizer(name="remote_device_list") + self._resync_linearizer = Linearizer(name="remote_device_resync") # user_id -> list of updates waiting to be handled. self._pending_updates: Dict[ @@ -1301,7 +1302,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): async def multi_user_device_resync( self, user_ids: List[str], mark_failed_as_stale: bool = True - ) -> Dict[str, Optional[JsonDict]]: + ) -> Dict[str, Optional[JsonMapping]]: """ Like `user_device_resync` but operates on multiple users **from the same origin** at once. @@ -1321,9 +1322,11 @@ class DeviceListUpdater(DeviceListWorkerUpdater): failed = set() # TODO(Perf): Actually batch these up for user_id in user_ids: - user_result, user_failed = await self._user_device_resync_returning_failed( - user_id - ) + async with self._resync_linearizer.queue(user_id): + ( + user_result, + user_failed, + ) = await self._user_device_resync_returning_failed(user_id) result[user_id] = user_result if user_failed: failed.add(user_id) @@ -1335,7 +1338,7 @@ class DeviceListUpdater(DeviceListWorkerUpdater): async def _user_device_resync_returning_failed( self, user_id: str - ) -> Tuple[Optional[JsonDict], bool]: + ) -> Tuple[Optional[JsonMapping], bool]: """Fetches all devices for a user and updates the device cache with them. Args: @@ -1348,6 +1351,12 @@ class DeviceListUpdater(DeviceListWorkerUpdater): e.g. due to a connection problem. - True iff the resync failed and the device list should be marked as stale. """ + # Check that we haven't gone and fetched the devices since we last + # checked if we needed to resync these device lists. 
+ if await self.store.get_users_whose_devices_are_cached([user_id]): + cached = await self.store.get_cached_devices_for_user(user_id) + return cached, False + logger.debug("Attempting to resync the device list for %s", user_id) log_kv({"message": "Doing resync to update device list."}) # Fetch all devices for the user. diff --git a/synapse/replication/http/devices.py b/synapse/replication/http/devices.py index 209833d287..b8198e059c 100644 --- a/synapse/replication/http/devices.py +++ b/synapse/replication/http/devices.py @@ -20,7 +20,7 @@ from twisted.web.server import Request from synapse.http.server import HttpServer from synapse.logging.opentracing import active_span from synapse.replication.http._base import ReplicationEndpoint -from synapse.types import JsonDict +from synapse.types import JsonDict, JsonMapping if TYPE_CHECKING: from synapse.server import HomeServer @@ -82,7 +82,7 @@ class ReplicationMultiUserDevicesResyncRestServlet(ReplicationEndpoint): async def _handle_request( # type: ignore[override] self, request: Request, content: JsonDict - ) -> Tuple[int, Dict[str, Optional[JsonDict]]]: + ) -> Tuple[int, Dict[str, Optional[JsonMapping]]]: user_ids: List[str] = content["user_ids"] logger.info("Resync for %r", user_ids) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 324fdfa892..70faf4b1ec 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -759,18 +759,10 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): mapping of user_id -> device_id -> device_info. """ unique_user_ids = user_ids | {user_id for user_id, _ in user_and_device_ids} - user_map = await self.get_device_list_last_stream_id_for_remotes( - list(unique_user_ids) - ) - # We go and check if any of the users need to have their device lists - # resynced. If they do then we remove them from the cached list. 
- users_needing_resync = await self.get_user_ids_requiring_device_list_resync( + user_ids_in_cache = await self.get_users_whose_devices_are_cached( unique_user_ids ) - user_ids_in_cache = { - user_id for user_id, stream_id in user_map.items() if stream_id - } - users_needing_resync user_ids_not_in_cache = unique_user_ids - user_ids_in_cache # First fetch all the users which all devices are to be returned. @@ -792,6 +784,22 @@ class DeviceWorkerStore(RoomMemberWorkerStore, EndToEndKeyWorkerStore): return user_ids_not_in_cache, results + async def get_users_whose_devices_are_cached( + self, user_ids: StrCollection + ) -> Set[str]: + """Checks which of the given users we have cached the devices for.""" + user_map = await self.get_device_list_last_stream_id_for_remotes(user_ids) + + # We go and check if any of the users need to have their device lists + # resynced. If they do then we remove them from the cached list. + users_needing_resync = await self.get_user_ids_requiring_device_list_resync( + user_ids + ) + user_ids_in_cache = { + user_id for user_id, stream_id in user_map.items() if stream_id + } - users_needing_resync + return user_ids_in_cache + @cached(num_args=2, tree=True) async def _get_cached_user_device(self, user_id: str, device_id: str) -> JsonDict: content = await self.db_pool.simple_select_one_onecol( -- cgit 1.5.1 From d23c394669660a7226c818f222a76ec0905e126e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 Sep 2023 13:06:00 +0100 Subject: Reduce CPU overhead of change password endpoint (#16264) --- changelog.d/16264.misc | 1 + synapse/rest/client/account.py | 112 ++++++++++++++++++++--------------------- 2 files changed, 55 insertions(+), 58 deletions(-) create mode 100644 changelog.d/16264.misc (limited to 'synapse') diff --git a/changelog.d/16264.misc b/changelog.d/16264.misc new file mode 100644 index 0000000000..a744434bef --- /dev/null +++ b/changelog.d/16264.misc @@ -0,0 +1 @@ +Reduce CPU overhead of change password endpoint. 
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index 679ab9f266..196b292890 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -179,85 +179,81 @@ class PasswordRestServlet(RestServlet): # # In the second case, we require a password to confirm their identity. - requester = None - if self.auth.has_access_token(request): - requester = await self.auth.get_user_by_req(request) - try: + try: + requester = None + if self.auth.has_access_token(request): + requester = await self.auth.get_user_by_req(request) params, session_id = await self.auth_handler.validate_user_via_ui_auth( requester, request, body.dict(exclude_unset=True), "modify your account password", ) - except InteractiveAuthIncompleteError as e: - # The user needs to provide more steps to complete auth, but - # they're not required to provide the password again. - # - # If a password is available now, hash the provided password and - # store it for later. - if new_password: - new_password_hash = await self.auth_handler.hash(new_password) - await self.auth_handler.set_session_data( - e.session_id, - UIAuthSessionDataConstants.PASSWORD_HASH, - new_password_hash, - ) - raise - user_id = requester.user.to_string() - else: - try: + user_id = requester.user.to_string() + else: result, params, session_id = await self.auth_handler.check_ui_auth( [[LoginType.EMAIL_IDENTITY]], request, body.dict(exclude_unset=True), "modify your account password", ) - except InteractiveAuthIncompleteError as e: - # The user needs to provide more steps to complete auth, but - # they're not required to provide the password again. - # - # If a password is available now, hash the provided password and - # store it for later. 
- if new_password: - new_password_hash = await self.auth_handler.hash(new_password) - await self.auth_handler.set_session_data( - e.session_id, - UIAuthSessionDataConstants.PASSWORD_HASH, - new_password_hash, + + if LoginType.EMAIL_IDENTITY in result: + threepid = result[LoginType.EMAIL_IDENTITY] + if "medium" not in threepid or "address" not in threepid: + raise SynapseError(500, "Malformed threepid") + if threepid["medium"] == "email": + # For emails, canonicalise the address. + # We store all email addresses canonicalised in the DB. + # (See add_threepid in synapse/handlers/auth.py) + try: + threepid["address"] = validate_email(threepid["address"]) + except ValueError as e: + raise SynapseError(400, str(e)) + # if using email, we must know about the email they're authing with! + threepid_user_id = await self.datastore.get_user_id_by_threepid( + threepid["medium"], threepid["address"] ) + if not threepid_user_id: + raise SynapseError( + 404, "Email address not found", Codes.NOT_FOUND + ) + user_id = threepid_user_id + else: + logger.error("Auth succeeded but no known type! %r", result.keys()) + raise SynapseError(500, "", Codes.UNKNOWN) + + except InteractiveAuthIncompleteError as e: + # The user needs to provide more steps to complete auth, but + # they're not required to provide the password again. + # + # If a password is available now, hash the provided password and + # store it for later. We only do this if we don't already have the + # password hash stored, to avoid repeatedly hashing the password. + + if not new_password: raise - if LoginType.EMAIL_IDENTITY in result: - threepid = result[LoginType.EMAIL_IDENTITY] - if "medium" not in threepid or "address" not in threepid: - raise SynapseError(500, "Malformed threepid") - if threepid["medium"] == "email": - # For emails, canonicalise the address. - # We store all email addresses canonicalised in the DB. 
- # (See add_threepid in synapse/handlers/auth.py) - try: - threepid["address"] = validate_email(threepid["address"]) - except ValueError as e: - raise SynapseError(400, str(e)) - # if using email, we must know about the email they're authing with! - threepid_user_id = await self.datastore.get_user_id_by_threepid( - threepid["medium"], threepid["address"] - ) - if not threepid_user_id: - raise SynapseError(404, "Email address not found", Codes.NOT_FOUND) - user_id = threepid_user_id - else: - logger.error("Auth succeeded but no known type! %r", result.keys()) - raise SynapseError(500, "", Codes.UNKNOWN) + existing_session_password_hash = await self.auth_handler.get_session_data( + e.session_id, UIAuthSessionDataConstants.PASSWORD_HASH, None + ) + if existing_session_password_hash: + raise + + new_password_hash = await self.auth_handler.hash(new_password) + await self.auth_handler.set_session_data( + e.session_id, + UIAuthSessionDataConstants.PASSWORD_HASH, + new_password_hash, + ) + raise # If we have a password in this request, prefer it. Otherwise, use the # password hash from an earlier request. if new_password: password_hash: Optional[str] = await self.auth_handler.hash(new_password) elif session_id is not None: - password_hash = await self.auth_handler.get_session_data( - session_id, UIAuthSessionDataConstants.PASSWORD_HASH, None - ) + password_hash = existing_session_password_hash else: # UI validation was skipped, but the request did not include a new # password. -- cgit 1.5.1 From 69b74d9330e42fc91a9c7423d00a06cd6d3732bf Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 8 Sep 2023 08:57:56 -0400 Subject: Avoid temporary storage of sensitive information. (#16272) During the UI auth process, avoid storing sensitive information into the database. 
--- changelog.d/16272.bugfix | 1 + synapse/rest/client/account.py | 4 ++-- tests/rest/client/test_account.py | 13 +++++++++++++ 3 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 changelog.d/16272.bugfix (limited to 'synapse') diff --git a/changelog.d/16272.bugfix b/changelog.d/16272.bugfix new file mode 100644 index 0000000000..afb22a999f --- /dev/null +++ b/changelog.d/16272.bugfix @@ -0,0 +1 @@ +Avoid temporary storage of sensitive information. diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py index 196b292890..49cd0805fd 100644 --- a/synapse/rest/client/account.py +++ b/synapse/rest/client/account.py @@ -186,7 +186,7 @@ class PasswordRestServlet(RestServlet): params, session_id = await self.auth_handler.validate_user_via_ui_auth( requester, request, - body.dict(exclude_unset=True), + body.dict(exclude_unset=True, exclude={"new_password"}), "modify your account password", ) user_id = requester.user.to_string() @@ -194,7 +194,7 @@ class PasswordRestServlet(RestServlet): result, params, session_id = await self.auth_handler.check_ui_auth( [[LoginType.EMAIL_IDENTITY]], request, - body.dict(exclude_unset=True), + body.dict(exclude_unset=True, exclude={"new_password"}), "modify your account password", ) diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py index e9f495e206..4a0eca5b30 100644 --- a/tests/rest/client/test_account.py +++ b/tests/rest/client/test_account.py @@ -31,6 +31,7 @@ from synapse.rest import admin from synapse.rest.client import account, login, register, room from synapse.rest.synapse.client.password_reset import PasswordResetSubmitTokenResource from synapse.server import HomeServer +from synapse.storage._base import db_to_json from synapse.types import JsonDict, UserID from synapse.util import Clock @@ -134,6 +135,18 @@ class PasswordResetTestCase(unittest.HomeserverTestCase): # Assert we can't log in with the old password self.attempt_wrong_password_login("kermit", 
old_password) + # Check that the UI Auth information doesn't store the password in the database. + # + # Note that we don't have the UI Auth session ID, so just pull out the single + # row. + ui_auth_data = self.get_success( + self.store.db_pool.simple_select_one( + "ui_auth_sessions", keyvalues={}, retcols=("clientdict",) + ) + ) + client_dict = db_to_json(ui_auth_data["clientdict"]) + self.assertNotIn("new_password", client_dict) + @override_config({"rc_3pid_validation": {"burst_count": 3}}) def test_ratelimit_by_email(self) -> None: """Test that we ratelimit /requestToken for the same email.""" -- cgit 1.5.1 From f43d99462413b0b572da2e52037db8b1135f5ea6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 8 Sep 2023 14:43:01 +0100 Subject: Fix bug with new task scheduler using lots of CPU. (#16278) Using the new `TaskScheduler` meant that we'ed create lots of new metrics (due to adding task ID to the desc of background process), resulting in requests for metrics taking an increasing amount of CPU. --- changelog.d/16278.misc | 1 + synapse/util/task_scheduler.py | 43 +++++++++++++++++++++--------------------- 2 files changed, 23 insertions(+), 21 deletions(-) create mode 100644 changelog.d/16278.misc (limited to 'synapse') diff --git a/changelog.d/16278.misc b/changelog.d/16278.misc new file mode 100644 index 0000000000..e82a470c45 --- /dev/null +++ b/changelog.d/16278.misc @@ -0,0 +1 @@ +Fix using the new task scheduler causing lots of CPU to be used. 
diff --git a/synapse/util/task_scheduler.py b/synapse/util/task_scheduler.py index 9b2581e51a..b7de201bde 100644 --- a/synapse/util/task_scheduler.py +++ b/synapse/util/task_scheduler.py @@ -19,6 +19,7 @@ from prometheus_client import Gauge from twisted.python.failure import Failure +from synapse.logging.context import nested_logging_context from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import JsonMapping, ScheduledTask, TaskStatus from synapse.util.stringutils import random_string @@ -316,26 +317,27 @@ class TaskScheduler: function = self._actions[task.action] async def wrapper() -> None: - try: - (status, result, error) = await function(task) - except Exception: - f = Failure() - logger.error( - f"scheduled task {task.id} failed", - exc_info=(f.type, f.value, f.getTracebackObject()), + with nested_logging_context(task.id): + try: + (status, result, error) = await function(task) + except Exception: + f = Failure() + logger.error( + f"scheduled task {task.id} failed", + exc_info=(f.type, f.value, f.getTracebackObject()), + ) + status = TaskStatus.FAILED + result = None + error = f.getErrorMessage() + + await self._store.update_scheduled_task( + task.id, + self._clock.time_msec(), + status=status, + result=result, + error=error, ) - status = TaskStatus.FAILED - result = None - error = f.getErrorMessage() - - await self._store.update_scheduled_task( - task.id, - self._clock.time_msec(), - status=status, - result=result, - error=error, - ) - self._running_tasks.remove(task.id) + self._running_tasks.remove(task.id) if len(self._running_tasks) >= TaskScheduler.MAX_CONCURRENT_RUNNING_TASKS: return @@ -353,5 +355,4 @@ class TaskScheduler: self._running_tasks.add(task.id) await self.update_task(task.id, status=TaskStatus.ACTIVE) - description = f"{task.id}-{task.action}" - run_as_background_process(description, wrapper) + run_as_background_process(task.action, wrapper) -- cgit 1.5.1 From 
c1c6c95d72b5c9fc6c0e527eeb6b9d3a59889b16 Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 8 Sep 2023 14:50:13 +0100 Subject: Log values at DEBUG level with execute_values (#16281) --- changelog.d/16281.misc | 1 + synapse/storage/database.py | 5 +++-- 2 files changed, 4 insertions(+), 2 deletions(-) create mode 100644 changelog.d/16281.misc (limited to 'synapse') diff --git a/changelog.d/16281.misc b/changelog.d/16281.misc new file mode 100644 index 0000000000..de48396aff --- /dev/null +++ b/changelog.d/16281.misc @@ -0,0 +1 @@ +Include values in SQL debug when using `execute_values` with Postgres. diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 55ac313f33..6c5fcdcec3 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -422,10 +422,11 @@ class LoggingTransaction: return self._do_execute( # TODO: is it safe for values to be Iterable[Iterable[Any]] here? # https://www.psycopg.org/docs/extras.html?highlight=execute_batch#psycopg2.extras.execute_values says values should be Sequence[Sequence] - lambda the_sql: execute_values( - self.txn, the_sql, values, template=template, fetch=fetch + lambda the_sql, the_values: execute_values( + self.txn, the_sql, the_values, template=template, fetch=fetch ), sql, + values, ) def execute(self, sql: str, parameters: SQLQueryParameters = ()) -> None: -- cgit 1.5.1 From aa483cb4c905bbe483ffe8e8a8f439655a57481b Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 8 Sep 2023 11:24:36 -0400 Subject: Update ruff config (#16283) Enable additional checks & clean-up unneeded configuration. 
--- changelog.d/16283.misc | 1 + contrib/cmdclient/http.py | 2 -- docker/start.py | 2 +- pyproject.toml | 28 +++++++++++++++---------- scripts-dev/mypy_synapse_plugin.py | 7 ++++--- synapse/_scripts/update_synapse_database.py | 1 - synapse/events/snapshot.py | 2 -- synapse/media/url_previewer.py | 4 +--- synapse/storage/background_updates.py | 2 -- synmark/suites/logging.py | 2 +- tests/handlers/test_device.py | 2 +- tests/handlers/test_federation.py | 2 +- tests/logging/test_remote_handler.py | 12 +++++------ tests/replication/tcp/streams/test_to_device.py | 2 +- tests/rest/admin/test_federation.py | 6 +++--- tests/rest/client/test_account.py | 2 +- tests/rest/client/test_login.py | 8 +++---- tests/rest/client/test_register.py | 6 +++--- tests/storage/databases/main/test_lock.py | 2 +- tests/storage/test_event_chain.py | 6 +++--- tests/storage/test_event_federation.py | 6 +++--- tests/storage/test_profile.py | 4 ++-- tests/storage/test_txn_limit.py | 2 +- tests/storage/test_user_filters.py | 4 ++-- tests/test_visibility.py | 8 +++---- tests/util/caches/test_descriptors.py | 4 ++-- 26 files changed, 63 insertions(+), 64 deletions(-) create mode 100644 changelog.d/16283.misc (limited to 'synapse') diff --git a/changelog.d/16283.misc b/changelog.d/16283.misc new file mode 100644 index 0000000000..4b9d6f76ae --- /dev/null +++ b/changelog.d/16283.misc @@ -0,0 +1 @@ +Enable additional linting checks. diff --git a/contrib/cmdclient/http.py b/contrib/cmdclient/http.py index 1310f078e3..508de5dcbd 100644 --- a/contrib/cmdclient/http.py +++ b/contrib/cmdclient/http.py @@ -37,7 +37,6 @@ class HttpClient: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. """ - pass def get_json(self, url, args=None): """Gets some json from the given host homeserver and path @@ -53,7 +52,6 @@ class HttpClient: Deferred: Succeeds when we get a 2xx HTTP response. The result will be the decoded JSON body. 
""" - pass class TwistedHttpClient(HttpClient): diff --git a/docker/start.py b/docker/start.py index aebc7e4aaa..12c444da9a 100755 --- a/docker/start.py +++ b/docker/start.py @@ -239,7 +239,7 @@ def main(args: List[str], environ: MutableMapping[str, str]) -> None: log("Could not find %s, will not use" % (jemallocpath,)) # if there are no config files passed to synapse, try adding the default file - if not any(p.startswith("--config-path") or p.startswith("-c") for p in args): + if not any(p.startswith(("--config-path", "-c")) for p in args): config_dir = environ.get("SYNAPSE_CONFIG_DIR", "/data") config_path = environ.get( "SYNAPSE_CONFIG_PATH", config_dir + "/homeserver.yaml" diff --git a/pyproject.toml b/pyproject.toml index 5b43abe907..8747782b29 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,33 +43,39 @@ target-version = ['py38', 'py39', 'py310', 'py311'] [tool.ruff] line-length = 88 -# See https://github.com/charliermarsh/ruff/#pycodestyle +# See https://beta.ruff.rs/docs/rules/#error-e # for error codes. The ones we ignore are: -# E731: do not assign a lambda expression, use a def # E501: Line too long (black enforces this for us) +# E731: do not assign a lambda expression, use a def # # flake8-bugbear compatible checks. Its error codes are described at -# https://github.com/charliermarsh/ruff/#flake8-bugbear -# B019: Use of functools.lru_cache or functools.cache on methods can lead to memory leaks +# https://beta.ruff.rs/docs/rules/#flake8-bugbear-b # B023: Functions defined inside a loop must not use variables redefined in the loop -# B024: Abstract base class with no abstract method. ignore = [ - "B019", "B023", - "B024", "E501", "E731", ] select = [ - # pycodestyle checks. + # pycodestyle "E", "W", - # pyflakes checks. + # pyflakes "F", - # flake8-bugbear checks. + # flake8-bugbear "B0", - # flake8-comprehensions checks. 
+ # flake8-comprehensions "C4", + # flake8-2020 + "YTT", + # flake8-slots + "SLOT", + # flake8-debugger + "T10", + # flake8-pie + "PIE", + # flake8-executable + "EXE", ] [tool.isort] diff --git a/scripts-dev/mypy_synapse_plugin.py b/scripts-dev/mypy_synapse_plugin.py index 8058e9c993..a0b3854f1b 100644 --- a/scripts-dev/mypy_synapse_plugin.py +++ b/scripts-dev/mypy_synapse_plugin.py @@ -30,9 +30,10 @@ class SynapsePlugin(Plugin): self, fullname: str ) -> Optional[Callable[[MethodSigContext], CallableType]]: if fullname.startswith( - "synapse.util.caches.descriptors.CachedFunction.__call__" - ) or fullname.startswith( - "synapse.util.caches.descriptors._LruCachedFunction.__call__" + ( + "synapse.util.caches.descriptors.CachedFunction.__call__", + "synapse.util.caches.descriptors._LruCachedFunction.__call__", + ) ): return cached_function_method_signature return None diff --git a/synapse/_scripts/update_synapse_database.py b/synapse/_scripts/update_synapse_database.py index f97aecf8d5..992ae43881 100644 --- a/synapse/_scripts/update_synapse_database.py +++ b/synapse/_scripts/update_synapse_database.py @@ -1,4 +1,3 @@ -#!/usr/bin/env python # Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a9e3d4e556..5bdfa3a8ac 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -55,7 +55,6 @@ class UnpersistedEventContextBase(ABC): A method to convert an UnpersistedEventContext to an EventContext, suitable for sending to the database with the associated event. 
""" - pass @abstractmethod async def get_prev_state_ids( @@ -69,7 +68,6 @@ class UnpersistedEventContextBase(ABC): state_filter: specifies the type of state event to fetch from DB, example: EventTypes.JoinRules """ - pass @attr.s(slots=True, auto_attribs=True) diff --git a/synapse/media/url_previewer.py b/synapse/media/url_previewer.py index 70b32cee17..9b5a3dd5f4 100644 --- a/synapse/media/url_previewer.py +++ b/synapse/media/url_previewer.py @@ -846,9 +846,7 @@ def _is_media(content_type: str) -> bool: def _is_html(content_type: str) -> bool: content_type = content_type.lower() - return content_type.startswith("text/html") or content_type.startswith( - "application/xhtml" - ) + return content_type.startswith(("text/html", "application/xhtml")) def _is_json(content_type: str) -> bool: diff --git a/synapse/storage/background_updates.py b/synapse/storage/background_updates.py index 7619f405fa..99ebd96f84 100644 --- a/synapse/storage/background_updates.py +++ b/synapse/storage/background_updates.py @@ -62,7 +62,6 @@ class Constraint(metaclass=abc.ABCMeta): @abc.abstractmethod def make_check_clause(self, table: str) -> str: """Returns an SQL expression that checks the row passes the constraint.""" - pass @abc.abstractmethod def make_constraint_clause_postgres(self) -> str: @@ -70,7 +69,6 @@ class Constraint(metaclass=abc.ABCMeta): Only used on Postgres DBs """ - pass @attr.s(auto_attribs=True) diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py index 8beb077e0a..04e5b29dc9 100644 --- a/synmark/suites/logging.py +++ b/synmark/suites/logging.py @@ -112,7 +112,7 @@ async def main(reactor, loops): start = perf_counter() # Send a bunch of useful messages - for i in range(0, loops): + for i in range(loops): logger.info("test message %s", i) if len(handler._buffer) == handler.maximum_buffer: diff --git a/tests/handlers/test_device.py b/tests/handlers/test_device.py index 9659a4a355..79d327499b 100644 --- a/tests/handlers/test_device.py +++ 
b/tests/handlers/test_device.py @@ -223,7 +223,7 @@ class DeviceTestCase(unittest.HomeserverTestCase): # queue a bunch of messages in the inbox requester = create_requester(sender, device_id=DEVICE_ID) - for i in range(0, DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10): + for i in range(DeviceHandler.DEVICE_MSGS_DELETE_BATCH_LIMIT + 10): self.get_success( self.device_message_handler.send_device_message( requester, "message_type", {receiver: {"*": {"val": i}}} diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 21d63ab1f2..4fc0742413 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -262,7 +262,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): if (ev.type, ev.state_key) in {("m.room.create", ""), ("m.room.member", remote_server_user_id)} ] - for _ in range(0, 8): + for _ in range(8): event = make_event_from_dict( self.add_hashes_and_signatures_from_other_server( { diff --git a/tests/logging/test_remote_handler.py b/tests/logging/test_remote_handler.py index 5191e31a8a..45eac100bf 100644 --- a/tests/logging/test_remote_handler.py +++ b/tests/logging/test_remote_handler.py @@ -78,11 +78,11 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase): logger = self.get_logger(handler) # Send some debug messages - for i in range(0, 3): + for i in range(3): logger.debug("debug %s" % (i,)) # Send a bunch of useful messages - for i in range(0, 7): + for i in range(7): logger.info("info %s" % (i,)) # The last debug message pushes it past the maximum buffer @@ -108,15 +108,15 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase): logger = self.get_logger(handler) # Send some debug messages - for i in range(0, 3): + for i in range(3): logger.debug("debug %s" % (i,)) # Send a bunch of useful messages - for i in range(0, 10): + for i in range(10): logger.warning("warn %s" % (i,)) # Send a bunch of info messages - for i in range(0, 3): + for i in range(3): logger.info("info 
%s" % (i,)) # The last debug message pushes it past the maximum buffer @@ -144,7 +144,7 @@ class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase): logger = self.get_logger(handler) # Send a bunch of useful messages - for i in range(0, 20): + for i in range(20): logger.warning("warn %s" % (i,)) # Allow the reconnection diff --git a/tests/replication/tcp/streams/test_to_device.py b/tests/replication/tcp/streams/test_to_device.py index fb9eac668f..ab379e8cf1 100644 --- a/tests/replication/tcp/streams/test_to_device.py +++ b/tests/replication/tcp/streams/test_to_device.py @@ -49,7 +49,7 @@ class ToDeviceStreamTestCase(BaseStreamTestCase): # add messages to the device inbox for user1 up until the # limit defined for a stream update batch - for i in range(0, _STREAM_UPDATE_TARGET_ROW_COUNT): + for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT): msg["content"] = {"device": {}} messages = {user1: {"device": msg}} diff --git a/tests/rest/admin/test_federation.py b/tests/rest/admin/test_federation.py index 4c7864c629..0e2824d1b5 100644 --- a/tests/rest/admin/test_federation.py +++ b/tests/rest/admin/test_federation.py @@ -510,7 +510,7 @@ class FederationTestCase(unittest.HomeserverTestCase): Args: number_destinations: Number of destinations to be created """ - for i in range(0, number_destinations): + for i in range(number_destinations): dest = f"sub{i}.example.com" self._create_destination(dest, 50, 50, 50, 100) @@ -690,7 +690,7 @@ class DestinationMembershipTestCase(unittest.HomeserverTestCase): self._check_fields(channel_desc.json_body["rooms"]) # test that both lists have different directions - for i in range(0, number_rooms): + for i in range(number_rooms): self.assertEqual( channel_asc.json_body["rooms"][i]["room_id"], channel_desc.json_body["rooms"][number_rooms - 1 - i]["room_id"], @@ -777,7 +777,7 @@ class DestinationMembershipTestCase(unittest.HomeserverTestCase): Args: number_rooms: Number of rooms to be created """ - for _ in range(0, number_rooms): + for _ in 
range(number_rooms): room_id = self.helper.create_room_as( self.admin_user, tok=self.admin_user_tok ) diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py index 4a0eca5b30..cffbda9a7d 100644 --- a/tests/rest/client/test_account.py +++ b/tests/rest/client/test_account.py @@ -575,7 +575,7 @@ class DeactivateTestCase(unittest.HomeserverTestCase): # create a bunch of users and add keys for them users = [] - for i in range(0, 20): + for i in range(20): user_id = self.register_user("missPiggy" + str(i), "test") users.append((user_id,)) diff --git a/tests/rest/client/test_login.py b/tests/rest/client/test_login.py index a2a6589564..768d7ad4c2 100644 --- a/tests/rest/client/test_login.py +++ b/tests/rest/client/test_login.py @@ -176,10 +176,10 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): def test_POST_ratelimiting_per_address(self) -> None: # Create different users so we're sure not to be bothered by the per-user # ratelimiter. - for i in range(0, 6): + for i in range(6): self.register_user("kermit" + str(i), "monkey") - for i in range(0, 6): + for i in range(6): params = { "type": "m.login.password", "identifier": {"type": "m.id.user", "user": "kermit" + str(i)}, @@ -228,7 +228,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): def test_POST_ratelimiting_per_account(self) -> None: self.register_user("kermit", "monkey") - for i in range(0, 6): + for i in range(6): params = { "type": "m.login.password", "identifier": {"type": "m.id.user", "user": "kermit"}, @@ -277,7 +277,7 @@ class LoginRestServletTestCase(unittest.HomeserverTestCase): def test_POST_ratelimiting_per_account_failed_attempts(self) -> None: self.register_user("kermit", "monkey") - for i in range(0, 6): + for i in range(6): params = { "type": "m.login.password", "identifier": {"type": "m.id.user", "user": "kermit"}, diff --git a/tests/rest/client/test_register.py b/tests/rest/client/test_register.py index c33393dc28..ba4e017a0e 100644 --- 
a/tests/rest/client/test_register.py +++ b/tests/rest/client/test_register.py @@ -169,7 +169,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase): @override_config({"rc_registration": {"per_second": 0.17, "burst_count": 5}}) def test_POST_ratelimiting_guest(self) -> None: - for i in range(0, 6): + for i in range(6): url = self.url + b"?kind=guest" channel = self.make_request(b"POST", url, b"{}") @@ -187,7 +187,7 @@ class RegisterRestServletTestCase(unittest.HomeserverTestCase): @override_config({"rc_registration": {"per_second": 0.17, "burst_count": 5}}) def test_POST_ratelimiting(self) -> None: - for i in range(0, 6): + for i in range(6): request_data = { "username": "kermit" + str(i), "password": "monkey", @@ -1223,7 +1223,7 @@ class RegistrationTokenValidityRestServletTestCase(unittest.HomeserverTestCase): def test_GET_ratelimiting(self) -> None: token = "1234" - for i in range(0, 6): + for i in range(6): channel = self.make_request( b"GET", f"{self.url}?token={token}", diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index 650b4941ba..35f77052a7 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -382,7 +382,7 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): self.get_success(lock.__aenter__()) # Wait for ages with the lock, we should not be able to get the lock. - for _ in range(0, 10): + for _ in range(10): self.reactor.advance((_RENEWAL_INTERVAL_MS / 1000)) lock2 = self.get_success( diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 48ebfadaab..b55dd07f14 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): # Add a bunch of state so that it takes multiple iterations of the # background update to process the room. 
- for i in range(0, 150): + for i in range(150): self.helper.send_state( room_id, event_type="m.test", body={"index": i}, tok=self.token ) @@ -718,12 +718,12 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): # Add a bunch of state so that it takes multiple iterations of the # background update to process the room. - for i in range(0, 150): + for i in range(150): self.helper.send_state( room_id1, event_type="m.test", body={"index": i}, tok=self.token ) - for i in range(0, 150): + for i in range(150): self.helper.send_state( room_id2, event_type="m.test", body={"index": i}, tok=self.token ) diff --git a/tests/storage/test_event_federation.py b/tests/storage/test_event_federation.py index 7a4ecab2d5..d3e20f44b2 100644 --- a/tests/storage/test_event_federation.py +++ b/tests/storage/test_event_federation.py @@ -227,7 +227,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): (room_id, event_id), ) - for i in range(0, 20): + for i in range(20): self.get_success( self.store.db_pool.runInteraction("insert", insert_event, i) ) @@ -235,7 +235,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): # this should get the last ten r = self.get_success(self.store.get_prev_events_for_room(room_id)) self.assertEqual(10, len(r)) - for i in range(0, 10): + for i in range(10): self.assertEqual("$event_%i:local" % (19 - i), r[i]) def test_get_rooms_with_many_extremities(self) -> None: @@ -277,7 +277,7 @@ class EventFederationWorkerStoreTestCase(tests.unittest.HomeserverTestCase): (room_id, event_id), ) - for i in range(0, 20): + for i in range(20): self.get_success( self.store.db_pool.runInteraction("insert", insert_event, i, room1) ) diff --git a/tests/storage/test_profile.py b/tests/storage/test_profile.py index fe5bb77913..95f99f4130 100644 --- a/tests/storage/test_profile.py +++ b/tests/storage/test_profile.py @@ -82,7 +82,7 @@ class ProfileStoreTestCase(unittest.HomeserverTestCase): 
self.get_success(self.store.db_pool.runInteraction("", f)) - for i in range(0, 70): + for i in range(70): self.get_success( self.store.db_pool.simple_insert( "profiles", @@ -115,7 +115,7 @@ class ProfileStoreTestCase(unittest.HomeserverTestCase): ) expected_values = [] - for i in range(0, 70): + for i in range(70): expected_values.append((f"@hello{i:02}:{self.hs.hostname}",)) res = self.get_success( diff --git a/tests/storage/test_txn_limit.py b/tests/storage/test_txn_limit.py index 15ea4770bd..22f074982f 100644 --- a/tests/storage/test_txn_limit.py +++ b/tests/storage/test_txn_limit.py @@ -38,5 +38,5 @@ class SQLTransactionLimitTestCase(unittest.HomeserverTestCase): db_pool = self.hs.get_datastores().databases[0] # force txn limit to roll over at least once - for _ in range(0, 1001): + for _ in range(1001): self.get_success_or_raise(db_pool.runInteraction("test_select", do_select)) diff --git a/tests/storage/test_user_filters.py b/tests/storage/test_user_filters.py index bab802f56e..d4637d9d1e 100644 --- a/tests/storage/test_user_filters.py +++ b/tests/storage/test_user_filters.py @@ -45,7 +45,7 @@ class UserFiltersStoreTestCase(unittest.HomeserverTestCase): self.get_success(self.store.db_pool.runInteraction("", f)) - for i in range(0, 70): + for i in range(70): self.get_success( self.store.db_pool.simple_insert( "user_filters", @@ -82,7 +82,7 @@ class UserFiltersStoreTestCase(unittest.HomeserverTestCase): ) expected_values = [] - for i in range(0, 70): + for i in range(70): expected_values.append((f"@hello{i:02}:{self.hs.hostname}",)) res = self.get_success( diff --git a/tests/test_visibility.py b/tests/test_visibility.py index a46c29ddf4..434902c3f0 100644 --- a/tests/test_visibility.py +++ b/tests/test_visibility.py @@ -51,12 +51,12 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase): # before we do that, we persist some other events to act as state. 
self._inject_visibility("@admin:hs", "joined") - for i in range(0, 10): + for i in range(10): self._inject_room_member("@resident%i:hs" % i) events_to_filter = [] - for i in range(0, 10): + for i in range(10): user = "@user%i:%s" % (i, "test_server" if i == 5 else "other_server") evt = self._inject_room_member(user, extra_content={"a": "b"}) events_to_filter.append(evt) @@ -74,7 +74,7 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase): ) # the result should be 5 redacted events, and 5 unredacted events. - for i in range(0, 5): + for i in range(5): self.assertEqual(events_to_filter[i].event_id, filtered[i].event_id) self.assertNotIn("a", filtered[i].content) @@ -177,7 +177,7 @@ class FilterEventsForServerTestCase(unittest.HomeserverTestCase): ) ) - for i in range(0, len(events_to_filter)): + for i in range(len(events_to_filter)): self.assertEqual( events_to_filter[i].event_id, filtered[i].event_id, diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 064f4987df..168419f440 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -623,14 +623,14 @@ class CacheDecoratorTestCase(unittest.HomeserverTestCase): a = A() - for k in range(0, 12): + for k in range(12): yield a.func(k) self.assertEqual(callcount[0], 12) # There must have been at least 2 evictions, meaning if we calculate # all 12 values again, we must get called at least 2 more times - for k in range(0, 12): + for k in range(12): yield a.func(k) self.assertTrue( -- cgit 1.5.1 From edd83f23b710f0caae05d5766b474de3b6f24e9e Mon Sep 17 00:00:00 2001 From: David Robertson Date: Fri, 8 Sep 2023 19:29:38 +0100 Subject: Improve type hints for attrs classes (#16276) --- changelog.d/16276.misc | 1 + synapse/config/oembed.py | 2 +- synapse/storage/controllers/persist_events.py | 8 +++----- synapse/util/async_helpers.py | 25 +++++++++++-------------- synapse/util/caches/dictionary_cache.py | 10 ++++------ 
synapse/util/caches/expiringcache.py | 20 ++++++++++++-------- synapse/util/caches/ttlcache.py | 10 +++++----- 7 files changed, 37 insertions(+), 39 deletions(-) create mode 100644 changelog.d/16276.misc (limited to 'synapse') diff --git a/changelog.d/16276.misc b/changelog.d/16276.misc new file mode 100644 index 0000000000..93ceaeafc9 --- /dev/null +++ b/changelog.d/16276.misc @@ -0,0 +1 @@ +Improve type hints. diff --git a/synapse/config/oembed.py b/synapse/config/oembed.py index d7959639ee..59bc0b55f4 100644 --- a/synapse/config/oembed.py +++ b/synapse/config/oembed.py @@ -30,7 +30,7 @@ class OEmbedEndpointConfig: # The API endpoint to fetch. api_endpoint: str # The patterns to match. - url_patterns: List[Pattern] + url_patterns: List[Pattern[str]] # The supported formats. formats: Optional[List[str]] diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index abd1d149db..6864f93090 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -154,12 +154,13 @@ class _UpdateCurrentStateTask: _EventPersistQueueTask = Union[_PersistEventsTask, _UpdateCurrentStateTask] +_PersistResult = TypeVar("_PersistResult") @attr.s(auto_attribs=True, slots=True) -class _EventPersistQueueItem: +class _EventPersistQueueItem(Generic[_PersistResult]): task: _EventPersistQueueTask - deferred: ObservableDeferred + deferred: ObservableDeferred[_PersistResult] parent_opentracing_span_contexts: List = attr.ib(factory=list) """A list of opentracing spans waiting for this batch""" @@ -168,9 +169,6 @@ class _EventPersistQueueItem: """The opentracing span under which the persistence actually happened""" -_PersistResult = TypeVar("_PersistResult") - - class _EventPeristenceQueue(Generic[_PersistResult]): """Queues up tasks so that they can be processed with only one concurrent transaction per room. 
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 943ad54456..0cbeb0c365 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -19,6 +19,7 @@ import collections import inspect import itertools import logging +import typing from contextlib import asynccontextmanager from typing import ( Any, @@ -29,6 +30,7 @@ from typing import ( Collection, Coroutine, Dict, + Generator, Generic, Hashable, Iterable, @@ -398,7 +400,7 @@ class _LinearizerEntry: # The number of things executing. count: int # Deferreds for the things blocked from executing. - deferreds: collections.OrderedDict + deferreds: typing.OrderedDict["defer.Deferred[None]", Literal[1]] class Linearizer: @@ -717,30 +719,25 @@ def timeout_deferred( return new_d -# This class can't be generic because it uses slots with attrs. -# See: https://github.com/python-attrs/attrs/issues/313 @attr.s(slots=True, frozen=True, auto_attribs=True) -class DoneAwaitable: # should be: Generic[R] +class DoneAwaitable(Awaitable[R]): """Simple awaitable that returns the provided value.""" - value: Any # should be: R + value: R - def __await__(self) -> Any: - return self - - def __iter__(self) -> "DoneAwaitable": - return self - - def __next__(self) -> None: - raise StopIteration(self.value) + def __await__(self) -> Generator[Any, None, R]: + yield None + return self.value def maybe_awaitable(value: Union[Awaitable[R], R]) -> Awaitable[R]: """Convert a value to an awaitable if not already an awaitable.""" if inspect.isawaitable(value): - assert isinstance(value, Awaitable) return value + # For some reason mypy doesn't deduce that value is not Awaitable here, even though + # inspect.isawaitable returns a TypeGuard. 
+ assert not isinstance(value, Awaitable) return DoneAwaitable(value) diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 5eaf70c7ab..2fbc7b1e6c 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -14,7 +14,7 @@ import enum import logging import threading -from typing import Any, Dict, Generic, Iterable, Optional, Set, Tuple, TypeVar, Union +from typing import Dict, Generic, Iterable, Optional, Set, Tuple, TypeVar, Union import attr from typing_extensions import Literal @@ -33,10 +33,8 @@ DKT = TypeVar("DKT") DV = TypeVar("DV") -# This class can't be generic because it uses slots with attrs. -# See: https://github.com/python-attrs/attrs/issues/313 @attr.s(slots=True, frozen=True, auto_attribs=True) -class DictionaryEntry: # should be: Generic[DKT, DV]. +class DictionaryEntry(Generic[DKT, DV]): """Returned when getting an entry from the cache If `full` is true then `known_absent` will be the empty set. @@ -50,8 +48,8 @@ class DictionaryEntry: # should be: Generic[DKT, DV]. 
""" full: bool - known_absent: Set[Any] # should be: Set[DKT] - value: Dict[Any, Any] # should be: Dict[DKT, DV] + known_absent: Set[DKT] + value: Dict[DKT, DV] def __len__(self) -> int: return len(self.value) diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 01ad02af67..8e4c34039d 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -14,7 +14,7 @@ import logging from collections import OrderedDict -from typing import Any, Generic, Optional, TypeVar, Union, overload +from typing import Any, Generic, Iterable, Optional, TypeVar, Union, overload import attr from typing_extensions import Literal @@ -73,7 +73,7 @@ class ExpiringCache(Generic[KT, VT]): self._expiry_ms = expiry_ms self._reset_expiry_on_get = reset_expiry_on_get - self._cache: OrderedDict[KT, _CacheEntry] = OrderedDict() + self._cache: OrderedDict[KT, _CacheEntry[VT]] = OrderedDict() self.iterable = iterable @@ -100,7 +100,10 @@ class ExpiringCache(Generic[KT, VT]): while self._max_size and len(self) > self._max_size: _key, value = self._cache.popitem(last=False) if self.iterable: - self.metrics.inc_evictions(EvictionReason.size, len(value.value)) + # type-ignore, here and below: if self.iterable is true, then the value + # type VT should be Sized (i.e. have a __len__ method). We don't enforce + # this via the type system at present. 
+ self.metrics.inc_evictions(EvictionReason.size, len(value.value)) # type: ignore[arg-type] else: self.metrics.inc_evictions(EvictionReason.size) @@ -134,7 +137,7 @@ class ExpiringCache(Generic[KT, VT]): return default if self.iterable: - self.metrics.inc_evictions(EvictionReason.invalidation, len(value.value)) + self.metrics.inc_evictions(EvictionReason.invalidation, len(value.value)) # type: ignore[arg-type] else: self.metrics.inc_evictions(EvictionReason.invalidation) @@ -182,7 +185,7 @@ class ExpiringCache(Generic[KT, VT]): for k in keys_to_delete: value = self._cache.pop(k) if self.iterable: - self.metrics.inc_evictions(EvictionReason.time, len(value.value)) + self.metrics.inc_evictions(EvictionReason.time, len(value.value)) # type: ignore[arg-type] else: self.metrics.inc_evictions(EvictionReason.time) @@ -195,7 +198,8 @@ class ExpiringCache(Generic[KT, VT]): def __len__(self) -> int: if self.iterable: - return sum(len(entry.value) for entry in self._cache.values()) + g: Iterable[int] = (len(entry.value) for entry in self._cache.values()) # type: ignore[arg-type] + return sum(g) else: return len(self._cache) @@ -218,6 +222,6 @@ class ExpiringCache(Generic[KT, VT]): @attr.s(slots=True, auto_attribs=True) -class _CacheEntry: +class _CacheEntry(Generic[VT]): time: int - value: Any + value: VT diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index f6b3ee31e4..48a6e4a906 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -35,10 +35,10 @@ class TTLCache(Generic[KT, VT]): def __init__(self, cache_name: str, timer: Callable[[], float] = time.time): # map from key to _CacheEntry - self._data: Dict[KT, _CacheEntry] = {} + self._data: Dict[KT, _CacheEntry[KT, VT]] = {} # the _CacheEntries, sorted by expiry time - self._expiry_list: SortedList[_CacheEntry] = SortedList() + self._expiry_list: SortedList[_CacheEntry[KT, VT]] = SortedList() self._timer = timer @@ -160,11 +160,11 @@ class TTLCache(Generic[KT, 
VT]): @attr.s(frozen=True, slots=True, auto_attribs=True) -class _CacheEntry: # Should be Generic[KT, VT]. See python-attrs/attrs#313 +class _CacheEntry(Generic[KT, VT]): """TTLCache entry""" # expiry_time is the first attribute, so that entries are sorted by expiry. expiry_time: float ttl: float - key: Any # should be KT - value: Any # should be VT + key: KT + value: VT -- cgit 1.5.1 From 151e4bbc45dbf7b767b1a6a74ffb4cd7889ccf78 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 11 Sep 2023 13:11:02 +0100 Subject: Filter out down hosts when retrying fetching device lists (#16298) --- changelog.d/16298.misc | 1 + synapse/handlers/device.py | 15 ++++++++++++++- 2 files changed, 15 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16298.misc (limited to 'synapse') diff --git a/changelog.d/16298.misc b/changelog.d/16298.misc new file mode 100644 index 0000000000..75b546d424 --- /dev/null +++ b/changelog.d/16298.misc @@ -0,0 +1 @@ +Don't try refetching device lists for users on remote hosts that are marked as "down". diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index 9356ae998e..9d240ad4ee 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -58,7 +58,10 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.cancellation import cancellable from synapse.util.metrics import measure_func -from synapse.util.retryutils import NotRetryingDestination +from synapse.util.retryutils import ( + NotRetryingDestination, + filter_destinations_by_retry_limiter, +) if TYPE_CHECKING: from synapse.server import HomeServer @@ -1269,8 +1272,18 @@ class DeviceListUpdater(DeviceListWorkerUpdater): self._resync_retry_in_progress = True # Get all of the users that need resyncing. need_resync = await self.store.get_user_ids_requiring_device_list_resync() + + # Filter out users whose host is marked as "down" up front. 
+ hosts = await filter_destinations_by_retry_limiter( + {get_domain_from_id(u) for u in need_resync}, self.clock, self.store + ) + hosts = set(hosts) + # Iterate over the set of user IDs. for user_id in need_resync: + if get_domain_from_id(user_id) not in hosts: + continue + try: # Try to resync the current user's devices list. result = (await self.multi_user_device_resync([user_id], False))[ -- cgit 1.5.1 From 9400dc05357b4272425c7be47ceeced26fa3f28c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 11 Sep 2023 09:49:48 -0400 Subject: Add the List-Unsubscribe header for notification emails. (#16274) Adds both the List-Unsubscribe (RFC2369) and List-Unsubscribe-Post (RFC8058) headers to push notification emails, which together should: * Show an "Unsubscribe" link in the MUA UI when viewing Synapse notification emails. * Enable "one-click" unsubscribe (the user never leaves their MUA, which automatically makes a POST request to the specified endpoint). --- changelog.d/16274.feature | 1 + synapse/handlers/send_email.py | 10 +++++- synapse/push/mailer.py | 33 +++++++++++++++--- synapse/rest/synapse/client/unsubscribe.py | 17 +++++++++ tests/push/test_email.py | 55 ++++++++++++++++++++++++++++++ 5 files changed, 110 insertions(+), 6 deletions(-) create mode 100644 changelog.d/16274.feature (limited to 'synapse') diff --git a/changelog.d/16274.feature b/changelog.d/16274.feature new file mode 100644 index 0000000000..0d9da2bbef --- /dev/null +++ b/changelog.d/16274.feature @@ -0,0 +1 @@ +Enable users to easily unsubscribe to notifications emails via the `List-Unsubscribe` header. 
diff --git a/synapse/handlers/send_email.py b/synapse/handlers/send_email.py index 05e21509de..4f5fe62fe8 100644 --- a/synapse/handlers/send_email.py +++ b/synapse/handlers/send_email.py @@ -17,7 +17,7 @@ import logging from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText from io import BytesIO -from typing import TYPE_CHECKING, Any, Optional +from typing import TYPE_CHECKING, Any, Dict, Optional from pkg_resources import parse_version @@ -151,6 +151,7 @@ class SendEmailHandler: app_name: str, html: str, text: str, + additional_headers: Optional[Dict[str, str]] = None, ) -> None: """Send a multipart email with the given information. @@ -160,6 +161,7 @@ class SendEmailHandler: app_name: The app name to include in the From header. html: The HTML content to include in the email. text: The plain text content to include in the email. + additional_headers: A map of additional headers to include. """ try: from_string = self._from % {"app": app_name} @@ -181,6 +183,7 @@ class SendEmailHandler: multipart_msg["To"] = email_address multipart_msg["Date"] = email.utils.formatdate() multipart_msg["Message-ID"] = email.utils.make_msgid() + # Discourage automatic responses to Synapse's emails. # Per RFC 3834, automatic responses should not be sent if the "Auto-Submitted" # header is present with any value other than "no". 
See @@ -194,6 +197,11 @@ class SendEmailHandler: # https://stackoverflow.com/a/25324691/5252017 # https://stackoverflow.com/a/61646381/5252017 multipart_msg["X-Auto-Response-Suppress"] = "All" + + if additional_headers: + for header, value in additional_headers.items(): + multipart_msg[header] = value + multipart_msg.attach(text_part) multipart_msg.attach(html_part) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 79e0627b6a..b6cad18c2d 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -298,20 +298,26 @@ class Mailer: notifs_by_room, state_by_room, notif_events, reason ) + unsubscribe_link = self._make_unsubscribe_link(user_id, app_id, email_address) + template_vars: TemplateVars = { "user_display_name": user_display_name, - "unsubscribe_link": self._make_unsubscribe_link( - user_id, app_id, email_address - ), + "unsubscribe_link": unsubscribe_link, "summary_text": summary_text, "rooms": rooms, "reason": reason, } - await self.send_email(email_address, summary_text, template_vars) + await self.send_email( + email_address, summary_text, template_vars, unsubscribe_link + ) async def send_email( - self, email_address: str, subject: str, extra_template_vars: TemplateVars + self, + email_address: str, + subject: str, + extra_template_vars: TemplateVars, + unsubscribe_link: Optional[str] = None, ) -> None: """Send an email with the given information and template text""" template_vars: TemplateVars = { @@ -330,6 +336,23 @@ class Mailer: app_name=self.app_name, html=html_text, text=plain_text, + # Include the List-Unsubscribe header which some clients render in the UI. + # Per RFC 2369, this can be a URL or mailto URL. See + # https://www.rfc-editor.org/rfc/rfc2369.html#section-3.2 + # + # It is preferred to use email, but Synapse doesn't support incoming email. + # + # Also include the List-Unsubscribe-Post header from RFC 8058. 
See + # https://www.rfc-editor.org/rfc/rfc8058.html#section-3.1 + # + # Note that many email clients will not render the unsubscribe link + # unless DKIM, etc. is properly setup. + additional_headers={ + "List-Unsubscribe-Post": "List-Unsubscribe=One-Click", + "List-Unsubscribe": f"<{unsubscribe_link}>", + } + if unsubscribe_link + else None, ) async def _get_room_vars( diff --git a/synapse/rest/synapse/client/unsubscribe.py b/synapse/rest/synapse/client/unsubscribe.py index 60321018f9..050fd7bba1 100644 --- a/synapse/rest/synapse/client/unsubscribe.py +++ b/synapse/rest/synapse/client/unsubscribe.py @@ -38,6 +38,10 @@ class UnsubscribeResource(DirectServeHtmlResource): self.macaroon_generator = hs.get_macaroon_generator() async def _async_render_GET(self, request: SynapseRequest) -> None: + """ + Handle a user opening an unsubscribe link in the browser, either via an + HTML/Text email or via the List-Unsubscribe header. + """ token = parse_string(request, "access_token", required=True) app_id = parse_string(request, "app_id", required=True) pushkey = parse_string(request, "pushkey", required=True) @@ -62,3 +66,16 @@ class UnsubscribeResource(DirectServeHtmlResource): 200, UnsubscribeResource.SUCCESS_HTML, ) + + async def _async_render_POST(self, request: SynapseRequest) -> None: + """ + Handle a mail user agent POSTing to the unsubscribe URL via the + List-Unsubscribe & List-Unsubscribe-Post headers. + """ + + # TODO Assert that the body has a single field + + # Assert the body has form encoded key/value pair of + # List-Unsubscribe=One-Click. + + await self._async_render_GET(request) diff --git a/tests/push/test_email.py b/tests/push/test_email.py index 4b5c96aeae..73a430ddc6 100644 --- a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -13,10 +13,12 @@ # limitations under the License. 
import email.message import os +from http import HTTPStatus from typing import Any, Dict, List, Sequence, Tuple import attr import pkg_resources +from parameterized import parameterized from twisted.internet.defer import Deferred from twisted.test.proto_helpers import MemoryReactor @@ -25,9 +27,11 @@ import synapse.rest.admin from synapse.api.errors import Codes, SynapseError from synapse.push.emailpusher import EmailPusher from synapse.rest.client import login, room +from synapse.rest.synapse.client.unsubscribe import UnsubscribeResource from synapse.server import HomeServer from synapse.util import Clock +from tests.server import FakeSite, make_request from tests.unittest import HomeserverTestCase @@ -175,6 +179,57 @@ class EmailPusherTests(HomeserverTestCase): self._check_for_mail() + @parameterized.expand([(False,), (True,)]) + def test_unsubscribe(self, use_post: bool) -> None: + # Create a simple room with two users + room = self.helper.create_room_as(self.user_id, tok=self.access_token) + self.helper.invite( + room=room, src=self.user_id, tok=self.access_token, targ=self.others[0].id + ) + self.helper.join(room=room, user=self.others[0].id, tok=self.others[0].token) + + # The other user sends a single message. + self.helper.send(room, body="Hi!", tok=self.others[0].token) + + # We should get emailed about that message + args, kwargs = self._check_for_mail() + + # That email should contain an unsubscribe link in the body and header. + msg: bytes = args[5] + + # Multipart: plain text, base 64 encoded; html, base 64 encoded + multipart_msg = email.message_from_bytes(msg) + txt = multipart_msg.get_payload()[0].get_payload(decode=True).decode() + html = multipart_msg.get_payload()[1].get_payload(decode=True).decode() + self.assertIn("/_synapse/client/unsubscribe", txt) + self.assertIn("/_synapse/client/unsubscribe", html) + + # The unsubscribe headers should exist. 
+ assert multipart_msg.get("List-Unsubscribe") is not None + self.assertIsNotNone(multipart_msg.get("List-Unsubscribe-Post")) + + # Open the unsubscribe link. + unsubscribe_link = multipart_msg["List-Unsubscribe"].strip("<>") + unsubscribe_resource = UnsubscribeResource(self.hs) + channel = make_request( + self.reactor, + FakeSite(unsubscribe_resource, self.reactor), + "POST" if use_post else "GET", + unsubscribe_link, + shorthand=False, + ) + self.assertEqual(HTTPStatus.OK, channel.code, channel.result) + + # Ensure the pusher was removed. + pushers = list( + self.get_success( + self.hs.get_datastores().main.get_pushers_by( + {"user_name": self.user_id} + ) + ) + ) + self.assertEqual(pushers, []) + def test_invite_sends_email(self) -> None: # Create a room and invite the user to it room = self.helper.create_room_as(self.others[0].id, tok=self.others[0].token) -- cgit 1.5.1 From 2b35626b6b7aed52a626734a5a85fe77c847251d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 12 Sep 2023 11:08:04 +0100 Subject: Refactor storing of server keys (#16261) --- changelog.d/16261.misc | 1 + synapse/crypto/keyring.py | 35 ++---- synapse/storage/databases/main/keys.py | 219 +++++++++++---------------------- tests/crypto/test_keyring.py | 53 ++------ tests/storage/test_keys.py | 137 --------------------- tests/unittest.py | 26 ++-- 6 files changed, 106 insertions(+), 365 deletions(-) create mode 100644 changelog.d/16261.misc delete mode 100644 tests/storage/test_keys.py (limited to 'synapse') diff --git a/changelog.d/16261.misc b/changelog.d/16261.misc new file mode 100644 index 0000000000..d3ad59ca4a --- /dev/null +++ b/changelog.d/16261.misc @@ -0,0 +1 @@ +Simplify server key storage. 
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 260aab3241..fe86f54d80 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -23,12 +23,7 @@ from signedjson.key import ( get_verify_key, is_signing_algorithm_supported, ) -from signedjson.sign import ( - SignatureVerifyException, - encode_canonical_json, - signature_ids, - verify_signed_json, -) +from signedjson.sign import SignatureVerifyException, signature_ids, verify_signed_json from signedjson.types import VerifyKey from unpaddedbase64 import decode_base64 @@ -596,24 +591,12 @@ class BaseV2KeyFetcher(KeyFetcher): verify_key=verify_key, valid_until_ts=key_data["expired_ts"] ) - key_json_bytes = encode_canonical_json(response_json) - - await make_deferred_yieldable( - defer.gatherResults( - [ - run_in_background( - self.store.store_server_keys_json, - server_name=server_name, - key_id=key_id, - from_server=from_server, - ts_now_ms=time_added_ms, - ts_expires_ms=ts_valid_until_ms, - key_json_bytes=key_json_bytes, - ) - for key_id in verify_keys - ], - consumeErrors=True, - ).addErrback(unwrapFirstError) + await self.store.store_server_keys_response( + server_name=server_name, + from_server=from_server, + ts_added_ms=time_added_ms, + verify_keys=verify_keys, + response_json=response_json, ) return verify_keys @@ -775,10 +758,6 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher): keys.setdefault(server_name, {}).update(processed_response) - await self.store.store_server_signature_keys( - perspective_name, time_now_ms, added_keys - ) - return keys def _validate_perspectives_response( diff --git a/synapse/storage/databases/main/keys.py b/synapse/storage/databases/main/keys.py index 57aa4921e1..41563371dc 100644 --- a/synapse/storage/databases/main/keys.py +++ b/synapse/storage/databases/main/keys.py @@ -16,14 +16,17 @@ import itertools import json import logging -from typing import Dict, Iterable, Mapping, Optional, Tuple +from typing import Dict, Iterable, Optional, Tuple 
+from canonicaljson import encode_canonical_json from signedjson.key import decode_verify_key_bytes from unpaddedbase64 import decode_base64 +from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore from synapse.storage.keys import FetchKeyResult, FetchKeyResultForRemote from synapse.storage.types import Cursor +from synapse.types import JsonDict from synapse.util.caches.descriptors import cached, cachedList from synapse.util.iterutils import batch_iter @@ -36,162 +39,84 @@ db_binary_type = memoryview class KeyStore(CacheInvalidationWorkerStore): """Persistence for signature verification keys""" - @cached() - def _get_server_signature_key( - self, server_name_and_key_id: Tuple[str, str] - ) -> FetchKeyResult: - raise NotImplementedError() - - @cachedList( - cached_method_name="_get_server_signature_key", - list_name="server_name_and_key_ids", - ) - async def get_server_signature_keys( - self, server_name_and_key_ids: Iterable[Tuple[str, str]] - ) -> Dict[Tuple[str, str], FetchKeyResult]: - """ - Args: - server_name_and_key_ids: - iterable of (server_name, key-id) tuples to fetch keys for - - Returns: - A map from (server_name, key_id) -> FetchKeyResult, or None if the - key is unknown - """ - keys = {} - - def _get_keys(txn: Cursor, batch: Tuple[Tuple[str, str], ...]) -> None: - """Processes a batch of keys to fetch, and adds the result to `keys`.""" - - # batch_iter always returns tuples so it's safe to do len(batch) - sql = """ - SELECT server_name, key_id, verify_key, ts_valid_until_ms - FROM server_signature_keys WHERE 1=0 - """ + " OR (server_name=? AND key_id=?)" * len( - batch - ) - - txn.execute(sql, tuple(itertools.chain.from_iterable(batch))) - - for row in txn: - server_name, key_id, key_bytes, ts_valid_until_ms = row - - if ts_valid_until_ms is None: - # Old keys may be stored with a ts_valid_until_ms of null, - # in which case we treat this as if it was set to `0`, i.e. 
- # it won't match key requests that define a minimum - # `ts_valid_until_ms`. - ts_valid_until_ms = 0 - - keys[(server_name, key_id)] = FetchKeyResult( - verify_key=decode_verify_key_bytes(key_id, bytes(key_bytes)), - valid_until_ts=ts_valid_until_ms, - ) - - def _txn(txn: Cursor) -> Dict[Tuple[str, str], FetchKeyResult]: - for batch in batch_iter(server_name_and_key_ids, 50): - _get_keys(txn, batch) - return keys - - return await self.db_pool.runInteraction("get_server_signature_keys", _txn) - - async def store_server_signature_keys( + async def store_server_keys_response( self, + server_name: str, from_server: str, ts_added_ms: int, - verify_keys: Mapping[Tuple[str, str], FetchKeyResult], + verify_keys: Dict[str, FetchKeyResult], + response_json: JsonDict, ) -> None: - """Stores NACL verification keys for remote servers. + """Stores the keys for the given server that we got from `from_server`. + Args: - from_server: Where the verification keys were looked up - ts_added_ms: The time to record that the key was added - verify_keys: - keys to be stored. Each entry is a triplet of - (server_name, key_id, key). + server_name: The owner of the keys + from_server: Which server we got the keys from + ts_added_ms: When we're adding the keys + verify_keys: The decoded keys + response_json: The full *signed* response JSON that contains the keys. """ - key_values = [] - value_values = [] - invalidations = [] - for (server_name, key_id), fetch_result in verify_keys.items(): - key_values.append((server_name, key_id)) - value_values.append( - ( - from_server, - ts_added_ms, - fetch_result.valid_until_ts, - db_binary_type(fetch_result.verify_key.encode()), - ) - ) - # invalidate takes a tuple corresponding to the params of - # _get_server_signature_key. _get_server_signature_key only takes one - # param, which is itself the 2-tuple (server_name, key_id). 
- invalidations.append((server_name, key_id)) - await self.db_pool.simple_upsert_many( - table="server_signature_keys", - key_names=("server_name", "key_id"), - key_values=key_values, - value_names=( - "from_server", - "ts_added_ms", - "ts_valid_until_ms", - "verify_key", - ), - value_values=value_values, - desc="store_server_signature_keys", - ) + key_json_bytes = encode_canonical_json(response_json) + + def store_server_keys_response_txn(txn: LoggingTransaction) -> None: + self.db_pool.simple_upsert_many_txn( + txn, + table="server_signature_keys", + key_names=("server_name", "key_id"), + key_values=[(server_name, key_id) for key_id in verify_keys], + value_names=( + "from_server", + "ts_added_ms", + "ts_valid_until_ms", + "verify_key", + ), + value_values=[ + ( + from_server, + ts_added_ms, + fetch_result.valid_until_ts, + db_binary_type(fetch_result.verify_key.encode()), + ) + for fetch_result in verify_keys.values() + ], + ) - invalidate = self._get_server_signature_key.invalidate - for i in invalidations: - invalidate((i,)) + self.db_pool.simple_upsert_many_txn( + txn, + table="server_keys_json", + key_names=("server_name", "key_id", "from_server"), + key_values=[ + (server_name, key_id, from_server) for key_id in verify_keys + ], + value_names=( + "ts_added_ms", + "ts_valid_until_ms", + "key_json", + ), + value_values=[ + ( + ts_added_ms, + fetch_result.valid_until_ts, + db_binary_type(key_json_bytes), + ) + for fetch_result in verify_keys.values() + ], + ) - async def store_server_keys_json( - self, - server_name: str, - key_id: str, - from_server: str, - ts_now_ms: int, - ts_expires_ms: int, - key_json_bytes: bytes, - ) -> None: - """Stores the JSON bytes for a set of keys from a server - The JSON should be signed by the originating server, the intermediate - server, and by this server. Updates the value for the - (server_name, key_id, from_server) triplet if one already existed. - Args: - server_name: The name of the server. 
- key_id: The identifier of the key this JSON is for. - from_server: The server this JSON was fetched from. - ts_now_ms: The time now in milliseconds. - ts_valid_until_ms: The time when this json stops being valid. - key_json_bytes: The encoded JSON. - """ - await self.db_pool.simple_upsert( - table="server_keys_json", - keyvalues={ - "server_name": server_name, - "key_id": key_id, - "from_server": from_server, - }, - values={ - "server_name": server_name, - "key_id": key_id, - "from_server": from_server, - "ts_added_ms": ts_now_ms, - "ts_valid_until_ms": ts_expires_ms, - "key_json": db_binary_type(key_json_bytes), - }, - desc="store_server_keys_json", - ) + # invalidate takes a tuple corresponding to the params of + # _get_server_keys_json. _get_server_keys_json only takes one + # param, which is itself the 2-tuple (server_name, key_id). + for key_id in verify_keys: + self._invalidate_cache_and_stream( + txn, self._get_server_keys_json, ((server_name, key_id),) + ) + self._invalidate_cache_and_stream( + txn, self.get_server_key_json_for_remote, (server_name, key_id) + ) - # invalidate takes a tuple corresponding to the params of - # _get_server_keys_json. _get_server_keys_json only takes one - # param, which is itself the 2-tuple (server_name, key_id). - await self.invalidate_cache_and_stream( - "_get_server_keys_json", ((server_name, key_id),) - ) - await self.invalidate_cache_and_stream( - "get_server_key_json_for_remote", (server_name, key_id) + await self.db_pool.runInteraction( + "store_server_keys_response", store_server_keys_response_txn ) @cached() diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index f93ba5d4cf..c5700771b0 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -13,7 +13,7 @@ # limitations under the License. 
import time from typing import Any, Dict, List, Optional, cast -from unittest.mock import AsyncMock, Mock +from unittest.mock import Mock import attr import canonicaljson @@ -189,23 +189,24 @@ class KeyringTestCase(unittest.HomeserverTestCase): kr = keyring.Keyring(self.hs) key1 = signedjson.key.generate_signing_key("1") - r = self.hs.get_datastores().main.store_server_keys_json( + r = self.hs.get_datastores().main.store_server_keys_response( "server9", - get_key_id(key1), from_server="test", - ts_now_ms=int(time.time() * 1000), - ts_expires_ms=1000, + ts_added_ms=int(time.time() * 1000), + verify_keys={ + get_key_id(key1): FetchKeyResult( + verify_key=get_verify_key(key1), valid_until_ts=1000 + ) + }, # The entire response gets signed & stored, just include the bits we # care about. - key_json_bytes=canonicaljson.encode_canonical_json( - { - "verify_keys": { - get_key_id(key1): { - "key": encode_verify_key_base64(get_verify_key(key1)) - } + response_json={ + "verify_keys": { + get_key_id(key1): { + "key": encode_verify_key_base64(get_verify_key(key1)) } } - ), + }, ) self.get_success(r) @@ -285,34 +286,6 @@ class KeyringTestCase(unittest.HomeserverTestCase): d = kr.verify_json_for_server(self.hs.hostname, json1, 0) self.get_success(d) - def test_verify_json_for_server_with_null_valid_until_ms(self) -> None: - """Tests that we correctly handle key requests for keys we've stored - with a null `ts_valid_until_ms` - """ - mock_fetcher = Mock() - mock_fetcher.get_keys = AsyncMock(return_value={}) - - key1 = signedjson.key.generate_signing_key("1") - r = self.hs.get_datastores().main.store_server_signature_keys( - "server9", - int(time.time() * 1000), - # None is not a valid value in FetchKeyResult, but we're abusing this - # API to insert null values into the database. The nulls get converted - # to 0 when fetched in KeyStore.get_server_signature_keys. 
- {("server9", get_key_id(key1)): FetchKeyResult(get_verify_key(key1), None)}, # type: ignore[arg-type] - ) - self.get_success(r) - - json1: JsonDict = {} - signedjson.sign.sign_json(json1, "server9", key1) - - # should succeed on a signed object with a 0 minimum_valid_until_ms - d = self.hs.get_datastores().main.get_server_signature_keys( - [("server9", get_key_id(key1))] - ) - result = self.get_success(d) - self.assertEqual(result[("server9", get_key_id(key1))].valid_until_ts, 0) - def test_verify_json_dedupes_key_requests(self) -> None: """Two requests for the same key should be deduped.""" key1 = signedjson.key.generate_signing_key("1") diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py deleted file mode 100644 index 5d7c13e6d0..0000000000 --- a/tests/storage/test_keys.py +++ /dev/null @@ -1,137 +0,0 @@ -# Copyright 2017 Vector Creations Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
- -import signedjson.key -import signedjson.types -import unpaddedbase64 - -from synapse.storage.keys import FetchKeyResult - -import tests.unittest - - -def decode_verify_key_base64( - key_id: str, key_base64: str -) -> signedjson.types.VerifyKey: - key_bytes = unpaddedbase64.decode_base64(key_base64) - return signedjson.key.decode_verify_key_bytes(key_id, key_bytes) - - -KEY_1 = decode_verify_key_base64( - "ed25519:key1", "fP5l4JzpZPq/zdbBg5xx6lQGAAOM9/3w94cqiJ5jPrw" -) -KEY_2 = decode_verify_key_base64( - "ed25519:key2", "Noi6WqcDj0QmPxCNQqgezwTlBKrfqehY1u2FyWP9uYw" -) - - -class KeyStoreTestCase(tests.unittest.HomeserverTestCase): - def test_get_server_signature_keys(self) -> None: - store = self.hs.get_datastores().main - - key_id_1 = "ed25519:key1" - key_id_2 = "ed25519:KEY_ID_2" - self.get_success( - store.store_server_signature_keys( - "from_server", - 10, - { - ("server1", key_id_1): FetchKeyResult(KEY_1, 100), - ("server1", key_id_2): FetchKeyResult(KEY_2, 200), - }, - ) - ) - - res = self.get_success( - store.get_server_signature_keys( - [ - ("server1", key_id_1), - ("server1", key_id_2), - ("server1", "ed25519:key3"), - ] - ) - ) - - self.assertEqual(len(res.keys()), 3) - res1 = res[("server1", key_id_1)] - self.assertEqual(res1.verify_key, KEY_1) - self.assertEqual(res1.verify_key.version, "key1") - self.assertEqual(res1.valid_until_ts, 100) - - res2 = res[("server1", key_id_2)] - self.assertEqual(res2.verify_key, KEY_2) - # version comes from the ID it was stored with - self.assertEqual(res2.verify_key.version, "KEY_ID_2") - self.assertEqual(res2.valid_until_ts, 200) - - # non-existent result gives None - self.assertIsNone(res[("server1", "ed25519:key3")]) - - def test_cache(self) -> None: - """Check that updates correctly invalidate the cache.""" - - store = self.hs.get_datastores().main - - key_id_1 = "ed25519:key1" - key_id_2 = "ed25519:key2" - - self.get_success( - store.store_server_signature_keys( - "from_server", - 0, - { - ("srv1", key_id_1): 
FetchKeyResult(KEY_1, 100), - ("srv1", key_id_2): FetchKeyResult(KEY_2, 200), - }, - ) - ) - - res = self.get_success( - store.get_server_signature_keys([("srv1", key_id_1), ("srv1", key_id_2)]) - ) - self.assertEqual(len(res.keys()), 2) - - res1 = res[("srv1", key_id_1)] - self.assertEqual(res1.verify_key, KEY_1) - self.assertEqual(res1.valid_until_ts, 100) - - res2 = res[("srv1", key_id_2)] - self.assertEqual(res2.verify_key, KEY_2) - self.assertEqual(res2.valid_until_ts, 200) - - # we should be able to look up the same thing again without a db hit - res = self.get_success(store.get_server_signature_keys([("srv1", key_id_1)])) - self.assertEqual(len(res.keys()), 1) - self.assertEqual(res[("srv1", key_id_1)].verify_key, KEY_1) - - new_key_2 = signedjson.key.get_verify_key( - signedjson.key.generate_signing_key("key2") - ) - d = store.store_server_signature_keys( - "from_server", 10, {("srv1", key_id_2): FetchKeyResult(new_key_2, 300)} - ) - self.get_success(d) - - res = self.get_success( - store.get_server_signature_keys([("srv1", key_id_1), ("srv1", key_id_2)]) - ) - self.assertEqual(len(res.keys()), 2) - - res1 = res[("srv1", key_id_1)] - self.assertEqual(res1.verify_key, KEY_1) - self.assertEqual(res1.valid_until_ts, 100) - - res2 = res[("srv1", key_id_2)] - self.assertEqual(res2.verify_key, new_key_2) - self.assertEqual(res2.valid_until_ts, 300) diff --git a/tests/unittest.py b/tests/unittest.py index 5d3640d8ac..dbaff361b4 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -70,6 +70,7 @@ from synapse.logging.context import ( ) from synapse.rest import RegisterServletsFunc from synapse.server import HomeServer +from synapse.storage.keys import FetchKeyResult from synapse.types import JsonDict, Requester, UserID, create_requester from synapse.util import Clock from synapse.util.httpresourcetree import create_resource_tree @@ -858,23 +859,22 @@ class FederatingHomeserverTestCase(HomeserverTestCase): verify_key_id = "%s:%s" % (verify_key.alg, 
verify_key.version) self.get_success( - hs.get_datastores().main.store_server_keys_json( + hs.get_datastores().main.store_server_keys_response( self.OTHER_SERVER_NAME, - verify_key_id, from_server=self.OTHER_SERVER_NAME, - ts_now_ms=clock.time_msec(), - ts_expires_ms=clock.time_msec() + 10000, - key_json_bytes=canonicaljson.encode_canonical_json( - { - "verify_keys": { - verify_key_id: { - "key": signedjson.key.encode_verify_key_base64( - verify_key - ) - } + ts_added_ms=clock.time_msec(), + verify_keys={ + verify_key_id: FetchKeyResult( + verify_key=verify_key, valid_until_ts=clock.time_msec() + 10000 + ), + }, + response_json={ + "verify_keys": { + verify_key_id: { + "key": signedjson.key.encode_verify_key_base64(verify_key) } } - ), + }, ) ) -- cgit 1.5.1 From 16ef6f1e3c8d0cfe959e4209fd04528658383ab4 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 12 Sep 2023 07:12:31 -0400 Subject: Stop purging tables which are slated for removal. (#16273) --- changelog.d/16273.misc | 1 + synapse/storage/databases/main/purge_events.py | 4 ---- synapse/storage/schema/__init__.py | 6 +++++- 3 files changed, 6 insertions(+), 5 deletions(-) create mode 100644 changelog.d/16273.misc (limited to 'synapse') diff --git a/changelog.d/16273.misc b/changelog.d/16273.misc new file mode 100644 index 0000000000..19882f6754 --- /dev/null +++ b/changelog.d/16273.misc @@ -0,0 +1 @@ +Stop purging from tables slated for removal. 
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index b52f48cf04..dea0e0458c 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -450,10 +450,6 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): "e2e_room_keys", "event_push_summary", "pusher_throttle", - "insertion_events", - "insertion_event_extremities", - "insertion_event_edges", - "batch_events", "room_account_data", "room_tags", # "rooms" happens last, to keep the foreign keys in the other tables diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index 422f11f59e..5b50bd66bc 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 81 # remember to update the list below when updating +SCHEMA_VERSION = 82 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -117,6 +117,10 @@ Changes in SCHEMA_VERSION = 80 Changes in SCHEMA_VERSION = 81 - The event_txn_id is no longer written to for new events. + +Changes in SCHEMA_VERSION = 82 + - The insertion_events, insertion_event_extremities, insertion_event_edges, and + batch_events tables are no longer purged in preparation for their removal. """ -- cgit 1.5.1 From ba48c563c98966400488c8972d2e9964f9510399 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 12 Sep 2023 07:16:09 -0400 Subject: Bump mypy from 1.4.1 to 1.5.1. 
(#16300) --- changelog.d/16300.misc | 1 + mypy.ini | 1 - poetry.lock | 68 ++++++++++++++++++------------------------ synapse/logging/opentracing.py | 10 ++----- 4 files changed, 32 insertions(+), 48 deletions(-) create mode 100644 changelog.d/16300.misc (limited to 'synapse') diff --git a/changelog.d/16300.misc b/changelog.d/16300.misc new file mode 100644 index 0000000000..8cc2e52369 --- /dev/null +++ b/changelog.d/16300.misc @@ -0,0 +1 @@ +Bump mypy from 1.4.1 to 1.5.1. diff --git a/mypy.ini b/mypy.ini index fb5f44c939..88aea301b9 100644 --- a/mypy.ini +++ b/mypy.ini @@ -23,7 +23,6 @@ warn_unused_ignores = True # warn_return_any = True # no_implicit_reexport = True strict_equality = True -strict_concatenate = True # Run mypy type checking with the minimum supported Python version to catch new usage # that isn't backwards-compatible (types, overloads, etc). diff --git a/poetry.lock b/poetry.lock index e4cea28282..c01312579e 100644 --- a/poetry.lock +++ b/poetry.lock @@ -1445,37 +1445,38 @@ files = [ [[package]] name = "mypy" -version = "1.4.1" +version = "1.5.1" description = "Optional static typing for Python" optional = false -python-versions = ">=3.7" +python-versions = ">=3.8" files = [ - {file = "mypy-1.4.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:566e72b0cd6598503e48ea610e0052d1b8168e60a46e0bfd34b3acf2d57f96a8"}, - {file = "mypy-1.4.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:ca637024ca67ab24a7fd6f65d280572c3794665eaf5edcc7e90a866544076878"}, - {file = "mypy-1.4.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:0dde1d180cd84f0624c5dcaaa89c89775550a675aff96b5848de78fb11adabcd"}, - {file = "mypy-1.4.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:8c4d8e89aa7de683e2056a581ce63c46a0c41e31bd2b6d34144e2c80f5ea53dc"}, - {file = "mypy-1.4.1-cp310-cp310-win_amd64.whl", hash = "sha256:bfdca17c36ae01a21274a3c387a63aa1aafe72bff976522886869ef131b937f1"}, - {file = "mypy-1.4.1-cp311-cp311-macosx_10_9_x86_64.whl", 
hash = "sha256:7549fbf655e5825d787bbc9ecf6028731973f78088fbca3a1f4145c39ef09462"}, - {file = "mypy-1.4.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:98324ec3ecf12296e6422939e54763faedbfcc502ea4a4c38502082711867258"}, - {file = "mypy-1.4.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:141dedfdbfe8a04142881ff30ce6e6653c9685b354876b12e4fe6c78598b45e2"}, - {file = "mypy-1.4.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:8207b7105829eca6f3d774f64a904190bb2231de91b8b186d21ffd98005f14a7"}, - {file = "mypy-1.4.1-cp311-cp311-win_amd64.whl", hash = "sha256:16f0db5b641ba159eff72cff08edc3875f2b62b2fa2bc24f68c1e7a4e8232d01"}, - {file = "mypy-1.4.1-cp37-cp37m-macosx_10_9_x86_64.whl", hash = "sha256:470c969bb3f9a9efcedbadcd19a74ffb34a25f8e6b0e02dae7c0e71f8372f97b"}, - {file = "mypy-1.4.1-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:e5952d2d18b79f7dc25e62e014fe5a23eb1a3d2bc66318df8988a01b1a037c5b"}, - {file = "mypy-1.4.1-cp37-cp37m-musllinux_1_1_x86_64.whl", hash = "sha256:190b6bab0302cec4e9e6767d3eb66085aef2a1cc98fe04936d8a42ed2ba77bb7"}, - {file = "mypy-1.4.1-cp37-cp37m-win_amd64.whl", hash = "sha256:9d40652cc4fe33871ad3338581dca3297ff5f2213d0df345bcfbde5162abf0c9"}, - {file = "mypy-1.4.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:01fd2e9f85622d981fd9063bfaef1aed6e336eaacca00892cd2d82801ab7c042"}, - {file = "mypy-1.4.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:2460a58faeea905aeb1b9b36f5065f2dc9a9c6e4c992a6499a2360c6c74ceca3"}, - {file = "mypy-1.4.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a2746d69a8196698146a3dbe29104f9eb6a2a4d8a27878d92169a6c0b74435b6"}, - {file = "mypy-1.4.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:ae704dcfaa180ff7c4cfbad23e74321a2b774f92ca77fd94ce1049175a21c97f"}, - {file = "mypy-1.4.1-cp38-cp38-win_amd64.whl", hash = "sha256:43d24f6437925ce50139a310a64b2ab048cb2d3694c84c71c3f2a1626d8101dc"}, - {file = 
"mypy-1.4.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:c482e1246726616088532b5e964e39765b6d1520791348e6c9dc3af25b233828"}, - {file = "mypy-1.4.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:43b592511672017f5b1a483527fd2684347fdffc041c9ef53428c8dc530f79a3"}, - {file = "mypy-1.4.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:34a9239d5b3502c17f07fd7c0b2ae6b7dd7d7f6af35fbb5072c6208e76295816"}, - {file = "mypy-1.4.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:5703097c4936bbb9e9bce41478c8d08edd2865e177dc4c52be759f81ee4dd26c"}, - {file = "mypy-1.4.1-cp39-cp39-win_amd64.whl", hash = "sha256:e02d700ec8d9b1859790c0475df4e4092c7bf3272a4fd2c9f33d87fac4427b8f"}, - {file = "mypy-1.4.1-py3-none-any.whl", hash = "sha256:45d32cec14e7b97af848bddd97d85ea4f0db4d5a149ed9676caa4eb2f7402bb4"}, - {file = "mypy-1.4.1.tar.gz", hash = "sha256:9bbcd9ab8ea1f2e1c8031c21445b511442cc45c89951e49bbf852cbb70755b1b"}, + {file = "mypy-1.5.1-cp310-cp310-macosx_10_9_x86_64.whl", hash = "sha256:f33592ddf9655a4894aef22d134de7393e95fcbdc2d15c1ab65828eee5c66c70"}, + {file = "mypy-1.5.1-cp310-cp310-macosx_11_0_arm64.whl", hash = "sha256:258b22210a4a258ccd077426c7a181d789d1121aca6db73a83f79372f5569ae0"}, + {file = "mypy-1.5.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:a9ec1f695f0c25986e6f7f8778e5ce61659063268836a38c951200c57479cc12"}, + {file = "mypy-1.5.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:abed92d9c8f08643c7d831300b739562b0a6c9fcb028d211134fc9ab20ccad5d"}, + {file = "mypy-1.5.1-cp310-cp310-win_amd64.whl", hash = "sha256:a156e6390944c265eb56afa67c74c0636f10283429171018446b732f1a05af25"}, + {file = "mypy-1.5.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6ac9c21bfe7bc9f7f1b6fae441746e6a106e48fc9de530dea29e8cd37a2c0cc4"}, + {file = "mypy-1.5.1-cp311-cp311-macosx_11_0_arm64.whl", hash = "sha256:51cb1323064b1099e177098cb939eab2da42fea5d818d40113957ec954fc85f4"}, + {file = 
"mypy-1.5.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:596fae69f2bfcb7305808c75c00f81fe2829b6236eadda536f00610ac5ec2243"}, + {file = "mypy-1.5.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:32cb59609b0534f0bd67faebb6e022fe534bdb0e2ecab4290d683d248be1b275"}, + {file = "mypy-1.5.1-cp311-cp311-win_amd64.whl", hash = "sha256:159aa9acb16086b79bbb0016145034a1a05360626046a929f84579ce1666b315"}, + {file = "mypy-1.5.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:f6b0e77db9ff4fda74de7df13f30016a0a663928d669c9f2c057048ba44f09bb"}, + {file = "mypy-1.5.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:26f71b535dfc158a71264e6dc805a9f8d2e60b67215ca0bfa26e2e1aa4d4d373"}, + {file = "mypy-1.5.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:2fc3a600f749b1008cc75e02b6fb3d4db8dbcca2d733030fe7a3b3502902f161"}, + {file = "mypy-1.5.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:26fb32e4d4afa205b24bf645eddfbb36a1e17e995c5c99d6d00edb24b693406a"}, + {file = "mypy-1.5.1-cp312-cp312-win_amd64.whl", hash = "sha256:82cb6193de9bbb3844bab4c7cf80e6227d5225cc7625b068a06d005d861ad5f1"}, + {file = "mypy-1.5.1-cp38-cp38-macosx_10_9_x86_64.whl", hash = "sha256:4a465ea2ca12804d5b34bb056be3a29dc47aea5973b892d0417c6a10a40b2d65"}, + {file = "mypy-1.5.1-cp38-cp38-macosx_11_0_arm64.whl", hash = "sha256:9fece120dbb041771a63eb95e4896791386fe287fefb2837258925b8326d6160"}, + {file = "mypy-1.5.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d28ddc3e3dfeab553e743e532fb95b4e6afad51d4706dd22f28e1e5e664828d2"}, + {file = "mypy-1.5.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:57b10c56016adce71fba6bc6e9fd45d8083f74361f629390c556738565af8eeb"}, + {file = "mypy-1.5.1-cp38-cp38-win_amd64.whl", hash = "sha256:ff0cedc84184115202475bbb46dd99f8dcb87fe24d5d0ddfc0fe6b8575c88d2f"}, + {file = "mypy-1.5.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = 
"sha256:8f772942d372c8cbac575be99f9cc9d9fb3bd95c8bc2de6c01411e2c84ebca8a"}, + {file = "mypy-1.5.1-cp39-cp39-macosx_11_0_arm64.whl", hash = "sha256:5d627124700b92b6bbaa99f27cbe615c8ea7b3402960f6372ea7d65faf376c14"}, + {file = "mypy-1.5.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:361da43c4f5a96173220eb53340ace68cda81845cd88218f8862dfb0adc8cddb"}, + {file = "mypy-1.5.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:330857f9507c24de5c5724235e66858f8364a0693894342485e543f5b07c8693"}, + {file = "mypy-1.5.1-cp39-cp39-win_amd64.whl", hash = "sha256:c543214ffdd422623e9fedd0869166c2f16affe4ba37463975043ef7d2ea8770"}, + {file = "mypy-1.5.1-py3-none-any.whl", hash = "sha256:f757063a83970d67c444f6e01d9550a7402322af3557ce7630d3c957386fa8f5"}, + {file = "mypy-1.5.1.tar.gz", hash = "sha256:b031b9601f1060bf1281feab89697324726ba0c0bae9d7cd7ab4b690940f0b92"}, ] [package.dependencies] @@ -1486,7 +1487,6 @@ typing-extensions = ">=4.1.0" [package.extras] dmypy = ["psutil (>=4.0)"] install-types = ["pip"] -python2 = ["typed-ast (>=1.4.0,<2)"] reports = ["lxml"] [[package]] @@ -2077,7 +2077,6 @@ files = [ {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:69b023b2b4daa7548bcfbd4aa3da05b3a74b772db9e23b982788168117739938"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:81e0b275a9ecc9c0c0c07b4b90ba548307583c125f54d5b6946cfee6360c733d"}, {file = "PyYAML-6.0.1-cp310-cp310-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:ba336e390cd8e4d1739f42dfe9bb83a3cc2e80f567d8805e11b46f4a943f5515"}, - {file = "PyYAML-6.0.1-cp310-cp310-musllinux_1_1_x86_64.whl", hash = "sha256:326c013efe8048858a6d312ddd31d56e468118ad4cdeda36c719bf5bb6192290"}, {file = "PyYAML-6.0.1-cp310-cp310-win32.whl", hash = "sha256:bd4af7373a854424dabd882decdc5579653d7868b8fb26dc7d0e99f823aa5924"}, {file = "PyYAML-6.0.1-cp310-cp310-win_amd64.whl", hash = 
"sha256:fd1592b3fdf65fff2ad0004b5e363300ef59ced41c2e6b3a99d4089fa8c5435d"}, {file = "PyYAML-6.0.1-cp311-cp311-macosx_10_9_x86_64.whl", hash = "sha256:6965a7bc3cf88e5a1c3bd2e0b5c22f8d677dc88a455344035f03399034eb3007"}, @@ -2085,15 +2084,8 @@ files = [ {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:42f8152b8dbc4fe7d96729ec2b99c7097d656dc1213a3229ca5383f973a5ed6d"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:062582fca9fabdd2c8b54a3ef1c978d786e0f6b3a1510e0ac93ef59e0ddae2bc"}, {file = "PyYAML-6.0.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:d2b04aac4d386b172d5b9692e2d2da8de7bfb6c387fa4f801fbf6fb2e6ba4673"}, - {file = "PyYAML-6.0.1-cp311-cp311-musllinux_1_1_x86_64.whl", hash = "sha256:e7d73685e87afe9f3b36c799222440d6cf362062f78be1013661b00c5c6f678b"}, {file = "PyYAML-6.0.1-cp311-cp311-win32.whl", hash = "sha256:1635fd110e8d85d55237ab316b5b011de701ea0f29d07611174a1b42f1444741"}, {file = "PyYAML-6.0.1-cp311-cp311-win_amd64.whl", hash = "sha256:bf07ee2fef7014951eeb99f56f39c9bb4af143d8aa3c21b1677805985307da34"}, - {file = "PyYAML-6.0.1-cp312-cp312-macosx_10_9_x86_64.whl", hash = "sha256:855fb52b0dc35af121542a76b9a84f8d1cd886ea97c84703eaa6d88e37a2ad28"}, - {file = "PyYAML-6.0.1-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:40df9b996c2b73138957fe23a16a4f0ba614f4c0efce1e9406a184b6d07fa3a9"}, - {file = "PyYAML-6.0.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:6c22bec3fbe2524cde73d7ada88f6566758a8f7227bfbf93a408a9d86bcc12a0"}, - {file = "PyYAML-6.0.1-cp312-cp312-musllinux_1_1_x86_64.whl", hash = "sha256:8d4e9c88387b0f5c7d5f281e55304de64cf7f9c0021a3525bd3b1c542da3b0e4"}, - {file = "PyYAML-6.0.1-cp312-cp312-win32.whl", hash = "sha256:d483d2cdf104e7c9fa60c544d92981f12ad66a457afae824d146093b8c294c54"}, - {file = "PyYAML-6.0.1-cp312-cp312-win_amd64.whl", hash = 
"sha256:0d3304d8c0adc42be59c5f8a4d9e3d7379e6955ad754aa9d6ab7a398b59dd1df"}, {file = "PyYAML-6.0.1-cp36-cp36m-macosx_10_9_x86_64.whl", hash = "sha256:50550eb667afee136e9a77d6dc71ae76a44df8b3e51e41b77f6de2932bfe0f47"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:1fe35611261b29bd1de0070f0b2f47cb6ff71fa6595c077e42bd0c419fa27b98"}, {file = "PyYAML-6.0.1-cp36-cp36m-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:704219a11b772aea0d8ecd7058d0082713c3562b4e271b849ad7dc4a5c90c13c"}, @@ -2110,7 +2102,6 @@ files = [ {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:a0cd17c15d3bb3fa06978b4e8958dcdc6e0174ccea823003a106c7d4d7899ac5"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:28c119d996beec18c05208a8bd78cbe4007878c6dd15091efb73a30e90539696"}, {file = "PyYAML-6.0.1-cp38-cp38-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:7e07cbde391ba96ab58e532ff4803f79c4129397514e1413a7dc761ccd755735"}, - {file = "PyYAML-6.0.1-cp38-cp38-musllinux_1_1_x86_64.whl", hash = "sha256:49a183be227561de579b4a36efbb21b3eab9651dd81b1858589f796549873dd6"}, {file = "PyYAML-6.0.1-cp38-cp38-win32.whl", hash = "sha256:184c5108a2aca3c5b3d3bf9395d50893a7ab82a38004c8f61c258d4428e80206"}, {file = "PyYAML-6.0.1-cp38-cp38-win_amd64.whl", hash = "sha256:1e2722cc9fbb45d9b87631ac70924c11d3a401b2d7f410cc0e3bbf249f2dca62"}, {file = "PyYAML-6.0.1-cp39-cp39-macosx_10_9_x86_64.whl", hash = "sha256:9eb6caa9a297fc2c2fb8862bc5370d0303ddba53ba97e71f08023b6cd73d16a8"}, @@ -2118,7 +2109,6 @@ files = [ {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_aarch64.manylinux2014_aarch64.whl", hash = "sha256:5773183b6446b2c99bb77e77595dd486303b4faab2b086e7b17bc6bef28865f6"}, {file = "PyYAML-6.0.1-cp39-cp39-manylinux_2_17_s390x.manylinux2014_s390x.whl", hash = "sha256:b786eecbdf8499b9ca1d697215862083bd6d2a99965554781d0d8d1ad31e13a0"}, {file = 
"PyYAML-6.0.1-cp39-cp39-manylinux_2_17_x86_64.manylinux2014_x86_64.whl", hash = "sha256:bc1bf2925a1ecd43da378f4db9e4f799775d6367bdb94671027b73b393a7c42c"}, - {file = "PyYAML-6.0.1-cp39-cp39-musllinux_1_1_x86_64.whl", hash = "sha256:04ac92ad1925b2cff1db0cfebffb6ffc43457495c9b3c39d3fcae417d7125dc5"}, {file = "PyYAML-6.0.1-cp39-cp39-win32.whl", hash = "sha256:faca3bdcf85b2fc05d06ff3fbc1f83e1391b3e724afa3feba7d13eeab355484c"}, {file = "PyYAML-6.0.1-cp39-cp39-win_amd64.whl", hash = "sha256:510c9deebc5c0225e8c96813043e62b680ba2f9c50a08d3724c7f28a747d1486"}, {file = "PyYAML-6.0.1.tar.gz", hash = "sha256:bfdf460b1736c775f2ba9f6a92bca30bc2095067b8a9d77876d1fad6cc3b4a43"}, diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py index 5c3045e197..4454fe29a5 100644 --- a/synapse/logging/opentracing.py +++ b/synapse/logging/opentracing.py @@ -991,11 +991,7 @@ def trace_with_opname( if not opentracing: return func - # type-ignore: mypy seems to be confused by the ParamSpecs here. - # I think the problem is https://github.com/python/mypy/issues/12909 - return _custom_sync_async_decorator( - func, _wrapping_logic # type: ignore[arg-type] - ) + return _custom_sync_async_decorator(func, _wrapping_logic) return _decorator @@ -1040,9 +1036,7 @@ def tag_args(func: Callable[P, R]) -> Callable[P, R]: set_tag(SynapseTags.FUNC_KWARGS, str(kwargs)) yield - # type-ignore: mypy seems to be confused by the ParamSpecs here. 
- # I think the problem is https://github.com/python/mypy/issues/12909 - return _custom_sync_async_decorator(func, _wrapping_logic) # type: ignore[arg-type] + return _custom_sync_async_decorator(func, _wrapping_logic) @contextlib.contextmanager -- cgit 1.5.1 From ab13fb08bf7c20a992ec2796c72d0fbb2a06545c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 13 Sep 2023 10:51:50 +0100 Subject: Improve logging of replication (#16309) --- changelog.d/16309.misc | 1 + synapse/replication/tcp/handler.py | 2 +- synapse/replication/tcp/resource.py | 7 ++++++- 3 files changed, 8 insertions(+), 2 deletions(-) create mode 100644 changelog.d/16309.misc (limited to 'synapse') diff --git a/changelog.d/16309.misc b/changelog.d/16309.misc new file mode 100644 index 0000000000..bef5563ee9 --- /dev/null +++ b/changelog.d/16309.misc @@ -0,0 +1 @@ +Small improvements to logging in replication code. diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index d9045d7b73..5642666411 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -644,7 +644,7 @@ class ReplicationCommandHandler: [stream.parse_row(row) for row in rows], ) - logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token) + logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token) # We've now caught up to position sent to us, notify handler. 
await self._replication_data_handler.on_position( diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 347467d863..1d9a29d22e 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -191,7 +191,12 @@ class ReplicationStreamer: if updates: logger.info( - "Streaming: %s -> %s", stream.NAME, updates[-1][0] + "Streaming: %s -> %s (limited: %s, updates: %s, max token: %s)", + stream.NAME, + updates[-1][0], + limited, + len(updates), + current_token, ) stream_updates_counter.labels(stream.NAME).inc(len(updates)) -- cgit 1.5.1