From f98f4f2e16a01928e0d442fef4669a1e3fca9b0f Mon Sep 17 00:00:00 2001 From: Shay Date: Wed, 26 Jul 2023 12:59:47 -0700 Subject: Remove support for legacy application service paths (#15964) --- tests/appservice/test_api.py | 53 -------------------------------------------- 1 file changed, 53 deletions(-) (limited to 'tests') diff --git a/tests/appservice/test_api.py b/tests/appservice/test_api.py index 15fce165b6..807dc2f21c 100644 --- a/tests/appservice/test_api.py +++ b/tests/appservice/test_api.py @@ -16,7 +16,6 @@ from unittest.mock import Mock from twisted.test.proto_helpers import MemoryReactor -from synapse.api.errors import HttpResponseException from synapse.appservice import ApplicationService from synapse.server import HomeServer from synapse.types import JsonDict @@ -107,58 +106,6 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): self.assertEqual(self.request_url, URL_LOCATION) self.assertEqual(result, SUCCESS_RESULT_LOCATION) - def test_fallback(self) -> None: - """ - Tests that the fallback to legacy URLs works. - """ - SUCCESS_RESULT_USER = [ - { - "protocol": PROTOCOL, - "userid": "@a:user", - "fields": { - "more": "fields", - }, - } - ] - - URL_USER = f"{URL}/_matrix/app/v1/thirdparty/user/{PROTOCOL}" - FALLBACK_URL_USER = f"{URL}/_matrix/app/unstable/thirdparty/user/{PROTOCOL}" - - self.request_url = None - self.v1_seen = False - - async def get_json( - url: str, - args: Mapping[Any, Any], - headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]], - ) -> List[JsonDict]: - # Ensure the access token is passed as both a header and query arg. - if not headers.get("Authorization") or not args.get(b"access_token"): - raise RuntimeError("Access token not provided") - - self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) - self.assertEqual(args.get(b"access_token"), TOKEN) - self.request_url = url - if url == URL_USER: - self.v1_seen = True - raise HttpResponseException(404, "NOT_FOUND", b"NOT_FOUND") - elif url == FALLBACK_URL_USER: - return SUCCESS_RESULT_USER - else: - raise RuntimeError( - "URL provided was invalid. This should never be seen." - ) - - # We assign to a method, which mypy doesn't like. - self.api.get_json = Mock(side_effect=get_json) # type: ignore[assignment] - - result = self.get_success( - self.api.query_3pe(self.service, "user", PROTOCOL, {b"some": [b"field"]}) - ) - self.assertTrue(self.v1_seen) - self.assertEqual(self.request_url, FALLBACK_URL_USER) - self.assertEqual(result, SUCCESS_RESULT_USER) - def test_claim_keys(self) -> None: """ Tests that the /keys/claim response is properly parsed for missing -- cgit 1.5.1 From a719b703d9bd0dade2565ddcad0e2f3a7a9d4c37 Mon Sep 17 00:00:00 2001 From: Mathieu Velten Date: Thu, 27 Jul 2023 15:45:05 +0200 Subject: Fix 404 on /profile when the display name is empty but not the avatar (#16012) --- changelog.d/16012.bugfix | 1 + synapse/handlers/profile.py | 2 +- tests/handlers/test_profile.py | 10 ++++++++++ 3 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16012.bugfix (limited to 'tests') diff --git a/changelog.d/16012.bugfix b/changelog.d/16012.bugfix new file mode 100644 index 0000000000..44ca9377ff --- /dev/null +++ b/changelog.d/16012.bugfix @@ -0,0 +1 @@ +Fix 404 not found code returned on profile endpoint when the display name is empty but not the avatar URL. 
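For context on the fix that follows: the profile handler previously returned a 404 whenever the display name was unset, even if an avatar URL was stored. A minimal sketch of the corrected behaviour (illustrative only, mirroring the one-line change in the diff below; `profileinfo` stands for the stored profile row):

    # Sketch, not part of the patch: 404 only when *both* fields are unset.
    if profileinfo.display_name is None and profileinfo.avatar_url is None:
        raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND)
    # Otherwise a partial profile is served, e.g.
    # {"avatar_url": "http://my.server/me.png"}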
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index a7f8c5e636..c7fe101cd9 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -68,7 +68,7 @@ class ProfileHandler: if self.hs.is_mine(target_user): profileinfo = await self.store.get_profileinfo(target_user) - if profileinfo.display_name is None: + if profileinfo.display_name is None and profileinfo.avatar_url is None: raise SynapseError(404, "Profile was not found", Codes.NOT_FOUND) return { diff --git a/tests/handlers/test_profile.py b/tests/handlers/test_profile.py index 196ceb0b82..ec2f5d30be 100644 --- a/tests/handlers/test_profile.py +++ b/tests/handlers/test_profile.py @@ -179,6 +179,16 @@ class ProfileTestCase(unittest.HomeserverTestCase): self.assertEqual("http://my.server/me.png", avatar_url) + def test_get_profile_empty_displayname(self) -> None: + self.get_success(self.store.set_profile_displayname(self.frank, None)) + self.get_success( + self.store.set_profile_avatar_url(self.frank, "http://my.server/me.png") + ) + + profile = self.get_success(self.handler.get_profile(self.frank.to_string())) + + self.assertEqual("http://my.server/me.png", profile["avatar_url"]) + def test_set_my_avatar(self) -> None: self.get_success( self.handler.set_avatar_url( -- cgit 1.5.1 From ae55cc1e6bc6527d0e359a823c474f5c9ed4382e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 31 Jul 2023 10:58:03 +0100 Subject: Add ability to wait for locks and add locks to purge history / room deletion (#15791) c.f. #13476 --- changelog.d/15791.bugfix | 1 + synapse/federation/federation_server.py | 17 +- synapse/handlers/message.py | 38 ++- synapse/handlers/pagination.py | 23 +- synapse/handlers/room_member.py | 45 +-- synapse/handlers/worker_lock.py | 333 +++++++++++++++++++++++ synapse/notifier.py | 16 ++ synapse/replication/tcp/commands.py | 33 +++ synapse/replication/tcp/handler.py | 22 ++ synapse/rest/client/room_upgrade_rest_servlet.py | 11 +- synapse/server.py | 5 + synapse/storage/controllers/persist_events.py | 27 +- synapse/storage/databases/main/lock.py | 190 ++++++++----- tests/handlers/test_worker_lock.py | 74 +++++ tests/rest/client/test_rooms.py | 4 +- tests/storage/databases/main/test_lock.py | 52 ++++ 16 files changed, 783 insertions(+), 108 deletions(-) create mode 100644 changelog.d/15791.bugfix create mode 100644 synapse/handlers/worker_lock.py create mode 100644 tests/handlers/test_worker_lock.py (limited to 'tests') diff --git a/changelog.d/15791.bugfix b/changelog.d/15791.bugfix new file mode 100644 index 0000000000..182634b62f --- /dev/null +++ b/changelog.d/15791.bugfix @@ -0,0 +1 @@ +Fix bug where purging history and paginating simultaneously could lead to database corruption when using workers. 
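The diffs that follow apply one pattern throughout the codebase: take the room's delete-room lock in read mode around any work that must not race with room deletion, and in write mode when actually deleting. A minimal usage sketch (illustrative only; `hs` is assumed to be a `HomeServer`, the other names match the new handler introduced below):

    # Sketch of the usage pattern this commit introduces.
    from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME

    async def do_work_in_room(hs, room_id: str) -> None:
        handler = hs.get_worker_locks_handler()
        # write=False: any number of workers may hold the read lock at once;
        # room deletion takes the same lock with write=True and must wait for
        # (and blocks) all readers while it runs.
        async with handler.acquire_read_write_lock(
            DELETE_ROOM_LOCK_NAME, room_id, write=False
        ):
            ...  # e.g. persist events, handle a federation PDU, paginate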
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fa61dd8c10..a90d99c4d6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -63,6 +63,7 @@ from synapse.federation.federation_base import ( ) from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( make_deferred_yieldable, @@ -137,6 +138,7 @@ class FederationServer(FederationBase): self._event_auth_handler = hs.get_event_auth_handler() self._room_member_handler = hs.get_room_member_handler() self._e2e_keys_handler = hs.get_e2e_keys_handler() + self._worker_lock_handler = hs.get_worker_locks_handler() self._state_storage_controller = hs.get_storage_controllers().state @@ -1236,9 +1238,18 @@ class FederationServer(FederationBase): logger.info("handling received PDU in room %s: %s", room_id, event) try: with nested_logging_context(event.event_id): - await self._federation_event_handler.on_receive_pdu( - origin, event - ) + # We're taking out a lock within a lock, which could + # lead to deadlocks if we're not careful. However, it is + # safe on this occasion as we only ever take a write + # lock when deleting a room, which we would never do + # while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME` + # lock. + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + await self._federation_event_handler.on_receive_pdu( + origin, event + ) except FederationError as e: # XXX: Ideally we'd inform the remote we failed to process # the event, but we can't return an error in the transaction diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fff0b5fa12..187dedae7d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -53,6 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -485,6 +486,7 @@ class EventCreationHandler: self._events_shard_config = self.config.worker.events_shard_config self._instance_name = hs.get_instance_name() self._notifier = hs.get_notifier() + self._worker_lock_handler = hs.get_worker_locks_handler() self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state @@ -1010,6 +1012,37 @@ class EventCreationHandler: event.internal_metadata.stream_ordering, ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + return await self._create_and_send_nonmember_event_locked( + requester=requester, + event_dict=event_dict, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + ratelimit=ratelimit, + txn_id=txn_id, + ignore_shadow_ban=ignore_shadow_ban, + outlier=outlier, + depth=depth, + ) + + async def _create_and_send_nonmember_event_locked( + self, + requester: Requester, + event_dict: dict, + allow_no_prev_events: 
bool = False,
+        prev_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
+        ratelimit: bool = True,
+        txn_id: Optional[str] = None,
+        ignore_shadow_ban: bool = False,
+        outlier: bool = False,
+        depth: Optional[int] = None,
+    ) -> Tuple[EventBase, int]:
+        room_id = event_dict["room_id"]
+
         # If we don't have any prev event IDs specified then we need to
         # check that the host is in the room (as otherwise populating the
         # prev events will fail), at which point we may as well check the
@@ -1923,7 +1956,10 @@ class EventCreationHandler:
             )

         for room_id in room_ids:
-            dummy_event_sent = await self._send_dummy_event_for_room(room_id)
+            async with self._worker_lock_handler.acquire_read_write_lock(
+                DELETE_ROOM_LOCK_NAME, room_id, write=False
+            ):
+                dummy_event_sent = await self._send_dummy_event_for_room(room_id)

             if not dummy_event_sent:
                 # Did not find a valid user in the room, so remove from future attempts
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19b8728db9..da34658470 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -46,6 +46,11 @@ logger = logging.getLogger(__name__)

 BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3

+PURGE_HISTORY_LOCK_NAME = "purge_history_lock"
+
+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+
+
 @attr.s(slots=True, auto_attribs=True)
 class PurgeStatus:
     """Object tracking the status of a purge request
@@ -142,6 +147,7 @@ class PaginationHandler:
         self._server_name = hs.hostname
         self._room_shutdown_handler = hs.get_room_shutdown_handler()
         self._relations_handler = hs.get_relations_handler()
+        self._worker_locks = hs.get_worker_locks_handler()

         self.pagination_lock = ReadWriteLock()
         # IDs of rooms in which there is currently an active purge *or delete* operation.
@@ -356,7 +362,9 @@ class PaginationHandler:
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
-            async with self.pagination_lock.write(room_id):
+            async with self._worker_locks.acquire_read_write_lock(
+                PURGE_HISTORY_LOCK_NAME, room_id, write=True
+            ):
                 await self._storage_controllers.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
@@ -412,7 +420,10 @@ class PaginationHandler:
             room_id: room to be purged
             force: set true to skip checking for joined users.
""" - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_multi_read_write_lock( + [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)], + write=True, + ): # first check that we have no users in this room if not force: joined = await self.store.is_host_joined(room_id, self._server_name) @@ -471,7 +482,9 @@ class PaginationHandler: room_token = from_token.room_key - async with self.pagination_lock.read(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=False + ): (membership, member_event_id) = (None, None) if not use_admin_priviledge: ( @@ -747,7 +760,9 @@ class PaginationHandler: self._purges_in_progress_by_room.add(room_id) try: - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=True + ): self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN self._delete_by_id[ delete_id diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 496e701f13..6cca2ec344 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -39,6 +39,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging import opentracing from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process @@ -94,6 +95,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.event_creation_handler = hs.get_event_creation_handler() self.account_data_handler = hs.get_account_data_handler() self.event_auth_handler = hs.get_event_auth_handler() + self._worker_lock_handler = hs.get_worker_locks_handler() self.member_linearizer: Linearizer = Linearizer(name="member") self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter") @@ -638,26 +640,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # by application services), and then by room ID. 
async with self.member_as_limiter.queue(as_id): async with self.member_linearizer.queue(key): - with opentracing.start_active_span("update_membership_locked"): - result = await self.update_membership_locked( - requester, - target, - room_id, - action, - txn_id=txn_id, - remote_room_hosts=remote_room_hosts, - third_party_signed=third_party_signed, - ratelimit=ratelimit, - content=content, - new_room=new_room, - require_consent=require_consent, - outlier=outlier, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - depth=depth, - origin_server_ts=origin_server_ts, - ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + with opentracing.start_active_span("update_membership_locked"): + result = await self.update_membership_locked( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + content=content, + new_room=new_room, + require_consent=require_consent, + outlier=outlier, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + depth=depth, + origin_server_ts=origin_server_ts, + ) return result diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py new file mode 100644 index 0000000000..72df773a86 --- /dev/null +++ b/synapse/handlers/worker_lock.py @@ -0,0 +1,333 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +from types import TracebackType +from typing import ( + TYPE_CHECKING, + AsyncContextManager, + Collection, + Dict, + Optional, + Tuple, + Type, + Union, +) +from weakref import WeakSet + +import attr + +from twisted.internet import defer +from twisted.internet.interfaces import IReactorTime + +from synapse.logging.context import PreserveLoggingContext +from synapse.logging.opentracing import start_active_span +from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.storage.databases.main.lock import Lock, LockStore +from synapse.util.async_helpers import timeout_deferred + +if TYPE_CHECKING: + from synapse.logging.opentracing import opentracing + from synapse.server import HomeServer + + +DELETE_ROOM_LOCK_NAME = "delete_room_lock" + + +class WorkerLocksHandler: + """A class for waiting on taking out locks, rather than using the storage + functions directly (which don't support awaiting). + """ + + def __init__(self, hs: "HomeServer") -> None: + self._reactor = hs.get_reactor() + self._store = hs.get_datastores().main + self._clock = hs.get_clock() + self._notifier = hs.get_notifier() + self._instance_name = hs.get_instance_name() + + # Map from lock name/key to set of `WaitingLock` that are active for + # that lock. 
+ self._locks: Dict[ + Tuple[str, str], WeakSet[Union[WaitingLock, WaitingMultiLock]] + ] = {} + + self._clock.looping_call(self._cleanup_locks, 30_000) + + self._notifier.add_lock_released_callback(self._on_lock_released) + + def acquire_lock(self, lock_name: str, lock_key: str) -> "WaitingLock": + """Acquire a standard lock, returns a context manager that will block + until the lock is acquired. + + Note: Care must be taken to avoid deadlocks. In particular, this + function does *not* timeout. + + Usage: + async with handler.acquire_lock(name, key): + # Do work while holding the lock... + """ + + lock = WaitingLock( + reactor=self._reactor, + store=self._store, + handler=self, + lock_name=lock_name, + lock_key=lock_key, + write=None, + ) + + self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock) + + return lock + + def acquire_read_write_lock( + self, + lock_name: str, + lock_key: str, + *, + write: bool, + ) -> "WaitingLock": + """Acquire a read/write lock, returns a context manager that will block + until the lock is acquired. + + Note: Care must be taken to avoid deadlocks. In particular, this + function does *not* timeout. + + Usage: + async with handler.acquire_read_write_lock(name, key, write=True): + # Do work while holding the lock... + """ + + lock = WaitingLock( + reactor=self._reactor, + store=self._store, + handler=self, + lock_name=lock_name, + lock_key=lock_key, + write=write, + ) + + self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock) + + return lock + + def acquire_multi_read_write_lock( + self, + lock_names: Collection[Tuple[str, str]], + *, + write: bool, + ) -> "WaitingMultiLock": + """Acquires multi read/write locks at once, returns a context manager + that will block until all the locks are acquired. + + This will try and acquire all locks at once, and will never hold on to a + subset of the locks. (This avoids accidentally creating deadlocks). + + Note: Care must be taken to avoid deadlocks. In particular, this + function does *not* timeout. + """ + + lock = WaitingMultiLock( + lock_names=lock_names, + write=write, + reactor=self._reactor, + store=self._store, + handler=self, + ) + + for lock_name, lock_key in lock_names: + self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock) + + return lock + + def notify_lock_released(self, lock_name: str, lock_key: str) -> None: + """Notify that a lock has been released. + + Pokes both the notifier and replication. + """ + + self._notifier.notify_lock_released(self._instance_name, lock_name, lock_key) + + def _on_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: + """Called when a lock has been released. + + Wakes up any locks that might be waiting on this. 
+ """ + locks = self._locks.get((lock_name, lock_key)) + if not locks: + return + + def _wake_deferred(deferred: defer.Deferred) -> None: + if not deferred.called: + deferred.callback(None) + + for lock in locks: + self._clock.call_later(0, _wake_deferred, lock.deferred) + + @wrap_as_background_process("_cleanup_locks") + async def _cleanup_locks(self) -> None: + """Periodically cleans out stale entries in the locks map""" + self._locks = {key: value for key, value in self._locks.items() if value} + + +@attr.s(auto_attribs=True, eq=False) +class WaitingLock: + reactor: IReactorTime + store: LockStore + handler: WorkerLocksHandler + lock_name: str + lock_key: str + write: Optional[bool] + deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred) + _inner_lock: Optional[Lock] = None + _retry_interval: float = 0.1 + _lock_span: "opentracing.Scope" = attr.Factory( + lambda: start_active_span("WaitingLock.lock") + ) + + async def __aenter__(self) -> None: + self._lock_span.__enter__() + + with start_active_span("WaitingLock.waiting_for_lock"): + while self._inner_lock is None: + self.deferred = defer.Deferred() + + if self.write is not None: + lock = await self.store.try_acquire_read_write_lock( + self.lock_name, self.lock_key, write=self.write + ) + else: + lock = await self.store.try_acquire_lock( + self.lock_name, self.lock_key + ) + + if lock: + self._inner_lock = lock + break + + try: + # Wait until the we get notified the lock might have been + # released (by the deferred being resolved). We also + # periodically wake up in case the lock was released but we + # weren't notified. + with PreserveLoggingContext(): + await timeout_deferred( + deferred=self.deferred, + timeout=self._get_next_retry_interval(), + reactor=self.reactor, + ) + except Exception: + pass + + return await self._inner_lock.__aenter__() + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> Optional[bool]: + assert self._inner_lock + + self.handler.notify_lock_released(self.lock_name, self.lock_key) + + try: + r = await self._inner_lock.__aexit__(exc_type, exc, tb) + finally: + self._lock_span.__exit__(exc_type, exc, tb) + + return r + + def _get_next_retry_interval(self) -> float: + next = self._retry_interval + self._retry_interval = max(5, next * 2) + return next * random.uniform(0.9, 1.1) + + +@attr.s(auto_attribs=True, eq=False) +class WaitingMultiLock: + lock_names: Collection[Tuple[str, str]] + + write: bool + + reactor: IReactorTime + store: LockStore + handler: WorkerLocksHandler + + deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred) + + _inner_lock_cm: Optional[AsyncContextManager] = None + _retry_interval: float = 0.1 + _lock_span: "opentracing.Scope" = attr.Factory( + lambda: start_active_span("WaitingLock.lock") + ) + + async def __aenter__(self) -> None: + self._lock_span.__enter__() + + with start_active_span("WaitingLock.waiting_for_lock"): + while self._inner_lock_cm is None: + self.deferred = defer.Deferred() + + lock_cm = await self.store.try_acquire_multi_read_write_lock( + self.lock_names, write=self.write + ) + + if lock_cm: + self._inner_lock_cm = lock_cm + break + + try: + # Wait until the we get notified the lock might have been + # released (by the deferred being resolved). We also + # periodically wake up in case the lock was released but we + # weren't notified. 
+                    with PreserveLoggingContext():
+                        await timeout_deferred(
+                            deferred=self.deferred,
+                            timeout=self._get_next_retry_interval(),
+                            reactor=self.reactor,
+                        )
+                except Exception:
+                    pass
+
+        assert self._inner_lock_cm
+        await self._inner_lock_cm.__aenter__()
+        return
+
+    async def __aexit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc: Optional[BaseException],
+        tb: Optional[TracebackType],
+    ) -> Optional[bool]:
+        assert self._inner_lock_cm
+
+        for lock_name, lock_key in self.lock_names:
+            self.handler.notify_lock_released(lock_name, lock_key)
+
+        try:
+            r = await self._inner_lock_cm.__aexit__(exc_type, exc, tb)
+        finally:
+            self._lock_span.__exit__(exc_type, exc, tb)
+
+        return r
+
+    def _get_next_retry_interval(self) -> float:
+        next = self._retry_interval
+        self._retry_interval = max(5, next * 2)
+        return next * random.uniform(0.9, 1.1)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 897272ad5b..68115bca70 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -234,6 +234,9 @@ class Notifier:

         self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules

+        # List of callbacks to be notified when a lock is released
+        self._lock_released_callback: List[Callable[[str, str, str], None]] = []
+
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
         self._pusher_pool = hs.get_pusherpool()
@@ -785,6 +788,19 @@ class Notifier:
             # that any in flight requests can be immediately retried.
             self._federation_client.wake_destination(server)

+    def add_lock_released_callback(
+        self, callback: Callable[[str, str, str], None]
+    ) -> None:
+        """Add a function to be called whenever we are notified about a released lock."""
+        self._lock_released_callback.append(callback)
+
+    def notify_lock_released(
+        self, instance_name: str, lock_name: str, lock_key: str
+    ) -> None:
+        """Notify the callbacks that a lock has been released."""
+        for cb in self._lock_released_callback:
+            cb(instance_name, lock_name, lock_key)
+

 @attr.s(auto_attribs=True)
 class ReplicationNotifier:
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 32f52e54d8..10f5c98ff8 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -422,6 +422,36 @@ class RemoteServerUpCommand(_SimpleCommand):
     NAME = "REMOTE_SERVER_UP"


+class LockReleasedCommand(Command):
+    """Sent to inform other instances that a given lock has been dropped.
+
+    Format::
+
+        LOCK_RELEASED ["<instance_name>", "<lock_name>", "<lock_key>"]
+    """
+
+    NAME = "LOCK_RELEASED"
+
+    def __init__(
+        self,
+        instance_name: str,
+        lock_name: str,
+        lock_key: str,
+    ):
+        self.instance_name = instance_name
+        self.lock_name = lock_name
+        self.lock_key = lock_key
+
+    @classmethod
+    def from_line(cls: Type["LockReleasedCommand"], line: str) -> "LockReleasedCommand":
+        instance_name, lock_name, lock_key = json_decoder.decode(line)
+
+        return cls(instance_name, lock_name, lock_key)
+
+    def to_line(self) -> str:
+        return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key])
+
+
 _COMMANDS: Tuple[Type[Command], ...] = (
     ServerCommand,
     RdataCommand,
@@ -435,6 +465,7 @@ _COMMANDS: Tuple[Type[Command], ...] = (
     UserIpCommand,
     RemoteServerUpCommand,
     ClearUserSyncsCommand,
+    LockReleasedCommand,
 )

 # Map of command name to command type.
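# Illustrative note (not part of the patch): given `to_line`/`from_line` above,
# a lock release travelling over replication looks something like
#
#     LOCK_RELEASED ["worker1", "delete_room_lock", "!abcd:example.com"]
#
# i.e. the command name followed by a JSON array of [instance_name, lock_name,
# lock_key]; the values here are made up. As the handler changes below show,
# an instance ignores LOCK_RELEASED commands that echo back its own name.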
@@ -448,6 +479,7 @@ VALID_SERVER_COMMANDS = ( ErrorCommand.NAME, PingCommand.NAME, RemoteServerUpCommand.NAME, + LockReleasedCommand.NAME, ) # The commands the client is allowed to send @@ -461,6 +493,7 @@ VALID_CLIENT_COMMANDS = ( UserIpCommand.NAME, ErrorCommand.NAME, RemoteServerUpCommand.NAME, + LockReleasedCommand.NAME, ) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 5d108fe11b..a2cabba7b1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -39,6 +39,7 @@ from synapse.replication.tcp.commands import ( ClearUserSyncsCommand, Command, FederationAckCommand, + LockReleasedCommand, PositionCommand, RdataCommand, RemoteServerUpCommand, @@ -248,6 +249,9 @@ class ReplicationCommandHandler: if self._is_master or self._should_insert_client_ips: self.subscribe_to_channel("USER_IP") + if hs.config.redis.redis_enabled: + self._notifier.add_lock_released_callback(self.on_lock_released) + def subscribe_to_channel(self, channel_name: str) -> None: """ Indicates that we wish to subscribe to a Redis channel by name. @@ -648,6 +652,17 @@ class ReplicationCommandHandler: self._notifier.notify_remote_server_up(cmd.data) + def on_LOCK_RELEASED( + self, conn: IReplicationConnection, cmd: LockReleasedCommand + ) -> None: + """Called when we get a new LOCK_RELEASED command.""" + if cmd.instance_name == self._instance_name: + return + + self._notifier.notify_lock_released( + cmd.instance_name, cmd.lock_name, cmd.lock_key + ) + def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" self._connections.append(connection) @@ -754,6 +769,13 @@ class ReplicationCommandHandler: """ self.send_command(RdataCommand(stream_name, self._instance_name, token, data)) + def on_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: + """Called when we released a lock and should notify other instances.""" + if instance_name == self._instance_name: + self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key)) + UpdateToken = TypeVar("UpdateToken") UpdateRow = TypeVar("UpdateRow") diff --git a/synapse/rest/client/room_upgrade_rest_servlet.py b/synapse/rest/client/room_upgrade_rest_servlet.py index 6a7792e18b..4a5d9e13e7 100644 --- a/synapse/rest/client/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/room_upgrade_rest_servlet.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, Tuple from synapse.api.errors import Codes, ShadowBanError, SynapseError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, @@ -60,6 +61,7 @@ class RoomUpgradeRestServlet(RestServlet): self._hs = hs self._room_creation_handler = hs.get_room_creation_handler() self._auth = hs.get_auth() + self._worker_lock_handler = hs.get_worker_locks_handler() async def on_POST( self, request: SynapseRequest, room_id: str @@ -78,9 +80,12 @@ class RoomUpgradeRestServlet(RestServlet): ) try: - new_room_id = await self._room_creation_handler.upgrade_room( - requester, room_id, new_version - ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + new_room_id = await self._room_creation_handler.upgrade_room( + requester, room_id, new_version + ) except ShadowBanError: # Generate a random room ID. 
new_room_id = stringutils.random_string(18)
diff --git a/synapse/server.py b/synapse/server.py
index b72b76a38b..8430f99ef2 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -107,6 +107,7 @@ from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
 from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler
 from synapse.handlers.user_directory import UserDirectoryHandler
+from synapse.handlers.worker_lock import WorkerLocksHandler
 from synapse.http.client import (
     InsecureInterceptableContextFactory,
     ReplicationClient,
@@ -912,3 +913,7 @@ class HomeServer(metaclass=abc.ABCMeta):
     def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager:
         """Usage metrics shared between phone home stats and the prometheus exporter."""
         return CommonUsageMetricsManager(self)
+
+    @cache_in_self
+    def get_worker_locks_handler(self) -> WorkerLocksHandler:
+        return WorkerLocksHandler(self)
diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py
index 35c0680365..35cd1089d6 100644
--- a/synapse/storage/controllers/persist_events.py
+++ b/synapse/storage/controllers/persist_events.py
@@ -45,6 +45,7 @@ from twisted.internet import defer
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
 from synapse.logging.opentracing import (
     SynapseTags,
@@ -338,6 +339,7 @@ class EventsPersistenceStorageController:
         )
         self._state_resolution_handler = hs.get_state_resolution_handler()
         self._state_controller = state_controller
+        self.hs = hs

     async def _process_event_persist_queue_task(
         self,
@@ -350,15 +352,22 @@ class EventsPersistenceStorageController:
             A dictionary of event ID to event ID we didn't persist as we already
             had another event persisted with the same TXN ID.
         """
-        if isinstance(task, _PersistEventsTask):
-            return await self._persist_event_batch(room_id, task)
-        elif isinstance(task, _UpdateCurrentStateTask):
-            await self._update_current_state(room_id, task)
-            return {}
-        else:
-            raise AssertionError(
-                f"Found an unexpected task type in event persistence queue: {task}"
-            )
+
+        # Ensure that the room can't be deleted while we're persisting events to
+        # it. We might already have taken out the lock, but since this is just a
+        # "read" lock it's inherently reentrant.
+        async with self.hs.get_worker_locks_handler().acquire_read_write_lock(
+            DELETE_ROOM_LOCK_NAME, room_id, write=False
+        ):
+            if isinstance(task, _PersistEventsTask):
+                return await self._persist_event_batch(room_id, task)
+            elif isinstance(task, _UpdateCurrentStateTask):
+                await self._update_current_state(room_id, task)
+                return {}
+            else:
+                raise AssertionError(
+                    f"Found an unexpected task type in event persistence queue: {task}"
+                )

     @trace
     async def persist_events(
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index c89b4f7919..1680bf6168 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -12,8 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
import logging +from contextlib import AsyncExitStack from types import TracebackType -from typing import TYPE_CHECKING, Optional, Set, Tuple, Type +from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type from weakref import WeakValueDictionary from twisted.internet.interfaces import IReactorCore @@ -208,76 +209,85 @@ class LockStore(SQLBaseStore): used (otherwise the lock will leak). """ + try: + lock = await self.db_pool.runInteraction( + "try_acquire_read_write_lock", + self._try_acquire_read_write_lock_txn, + lock_name, + lock_key, + write, + ) + except self.database_engine.module.IntegrityError: + return None + + return lock + + def _try_acquire_read_write_lock_txn( + self, + txn: LoggingTransaction, + lock_name: str, + lock_key: str, + write: bool, + ) -> "Lock": + # We attempt to acquire the lock by inserting into + # `worker_read_write_locks` and seeing if that fails any + # constraints. If it doesn't then we have acquired the lock, + # otherwise we haven't. + # + # Before that though we clear the table of any stale locks. + now = self._clock.time_msec() token = random_string(6) - def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None: - # We attempt to acquire the lock by inserting into - # `worker_read_write_locks` and seeing if that fails any - # constraints. If it doesn't then we have acquired the lock, - # otherwise we haven't. - # - # Before that though we clear the table of any stale locks. - - delete_sql = """ - DELETE FROM worker_read_write_locks - WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; - """ - - insert_sql = """ - INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) - VALUES (?, ?, ?, ?, ?, ?) - """ - - if isinstance(self.database_engine, PostgresEngine): - # For Postgres we can send these queries at the same time. - txn.execute( - delete_sql + ";" + insert_sql, - ( - # DELETE args - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - # UPSERT args - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) - else: - # For SQLite these need to be two queries. - txn.execute( - delete_sql, - ( - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - ), - ) - txn.execute( - insert_sql, - ( - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) + delete_sql = """ + DELETE FROM worker_read_write_locks + WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; + """ - return + insert_sql = """ + INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) + VALUES (?, ?, ?, ?, ?, ?) + """ - try: - await self.db_pool.runInteraction( - "try_acquire_read_write_lock", - _try_acquire_read_write_lock_txn, + if isinstance(self.database_engine, PostgresEngine): + # For Postgres we can send these queries at the same time. + txn.execute( + delete_sql + ";" + insert_sql, + ( + # DELETE args + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + # UPSERT args + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), + ) + else: + # For SQLite these need to be two queries. 
+ txn.execute( + delete_sql, + ( + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + ), + ) + txn.execute( + insert_sql, + ( + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), ) - except self.database_engine.module.IntegrityError: - return None lock = Lock( self._reactor, @@ -289,10 +299,58 @@ class LockStore(SQLBaseStore): token=token, ) - self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock + def set_lock() -> None: + self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock + + txn.call_after(set_lock) return lock + async def try_acquire_multi_read_write_lock( + self, + lock_names: Collection[Tuple[str, str]], + write: bool, + ) -> Optional[AsyncExitStack]: + """Try to acquire multiple locks for the given names/keys. Will return + an async context manager if the locks are successfully acquired, which + *must* be used (otherwise the lock will leak). + + If only a subset of the locks can be acquired then it will immediately + drop them and return `None`. + """ + try: + locks = await self.db_pool.runInteraction( + "try_acquire_multi_read_write_lock", + self._try_acquire_multi_read_write_lock_txn, + lock_names, + write, + ) + except self.database_engine.module.IntegrityError: + return None + + stack = AsyncExitStack() + + for lock in locks: + await stack.enter_async_context(lock) + + return stack + + def _try_acquire_multi_read_write_lock_txn( + self, + txn: LoggingTransaction, + lock_names: Collection[Tuple[str, str]], + write: bool, + ) -> Collection["Lock"]: + locks = [] + + for lock_name, lock_key in lock_names: + lock = self._try_acquire_read_write_lock_txn( + txn, lock_name, lock_key, write + ) + locks.append(lock) + + return locks + class Lock: """An async context manager that manages an acquired lock, ensuring it is diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py new file mode 100644 index 0000000000..73e548726c --- /dev/null +++ b/tests/handlers/test_worker_lock.py @@ -0,0 +1,74 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+ +from twisted.internet import defer +from twisted.test.proto_helpers import MemoryReactor + +from synapse.server import HomeServer +from synapse.util import Clock + +from tests import unittest +from tests.replication._base import BaseMultiWorkerStreamTestCase + + +class WorkerLockTestCase(unittest.HomeserverTestCase): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self.worker_lock_handler = self.hs.get_worker_locks_handler() + + def test_wait_for_lock_locally(self) -> None: + """Test waiting for a lock on a single worker""" + + lock1 = self.worker_lock_handler.acquire_lock("name", "key") + self.get_success(lock1.__aenter__()) + + lock2 = self.worker_lock_handler.acquire_lock("name", "key") + d2 = defer.ensureDeferred(lock2.__aenter__()) + self.assertNoResult(d2) + + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None)) + + +class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self.main_worker_lock_handler = self.hs.get_worker_locks_handler() + + def test_wait_for_lock_worker(self) -> None: + """Test waiting for a lock on another worker""" + + worker = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "redis": {"enabled": True}, + }, + ) + worker_lock_handler = worker.get_worker_locks_handler() + + lock1 = self.main_worker_lock_handler.acquire_lock("name", "key") + self.get_success(lock1.__aenter__()) + + lock2 = worker_lock_handler.acquire_lock("name", "key") + d2 = defer.ensureDeferred(lock2.__aenter__()) + self.assertNoResult(d2) + + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None)) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index d013e75d55..4f6347be15 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -711,7 +711,7 @@ class RoomsCreateTestCase(RoomBase): self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(30, channel.resource_usage.db_txn_count) + self.assertEqual(32, channel.resource_usage.db_txn_count) def test_post_room_initial_state(self) -> None: # POST with initial_state config key, expect new room id @@ -724,7 +724,7 @@ class RoomsCreateTestCase(RoomBase): self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(32, channel.resource_usage.db_txn_count) + self.assertEqual(34, channel.resource_usage.db_txn_count) def test_post_room_visibility_key(self) -> None: # POST with visibility config key, expect new room id diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index ad454f6dd8..383da83dfb 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -448,3 +448,55 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): self.get_success(self.store._on_shutdown()) self.assertEqual(self.store._live_read_write_lock_tokens, {}) + + def test_acquire_multiple_locks(self) -> None: + """Tests that acquiring multiple locks at once works.""" + + # Take out multiple locks and ensure that we can't get those locks out + # again. 
+ lock = self.get_success( + self.store.try_acquire_multi_read_write_lock( + [("name1", "key1"), ("name2", "key2")], write=True + ) + ) + self.assertIsNotNone(lock) + + assert lock is not None + self.get_success(lock.__aenter__()) + + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name1", "key1", write=True) + ) + self.assertIsNone(lock2) + + lock3 = self.get_success( + self.store.try_acquire_read_write_lock("name2", "key2", write=False) + ) + self.assertIsNone(lock3) + + # Overlapping locks attempts will fail, and won't lock any locks. + lock4 = self.get_success( + self.store.try_acquire_multi_read_write_lock( + [("name1", "key1"), ("name3", "key3")], write=True + ) + ) + self.assertIsNone(lock4) + + lock5 = self.get_success( + self.store.try_acquire_read_write_lock("name3", "key3", write=True) + ) + self.assertIsNotNone(lock5) + assert lock5 is not None + self.get_success(lock5.__aenter__()) + self.get_success(lock5.__aexit__(None, None, None)) + + # Once we release the lock we can take out the locks again. + self.get_success(lock.__aexit__(None, None, None)) + + lock6 = self.get_success( + self.store.try_acquire_read_write_lock("name1", "key1", write=True) + ) + self.assertIsNotNone(lock6) + assert lock6 is not None + self.get_success(lock6.__aenter__()) + self.get_success(lock6.__aexit__(None, None, None)) -- cgit 1.5.1 From 5eb3fd785bdbf2ae07031f13a6ac5fb578adc338 Mon Sep 17 00:00:00 2001 From: Mohit Rathee Date: Tue, 1 Aug 2023 18:44:02 +0530 Subject: Trim whitespace when setting display names (#16031) --- changelog.d/16031.bugfix | 1 + synapse/handlers/profile.py | 2 +- tests/rest/client/test_profile.py | 12 ++++++++++++ 3 files changed, 14 insertions(+), 1 deletion(-) create mode 100644 changelog.d/16031.bugfix (limited to 'tests') diff --git a/changelog.d/16031.bugfix b/changelog.d/16031.bugfix new file mode 100644 index 0000000000..e48bf3975c --- /dev/null +++ b/changelog.d/16031.bugfix @@ -0,0 +1 @@ +Remove leading and trailing spaces when setting a display name. diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index c7fe101cd9..c2109036ec 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -163,7 +163,7 @@ class ProfileHandler: 400, "Displayname is too long (max %i)" % (MAX_DISPLAYNAME_LEN,) ) - displayname_to_set: Optional[str] = new_displayname + displayname_to_set: Optional[str] = new_displayname.strip() if new_displayname == "": displayname_to_set = None diff --git a/tests/rest/client/test_profile.py b/tests/rest/client/test_profile.py index 27c93ad761..ecae092b47 100644 --- a/tests/rest/client/test_profile.py +++ b/tests/rest/client/test_profile.py @@ -68,6 +68,18 @@ class ProfileTestCase(unittest.HomeserverTestCase): res = self._get_displayname() self.assertEqual(res, "test") + def test_set_displayname_with_extra_spaces(self) -> None: + channel = self.make_request( + "PUT", + "/profile/%s/displayname" % (self.owner,), + content={"displayname": " test "}, + access_token=self.owner_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + res = self._get_displayname() + self.assertEqual(res, "test") + def test_set_displayname_noauth(self) -> None: channel = self.make_request( "PUT", -- cgit 1.5.1 From 01a45869f034265b9757992aa1a5eb7a0923351c Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 2 Aug 2023 08:41:32 -0400 Subject: Update MSC3958 support to interact with intentional mentions. (#15992) * Updates the rule ID. * Use `event_property_is` instead of `event_match`. 
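Concretely, the renamed rule serialises for clients roughly as follows (a sketch reconstructed from the Rust definition in the diff below; the Python dict form is illustrative):

    suppress_edits_rule = {
        "rule_id": "global/override/.org.matrix.msc3958.suppress_edits",
        "default": True,
        "enabled": True,
        "conditions": [
            {
                "kind": "event_property_is",
                "key": "content.m\\.relates_to.rel_type",
                "value": "m.replace",
            }
        ],
        "actions": [],  # a matching edit produces no notification
    }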
This updates the implementation of MSC3958 to match the latest text from the MSC. --- changelog.d/15992.misc | 1 + rust/benches/evaluator.rs | 27 +++++++++++---------- rust/src/push/base_rules.rs | 37 ++++++++++++++--------------- rust/src/push/evaluator.rs | 14 ++++++----- rust/src/push/mod.rs | 6 ++--- tests/push/test_bulk_push_rule_evaluator.py | 21 ++++++++++++++-- 6 files changed, 64 insertions(+), 42 deletions(-) create mode 100644 changelog.d/15992.misc (limited to 'tests') diff --git a/changelog.d/15992.misc b/changelog.d/15992.misc new file mode 100644 index 0000000000..539f55b475 --- /dev/null +++ b/changelog.d/15992.misc @@ -0,0 +1 @@ +Update support for [MSC3958](https://github.com/matrix-org/matrix-spec-proposals/pull/3958) to match the latest revision of the MSC. diff --git a/rust/benches/evaluator.rs b/rust/benches/evaluator.rs index c2f33258a4..6e1eab2a3b 100644 --- a/rust/benches/evaluator.rs +++ b/rust/benches/evaluator.rs @@ -13,6 +13,9 @@ // limitations under the License. #![feature(test)] + +use std::borrow::Cow; + use synapse::push::{ evaluator::PushRuleEvaluator, Condition, EventMatchCondition, FilteredPushRules, JsonValue, PushRules, SimpleJsonValue, @@ -26,15 +29,15 @@ fn bench_match_exact(b: &mut Bencher) { let flattened_keys = [ ( "type".to_string(), - JsonValue::Value(SimpleJsonValue::Str("m.text".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("m.text"))), ), ( "room_id".to_string(), - JsonValue::Value(SimpleJsonValue::Str("!room:server".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("!room:server"))), ), ( "content.body".to_string(), - JsonValue::Value(SimpleJsonValue::Str("test message".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("test message"))), ), ] .into_iter() @@ -71,15 +74,15 @@ fn bench_match_word(b: &mut Bencher) { let flattened_keys = [ ( "type".to_string(), - JsonValue::Value(SimpleJsonValue::Str("m.text".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("m.text"))), ), ( "room_id".to_string(), - JsonValue::Value(SimpleJsonValue::Str("!room:server".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("!room:server"))), ), ( "content.body".to_string(), - JsonValue::Value(SimpleJsonValue::Str("test message".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("test message"))), ), ] .into_iter() @@ -116,15 +119,15 @@ fn bench_match_word_miss(b: &mut Bencher) { let flattened_keys = [ ( "type".to_string(), - JsonValue::Value(SimpleJsonValue::Str("m.text".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("m.text"))), ), ( "room_id".to_string(), - JsonValue::Value(SimpleJsonValue::Str("!room:server".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("!room:server"))), ), ( "content.body".to_string(), - JsonValue::Value(SimpleJsonValue::Str("test message".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("test message"))), ), ] .into_iter() @@ -161,15 +164,15 @@ fn bench_eval_message(b: &mut Bencher) { let flattened_keys = [ ( "type".to_string(), - JsonValue::Value(SimpleJsonValue::Str("m.text".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("m.text"))), ), ( "room_id".to_string(), - JsonValue::Value(SimpleJsonValue::Str("!room:server".to_string())), + JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("!room:server"))), ), ( "content.body".to_string(), - JsonValue::Value(SimpleJsonValue::Str("test message".to_string())), + 
JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("test message"))), ), ] .into_iter() diff --git a/rust/src/push/base_rules.rs b/rust/src/push/base_rules.rs index 7eea9313f0..00baceda91 100644 --- a/rust/src/push/base_rules.rs +++ b/rust/src/push/base_rules.rs @@ -63,22 +63,6 @@ pub const BASE_PREPEND_OVERRIDE_RULES: &[PushRule] = &[PushRule { }]; pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[ - // We don't want to notify on edits. Not only can this be confusing in real - // time (2 notifications, one message) but it's especially confusing - // if a bridge needs to edit a previously backfilled message. - PushRule { - rule_id: Cow::Borrowed("global/override/.com.beeper.suppress_edits"), - priority_class: 5, - conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventMatch( - EventMatchCondition { - key: Cow::Borrowed("content.m\\.relates_to.rel_type"), - pattern: Cow::Borrowed("m.replace"), - }, - ))]), - actions: Cow::Borrowed(&[]), - default: true, - default_enabled: true, - }, PushRule { rule_id: Cow::Borrowed("global/override/.m.rule.suppress_notices"), priority_class: 5, @@ -146,7 +130,7 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[ priority_class: 5, conditions: Cow::Borrowed(&[Condition::Known( KnownCondition::ExactEventPropertyContainsType(EventPropertyIsTypeCondition { - key: Cow::Borrowed("content.m\\.mentions.user_ids"), + key: Cow::Borrowed(r"content.m\.mentions.user_ids"), value_type: Cow::Borrowed(&EventMatchPatternType::UserId), }), )]), @@ -167,8 +151,8 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[ priority_class: 5, conditions: Cow::Borrowed(&[ Condition::Known(KnownCondition::EventPropertyIs(EventPropertyIsCondition { - key: Cow::Borrowed("content.m\\.mentions.room"), - value: Cow::Borrowed(&SimpleJsonValue::Bool(true)), + key: Cow::Borrowed(r"content.m\.mentions.room"), + value: Cow::Owned(SimpleJsonValue::Bool(true)), })), Condition::Known(KnownCondition::SenderNotificationPermission { key: Cow::Borrowed("room"), @@ -241,6 +225,21 @@ pub const BASE_APPEND_OVERRIDE_RULES: &[PushRule] = &[ default: true, default_enabled: true, }, + // We don't want to notify on edits *unless* the edit directly mentions a + // user, which is handled above. 
+    PushRule {
+        rule_id: Cow::Borrowed("global/override/.org.matrix.msc3958.suppress_edits"),
+        priority_class: 5,
+        conditions: Cow::Borrowed(&[Condition::Known(KnownCondition::EventPropertyIs(
+            EventPropertyIsCondition {
+                key: Cow::Borrowed(r"content.m\.relates_to.rel_type"),
+                value: Cow::Owned(SimpleJsonValue::Str(Cow::Borrowed("m.replace"))),
+            },
+        ))]),
+        actions: Cow::Borrowed(&[]),
+        default: true,
+        default_enabled: true,
+    },
     PushRule {
         rule_id: Cow::Borrowed("global/override/.org.matrix.msc3930.rule.poll_response"),
         priority_class: 5,
diff --git a/rust/src/push/evaluator.rs b/rust/src/push/evaluator.rs
index 59c53b1776..48e670478b 100644
--- a/rust/src/push/evaluator.rs
+++ b/rust/src/push/evaluator.rs
@@ -117,7 +117,7 @@ impl PushRuleEvaluator {
         msc3931_enabled: bool,
     ) -> Result<Self, Error> {
         let body = match flattened_keys.get("content.body") {
-            Some(JsonValue::Value(SimpleJsonValue::Str(s))) => s.clone(),
+            Some(JsonValue::Value(SimpleJsonValue::Str(s))) => s.clone().into_owned(),
             _ => String::new(),
         };
@@ -313,13 +313,15 @@ impl PushRuleEvaluator {
                 };

                 let pattern = match &*exact_event_match.value_type {
-                    EventMatchPatternType::UserId => user_id,
-                    EventMatchPatternType::UserLocalpart => get_localpart_from_id(user_id)?,
+                    EventMatchPatternType::UserId => user_id.to_owned(),
+                    EventMatchPatternType::UserLocalpart => {
+                        get_localpart_from_id(user_id)?.to_owned()
+                    }
                 };

                 self.match_event_property_contains(
                     exact_event_match.key.clone(),
-                    Cow::Borrowed(&SimpleJsonValue::Str(pattern.to_string())),
+                    Cow::Borrowed(&SimpleJsonValue::Str(Cow::Owned(pattern))),
                 )?
             }
             KnownCondition::ContainsDisplayName => {
@@ -494,7 +496,7 @@ fn push_rule_evaluator() {
     let mut flattened_keys = BTreeMap::new();
     flattened_keys.insert(
         "content.body".to_string(),
-        JsonValue::Value(SimpleJsonValue::Str("foo bar bob hello".to_string())),
+        JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("foo bar bob hello"))),
     );
     let evaluator = PushRuleEvaluator::py_new(
         flattened_keys,
@@ -522,7 +524,7 @@ fn test_requires_room_version_supports_condition() {
     let mut flattened_keys = BTreeMap::new();
     flattened_keys.insert(
         "content.body".to_string(),
-        JsonValue::Value(SimpleJsonValue::Str("foo bar bob hello".to_string())),
+        JsonValue::Value(SimpleJsonValue::Str(Cow::Borrowed("foo bar bob hello"))),
     );
     let flags = vec![RoomVersionFeatures::ExtensibleEvents.as_str().to_string()];
     let evaluator = PushRuleEvaluator::py_new(
diff --git a/rust/src/push/mod.rs b/rust/src/push/mod.rs
index 514980579b..829fb79d0e 100644
--- a/rust/src/push/mod.rs
+++ b/rust/src/push/mod.rs
@@ -256,7 +256,7 @@ impl<'de> Deserialize<'de> for Action {
 #[derive(Serialize, Deserialize, Debug, Clone, PartialEq, Eq)]
 #[serde(untagged)]
 pub enum SimpleJsonValue {
-    Str(String),
+    Str(Cow<'static, str>),
     Int(i64),
     Bool(bool),
     Null,
 }

 impl<'source> FromPyObject<'source> for SimpleJsonValue {
     fn extract(ob: &'source PyAny) -> PyResult<Self> {
         if let Ok(s) = <PyString as pyo3::PyTryFrom>::try_from(ob) {
-            Ok(SimpleJsonValue::Str(s.to_string()))
+            Ok(SimpleJsonValue::Str(Cow::Owned(s.to_string())))
         // A bool *is* an int, ensure we try bool first.
         } else if let Ok(b) = <PyBool as pyo3::PyTryFrom>::try_from(ob) {
             Ok(SimpleJsonValue::Bool(b.extract()?))
@@ -585,7 +585,7 @@ impl FilteredPushRules {
         }

         if !self.msc3958_suppress_edits_enabled
-            && rule.rule_id == "global/override/.com.beeper.suppress_edits"
+            && rule.rule_id == "global/override/.org.matrix.msc3958.suppress_edits"
         {
             return false;
         }
diff --git a/tests/push/test_bulk_push_rule_evaluator.py b/tests/push/test_bulk_push_rule_evaluator.py
index 1e06f86071..829b9df83d 100644
--- a/tests/push/test_bulk_push_rule_evaluator.py
+++ b/tests/push/test_bulk_push_rule_evaluator.py
@@ -409,12 +409,12 @@ class TestBulkPushRuleEvaluator(HomeserverTestCase):
             )
         )

-        # Room mentions from those without power should not notify.
+        # The edit should not cause a notification.
         self.assertFalse(
             self._create_and_process(
                 bulk_evaluator,
                 {
-                    "body": self.alice,
+                    "body": "Test message",
                     "m.relates_to": {
                         "rel_type": RelationTypes.REPLACE,
                         "event_id": event.event_id,
                     },
                 },
             )
         )
+
+        # An edit which is a mention will cause a notification.
+        self.assertTrue(
+            self._create_and_process(
+                bulk_evaluator,
+                {
+                    "body": "Test message",
+                    "m.relates_to": {
+                        "rel_type": RelationTypes.REPLACE,
+                        "event_id": event.event_id,
+                    },
+                    "m.mentions": {
+                        "user_ids": [self.alice],
+                    },
+                },
+            )
+        )
--
cgit 1.5.1


From 4f5bccbbba13ba10412497cb92a1460535cf7a25 Mon Sep 17 00:00:00 2001
From: Patrick Cloke
Date: Wed, 2 Aug 2023 11:35:54 -0400
Subject: Add forward-compatibility for the redacts property (MSC2174). (#16013)

The location of the redacts field changes in room version 11. Ensure
it is copied to the *new* location for *old* room versions for
forwards-compatibility with clients.

Note that copying it to the *old* location for the *new* room version
was previously handled.
---
 changelog.d/16013.misc               |  1 +
 synapse/events/utils.py              | 18 +++++-----
 tests/rest/client/test_redactions.py | 67 +++++++++++++++++++++++++++---------
 3 files changed, 61 insertions(+), 25 deletions(-)
 create mode 100644 changelog.d/16013.misc
(limited to 'tests')

diff --git a/changelog.d/16013.misc b/changelog.d/16013.misc
new file mode 100644
index 0000000000..bd161e13ed
--- /dev/null
+++ b/changelog.d/16013.misc
@@ -0,0 +1 @@
+Properly overwrite the `redacts` content-property for forwards-compatibility with room versions 1 through 10.
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index c890833b1d..967a6c245b 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -475,14 +475,16 @@ def serialize_event(
     if config.as_client_event:
         d = config.event_format(d)

-    # If the event is a redaction, copy the redacts field from the content to
-    # top-level for backwards compatibility.
-    if (
-        e.type == EventTypes.Redaction
-        and e.room_version.updated_redaction_rules
-        and e.redacts is not None
-    ):
-        d["redacts"] = e.redacts
+    # If the event is a redaction, the field with the redacted event ID appears
+    # in a different location depending on the room version. e.redacts handles
+    # fetching from the proper location; copy it to the other location for forwards-
+    if e.type == EventTypes.Redaction and e.redacts is not None:
+        if e.room_version.updated_redaction_rules:
+            d["redacts"] = e.redacts
+        else:
+            d["content"] = dict(d["content"])
+            d["content"]["redacts"] = e.redacts

     only_event_fields = config.only_event_fields
     if only_event_fields:
diff --git a/tests/rest/client/test_redactions.py b/tests/rest/client/test_redactions.py
index 6028886bd6..180b635ea6 100644
--- a/tests/rest/client/test_redactions.py
+++ b/tests/rest/client/test_redactions.py
@@ -13,10 +13,12 @@
 #  limitations under the License.
 from typing import List, Optional

+from parameterized import parameterized
+
 from twisted.test.proto_helpers import MemoryReactor

 from synapse.api.constants import EventTypes, RelationTypes
-from synapse.api.room_versions import RoomVersions
+from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.rest import admin
 from synapse.rest.client import login, room, sync
 from synapse.server import HomeServer
@@ -569,50 +571,81 @@ class RedactionsTestCase(HomeserverTestCase):
         self.assertIn("body", event_dict["content"], event_dict)
         self.assertEqual("I'm in a thread!", event_dict["content"]["body"])

-    def test_content_redaction(self) -> None:
-        """MSC2174 moved the redacts property to the content."""
+    @parameterized.expand(
+        [
+            # Tuples of:
+            #   Room version
+            #   Boolean: True if the redaction event content should include the event ID.
+            #   Boolean: True if the resulting redaction event is expected to include the
+            #       event ID in the content.
+            (RoomVersions.V10, False, False),
+            (RoomVersions.V11, True, True),
+            (RoomVersions.V11, False, True),
+        ]
+    )
+    def test_redaction_content(
+        self, room_version: RoomVersion, include_content: bool, expect_content: bool
+    ) -> None:
+        """
+        Room version 11 moved the redacts property to the content.
+
+        Ensure that the event gets created properly and that the Client-Server
+        API serves the proper backwards-compatible version.
+        """
+        # Create a room with the given room version.
         room_id = self.helper.create_room_as(
             self.mod_user_id,
             tok=self.mod_access_token,
-            room_version=RoomVersions.V11.identifier,
+            room_version=room_version.identifier,
         )

         # Create an event.
         b = self.helper.send(room_id=room_id, tok=self.mod_access_token)
         event_id = b["event_id"]

-        # Attempt to redact it with a bogus event ID.
-        self._redact_event(
+        # If an event ID is given in the content, it must match the one in the
+        # URL; a bogus ID in the content is rejected with a 400.
+        if include_content:
+            self._redact_event(
+                self.mod_access_token,
+                room_id,
+                event_id,
+                expect_code=400,
+                content={"redacts": "foo"},
+            )
+
+        # Redact it for real.
+        result = self._redact_event(
             self.mod_access_token,
             room_id,
             event_id,
-            expect_code=400,
-            content={"redacts": "foo"},
+            content={"redacts": event_id} if include_content else {},
         )
-
-        # Redact it for real.
-        self._redact_event(self.mod_access_token, room_id, event_id)
+        redaction_event_id = result["event_id"]

         # Sync the room, to get the id of the create event
         timeline = self._sync_room_timeline(self.mod_access_token, room_id)
         redact_event = timeline[-1]
         self.assertEqual(redact_event["type"], EventTypes.Redaction)
-        # The redacts key should be in the content.
+        # The redacts key should appear both in the content and at the top level.
         self.assertEquals(redact_event["content"]["redacts"], event_id)
-
-        # It should also be copied as the top-level redacts field for backwards
-        # compatibility.
         self.assertEquals(redact_event["redacts"], event_id)

         # But it isn't actually part of the event.
        def get_event(txn: LoggingTransaction) -> JsonDict:
            return db_to_json(
-                main_datastore._fetch_event_rows(txn, [event_id])[event_id].json
+                main_datastore._fetch_event_rows(txn, [redaction_event_id])[
+                    redaction_event_id
+                ].json
             )

         main_datastore = self.hs.get_datastores().main
         event_json = self.get_success(
             main_datastore.db_pool.runInteraction("get_event", get_event)
         )
-        self.assertNotIn("redacts", event_json)
+        self.assertEquals(event_json["type"], EventTypes.Redaction)
+        if expect_content:
+            self.assertNotIn("redacts", event_json)
+            self.assertEquals(event_json["content"]["redacts"], event_id)
+        else:
+            self.assertEquals(event_json["redacts"], event_id)
+            self.assertNotIn("redacts", event_json["content"])
--
cgit 1.5.1


From f0a860908ba0309c89c9dba452d99b4f9c6928f7 Mon Sep 17 00:00:00 2001
From: Mathieu Velten
Date: Thu, 3 Aug 2023 20:36:55 +0200
Subject: Allow config of the backoff algorithm for the federation client. (#15754)

Adds three new configuration variables:

* destination_min_retry_interval is identical to before (10 minutes).
* destination_retry_multiplier is now 2 instead of 5, so the maximum value
  is reached more slowly.
* destination_max_retry_interval is one week instead of (essentially) infinity.

Capping this will cause destinations to continue to be retried sometimes instead
of being lost forever. The previous value was 2 ^ 62 milliseconds.
---
 changelog.d/15754.misc                           |  1 +
 docs/usage/configuration/config_documentation.md | 11 +++++++++
 synapse/config/federation.py                     | 18 +++++++++++++++
 synapse/util/retryutils.py                       | 29 +++++++++++++-----------
 tests/storage/test_transactions.py               |  9 ++++++--
 tests/util/test_retryutils.py                    | 22 +++++++++---------
 6 files changed, 64 insertions(+), 26 deletions(-)
 create mode 100644 changelog.d/15754.misc

(limited to 'tests')

diff --git a/changelog.d/15754.misc b/changelog.d/15754.misc
new file mode 100644
index 0000000000..4314d415a3
--- /dev/null
+++ b/changelog.d/15754.misc
@@ -0,0 +1 @@
+Allow for the configuration of the backoff algorithm for federation destinations.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 4e6fcd085a..c32608da2b 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -1242,6 +1242,14 @@ like sending a federation transaction.
 * `max_short_retries`: maximum number of retries for the short retry algo. Default to 3 attempts.
 * `max_long_retries`: maximum number of retries for the long retry algo. Default to 10 attempts.

+The following options control the retry logic when communicating with a specific homeserver destination.
+Unlike the previous configuration options, these values apply across all requests
+for a given destination and the state of the backoff is stored in the database.
+
+* `destination_min_retry_interval`: the initial backoff, after the first request fails. Defaults to 10m.
+* `destination_retry_multiplier`: how much we multiply the backoff by after each subsequent fail. Defaults to 2.
+* `destination_max_retry_interval`: a cap on the backoff. Defaults to a week.
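For intuition, the three settings combine into a capped exponential backoff. The sketch below is illustrative only (the helper name is made up); Synapse's actual logic lives in `synapse/util/retryutils.py`, shown in the diff that follows, and additionally applies a random jitter of roughly 0.8x to 1.4x per step:

```python
def next_retry_interval_ms(
    current_ms: int, min_ms: int, multiplier: float, max_ms: int
) -> int:
    """Return the next backoff interval for a destination that keeps failing."""
    if current_ms == 0:
        # First failure: start at the configured minimum.
        return min_ms
    # Later failures: multiply the previous interval, but never exceed the cap.
    return min(int(current_ms * multiplier), max_ms)


# With the defaults (10m minimum, multiplier 2, 7d cap) the progression is
# 10m, 20m, 40m, ... and saturates at one week after about eleven failures.
interval_ms = 0
for _ in range(12):
    interval_ms = next_retry_interval_ms(
        interval_ms, 10 * 60 * 1000, 2, 7 * 24 * 60 * 60 * 1000
    )
    print(interval_ms)
```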
+
 Example configuration:
 ```yaml
 federation:
@@ -1250,6 +1258,9 @@ federation:
   max_long_retry_delay: 100s
   max_short_retries: 5
   max_long_retries: 20
+  destination_min_retry_interval: 30s
+  destination_retry_multiplier: 5
+  destination_max_retry_interval: 12h
 ```
 ---
 ## Caching
diff --git a/synapse/config/federation.py b/synapse/config/federation.py
index 0e1cb8b6e3..97636039b8 100644
--- a/synapse/config/federation.py
+++ b/synapse/config/federation.py
@@ -65,5 +65,23 @@ class FederationConfig(Config):
         self.max_long_retries = federation_config.get("max_long_retries", 10)
         self.max_short_retries = federation_config.get("max_short_retries", 3)

+        # Allow for the configuration of the backoff algorithm used
+        # when trying to reach an unavailable destination.
+        # Unlike the options above, these values apply across multiple
+        # requests and the state of the backoff is stored in the database.
+        self.destination_min_retry_interval_ms = Config.parse_duration(
+            federation_config.get("destination_min_retry_interval", "10m")
+        )
+        self.destination_retry_multiplier = federation_config.get(
+            "destination_retry_multiplier", 2
+        )
+        self.destination_max_retry_interval_ms = min(
+            Config.parse_duration(
+                federation_config.get("destination_max_retry_interval", "7d")
+            ),
+            # Set a hard limit so as not to overflow the database column.
+            2**62,
+        )

 _METRICS_FOR_DOMAINS_SCHEMA = {"type": "array", "items": {"type": "string"}}
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index dcc037b982..27e9fc976c 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -27,15 +27,6 @@ if TYPE_CHECKING:

 logger = logging.getLogger(__name__)

-# the initial backoff, after the first transaction fails
-MIN_RETRY_INTERVAL = 10 * 60 * 1000
-
-# how much we multiply the backoff by after each subsequent fail
-RETRY_MULTIPLIER = 5
-
-# a cap on the backoff. (Essentially none)
-MAX_RETRY_INTERVAL = 2**62
-

 class NotRetryingDestination(Exception):
     def __init__(self, retry_last_ts: int, retry_interval: int, destination: str):
@@ -169,6 +160,16 @@ class RetryDestinationLimiter:
         self.notifier = notifier
         self.replication_client = replication_client

+        self.destination_min_retry_interval_ms = (
+            self.store.hs.config.federation.destination_min_retry_interval_ms
+        )
+        self.destination_retry_multiplier = (
+            self.store.hs.config.federation.destination_retry_multiplier
+        )
+        self.destination_max_retry_interval_ms = (
+            self.store.hs.config.federation.destination_max_retry_interval_ms
+        )
+
     def __enter__(self) -> None:
         pass

@@ -220,13 +221,15 @@ class RetryDestinationLimiter:
             # We couldn't connect.
if self.retry_interval: self.retry_interval = int( - self.retry_interval * RETRY_MULTIPLIER * random.uniform(0.8, 1.4) + self.retry_interval + * self.destination_retry_multiplier + * random.uniform(0.8, 1.4) ) - if self.retry_interval >= MAX_RETRY_INTERVAL: - self.retry_interval = MAX_RETRY_INTERVAL + if self.retry_interval >= self.destination_max_retry_interval_ms: + self.retry_interval = self.destination_max_retry_interval_ms else: - self.retry_interval = MIN_RETRY_INTERVAL + self.retry_interval = self.destination_min_retry_interval_ms logger.info( "Connection to %s was unsuccessful (%s(%s)); backoff now %i", diff --git a/tests/storage/test_transactions.py b/tests/storage/test_transactions.py index 2fab84a529..ef06b50dbb 100644 --- a/tests/storage/test_transactions.py +++ b/tests/storage/test_transactions.py @@ -17,7 +17,6 @@ from twisted.test.proto_helpers import MemoryReactor from synapse.server import HomeServer from synapse.storage.databases.main.transactions import DestinationRetryTimings from synapse.util import Clock -from synapse.util.retryutils import MAX_RETRY_INTERVAL from tests.unittest import HomeserverTestCase @@ -57,8 +56,14 @@ class TransactionStoreTestCase(HomeserverTestCase): self.get_success(d) def test_large_destination_retry(self) -> None: + max_retry_interval_ms = ( + self.hs.config.federation.destination_max_retry_interval_ms + ) d = self.store.set_destination_retry_timings( - "example.com", MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL, MAX_RETRY_INTERVAL + "example.com", + max_retry_interval_ms, + max_retry_interval_ms, + max_retry_interval_ms, ) self.get_success(d) diff --git a/tests/util/test_retryutils.py b/tests/util/test_retryutils.py index 5f8f4e76b5..1277e1a865 100644 --- a/tests/util/test_retryutils.py +++ b/tests/util/test_retryutils.py @@ -11,12 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. 
-from synapse.util.retryutils import ( - MIN_RETRY_INTERVAL, - RETRY_MULTIPLIER, - NotRetryingDestination, - get_retry_limiter, -) +from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter from tests.unittest import HomeserverTestCase @@ -42,6 +37,11 @@ class RetryLimiterTestCase(HomeserverTestCase): limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store)) + min_retry_interval_ms = ( + self.hs.config.federation.destination_min_retry_interval_ms + ) + retry_multiplier = self.hs.config.federation.destination_retry_multiplier + self.pump(1) try: with limiter: @@ -57,7 +57,7 @@ class RetryLimiterTestCase(HomeserverTestCase): assert new_timings is not None self.assertEqual(new_timings.failure_ts, failure_ts) self.assertEqual(new_timings.retry_last_ts, failure_ts) - self.assertEqual(new_timings.retry_interval, MIN_RETRY_INTERVAL) + self.assertEqual(new_timings.retry_interval, min_retry_interval_ms) # now if we try again we should get a failure self.get_failure( @@ -68,7 +68,7 @@ class RetryLimiterTestCase(HomeserverTestCase): # advance the clock and try again # - self.pump(MIN_RETRY_INTERVAL) + self.pump(min_retry_interval_ms) limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store)) self.pump(1) @@ -87,16 +87,16 @@ class RetryLimiterTestCase(HomeserverTestCase): self.assertEqual(new_timings.failure_ts, failure_ts) self.assertEqual(new_timings.retry_last_ts, retry_ts) self.assertGreaterEqual( - new_timings.retry_interval, MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 0.5 + new_timings.retry_interval, min_retry_interval_ms * retry_multiplier * 0.5 ) self.assertLessEqual( - new_timings.retry_interval, MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0 + new_timings.retry_interval, min_retry_interval_ms * retry_multiplier * 2.0 ) # # one more go, with success # - self.reactor.advance(MIN_RETRY_INTERVAL * RETRY_MULTIPLIER * 2.0) + self.reactor.advance(min_retry_interval_ms * retry_multiplier * 2.0) limiter = self.get_success(get_retry_limiter("test_dest", self.clock, store)) self.pump(1) -- cgit 1.5.1 From 0a5f4f766514b84aff84ff17dffd5301a437c797 Mon Sep 17 00:00:00 2001 From: Shay Date: Thu, 3 Aug 2023 11:43:51 -0700 Subject: Move support for application service query parameter authorization behind a configuration option (#16017) --- changelog.d/16017.removal | 1 + docs/upgrade.md | 16 ++++- docs/usage/configuration/config_documentation.md | 14 ++++ synapse/appservice/api.py | 34 +++++++--- synapse/config/appservice.py | 8 +++ tests/appservice/test_api.py | 85 ++++++++++++++++++++++-- 6 files changed, 144 insertions(+), 14 deletions(-) create mode 100644 changelog.d/16017.removal (limited to 'tests') diff --git a/changelog.d/16017.removal b/changelog.d/16017.removal new file mode 100644 index 0000000000..6b72442892 --- /dev/null +++ b/changelog.d/16017.removal @@ -0,0 +1 @@ +Move support for application service query parameter authorization behind a configuration option. 
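To make the two authorization styles this change toggles between concrete, here is an illustrative sketch rather than Synapse code: the appservice URL and token are invented, and any HTTP client would do in place of `requests`.

```python
import urllib.parse

import requests

AS_URL = "https://appservice.example.com"  # hypothetical appservice base URL
HS_TOKEN = "hs_token_example"  # stand-in for the hs_token from the registration file

user_path = f"/_matrix/app/v1/users/{urllib.parse.quote('@alice:example.com')}"

# Spec style: the homeserver authenticates solely via the Authorization header.
requests.get(
    AS_URL + user_path,
    headers={"Authorization": f"Bearer {HS_TOKEN}"},
)

# Legacy style (use_appservice_legacy_authorization: true): the same token is
# additionally sent as a query parameter, where it can end up in request logs
# and proxies, which is why it is deprecated and considered insecure.
requests.get(
    AS_URL + user_path,
    params={"access_token": HS_TOKEN},
    headers={"Authorization": f"Bearer {HS_TOKEN}"},
)
```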
diff --git a/docs/upgrade.md b/docs/upgrade.md
index 5dde6c769e..f50a279e98 100644
--- a/docs/upgrade.md
+++ b/docs/upgrade.md
@@ -88,6 +88,21 @@ process, for example:
     dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
     ```

+# Upgrading to v1.90.0
+
+## App service query parameter authorization is now a configuration option
+
+Synapse v1.81.0 deprecated application service authorization via query parameters as this is
+considered insecure - and from Synapse v1.71.0 forwards the application service token has also been sent via
+[the `Authorization` header](https://spec.matrix.org/v1.6/application-service-api/#authorization), making the insecure
+query parameter authorization redundant. Since removing the ability to continue to use query parameters could break
+backwards compatibility, it has now been put behind a configuration option, `use_appservice_legacy_authorization`.
+This option defaults to false, but can be activated by adding
+```yaml
+use_appservice_legacy_authorization: true
+```
+to your configuration.
+
 # Upgrading to v1.89.0

 ## Removal of unspecced `user` property for `/register`
@@ -97,7 +112,6 @@ The standard `username` property should be used instead. See the
 [Application Service specification](https://spec.matrix.org/v1.7/application-service-api/#server-admin-style-permissions)
 for more information.

-
 # Upgrading to v1.88.0

 ## Minimum supported Python version
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index c32608da2b..2987c9332d 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -2848,6 +2848,20 @@ Example configuration:
 ```yaml
 track_appservice_user_ips: true
 ```
+---
+### `use_appservice_legacy_authorization`
+
+Whether to send the application service access tokens via the `access_token` query parameter
+per older versions of the Matrix specification. Defaults to false. Set to true to enable sending
+access tokens via a query parameter.
+
+**Enabling this option is considered insecure and is not recommended.
** + +Example configuration: +```yaml +use_appservice_legacy_authorization: true +``` + --- ### `macaroon_secret_key` diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py index 359999f680..de7a94bf26 100644 --- a/synapse/appservice/api.py +++ b/synapse/appservice/api.py @@ -16,7 +16,6 @@ import logging import urllib.parse from typing import ( TYPE_CHECKING, - Any, Dict, Iterable, List, @@ -25,6 +24,7 @@ from typing import ( Sequence, Tuple, TypeVar, + Union, ) from prometheus_client import Counter @@ -119,6 +119,7 @@ class ApplicationServiceApi(SimpleHttpClient): def __init__(self, hs: "HomeServer"): super().__init__(hs) self.clock = hs.get_clock() + self.config = hs.config.appservice self.protocol_meta_cache: ResponseCache[Tuple[str, str]] = ResponseCache( hs.get_clock(), "as_protocol_meta", timeout_ms=HOUR_IN_MS @@ -132,9 +133,12 @@ class ApplicationServiceApi(SimpleHttpClient): assert service.hs_token is not None try: + args = None + if self.config.use_appservice_legacy_authorization: + args = {"access_token": service.hs_token} response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/users/{urllib.parse.quote(user_id)}", - {"access_token": service.hs_token}, + args, headers={"Authorization": [f"Bearer {service.hs_token}"]}, ) if response is not None: # just an empty json object @@ -155,9 +159,12 @@ class ApplicationServiceApi(SimpleHttpClient): assert service.hs_token is not None try: + args = None + if self.config.use_appservice_legacy_authorization: + args = {"access_token": service.hs_token} response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/rooms/{urllib.parse.quote(alias)}", - {"access_token": service.hs_token}, + args, headers={"Authorization": [f"Bearer {service.hs_token}"]}, ) if response is not None: # just an empty json object @@ -190,10 +197,12 @@ class ApplicationServiceApi(SimpleHttpClient): assert service.hs_token is not None try: - args: Mapping[Any, Any] = { - **fields, - b"access_token": service.hs_token, - } + args: Mapping[bytes, Union[List[bytes], str]] = fields + if self.config.use_appservice_legacy_authorization: + args = { + **fields, + b"access_token": service.hs_token, + } response = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/{kind}/{urllib.parse.quote(protocol)}", args=args, @@ -231,9 +240,12 @@ class ApplicationServiceApi(SimpleHttpClient): # This is required by the configuration. 
assert service.hs_token is not None try: + args = None + if self.config.use_appservice_legacy_authorization: + args = {"access_token": service.hs_token} info = await self.get_json( f"{service.url}{APP_SERVICE_PREFIX}/thirdparty/protocol/{urllib.parse.quote(protocol)}", - {"access_token": service.hs_token}, + args, headers={"Authorization": [f"Bearer {service.hs_token}"]}, ) @@ -344,10 +356,14 @@ class ApplicationServiceApi(SimpleHttpClient): } try: + args = None + if self.config.use_appservice_legacy_authorization: + args = {"access_token": service.hs_token} + await self.put_json( f"{service.url}{APP_SERVICE_PREFIX}/transactions/{urllib.parse.quote(str(txn_id))}", json_body=body, - args={"access_token": service.hs_token}, + args=args, headers={"Authorization": [f"Bearer {service.hs_token}"]}, ) if logger.isEnabledFor(logging.DEBUG): diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py index c2710fdf04..919f81a9b7 100644 --- a/synapse/config/appservice.py +++ b/synapse/config/appservice.py @@ -43,6 +43,14 @@ class AppServiceConfig(Config): ) self.track_appservice_user_ips = config.get("track_appservice_user_ips", False) + self.use_appservice_legacy_authorization = config.get( + "use_appservice_legacy_authorization", False + ) + if self.use_appservice_legacy_authorization: + logger.warning( + "The use of appservice legacy authorization via query params is deprecated" + " and should be considered insecure." + ) def load_appservices( diff --git a/tests/appservice/test_api.py b/tests/appservice/test_api.py index 807dc2f21c..3c635e3dcb 100644 --- a/tests/appservice/test_api.py +++ b/tests/appservice/test_api.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from typing import Any, List, Mapping, Sequence, Union +from typing import Any, List, Mapping, Optional, Sequence, Union from unittest.mock import Mock from twisted.test.proto_helpers import MemoryReactor @@ -22,6 +22,7 @@ from synapse.types import JsonDict from synapse.util import Clock from tests import unittest +from tests.unittest import override_config PROTOCOL = "myproto" TOKEN = "myastoken" @@ -39,7 +40,7 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): hs_token=TOKEN, ) - def test_query_3pe_authenticates_token(self) -> None: + def test_query_3pe_authenticates_token_via_header(self) -> None: """ Tests that 3pe queries to the appservice are authenticated with the appservice's token. @@ -74,12 +75,88 @@ class ApplicationServiceApiTestCase(unittest.HomeserverTestCase): args: Mapping[Any, Any], headers: Mapping[Union[str, bytes], Sequence[Union[str, bytes]]], ) -> List[JsonDict]: - # Ensure the access token is passed as both a header and query arg. - if not headers.get("Authorization") or not args.get(b"access_token"): + # Ensure the access token is passed as a header. + if not headers or not headers.get("Authorization"): raise RuntimeError("Access token not provided") + # ... and not as a query param + if b"access_token" in args: + raise RuntimeError( + "Access token should not be passed as a query param." + ) self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) + self.request_url = url + if url == URL_USER: + return SUCCESS_RESULT_USER + elif url == URL_LOCATION: + return SUCCESS_RESULT_LOCATION + else: + raise RuntimeError( + "URL provided was invalid. This should never be seen." 
+ ) + + # We assign to a method, which mypy doesn't like. + self.api.get_json = Mock(side_effect=get_json) # type: ignore[assignment] + + result = self.get_success( + self.api.query_3pe(self.service, "user", PROTOCOL, {b"some": [b"field"]}) + ) + self.assertEqual(self.request_url, URL_USER) + self.assertEqual(result, SUCCESS_RESULT_USER) + result = self.get_success( + self.api.query_3pe( + self.service, "location", PROTOCOL, {b"some": [b"field"]} + ) + ) + self.assertEqual(self.request_url, URL_LOCATION) + self.assertEqual(result, SUCCESS_RESULT_LOCATION) + + @override_config({"use_appservice_legacy_authorization": True}) + def test_query_3pe_authenticates_token_via_param(self) -> None: + """ + Tests that 3pe queries to the appservice are authenticated + with the appservice's token. + """ + + SUCCESS_RESULT_USER = [ + { + "protocol": PROTOCOL, + "userid": "@a:user", + "fields": { + "more": "fields", + }, + } + ] + SUCCESS_RESULT_LOCATION = [ + { + "protocol": PROTOCOL, + "alias": "#a:room", + "fields": { + "more": "fields", + }, + } + ] + + URL_USER = f"{URL}/_matrix/app/v1/thirdparty/user/{PROTOCOL}" + URL_LOCATION = f"{URL}/_matrix/app/v1/thirdparty/location/{PROTOCOL}" + + self.request_url = None + + async def get_json( + url: str, + args: Mapping[Any, Any], + headers: Optional[ + Mapping[Union[str, bytes], Sequence[Union[str, bytes]]] + ] = None, + ) -> List[JsonDict]: + # Ensure the access token is passed as a both a query param and in the headers. + if not args.get(b"access_token"): + raise RuntimeError("Access token should be provided in query params.") + if not headers or not headers.get("Authorization"): + raise RuntimeError("Access token should be provided in auth headers.") + self.assertEqual(args.get(b"access_token"), TOKEN) + self.assertEqual(headers.get("Authorization"), [f"Bearer {TOKEN}"]) self.request_url = url if url == URL_USER: return SUCCESS_RESULT_USER -- cgit 1.5.1 From 84ae2e3f6fb86115df767bb2f1fb16ac2fbaa7c3 Mon Sep 17 00:00:00 2001 From: Shay Date: Fri, 4 Aug 2023 10:49:54 -0700 Subject: Fix deletion for Dehydrated Devices (#16046) --- changelog.d/16046.bugfix | 1 + synapse/handlers/device.py | 16 +++++ synapse/rest/client/devices.py | 14 ++-- tests/rest/client/test_devices.py | 139 +++++++++++++++++++++++++++++++++++++- 4 files changed, 165 insertions(+), 5 deletions(-) create mode 100644 changelog.d/16046.bugfix (limited to 'tests') diff --git a/changelog.d/16046.bugfix b/changelog.d/16046.bugfix new file mode 100644 index 0000000000..ce5a9ae4b5 --- /dev/null +++ b/changelog.d/16046.bugfix @@ -0,0 +1 @@ +Fix deletion in dehydrated devices v2. diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index f3a713f5fa..b7bf70a72d 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -722,6 +722,22 @@ class DeviceHandler(DeviceWorkerHandler): return {"success": True} + async def delete_dehydrated_device(self, user_id: str, device_id: str) -> None: + """ + Delete a stored dehydrated device. 
+
+        Args:
+            user_id: the user_id to delete the device from
+            device_id: id of the dehydrated device to delete
+        """
+        success = await self.store.remove_dehydrated_device(user_id, device_id)
+
+        if not success:
+            raise errors.NotFoundError()
+
+        await self.delete_devices(user_id, [device_id])
+        await self.store.delete_e2e_keys_by_device(user_id=user_id, device_id=device_id)
+
 @wrap_as_background_process("_handle_new_device_update_async")
 async def _handle_new_device_update_async(self) -> None:
     """Called when we have a new local device list update that we need to
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index 690d2ec406..dd3f7fd666 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -513,10 +513,8 @@ class DehydratedDeviceV2Servlet(RestServlet):

         if dehydrated_device is not None:
             (device_id, device_data) = dehydrated_device

-            result = await self.device_handler.rehydrate_device(
-                requester.user.to_string(),
-                self.auth.get_access_token_from_request(request),
-                device_id,
+            await self.device_handler.delete_dehydrated_device(
+                requester.user.to_string(), device_id
             )

             result = {"device_id": device_id}

@@ -538,6 +536,14 @@ class DehydratedDeviceV2Servlet(RestServlet):
         requester = await self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()

+        old_dehydrated_device = await self.device_handler.get_dehydrated_device(user_id)
+
+        # if an old device exists, delete it before creating a new one
+        if old_dehydrated_device:
+            await self.device_handler.delete_dehydrated_device(
+                user_id, old_dehydrated_device[0]
+            )
+
         device_info = submission.dict()
         if "device_keys" not in device_info.keys():
             raise SynapseError(
diff --git a/tests/rest/client/test_devices.py b/tests/rest/client/test_devices.py
index b7d420cfec..3cf29c10ea 100644
--- a/tests/rest/client/test_devices.py
+++ b/tests/rest/client/test_devices.py
@@ -379,4 +379,141 @@ class DehydratedDeviceTestCase(unittest.HomeserverTestCase):
             access_token=token,
             shorthand=False,
         )
-        self.assertEqual(channel.code, 404)
+        self.assertEqual(channel.code, 401)
+
+    @unittest.override_config(
+        {"experimental_features": {"msc2697_enabled": False, "msc3814_enabled": True}}
+    )
+    def test_msc3814_dehydrated_device_delete_works(self) -> None:
+        user = self.register_user("mikey", "pass")
+        token = self.login(user, "pass", device_id="device1")
+        content: JsonDict = {
+            "device_data": {
+                "algorithm": "m.dehydration.v1.olm",
+            },
+            "device_id": "device2",
+            "initial_device_display_name": "foo bar",
+            "device_keys": {
+                "user_id": "@mikey:test",
+                "device_id": "device2",
+                "valid_until_ts": "80",
+                "algorithms": [
+                    "m.olm.curve25519-aes-sha2",
+                ],
+                "keys": {
+                    "<algorithm>:<device_id>": "<key_base64>",
+                },
+                "signatures": {
+                    "<user_id>": {"<algorithm>:<device_id>": "<signature_base64>"}
+                },
+            },
+        }
+        channel = self.make_request(
+            "PUT",
+            "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device",
+            content=content,
+            access_token=token,
+            shorthand=False,
+        )
+        self.assertEqual(channel.code, 200)
+        device_id = channel.json_body.get("device_id")
+        assert device_id is not None
+        self.assertIsInstance(device_id, str)
+        self.assertEqual("device2", device_id)
+
+        # ensure that keys were uploaded and available
+        channel = self.make_request(
+            "POST",
+            "/_matrix/client/r0/keys/query",
+            {
+                "device_keys": {
+                    user: ["device2"],
+                },
+            },
+            token,
+        )
+        self.assertEqual(
+            channel.json_body["device_keys"][user]["device2"]["keys"],
+            {
+                "<algorithm>:<device_id>": "<key_base64>",
+            },
+        )
+
+        # delete the dehydrated device
+        channel = self.make_request(
+            "DELETE",
+
"_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + + # ensure that keys are no longer available for deleted device + channel = self.make_request( + "POST", + "/_matrix/client/r0/keys/query", + { + "device_keys": { + user: ["device2"], + }, + }, + token, + ) + self.assertEqual(channel.json_body["device_keys"], {"@mikey:test": {}}) + + # check that an old device is deleted when user PUTs a new device + # First, create a device + content["device_id"] = "device3" + content["device_keys"]["device_id"] = "device3" + channel = self.make_request( + "PUT", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + content=content, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + device_id = channel.json_body.get("device_id") + assert device_id is not None + self.assertIsInstance(device_id, str) + self.assertEqual("device3", device_id) + + # create a second device without deleting first device + content["device_id"] = "device4" + content["device_keys"]["device_id"] = "device4" + channel = self.make_request( + "PUT", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + content=content, + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + device_id = channel.json_body.get("device_id") + assert device_id is not None + self.assertIsInstance(device_id, str) + self.assertEqual("device4", device_id) + + # check that the second device that was created is what is returned when we GET + channel = self.make_request( + "GET", + "_matrix/client/unstable/org.matrix.msc3814.v1/dehydrated_device", + access_token=token, + shorthand=False, + ) + self.assertEqual(channel.code, 200) + returned_device_id = channel.json_body["device_id"] + self.assertEqual(returned_device_id, "device4") + + # and that if we query the keys for the first device they are not there + channel = self.make_request( + "POST", + "/_matrix/client/r0/keys/query", + { + "device_keys": { + user: ["device3"], + }, + }, + token, + ) + self.assertEqual(channel.json_body["device_keys"], {"@mikey:test": {}}) -- cgit 1.5.1