From 58f830511486271da72543dd20676b702bc52b2f Mon Sep 17 00:00:00 2001 From: Anshul Madnawat <100751856+anshulm333@users.noreply.github.com> Date: Thu, 27 Jul 2023 00:15:47 +0530 Subject: Inline SQL queries using boolean parameters (#15525) SQLite now supports TRUE and FALSE constants, simplify some queries by inlining those instead of passing them as arguments. --- synapse/storage/databases/main/event_federation.py | 3 +-- synapse/storage/databases/main/events.py | 12 ++++++------ synapse/storage/databases/main/purge_events.py | 9 ++++----- synapse/storage/databases/main/push_rule.py | 6 +++--- synapse/storage/databases/main/registration.py | 4 ++-- synapse/storage/databases/main/room.py | 10 +++++----- synapse/storage/databases/main/stream.py | 4 ++-- 7 files changed, 23 insertions(+), 25 deletions(-) (limited to 'synapse/storage/databases') diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index b2cda52ce5..534dc32413 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -843,7 +843,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas * because the schema change is in a background update, it's not * necessarily safe to assume that it will have been completed. */ - AND edge.is_state is ? /* False */ + AND edge.is_state is FALSE /** * We only want backwards extremities that are older than or at * the same position of the given `current_depth` (where older @@ -886,7 +886,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas sql, ( room_id, - False, current_depth, self._clock.time_msec(), BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 2b83a69426..bd3f14fb71 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1455,8 +1455,8 @@ class PersistEventsStore: }, ) - sql = "UPDATE events SET outlier = ? WHERE event_id = ?" - txn.execute(sql, (False, event.event_id)) + sql = "UPDATE events SET outlier = FALSE WHERE event_id = ?" + txn.execute(sql, (event.event_id,)) # Update the event_backward_extremities table now that this # event isn't an outlier any more. @@ -1549,13 +1549,13 @@ class PersistEventsStore: for event, _ in events_and_contexts if not event.internal_metadata.is_redacted() ] - sql = "UPDATE redactions SET have_censored = ? WHERE " + sql = "UPDATE redactions SET have_censored = FALSE WHERE " clause, args = make_in_list_sql_clause( self.database_engine, "redacts", unredacted_events, ) - txn.execute(sql + clause, [False] + args) + txn.execute(sql + clause, args) self.db_pool.simple_insert_many_txn( txn, @@ -2318,14 +2318,14 @@ class PersistEventsStore: " SELECT 1 FROM events" " LEFT JOIN event_edges edge" " ON edge.event_id = events.event_id" - " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = ? OR edge.event_id IS NULL)" + " WHERE events.event_id = ? AND events.room_id = ? AND (events.outlier = FALSE OR edge.event_id IS NULL)" " )" ) txn.execute_batch( query, [ - (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id, False) + (e_id, ev.room_id, e_id, ev.room_id, e_id, ev.room_id) for ev in events for e_id in ev.prev_event_ids() if not ev.internal_metadata.is_outlier() diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 9773c1fcd2..b52f48cf04 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -249,12 +249,11 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): # Mark all state and own events as outliers logger.info("[purge] marking remaining events as outliers") txn.execute( - "UPDATE events SET outlier = ?" + "UPDATE events SET outlier = TRUE" " WHERE event_id IN (" - " SELECT event_id FROM events_to_purge " - " WHERE NOT should_delete" - ")", - (True,), + " SELECT event_id FROM events_to_purge " + " WHERE NOT should_delete" + ")" ) # synapse tries to take out an exclusive lock on room_depth whenever it diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index e098ceea3c..c13c0bc7d7 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -560,19 +560,19 @@ class PushRuleStore(PushRulesWorkerStore): if isinstance(self.database_engine, PostgresEngine): sql = """ INSERT INTO push_rules_enable (id, user_name, rule_id, enabled) - VALUES (?, ?, ?, ?) + VALUES (?, ?, ?, 1) ON CONFLICT DO NOTHING """ elif isinstance(self.database_engine, Sqlite3Engine): sql = """ INSERT OR IGNORE INTO push_rules_enable (id, user_name, rule_id, enabled) - VALUES (?, ?, ?, ?) + VALUES (?, ?, ?, 1) """ else: raise RuntimeError("Unknown database engine") new_enable_id = self._push_rules_enable_id_gen.get_next() - txn.execute(sql, (new_enable_id, user_id, rule_id, 1)) + txn.execute(sql, (new_enable_id, user_id, rule_id)) async def delete_push_rule(self, user_id: str, rule_id: str) -> None: """ diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py index 676d03bb7e..c582cf0573 100644 --- a/synapse/storage/databases/main/registration.py +++ b/synapse/storage/databases/main/registration.py @@ -454,9 +454,9 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore): ) -> List[Tuple[str, int]]: sql = ( "SELECT user_id, expiration_ts_ms FROM account_validity" - " WHERE email_sent = ? AND (expiration_ts_ms - ?) <= ?" + " WHERE email_sent = FALSE AND (expiration_ts_ms - ?) <= ?" ) - values = [False, now_ms, renew_at] + values = [now_ms, renew_at] txn.execute(sql, values) return cast(List[Tuple[str, int]], txn.fetchall()) diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 830658f328..719e11aea6 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -936,11 +936,11 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): JOIN event_json USING (room_id, event_id) WHERE room_id = ? %(where_clause)s - AND contains_url = ? AND outlier = ? + AND contains_url = TRUE AND outlier = FALSE ORDER BY stream_ordering DESC LIMIT ? """ - txn.execute(sql % {"where_clause": ""}, (room_id, True, False, 100)) + txn.execute(sql % {"where_clause": ""}, (room_id, 100)) local_media_mxcs = [] remote_media_mxcs = [] @@ -976,7 +976,7 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): txn.execute( sql % {"where_clause": "AND stream_ordering < ?"}, - (room_id, next_token, True, False, 100), + (room_id, next_token, 100), ) return local_media_mxcs, remote_media_mxcs @@ -1086,9 +1086,9 @@ class RoomWorkerStore(CacheInvalidationWorkerStore): # set quarantine if quarantined_by is not None: - sql += "AND safe_from_quarantine = ?" + sql += "AND safe_from_quarantine = FALSE" txn.executemany( - sql, [(quarantined_by, media_id, False) for media_id in local_mxcs] + sql, [(quarantined_by, media_id) for media_id in local_mxcs] ) # remove from quarantine else: diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py index 92cbe262a6..5a3611c415 100644 --- a/synapse/storage/databases/main/stream.py +++ b/synapse/storage/databases/main/stream.py @@ -1401,7 +1401,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): `to_token`), or `limit` is zero. """ - args = [False, room_id] + args: List[Any] = [room_id] order, from_bound, to_bound = generate_pagination_bounds( direction, from_token, to_token @@ -1475,7 +1475,7 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore): event.topological_ordering, event.stream_ordering FROM events AS event %(join_clause)s - WHERE event.outlier = ? AND event.room_id = ? AND %(bounds)s + WHERE event.outlier = FALSE AND event.room_id = ? AND %(bounds)s ORDER BY event.topological_ordering %(order)s, event.stream_ordering %(order)s LIMIT ? """ % { -- cgit 1.5.1 From ae55cc1e6bc6527d0e359a823c474f5c9ed4382e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 31 Jul 2023 10:58:03 +0100 Subject: Add ability to wait for locks and add locks to purge history / room deletion (#15791) c.f. #13476 --- changelog.d/15791.bugfix | 1 + synapse/federation/federation_server.py | 17 +- synapse/handlers/message.py | 38 ++- synapse/handlers/pagination.py | 23 +- synapse/handlers/room_member.py | 45 +-- synapse/handlers/worker_lock.py | 333 +++++++++++++++++++++++ synapse/notifier.py | 16 ++ synapse/replication/tcp/commands.py | 33 +++ synapse/replication/tcp/handler.py | 22 ++ synapse/rest/client/room_upgrade_rest_servlet.py | 11 +- synapse/server.py | 5 + synapse/storage/controllers/persist_events.py | 27 +- synapse/storage/databases/main/lock.py | 190 ++++++++----- tests/handlers/test_worker_lock.py | 74 +++++ tests/rest/client/test_rooms.py | 4 +- tests/storage/databases/main/test_lock.py | 52 ++++ 16 files changed, 783 insertions(+), 108 deletions(-) create mode 100644 changelog.d/15791.bugfix create mode 100644 synapse/handlers/worker_lock.py create mode 100644 tests/handlers/test_worker_lock.py (limited to 'synapse/storage/databases') diff --git a/changelog.d/15791.bugfix b/changelog.d/15791.bugfix new file mode 100644 index 0000000000..182634b62f --- /dev/null +++ b/changelog.d/15791.bugfix @@ -0,0 +1 @@ +Fix bug where purging history and paginating simultaneously could lead to database corruption when using workers. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index fa61dd8c10..a90d99c4d6 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -63,6 +63,7 @@ from synapse.federation.federation_base import ( ) from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( make_deferred_yieldable, @@ -137,6 +138,7 @@ class FederationServer(FederationBase): self._event_auth_handler = hs.get_event_auth_handler() self._room_member_handler = hs.get_room_member_handler() self._e2e_keys_handler = hs.get_e2e_keys_handler() + self._worker_lock_handler = hs.get_worker_locks_handler() self._state_storage_controller = hs.get_storage_controllers().state @@ -1236,9 +1238,18 @@ class FederationServer(FederationBase): logger.info("handling received PDU in room %s: %s", room_id, event) try: with nested_logging_context(event.event_id): - await self._federation_event_handler.on_receive_pdu( - origin, event - ) + # We're taking out a lock within a lock, which could + # lead to deadlocks if we're not careful. However, it is + # safe on this occasion as we only ever take a write + # lock when deleting a room, which we would never do + # while holding the `_INBOUND_EVENT_HANDLING_LOCK_NAME` + # lock. + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + await self._federation_event_handler.on_receive_pdu( + origin, event + ) except FederationError as e: # XXX: Ideally we'd inform the remote we failed to process # the event, but we can't return an error in the transaction diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fff0b5fa12..187dedae7d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -53,6 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -485,6 +486,7 @@ class EventCreationHandler: self._events_shard_config = self.config.worker.events_shard_config self._instance_name = hs.get_instance_name() self._notifier = hs.get_notifier() + self._worker_lock_handler = hs.get_worker_locks_handler() self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state @@ -1010,6 +1012,37 @@ class EventCreationHandler: event.internal_metadata.stream_ordering, ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + return await self._create_and_send_nonmember_event_locked( + requester=requester, + event_dict=event_dict, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + ratelimit=ratelimit, + txn_id=txn_id, + ignore_shadow_ban=ignore_shadow_ban, + outlier=outlier, + depth=depth, + ) + + async def _create_and_send_nonmember_event_locked( + self, + requester: Requester, + event_dict: dict, + allow_no_prev_events: bool = False, + prev_event_ids: Optional[List[str]] = None, + state_event_ids: Optional[List[str]] = None, + ratelimit: bool = True, + txn_id: Optional[str] = None, + ignore_shadow_ban: bool = False, + outlier: bool = False, + depth: Optional[int] = None, + ) -> Tuple[EventBase, int]: + room_id = event_dict["room_id"] + # If we don't have any prev event IDs specified then we need to # check that the host is in the room (as otherwise populating the # prev events will fail), at which point we may as well check the @@ -1923,7 +1956,10 @@ class EventCreationHandler: ) for room_id in room_ids: - dummy_event_sent = await self._send_dummy_event_for_room(room_id) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + dummy_event_sent = await self._send_dummy_event_for_room(room_id) if not dummy_event_sent: # Did not find a valid user in the room, so remove from future attempts diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 19b8728db9..da34658470 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -46,6 +46,11 @@ logger = logging.getLogger(__name__) BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3 +PURGE_HISTORY_LOCK_NAME = "purge_history_lock" + +DELETE_ROOM_LOCK_NAME = "delete_room_lock" + + @attr.s(slots=True, auto_attribs=True) class PurgeStatus: """Object tracking the status of a purge request @@ -142,6 +147,7 @@ class PaginationHandler: self._server_name = hs.hostname self._room_shutdown_handler = hs.get_room_shutdown_handler() self._relations_handler = hs.get_relations_handler() + self._worker_locks = hs.get_worker_locks_handler() self.pagination_lock = ReadWriteLock() # IDs of rooms in which there currently an active purge *or delete* operation. @@ -356,7 +362,9 @@ class PaginationHandler: """ self._purges_in_progress_by_room.add(room_id) try: - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=True + ): await self._storage_controllers.purge_events.purge_history( room_id, token, delete_local_events ) @@ -412,7 +420,10 @@ class PaginationHandler: room_id: room to be purged force: set true to skip checking for joined users. """ - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_multi_read_write_lock( + [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)], + write=True, + ): # first check that we have no users in this room if not force: joined = await self.store.is_host_joined(room_id, self._server_name) @@ -471,7 +482,9 @@ class PaginationHandler: room_token = from_token.room_key - async with self.pagination_lock.read(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=False + ): (membership, member_event_id) = (None, None) if not use_admin_priviledge: ( @@ -747,7 +760,9 @@ class PaginationHandler: self._purges_in_progress_by_room.add(room_id) try: - async with self.pagination_lock.write(room_id): + async with self._worker_locks.acquire_read_write_lock( + PURGE_HISTORY_LOCK_NAME, room_id, write=True + ): self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN self._delete_by_id[ delete_id diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 496e701f13..6cca2ec344 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -39,6 +39,7 @@ from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging import opentracing from synapse.metrics import event_processing_positions from synapse.metrics.background_process_metrics import run_as_background_process @@ -94,6 +95,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.event_creation_handler = hs.get_event_creation_handler() self.account_data_handler = hs.get_account_data_handler() self.event_auth_handler = hs.get_event_auth_handler() + self._worker_lock_handler = hs.get_worker_locks_handler() self.member_linearizer: Linearizer = Linearizer(name="member") self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter") @@ -638,26 +640,29 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # by application services), and then by room ID. async with self.member_as_limiter.queue(as_id): async with self.member_linearizer.queue(key): - with opentracing.start_active_span("update_membership_locked"): - result = await self.update_membership_locked( - requester, - target, - room_id, - action, - txn_id=txn_id, - remote_room_hosts=remote_room_hosts, - third_party_signed=third_party_signed, - ratelimit=ratelimit, - content=content, - new_room=new_room, - require_consent=require_consent, - outlier=outlier, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - depth=depth, - origin_server_ts=origin_server_ts, - ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + with opentracing.start_active_span("update_membership_locked"): + result = await self.update_membership_locked( + requester, + target, + room_id, + action, + txn_id=txn_id, + remote_room_hosts=remote_room_hosts, + third_party_signed=third_party_signed, + ratelimit=ratelimit, + content=content, + new_room=new_room, + require_consent=require_consent, + outlier=outlier, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + depth=depth, + origin_server_ts=origin_server_ts, + ) return result diff --git a/synapse/handlers/worker_lock.py b/synapse/handlers/worker_lock.py new file mode 100644 index 0000000000..72df773a86 --- /dev/null +++ b/synapse/handlers/worker_lock.py @@ -0,0 +1,333 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import random +from types import TracebackType +from typing import ( + TYPE_CHECKING, + AsyncContextManager, + Collection, + Dict, + Optional, + Tuple, + Type, + Union, +) +from weakref import WeakSet + +import attr + +from twisted.internet import defer +from twisted.internet.interfaces import IReactorTime + +from synapse.logging.context import PreserveLoggingContext +from synapse.logging.opentracing import start_active_span +from synapse.metrics.background_process_metrics import wrap_as_background_process +from synapse.storage.databases.main.lock import Lock, LockStore +from synapse.util.async_helpers import timeout_deferred + +if TYPE_CHECKING: + from synapse.logging.opentracing import opentracing + from synapse.server import HomeServer + + +DELETE_ROOM_LOCK_NAME = "delete_room_lock" + + +class WorkerLocksHandler: + """A class for waiting on taking out locks, rather than using the storage + functions directly (which don't support awaiting). + """ + + def __init__(self, hs: "HomeServer") -> None: + self._reactor = hs.get_reactor() + self._store = hs.get_datastores().main + self._clock = hs.get_clock() + self._notifier = hs.get_notifier() + self._instance_name = hs.get_instance_name() + + # Map from lock name/key to set of `WaitingLock` that are active for + # that lock. + self._locks: Dict[ + Tuple[str, str], WeakSet[Union[WaitingLock, WaitingMultiLock]] + ] = {} + + self._clock.looping_call(self._cleanup_locks, 30_000) + + self._notifier.add_lock_released_callback(self._on_lock_released) + + def acquire_lock(self, lock_name: str, lock_key: str) -> "WaitingLock": + """Acquire a standard lock, returns a context manager that will block + until the lock is acquired. + + Note: Care must be taken to avoid deadlocks. In particular, this + function does *not* timeout. + + Usage: + async with handler.acquire_lock(name, key): + # Do work while holding the lock... + """ + + lock = WaitingLock( + reactor=self._reactor, + store=self._store, + handler=self, + lock_name=lock_name, + lock_key=lock_key, + write=None, + ) + + self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock) + + return lock + + def acquire_read_write_lock( + self, + lock_name: str, + lock_key: str, + *, + write: bool, + ) -> "WaitingLock": + """Acquire a read/write lock, returns a context manager that will block + until the lock is acquired. + + Note: Care must be taken to avoid deadlocks. In particular, this + function does *not* timeout. + + Usage: + async with handler.acquire_read_write_lock(name, key, write=True): + # Do work while holding the lock... + """ + + lock = WaitingLock( + reactor=self._reactor, + store=self._store, + handler=self, + lock_name=lock_name, + lock_key=lock_key, + write=write, + ) + + self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock) + + return lock + + def acquire_multi_read_write_lock( + self, + lock_names: Collection[Tuple[str, str]], + *, + write: bool, + ) -> "WaitingMultiLock": + """Acquires multi read/write locks at once, returns a context manager + that will block until all the locks are acquired. + + This will try and acquire all locks at once, and will never hold on to a + subset of the locks. (This avoids accidentally creating deadlocks). + + Note: Care must be taken to avoid deadlocks. In particular, this + function does *not* timeout. + """ + + lock = WaitingMultiLock( + lock_names=lock_names, + write=write, + reactor=self._reactor, + store=self._store, + handler=self, + ) + + for lock_name, lock_key in lock_names: + self._locks.setdefault((lock_name, lock_key), WeakSet()).add(lock) + + return lock + + def notify_lock_released(self, lock_name: str, lock_key: str) -> None: + """Notify that a lock has been released. + + Pokes both the notifier and replication. + """ + + self._notifier.notify_lock_released(self._instance_name, lock_name, lock_key) + + def _on_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: + """Called when a lock has been released. + + Wakes up any locks that might be waiting on this. + """ + locks = self._locks.get((lock_name, lock_key)) + if not locks: + return + + def _wake_deferred(deferred: defer.Deferred) -> None: + if not deferred.called: + deferred.callback(None) + + for lock in locks: + self._clock.call_later(0, _wake_deferred, lock.deferred) + + @wrap_as_background_process("_cleanup_locks") + async def _cleanup_locks(self) -> None: + """Periodically cleans out stale entries in the locks map""" + self._locks = {key: value for key, value in self._locks.items() if value} + + +@attr.s(auto_attribs=True, eq=False) +class WaitingLock: + reactor: IReactorTime + store: LockStore + handler: WorkerLocksHandler + lock_name: str + lock_key: str + write: Optional[bool] + deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred) + _inner_lock: Optional[Lock] = None + _retry_interval: float = 0.1 + _lock_span: "opentracing.Scope" = attr.Factory( + lambda: start_active_span("WaitingLock.lock") + ) + + async def __aenter__(self) -> None: + self._lock_span.__enter__() + + with start_active_span("WaitingLock.waiting_for_lock"): + while self._inner_lock is None: + self.deferred = defer.Deferred() + + if self.write is not None: + lock = await self.store.try_acquire_read_write_lock( + self.lock_name, self.lock_key, write=self.write + ) + else: + lock = await self.store.try_acquire_lock( + self.lock_name, self.lock_key + ) + + if lock: + self._inner_lock = lock + break + + try: + # Wait until the we get notified the lock might have been + # released (by the deferred being resolved). We also + # periodically wake up in case the lock was released but we + # weren't notified. + with PreserveLoggingContext(): + await timeout_deferred( + deferred=self.deferred, + timeout=self._get_next_retry_interval(), + reactor=self.reactor, + ) + except Exception: + pass + + return await self._inner_lock.__aenter__() + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> Optional[bool]: + assert self._inner_lock + + self.handler.notify_lock_released(self.lock_name, self.lock_key) + + try: + r = await self._inner_lock.__aexit__(exc_type, exc, tb) + finally: + self._lock_span.__exit__(exc_type, exc, tb) + + return r + + def _get_next_retry_interval(self) -> float: + next = self._retry_interval + self._retry_interval = max(5, next * 2) + return next * random.uniform(0.9, 1.1) + + +@attr.s(auto_attribs=True, eq=False) +class WaitingMultiLock: + lock_names: Collection[Tuple[str, str]] + + write: bool + + reactor: IReactorTime + store: LockStore + handler: WorkerLocksHandler + + deferred: "defer.Deferred[None]" = attr.Factory(defer.Deferred) + + _inner_lock_cm: Optional[AsyncContextManager] = None + _retry_interval: float = 0.1 + _lock_span: "opentracing.Scope" = attr.Factory( + lambda: start_active_span("WaitingLock.lock") + ) + + async def __aenter__(self) -> None: + self._lock_span.__enter__() + + with start_active_span("WaitingLock.waiting_for_lock"): + while self._inner_lock_cm is None: + self.deferred = defer.Deferred() + + lock_cm = await self.store.try_acquire_multi_read_write_lock( + self.lock_names, write=self.write + ) + + if lock_cm: + self._inner_lock_cm = lock_cm + break + + try: + # Wait until the we get notified the lock might have been + # released (by the deferred being resolved). We also + # periodically wake up in case the lock was released but we + # weren't notified. + with PreserveLoggingContext(): + await timeout_deferred( + deferred=self.deferred, + timeout=self._get_next_retry_interval(), + reactor=self.reactor, + ) + except Exception: + pass + + assert self._inner_lock_cm + await self._inner_lock_cm.__aenter__() + return + + async def __aexit__( + self, + exc_type: Optional[Type[BaseException]], + exc: Optional[BaseException], + tb: Optional[TracebackType], + ) -> Optional[bool]: + assert self._inner_lock_cm + + for lock_name, lock_key in self.lock_names: + self.handler.notify_lock_released(lock_name, lock_key) + + try: + r = await self._inner_lock_cm.__aexit__(exc_type, exc, tb) + finally: + self._lock_span.__exit__(exc_type, exc, tb) + + return r + + def _get_next_retry_interval(self) -> float: + next = self._retry_interval + self._retry_interval = max(5, next * 2) + return next * random.uniform(0.9, 1.1) diff --git a/synapse/notifier.py b/synapse/notifier.py index 897272ad5b..68115bca70 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -234,6 +234,9 @@ class Notifier: self._third_party_rules = hs.get_module_api_callbacks().third_party_event_rules + # List of callbacks to be notified when a lock is released + self._lock_released_callback: List[Callable[[str, str, str], None]] = [] + self.clock = hs.get_clock() self.appservice_handler = hs.get_application_service_handler() self._pusher_pool = hs.get_pusherpool() @@ -785,6 +788,19 @@ class Notifier: # that any in flight requests can be immediately retried. self._federation_client.wake_destination(server) + def add_lock_released_callback( + self, callback: Callable[[str, str, str], None] + ) -> None: + """Add a function to be called whenever we are notified about a released lock.""" + self._lock_released_callback.append(callback) + + def notify_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: + """Notify the callbacks that a lock has been released.""" + for cb in self._lock_released_callback: + cb(instance_name, lock_name, lock_key) + @attr.s(auto_attribs=True) class ReplicationNotifier: diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 32f52e54d8..10f5c98ff8 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -422,6 +422,36 @@ class RemoteServerUpCommand(_SimpleCommand): NAME = "REMOTE_SERVER_UP" +class LockReleasedCommand(Command): + """Sent to inform other instances that a given lock has been dropped. + + Format:: + + LOCK_RELEASED ["", "", ""] + """ + + NAME = "LOCK_RELEASED" + + def __init__( + self, + instance_name: str, + lock_name: str, + lock_key: str, + ): + self.instance_name = instance_name + self.lock_name = lock_name + self.lock_key = lock_key + + @classmethod + def from_line(cls: Type["LockReleasedCommand"], line: str) -> "LockReleasedCommand": + instance_name, lock_name, lock_key = json_decoder.decode(line) + + return cls(instance_name, lock_name, lock_key) + + def to_line(self) -> str: + return json_encoder.encode([self.instance_name, self.lock_name, self.lock_key]) + + _COMMANDS: Tuple[Type[Command], ...] = ( ServerCommand, RdataCommand, @@ -435,6 +465,7 @@ _COMMANDS: Tuple[Type[Command], ...] = ( UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, + LockReleasedCommand, ) # Map of command name to command type. @@ -448,6 +479,7 @@ VALID_SERVER_COMMANDS = ( ErrorCommand.NAME, PingCommand.NAME, RemoteServerUpCommand.NAME, + LockReleasedCommand.NAME, ) # The commands the client is allowed to send @@ -461,6 +493,7 @@ VALID_CLIENT_COMMANDS = ( UserIpCommand.NAME, ErrorCommand.NAME, RemoteServerUpCommand.NAME, + LockReleasedCommand.NAME, ) diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 5d108fe11b..a2cabba7b1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -39,6 +39,7 @@ from synapse.replication.tcp.commands import ( ClearUserSyncsCommand, Command, FederationAckCommand, + LockReleasedCommand, PositionCommand, RdataCommand, RemoteServerUpCommand, @@ -248,6 +249,9 @@ class ReplicationCommandHandler: if self._is_master or self._should_insert_client_ips: self.subscribe_to_channel("USER_IP") + if hs.config.redis.redis_enabled: + self._notifier.add_lock_released_callback(self.on_lock_released) + def subscribe_to_channel(self, channel_name: str) -> None: """ Indicates that we wish to subscribe to a Redis channel by name. @@ -648,6 +652,17 @@ class ReplicationCommandHandler: self._notifier.notify_remote_server_up(cmd.data) + def on_LOCK_RELEASED( + self, conn: IReplicationConnection, cmd: LockReleasedCommand + ) -> None: + """Called when we get a new LOCK_RELEASED command.""" + if cmd.instance_name == self._instance_name: + return + + self._notifier.notify_lock_released( + cmd.instance_name, cmd.lock_name, cmd.lock_key + ) + def new_connection(self, connection: IReplicationConnection) -> None: """Called when we have a new connection.""" self._connections.append(connection) @@ -754,6 +769,13 @@ class ReplicationCommandHandler: """ self.send_command(RdataCommand(stream_name, self._instance_name, token, data)) + def on_lock_released( + self, instance_name: str, lock_name: str, lock_key: str + ) -> None: + """Called when we released a lock and should notify other instances.""" + if instance_name == self._instance_name: + self.send_command(LockReleasedCommand(instance_name, lock_name, lock_key)) + UpdateToken = TypeVar("UpdateToken") UpdateRow = TypeVar("UpdateRow") diff --git a/synapse/rest/client/room_upgrade_rest_servlet.py b/synapse/rest/client/room_upgrade_rest_servlet.py index 6a7792e18b..4a5d9e13e7 100644 --- a/synapse/rest/client/room_upgrade_rest_servlet.py +++ b/synapse/rest/client/room_upgrade_rest_servlet.py @@ -17,6 +17,7 @@ from typing import TYPE_CHECKING, Tuple from synapse.api.errors import Codes, ShadowBanError, SynapseError from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, @@ -60,6 +61,7 @@ class RoomUpgradeRestServlet(RestServlet): self._hs = hs self._room_creation_handler = hs.get_room_creation_handler() self._auth = hs.get_auth() + self._worker_lock_handler = hs.get_worker_locks_handler() async def on_POST( self, request: SynapseRequest, room_id: str @@ -78,9 +80,12 @@ class RoomUpgradeRestServlet(RestServlet): ) try: - new_room_id = await self._room_creation_handler.upgrade_room( - requester, room_id, new_version - ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + new_room_id = await self._room_creation_handler.upgrade_room( + requester, room_id, new_version + ) except ShadowBanError: # Generate a random room ID. new_room_id = stringutils.random_string(18) diff --git a/synapse/server.py b/synapse/server.py index b72b76a38b..8430f99ef2 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -107,6 +107,7 @@ from synapse.handlers.stats import StatsHandler from synapse.handlers.sync import SyncHandler from synapse.handlers.typing import FollowerTypingHandler, TypingWriterHandler from synapse.handlers.user_directory import UserDirectoryHandler +from synapse.handlers.worker_lock import WorkerLocksHandler from synapse.http.client import ( InsecureInterceptableContextFactory, ReplicationClient, @@ -912,3 +913,7 @@ class HomeServer(metaclass=abc.ABCMeta): def get_common_usage_metrics_manager(self) -> CommonUsageMetricsManager: """Usage metrics shared between phone home stats and the prometheus exporter.""" return CommonUsageMetricsManager(self) + + @cache_in_self + def get_worker_locks_handler(self) -> WorkerLocksHandler: + return WorkerLocksHandler(self) diff --git a/synapse/storage/controllers/persist_events.py b/synapse/storage/controllers/persist_events.py index 35c0680365..35cd1089d6 100644 --- a/synapse/storage/controllers/persist_events.py +++ b/synapse/storage/controllers/persist_events.py @@ -45,6 +45,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.events import EventBase from synapse.events.snapshot import EventContext +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable from synapse.logging.opentracing import ( SynapseTags, @@ -338,6 +339,7 @@ class EventsPersistenceStorageController: ) self._state_resolution_handler = hs.get_state_resolution_handler() self._state_controller = state_controller + self.hs = hs async def _process_event_persist_queue_task( self, @@ -350,15 +352,22 @@ class EventsPersistenceStorageController: A dictionary of event ID to event ID we didn't persist as we already had another event persisted with the same TXN ID. """ - if isinstance(task, _PersistEventsTask): - return await self._persist_event_batch(room_id, task) - elif isinstance(task, _UpdateCurrentStateTask): - await self._update_current_state(room_id, task) - return {} - else: - raise AssertionError( - f"Found an unexpected task type in event persistence queue: {task}" - ) + + # Ensure that the room can't be deleted while we're persisting events to + # it. We might already have taken out the lock, but since this is just a + # "read" lock its inherently reentrant. + async with self.hs.get_worker_locks_handler().acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + if isinstance(task, _PersistEventsTask): + return await self._persist_event_batch(room_id, task) + elif isinstance(task, _UpdateCurrentStateTask): + await self._update_current_state(room_id, task) + return {} + else: + raise AssertionError( + f"Found an unexpected task type in event persistence queue: {task}" + ) @trace async def persist_events( diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py index c89b4f7919..1680bf6168 100644 --- a/synapse/storage/databases/main/lock.py +++ b/synapse/storage/databases/main/lock.py @@ -12,8 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from contextlib import AsyncExitStack from types import TracebackType -from typing import TYPE_CHECKING, Optional, Set, Tuple, Type +from typing import TYPE_CHECKING, Collection, Optional, Set, Tuple, Type from weakref import WeakValueDictionary from twisted.internet.interfaces import IReactorCore @@ -208,76 +209,85 @@ class LockStore(SQLBaseStore): used (otherwise the lock will leak). """ + try: + lock = await self.db_pool.runInteraction( + "try_acquire_read_write_lock", + self._try_acquire_read_write_lock_txn, + lock_name, + lock_key, + write, + ) + except self.database_engine.module.IntegrityError: + return None + + return lock + + def _try_acquire_read_write_lock_txn( + self, + txn: LoggingTransaction, + lock_name: str, + lock_key: str, + write: bool, + ) -> "Lock": + # We attempt to acquire the lock by inserting into + # `worker_read_write_locks` and seeing if that fails any + # constraints. If it doesn't then we have acquired the lock, + # otherwise we haven't. + # + # Before that though we clear the table of any stale locks. + now = self._clock.time_msec() token = random_string(6) - def _try_acquire_read_write_lock_txn(txn: LoggingTransaction) -> None: - # We attempt to acquire the lock by inserting into - # `worker_read_write_locks` and seeing if that fails any - # constraints. If it doesn't then we have acquired the lock, - # otherwise we haven't. - # - # Before that though we clear the table of any stale locks. - - delete_sql = """ - DELETE FROM worker_read_write_locks - WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; - """ - - insert_sql = """ - INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) - VALUES (?, ?, ?, ?, ?, ?) - """ - - if isinstance(self.database_engine, PostgresEngine): - # For Postgres we can send these queries at the same time. - txn.execute( - delete_sql + ";" + insert_sql, - ( - # DELETE args - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - # UPSERT args - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) - else: - # For SQLite these need to be two queries. - txn.execute( - delete_sql, - ( - now - _LOCK_TIMEOUT_MS, - lock_name, - lock_key, - ), - ) - txn.execute( - insert_sql, - ( - lock_name, - lock_key, - write, - self._instance_name, - token, - now, - ), - ) + delete_sql = """ + DELETE FROM worker_read_write_locks + WHERE last_renewed_ts < ? AND lock_name = ? AND lock_key = ?; + """ - return + insert_sql = """ + INSERT INTO worker_read_write_locks (lock_name, lock_key, write_lock, instance_name, token, last_renewed_ts) + VALUES (?, ?, ?, ?, ?, ?) + """ - try: - await self.db_pool.runInteraction( - "try_acquire_read_write_lock", - _try_acquire_read_write_lock_txn, + if isinstance(self.database_engine, PostgresEngine): + # For Postgres we can send these queries at the same time. + txn.execute( + delete_sql + ";" + insert_sql, + ( + # DELETE args + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + # UPSERT args + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), + ) + else: + # For SQLite these need to be two queries. + txn.execute( + delete_sql, + ( + now - _LOCK_TIMEOUT_MS, + lock_name, + lock_key, + ), + ) + txn.execute( + insert_sql, + ( + lock_name, + lock_key, + write, + self._instance_name, + token, + now, + ), ) - except self.database_engine.module.IntegrityError: - return None lock = Lock( self._reactor, @@ -289,10 +299,58 @@ class LockStore(SQLBaseStore): token=token, ) - self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock + def set_lock() -> None: + self._live_read_write_lock_tokens[(lock_name, lock_key, token)] = lock + + txn.call_after(set_lock) return lock + async def try_acquire_multi_read_write_lock( + self, + lock_names: Collection[Tuple[str, str]], + write: bool, + ) -> Optional[AsyncExitStack]: + """Try to acquire multiple locks for the given names/keys. Will return + an async context manager if the locks are successfully acquired, which + *must* be used (otherwise the lock will leak). + + If only a subset of the locks can be acquired then it will immediately + drop them and return `None`. + """ + try: + locks = await self.db_pool.runInteraction( + "try_acquire_multi_read_write_lock", + self._try_acquire_multi_read_write_lock_txn, + lock_names, + write, + ) + except self.database_engine.module.IntegrityError: + return None + + stack = AsyncExitStack() + + for lock in locks: + await stack.enter_async_context(lock) + + return stack + + def _try_acquire_multi_read_write_lock_txn( + self, + txn: LoggingTransaction, + lock_names: Collection[Tuple[str, str]], + write: bool, + ) -> Collection["Lock"]: + locks = [] + + for lock_name, lock_key in lock_names: + lock = self._try_acquire_read_write_lock_txn( + txn, lock_name, lock_key, write + ) + locks.append(lock) + + return locks + class Lock: """An async context manager that manages an acquired lock, ensuring it is diff --git a/tests/handlers/test_worker_lock.py b/tests/handlers/test_worker_lock.py new file mode 100644 index 0000000000..73e548726c --- /dev/null +++ b/tests/handlers/test_worker_lock.py @@ -0,0 +1,74 @@ +# Copyright 2023 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer +from twisted.test.proto_helpers import MemoryReactor + +from synapse.server import HomeServer +from synapse.util import Clock + +from tests import unittest +from tests.replication._base import BaseMultiWorkerStreamTestCase + + +class WorkerLockTestCase(unittest.HomeserverTestCase): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self.worker_lock_handler = self.hs.get_worker_locks_handler() + + def test_wait_for_lock_locally(self) -> None: + """Test waiting for a lock on a single worker""" + + lock1 = self.worker_lock_handler.acquire_lock("name", "key") + self.get_success(lock1.__aenter__()) + + lock2 = self.worker_lock_handler.acquire_lock("name", "key") + d2 = defer.ensureDeferred(lock2.__aenter__()) + self.assertNoResult(d2) + + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None)) + + +class WorkerLockWorkersTestCase(BaseMultiWorkerStreamTestCase): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: + self.main_worker_lock_handler = self.hs.get_worker_locks_handler() + + def test_wait_for_lock_worker(self) -> None: + """Test waiting for a lock on another worker""" + + worker = self.make_worker_hs( + "synapse.app.generic_worker", + extra_config={ + "redis": {"enabled": True}, + }, + ) + worker_lock_handler = worker.get_worker_locks_handler() + + lock1 = self.main_worker_lock_handler.acquire_lock("name", "key") + self.get_success(lock1.__aenter__()) + + lock2 = worker_lock_handler.acquire_lock("name", "key") + d2 = defer.ensureDeferred(lock2.__aenter__()) + self.assertNoResult(d2) + + self.get_success(lock1.__aexit__(None, None, None)) + + self.get_success(d2) + self.get_success(lock2.__aexit__(None, None, None)) diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py index d013e75d55..4f6347be15 100644 --- a/tests/rest/client/test_rooms.py +++ b/tests/rest/client/test_rooms.py @@ -711,7 +711,7 @@ class RoomsCreateTestCase(RoomBase): self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(30, channel.resource_usage.db_txn_count) + self.assertEqual(32, channel.resource_usage.db_txn_count) def test_post_room_initial_state(self) -> None: # POST with initial_state config key, expect new room id @@ -724,7 +724,7 @@ class RoomsCreateTestCase(RoomBase): self.assertEqual(HTTPStatus.OK, channel.code, channel.result) self.assertTrue("room_id" in channel.json_body) assert channel.resource_usage is not None - self.assertEqual(32, channel.resource_usage.db_txn_count) + self.assertEqual(34, channel.resource_usage.db_txn_count) def test_post_room_visibility_key(self) -> None: # POST with visibility config key, expect new room id diff --git a/tests/storage/databases/main/test_lock.py b/tests/storage/databases/main/test_lock.py index ad454f6dd8..383da83dfb 100644 --- a/tests/storage/databases/main/test_lock.py +++ b/tests/storage/databases/main/test_lock.py @@ -448,3 +448,55 @@ class ReadWriteLockTestCase(unittest.HomeserverTestCase): self.get_success(self.store._on_shutdown()) self.assertEqual(self.store._live_read_write_lock_tokens, {}) + + def test_acquire_multiple_locks(self) -> None: + """Tests that acquiring multiple locks at once works.""" + + # Take out multiple locks and ensure that we can't get those locks out + # again. + lock = self.get_success( + self.store.try_acquire_multi_read_write_lock( + [("name1", "key1"), ("name2", "key2")], write=True + ) + ) + self.assertIsNotNone(lock) + + assert lock is not None + self.get_success(lock.__aenter__()) + + lock2 = self.get_success( + self.store.try_acquire_read_write_lock("name1", "key1", write=True) + ) + self.assertIsNone(lock2) + + lock3 = self.get_success( + self.store.try_acquire_read_write_lock("name2", "key2", write=False) + ) + self.assertIsNone(lock3) + + # Overlapping locks attempts will fail, and won't lock any locks. + lock4 = self.get_success( + self.store.try_acquire_multi_read_write_lock( + [("name1", "key1"), ("name3", "key3")], write=True + ) + ) + self.assertIsNone(lock4) + + lock5 = self.get_success( + self.store.try_acquire_read_write_lock("name3", "key3", write=True) + ) + self.assertIsNotNone(lock5) + assert lock5 is not None + self.get_success(lock5.__aenter__()) + self.get_success(lock5.__aexit__(None, None, None)) + + # Once we release the lock we can take out the locks again. + self.get_success(lock.__aexit__(None, None, None)) + + lock6 = self.get_success( + self.store.try_acquire_read_write_lock("name1", "key1", write=True) + ) + self.assertIsNotNone(lock6) + assert lock6 is not None + self.get_success(lock6.__aenter__()) + self.get_success(lock6.__aexit__(None, None, None)) -- cgit 1.5.1 From d98a43d9226cbb4b9ab5ad3abd9b630548c2f09f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 4 Aug 2023 07:47:18 -0400 Subject: Stabilize support for MSC3970: updated transaction semantics (scope to `device_id`) (#15629) For now this maintains compatible with old Synapses by falling back to using transaction semantics on a per-access token. A future version of Synapse will drop support for this. --- changelog.d/15629.feature | 1 + synapse/config/experimental.py | 9 ------- synapse/events/utils.py | 42 ++++++++++++++++---------------- synapse/handlers/message.py | 12 ++++----- synapse/rest/client/transactions.py | 12 ++++----- synapse/server.py | 4 +-- synapse/storage/databases/main/events.py | 15 +++++------- synapse/storage/schema/__init__.py | 5 +++- synapse/types/__init__.py | 7 +++--- 9 files changed, 48 insertions(+), 59 deletions(-) create mode 100644 changelog.d/15629.feature (limited to 'synapse/storage/databases') diff --git a/changelog.d/15629.feature b/changelog.d/15629.feature new file mode 100644 index 0000000000..16264effca --- /dev/null +++ b/changelog.d/15629.feature @@ -0,0 +1 @@ +Scope transaction IDs to devices (implement [MSC3970](https://github.com/matrix-org/matrix-spec-proposals/pull/3970)). diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 1695ed8ca3..ac9449b18f 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -216,12 +216,6 @@ class MSC3861: ("session_lifetime",), ) - if not root.experimental.msc3970_enabled: - raise ConfigError( - "experimental_features.msc3970_enabled must be 'true' when OAuth delegation is enabled", - ("experimental_features", "msc3970_enabled"), - ) - @attr.s(auto_attribs=True, frozen=True, slots=True) class MSC3866Config: @@ -397,9 +391,6 @@ class ExperimentalConfig(Config): "Invalid MSC3861 configuration", ("experimental", "msc3861") ) from exc - # MSC3970: Scope transaction IDs to devices - self.msc3970_enabled = experimental.get("msc3970_enabled", self.msc3861.enabled) - # Check that none of the other config options conflict with MSC3861 when enabled self.msc3861.check_config_conflicts(self.root) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 967a6c245b..52acb21955 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -394,7 +394,6 @@ def serialize_event( time_now_ms: int, *, config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG, - msc3970_enabled: bool = False, ) -> JsonDict: """Serialize event for clients @@ -402,8 +401,6 @@ def serialize_event( e time_now_ms config: Event serialization config - msc3970_enabled: Whether MSC3970 is enabled. It changes whether we should - include the `transaction_id` in the event's `unsigned` section. Returns: The serialized event dictionary. @@ -429,38 +426,46 @@ def serialize_event( e.unsigned["redacted_because"], time_now_ms, config=config, - msc3970_enabled=msc3970_enabled, ) # If we have a txn_id saved in the internal_metadata, we should include it in the # unsigned section of the event if it was sent by the same session as the one # requesting the event. txn_id: Optional[str] = getattr(e.internal_metadata, "txn_id", None) - if txn_id is not None and config.requester is not None: - # For the MSC3970 rules to be applied, we *need* to have the device ID in the - # event internal metadata. Since we were not recording them before, if it hasn't - # been recorded, we fallback to the old behaviour. + if ( + txn_id is not None + and config.requester is not None + and config.requester.user.to_string() == e.sender + ): + # Some events do not have the device ID stored in the internal metadata, + # this includes old events as well as those created by appservice, guests, + # or with tokens minted with the admin API. For those events, fallback + # to using the access token instead. event_device_id: Optional[str] = getattr(e.internal_metadata, "device_id", None) - if msc3970_enabled and event_device_id is not None: + if event_device_id is not None: if event_device_id == config.requester.device_id: d["unsigned"]["transaction_id"] = txn_id else: - # The pre-MSC3970 behaviour is to only include the transaction ID if the - # event was sent from the same access token. For regular users, we can use - # the access token ID to determine this. For guests, we can't, but since - # each guest only has one access token, we can just check that the event was - # sent by the same user as the one requesting the event. + # Fallback behaviour: only include the transaction ID if the event + # was sent from the same access token. + # + # For regular users, the access token ID can be used to determine this. + # This includes access tokens minted with the admin API. + # + # For guests and appservice users, we can't check the access token ID + # so assume it is the same session. event_token_id: Optional[int] = getattr( e.internal_metadata, "token_id", None ) - if config.requester.user.to_string() == e.sender and ( + if ( ( event_token_id is not None and config.requester.access_token_id is not None and event_token_id == config.requester.access_token_id ) or config.requester.is_guest + or config.requester.app_service ): d["unsigned"]["transaction_id"] = txn_id @@ -504,9 +509,6 @@ class EventClientSerializer: clients. """ - def __init__(self, *, msc3970_enabled: bool = False): - self._msc3970_enabled = msc3970_enabled - def serialize_event( self, event: Union[JsonDict, EventBase], @@ -531,9 +533,7 @@ class EventClientSerializer: if not isinstance(event, EventBase): return event - serialized_event = serialize_event( - event, time_now, config=config, msc3970_enabled=self._msc3970_enabled - ) + serialized_event = serialize_event(event, time_now, config=config) # Check if there are any bundled aggregations to include with the event. if bundle_aggregations: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c656e07d37..d485f21e49 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -561,8 +561,6 @@ class EventCreationHandler: expiry_ms=30 * 60 * 1000, ) - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - async def create_event( self, requester: Requester, @@ -897,9 +895,8 @@ class EventCreationHandler: """ existing_event_id = None - if self._msc3970_enabled and requester.device_id: - # When MSC3970 is enabled, we lookup for events sent by the same device first, - # and fallback to the old behaviour if none were found. + # According to the spec, transactions are scoped to a user's device ID. + if requester.device_id: existing_event_id = ( await self.store.get_event_id_from_transaction_id_and_device_id( room_id, @@ -911,8 +908,9 @@ class EventCreationHandler: if existing_event_id: return existing_event_id - # Pre-MSC3970, we looked up for events that were sent by the same session by - # using the access token ID. + # Some requsters don't have device IDs (appservice, guests, and access + # tokens minted with the admin API), fallback to checking the access token + # ID, which should be close enough. if requester.access_token_id: existing_event_id = ( await self.store.get_event_id_from_transaction_id_and_token_id( diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 0d8a63d8be..3d814c404d 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -50,8 +50,6 @@ class HttpTransactionCache: # for at *LEAST* 30 mins, and at *MOST* 60 mins. self.cleaner = self.clock.looping_call(self._cleanup, CLEANUP_PERIOD_MS) - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable: """A helper function which returns a transaction key that can be used with TransactionCache for idempotent requests. @@ -78,18 +76,20 @@ class HttpTransactionCache: elif requester.app_service is not None: return (path, "appservice", requester.app_service.id) - # With MSC3970, we use the user ID and device ID as the transaction key - elif self._msc3970_enabled: + # Use the user ID and device ID as the transaction key. + elif requester.device_id: assert requester.user, "Requester must have a user" assert requester.device_id, "Requester must have a device_id" return (path, "user", requester.user, requester.device_id) - # Otherwise, the pre-MSC3970 behaviour is to use the access token ID + # Some requsters don't have device IDs, these are mostly handled above + # (appservice and guest users), but does not cover access tokens minted + # by the admin API. Use the access token ID instead. else: assert ( requester.access_token_id is not None ), "Requester must have an access_token_id" - return (path, "user", requester.access_token_id) + return (path, "user_admin", requester.access_token_id) def fetch_or_execute_request( self, diff --git a/synapse/server.py b/synapse/server.py index 8430f99ef2..e753ff0377 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -785,9 +785,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer( - msc3970_enabled=self.config.experimental.msc3970_enabled - ) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index bd3f14fb71..c1353b18c1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -127,8 +127,6 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - @trace async def _persist_events_and_state_updates( self, @@ -1012,9 +1010,11 @@ class PersistEventsStore: ) ) - # Pre-MSC3970, we rely on the access_token_id to scope the txn_id for events. - # Since this is an experimental flag, we still store the mapping even if the - # flag is disabled. + # Synapse usually relies on the device_id to scope transactions for events, + # except for users without device IDs (appservice, guests, and access + # tokens minted with the admin API) which use the access token ID instead. + # + # TODO https://github.com/matrix-org/synapse/issues/16042 if to_insert_token_id: self.db_pool.simple_insert_many_txn( txn, @@ -1030,10 +1030,7 @@ class PersistEventsStore: values=to_insert_token_id, ) - # With MSC3970, we rely on the device_id instead to scope the txn_id for events. - # We're only inserting if MSC3970 is *enabled*, because else the pre-MSC3970 - # behaviour would allow for a UNIQUE constraint violation on this table - if to_insert_device_id and self._msc3970_enabled: + if to_insert_device_id: self.db_pool.simple_insert_many_txn( txn, table="event_txn_id_device_id", diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index d3ec648f6d..7de9949a5b 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 79 # remember to update the list below when updating +SCHEMA_VERSION = 80 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -110,6 +110,9 @@ Changes in SCHEMA_VERSION = 78 Changes in SCHEMA_VERSION = 79 - Add tables to handle in DB read-write locks. - Add some mitigations for a painful race between foreground and background updates, cf #15677. + +Changes in SCHEMA_VERSION = 80 + - The event_txn_id_device_id is always written to for new events. """ diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index fdfd465c8d..39a1ae4ac3 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -117,11 +117,12 @@ class Requester: Attributes: user: id of the user making the request - access_token_id: *ID* of the access token used for this - request, or None if it came via the appservice API or similar + access_token_id: *ID* of the access token used for this request, or + None for appservices, guests, and tokens generated by the admin API is_guest: True if the user making this request is a guest user shadow_banned: True if the user making this request has been shadow-banned. - device_id: device_id which was set at authentication time + device_id: device_id which was set at authentication time, or + None for appservices, guests, and tokens generated by the admin API app_service: the AS requesting on behalf of the user authenticated_entity: The entity that authenticated when making the request. This is different to the user_id when an admin user or the server is -- cgit 1.5.1 From f3dc6dc19f902b638c164097342136010f0769d1 Mon Sep 17 00:00:00 2001 From: reivilibre Date: Tue, 8 Aug 2023 10:10:07 +0000 Subject: Remove old rows from the `cache_invalidation_stream_by_instance` table automatically. (This table is not used when Synapse is configured to use SQLite.) (#15868) * Add a cache invalidation clean-up task * Run the cache invalidation stream clean-up on the background worker * Tune down * call_later is in millis! * Newsfile Signed-off-by: Olivier Wilkinson (reivilibre) * fixup! Add a cache invalidation clean-up task * Update synapse/storage/databases/main/cache.py Co-authored-by: Eric Eastwood * Update synapse/storage/databases/main/cache.py Co-authored-by: Eric Eastwood * MILLISEC -> MS * Expand on comment * Move and tweak comment about Postgres * Use `wrap_as_background_process` --------- Signed-off-by: Olivier Wilkinson (reivilibre) Co-authored-by: Eric Eastwood --- changelog.d/15868.feature | 1 + synapse/storage/databases/main/cache.py | 130 ++++++++++++++++++++++++++++++++ 2 files changed, 131 insertions(+) create mode 100644 changelog.d/15868.feature (limited to 'synapse/storage/databases') diff --git a/changelog.d/15868.feature b/changelog.d/15868.feature new file mode 100644 index 0000000000..a866bf5774 --- /dev/null +++ b/changelog.d/15868.feature @@ -0,0 +1 @@ +Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py index c940f864d1..2fbd389c71 100644 --- a/synapse/storage/databases/main/cache.py +++ b/synapse/storage/databases/main/cache.py @@ -18,6 +18,8 @@ import logging from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple from synapse.api.constants import EventTypes +from synapse.config._base import Config +from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.replication.tcp.streams import BackfillStream, CachesStream from synapse.replication.tcp.streams.events import ( EventsStream, @@ -52,6 +54,21 @@ PURGE_HISTORY_CACHE_NAME = "ph_cache_fake" # As above, but for invalidating room caches on room deletion DELETE_ROOM_CACHE_NAME = "dr_cache_fake" +# How long between cache invalidation table cleanups, once we have caught up +# with the backlog. +REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h") + +# How long between cache invalidation table cleanups, before we have caught +# up with the backlog. +CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m") + +# Maximum number of cache invalidation rows to delete at once. +CLEAN_UP_MAX_BATCH_SIZE = 20_000 + +# Keep cache invalidations for 7 days +# (This is likely to be quite excessive.) +RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d") + class CacheInvalidationWorkerStore(SQLBaseStore): def __init__( @@ -98,6 +115,18 @@ class CacheInvalidationWorkerStore(SQLBaseStore): else: self._cache_id_gen = None + # Occasionally clean up the cache invalidations stream table by deleting + # old rows. + # This is only applicable when Postgres is in use; this table is unused + # and not populated at all when SQLite is the active database engine. + if hs.config.worker.run_background_tasks and isinstance( + self.database_engine, PostgresEngine + ): + self.hs.get_clock().call_later( + CATCH_UP_CLEANUP_INTERVAL_MS / 1000, + self._clean_up_cache_invalidation_wrapper, + ) + async def get_all_updated_caches( self, instance_name: str, last_id: int, current_id: int, limit: int ) -> Tuple[List[Tuple[int, tuple]], int, bool]: @@ -554,3 +583,104 @@ class CacheInvalidationWorkerStore(SQLBaseStore): return self._cache_id_gen.get_current_token_for_writer(instance_name) else: return 0 + + @wrap_as_background_process("clean_up_old_cache_invalidations") + async def _clean_up_cache_invalidation_wrapper(self) -> None: + """ + Clean up cache invalidation stream table entries occasionally. + If we are behind (i.e. there are entries old enough to + be deleted but too many of them to be deleted in one go), + then we run slightly more frequently. + """ + delete_up_to: int = ( + self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS + ) + + in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to) + + # Vary how long we wait before calling again depending on whether we + # are still sifting through backlog or we have caught up. + if in_backlog: + next_interval = CATCH_UP_CLEANUP_INTERVAL_MS + else: + next_interval = REGULAR_CLEANUP_INTERVAL_MS + + self.hs.get_clock().call_later( + next_interval / 1000, self._clean_up_cache_invalidation_wrapper + ) + + async def _clean_up_batch_of_old_cache_invalidations( + self, delete_up_to_millisec: int + ) -> bool: + """ + Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite). + + Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once. + + Returns true if and only if we were limited by batch size (i.e. we are in backlog: + there are more things to clean up). + """ + + def _clean_up_batch_of_old_cache_invalidations_txn( + txn: LoggingTransaction, + ) -> bool: + # First get the earliest stream ID + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + ORDER BY stream_id ASC + LIMIT 1 + """ + ) + row = txn.fetchone() + if row is None: + return False + earliest_stream_id: int = row[0] + + # Then find the last stream ID of the range we will delete + txn.execute( + """ + SELECT stream_id FROM cache_invalidation_stream_by_instance + WHERE stream_id <= ? AND invalidation_ts <= ? + ORDER BY stream_id DESC + LIMIT 1 + """, + (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec), + ) + row = txn.fetchone() + if row is None: + return False + cutoff_stream_id: int = row[0] + + # Determine whether we are caught up or still catching up + txn.execute( + """ + SELECT invalidation_ts FROM cache_invalidation_stream_by_instance + WHERE stream_id > ? + ORDER BY stream_id ASC + LIMIT 1 + """, + (cutoff_stream_id,), + ) + row = txn.fetchone() + if row is None: + in_backlog = False + else: + # We are in backlog if the next row could have been deleted + # if we didn't have such a small batch size + in_backlog = row[0] <= delete_up_to_millisec + + txn.execute( + """ + DELETE FROM cache_invalidation_stream_by_instance + WHERE ? <= stream_id AND stream_id <= ? + """, + (earliest_stream_id, cutoff_stream_id), + ) + + return in_backlog + + return await self.db_pool.runInteraction( + "clean_up_old_cache_invalidations", + _clean_up_batch_of_old_cache_invalidations_txn, + ) -- cgit 1.5.1