From ae55cc1e6bc6527d0e359a823c474f5c9ed4382e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 31 Jul 2023 10:58:03 +0100 Subject: Add ability to wait for locks and add locks to purge history / room deletion (#15791) c.f. #13476 --- synapse/handlers/message.py | 38 +++++++++++++++++++++++++++++++++++++- 1 file changed, 37 insertions(+), 1 deletion(-) (limited to 'synapse/handlers/message.py') diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index fff0b5fa12..187dedae7d 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -53,6 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -485,6 +486,7 @@ class EventCreationHandler: self._events_shard_config = self.config.worker.events_shard_config self._instance_name = hs.get_instance_name() self._notifier = hs.get_notifier() + self._worker_lock_handler = hs.get_worker_locks_handler() self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state @@ -1010,6 +1012,37 @@ class EventCreationHandler: event.internal_metadata.stream_ordering, ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + return await self._create_and_send_nonmember_event_locked( + requester=requester, + event_dict=event_dict, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + ratelimit=ratelimit, + txn_id=txn_id, + ignore_shadow_ban=ignore_shadow_ban, + outlier=outlier, + depth=depth, + ) + + async def _create_and_send_nonmember_event_locked( + self, + requester: Requester, + event_dict: dict, + allow_no_prev_events: bool = False, + prev_event_ids: Optional[List[str]] = None, + state_event_ids: Optional[List[str]] = None, + ratelimit: bool = True, + txn_id: Optional[str] = None, + ignore_shadow_ban: bool = False, + outlier: bool = False, + depth: Optional[int] = None, + ) -> Tuple[EventBase, int]: + room_id = event_dict["room_id"] + # If we don't have any prev event IDs specified then we need to # check that the host is in the room (as otherwise populating the # prev events will fail), at which point we may as well check the @@ -1923,7 +1956,10 @@ class EventCreationHandler: ) for room_id in room_ids: - dummy_event_sent = await self._send_dummy_event_for_room(room_id) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + dummy_event_sent = await self._send_dummy_event_for_room(room_id) if not dummy_event_sent: # Did not find a valid user in the room, so remove from future attempts -- cgit 1.5.1 From b7695ac38843d679b7121495729e0d433c37688e Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 31 Jul 2023 08:44:45 -0400 Subject: Combine duplicated code for calculating an event ID from a txn ID (#16023) Refactoring related to stabilization of MSC3970, refactor to combine code which has the same logic. --- changelog.d/16023.misc | 1 + synapse/handlers/message.py | 39 +++++++++++++++++++++++++++++++-------- synapse/handlers/room_member.py | 28 ++++------------------------ 3 files changed, 36 insertions(+), 32 deletions(-) create mode 100644 changelog.d/16023.misc (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/16023.misc b/changelog.d/16023.misc new file mode 100644 index 0000000000..ee732318e4 --- /dev/null +++ b/changelog.d/16023.misc @@ -0,0 +1 @@ +Combine duplicated code. diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 187dedae7d..c656e07d37 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -878,14 +878,13 @@ class EventCreationHandler: return prev_event return None - async def get_event_from_transaction( + async def get_event_id_from_transaction( self, requester: Requester, txn_id: str, room_id: str, - ) -> Optional[EventBase]: - """For the given transaction ID and room ID, check if there is a matching event. - If so, fetch it and return it. + ) -> Optional[str]: + """For the given transaction ID and room ID, check if there is a matching event ID. Args: requester: The requester making the request in the context of which we want @@ -894,8 +893,9 @@ class EventCreationHandler: room_id: The room ID. Returns: - An event if one could be found, None otherwise. + An event ID if one could be found, None otherwise. """ + existing_event_id = None if self._msc3970_enabled and requester.device_id: # When MSC3970 is enabled, we lookup for events sent by the same device first, @@ -909,7 +909,7 @@ class EventCreationHandler: ) ) if existing_event_id: - return await self.store.get_event(existing_event_id) + return existing_event_id # Pre-MSC3970, we looked up for events that were sent by the same session by # using the access token ID. @@ -922,9 +922,32 @@ class EventCreationHandler: txn_id, ) ) - if existing_event_id: - return await self.store.get_event(existing_event_id) + return existing_event_id + + async def get_event_from_transaction( + self, + requester: Requester, + txn_id: str, + room_id: str, + ) -> Optional[EventBase]: + """For the given transaction ID and room ID, check if there is a matching event. + If so, fetch it and return it. + + Args: + requester: The requester making the request in the context of which we want + to fetch the event. + txn_id: The transaction ID. + room_id: The room ID. + + Returns: + An event if one could be found, None otherwise. + """ + existing_event_id = await self.get_event_id_from_transaction( + requester, txn_id, room_id + ) + if existing_event_id: + return await self.store.get_event(existing_event_id) return None async def create_and_send_nonmember_event( diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 6cca2ec344..e3cdf2bc61 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -176,8 +176,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): self.request_ratelimiter = hs.get_request_ratelimiter() hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room) - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - def _on_user_joined_room(self, event_id: str, room_id: str) -> None: """Notify the rate limiter that a room join has occurred. @@ -418,29 +416,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # do this check just before we persist an event as well, but may as well # do it up front for efficiency.) if txn_id: - existing_event_id = None - if self._msc3970_enabled and requester.device_id: - # When MSC3970 is enabled, we lookup for events sent by the same device - # first, and fallback to the old behaviour if none were found. - existing_event_id = ( - await self.store.get_event_id_from_transaction_id_and_device_id( - room_id, - requester.user.to_string(), - requester.device_id, - txn_id, - ) + existing_event_id = ( + await self.event_creation_handler.get_event_id_from_transaction( + requester, txn_id, room_id ) - - if requester.access_token_id and not existing_event_id: - existing_event_id = ( - await self.store.get_event_id_from_transaction_id_and_token_id( - room_id, - requester.user.to_string(), - requester.access_token_id, - txn_id, - ) - ) - + ) if existing_event_id: event_pos = await self.store.get_position_for_event(existing_event_id) return existing_event_id, event_pos.stream -- cgit 1.5.1 From d98a43d9226cbb4b9ab5ad3abd9b630548c2f09f Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 4 Aug 2023 07:47:18 -0400 Subject: Stabilize support for MSC3970: updated transaction semantics (scope to `device_id`) (#15629) For now this maintains compatible with old Synapses by falling back to using transaction semantics on a per-access token. A future version of Synapse will drop support for this. --- changelog.d/15629.feature | 1 + synapse/config/experimental.py | 9 ------- synapse/events/utils.py | 42 ++++++++++++++++---------------- synapse/handlers/message.py | 12 ++++----- synapse/rest/client/transactions.py | 12 ++++----- synapse/server.py | 4 +-- synapse/storage/databases/main/events.py | 15 +++++------- synapse/storage/schema/__init__.py | 5 +++- synapse/types/__init__.py | 7 +++--- 9 files changed, 48 insertions(+), 59 deletions(-) create mode 100644 changelog.d/15629.feature (limited to 'synapse/handlers/message.py') diff --git a/changelog.d/15629.feature b/changelog.d/15629.feature new file mode 100644 index 0000000000..16264effca --- /dev/null +++ b/changelog.d/15629.feature @@ -0,0 +1 @@ +Scope transaction IDs to devices (implement [MSC3970](https://github.com/matrix-org/matrix-spec-proposals/pull/3970)). diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py index 1695ed8ca3..ac9449b18f 100644 --- a/synapse/config/experimental.py +++ b/synapse/config/experimental.py @@ -216,12 +216,6 @@ class MSC3861: ("session_lifetime",), ) - if not root.experimental.msc3970_enabled: - raise ConfigError( - "experimental_features.msc3970_enabled must be 'true' when OAuth delegation is enabled", - ("experimental_features", "msc3970_enabled"), - ) - @attr.s(auto_attribs=True, frozen=True, slots=True) class MSC3866Config: @@ -397,9 +391,6 @@ class ExperimentalConfig(Config): "Invalid MSC3861 configuration", ("experimental", "msc3861") ) from exc - # MSC3970: Scope transaction IDs to devices - self.msc3970_enabled = experimental.get("msc3970_enabled", self.msc3861.enabled) - # Check that none of the other config options conflict with MSC3861 when enabled self.msc3861.check_config_conflicts(self.root) diff --git a/synapse/events/utils.py b/synapse/events/utils.py index 967a6c245b..52acb21955 100644 --- a/synapse/events/utils.py +++ b/synapse/events/utils.py @@ -394,7 +394,6 @@ def serialize_event( time_now_ms: int, *, config: SerializeEventConfig = _DEFAULT_SERIALIZE_EVENT_CONFIG, - msc3970_enabled: bool = False, ) -> JsonDict: """Serialize event for clients @@ -402,8 +401,6 @@ def serialize_event( e time_now_ms config: Event serialization config - msc3970_enabled: Whether MSC3970 is enabled. It changes whether we should - include the `transaction_id` in the event's `unsigned` section. Returns: The serialized event dictionary. @@ -429,38 +426,46 @@ def serialize_event( e.unsigned["redacted_because"], time_now_ms, config=config, - msc3970_enabled=msc3970_enabled, ) # If we have a txn_id saved in the internal_metadata, we should include it in the # unsigned section of the event if it was sent by the same session as the one # requesting the event. txn_id: Optional[str] = getattr(e.internal_metadata, "txn_id", None) - if txn_id is not None and config.requester is not None: - # For the MSC3970 rules to be applied, we *need* to have the device ID in the - # event internal metadata. Since we were not recording them before, if it hasn't - # been recorded, we fallback to the old behaviour. + if ( + txn_id is not None + and config.requester is not None + and config.requester.user.to_string() == e.sender + ): + # Some events do not have the device ID stored in the internal metadata, + # this includes old events as well as those created by appservice, guests, + # or with tokens minted with the admin API. For those events, fallback + # to using the access token instead. event_device_id: Optional[str] = getattr(e.internal_metadata, "device_id", None) - if msc3970_enabled and event_device_id is not None: + if event_device_id is not None: if event_device_id == config.requester.device_id: d["unsigned"]["transaction_id"] = txn_id else: - # The pre-MSC3970 behaviour is to only include the transaction ID if the - # event was sent from the same access token. For regular users, we can use - # the access token ID to determine this. For guests, we can't, but since - # each guest only has one access token, we can just check that the event was - # sent by the same user as the one requesting the event. + # Fallback behaviour: only include the transaction ID if the event + # was sent from the same access token. + # + # For regular users, the access token ID can be used to determine this. + # This includes access tokens minted with the admin API. + # + # For guests and appservice users, we can't check the access token ID + # so assume it is the same session. event_token_id: Optional[int] = getattr( e.internal_metadata, "token_id", None ) - if config.requester.user.to_string() == e.sender and ( + if ( ( event_token_id is not None and config.requester.access_token_id is not None and event_token_id == config.requester.access_token_id ) or config.requester.is_guest + or config.requester.app_service ): d["unsigned"]["transaction_id"] = txn_id @@ -504,9 +509,6 @@ class EventClientSerializer: clients. """ - def __init__(self, *, msc3970_enabled: bool = False): - self._msc3970_enabled = msc3970_enabled - def serialize_event( self, event: Union[JsonDict, EventBase], @@ -531,9 +533,7 @@ class EventClientSerializer: if not isinstance(event, EventBase): return event - serialized_event = serialize_event( - event, time_now, config=config, msc3970_enabled=self._msc3970_enabled - ) + serialized_event = serialize_event(event, time_now, config=config) # Check if there are any bundled aggregations to include with the event. if bundle_aggregations: diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c656e07d37..d485f21e49 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -561,8 +561,6 @@ class EventCreationHandler: expiry_ms=30 * 60 * 1000, ) - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - async def create_event( self, requester: Requester, @@ -897,9 +895,8 @@ class EventCreationHandler: """ existing_event_id = None - if self._msc3970_enabled and requester.device_id: - # When MSC3970 is enabled, we lookup for events sent by the same device first, - # and fallback to the old behaviour if none were found. + # According to the spec, transactions are scoped to a user's device ID. + if requester.device_id: existing_event_id = ( await self.store.get_event_id_from_transaction_id_and_device_id( room_id, @@ -911,8 +908,9 @@ class EventCreationHandler: if existing_event_id: return existing_event_id - # Pre-MSC3970, we looked up for events that were sent by the same session by - # using the access token ID. + # Some requsters don't have device IDs (appservice, guests, and access + # tokens minted with the admin API), fallback to checking the access token + # ID, which should be close enough. if requester.access_token_id: existing_event_id = ( await self.store.get_event_id_from_transaction_id_and_token_id( diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 0d8a63d8be..3d814c404d 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -50,8 +50,6 @@ class HttpTransactionCache: # for at *LEAST* 30 mins, and at *MOST* 60 mins. self.cleaner = self.clock.looping_call(self._cleanup, CLEANUP_PERIOD_MS) - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - def _get_transaction_key(self, request: IRequest, requester: Requester) -> Hashable: """A helper function which returns a transaction key that can be used with TransactionCache for idempotent requests. @@ -78,18 +76,20 @@ class HttpTransactionCache: elif requester.app_service is not None: return (path, "appservice", requester.app_service.id) - # With MSC3970, we use the user ID and device ID as the transaction key - elif self._msc3970_enabled: + # Use the user ID and device ID as the transaction key. + elif requester.device_id: assert requester.user, "Requester must have a user" assert requester.device_id, "Requester must have a device_id" return (path, "user", requester.user, requester.device_id) - # Otherwise, the pre-MSC3970 behaviour is to use the access token ID + # Some requsters don't have device IDs, these are mostly handled above + # (appservice and guest users), but does not cover access tokens minted + # by the admin API. Use the access token ID instead. else: assert ( requester.access_token_id is not None ), "Requester must have an access_token_id" - return (path, "user", requester.access_token_id) + return (path, "user_admin", requester.access_token_id) def fetch_or_execute_request( self, diff --git a/synapse/server.py b/synapse/server.py index 8430f99ef2..e753ff0377 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -785,9 +785,7 @@ class HomeServer(metaclass=abc.ABCMeta): @cache_in_self def get_event_client_serializer(self) -> EventClientSerializer: - return EventClientSerializer( - msc3970_enabled=self.config.experimental.msc3970_enabled - ) + return EventClientSerializer() @cache_in_self def get_password_policy_handler(self) -> PasswordPolicyHandler: diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index bd3f14fb71..c1353b18c1 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -127,8 +127,6 @@ class PersistEventsStore: self._backfill_id_gen: AbstractStreamIdGenerator = self.store._backfill_id_gen self._stream_id_gen: AbstractStreamIdGenerator = self.store._stream_id_gen - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - @trace async def _persist_events_and_state_updates( self, @@ -1012,9 +1010,11 @@ class PersistEventsStore: ) ) - # Pre-MSC3970, we rely on the access_token_id to scope the txn_id for events. - # Since this is an experimental flag, we still store the mapping even if the - # flag is disabled. + # Synapse usually relies on the device_id to scope transactions for events, + # except for users without device IDs (appservice, guests, and access + # tokens minted with the admin API) which use the access token ID instead. + # + # TODO https://github.com/matrix-org/synapse/issues/16042 if to_insert_token_id: self.db_pool.simple_insert_many_txn( txn, @@ -1030,10 +1030,7 @@ class PersistEventsStore: values=to_insert_token_id, ) - # With MSC3970, we rely on the device_id instead to scope the txn_id for events. - # We're only inserting if MSC3970 is *enabled*, because else the pre-MSC3970 - # behaviour would allow for a UNIQUE constraint violation on this table - if to_insert_device_id and self._msc3970_enabled: + if to_insert_device_id: self.db_pool.simple_insert_many_txn( txn, table="event_txn_id_device_id", diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py index d3ec648f6d..7de9949a5b 100644 --- a/synapse/storage/schema/__init__.py +++ b/synapse/storage/schema/__init__.py @@ -12,7 +12,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -SCHEMA_VERSION = 79 # remember to update the list below when updating +SCHEMA_VERSION = 80 # remember to update the list below when updating """Represents the expectations made by the codebase about the database schema This should be incremented whenever the codebase changes its requirements on the @@ -110,6 +110,9 @@ Changes in SCHEMA_VERSION = 78 Changes in SCHEMA_VERSION = 79 - Add tables to handle in DB read-write locks. - Add some mitigations for a painful race between foreground and background updates, cf #15677. + +Changes in SCHEMA_VERSION = 80 + - The event_txn_id_device_id is always written to for new events. """ diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py index fdfd465c8d..39a1ae4ac3 100644 --- a/synapse/types/__init__.py +++ b/synapse/types/__init__.py @@ -117,11 +117,12 @@ class Requester: Attributes: user: id of the user making the request - access_token_id: *ID* of the access token used for this - request, or None if it came via the appservice API or similar + access_token_id: *ID* of the access token used for this request, or + None for appservices, guests, and tokens generated by the admin API is_guest: True if the user making this request is a guest user shadow_banned: True if the user making this request has been shadow-banned. - device_id: device_id which was set at authentication time + device_id: device_id which was set at authentication time, or + None for appservices, guests, and tokens generated by the admin API app_service: the AS requesting on behalf of the user authenticated_entity: The entity that authenticated when making the request. This is different to the user_id when an admin user or the server is -- cgit 1.5.1