From 684d19a11c3b93c9dd5fb90f43d38aa7e8c6005f Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 4 Aug 2021 12:07:57 -0500 Subject: Add support for MSC2716 marker events (#10498) * Make historical messages available to federated servers Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716 Follow-up to https://github.com/matrix-org/synapse/pull/9247 * Debug message not available on federation * Add base starting insertion point when no chunk ID is provided * Fix messages from multiple senders in historical chunk Follow-up to https://github.com/matrix-org/synapse/pull/9247 Part of MSC2716: https://github.com/matrix-org/matrix-doc/pull/2716 --- Previously, Synapse would throw a 403, `Cannot force another user to join.`, because we were trying to use `?user_id` from a single virtual user which did not match with messages from other users in the chunk. * Remove debug lines * Messing with selecting insertion event extremeties * Move db schema change to new version * Add more better comments * Make a fake requester with just what we need See https://github.com/matrix-org/synapse/pull/10276#discussion_r660999080 * Store insertion events in table * Make base insertion event float off on its own See https://github.com/matrix-org/synapse/pull/10250#issuecomment-875711889 Conflicts: synapse/rest/client/v1/room.py * Validate that the app service can actually control the given user See https://github.com/matrix-org/synapse/pull/10276#issuecomment-876316455 Conflicts: synapse/rest/client/v1/room.py * Add some better comments on what we're trying to check for * Continue debugging * Share validation logic * Add inserted historical messages to /backfill response * Remove debug sql queries * Some marker event implemntation trials * Clean up PR * Rename insertion_event_id to just event_id * Add some better sql comments * More accurate description * Add changelog * Make it clear what MSC the change is part of * Add more detail on which insertion event came through * Address review and improve sql queries * Only use event_id as unique constraint * Fix test case where insertion event is already in the normal DAG * Remove debug changes * Add support for MSC2716 marker events * Process markers when we receive it over federation * WIP: make hs2 backfill historical messages after marker event * hs2 to better ask for insertion event extremity But running into the `sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group` error * Add insertion_event_extremities table * Switch to chunk events so we can auth via power_levels Previously, we were using `content.chunk_id` to connect one chunk to another. But these events can be from any `sender` and we can't tell who should be able to send historical events. We know we only want the application service to do it but these events have the sender of a real historical message, not the application service user ID as the sender. Other federated homeservers also have no indicator which senders are an application service on the originating homeserver. So we want to auth all of the MSC2716 events via power_levels and have them be sent by the application service with proper PL levels in the room. * Switch to chunk events for federation * Add unstable room version to support new historical PL * Messy: Fix undefined state_group for federated historical events ``` 2021-07-13 02:27:57,810 - synapse.handlers.federation - 1248 - ERROR - GET-4 - Failed to backfill from hs1 because NOT NULL constraint failed: event_to_state_groups.state_group Traceback (most recent call last): File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1216, in try_backfill await self.backfill( File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 1035, in backfill await self._auth_and_persist_event(dest, event, context, backfilled=True) File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2222, in _auth_and_persist_event await self._run_push_actions_and_persist_event(event, context, backfilled) File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 2244, in _run_push_actions_and_persist_event await self.persist_events_and_notify( File "/usr/local/lib/python3.8/site-packages/synapse/handlers/federation.py", line 3290, in persist_events_and_notify events, max_stream_token = await self.storage.persistence.persist_events( File "/usr/local/lib/python3.8/site-packages/synapse/logging/opentracing.py", line 774, in _trace_inner return await func(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 320, in persist_events ret_vals = await yieldable_gather_results(enqueue, partitioned.items()) File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 237, in handle_queue_loop ret = await self._per_item_callback( File "/usr/local/lib/python3.8/site-packages/synapse/storage/persist_events.py", line 577, in _persist_event_batch await self.persist_events_store._persist_events_and_state_updates( File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 176, in _persist_events_and_state_updates await self.db_pool.runInteraction( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 681, in runInteraction result = await self.runWithConnection( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 770, in runWithConnection return await make_deferred_yieldable( File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 238, in inContext result = inContext.theWork() # type: ignore[attr-defined] File "/usr/local/lib/python3.8/site-packages/twisted/python/threadpool.py", line 254, in inContext.theWork = lambda: context.call( # type: ignore[attr-defined] File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 118, in callWithContext return self.currentContext().callWithContext(ctx, func, *args, **kw) File "/usr/local/lib/python3.8/site-packages/twisted/python/context.py", line 83, in callWithContext return func(*args, **kw) File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 293, in _runWithConnection compat.reraise(excValue, excTraceback) File "/usr/local/lib/python3.8/site-packages/twisted/python/deprecate.py", line 298, in deprecatedFunction return function(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/twisted/python/compat.py", line 403, in reraise raise exception.with_traceback(traceback) File "/usr/local/lib/python3.8/site-packages/twisted/enterprise/adbapi.py", line 284, in _runWithConnection result = func(conn, *args, **kw) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 765, in inner_func return func(db_conn, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 549, in new_transaction r = func(cursor, *args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/logging/utils.py", line 69, in wrapped return f(*args, **kwargs) File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 385, in _persist_events_txn self._store_event_state_mappings_txn(txn, events_and_contexts) File "/usr/local/lib/python3.8/site-packages/synapse/storage/databases/main/events.py", line 2065, in _store_event_state_mappings_txn self.db_pool.simple_insert_many_txn( File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 923, in simple_insert_many_txn txn.execute_batch(sql, vals) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 280, in execute_batch self.executemany(sql, args) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 300, in executemany self._do_execute(self.txn.executemany, sql, *args) File "/usr/local/lib/python3.8/site-packages/synapse/storage/database.py", line 330, in _do_execute return func(sql, *args) sqlite3.IntegrityError: NOT NULL constraint failed: event_to_state_groups.state_group ``` * Revert "Messy: Fix undefined state_group for federated historical events" This reverts commit 187ab28611546321e02770944c86f30ee2bc742a. * Fix federated events being rejected for no state_groups Add fix from https://github.com/matrix-org/synapse/pull/10439 until it merges. * Adapting to experimental room version * Some log cleanup * Add better comments around extremity fetching code and why * Rename to be more accurate to what the function returns * Add changelog * Ignore rejected events * Use simplified upsert * Add Erik's explanation of extra event checks See https://github.com/matrix-org/synapse/pull/10498#discussion_r680880332 * Clarify that the depth is not directly correlated to the backwards extremity that we return See https://github.com/matrix-org/synapse/pull/10498#discussion_r681725404 * lock only matters for sqlite See https://github.com/matrix-org/synapse/pull/10498#discussion_r681728061 * Move new SQL changes to its own delta file * Clean up upsert docstring * Bump database schema version (62) --- synapse/handlers/federation.py | 119 ++++++++++++++++++++++++++++++++++++++--- 1 file changed, 113 insertions(+), 6 deletions(-) (limited to 'synapse/handlers/federation.py') diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8197b60b76..8b602e3813 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -42,6 +42,7 @@ from twisted.internet import defer from synapse import event_auth from synapse.api.constants import ( + EventContentFields, EventTypes, Membership, RejectedReason, @@ -262,7 +263,12 @@ class FederationHandler(BaseHandler): state = None - # Get missing pdus if necessary. + # Check that the event passes auth based on the state at the event. This is + # done for events that are to be added to the timeline (non-outliers). + # + # Get missing pdus if necessary: + # - Fetching any missing prev events to fill in gaps in the graph + # - Fetching state if we have a hole in the graph if not pdu.internal_metadata.is_outlier(): # We only backfill backwards to the min depth. min_depth = await self.get_min_depth_for_context(pdu.room_id) @@ -432,6 +438,13 @@ class FederationHandler(BaseHandler): affected=event_id, ) + # A second round of checks for all events. Check that the event passes auth + # based on `auth_events`, this allows us to assert that the event would + # have been allowed at some point. If an event passes this check its OK + # for it to be used as part of a returned `/state` request, as either + # a) we received the event as part of the original join and so trust it, or + # b) we'll do a state resolution with existing state before it becomes + # part of the "current state", which adds more protection. await self._process_received_pdu(origin, pdu, state=state) async def _get_missing_events_for_pdu( @@ -889,6 +902,79 @@ class FederationHandler(BaseHandler): "resync_device_due_to_pdu", self._resync_device, event.sender ) + await self._handle_marker_event(origin, event) + + async def _handle_marker_event(self, origin: str, marker_event: EventBase): + """Handles backfilling the insertion event when we receive a marker + event that points to one. + + Args: + origin: Origin of the event. Will be called to get the insertion event + marker_event: The event to process + """ + + if marker_event.type != EventTypes.MSC2716_MARKER: + # Not a marker event + return + + if marker_event.rejected_reason is not None: + # Rejected event + return + + # Skip processing a marker event if the room version doesn't + # support it. + room_version = await self.store.get_room_version(marker_event.room_id) + if not room_version.msc2716_historical: + return + + logger.debug("_handle_marker_event: received %s", marker_event) + + insertion_event_id = marker_event.content.get( + EventContentFields.MSC2716_MARKER_INSERTION + ) + + if insertion_event_id is None: + # Nothing to retrieve then (invalid marker) + return + + logger.debug( + "_handle_marker_event: backfilling insertion event %s", insertion_event_id + ) + + await self._get_events_and_persist( + origin, + marker_event.room_id, + [insertion_event_id], + ) + + insertion_event = await self.store.get_event( + insertion_event_id, allow_none=True + ) + if insertion_event is None: + logger.warning( + "_handle_marker_event: server %s didn't return insertion event %s for marker %s", + origin, + insertion_event_id, + marker_event.event_id, + ) + return + + logger.debug( + "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s", + insertion_event, + marker_event, + ) + + await self.store.insert_insertion_extremity( + insertion_event_id, marker_event.room_id + ) + + logger.debug( + "_handle_marker_event: insertion extremity added for %s from marker event %s", + insertion_event, + marker_event, + ) + async def _resync_device(self, sender: str) -> None: """We have detected that the device list for the given user may be out of sync, so we try and resync them. @@ -1057,9 +1143,19 @@ class FederationHandler(BaseHandler): async def _maybe_backfill_inner( self, room_id: str, current_depth: int, limit: int ) -> bool: - extremities = await self.store.get_oldest_events_with_depth_in_room(room_id) + oldest_events_with_depth = ( + await self.store.get_oldest_event_ids_with_depth_in_room(room_id) + ) + insertion_events_to_be_backfilled = ( + await self.store.get_insertion_event_backwards_extremities_in_room(room_id) + ) + logger.debug( + "_maybe_backfill_inner: extremities oldest_events_with_depth=%s insertion_events_to_be_backfilled=%s", + oldest_events_with_depth, + insertion_events_to_be_backfilled, + ) - if not extremities: + if not oldest_events_with_depth and not insertion_events_to_be_backfilled: logger.debug("Not backfilling as no extremeties found.") return False @@ -1089,10 +1185,12 @@ class FederationHandler(BaseHandler): # state *before* the event, ignoring the special casing certain event # types have. - forward_events = await self.store.get_successor_events(list(extremities)) + forward_event_ids = await self.store.get_successor_events( + list(oldest_events_with_depth) + ) extremities_events = await self.store.get_events( - forward_events, + forward_event_ids, redact_behaviour=EventRedactBehaviour.AS_IS, get_prev_content=False, ) @@ -1106,10 +1204,19 @@ class FederationHandler(BaseHandler): redact=False, check_history_visibility_only=True, ) + logger.debug( + "_maybe_backfill_inner: filtered_extremities %s", filtered_extremities + ) - if not filtered_extremities: + if not filtered_extremities and not insertion_events_to_be_backfilled: return False + extremities = { + **oldest_events_with_depth, + # TODO: insertion_events_to_be_backfilled is currently skipping the filtered_extremities checks + **insertion_events_to_be_backfilled, + } + # Check if we reached a point where we should start backfilling. sorted_extremeties_tuple = sorted(extremities.items(), key=lambda e: -int(e[1])) max_depth = sorted_extremeties_tuple[0][1] -- cgit 1.5.1 From 1bebc0b78cbedffb6b69fd76327f0eb7663c3c96 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 6 Aug 2021 13:54:23 +0100 Subject: Clean up federation event auth code (#10539) * drop old-room hack pretty sure we don't need this any more. * Remove incorrect comment about modifying `context` It doesn't look like the supplied context is ever modified. * Stop `_auth_and_persist_event` modifying its parameters This is only called in three places. Two of them don't pass `auth_events`, and the third doesn't use the dict after passing it in, so this should be non-functional. * Stop `_check_event_auth` modifying its parameters `_check_event_auth` is only called in three places. `on_send_membership_event` doesn't pass an `auth_events`, and `prep` and `_auth_and_persist_event` do not use the map after passing it in. * Stop `_update_auth_events_and_context_for_auth` modifying its parameters Return the updated auth event dict, rather than modifying the parameter. This is only called from `_check_event_auth`. * Improve documentation on `_auth_and_persist_event` Rename `auth_events` parameter to better reflect what it contains. * Improve documentation on `_NewEventInfo` * Improve documentation on `_check_event_auth` rename `auth_events` parameter to better describe what it contains * changelog --- changelog.d/10539.misc | 1 + synapse/handlers/federation.py | 118 +++++++++++++++++++++++------------------ tests/test_federation.py | 6 +-- 3 files changed, 69 insertions(+), 56 deletions(-) create mode 100644 changelog.d/10539.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10539.misc b/changelog.d/10539.misc new file mode 100644 index 0000000000..9a765435db --- /dev/null +++ b/changelog.d/10539.misc @@ -0,0 +1 @@ +Clean up some of the federation event authentication code for clarity. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 8b602e3813..9a5e726533 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -109,21 +109,33 @@ soft_failed_event_counter = Counter( ) -@attr.s(slots=True) +@attr.s(slots=True, frozen=True, auto_attribs=True) class _NewEventInfo: """Holds information about a received event, ready for passing to _auth_and_persist_events Attributes: event: the received event - state: the state at that event + state: the state at that event, according to /state_ids from a remote + homeserver. Only populated for backfilled events which are going to be a + new backwards extremity. + + claimed_auth_event_map: a map of (type, state_key) => event for the event's + claimed auth_events. + + This can include events which have not yet been persisted, in the case that + we are backfilling a batch of events. + + Note: May be incomplete: if we were unable to find all of the claimed auth + events. Also, treat the contents with caution: the events might also have + been rejected, might not yet have been authorized themselves, or they might + be in the wrong room. - auth_events: the auth_event map for that event """ - event = attr.ib(type=EventBase) - state = attr.ib(type=Optional[Sequence[EventBase]], default=None) - auth_events = attr.ib(type=Optional[MutableStateMap[EventBase]], default=None) + event: EventBase + state: Optional[Sequence[EventBase]] + claimed_auth_event_map: StateMap[EventBase] class FederationHandler(BaseHandler): @@ -1086,7 +1098,7 @@ class FederationHandler(BaseHandler): _NewEventInfo( event=ev, state=events_to_state[e_id], - auth_events={ + claimed_auth_event_map={ ( auth_events[a_id].type, auth_events[a_id].state_key, @@ -2315,7 +2327,7 @@ class FederationHandler(BaseHandler): event: EventBase, context: EventContext, state: Optional[Iterable[EventBase]] = None, - auth_events: Optional[MutableStateMap[EventBase]] = None, + claimed_auth_event_map: Optional[StateMap[EventBase]] = None, backfilled: bool = False, ) -> None: """ @@ -2327,17 +2339,18 @@ class FederationHandler(BaseHandler): context: The event context. - NB that this function potentially modifies it. state: The state events used to check the event for soft-fail. If this is not provided the current state events will be used. - auth_events: - Map from (event_type, state_key) to event - Normally, our calculated auth_events based on the state of the room - at the event's position in the DAG, though occasionally (eg if the - event is an outlier), may be the auth events claimed by the remote - server. + claimed_auth_event_map: + A map of (type, state_key) => event for the event's claimed auth_events. + Possibly incomplete, and possibly including events that are not yet + persisted, or authed, or in the right room. + + Only populated where we may not already have persisted these events - + for example, when populating outliers. + backfilled: True if the event was backfilled. """ context = await self._check_event_auth( @@ -2345,7 +2358,7 @@ class FederationHandler(BaseHandler): event, context, state=state, - auth_events=auth_events, + claimed_auth_event_map=claimed_auth_event_map, backfilled=backfilled, ) @@ -2409,7 +2422,7 @@ class FederationHandler(BaseHandler): event, res, state=ev_info.state, - auth_events=ev_info.auth_events, + claimed_auth_event_map=ev_info.claimed_auth_event_map, backfilled=backfilled, ) return res @@ -2675,7 +2688,7 @@ class FederationHandler(BaseHandler): event: EventBase, context: EventContext, state: Optional[Iterable[EventBase]] = None, - auth_events: Optional[MutableStateMap[EventBase]] = None, + claimed_auth_event_map: Optional[StateMap[EventBase]] = None, backfilled: bool = False, ) -> EventContext: """ @@ -2687,21 +2700,19 @@ class FederationHandler(BaseHandler): context: The event context. - NB that this function potentially modifies it. state: The state events used to check the event for soft-fail. If this is not provided the current state events will be used. - auth_events: - Map from (event_type, state_key) to event - Normally, our calculated auth_events based on the state of the room - at the event's position in the DAG, though occasionally (eg if the - event is an outlier), may be the auth events claimed by the remote - server. + claimed_auth_event_map: + A map of (type, state_key) => event for the event's claimed auth_events. + Possibly incomplete, and possibly including events that are not yet + persisted, or authed, or in the right room. - Also NB that this function adds entries to it. + Only populated where we may not already have persisted these events - + for example, when populating outliers, or the state for a backwards + extremity. - If this is not provided, it is calculated from the previous state IDs. backfilled: True if the event was backfilled. Returns: @@ -2710,7 +2721,12 @@ class FederationHandler(BaseHandler): room_version = await self.store.get_room_version_id(event.room_id) room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - if not auth_events: + if claimed_auth_event_map: + # if we have a copy of the auth events from the event, use that as the + # basis for auth. + auth_events = claimed_auth_event_map + else: + # otherwise, we calculate what the auth events *should* be, and use that prev_state_ids = await context.get_prev_state_ids() auth_events_ids = self._event_auth_handler.compute_auth_events( event, prev_state_ids, for_verification=True @@ -2718,18 +2734,11 @@ class FederationHandler(BaseHandler): auth_events_x = await self.store.get_events(auth_events_ids) auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()} - # This is a hack to fix some old rooms where the initial join event - # didn't reference the create event in its auth events. - if event.type == EventTypes.Member and not event.auth_event_ids(): - if len(event.prev_event_ids()) == 1 and event.depth < 5: - c = await self.store.get_event( - event.prev_event_ids()[0], allow_none=True - ) - if c and c.type == EventTypes.Create: - auth_events[(c.type, c.state_key)] = c - try: - context = await self._update_auth_events_and_context_for_auth( + ( + context, + auth_events_for_auth, + ) = await self._update_auth_events_and_context_for_auth( origin, event, context, auth_events ) except Exception: @@ -2742,9 +2751,10 @@ class FederationHandler(BaseHandler): "Ignoring failure and continuing processing of event.", event.event_id, ) + auth_events_for_auth = auth_events try: - event_auth.check(room_version_obj, event, auth_events=auth_events) + event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth) except AuthError as e: logger.warning("Failed auth resolution for %r because %s", event, e) context.rejected = RejectedReason.AUTH_ERROR @@ -2769,8 +2779,8 @@ class FederationHandler(BaseHandler): origin: str, event: EventBase, context: EventContext, - auth_events: MutableStateMap[EventBase], - ) -> EventContext: + input_auth_events: StateMap[EventBase], + ) -> Tuple[EventContext, StateMap[EventBase]]: """Helper for _check_event_auth. See there for docs. Checks whether a given event has the expected auth events. If it @@ -2787,7 +2797,7 @@ class FederationHandler(BaseHandler): event: context: - auth_events: + input_auth_events: Map from (event_type, state_key) to event Normally, our calculated auth_events based on the state of the room @@ -2795,11 +2805,12 @@ class FederationHandler(BaseHandler): event is an outlier), may be the auth events claimed by the remote server. - Also NB that this function adds entries to it. - Returns: - updated context + updated context, updated auth event map """ + # take a copy of input_auth_events before we modify it. + auth_events: MutableStateMap[EventBase] = dict(input_auth_events) + event_auth_events = set(event.auth_event_ids()) # missing_auth is the set of the event's auth_events which we don't yet have @@ -2828,7 +2839,7 @@ class FederationHandler(BaseHandler): # The other side isn't around or doesn't implement the # endpoint, so lets just bail out. logger.info("Failed to get event auth from remote: %s", e1) - return context + return context, auth_events seen_remotes = await self.store.have_seen_events( event.room_id, [e.event_id for e in remote_auth_chain] @@ -2859,7 +2870,10 @@ class FederationHandler(BaseHandler): await self.state_handler.compute_event_context(e) ) await self._auth_and_persist_event( - origin, e, missing_auth_event_context, auth_events=auth + origin, + e, + missing_auth_event_context, + claimed_auth_event_map=auth, ) if e.event_id in event_auth_events: @@ -2877,14 +2891,14 @@ class FederationHandler(BaseHandler): # obviously be empty # (b) alternatively, why don't we do it earlier? logger.info("Skipping auth_event fetch for outlier") - return context + return context, auth_events different_auth = event_auth_events.difference( e.event_id for e in auth_events.values() ) if not different_auth: - return context + return context, auth_events logger.info( "auth_events refers to events which are not in our calculated auth " @@ -2910,7 +2924,7 @@ class FederationHandler(BaseHandler): # XXX: should we reject the event in this case? It feels like we should, # but then shouldn't we also do so if we've failed to fetch any of the # auth events? - return context + return context, auth_events # now we state-resolve between our own idea of the auth events, and the remote's # idea of them. @@ -2940,7 +2954,7 @@ class FederationHandler(BaseHandler): event, context, auth_events ) - return context + return context, auth_events async def _update_context_for_auth_events( self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] diff --git a/tests/test_federation.py b/tests/test_federation.py index 0ed8326f55..3785799f46 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -75,10 +75,8 @@ class MessageAcceptTests(unittest.HomeserverTestCase): ) self.handler = self.homeserver.get_federation_handler() - self.handler._check_event_auth = ( - lambda origin, event, context, state, auth_events, backfilled: succeed( - context - ) + self.handler._check_event_auth = lambda origin, event, context, state, claimed_auth_event_map, backfilled: succeed( + context ) self.client = self.homeserver.get_federation_client() self.client._check_sigs_and_hash_and_fetch = lambda dest, pdus, **k: succeed( -- cgit 1.5.1 From 2d9ca4ca77c2cdf98ddb738aee8d5699c7c8749f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Mon, 16 Aug 2021 13:19:02 +0100 Subject: Clean up some logging in the federation event handler (#10591) * Include outlier status in `str(event)` In places where we log event objects, knowing whether or not you're dealing with an outlier is super useful. * Remove duplicated logging in get_missing_events When we process events received from get_missing_events, we log them twice (once in `_get_missing_events_for_pdu`, and once in `on_receive_pdu`). Reduce the duplication by removing the logging in `on_receive_pdu`, and ensuring the call sites do sensible logging. * log in `on_receive_pdu` when we already have the event * Log which prev_events we are missing * changelog --- changelog.d/10591.misc | 1 + synapse/events/__init__.py | 3 +- synapse/federation/federation_server.py | 1 + synapse/handlers/federation.py | 52 +++++++++++++++------------------ 4 files changed, 28 insertions(+), 29 deletions(-) create mode 100644 changelog.d/10591.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10591.misc b/changelog.d/10591.misc new file mode 100644 index 0000000000..9a765435db --- /dev/null +++ b/changelog.d/10591.misc @@ -0,0 +1 @@ +Clean up some of the federation event authentication code for clarity. diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py index 0298af4c02..a730c1719a 100644 --- a/synapse/events/__init__.py +++ b/synapse/events/__init__.py @@ -396,10 +396,11 @@ class FrozenEvent(EventBase): return self.__repr__() def __repr__(self): - return "" % ( + return "" % ( self.get("event_id", None), self.get("type", None), self.get("state_key", None), + self.internal_metadata.is_outlier(), ) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 78d5aac6af..afd8f8580a 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1003,6 +1003,7 @@ class FederationServer(FederationBase): # has started processing). while True: async with lock: + logger.info("handling received PDU: %s", event) try: await self.handler.on_receive_pdu( origin, event, sent_to_us_directly=True diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 9a5e726533..c0e13bdaac 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -220,8 +220,6 @@ class FederationHandler(BaseHandler): room_id = pdu.room_id event_id = pdu.event_id - logger.info("handling received PDU: %s", pdu) - # We reprocess pdus when we have seen them only as outliers existing = await self.store.get_event( event_id, allow_none=True, allow_rejected=True @@ -229,14 +227,19 @@ class FederationHandler(BaseHandler): # FIXME: Currently we fetch an event again when we already have it # if it has been marked as an outlier. - - already_seen = existing and ( - not existing.internal_metadata.is_outlier() - or pdu.internal_metadata.is_outlier() - ) - if already_seen: - logger.debug("Already seen pdu") - return + if existing: + if not existing.internal_metadata.is_outlier(): + logger.info( + "Ignoring received event %s which we have already seen", event_id + ) + return + if pdu.internal_metadata.is_outlier(): + logger.info( + "Ignoring received outlier %s which we already have as an outlier", + event_id, + ) + return + logger.info("De-outliering event %s", event_id) # do some initial sanity-checking of the event. In particular, make # sure it doesn't have hundreds of prev_events or auth_events, which @@ -331,7 +334,8 @@ class FederationHandler(BaseHandler): "Found all missing prev_events", ) - if prevs - seen: + missing_prevs = prevs - seen + if missing_prevs: # We've still not been able to get all of the prev_events for this event. # # In this case, we need to fall back to asking another server in the @@ -359,8 +363,8 @@ class FederationHandler(BaseHandler): if sent_to_us_directly: logger.warning( "Rejecting: failed to fetch %d prev events: %s", - len(prevs - seen), - shortstr(prevs - seen), + len(missing_prevs), + shortstr(missing_prevs), ) raise FederationError( "ERROR", @@ -373,9 +377,10 @@ class FederationHandler(BaseHandler): ) logger.info( - "Event %s is missing prev_events: calculating state for a " + "Event %s is missing prev_events %s: calculating state for a " "backwards extremity", event_id, + shortstr(missing_prevs), ) # Calculate the state after each of the previous events, and @@ -393,7 +398,7 @@ class FederationHandler(BaseHandler): # Ask the remote server for the states we don't # know about - for p in prevs - seen: + for p in missing_prevs: logger.info("Requesting state after missing prev_event %s", p) with nested_logging_context(p): @@ -556,21 +561,14 @@ class FederationHandler(BaseHandler): logger.warning("Failed to get prev_events: %s", e) return - logger.info( - "Got %d prev_events: %s", - len(missing_events), - shortstr(missing_events), - ) + logger.info("Got %d prev_events", len(missing_events)) # We want to sort these by depth so we process them and # tell clients about them in order. missing_events.sort(key=lambda x: x.depth) for ev in missing_events: - logger.info( - "Handling received prev_event %s", - ev.event_id, - ) + logger.info("Handling received prev_event %s", ev) with nested_logging_context(ev.event_id): try: await self.on_receive_pdu(origin, ev, sent_to_us_directly=False) @@ -1762,10 +1760,8 @@ class FederationHandler(BaseHandler): for p, origin in room_queue: try: logger.info( - "Processing queued PDU %s which was received " - "while we were joining %s", - p.event_id, - p.room_id, + "Processing queued PDU %s which was received while we were joining", + p, ) with nested_logging_context(p.event_id): await self.on_receive_pdu(origin, p, sent_to_us_directly=True) -- cgit 1.5.1 From 272b89d547a80628fd90ed35953d2d7c95410b65 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Tue, 17 Aug 2021 13:13:42 +0100 Subject: Stop setting the outlier flag for things that aren't (#10614) Marking things as outliers to inhibit pushes is a sledgehammer to crack a nut. Move the test further down the stack so that we just inhibit the thing we want. --- changelog.d/10614.misc | 1 + synapse/handlers/federation.py | 9 ++------- 2 files changed, 3 insertions(+), 7 deletions(-) create mode 100644 changelog.d/10614.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10614.misc b/changelog.d/10614.misc new file mode 100644 index 0000000000..9a765435db --- /dev/null +++ b/changelog.d/10614.misc @@ -0,0 +1 @@ +Clean up some of the federation event authentication code for clarity. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index c0e13bdaac..a257d87fed 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -293,13 +293,7 @@ class FederationHandler(BaseHandler): prevs = set(pdu.prev_event_ids()) seen = await self.store.have_events_in_timeline(prevs) - if min_depth is not None and pdu.depth < min_depth: - # This is so that we don't notify the user about this - # message, to work around the fact that some events will - # reference really really old events we really don't want to - # send to the clients. - pdu.internal_metadata.outlier = True - elif min_depth is not None and pdu.depth > min_depth: + if min_depth is not None and pdu.depth > min_depth: missing_prevs = prevs - seen if sent_to_us_directly and missing_prevs: # If we're missing stuff, ensure we only fetch stuff one @@ -2375,6 +2369,7 @@ class FederationHandler(BaseHandler): not event.internal_metadata.is_outlier() and not backfilled and not context.rejected + and (await self.store.get_min_depth(event.room_id)) <= event.depth ): await self.action_generator.handle_push_actions_for_event( event, context -- cgit 1.5.1 From 964f29cb6f4ff5892575ac170aa872822f2c1a0e Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 18 Aug 2021 12:36:22 +0100 Subject: Refactor `on_receive_pdu` code (#10615) * drop room pdu linearizer sooner No point holding onto it while we recheck the db * move out `missing_prevs` calculation we're going to need `missing_prevs` whatever we do, so we may as well calculate it eagerly and just update it if it gets outdated. * Add another `if missing_prevs` condition this should be a no-op, since all the code inside the block already checks `if missing_prevs` * reorder if conditions This shouldn't change the logic at all. * Push down `min_depth` read No point reading it from the database unless we're going to use it. * Collect the sent_to_us_directly code together Move the remaining `sent_to_us_directly` code inside the `if sent_to_us_directly` block. * Properly separate the `not sent_to_us_directly` branch Since the only way this second block is now reachable is if we *didn't* go into the `sent_to_us_directly` branch, we can replace it with a simple `else`. * changelog --- changelog.d/10615.misc | 1 + synapse/handlers/federation.py | 271 +++++++++++++++++++++-------------------- 2 files changed, 138 insertions(+), 134 deletions(-) create mode 100644 changelog.d/10615.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10615.misc b/changelog.d/10615.misc new file mode 100644 index 0000000000..9a765435db --- /dev/null +++ b/changelog.d/10615.misc @@ -0,0 +1 @@ +Clean up some of the federation event authentication code for clarity. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index a257d87fed..529d025c39 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -285,169 +285,172 @@ class FederationHandler(BaseHandler): # - Fetching any missing prev events to fill in gaps in the graph # - Fetching state if we have a hole in the graph if not pdu.internal_metadata.is_outlier(): - # We only backfill backwards to the min depth. - min_depth = await self.get_min_depth_for_context(pdu.room_id) - - logger.debug("min_depth: %d", min_depth) - prevs = set(pdu.prev_event_ids()) seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen - if min_depth is not None and pdu.depth > min_depth: - missing_prevs = prevs - seen - if sent_to_us_directly and missing_prevs: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. - logger.info( - "Acquiring room lock to fetch %d missing prev_events: %s", - len(missing_prevs), - shortstr(missing_prevs), - ) - with (await self._room_pdu_linearizer.queue(pdu.room_id)): + if missing_prevs: + if sent_to_us_directly: + # We only backfill backwards to the min depth. + min_depth = await self.get_min_depth_for_context(pdu.room_id) + logger.debug("min_depth: %d", min_depth) + + if min_depth is not None and pdu.depth > min_depth: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. logger.info( - "Acquired room lock to fetch %d missing prev_events", + "Acquiring room lock to fetch %d missing prev_events: %s", len(missing_prevs), + shortstr(missing_prevs), ) - - try: - await self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth + with (await self._room_pdu_linearizer.queue(pdu.room_id)): + logger.info( + "Acquired room lock to fetch %d missing prev_events", + len(missing_prevs), ) - except Exception as e: - raise Exception( - "Error fetching missing prev_events for %s: %s" - % (event_id, e) - ) from e + + try: + await self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth + ) + except Exception as e: + raise Exception( + "Error fetching missing prev_events for %s: %s" + % (event_id, e) + ) from e # Update the set of things we've seen after trying to # fetch the missing stuff seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if not missing_prevs: + logger.info("Found all missing prev_events") + + if missing_prevs: + # since this event was pushed to us, it is possible for it to + # become the only forward-extremity in the room, and we would then + # trust its state to be the state for the whole room. This is very + # bad. Further, if the event was pushed to us, there is no excuse + # for us not to have all the prev_events. (XXX: apart from + # min_depth?) + # + # We therefore reject any such events. + logger.warning( + "Rejecting: failed to fetch %d prev events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, + ) - if not prevs - seen: - logger.info( - "Found all missing prev_events", - ) - - missing_prevs = prevs - seen - if missing_prevs: - # We've still not been able to get all of the prev_events for this event. - # - # In this case, we need to fall back to asking another server in the - # federation for the state at this event. That's ok provided we then - # resolve the state against other bits of the DAG before using it (which - # will ensure that you can't just take over a room by sending an event, - # withholding its prev_events, and declaring yourself to be an admin in - # the subsequent state request). - # - # Now, if we're pulling this event as a missing prev_event, then clearly - # this event is not going to become the only forward-extremity and we are - # guaranteed to resolve its state against our existing forward - # extremities, so that should be fine. - # - # On the other hand, if this event was pushed to us, it is possible for - # it to become the only forward-extremity in the room, and we would then - # trust its state to be the state for the whole room. This is very bad. - # Further, if the event was pushed to us, there is no excuse for us not to - # have all the prev_events. We therefore reject any such events. - # - # XXX this really feels like it could/should be merged with the above, - # but there is an interaction with min_depth that I'm not really - # following. - - if sent_to_us_directly: - logger.warning( - "Rejecting: failed to fetch %d prev events: %s", - len(missing_prevs), + else: + # We don't have all of the prev_events for this event. + # + # In this case, we need to fall back to asking another server in the + # federation for the state at this event. That's ok provided we then + # resolve the state against other bits of the DAG before using it (which + # will ensure that you can't just take over a room by sending an event, + # withholding its prev_events, and declaring yourself to be an admin in + # the subsequent state request). + # + # Since we're pulling this event as a missing prev_event, then clearly + # this event is not going to become the only forward-extremity and we are + # guaranteed to resolve its state against our existing forward + # extremities, so that should be fine. + # + # XXX this really feels like it could/should be merged with the above, + # but there is an interaction with min_depth that I'm not really + # following. + logger.info( + "Event %s is missing prev_events %s: calculating state for a " + "backwards extremity", + event_id, shortstr(missing_prevs), ) - raise FederationError( - "ERROR", - 403, - ( - "Your server isn't divulging details about prev_events " - "referenced in this event." - ), - affected=pdu.event_id, - ) - logger.info( - "Event %s is missing prev_events %s: calculating state for a " - "backwards extremity", - event_id, - shortstr(missing_prevs), - ) + # Calculate the state after each of the previous events, and + # resolve them to find the correct state at the current event. + event_map = {event_id: pdu} + try: + # Get the state of the events we know about + ours = await self.state_store.get_state_groups_ids( + room_id, seen + ) - # Calculate the state after each of the previous events, and - # resolve them to find the correct state at the current event. - event_map = {event_id: pdu} - try: - # Get the state of the events we know about - ours = await self.state_store.get_state_groups_ids(room_id, seen) - - # state_maps is a list of mappings from (type, state_key) to event_id - state_maps: List[StateMap[str]] = list(ours.values()) - - # we don't need this any more, let's delete it. - del ours - - # Ask the remote server for the states we don't - # know about - for p in missing_prevs: - logger.info("Requesting state after missing prev_event %s", p) - - with nested_logging_context(p): - # note that if any of the missing prevs share missing state or - # auth events, the requests to fetch those events are deduped - # by the get_pdu_cache in federation_client. - remote_state = ( - await self._get_state_after_missing_prev_event( - origin, room_id, p - ) + # state_maps is a list of mappings from (type, state_key) to event_id + state_maps: List[StateMap[str]] = list(ours.values()) + + # we don't need this any more, let's delete it. + del ours + + # Ask the remote server for the states we don't + # know about + for p in missing_prevs: + logger.info( + "Requesting state after missing prev_event %s", p ) - remote_state_map = { - (x.type, x.state_key): x.event_id for x in remote_state - } - state_maps.append(remote_state_map) + with nested_logging_context(p): + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. + remote_state = ( + await self._get_state_after_missing_prev_event( + origin, room_id, p + ) + ) + + remote_state_map = { + (x.type, x.state_key): x.event_id + for x in remote_state + } + state_maps.append(remote_state_map) - for x in remote_state: - event_map[x.event_id] = x + for x in remote_state: + event_map[x.event_id] = x - room_version = await self.store.get_room_version_id(room_id) - state_map = ( - await self._state_resolution_handler.resolve_events_with_store( + room_version = await self.store.get_room_version_id(room_id) + state_map = await self._state_resolution_handler.resolve_events_with_store( room_id, room_version, state_maps, event_map, state_res_store=StateResolutionStore(self.store), ) - ) - # We need to give _process_received_pdu the actual state events - # rather than event ids, so generate that now. + # We need to give _process_received_pdu the actual state events + # rather than event ids, so generate that now. - # First though we need to fetch all the events that are in - # state_map, so we can build up the state below. - evs = await self.store.get_events( - list(state_map.values()), - get_prev_content=False, - redact_behaviour=EventRedactBehaviour.AS_IS, - ) - event_map.update(evs) + # First though we need to fetch all the events that are in + # state_map, so we can build up the state below. + evs = await self.store.get_events( + list(state_map.values()), + get_prev_content=False, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + event_map.update(evs) - state = [event_map[e] for e in state_map.values()] - except Exception: - logger.warning( - "Error attempting to resolve state at missing " "prev_events", - exc_info=True, - ) - raise FederationError( - "ERROR", - 403, - "We can't get valid state history.", - affected=event_id, - ) + state = [event_map[e] for e in state_map.values()] + except Exception: + logger.warning( + "Error attempting to resolve state at missing " + "prev_events", + exc_info=True, + ) + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=event_id, + ) # A second round of checks for all events. Check that the event passes auth # based on `auth_events`, this allows us to assert that the event would -- cgit 1.5.1 From 50af1efe4be8c93ee1fd642b60ab66d32317827b Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 19 Aug 2021 17:31:40 +0100 Subject: Extract `_resolve_state_at_missing_prevs` (#10624) This is a follow-up to #10615: it takes the code that constructs the state at a backwards extremity, and extracts it to a separate method. --- changelog.d/10624.misc | 1 + synapse/handlers/federation.py | 229 ++++++++++++++++++++++------------------- 2 files changed, 125 insertions(+), 105 deletions(-) create mode 100644 changelog.d/10624.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10624.misc b/changelog.d/10624.misc new file mode 100644 index 0000000000..9a765435db --- /dev/null +++ b/changelog.d/10624.misc @@ -0,0 +1 @@ +Clean up some of the federation event authentication code for clarity. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 529d025c39..de86918b7b 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -285,12 +285,12 @@ class FederationHandler(BaseHandler): # - Fetching any missing prev events to fill in gaps in the graph # - Fetching state if we have a hole in the graph if not pdu.internal_metadata.is_outlier(): - prevs = set(pdu.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen + if sent_to_us_directly: + prevs = set(pdu.prev_event_ids()) + seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen - if missing_prevs: - if sent_to_us_directly: + if missing_prevs: # We only backfill backwards to the min depth. min_depth = await self.get_min_depth_for_context(pdu.room_id) logger.debug("min_depth: %d", min_depth) @@ -351,106 +351,8 @@ class FederationHandler(BaseHandler): affected=pdu.event_id, ) - else: - # We don't have all of the prev_events for this event. - # - # In this case, we need to fall back to asking another server in the - # federation for the state at this event. That's ok provided we then - # resolve the state against other bits of the DAG before using it (which - # will ensure that you can't just take over a room by sending an event, - # withholding its prev_events, and declaring yourself to be an admin in - # the subsequent state request). - # - # Since we're pulling this event as a missing prev_event, then clearly - # this event is not going to become the only forward-extremity and we are - # guaranteed to resolve its state against our existing forward - # extremities, so that should be fine. - # - # XXX this really feels like it could/should be merged with the above, - # but there is an interaction with min_depth that I'm not really - # following. - logger.info( - "Event %s is missing prev_events %s: calculating state for a " - "backwards extremity", - event_id, - shortstr(missing_prevs), - ) - - # Calculate the state after each of the previous events, and - # resolve them to find the correct state at the current event. - event_map = {event_id: pdu} - try: - # Get the state of the events we know about - ours = await self.state_store.get_state_groups_ids( - room_id, seen - ) - - # state_maps is a list of mappings from (type, state_key) to event_id - state_maps: List[StateMap[str]] = list(ours.values()) - - # we don't need this any more, let's delete it. - del ours - - # Ask the remote server for the states we don't - # know about - for p in missing_prevs: - logger.info( - "Requesting state after missing prev_event %s", p - ) - - with nested_logging_context(p): - # note that if any of the missing prevs share missing state or - # auth events, the requests to fetch those events are deduped - # by the get_pdu_cache in federation_client. - remote_state = ( - await self._get_state_after_missing_prev_event( - origin, room_id, p - ) - ) - - remote_state_map = { - (x.type, x.state_key): x.event_id - for x in remote_state - } - state_maps.append(remote_state_map) - - for x in remote_state: - event_map[x.event_id] = x - - room_version = await self.store.get_room_version_id(room_id) - state_map = await self._state_resolution_handler.resolve_events_with_store( - room_id, - room_version, - state_maps, - event_map, - state_res_store=StateResolutionStore(self.store), - ) - - # We need to give _process_received_pdu the actual state events - # rather than event ids, so generate that now. - - # First though we need to fetch all the events that are in - # state_map, so we can build up the state below. - evs = await self.store.get_events( - list(state_map.values()), - get_prev_content=False, - redact_behaviour=EventRedactBehaviour.AS_IS, - ) - event_map.update(evs) - - state = [event_map[e] for e in state_map.values()] - except Exception: - logger.warning( - "Error attempting to resolve state at missing " - "prev_events", - exc_info=True, - ) - raise FederationError( - "ERROR", - 403, - "We can't get valid state history.", - affected=event_id, - ) + else: + state = await self._resolve_state_at_missing_prevs(origin, pdu) # A second round of checks for all events. Check that the event passes auth # based on `auth_events`, this allows us to assert that the event would @@ -1493,6 +1395,123 @@ class FederationHandler(BaseHandler): event_infos, ) + async def _resolve_state_at_missing_prevs( + self, dest: str, event: EventBase + ) -> Optional[Iterable[EventBase]]: + """Calculate the state at an event with missing prev_events. + + This is used when we have pulled a batch of events from a remote server, and + still don't have all the prev_events. + + If we already have all the prev_events for `event`, this method does nothing. + + Otherwise, the missing prevs become new backwards extremities, and we fall back + to asking the remote server for the state after each missing `prev_event`, + and resolving across them. + + That's ok provided we then resolve the state against other bits of the DAG + before using it - in other words, that the received event `event` is not going + to become the only forwards_extremity in the room (which will ensure that you + can't just take over a room by sending an event, withholding its prev_events, + and declaring yourself to be an admin in the subsequent state request). + + In other words: we should only call this method if `event` has been *pulled* + as part of a batch of missing prev events, or similar. + + Params: + dest: the remote server to ask for state at the missing prevs. Typically, + this will be the server we got `event` from. + event: an event to check for missing prevs. + + Returns: + if we already had all the prev events, `None`. Otherwise, returns a list of + the events in the state at `event`. + """ + room_id = event.room_id + event_id = event.event_id + + prevs = set(event.prev_event_ids()) + seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if not missing_prevs: + return None + + logger.info( + "Event %s is missing prev_events %s: calculating state for a " + "backwards extremity", + event_id, + shortstr(missing_prevs), + ) + # Calculate the state after each of the previous events, and + # resolve them to find the correct state at the current event. + event_map = {event_id: event} + try: + # Get the state of the events we know about + ours = await self.state_store.get_state_groups_ids(room_id, seen) + + # state_maps is a list of mappings from (type, state_key) to event_id + state_maps: List[StateMap[str]] = list(ours.values()) + + # we don't need this any more, let's delete it. + del ours + + # Ask the remote server for the states we don't + # know about + for p in missing_prevs: + logger.info("Requesting state after missing prev_event %s", p) + + with nested_logging_context(p): + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. + remote_state = await self._get_state_after_missing_prev_event( + dest, room_id, p + ) + + remote_state_map = { + (x.type, x.state_key): x.event_id for x in remote_state + } + state_maps.append(remote_state_map) + + for x in remote_state: + event_map[x.event_id] = x + + room_version = await self.store.get_room_version_id(room_id) + state_map = await self._state_resolution_handler.resolve_events_with_store( + room_id, + room_version, + state_maps, + event_map, + state_res_store=StateResolutionStore(self.store), + ) + + # We need to give _process_received_pdu the actual state events + # rather than event ids, so generate that now. + + # First though we need to fetch all the events that are in + # state_map, so we can build up the state below. + evs = await self.store.get_events( + list(state_map.values()), + get_prev_content=False, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + event_map.update(evs) + + state = [event_map[e] for e in state_map.values()] + except Exception: + logger.warning( + "Error attempting to resolve state at missing prev_events", + exc_info=True, + ) + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=event_id, + ) + return state + def _sanity_check_event(self, ev: EventBase) -> None: """ Do some early sanity checks of a received event -- cgit 1.5.1 From e81d62009ee091d69d56bca702ba0edd39becb1c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 19 Aug 2021 18:05:12 +0100 Subject: Split `on_receive_pdu` in half (#10640) Here we split on_receive_pdu into two functions (on_receive_pdu and process_pulled_event), rather than having both cases in the same method. There's a tiny bit of overlap, but not that much. --- changelog.d/10640.misc | 1 + synapse/federation/federation_server.py | 4 +- synapse/handlers/federation.py | 236 +++++++++++++++++++------------- tests/test_federation.py | 10 +- 4 files changed, 142 insertions(+), 109 deletions(-) create mode 100644 changelog.d/10640.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10640.misc b/changelog.d/10640.misc new file mode 100644 index 0000000000..9a765435db --- /dev/null +++ b/changelog.d/10640.misc @@ -0,0 +1 @@ +Clean up some of the federation event authentication code for clarity. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index afd8f8580a..e1b58d40c5 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -1005,9 +1005,7 @@ class FederationServer(FederationBase): async with lock: logger.info("handling received PDU: %s", event) try: - await self.handler.on_receive_pdu( - origin, event, sent_to_us_directly=True - ) + await self.handler.on_receive_pdu(origin, event) except FederationError as e: # XXX: Ideally we'd inform the remote we failed to process # the event, but we can't return an error in the transaction diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index de86918b7b..246df43501 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -203,18 +203,13 @@ class FederationHandler(BaseHandler): self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages - async def on_receive_pdu( - self, origin: str, pdu: EventBase, sent_to_us_directly: bool = False - ) -> None: - """Process a PDU received via a federation /send/ transaction, or - via backfill of missing prev_events + async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: + """Process a PDU received via a federation /send/ transaction Args: origin: server which initiated the /send/ transaction. Will be used to fetch missing events or state. pdu: received PDU - sent_to_us_directly: True if this event was pushed to us; False if - we pulled it as the result of a missing prev_event. """ room_id = pdu.room_id @@ -276,8 +271,6 @@ class FederationHandler(BaseHandler): ) return None - state = None - # Check that the event passes auth based on the state at the event. This is # done for events that are to be added to the timeline (non-outliers). # @@ -285,83 +278,72 @@ class FederationHandler(BaseHandler): # - Fetching any missing prev events to fill in gaps in the graph # - Fetching state if we have a hole in the graph if not pdu.internal_metadata.is_outlier(): - if sent_to_us_directly: - prevs = set(pdu.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen - - if missing_prevs: - # We only backfill backwards to the min depth. - min_depth = await self.get_min_depth_for_context(pdu.room_id) - logger.debug("min_depth: %d", min_depth) - - if min_depth is not None and pdu.depth > min_depth: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. + prevs = set(pdu.prev_event_ids()) + seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if missing_prevs: + # We only backfill backwards to the min depth. + min_depth = await self.get_min_depth_for_context(pdu.room_id) + logger.debug("min_depth: %d", min_depth) + + if min_depth is not None and pdu.depth > min_depth: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + logger.info( + "Acquiring room lock to fetch %d missing prev_events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + with (await self._room_pdu_linearizer.queue(pdu.room_id)): logger.info( - "Acquiring room lock to fetch %d missing prev_events: %s", + "Acquired room lock to fetch %d missing prev_events", len(missing_prevs), - shortstr(missing_prevs), ) - with (await self._room_pdu_linearizer.queue(pdu.room_id)): - logger.info( - "Acquired room lock to fetch %d missing prev_events", - len(missing_prevs), + + try: + await self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth ) + except Exception as e: + raise Exception( + "Error fetching missing prev_events for %s: %s" + % (event_id, e) + ) from e - try: - await self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth - ) - except Exception as e: - raise Exception( - "Error fetching missing prev_events for %s: %s" - % (event_id, e) - ) from e - - # Update the set of things we've seen after trying to - # fetch the missing stuff - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen - - if not missing_prevs: - logger.info("Found all missing prev_events") - - if missing_prevs: - # since this event was pushed to us, it is possible for it to - # become the only forward-extremity in the room, and we would then - # trust its state to be the state for the whole room. This is very - # bad. Further, if the event was pushed to us, there is no excuse - # for us not to have all the prev_events. (XXX: apart from - # min_depth?) - # - # We therefore reject any such events. - logger.warning( - "Rejecting: failed to fetch %d prev events: %s", - len(missing_prevs), - shortstr(missing_prevs), - ) - raise FederationError( - "ERROR", - 403, - ( - "Your server isn't divulging details about prev_events " - "referenced in this event." - ), - affected=pdu.event_id, - ) + # Update the set of things we've seen after trying to + # fetch the missing stuff + seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen - else: - state = await self._resolve_state_at_missing_prevs(origin, pdu) + if not missing_prevs: + logger.info("Found all missing prev_events") + + if missing_prevs: + # since this event was pushed to us, it is possible for it to + # become the only forward-extremity in the room, and we would then + # trust its state to be the state for the whole room. This is very + # bad. Further, if the event was pushed to us, there is no excuse + # for us not to have all the prev_events. (XXX: apart from + # min_depth?) + # + # We therefore reject any such events. + logger.warning( + "Rejecting: failed to fetch %d prev events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, + ) - # A second round of checks for all events. Check that the event passes auth - # based on `auth_events`, this allows us to assert that the event would - # have been allowed at some point. If an event passes this check its OK - # for it to be used as part of a returned `/state` request, as either - # a) we received the event as part of the original join and so trust it, or - # b) we'll do a state resolution with existing state before it becomes - # part of the "current state", which adds more protection. - await self._process_received_pdu(origin, pdu, state=state) + await self._process_received_pdu(origin, pdu, state=None) async def _get_missing_events_for_pdu( self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int @@ -461,24 +443,7 @@ class FederationHandler(BaseHandler): return logger.info("Got %d prev_events", len(missing_events)) - - # We want to sort these by depth so we process them and - # tell clients about them in order. - missing_events.sort(key=lambda x: x.depth) - - for ev in missing_events: - logger.info("Handling received prev_event %s", ev) - with nested_logging_context(ev.event_id): - try: - await self.on_receive_pdu(origin, ev, sent_to_us_directly=False) - except FederationError as e: - if e.code == 403: - logger.warning( - "Received prev_event %s failed history check.", - ev.event_id, - ) - else: - raise + await self._process_pulled_events(origin, missing_events) async def _get_state_for_room( self, @@ -1395,6 +1360,81 @@ class FederationHandler(BaseHandler): event_infos, ) + async def _process_pulled_events( + self, origin: str, events: Iterable[EventBase] + ) -> None: + """Process a batch of events we have pulled from a remote server + + Pulls in any events required to auth the events, persists the received events, + and notifies clients, if appropriate. + + Assumes the events have already had their signatures and hashes checked. + + Params: + origin: The server we received these events from + events: The received events. + """ + + # We want to sort these by depth so we process them and + # tell clients about them in order. + sorted_events = sorted(events, key=lambda x: x.depth) + + for ev in sorted_events: + with nested_logging_context(ev.event_id): + await self._process_pulled_event(origin, ev) + + async def _process_pulled_event(self, origin: str, event: EventBase) -> None: + """Process a single event that we have pulled from a remote server + + Pulls in any events required to auth the event, persists the received event, + and notifies clients, if appropriate. + + Assumes the event has already had its signatures and hashes checked. + + This is somewhat equivalent to on_receive_pdu, but applies somewhat different + logic in the case that we are missing prev_events (in particular, it just + requests the state at that point, rather than triggering a get_missing_events) - + so is appropriate when we have pulled the event from a remote server, rather + than having it pushed to us. + + Params: + origin: The server we received this event from + events: The received event + """ + logger.info("Processing pulled event %s", event) + + # these should not be outliers. + assert not event.internal_metadata.is_outlier() + + event_id = event.event_id + + existing = await self.store.get_event( + event_id, allow_none=True, allow_rejected=True + ) + if existing: + if not existing.internal_metadata.is_outlier(): + logger.info( + "Ignoring received event %s which we have already seen", + event_id, + ) + return + logger.info("De-outliering event %s", event_id) + + try: + self._sanity_check_event(event) + except SynapseError as err: + logger.warning("Event %s failed sanity check: %s", event_id, err) + return + + try: + state = await self._resolve_state_at_missing_prevs(origin, event) + await self._process_received_pdu(origin, event, state=state) + except FederationError as e: + if e.code == 403: + logger.warning("Pulled event %s failed history check.", event_id) + else: + raise + async def _resolve_state_at_missing_prevs( self, dest: str, event: EventBase ) -> Optional[Iterable[EventBase]]: @@ -1780,7 +1820,7 @@ class FederationHandler(BaseHandler): p, ) with nested_logging_context(p.event_id): - await self.on_receive_pdu(origin, p, sent_to_us_directly=True) + await self.on_receive_pdu(origin, p) except Exception as e: logger.warning( "Error handling queued PDU %s from %s: %s", p.event_id, origin, e diff --git a/tests/test_federation.py b/tests/test_federation.py index 3785799f46..348fcb72a7 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -85,11 +85,7 @@ class MessageAcceptTests(unittest.HomeserverTestCase): # Send the join, it should return None (which is not an error) self.assertEqual( - self.get_success( - self.handler.on_receive_pdu( - "test.serv", join_event, sent_to_us_directly=True - ) - ), + self.get_success(self.handler.on_receive_pdu("test.serv", join_event)), None, ) @@ -135,9 +131,7 @@ class MessageAcceptTests(unittest.HomeserverTestCase): with LoggingContext("test-context"): failure = self.get_failure( - self.handler.on_receive_pdu( - "test.serv", lying_event, sent_to_us_directly=True - ), + self.handler.on_receive_pdu("test.serv", lying_event), FederationError, ) -- cgit 1.5.1 From 96715d763362a7027c39d571cfde3aa8b7b82fcf Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 26 Aug 2021 18:34:57 +0100 Subject: Make `backfill` and `get_missing_events` use the same codepath (#10645) Given that backfill and get_missing_events are basically the same thing, it's somewhat crazy that we have entirely separate code paths for them. This makes backfill use the existing get_missing_events code, and then clears up all the unused code. --- changelog.d/10645.misc | 1 + synapse/handlers/federation.py | 273 ++++--------------------- synapse/storage/databases/main/purge_events.py | 1 + 3 files changed, 42 insertions(+), 233 deletions(-) create mode 100644 changelog.d/10645.misc (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10645.misc b/changelog.d/10645.misc new file mode 100644 index 0000000000..ac19263cd8 --- /dev/null +++ b/changelog.d/10645.misc @@ -0,0 +1 @@ +Make `backfill` and `get_missing_events` use the same codepath. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 246df43501..6fa2fc8f52 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -65,6 +65,7 @@ from synapse.event_auth import auth_types_for_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.events.validator import EventValidator +from synapse.federation.federation_client import InvalidResponseError from synapse.handlers._base import BaseHandler from synapse.http.servlet import assert_params_in_dict from synapse.logging.context import ( @@ -116,10 +117,6 @@ class _NewEventInfo: Attributes: event: the received event - state: the state at that event, according to /state_ids from a remote - homeserver. Only populated for backfilled events which are going to be a - new backwards extremity. - claimed_auth_event_map: a map of (type, state_key) => event for the event's claimed auth_events. @@ -134,7 +131,6 @@ class _NewEventInfo: """ event: EventBase - state: Optional[Sequence[EventBase]] claimed_auth_event_map: StateMap[EventBase] @@ -443,113 +439,7 @@ class FederationHandler(BaseHandler): return logger.info("Got %d prev_events", len(missing_events)) - await self._process_pulled_events(origin, missing_events) - - async def _get_state_for_room( - self, - destination: str, - room_id: str, - event_id: str, - ) -> List[EventBase]: - """Requests all of the room state at a given event from a remote - homeserver. - - Will also fetch any missing events reported in the `auth_chain_ids` - section of `/state_ids`. - - Args: - destination: The remote homeserver to query for the state. - room_id: The id of the room we're interested in. - event_id: The id of the event we want the state at. - - Returns: - A list of events in the state, not including the event itself. - """ - ( - state_event_ids, - auth_event_ids, - ) = await self.federation_client.get_room_state_ids( - destination, room_id, event_id=event_id - ) - - # Fetch the state events from the DB, and check we have the auth events. - event_map = await self.store.get_events(state_event_ids, allow_rejected=True) - auth_events_in_store = await self.store.have_seen_events( - room_id, auth_event_ids - ) - - # Check for missing events. We handle state and auth event seperately, - # as we want to pull the state from the DB, but we don't for the auth - # events. (Note: we likely won't use the majority of the auth chain, and - # it can be *huge* for large rooms, so it's worth ensuring that we don't - # unnecessarily pull it from the DB). - missing_state_events = set(state_event_ids) - set(event_map) - missing_auth_events = set(auth_event_ids) - set(auth_events_in_store) - if missing_state_events or missing_auth_events: - await self._get_events_and_persist( - destination=destination, - room_id=room_id, - events=missing_state_events | missing_auth_events, - ) - - if missing_state_events: - new_events = await self.store.get_events( - missing_state_events, allow_rejected=True - ) - event_map.update(new_events) - - missing_state_events.difference_update(new_events) - - if missing_state_events: - logger.warning( - "Failed to fetch missing state events for %s %s", - event_id, - missing_state_events, - ) - - if missing_auth_events: - auth_events_in_store = await self.store.have_seen_events( - room_id, missing_auth_events - ) - missing_auth_events.difference_update(auth_events_in_store) - - if missing_auth_events: - logger.warning( - "Failed to fetch missing auth events for %s %s", - event_id, - missing_auth_events, - ) - - remote_state = list(event_map.values()) - - # check for events which were in the wrong room. - # - # this can happen if a remote server claims that the state or - # auth_events at an event in room A are actually events in room B - - bad_events = [ - (event.event_id, event.room_id) - for event in remote_state - if event.room_id != room_id - ] - - for bad_event_id, bad_room_id in bad_events: - # This is a bogus situation, but since we may only discover it a long time - # after it happened, we try our best to carry on, by just omitting the - # bad events from the returned auth/state set. - logger.warning( - "Remote server %s claims event %s in room %s is an auth/state " - "event in room %s", - destination, - bad_event_id, - bad_room_id, - room_id, - ) - - if bad_events: - remote_state = [e for e in remote_state if e.room_id == room_id] - - return remote_state + await self._process_pulled_events(origin, missing_events, backfilled=False) async def _get_state_after_missing_prev_event( self, @@ -567,10 +457,6 @@ class FederationHandler(BaseHandler): Returns: A list of events in the state, including the event itself """ - # TODO: This function is basically the same as _get_state_for_room. Can - # we make backfill() use it, rather than having two code paths? I think the - # only difference is that backfill() persists the prev events separately. - ( state_event_ids, auth_event_ids, @@ -681,6 +567,7 @@ class FederationHandler(BaseHandler): origin: str, event: EventBase, state: Optional[Iterable[EventBase]], + backfilled: bool = False, ) -> None: """Called when we have a new pdu. We need to do auth checks and put it through the StateHandler. @@ -693,6 +580,9 @@ class FederationHandler(BaseHandler): state: Normally None, but if we are handling a gap in the graph (ie, we are missing one or more prev_events), the resolved state at the event + + backfilled: True if this is part of a historical batch of events (inhibits + notification to clients, and validation of device keys.) """ logger.debug("Processing event: %s", event) @@ -700,10 +590,15 @@ class FederationHandler(BaseHandler): context = await self.state_handler.compute_event_context( event, old_state=state ) - await self._auth_and_persist_event(origin, event, context, state=state) + await self._auth_and_persist_event( + origin, event, context, state=state, backfilled=backfilled + ) except AuthError as e: raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) + if backfilled: + return + # For encrypted messages we check that we know about the sending device, # if we don't then we mark the device cache for that user as stale. if event.type == EventTypes.Encrypted: @@ -868,7 +763,7 @@ class FederationHandler(BaseHandler): @log_function async def backfill( self, dest: str, room_id: str, limit: int, extremities: List[str] - ) -> List[EventBase]: + ) -> None: """Trigger a backfill request to `dest` for the given `room_id` This will attempt to get more events from the remote. If the other side @@ -878,6 +773,9 @@ class FederationHandler(BaseHandler): sanity-checking on them. If any of the backfilled events are invalid, this method throws a SynapseError. + We might also raise an InvalidResponseError if the response from the remote + server is just bogus. + TODO: make this more useful to distinguish failures of the remote server from invalid events (there is probably no point in trying to re-fetch invalid events from every other HS in the room.) @@ -890,111 +788,18 @@ class FederationHandler(BaseHandler): ) if not events: - return [] - - # ideally we'd sanity check the events here for excess prev_events etc, - # but it's hard to reject events at this point without completely - # breaking backfill in the same way that it is currently broken by - # events whose signature we cannot verify (#3121). - # - # So for now we accept the events anyway. #3124 tracks this. - # - # for ev in events: - # self._sanity_check_event(ev) - - # Don't bother processing events we already have. - seen_events = await self.store.have_events_in_timeline( - {e.event_id for e in events} - ) - - events = [e for e in events if e.event_id not in seen_events] - - if not events: - return [] - - event_map = {e.event_id: e for e in events} - - event_ids = {e.event_id for e in events} - - # build a list of events whose prev_events weren't in the batch. - # (XXX: this will include events whose prev_events we already have; that doesn't - # sound right?) - edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids] - - logger.info("backfill: Got %d events with %d edges", len(events), len(edges)) - - # For each edge get the current state. - - state_events = {} - events_to_state = {} - for e_id in edges: - state = await self._get_state_for_room( - destination=dest, - room_id=room_id, - event_id=e_id, - ) - state_events.update({s.event_id: s for s in state}) - events_to_state[e_id] = state + return - required_auth = { - a_id - for event in events + list(state_events.values()) - for a_id in event.auth_event_ids() - } - auth_events = await self.store.get_events(required_auth, allow_rejected=True) - auth_events.update( - {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map} - ) - - ev_infos = [] - - # Step 1: persist the events in the chunk we fetched state for (i.e. - # the backwards extremities), with custom auth events and state - for e_id in events_to_state: - # For paranoia we ensure that these events are marked as - # non-outliers - ev = event_map[e_id] - assert not ev.internal_metadata.is_outlier() - - ev_infos.append( - _NewEventInfo( - event=ev, - state=events_to_state[e_id], - claimed_auth_event_map={ - ( - auth_events[a_id].type, - auth_events[a_id].state_key, - ): auth_events[a_id] - for a_id in ev.auth_event_ids() - if a_id in auth_events - }, + # if there are any events in the wrong room, the remote server is buggy and + # should not be trusted. + for ev in events: + if ev.room_id != room_id: + raise InvalidResponseError( + f"Remote server {dest} returned event {ev.event_id} which is in " + f"room {ev.room_id}, when we were backfilling in {room_id}" ) - ) - - if ev_infos: - await self._auth_and_persist_events( - dest, room_id, ev_infos, backfilled=True - ) - - # Step 2: Persist the rest of the events in the chunk one by one - events.sort(key=lambda e: e.depth) - - for event in events: - if event in events_to_state: - continue - - # For paranoia we ensure that these events are marked as - # non-outliers - assert not event.internal_metadata.is_outlier() - - context = await self.state_handler.compute_event_context(event) - - # We store these one at a time since each event depends on the - # previous to work out the state. - # TODO: We can probably do something more clever here. - await self._auth_and_persist_event(dest, event, context, backfilled=True) - return events + await self._process_pulled_events(dest, events, backfilled=True) async def maybe_backfill( self, room_id: str, current_depth: int, limit: int @@ -1197,7 +1002,7 @@ class FederationHandler(BaseHandler): # appropriate stuff. # TODO: We can probably do something more intelligent here. return True - except SynapseError as e: + except (SynapseError, InvalidResponseError) as e: logger.info("Failed to backfill from %s because %s", dom, e) continue except HttpResponseException as e: @@ -1351,7 +1156,7 @@ class FederationHandler(BaseHandler): else: logger.info("Missing auth event %s", auth_event_id) - event_infos.append(_NewEventInfo(event, None, auth)) + event_infos.append(_NewEventInfo(event, auth)) if event_infos: await self._auth_and_persist_events( @@ -1361,7 +1166,7 @@ class FederationHandler(BaseHandler): ) async def _process_pulled_events( - self, origin: str, events: Iterable[EventBase] + self, origin: str, events: Iterable[EventBase], backfilled: bool ) -> None: """Process a batch of events we have pulled from a remote server @@ -1373,6 +1178,8 @@ class FederationHandler(BaseHandler): Params: origin: The server we received these events from events: The received events. + backfilled: True if this is part of a historical batch of events (inhibits + notification to clients, and validation of device keys.) """ # We want to sort these by depth so we process them and @@ -1381,9 +1188,11 @@ class FederationHandler(BaseHandler): for ev in sorted_events: with nested_logging_context(ev.event_id): - await self._process_pulled_event(origin, ev) + await self._process_pulled_event(origin, ev, backfilled=backfilled) - async def _process_pulled_event(self, origin: str, event: EventBase) -> None: + async def _process_pulled_event( + self, origin: str, event: EventBase, backfilled: bool + ) -> None: """Process a single event that we have pulled from a remote server Pulls in any events required to auth the event, persists the received event, @@ -1400,6 +1209,8 @@ class FederationHandler(BaseHandler): Params: origin: The server we received this event from events: The received event + backfilled: True if this is part of a historical batch of events (inhibits + notification to clients, and validation of device keys.) """ logger.info("Processing pulled event %s", event) @@ -1428,7 +1239,9 @@ class FederationHandler(BaseHandler): try: state = await self._resolve_state_at_missing_prevs(origin, event) - await self._process_received_pdu(origin, event, state=state) + await self._process_received_pdu( + origin, event, state=state, backfilled=backfilled + ) except FederationError as e: if e.code == 403: logger.warning("Pulled event %s failed history check.", event_id) @@ -2451,7 +2264,6 @@ class FederationHandler(BaseHandler): origin: str, room_id: str, event_infos: Collection[_NewEventInfo], - backfilled: bool = False, ) -> None: """Creates the appropriate contexts and persists events. The events should not depend on one another, e.g. this should be used to persist @@ -2467,16 +2279,12 @@ class FederationHandler(BaseHandler): async def prep(ev_info: _NewEventInfo): event = ev_info.event with nested_logging_context(suffix=event.event_id): - res = await self.state_handler.compute_event_context( - event, old_state=ev_info.state - ) + res = await self.state_handler.compute_event_context(event) res = await self._check_event_auth( origin, event, res, - state=ev_info.state, claimed_auth_event_map=ev_info.claimed_auth_event_map, - backfilled=backfilled, ) return res @@ -2493,7 +2301,6 @@ class FederationHandler(BaseHandler): (ev_info.event, context) for ev_info, context in zip(event_infos, contexts) ], - backfilled=backfilled, ) async def _persist_auth_tree( diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 664c65dac5..bccff5e5b9 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -295,6 +295,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): self._invalidate_cache_and_stream( txn, self.have_seen_event, (room_id, event_id) ) + self._invalidate_get_event_cache(event_id) logger.info("[purge] done") -- cgit 1.5.1 From 1800aabfc226938036479d2ab1a750aa34cf3974 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 26 Aug 2021 21:41:44 +0100 Subject: Split `FederationHandler` in half (#10692) The idea here is to take anything to do with incoming events and move it out to a separate handler, as a way of making FederationHandler smaller. --- changelog.d/10692.misc | 1 + synapse/federation/federation_server.py | 7 +- synapse/handlers/federation.py | 1789 +------------------- synapse/handlers/federation_event.py | 1825 +++++++++++++++++++++ synapse/replication/http/federation.py | 4 +- synapse/server.py | 5 + tests/federation/transport/test_knocking.py | 2 +- tests/handlers/test_federation.py | 16 +- tests/handlers/test_presence.py | 4 +- tests/replication/test_federation_sender_shard.py | 2 +- tests/test_federation.py | 10 +- 11 files changed, 1884 insertions(+), 1781 deletions(-) create mode 100644 changelog.d/10692.misc create mode 100644 synapse/handlers/federation_event.py (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10692.misc b/changelog.d/10692.misc new file mode 100644 index 0000000000..a1b0def76b --- /dev/null +++ b/changelog.d/10692.misc @@ -0,0 +1 @@ +Split the event-processing methods in `FederationHandler` into a separate `FederationEventHandler`. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index e1b58d40c5..214ee948fa 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -110,6 +110,7 @@ class FederationServer(FederationBase): super().__init__(hs) self.handler = hs.get_federation_handler() + self._federation_event_handler = hs.get_federation_event_handler() self.state = hs.get_state_handler() self._event_auth_handler = hs.get_event_auth_handler() @@ -787,7 +788,9 @@ class FederationServer(FederationBase): event = await self._check_sigs_and_hash(room_version, event) - return await self.handler.on_send_membership_event(origin, event) + return await self._federation_event_handler.on_send_membership_event( + origin, event + ) async def on_event_auth( self, origin: str, room_id: str, event_id: str @@ -1005,7 +1008,7 @@ class FederationServer(FederationBase): async with lock: logger.info("handling received PDU: %s", event) try: - await self.handler.on_receive_pdu(origin, event) + await self._federation_event_handler.on_receive_pdu(origin, event) except FederationError as e: # XXX: Ideally we'd inform the remote we failed to process # the event, but we can't return an error in the transaction diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 6fa2fc8f52..daf1d3bfb3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -17,23 +17,9 @@ import itertools import logging -from collections.abc import Container from http import HTTPStatus -from typing import ( - TYPE_CHECKING, - Collection, - Dict, - Iterable, - List, - Optional, - Sequence, - Set, - Tuple, - Union, -) +from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union -import attr -from prometheus_client import Counter from signedjson.key import decode_verify_key_bytes from signedjson.sign import verify_signed_json from unpaddedbase64 import decode_base64 @@ -41,19 +27,12 @@ from unpaddedbase64 import decode_base64 from twisted.internet import defer from synapse import event_auth -from synapse.api.constants import ( - EventContentFields, - EventTypes, - Membership, - RejectedReason, - RoomEncryptionAlgorithms, -) +from synapse.api.constants import EventTypes, Membership, RejectedReason from synapse.api.errors import ( AuthError, CodeMessageException, Codes, FederationDeniedError, - FederationError, HttpResponseException, NotFoundError, RequestSendFailed, @@ -61,7 +40,6 @@ from synapse.api.errors import ( ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions from synapse.crypto.event_signing import compute_event_signature -from synapse.event_auth import auth_types_for_event from synapse.events import EventBase from synapse.events.snapshot import EventContext from synapse.events.validator import EventValidator @@ -75,28 +53,14 @@ from synapse.logging.context import ( run_in_background, ) from synapse.logging.utils import log_function -from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, - ReplicationFederationSendEventsRestServlet, ReplicationStoreRoomOnOutlierMembershipRestServlet, ) -from synapse.state import StateResolutionStore from synapse.storage.databases.main.events_worker import EventRedactBehaviour -from synapse.types import ( - JsonDict, - MutableStateMap, - PersistedEventPosition, - RoomStreamToken, - StateMap, - UserID, - get_domain_from_id, -) -from synapse.util.async_helpers import Linearizer, concurrently_execute -from synapse.util.iterutils import batch_iter +from synapse.types import JsonDict, StateMap, get_domain_from_id +from synapse.util.async_helpers import Linearizer from synapse.util.retryutils import NotRetryingDestination -from synapse.util.stringutils import shortstr from synapse.visibility import filter_events_for_server if TYPE_CHECKING: @@ -104,45 +68,11 @@ if TYPE_CHECKING: logger = logging.getLogger(__name__) -soft_failed_event_counter = Counter( - "synapse_federation_soft_failed_events_total", - "Events received over federation that we marked as soft_failed", -) - - -@attr.s(slots=True, frozen=True, auto_attribs=True) -class _NewEventInfo: - """Holds information about a received event, ready for passing to _auth_and_persist_events - - Attributes: - event: the received event - - claimed_auth_event_map: a map of (type, state_key) => event for the event's - claimed auth_events. - - This can include events which have not yet been persisted, in the case that - we are backfilling a batch of events. - - Note: May be incomplete: if we were unable to find all of the claimed auth - events. Also, treat the contents with caution: the events might also have - been rejected, might not yet have been authorized themselves, or they might - be in the wrong room. - - """ - - event: EventBase - claimed_auth_event_map: StateMap[EventBase] - class FederationHandler(BaseHandler): - """Handles events that originated from federation. - Responsible for: - a) handling received Pdus before handing them on as Events to the rest - of the homeserver (including auth and state conflict resolutions) - b) converting events that were produced by local clients that may need - to be sent to remote homeservers. - c) doing the necessary dances to invite remote users and join remote - rooms. + """Handles general incoming federation requests + + Incoming events are *not* handled here, for which see FederationEventHandler. """ def __init__(self, hs: "HomeServer"): @@ -155,652 +85,35 @@ class FederationHandler(BaseHandler): self.state_store = self.storage.state self.federation_client = hs.get_federation_client() self.state_handler = hs.get_state_handler() - self._state_resolution_handler = hs.get_state_resolution_handler() self.server_name = hs.hostname self.keyring = hs.get_keyring() - self.action_generator = hs.get_action_generator() self.is_mine_id = hs.is_mine_id self.spam_checker = hs.get_spam_checker() self.event_creation_handler = hs.get_event_creation_handler() self._event_auth_handler = hs.get_event_auth_handler() - self._message_handler = hs.get_message_handler() self._server_notices_mxid = hs.config.server_notices_mxid self.config = hs.config self.http_client = hs.get_proxied_blacklisted_http_client() - self._instance_name = hs.get_instance_name() self._replication = hs.get_replication_data_handler() + self._federation_event_handler = hs.get_federation_event_handler() - self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client( hs ) if hs.config.worker_app: - self._user_device_resync = ( - ReplicationUserDevicesResyncRestServlet.make_client(hs) - ) self._maybe_store_room_on_outlier_membership = ( ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs) ) else: - self._device_list_updater = hs.get_device_handler().device_list_updater self._maybe_store_room_on_outlier_membership = ( self.store.maybe_store_room_on_outlier_membership ) - # When joining a room we need to queue any events for that room up. - # For each room, a list of (pdu, origin) tuples. - self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {} - self._room_pdu_linearizer = Linearizer("fed_room_pdu") - self._room_backfill = Linearizer("room_backfill") self.third_party_event_rules = hs.get_third_party_event_rules() - self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages - - async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: - """Process a PDU received via a federation /send/ transaction - - Args: - origin: server which initiated the /send/ transaction. Will - be used to fetch missing events or state. - pdu: received PDU - """ - - room_id = pdu.room_id - event_id = pdu.event_id - - # We reprocess pdus when we have seen them only as outliers - existing = await self.store.get_event( - event_id, allow_none=True, allow_rejected=True - ) - - # FIXME: Currently we fetch an event again when we already have it - # if it has been marked as an outlier. - if existing: - if not existing.internal_metadata.is_outlier(): - logger.info( - "Ignoring received event %s which we have already seen", event_id - ) - return - if pdu.internal_metadata.is_outlier(): - logger.info( - "Ignoring received outlier %s which we already have as an outlier", - event_id, - ) - return - logger.info("De-outliering event %s", event_id) - - # do some initial sanity-checking of the event. In particular, make - # sure it doesn't have hundreds of prev_events or auth_events, which - # could cause a huge state resolution or cascade of event fetches. - try: - self._sanity_check_event(pdu) - except SynapseError as err: - logger.warning("Received event failed sanity checks") - raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id) - - # If we are currently in the process of joining this room, then we - # queue up events for later processing. - if room_id in self.room_queues: - logger.info( - "Queuing PDU from %s for now: join in progress", - origin, - ) - self.room_queues[room_id].append((pdu, origin)) - return - - # If we're not in the room just ditch the event entirely. This is - # probably an old server that has come back and thinks we're still in - # the room (or we've been rejoined to the room by a state reset). - # - # Note that if we were never in the room then we would have already - # dropped the event, since we wouldn't know the room version. - is_in_room = await self._event_auth_handler.check_host_in_room( - room_id, self.server_name - ) - if not is_in_room: - logger.info( - "Ignoring PDU from %s as we're not in the room", - origin, - ) - return None - - # Check that the event passes auth based on the state at the event. This is - # done for events that are to be added to the timeline (non-outliers). - # - # Get missing pdus if necessary: - # - Fetching any missing prev events to fill in gaps in the graph - # - Fetching state if we have a hole in the graph - if not pdu.internal_metadata.is_outlier(): - prevs = set(pdu.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen - - if missing_prevs: - # We only backfill backwards to the min depth. - min_depth = await self.get_min_depth_for_context(pdu.room_id) - logger.debug("min_depth: %d", min_depth) - - if min_depth is not None and pdu.depth > min_depth: - # If we're missing stuff, ensure we only fetch stuff one - # at a time. - logger.info( - "Acquiring room lock to fetch %d missing prev_events: %s", - len(missing_prevs), - shortstr(missing_prevs), - ) - with (await self._room_pdu_linearizer.queue(pdu.room_id)): - logger.info( - "Acquired room lock to fetch %d missing prev_events", - len(missing_prevs), - ) - - try: - await self._get_missing_events_for_pdu( - origin, pdu, prevs, min_depth - ) - except Exception as e: - raise Exception( - "Error fetching missing prev_events for %s: %s" - % (event_id, e) - ) from e - - # Update the set of things we've seen after trying to - # fetch the missing stuff - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen - - if not missing_prevs: - logger.info("Found all missing prev_events") - - if missing_prevs: - # since this event was pushed to us, it is possible for it to - # become the only forward-extremity in the room, and we would then - # trust its state to be the state for the whole room. This is very - # bad. Further, if the event was pushed to us, there is no excuse - # for us not to have all the prev_events. (XXX: apart from - # min_depth?) - # - # We therefore reject any such events. - logger.warning( - "Rejecting: failed to fetch %d prev events: %s", - len(missing_prevs), - shortstr(missing_prevs), - ) - raise FederationError( - "ERROR", - 403, - ( - "Your server isn't divulging details about prev_events " - "referenced in this event." - ), - affected=pdu.event_id, - ) - - await self._process_received_pdu(origin, pdu, state=None) - - async def _get_missing_events_for_pdu( - self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int - ) -> None: - """ - Args: - origin: Origin of the pdu. Will be called to get the missing events - pdu: received pdu - prevs: List of event ids which we are missing - min_depth: Minimum depth of events to return. - """ - - room_id = pdu.room_id - event_id = pdu.event_id - - seen = await self.store.have_events_in_timeline(prevs) - - if not prevs - seen: - return - - latest_list = await self.store.get_latest_event_ids_in_room(room_id) - - # We add the prev events that we have seen to the latest - # list to ensure the remote server doesn't give them to us - latest = set(latest_list) - latest |= seen - - logger.info( - "Requesting missing events between %s and %s", - shortstr(latest), - event_id, - ) - - # XXX: we set timeout to 10s to help workaround - # https://github.com/matrix-org/synapse/issues/1733. - # The reason is to avoid holding the linearizer lock - # whilst processing inbound /send transactions, causing - # FDs to stack up and block other inbound transactions - # which empirically can currently take up to 30 minutes. - # - # N.B. this explicitly disables retry attempts. - # - # N.B. this also increases our chances of falling back to - # fetching fresh state for the room if the missing event - # can't be found, which slightly reduces our security. - # it may also increase our DAG extremity count for the room, - # causing additional state resolution? See #1760. - # However, fetching state doesn't hold the linearizer lock - # apparently. - # - # see https://github.com/matrix-org/synapse/pull/1744 - # - # ---- - # - # Update richvdh 2018/09/18: There are a number of problems with timing this - # request out aggressively on the client side: - # - # - it plays badly with the server-side rate-limiter, which starts tarpitting you - # if you send too many requests at once, so you end up with the server carefully - # working through the backlog of your requests, which you have already timed - # out. - # - # - for this request in particular, we now (as of - # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the - # server can't produce a plausible-looking set of prev_events - so we becone - # much more likely to reject the event. - # - # - contrary to what it says above, we do *not* fall back to fetching fresh state - # for the room if get_missing_events times out. Rather, we give up processing - # the PDU whose prevs we are missing, which then makes it much more likely that - # we'll end up back here for the *next* PDU in the list, which exacerbates the - # problem. - # - # - the aggressive 10s timeout was introduced to deal with incoming federation - # requests taking 8 hours to process. It's not entirely clear why that was going - # on; certainly there were other issues causing traffic storms which are now - # resolved, and I think in any case we may be more sensible about our locking - # now. We're *certainly* more sensible about our logging. - # - # All that said: Let's try increasing the timeout to 60s and see what happens. - - try: - missing_events = await self.federation_client.get_missing_events( - origin, - room_id, - earliest_events_ids=list(latest), - latest_events=[pdu], - limit=10, - min_depth=min_depth, - timeout=60000, - ) - except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e: - # We failed to get the missing events, but since we need to handle - # the case of `get_missing_events` not returning the necessary - # events anyway, it is safe to simply log the error and continue. - logger.warning("Failed to get prev_events: %s", e) - return - - logger.info("Got %d prev_events", len(missing_events)) - await self._process_pulled_events(origin, missing_events, backfilled=False) - - async def _get_state_after_missing_prev_event( - self, - destination: str, - room_id: str, - event_id: str, - ) -> List[EventBase]: - """Requests all of the room state at a given event from a remote homeserver. - - Args: - destination: The remote homeserver to query for the state. - room_id: The id of the room we're interested in. - event_id: The id of the event we want the state at. - - Returns: - A list of events in the state, including the event itself - """ - ( - state_event_ids, - auth_event_ids, - ) = await self.federation_client.get_room_state_ids( - destination, room_id, event_id=event_id - ) - - logger.debug( - "state_ids returned %i state events, %i auth events", - len(state_event_ids), - len(auth_event_ids), - ) - - # start by just trying to fetch the events from the store - desired_events = set(state_event_ids) - desired_events.add(event_id) - logger.debug("Fetching %i events from cache/store", len(desired_events)) - fetched_events = await self.store.get_events( - desired_events, allow_rejected=True - ) - - missing_desired_events = desired_events - fetched_events.keys() - logger.debug( - "We are missing %i events (got %i)", - len(missing_desired_events), - len(fetched_events), - ) - - # We probably won't need most of the auth events, so let's just check which - # we have for now, rather than thrashing the event cache with them all - # unnecessarily. - - # TODO: we probably won't actually need all of the auth events, since we - # already have a bunch of the state events. It would be nice if the - # federation api gave us a way of finding out which we actually need. - - missing_auth_events = set(auth_event_ids) - fetched_events.keys() - missing_auth_events.difference_update( - await self.store.have_seen_events(room_id, missing_auth_events) - ) - logger.debug("We are also missing %i auth events", len(missing_auth_events)) - - missing_events = missing_desired_events | missing_auth_events - logger.debug("Fetching %i events from remote", len(missing_events)) - await self._get_events_and_persist( - destination=destination, room_id=room_id, events=missing_events - ) - - # we need to make sure we re-load from the database to get the rejected - # state correct. - fetched_events.update( - await self.store.get_events(missing_desired_events, allow_rejected=True) - ) - - # check for events which were in the wrong room. - # - # this can happen if a remote server claims that the state or - # auth_events at an event in room A are actually events in room B - - bad_events = [ - (event_id, event.room_id) - for event_id, event in fetched_events.items() - if event.room_id != room_id - ] - - for bad_event_id, bad_room_id in bad_events: - # This is a bogus situation, but since we may only discover it a long time - # after it happened, we try our best to carry on, by just omitting the - # bad events from the returned state set. - logger.warning( - "Remote server %s claims event %s in room %s is an auth/state " - "event in room %s", - destination, - bad_event_id, - bad_room_id, - room_id, - ) - - del fetched_events[bad_event_id] - - # if we couldn't get the prev event in question, that's a problem. - remote_event = fetched_events.get(event_id) - if not remote_event: - raise Exception("Unable to get missing prev_event %s" % (event_id,)) - - # missing state at that event is a warning, not a blocker - # XXX: this doesn't sound right? it means that we'll end up with incomplete - # state. - failed_to_fetch = desired_events - fetched_events.keys() - if failed_to_fetch: - logger.warning( - "Failed to fetch missing state events for %s %s", - event_id, - failed_to_fetch, - ) - - remote_state = [ - fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events - ] - - if remote_event.is_state() and remote_event.rejected_reason is None: - remote_state.append(remote_event) - - return remote_state - - async def _process_received_pdu( - self, - origin: str, - event: EventBase, - state: Optional[Iterable[EventBase]], - backfilled: bool = False, - ) -> None: - """Called when we have a new pdu. We need to do auth checks and put it - through the StateHandler. - - Args: - origin: server sending the event - - event: event to be persisted - - state: Normally None, but if we are handling a gap in the graph - (ie, we are missing one or more prev_events), the resolved state at the - event - - backfilled: True if this is part of a historical batch of events (inhibits - notification to clients, and validation of device keys.) - """ - logger.debug("Processing event: %s", event) - - try: - context = await self.state_handler.compute_event_context( - event, old_state=state - ) - await self._auth_and_persist_event( - origin, event, context, state=state, backfilled=backfilled - ) - except AuthError as e: - raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) - - if backfilled: - return - - # For encrypted messages we check that we know about the sending device, - # if we don't then we mark the device cache for that user as stale. - if event.type == EventTypes.Encrypted: - device_id = event.content.get("device_id") - sender_key = event.content.get("sender_key") - - cached_devices = await self.store.get_cached_devices_for_user(event.sender) - - resync = False # Whether we should resync device lists. - - device = None - if device_id is not None: - device = cached_devices.get(device_id) - if device is None: - logger.info( - "Received event from remote device not in our cache: %s %s", - event.sender, - device_id, - ) - resync = True - - # We also check if the `sender_key` matches what we expect. - if sender_key is not None: - # Figure out what sender key we're expecting. If we know the - # device and recognize the algorithm then we can work out the - # exact key to expect. Otherwise check it matches any key we - # have for that device. - - current_keys: Container[str] = [] - - if device: - keys = device.get("keys", {}).get("keys", {}) - - if ( - event.content.get("algorithm") - == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2 - ): - # For this algorithm we expect a curve25519 key. - key_name = "curve25519:%s" % (device_id,) - current_keys = [keys.get(key_name)] - else: - # We don't know understand the algorithm, so we just - # check it matches a key for the device. - current_keys = keys.values() - elif device_id: - # We don't have any keys for the device ID. - pass - else: - # The event didn't include a device ID, so we just look for - # keys across all devices. - current_keys = [ - key - for device in cached_devices.values() - for key in device.get("keys", {}).get("keys", {}).values() - ] - - # We now check that the sender key matches (one of) the expected - # keys. - if sender_key not in current_keys: - logger.info( - "Received event from remote device with unexpected sender key: %s %s: %s", - event.sender, - device_id or "", - sender_key, - ) - resync = True - - if resync: - run_as_background_process( - "resync_device_due_to_pdu", self._resync_device, event.sender - ) - - await self._handle_marker_event(origin, event) - - async def _handle_marker_event(self, origin: str, marker_event: EventBase): - """Handles backfilling the insertion event when we receive a marker - event that points to one. - - Args: - origin: Origin of the event. Will be called to get the insertion event - marker_event: The event to process - """ - - if marker_event.type != EventTypes.MSC2716_MARKER: - # Not a marker event - return - - if marker_event.rejected_reason is not None: - # Rejected event - return - - # Skip processing a marker event if the room version doesn't - # support it. - room_version = await self.store.get_room_version(marker_event.room_id) - if not room_version.msc2716_historical: - return - - logger.debug("_handle_marker_event: received %s", marker_event) - - insertion_event_id = marker_event.content.get( - EventContentFields.MSC2716_MARKER_INSERTION - ) - - if insertion_event_id is None: - # Nothing to retrieve then (invalid marker) - return - - logger.debug( - "_handle_marker_event: backfilling insertion event %s", insertion_event_id - ) - - await self._get_events_and_persist( - origin, - marker_event.room_id, - [insertion_event_id], - ) - - insertion_event = await self.store.get_event( - insertion_event_id, allow_none=True - ) - if insertion_event is None: - logger.warning( - "_handle_marker_event: server %s didn't return insertion event %s for marker %s", - origin, - insertion_event_id, - marker_event.event_id, - ) - return - - logger.debug( - "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s", - insertion_event, - marker_event, - ) - - await self.store.insert_insertion_extremity( - insertion_event_id, marker_event.room_id - ) - - logger.debug( - "_handle_marker_event: insertion extremity added for %s from marker event %s", - insertion_event, - marker_event, - ) - - async def _resync_device(self, sender: str) -> None: - """We have detected that the device list for the given user may be out - of sync, so we try and resync them. - """ - - try: - await self.store.mark_remote_user_device_cache_as_stale(sender) - - # Immediately attempt a resync in the background - if self.config.worker_app: - await self._user_device_resync(user_id=sender) - else: - await self._device_list_updater.user_device_resync(sender) - except Exception: - logger.exception("Failed to resync device for %s", sender) - - @log_function - async def backfill( - self, dest: str, room_id: str, limit: int, extremities: List[str] - ) -> None: - """Trigger a backfill request to `dest` for the given `room_id` - - This will attempt to get more events from the remote. If the other side - has no new events to offer, this will return an empty list. - - As the events are received, we check their signatures, and also do some - sanity-checking on them. If any of the backfilled events are invalid, - this method throws a SynapseError. - - We might also raise an InvalidResponseError if the response from the remote - server is just bogus. - - TODO: make this more useful to distinguish failures of the remote - server from invalid events (there is probably no point in trying to - re-fetch invalid events from every other HS in the room.) - """ - if dest == self.server_name: - raise SynapseError(400, "Can't backfill from self.") - - events = await self.federation_client.backfill( - dest, room_id, limit=limit, extremities=extremities - ) - - if not events: - return - - # if there are any events in the wrong room, the remote server is buggy and - # should not be trusted. - for ev in events: - if ev.room_id != room_id: - raise InvalidResponseError( - f"Remote server {dest} returned event {ev.event_id} which is in " - f"room {ev.room_id}, when we were backfilling in {room_id}" - ) - - await self._process_pulled_events(dest, events, backfilled=True) - async def maybe_backfill( self, room_id: str, current_depth: int, limit: int ) -> bool: @@ -995,7 +308,7 @@ class FederationHandler(BaseHandler): # TODO: Should we try multiple of these at a time? for dom in domains: try: - await self.backfill( + await self._federation_event_handler.backfill( dom, room_id, limit=100, extremities=extremities ) # If this succeeded then we probably already have the @@ -1084,316 +397,7 @@ class FederationHandler(BaseHandler): tried_domains.update(dom for dom, _ in likely_extremeties_domains) - return False - - async def _get_events_and_persist( - self, destination: str, room_id: str, events: Iterable[str] - ) -> None: - """Fetch the given events from a server, and persist them as outliers. - - This function *does not* recursively get missing auth events of the - newly fetched events. Callers must include in the `events` argument - any missing events from the auth chain. - - Logs a warning if we can't find the given event. - """ - - room_version = await self.store.get_room_version(room_id) - - event_map: Dict[str, EventBase] = {} - - async def get_event(event_id: str): - with nested_logging_context(event_id): - try: - event = await self.federation_client.get_pdu( - [destination], - event_id, - room_version, - outlier=True, - ) - if event is None: - logger.warning( - "Server %s didn't return event %s", - destination, - event_id, - ) - return - - event_map[event.event_id] = event - - except Exception as e: - logger.warning( - "Error fetching missing state/auth event %s: %s %s", - event_id, - type(e), - e, - ) - - await concurrently_execute(get_event, events, 5) - - # Make a map of auth events for each event. We do this after fetching - # all the events as some of the events' auth events will be in the list - # of requested events. - - auth_events = [ - aid - for event in event_map.values() - for aid in event.auth_event_ids() - if aid not in event_map - ] - persisted_events = await self.store.get_events( - auth_events, - allow_rejected=True, - ) - - event_infos = [] - for event in event_map.values(): - auth = {} - for auth_event_id in event.auth_event_ids(): - ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id) - if ae: - auth[(ae.type, ae.state_key)] = ae - else: - logger.info("Missing auth event %s", auth_event_id) - - event_infos.append(_NewEventInfo(event, auth)) - - if event_infos: - await self._auth_and_persist_events( - destination, - room_id, - event_infos, - ) - - async def _process_pulled_events( - self, origin: str, events: Iterable[EventBase], backfilled: bool - ) -> None: - """Process a batch of events we have pulled from a remote server - - Pulls in any events required to auth the events, persists the received events, - and notifies clients, if appropriate. - - Assumes the events have already had their signatures and hashes checked. - - Params: - origin: The server we received these events from - events: The received events. - backfilled: True if this is part of a historical batch of events (inhibits - notification to clients, and validation of device keys.) - """ - - # We want to sort these by depth so we process them and - # tell clients about them in order. - sorted_events = sorted(events, key=lambda x: x.depth) - - for ev in sorted_events: - with nested_logging_context(ev.event_id): - await self._process_pulled_event(origin, ev, backfilled=backfilled) - - async def _process_pulled_event( - self, origin: str, event: EventBase, backfilled: bool - ) -> None: - """Process a single event that we have pulled from a remote server - - Pulls in any events required to auth the event, persists the received event, - and notifies clients, if appropriate. - - Assumes the event has already had its signatures and hashes checked. - - This is somewhat equivalent to on_receive_pdu, but applies somewhat different - logic in the case that we are missing prev_events (in particular, it just - requests the state at that point, rather than triggering a get_missing_events) - - so is appropriate when we have pulled the event from a remote server, rather - than having it pushed to us. - - Params: - origin: The server we received this event from - events: The received event - backfilled: True if this is part of a historical batch of events (inhibits - notification to clients, and validation of device keys.) - """ - logger.info("Processing pulled event %s", event) - - # these should not be outliers. - assert not event.internal_metadata.is_outlier() - - event_id = event.event_id - - existing = await self.store.get_event( - event_id, allow_none=True, allow_rejected=True - ) - if existing: - if not existing.internal_metadata.is_outlier(): - logger.info( - "Ignoring received event %s which we have already seen", - event_id, - ) - return - logger.info("De-outliering event %s", event_id) - - try: - self._sanity_check_event(event) - except SynapseError as err: - logger.warning("Event %s failed sanity check: %s", event_id, err) - return - - try: - state = await self._resolve_state_at_missing_prevs(origin, event) - await self._process_received_pdu( - origin, event, state=state, backfilled=backfilled - ) - except FederationError as e: - if e.code == 403: - logger.warning("Pulled event %s failed history check.", event_id) - else: - raise - - async def _resolve_state_at_missing_prevs( - self, dest: str, event: EventBase - ) -> Optional[Iterable[EventBase]]: - """Calculate the state at an event with missing prev_events. - - This is used when we have pulled a batch of events from a remote server, and - still don't have all the prev_events. - - If we already have all the prev_events for `event`, this method does nothing. - - Otherwise, the missing prevs become new backwards extremities, and we fall back - to asking the remote server for the state after each missing `prev_event`, - and resolving across them. - - That's ok provided we then resolve the state against other bits of the DAG - before using it - in other words, that the received event `event` is not going - to become the only forwards_extremity in the room (which will ensure that you - can't just take over a room by sending an event, withholding its prev_events, - and declaring yourself to be an admin in the subsequent state request). - - In other words: we should only call this method if `event` has been *pulled* - as part of a batch of missing prev events, or similar. - - Params: - dest: the remote server to ask for state at the missing prevs. Typically, - this will be the server we got `event` from. - event: an event to check for missing prevs. - - Returns: - if we already had all the prev events, `None`. Otherwise, returns a list of - the events in the state at `event`. - """ - room_id = event.room_id - event_id = event.event_id - - prevs = set(event.prev_event_ids()) - seen = await self.store.have_events_in_timeline(prevs) - missing_prevs = prevs - seen - - if not missing_prevs: - return None - - logger.info( - "Event %s is missing prev_events %s: calculating state for a " - "backwards extremity", - event_id, - shortstr(missing_prevs), - ) - # Calculate the state after each of the previous events, and - # resolve them to find the correct state at the current event. - event_map = {event_id: event} - try: - # Get the state of the events we know about - ours = await self.state_store.get_state_groups_ids(room_id, seen) - - # state_maps is a list of mappings from (type, state_key) to event_id - state_maps: List[StateMap[str]] = list(ours.values()) - - # we don't need this any more, let's delete it. - del ours - - # Ask the remote server for the states we don't - # know about - for p in missing_prevs: - logger.info("Requesting state after missing prev_event %s", p) - - with nested_logging_context(p): - # note that if any of the missing prevs share missing state or - # auth events, the requests to fetch those events are deduped - # by the get_pdu_cache in federation_client. - remote_state = await self._get_state_after_missing_prev_event( - dest, room_id, p - ) - - remote_state_map = { - (x.type, x.state_key): x.event_id for x in remote_state - } - state_maps.append(remote_state_map) - - for x in remote_state: - event_map[x.event_id] = x - - room_version = await self.store.get_room_version_id(room_id) - state_map = await self._state_resolution_handler.resolve_events_with_store( - room_id, - room_version, - state_maps, - event_map, - state_res_store=StateResolutionStore(self.store), - ) - - # We need to give _process_received_pdu the actual state events - # rather than event ids, so generate that now. - - # First though we need to fetch all the events that are in - # state_map, so we can build up the state below. - evs = await self.store.get_events( - list(state_map.values()), - get_prev_content=False, - redact_behaviour=EventRedactBehaviour.AS_IS, - ) - event_map.update(evs) - - state = [event_map[e] for e in state_map.values()] - except Exception: - logger.warning( - "Error attempting to resolve state at missing prev_events", - exc_info=True, - ) - raise FederationError( - "ERROR", - 403, - "We can't get valid state history.", - affected=event_id, - ) - return state - - def _sanity_check_event(self, ev: EventBase) -> None: - """ - Do some early sanity checks of a received event - - In particular, checks it doesn't have an excessive number of - prev_events or auth_events, which could cause a huge state resolution - or cascade of event fetches. - - Args: - ev: event to be checked - - Raises: - SynapseError if the event does not pass muster - """ - if len(ev.prev_event_ids()) > 20: - logger.warning( - "Rejecting event %s which has %i prev_events", - ev.event_id, - len(ev.prev_event_ids()), - ) - raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events") - - if len(ev.auth_event_ids()) > 10: - logger.warning( - "Rejecting event %s which has %i auth_events", - ev.event_id, - len(ev.auth_event_ids()), - ) - raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") + return False async def send_invite(self, target_host: str, event: EventBase) -> EventBase: """Sends the invite to the remote server for signing. @@ -1460,9 +464,9 @@ class FederationHandler(BaseHandler): # This shouldn't happen, because the RoomMemberHandler has a # linearizer lock which only allows one operation per user per room # at a time - so this is just paranoia. - assert room_id not in self.room_queues + assert room_id not in self._federation_event_handler.room_queues - self.room_queues[room_id] = [] + self._federation_event_handler.room_queues[room_id] = [] await self._clean_room_for_join(room_id) @@ -1536,8 +540,8 @@ class FederationHandler(BaseHandler): logger.debug("Finished joining %s to %s", joinee, room_id) return event.event_id, max_stream_id finally: - room_queue = self.room_queues[room_id] - del self.room_queues[room_id] + room_queue = self._federation_event_handler.room_queues[room_id] + del self._federation_event_handler.room_queues[room_id] # we don't need to wait for the queued events to be processed - # it's just a best-effort thing at this point. We do want to do @@ -1613,7 +617,7 @@ class FederationHandler(BaseHandler): event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"] context = await self.state_handler.compute_event_context(event) - stream_id = await self.persist_events_and_notify( + stream_id = await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) return event.event_id, stream_id @@ -1633,7 +637,7 @@ class FederationHandler(BaseHandler): p, ) with nested_logging_context(p.event_id): - await self.on_receive_pdu(origin, p) + await self._federation_event_handler.on_receive_pdu(origin, p) except Exception as e: logger.warning( "Error handling queued PDU %s from %s: %s", p.event_id, origin, e @@ -1726,7 +730,7 @@ class FederationHandler(BaseHandler): raise # Ensure the user can even join the room. - await self._check_join_restrictions(context, event) + await self._federation_event_handler.check_join_restrictions(context, event) # The remote hasn't signed it yet, obviously. We'll do the full checks # when we get the event back in `on_send_join_request` @@ -1803,7 +807,9 @@ class FederationHandler(BaseHandler): ) context = await self.state_handler.compute_event_context(event) - await self.persist_events_and_notify(event.room_id, [(event, context)]) + await self._federation_event_handler.persist_events_and_notify( + event.room_id, [(event, context)] + ) return event @@ -1830,7 +836,7 @@ class FederationHandler(BaseHandler): await self.federation_client.send_leave(host_list, event) context = await self.state_handler.compute_event_context(event) - stream_id = await self.persist_events_and_notify( + stream_id = await self._federation_event_handler.persist_events_and_notify( event.room_id, [(event, context)] ) @@ -1973,116 +979,6 @@ class FederationHandler(BaseHandler): return event - @log_function - async def on_send_membership_event( - self, origin: str, event: EventBase - ) -> Tuple[EventBase, EventContext]: - """ - We have received a join/leave/knock event for a room via send_join/leave/knock. - - Verify that event and send it into the room on the remote homeserver's behalf. - - This is quite similar to on_receive_pdu, with the following principal - differences: - * only membership events are permitted (and only events with - sender==state_key -- ie, no kicks or bans) - * *We* send out the event on behalf of the remote server. - * We enforce the membership restrictions of restricted rooms. - * Rejected events result in an exception rather than being stored. - - There are also other differences, however it is not clear if these are by - design or omission. In particular, we do not attempt to backfill any missing - prev_events. - - Args: - origin: The homeserver of the remote (joining/invited/knocking) user. - event: The member event that has been signed by the remote homeserver. - - Returns: - The event and context of the event after inserting it into the room graph. - - Raises: - SynapseError if the event is not accepted into the room - """ - logger.debug( - "on_send_membership_event: Got event: %s, signatures: %s", - event.event_id, - event.signatures, - ) - - if get_domain_from_id(event.sender) != origin: - logger.info( - "Got send_membership request for user %r from different origin %s", - event.sender, - origin, - ) - raise SynapseError(403, "User not from origin", Codes.FORBIDDEN) - - if event.sender != event.state_key: - raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON) - - assert not event.internal_metadata.outlier - - # Send this event on behalf of the other server. - # - # The remote server isn't a full participant in the room at this point, so - # may not have an up-to-date list of the other homeservers participating in - # the room, so we send it on their behalf. - event.internal_metadata.send_on_behalf_of = origin - - context = await self.state_handler.compute_event_context(event) - context = await self._check_event_auth(origin, event, context) - if context.rejected: - raise SynapseError( - 403, f"{event.membership} event was rejected", Codes.FORBIDDEN - ) - - # for joins, we need to check the restrictions of restricted rooms - if event.membership == Membership.JOIN: - await self._check_join_restrictions(context, event) - - # for knock events, we run the third-party event rules. It's not entirely clear - # why we don't do this for other sorts of membership events. - if event.membership == Membership.KNOCK: - event_allowed, _ = await self.third_party_event_rules.check_event_allowed( - event, context - ) - if not event_allowed: - logger.info("Sending of knock %s forbidden by third-party rules", event) - raise SynapseError( - 403, "This event is not allowed in this context", Codes.FORBIDDEN - ) - - # all looks good, we can persist the event. - await self._run_push_actions_and_persist_event(event, context) - return event, context - - async def _check_join_restrictions( - self, context: EventContext, event: EventBase - ) -> None: - """Check that restrictions in restricted join rules are matched - - Called when we receive a join event via send_join. - - Raises an auth error if the restrictions are not matched. - """ - prev_state_ids = await context.get_prev_state_ids() - - # Check if the user is already in the room or invited to the room. - user_id = event.state_key - prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) - prev_member_event = None - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - - # Check if the member should be allowed access via membership in a space. - await self._event_auth_handler.check_restricted_join_rules( - prev_state_ids, - event.room_version, - user_id, - prev_member_event, - ) - async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]: """Returns the state at the event. i.e. not including said event.""" @@ -2183,126 +1079,6 @@ class FederationHandler(BaseHandler): else: return None - async def get_min_depth_for_context(self, context: str) -> int: - return await self.store.get_min_depth(context) - - async def _auth_and_persist_event( - self, - origin: str, - event: EventBase, - context: EventContext, - state: Optional[Iterable[EventBase]] = None, - claimed_auth_event_map: Optional[StateMap[EventBase]] = None, - backfilled: bool = False, - ) -> None: - """ - Process an event by performing auth checks and then persisting to the database. - - Args: - origin: The host the event originates from. - event: The event itself. - context: - The event context. - - state: - The state events used to check the event for soft-fail. If this is - not provided the current state events will be used. - - claimed_auth_event_map: - A map of (type, state_key) => event for the event's claimed auth_events. - Possibly incomplete, and possibly including events that are not yet - persisted, or authed, or in the right room. - - Only populated where we may not already have persisted these events - - for example, when populating outliers. - - backfilled: True if the event was backfilled. - """ - context = await self._check_event_auth( - origin, - event, - context, - state=state, - claimed_auth_event_map=claimed_auth_event_map, - backfilled=backfilled, - ) - - await self._run_push_actions_and_persist_event(event, context, backfilled) - - async def _run_push_actions_and_persist_event( - self, event: EventBase, context: EventContext, backfilled: bool = False - ): - """Run the push actions for a received event, and persist it. - - Args: - event: The event itself. - context: The event context. - backfilled: True if the event was backfilled. - """ - try: - if ( - not event.internal_metadata.is_outlier() - and not backfilled - and not context.rejected - and (await self.store.get_min_depth(event.room_id)) <= event.depth - ): - await self.action_generator.handle_push_actions_for_event( - event, context - ) - - await self.persist_events_and_notify( - event.room_id, [(event, context)], backfilled=backfilled - ) - except Exception: - run_in_background( - self.store.remove_push_actions_from_staging, event.event_id - ) - raise - - async def _auth_and_persist_events( - self, - origin: str, - room_id: str, - event_infos: Collection[_NewEventInfo], - ) -> None: - """Creates the appropriate contexts and persists events. The events - should not depend on one another, e.g. this should be used to persist - a bunch of outliers, but not a chunk of individual events that depend - on each other for state calculations. - - Notifies about the events where appropriate. - """ - - if not event_infos: - return - - async def prep(ev_info: _NewEventInfo): - event = ev_info.event - with nested_logging_context(suffix=event.event_id): - res = await self.state_handler.compute_event_context(event) - res = await self._check_event_auth( - origin, - event, - res, - claimed_auth_event_map=ev_info.claimed_auth_event_map, - ) - return res - - contexts = await make_deferred_yieldable( - defer.gatherResults( - [run_in_background(prep, ev_info) for ev_info in event_infos], - consumeErrors=True, - ) - ) - - await self.persist_events_and_notify( - room_id, - [ - (ev_info.event, context) - for ev_info, context in zip(event_infos, contexts) - ], - ) - async def _persist_auth_tree( self, origin: str, @@ -2400,7 +1176,7 @@ class FederationHandler(BaseHandler): events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR if auth_events or state: - await self.persist_events_and_notify( + await self._federation_event_handler.persist_events_and_notify( room_id, [ (e, events_to_context[e.event_id]) @@ -2412,108 +1188,10 @@ class FederationHandler(BaseHandler): event, old_state=state ) - return await self.persist_events_and_notify( + return await self._federation_event_handler.persist_events_and_notify( room_id, [(event, new_event_context)] ) - async def _check_for_soft_fail( - self, - event: EventBase, - state: Optional[Iterable[EventBase]], - backfilled: bool, - origin: str, - ) -> None: - """Checks if we should soft fail the event; if so, marks the event as - such. - - Args: - event - state: The state at the event if we don't have all the event's prev events - backfilled: Whether the event is from backfill - origin: The host the event originates from. - """ - # For new (non-backfilled and non-outlier) events we check if the event - # passes auth based on the current state. If it doesn't then we - # "soft-fail" the event. - if backfilled or event.internal_metadata.is_outlier(): - return - - extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id) - extrem_ids = set(extrem_ids_list) - prev_event_ids = set(event.prev_event_ids()) - - if extrem_ids == prev_event_ids: - # If they're the same then the current state is the same as the - # state at the event, so no point rechecking auth for soft fail. - return - - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - # Calculate the "current state". - if state is not None: - # If we're explicitly given the state then we won't have all the - # prev events, and so we have a gap in the graph. In this case - # we want to be a little careful as we might have been down for - # a while and have an incorrect view of the current state, - # however we still want to do checks as gaps are easy to - # maliciously manufacture. - # - # So we use a "current state" that is actually a state - # resolution across the current forward extremities and the - # given state at the event. This should correctly handle cases - # like bans, especially with state res v2. - - state_sets_d = await self.state_store.get_state_groups( - event.room_id, extrem_ids - ) - state_sets: List[Iterable[EventBase]] = list(state_sets_d.values()) - state_sets.append(state) - current_states = await self.state_handler.resolve_events( - room_version, state_sets, event - ) - current_state_ids: StateMap[str] = { - k: e.event_id for k, e in current_states.items() - } - else: - current_state_ids = await self.state_handler.get_current_state_ids( - event.room_id, latest_event_ids=extrem_ids - ) - - logger.debug( - "Doing soft-fail check for %s: state %s", - event.event_id, - current_state_ids, - ) - - # Now check if event pass auth against said current state - auth_types = auth_types_for_event(room_version_obj, event) - current_state_ids_list = [ - e for k, e in current_state_ids.items() if k in auth_types - ] - - auth_events_map = await self.store.get_events(current_state_ids_list) - current_auth_events = { - (e.type, e.state_key): e for e in auth_events_map.values() - } - - try: - event_auth.check(room_version_obj, event, auth_events=current_auth_events) - except AuthError as e: - logger.warning( - "Soft-failing %r (from %s) because %s", - event, - e, - origin, - extra={ - "room_id": event.room_id, - "mxid": event.sender, - "hs": origin, - }, - ) - soft_failed_event_counter.inc() - event.internal_metadata.soft_failed = True - async def on_get_missing_events( self, origin: str, @@ -2542,334 +1220,6 @@ class FederationHandler(BaseHandler): return missing_events - async def _check_event_auth( - self, - origin: str, - event: EventBase, - context: EventContext, - state: Optional[Iterable[EventBase]] = None, - claimed_auth_event_map: Optional[StateMap[EventBase]] = None, - backfilled: bool = False, - ) -> EventContext: - """ - Checks whether an event should be rejected (for failing auth checks). - - Args: - origin: The host the event originates from. - event: The event itself. - context: - The event context. - - state: - The state events used to check the event for soft-fail. If this is - not provided the current state events will be used. - - claimed_auth_event_map: - A map of (type, state_key) => event for the event's claimed auth_events. - Possibly incomplete, and possibly including events that are not yet - persisted, or authed, or in the right room. - - Only populated where we may not already have persisted these events - - for example, when populating outliers, or the state for a backwards - extremity. - - backfilled: True if the event was backfilled. - - Returns: - The updated context object. - """ - room_version = await self.store.get_room_version_id(event.room_id) - room_version_obj = KNOWN_ROOM_VERSIONS[room_version] - - if claimed_auth_event_map: - # if we have a copy of the auth events from the event, use that as the - # basis for auth. - auth_events = claimed_auth_event_map - else: - # otherwise, we calculate what the auth events *should* be, and use that - prev_state_ids = await context.get_prev_state_ids() - auth_events_ids = self._event_auth_handler.compute_auth_events( - event, prev_state_ids, for_verification=True - ) - auth_events_x = await self.store.get_events(auth_events_ids) - auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()} - - try: - ( - context, - auth_events_for_auth, - ) = await self._update_auth_events_and_context_for_auth( - origin, event, context, auth_events - ) - except Exception: - # We don't really mind if the above fails, so lets not fail - # processing if it does. However, it really shouldn't fail so - # let's still log as an exception since we'll still want to fix - # any bugs. - logger.exception( - "Failed to double check auth events for %s with remote. " - "Ignoring failure and continuing processing of event.", - event.event_id, - ) - auth_events_for_auth = auth_events - - try: - event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth) - except AuthError as e: - logger.warning("Failed auth resolution for %r because %s", event, e) - context.rejected = RejectedReason.AUTH_ERROR - - if not context.rejected: - await self._check_for_soft_fail(event, state, backfilled, origin=origin) - - if event.type == EventTypes.GuestAccess and not context.rejected: - await self.maybe_kick_guest_users(event) - - # If we are going to send this event over federation we precaclculate - # the joined hosts. - if event.internal_metadata.get_send_on_behalf_of(): - await self.event_creation_handler.cache_joined_hosts_for_event( - event, context - ) - - return context - - async def _update_auth_events_and_context_for_auth( - self, - origin: str, - event: EventBase, - context: EventContext, - input_auth_events: StateMap[EventBase], - ) -> Tuple[EventContext, StateMap[EventBase]]: - """Helper for _check_event_auth. See there for docs. - - Checks whether a given event has the expected auth events. If it - doesn't then we talk to the remote server to compare state to see if - we can come to a consensus (e.g. if one server missed some valid - state). - - This attempts to resolve any potential divergence of state between - servers, but is not essential and so failures should not block further - processing of the event. - - Args: - origin: - event: - context: - - input_auth_events: - Map from (event_type, state_key) to event - - Normally, our calculated auth_events based on the state of the room - at the event's position in the DAG, though occasionally (eg if the - event is an outlier), may be the auth events claimed by the remote - server. - - Returns: - updated context, updated auth event map - """ - # take a copy of input_auth_events before we modify it. - auth_events: MutableStateMap[EventBase] = dict(input_auth_events) - - event_auth_events = set(event.auth_event_ids()) - - # missing_auth is the set of the event's auth_events which we don't yet have - # in auth_events. - missing_auth = event_auth_events.difference( - e.event_id for e in auth_events.values() - ) - - # if we have missing events, we need to fetch those events from somewhere. - # - # we start by checking if they are in the store, and then try calling /event_auth/. - if missing_auth: - have_events = await self.store.have_seen_events(event.room_id, missing_auth) - logger.debug("Events %s are in the store", have_events) - missing_auth.difference_update(have_events) - - if missing_auth: - # If we don't have all the auth events, we need to get them. - logger.info("auth_events contains unknown events: %s", missing_auth) - try: - try: - remote_auth_chain = await self.federation_client.get_event_auth( - origin, event.room_id, event.event_id - ) - except RequestSendFailed as e1: - # The other side isn't around or doesn't implement the - # endpoint, so lets just bail out. - logger.info("Failed to get event auth from remote: %s", e1) - return context, auth_events - - seen_remotes = await self.store.have_seen_events( - event.room_id, [e.event_id for e in remote_auth_chain] - ) - - for e in remote_auth_chain: - if e.event_id in seen_remotes: - continue - - if e.event_id == event.event_id: - continue - - try: - auth_ids = e.auth_event_ids() - auth = { - (e.type, e.state_key): e - for e in remote_auth_chain - if e.event_id in auth_ids or e.type == EventTypes.Create - } - e.internal_metadata.outlier = True - - logger.debug( - "_check_event_auth %s missing_auth: %s", - event.event_id, - e.event_id, - ) - missing_auth_event_context = ( - await self.state_handler.compute_event_context(e) - ) - await self._auth_and_persist_event( - origin, - e, - missing_auth_event_context, - claimed_auth_event_map=auth, - ) - - if e.event_id in event_auth_events: - auth_events[(e.type, e.state_key)] = e - except AuthError: - pass - - except Exception: - logger.exception("Failed to get auth chain") - - if event.internal_metadata.is_outlier(): - # XXX: given that, for an outlier, we'll be working with the - # event's *claimed* auth events rather than those we calculated: - # (a) is there any point in this test, since different_auth below will - # obviously be empty - # (b) alternatively, why don't we do it earlier? - logger.info("Skipping auth_event fetch for outlier") - return context, auth_events - - different_auth = event_auth_events.difference( - e.event_id for e in auth_events.values() - ) - - if not different_auth: - return context, auth_events - - logger.info( - "auth_events refers to events which are not in our calculated auth " - "chain: %s", - different_auth, - ) - - # XXX: currently this checks for redactions but I'm not convinced that is - # necessary? - different_events = await self.store.get_events_as_list(different_auth) - - for d in different_events: - if d.room_id != event.room_id: - logger.warning( - "Event %s refers to auth_event %s which is in a different room", - event.event_id, - d.event_id, - ) - - # don't attempt to resolve the claimed auth events against our own - # in this case: just use our own auth events. - # - # XXX: should we reject the event in this case? It feels like we should, - # but then shouldn't we also do so if we've failed to fetch any of the - # auth events? - return context, auth_events - - # now we state-resolve between our own idea of the auth events, and the remote's - # idea of them. - - local_state = auth_events.values() - remote_auth_events = dict(auth_events) - remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) - remote_state = remote_auth_events.values() - - room_version = await self.store.get_room_version_id(event.room_id) - new_state = await self.state_handler.resolve_events( - room_version, (local_state, remote_state), event - ) - - logger.info( - "After state res: updating auth_events with new state %s", - { - (d.type, d.state_key): d.event_id - for d in new_state.values() - if auth_events.get((d.type, d.state_key)) != d - }, - ) - - auth_events.update(new_state) - - context = await self._update_context_for_auth_events( - event, context, auth_events - ) - - return context, auth_events - - async def _update_context_for_auth_events( - self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] - ) -> EventContext: - """Update the state_ids in an event context after auth event resolution, - storing the changes as a new state group. - - Args: - event: The event we're handling the context for - - context: initial event context - - auth_events: Events to update in the event context. - - Returns: - new event context - """ - # exclude the state key of the new event from the current_state in the context. - if event.is_state(): - event_key: Optional[Tuple[str, str]] = (event.type, event.state_key) - else: - event_key = None - state_updates = { - k: a.event_id for k, a in auth_events.items() if k != event_key - } - - current_state_ids = await context.get_current_state_ids() - current_state_ids = dict(current_state_ids) # type: ignore - - current_state_ids.update(state_updates) - - prev_state_ids = await context.get_prev_state_ids() - prev_state_ids = dict(prev_state_ids) - - prev_state_ids.update({k: a.event_id for k, a in auth_events.items()}) - - # create a new state group as a delta from the existing one. - prev_group = context.state_group - state_group = await self.state_store.store_state_group( - event.event_id, - event.room_id, - prev_group=prev_group, - delta_ids=state_updates, - current_state_ids=current_state_ids, - ) - - return EventContext.with_state( - state_group=state_group, - state_group_before_event=context.state_group_before_event, - current_state_ids=current_state_ids, - prev_state_ids=prev_state_ids, - prev_group=prev_group, - delta_ids=state_updates, - ) - async def construct_auth_difference( self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase] ) -> Dict: @@ -3256,99 +1606,6 @@ class FederationHandler(BaseHandler): if "valid" not in response or not response["valid"]: raise AuthError(403, "Third party certificate was invalid") - async def persist_events_and_notify( - self, - room_id: str, - event_and_contexts: Sequence[Tuple[EventBase, EventContext]], - backfilled: bool = False, - ) -> int: - """Persists events and tells the notifier/pushers about them, if - necessary. - - Args: - room_id: The room ID of events being persisted. - event_and_contexts: Sequence of events with their associated - context that should be persisted. All events must belong to - the same room. - backfilled: Whether these events are a result of - backfilling or not - - Returns: - The stream ID after which all events have been persisted. - """ - if not event_and_contexts: - return self.store.get_current_events_token() - - instance = self.config.worker.events_shard_config.get_instance(room_id) - if instance != self._instance_name: - # Limit the number of events sent over replication. We choose 200 - # here as that is what we default to in `max_request_body_size(..)` - for batch in batch_iter(event_and_contexts, 200): - result = await self._send_events( - instance_name=instance, - store=self.store, - room_id=room_id, - event_and_contexts=batch, - backfilled=backfilled, - ) - return result["max_stream_id"] - else: - assert self.storage.persistence - - # Note that this returns the events that were persisted, which may not be - # the same as were passed in if some were deduplicated due to transaction IDs. - events, max_stream_token = await self.storage.persistence.persist_events( - event_and_contexts, backfilled=backfilled - ) - - if self._ephemeral_messages_enabled: - for event in events: - # If there's an expiry timestamp on the event, schedule its expiry. - self._message_handler.maybe_schedule_expiry(event) - - if not backfilled: # Never notify for backfilled events - for event in events: - await self._notify_persisted_event(event, max_stream_token) - - return max_stream_token.stream - - async def _notify_persisted_event( - self, event: EventBase, max_stream_token: RoomStreamToken - ) -> None: - """Checks to see if notifier/pushers should be notified about the - event or not. - - Args: - event: - max_stream_id: The max_stream_id returned by persist_events - """ - - extra_users = [] - if event.type == EventTypes.Member: - target_user_id = event.state_key - - # We notify for memberships if its an invite for one of our - # users - if event.internal_metadata.is_outlier(): - if event.membership != Membership.INVITE: - if not self.is_mine_id(target_user_id): - return - - target_user = UserID.from_string(target_user_id) - extra_users.append(target_user) - elif event.internal_metadata.is_outlier(): - return - - # the event has been persisted so it should have a stream ordering. - assert event.internal_metadata.stream_ordering - - event_pos = PersistedEventPosition( - self._instance_name, event.internal_metadata.stream_ordering - ) - self.notifier.on_new_room_event( - event, event_pos, max_stream_token, extra_users=extra_users - ) - async def _clean_room_for_join(self, room_id: str) -> None: """Called to clean up any data in DB for a given room, ready for the server to join the room. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py new file mode 100644 index 0000000000..9f055f00cf --- /dev/null +++ b/synapse/handlers/federation_event.py @@ -0,0 +1,1825 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging +from http import HTTPStatus +from typing import ( + TYPE_CHECKING, + Collection, + Container, + Dict, + Iterable, + List, + Optional, + Sequence, + Set, + Tuple, +) + +import attr +from prometheus_client import Counter + +from twisted.internet import defer + +from synapse import event_auth +from synapse.api.constants import ( + EventContentFields, + EventTypes, + Membership, + RejectedReason, + RoomEncryptionAlgorithms, +) +from synapse.api.errors import ( + AuthError, + Codes, + FederationError, + HttpResponseException, + RequestSendFailed, + SynapseError, +) +from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.event_auth import auth_types_for_event +from synapse.events import EventBase +from synapse.events.snapshot import EventContext +from synapse.federation.federation_client import InvalidResponseError +from synapse.handlers._base import BaseHandler +from synapse.logging.context import ( + make_deferred_yieldable, + nested_logging_context, + run_in_background, +) +from synapse.logging.utils import log_function +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet +from synapse.replication.http.federation import ( + ReplicationFederationSendEventsRestServlet, +) +from synapse.state import StateResolutionStore +from synapse.storage.databases.main.events_worker import EventRedactBehaviour +from synapse.types import ( + MutableStateMap, + PersistedEventPosition, + RoomStreamToken, + StateMap, + UserID, + get_domain_from_id, +) +from synapse.util.async_helpers import Linearizer, concurrently_execute +from synapse.util.iterutils import batch_iter +from synapse.util.retryutils import NotRetryingDestination +from synapse.util.stringutils import shortstr + +if TYPE_CHECKING: + from synapse.server import HomeServer + + +logger = logging.getLogger(__name__) + +soft_failed_event_counter = Counter( + "synapse_federation_soft_failed_events_total", + "Events received over federation that we marked as soft_failed", +) + + +@attr.s(slots=True, frozen=True, auto_attribs=True) +class _NewEventInfo: + """Holds information about a received event, ready for passing to _auth_and_persist_events + + Attributes: + event: the received event + + claimed_auth_event_map: a map of (type, state_key) => event for the event's + claimed auth_events. + + This can include events which have not yet been persisted, in the case that + we are backfilling a batch of events. + + Note: May be incomplete: if we were unable to find all of the claimed auth + events. Also, treat the contents with caution: the events might also have + been rejected, might not yet have been authorized themselves, or they might + be in the wrong room. + + """ + + event: EventBase + claimed_auth_event_map: StateMap[EventBase] + + +class FederationEventHandler(BaseHandler): + """Handles events that originated from federation. + + Responsible for handing incoming events and passing them on to the rest + of the homeserver (including auth and state conflict resolutions) + """ + + def __init__(self, hs: "HomeServer"): + super().__init__(hs) + + self.store = hs.get_datastore() + self.storage = hs.get_storage() + self.state_store = self.storage.state + + self.state_handler = hs.get_state_handler() + self.event_creation_handler = hs.get_event_creation_handler() + self._event_auth_handler = hs.get_event_auth_handler() + self._message_handler = hs.get_message_handler() + self.action_generator = hs.get_action_generator() + self._state_resolution_handler = hs.get_state_resolution_handler() + + self.federation_client = hs.get_federation_client() + self.third_party_event_rules = hs.get_third_party_event_rules() + + self.is_mine_id = hs.is_mine_id + self._instance_name = hs.get_instance_name() + + self.config = hs.config + self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages + + self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs) + if hs.config.worker_app: + self._user_device_resync = ( + ReplicationUserDevicesResyncRestServlet.make_client(hs) + ) + else: + self._device_list_updater = hs.get_device_handler().device_list_updater + + # When joining a room we need to queue any events for that room up. + # For each room, a list of (pdu, origin) tuples. + # TODO: replace this with something more elegant, probably based around the + # federation event staging area. + self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {} + + self._room_pdu_linearizer = Linearizer("fed_room_pdu") + + async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None: + """Process a PDU received via a federation /send/ transaction + + Args: + origin: server which initiated the /send/ transaction. Will + be used to fetch missing events or state. + pdu: received PDU + """ + + room_id = pdu.room_id + event_id = pdu.event_id + + # We reprocess pdus when we have seen them only as outliers + existing = await self.store.get_event( + event_id, allow_none=True, allow_rejected=True + ) + + # FIXME: Currently we fetch an event again when we already have it + # if it has been marked as an outlier. + if existing: + if not existing.internal_metadata.is_outlier(): + logger.info( + "Ignoring received event %s which we have already seen", event_id + ) + return + if pdu.internal_metadata.is_outlier(): + logger.info( + "Ignoring received outlier %s which we already have as an outlier", + event_id, + ) + return + logger.info("De-outliering event %s", event_id) + + # do some initial sanity-checking of the event. In particular, make + # sure it doesn't have hundreds of prev_events or auth_events, which + # could cause a huge state resolution or cascade of event fetches. + try: + self._sanity_check_event(pdu) + except SynapseError as err: + logger.warning("Received event failed sanity checks") + raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id) + + # If we are currently in the process of joining this room, then we + # queue up events for later processing. + if room_id in self.room_queues: + logger.info( + "Queuing PDU from %s for now: join in progress", + origin, + ) + self.room_queues[room_id].append((pdu, origin)) + return + + # If we're not in the room just ditch the event entirely. This is + # probably an old server that has come back and thinks we're still in + # the room (or we've been rejoined to the room by a state reset). + # + # Note that if we were never in the room then we would have already + # dropped the event, since we wouldn't know the room version. + is_in_room = await self._event_auth_handler.check_host_in_room( + room_id, self.server_name + ) + if not is_in_room: + logger.info( + "Ignoring PDU from %s as we're not in the room", + origin, + ) + return None + + # Check that the event passes auth based on the state at the event. This is + # done for events that are to be added to the timeline (non-outliers). + # + # Get missing pdus if necessary: + # - Fetching any missing prev events to fill in gaps in the graph + # - Fetching state if we have a hole in the graph + if not pdu.internal_metadata.is_outlier(): + prevs = set(pdu.prev_event_ids()) + seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if missing_prevs: + # We only backfill backwards to the min depth. + min_depth = await self.get_min_depth_for_context(pdu.room_id) + logger.debug("min_depth: %d", min_depth) + + if min_depth is not None and pdu.depth > min_depth: + # If we're missing stuff, ensure we only fetch stuff one + # at a time. + logger.info( + "Acquiring room lock to fetch %d missing prev_events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + with (await self._room_pdu_linearizer.queue(pdu.room_id)): + logger.info( + "Acquired room lock to fetch %d missing prev_events", + len(missing_prevs), + ) + + try: + await self._get_missing_events_for_pdu( + origin, pdu, prevs, min_depth + ) + except Exception as e: + raise Exception( + "Error fetching missing prev_events for %s: %s" + % (event_id, e) + ) from e + + # Update the set of things we've seen after trying to + # fetch the missing stuff + seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if not missing_prevs: + logger.info("Found all missing prev_events") + + if missing_prevs: + # since this event was pushed to us, it is possible for it to + # become the only forward-extremity in the room, and we would then + # trust its state to be the state for the whole room. This is very + # bad. Further, if the event was pushed to us, there is no excuse + # for us not to have all the prev_events. (XXX: apart from + # min_depth?) + # + # We therefore reject any such events. + logger.warning( + "Rejecting: failed to fetch %d prev events: %s", + len(missing_prevs), + shortstr(missing_prevs), + ) + raise FederationError( + "ERROR", + 403, + ( + "Your server isn't divulging details about prev_events " + "referenced in this event." + ), + affected=pdu.event_id, + ) + + await self._process_received_pdu(origin, pdu, state=None) + + @log_function + async def on_send_membership_event( + self, origin: str, event: EventBase + ) -> Tuple[EventBase, EventContext]: + """ + We have received a join/leave/knock event for a room via send_join/leave/knock. + + Verify that event and send it into the room on the remote homeserver's behalf. + + This is quite similar to on_receive_pdu, with the following principal + differences: + * only membership events are permitted (and only events with + sender==state_key -- ie, no kicks or bans) + * *We* send out the event on behalf of the remote server. + * We enforce the membership restrictions of restricted rooms. + * Rejected events result in an exception rather than being stored. + + There are also other differences, however it is not clear if these are by + design or omission. In particular, we do not attempt to backfill any missing + prev_events. + + Args: + origin: The homeserver of the remote (joining/invited/knocking) user. + event: The member event that has been signed by the remote homeserver. + + Returns: + The event and context of the event after inserting it into the room graph. + + Raises: + SynapseError if the event is not accepted into the room + """ + logger.debug( + "on_send_membership_event: Got event: %s, signatures: %s", + event.event_id, + event.signatures, + ) + + if get_domain_from_id(event.sender) != origin: + logger.info( + "Got send_membership request for user %r from different origin %s", + event.sender, + origin, + ) + raise SynapseError(403, "User not from origin", Codes.FORBIDDEN) + + if event.sender != event.state_key: + raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON) + + assert not event.internal_metadata.outlier + + # Send this event on behalf of the other server. + # + # The remote server isn't a full participant in the room at this point, so + # may not have an up-to-date list of the other homeservers participating in + # the room, so we send it on their behalf. + event.internal_metadata.send_on_behalf_of = origin + + context = await self.state_handler.compute_event_context(event) + context = await self._check_event_auth(origin, event, context) + if context.rejected: + raise SynapseError( + 403, f"{event.membership} event was rejected", Codes.FORBIDDEN + ) + + # for joins, we need to check the restrictions of restricted rooms + if event.membership == Membership.JOIN: + await self.check_join_restrictions(context, event) + + # for knock events, we run the third-party event rules. It's not entirely clear + # why we don't do this for other sorts of membership events. + if event.membership == Membership.KNOCK: + event_allowed, _ = await self.third_party_event_rules.check_event_allowed( + event, context + ) + if not event_allowed: + logger.info("Sending of knock %s forbidden by third-party rules", event) + raise SynapseError( + 403, "This event is not allowed in this context", Codes.FORBIDDEN + ) + + # all looks good, we can persist the event. + await self._run_push_actions_and_persist_event(event, context) + return event, context + + async def check_join_restrictions( + self, context: EventContext, event: EventBase + ) -> None: + """Check that restrictions in restricted join rules are matched + + Called when we receive a join event via send_join. + + Raises an auth error if the restrictions are not matched. + """ + prev_state_ids = await context.get_prev_state_ids() + + # Check if the user is already in the room or invited to the room. + user_id = event.state_key + prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) + prev_member_event = None + if prev_member_event_id: + prev_member_event = await self.store.get_event(prev_member_event_id) + + # Check if the member should be allowed access via membership in a space. + await self._event_auth_handler.check_restricted_join_rules( + prev_state_ids, + event.room_version, + user_id, + prev_member_event, + ) + + @log_function + async def backfill( + self, dest: str, room_id: str, limit: int, extremities: List[str] + ) -> None: + """Trigger a backfill request to `dest` for the given `room_id` + + This will attempt to get more events from the remote. If the other side + has no new events to offer, this will return an empty list. + + As the events are received, we check their signatures, and also do some + sanity-checking on them. If any of the backfilled events are invalid, + this method throws a SynapseError. + + We might also raise an InvalidResponseError if the response from the remote + server is just bogus. + + TODO: make this more useful to distinguish failures of the remote + server from invalid events (there is probably no point in trying to + re-fetch invalid events from every other HS in the room.) + """ + if dest == self.server_name: + raise SynapseError(400, "Can't backfill from self.") + + events = await self.federation_client.backfill( + dest, room_id, limit=limit, extremities=extremities + ) + + if not events: + return + + # if there are any events in the wrong room, the remote server is buggy and + # should not be trusted. + for ev in events: + if ev.room_id != room_id: + raise InvalidResponseError( + f"Remote server {dest} returned event {ev.event_id} which is in " + f"room {ev.room_id}, when we were backfilling in {room_id}" + ) + + await self._process_pulled_events(dest, events, backfilled=True) + + async def _get_missing_events_for_pdu( + self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int + ) -> None: + """ + Args: + origin: Origin of the pdu. Will be called to get the missing events + pdu: received pdu + prevs: List of event ids which we are missing + min_depth: Minimum depth of events to return. + """ + + room_id = pdu.room_id + event_id = pdu.event_id + + seen = await self.store.have_events_in_timeline(prevs) + + if not prevs - seen: + return + + latest_list = await self.store.get_latest_event_ids_in_room(room_id) + + # We add the prev events that we have seen to the latest + # list to ensure the remote server doesn't give them to us + latest = set(latest_list) + latest |= seen + + logger.info( + "Requesting missing events between %s and %s", + shortstr(latest), + event_id, + ) + + # XXX: we set timeout to 10s to help workaround + # https://github.com/matrix-org/synapse/issues/1733. + # The reason is to avoid holding the linearizer lock + # whilst processing inbound /send transactions, causing + # FDs to stack up and block other inbound transactions + # which empirically can currently take up to 30 minutes. + # + # N.B. this explicitly disables retry attempts. + # + # N.B. this also increases our chances of falling back to + # fetching fresh state for the room if the missing event + # can't be found, which slightly reduces our security. + # it may also increase our DAG extremity count for the room, + # causing additional state resolution? See #1760. + # However, fetching state doesn't hold the linearizer lock + # apparently. + # + # see https://github.com/matrix-org/synapse/pull/1744 + # + # ---- + # + # Update richvdh 2018/09/18: There are a number of problems with timing this + # request out aggressively on the client side: + # + # - it plays badly with the server-side rate-limiter, which starts tarpitting you + # if you send too many requests at once, so you end up with the server carefully + # working through the backlog of your requests, which you have already timed + # out. + # + # - for this request in particular, we now (as of + # https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the + # server can't produce a plausible-looking set of prev_events - so we becone + # much more likely to reject the event. + # + # - contrary to what it says above, we do *not* fall back to fetching fresh state + # for the room if get_missing_events times out. Rather, we give up processing + # the PDU whose prevs we are missing, which then makes it much more likely that + # we'll end up back here for the *next* PDU in the list, which exacerbates the + # problem. + # + # - the aggressive 10s timeout was introduced to deal with incoming federation + # requests taking 8 hours to process. It's not entirely clear why that was going + # on; certainly there were other issues causing traffic storms which are now + # resolved, and I think in any case we may be more sensible about our locking + # now. We're *certainly* more sensible about our logging. + # + # All that said: Let's try increasing the timeout to 60s and see what happens. + + try: + missing_events = await self.federation_client.get_missing_events( + origin, + room_id, + earliest_events_ids=list(latest), + latest_events=[pdu], + limit=10, + min_depth=min_depth, + timeout=60000, + ) + except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e: + # We failed to get the missing events, but since we need to handle + # the case of `get_missing_events` not returning the necessary + # events anyway, it is safe to simply log the error and continue. + logger.warning("Failed to get prev_events: %s", e) + return + + logger.info("Got %d prev_events", len(missing_events)) + await self._process_pulled_events(origin, missing_events, backfilled=False) + + async def _process_pulled_events( + self, origin: str, events: Iterable[EventBase], backfilled: bool + ) -> None: + """Process a batch of events we have pulled from a remote server + + Pulls in any events required to auth the events, persists the received events, + and notifies clients, if appropriate. + + Assumes the events have already had their signatures and hashes checked. + + Params: + origin: The server we received these events from + events: The received events. + backfilled: True if this is part of a historical batch of events (inhibits + notification to clients, and validation of device keys.) + """ + + # We want to sort these by depth so we process them and + # tell clients about them in order. + sorted_events = sorted(events, key=lambda x: x.depth) + + for ev in sorted_events: + with nested_logging_context(ev.event_id): + await self._process_pulled_event(origin, ev, backfilled=backfilled) + + async def _process_pulled_event( + self, origin: str, event: EventBase, backfilled: bool + ) -> None: + """Process a single event that we have pulled from a remote server + + Pulls in any events required to auth the event, persists the received event, + and notifies clients, if appropriate. + + Assumes the event has already had its signatures and hashes checked. + + This is somewhat equivalent to on_receive_pdu, but applies somewhat different + logic in the case that we are missing prev_events (in particular, it just + requests the state at that point, rather than triggering a get_missing_events) - + so is appropriate when we have pulled the event from a remote server, rather + than having it pushed to us. + + Params: + origin: The server we received this event from + events: The received event + backfilled: True if this is part of a historical batch of events (inhibits + notification to clients, and validation of device keys.) + """ + logger.info("Processing pulled event %s", event) + + # these should not be outliers. + assert not event.internal_metadata.is_outlier() + + event_id = event.event_id + + existing = await self.store.get_event( + event_id, allow_none=True, allow_rejected=True + ) + if existing: + if not existing.internal_metadata.is_outlier(): + logger.info( + "Ignoring received event %s which we have already seen", + event_id, + ) + return + logger.info("De-outliering event %s", event_id) + + try: + self._sanity_check_event(event) + except SynapseError as err: + logger.warning("Event %s failed sanity check: %s", event_id, err) + return + + try: + state = await self._resolve_state_at_missing_prevs(origin, event) + await self._process_received_pdu( + origin, event, state=state, backfilled=backfilled + ) + except FederationError as e: + if e.code == 403: + logger.warning("Pulled event %s failed history check.", event_id) + else: + raise + + async def _resolve_state_at_missing_prevs( + self, dest: str, event: EventBase + ) -> Optional[Iterable[EventBase]]: + """Calculate the state at an event with missing prev_events. + + This is used when we have pulled a batch of events from a remote server, and + still don't have all the prev_events. + + If we already have all the prev_events for `event`, this method does nothing. + + Otherwise, the missing prevs become new backwards extremities, and we fall back + to asking the remote server for the state after each missing `prev_event`, + and resolving across them. + + That's ok provided we then resolve the state against other bits of the DAG + before using it - in other words, that the received event `event` is not going + to become the only forwards_extremity in the room (which will ensure that you + can't just take over a room by sending an event, withholding its prev_events, + and declaring yourself to be an admin in the subsequent state request). + + In other words: we should only call this method if `event` has been *pulled* + as part of a batch of missing prev events, or similar. + + Params: + dest: the remote server to ask for state at the missing prevs. Typically, + this will be the server we got `event` from. + event: an event to check for missing prevs. + + Returns: + if we already had all the prev events, `None`. Otherwise, returns a list of + the events in the state at `event`. + """ + room_id = event.room_id + event_id = event.event_id + + prevs = set(event.prev_event_ids()) + seen = await self.store.have_events_in_timeline(prevs) + missing_prevs = prevs - seen + + if not missing_prevs: + return None + + logger.info( + "Event %s is missing prev_events %s: calculating state for a " + "backwards extremity", + event_id, + shortstr(missing_prevs), + ) + # Calculate the state after each of the previous events, and + # resolve them to find the correct state at the current event. + event_map = {event_id: event} + try: + # Get the state of the events we know about + ours = await self.state_store.get_state_groups_ids(room_id, seen) + + # state_maps is a list of mappings from (type, state_key) to event_id + state_maps: List[StateMap[str]] = list(ours.values()) + + # we don't need this any more, let's delete it. + del ours + + # Ask the remote server for the states we don't + # know about + for p in missing_prevs: + logger.info("Requesting state after missing prev_event %s", p) + + with nested_logging_context(p): + # note that if any of the missing prevs share missing state or + # auth events, the requests to fetch those events are deduped + # by the get_pdu_cache in federation_client. + remote_state = await self._get_state_after_missing_prev_event( + dest, room_id, p + ) + + remote_state_map = { + (x.type, x.state_key): x.event_id for x in remote_state + } + state_maps.append(remote_state_map) + + for x in remote_state: + event_map[x.event_id] = x + + room_version = await self.store.get_room_version_id(room_id) + state_map = await self._state_resolution_handler.resolve_events_with_store( + room_id, + room_version, + state_maps, + event_map, + state_res_store=StateResolutionStore(self.store), + ) + + # We need to give _process_received_pdu the actual state events + # rather than event ids, so generate that now. + + # First though we need to fetch all the events that are in + # state_map, so we can build up the state below. + evs = await self.store.get_events( + list(state_map.values()), + get_prev_content=False, + redact_behaviour=EventRedactBehaviour.AS_IS, + ) + event_map.update(evs) + + state = [event_map[e] for e in state_map.values()] + except Exception: + logger.warning( + "Error attempting to resolve state at missing prev_events", + exc_info=True, + ) + raise FederationError( + "ERROR", + 403, + "We can't get valid state history.", + affected=event_id, + ) + return state + + async def _get_state_after_missing_prev_event( + self, + destination: str, + room_id: str, + event_id: str, + ) -> List[EventBase]: + """Requests all of the room state at a given event from a remote homeserver. + + Args: + destination: The remote homeserver to query for the state. + room_id: The id of the room we're interested in. + event_id: The id of the event we want the state at. + + Returns: + A list of events in the state, including the event itself + """ + ( + state_event_ids, + auth_event_ids, + ) = await self.federation_client.get_room_state_ids( + destination, room_id, event_id=event_id + ) + + logger.debug( + "state_ids returned %i state events, %i auth events", + len(state_event_ids), + len(auth_event_ids), + ) + + # start by just trying to fetch the events from the store + desired_events = set(state_event_ids) + desired_events.add(event_id) + logger.debug("Fetching %i events from cache/store", len(desired_events)) + fetched_events = await self.store.get_events( + desired_events, allow_rejected=True + ) + + missing_desired_events = desired_events - fetched_events.keys() + logger.debug( + "We are missing %i events (got %i)", + len(missing_desired_events), + len(fetched_events), + ) + + # We probably won't need most of the auth events, so let's just check which + # we have for now, rather than thrashing the event cache with them all + # unnecessarily. + + # TODO: we probably won't actually need all of the auth events, since we + # already have a bunch of the state events. It would be nice if the + # federation api gave us a way of finding out which we actually need. + + missing_auth_events = set(auth_event_ids) - fetched_events.keys() + missing_auth_events.difference_update( + await self.store.have_seen_events(room_id, missing_auth_events) + ) + logger.debug("We are also missing %i auth events", len(missing_auth_events)) + + missing_events = missing_desired_events | missing_auth_events + logger.debug("Fetching %i events from remote", len(missing_events)) + await self._get_events_and_persist( + destination=destination, room_id=room_id, events=missing_events + ) + + # we need to make sure we re-load from the database to get the rejected + # state correct. + fetched_events.update( + await self.store.get_events(missing_desired_events, allow_rejected=True) + ) + + # check for events which were in the wrong room. + # + # this can happen if a remote server claims that the state or + # auth_events at an event in room A are actually events in room B + + bad_events = [ + (event_id, event.room_id) + for event_id, event in fetched_events.items() + if event.room_id != room_id + ] + + for bad_event_id, bad_room_id in bad_events: + # This is a bogus situation, but since we may only discover it a long time + # after it happened, we try our best to carry on, by just omitting the + # bad events from the returned state set. + logger.warning( + "Remote server %s claims event %s in room %s is an auth/state " + "event in room %s", + destination, + bad_event_id, + bad_room_id, + room_id, + ) + + del fetched_events[bad_event_id] + + # if we couldn't get the prev event in question, that's a problem. + remote_event = fetched_events.get(event_id) + if not remote_event: + raise Exception("Unable to get missing prev_event %s" % (event_id,)) + + # missing state at that event is a warning, not a blocker + # XXX: this doesn't sound right? it means that we'll end up with incomplete + # state. + failed_to_fetch = desired_events - fetched_events.keys() + if failed_to_fetch: + logger.warning( + "Failed to fetch missing state events for %s %s", + event_id, + failed_to_fetch, + ) + + remote_state = [ + fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events + ] + + if remote_event.is_state() and remote_event.rejected_reason is None: + remote_state.append(remote_event) + + return remote_state + + async def _process_received_pdu( + self, + origin: str, + event: EventBase, + state: Optional[Iterable[EventBase]], + backfilled: bool = False, + ) -> None: + """Called when we have a new pdu. We need to do auth checks and put it + through the StateHandler. + + Args: + origin: server sending the event + + event: event to be persisted + + state: Normally None, but if we are handling a gap in the graph + (ie, we are missing one or more prev_events), the resolved state at the + event + + backfilled: True if this is part of a historical batch of events (inhibits + notification to clients, and validation of device keys.) + """ + logger.debug("Processing event: %s", event) + + try: + context = await self.state_handler.compute_event_context( + event, old_state=state + ) + await self._auth_and_persist_event( + origin, event, context, state=state, backfilled=backfilled + ) + except AuthError as e: + raise FederationError("ERROR", e.code, e.msg, affected=event.event_id) + + if backfilled: + return + + # For encrypted messages we check that we know about the sending device, + # if we don't then we mark the device cache for that user as stale. + if event.type == EventTypes.Encrypted: + device_id = event.content.get("device_id") + sender_key = event.content.get("sender_key") + + cached_devices = await self.store.get_cached_devices_for_user(event.sender) + + resync = False # Whether we should resync device lists. + + device = None + if device_id is not None: + device = cached_devices.get(device_id) + if device is None: + logger.info( + "Received event from remote device not in our cache: %s %s", + event.sender, + device_id, + ) + resync = True + + # We also check if the `sender_key` matches what we expect. + if sender_key is not None: + # Figure out what sender key we're expecting. If we know the + # device and recognize the algorithm then we can work out the + # exact key to expect. Otherwise check it matches any key we + # have for that device. + + current_keys: Container[str] = [] + + if device: + keys = device.get("keys", {}).get("keys", {}) + + if ( + event.content.get("algorithm") + == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2 + ): + # For this algorithm we expect a curve25519 key. + key_name = "curve25519:%s" % (device_id,) + current_keys = [keys.get(key_name)] + else: + # We don't know understand the algorithm, so we just + # check it matches a key for the device. + current_keys = keys.values() + elif device_id: + # We don't have any keys for the device ID. + pass + else: + # The event didn't include a device ID, so we just look for + # keys across all devices. + current_keys = [ + key + for device in cached_devices.values() + for key in device.get("keys", {}).get("keys", {}).values() + ] + + # We now check that the sender key matches (one of) the expected + # keys. + if sender_key not in current_keys: + logger.info( + "Received event from remote device with unexpected sender key: %s %s: %s", + event.sender, + device_id or "", + sender_key, + ) + resync = True + + if resync: + run_as_background_process( + "resync_device_due_to_pdu", + self._resync_device, + event.sender, + ) + + await self._handle_marker_event(origin, event) + + async def _resync_device(self, sender: str) -> None: + """We have detected that the device list for the given user may be out + of sync, so we try and resync them. + """ + + try: + await self.store.mark_remote_user_device_cache_as_stale(sender) + + # Immediately attempt a resync in the background + if self.config.worker_app: + await self._user_device_resync(user_id=sender) + else: + await self._device_list_updater.user_device_resync(sender) + except Exception: + logger.exception("Failed to resync device for %s", sender) + + async def _handle_marker_event(self, origin: str, marker_event: EventBase): + """Handles backfilling the insertion event when we receive a marker + event that points to one. + + Args: + origin: Origin of the event. Will be called to get the insertion event + marker_event: The event to process + """ + + if marker_event.type != EventTypes.MSC2716_MARKER: + # Not a marker event + return + + if marker_event.rejected_reason is not None: + # Rejected event + return + + # Skip processing a marker event if the room version doesn't + # support it. + room_version = await self.store.get_room_version(marker_event.room_id) + if not room_version.msc2716_historical: + return + + logger.debug("_handle_marker_event: received %s", marker_event) + + insertion_event_id = marker_event.content.get( + EventContentFields.MSC2716_MARKER_INSERTION + ) + + if insertion_event_id is None: + # Nothing to retrieve then (invalid marker) + return + + logger.debug( + "_handle_marker_event: backfilling insertion event %s", insertion_event_id + ) + + await self._get_events_and_persist( + origin, + marker_event.room_id, + [insertion_event_id], + ) + + insertion_event = await self.store.get_event( + insertion_event_id, allow_none=True + ) + if insertion_event is None: + logger.warning( + "_handle_marker_event: server %s didn't return insertion event %s for marker %s", + origin, + insertion_event_id, + marker_event.event_id, + ) + return + + logger.debug( + "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s", + insertion_event, + marker_event, + ) + + await self.store.insert_insertion_extremity( + insertion_event_id, marker_event.room_id + ) + + logger.debug( + "_handle_marker_event: insertion extremity added for %s from marker event %s", + insertion_event, + marker_event, + ) + + async def _get_events_and_persist( + self, destination: str, room_id: str, events: Iterable[str] + ) -> None: + """Fetch the given events from a server, and persist them as outliers. + + This function *does not* recursively get missing auth events of the + newly fetched events. Callers must include in the `events` argument + any missing events from the auth chain. + + Logs a warning if we can't find the given event. + """ + + room_version = await self.store.get_room_version(room_id) + + event_map: Dict[str, EventBase] = {} + + async def get_event(event_id: str): + with nested_logging_context(event_id): + try: + event = await self.federation_client.get_pdu( + [destination], + event_id, + room_version, + outlier=True, + ) + if event is None: + logger.warning( + "Server %s didn't return event %s", + destination, + event_id, + ) + return + + event_map[event.event_id] = event + + except Exception as e: + logger.warning( + "Error fetching missing state/auth event %s: %s %s", + event_id, + type(e), + e, + ) + + await concurrently_execute(get_event, events, 5) + + # Make a map of auth events for each event. We do this after fetching + # all the events as some of the events' auth events will be in the list + # of requested events. + + auth_events = [ + aid + for event in event_map.values() + for aid in event.auth_event_ids() + if aid not in event_map + ] + persisted_events = await self.store.get_events( + auth_events, + allow_rejected=True, + ) + + event_infos = [] + for event in event_map.values(): + auth = {} + for auth_event_id in event.auth_event_ids(): + ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id) + if ae: + auth[(ae.type, ae.state_key)] = ae + else: + logger.info("Missing auth event %s", auth_event_id) + + event_infos.append(_NewEventInfo(event, auth)) + + if event_infos: + await self._auth_and_persist_events( + destination, + room_id, + event_infos, + ) + + async def _auth_and_persist_events( + self, + origin: str, + room_id: str, + event_infos: Collection[_NewEventInfo], + ) -> None: + """Creates the appropriate contexts and persists events. The events + should not depend on one another, e.g. this should be used to persist + a bunch of outliers, but not a chunk of individual events that depend + on each other for state calculations. + + Notifies about the events where appropriate. + """ + + if not event_infos: + return + + async def prep(ev_info: _NewEventInfo): + event = ev_info.event + with nested_logging_context(suffix=event.event_id): + res = await self.state_handler.compute_event_context(event) + res = await self._check_event_auth( + origin, + event, + res, + claimed_auth_event_map=ev_info.claimed_auth_event_map, + ) + return res + + contexts = await make_deferred_yieldable( + defer.gatherResults( + [run_in_background(prep, ev_info) for ev_info in event_infos], + consumeErrors=True, + ) + ) + + await self.persist_events_and_notify( + room_id, + [ + (ev_info.event, context) + for ev_info, context in zip(event_infos, contexts) + ], + ) + + async def _auth_and_persist_event( + self, + origin: str, + event: EventBase, + context: EventContext, + state: Optional[Iterable[EventBase]] = None, + claimed_auth_event_map: Optional[StateMap[EventBase]] = None, + backfilled: bool = False, + ) -> None: + """ + Process an event by performing auth checks and then persisting to the database. + + Args: + origin: The host the event originates from. + event: The event itself. + context: + The event context. + + state: + The state events used to check the event for soft-fail. If this is + not provided the current state events will be used. + + claimed_auth_event_map: + A map of (type, state_key) => event for the event's claimed auth_events. + Possibly incomplete, and possibly including events that are not yet + persisted, or authed, or in the right room. + + Only populated where we may not already have persisted these events - + for example, when populating outliers. + + backfilled: True if the event was backfilled. + """ + context = await self._check_event_auth( + origin, + event, + context, + state=state, + claimed_auth_event_map=claimed_auth_event_map, + backfilled=backfilled, + ) + + await self._run_push_actions_and_persist_event(event, context, backfilled) + + async def _check_event_auth( + self, + origin: str, + event: EventBase, + context: EventContext, + state: Optional[Iterable[EventBase]] = None, + claimed_auth_event_map: Optional[StateMap[EventBase]] = None, + backfilled: bool = False, + ) -> EventContext: + """ + Checks whether an event should be rejected (for failing auth checks). + + Args: + origin: The host the event originates from. + event: The event itself. + context: + The event context. + + state: + The state events used to check the event for soft-fail. If this is + not provided the current state events will be used. + + claimed_auth_event_map: + A map of (type, state_key) => event for the event's claimed auth_events. + Possibly incomplete, and possibly including events that are not yet + persisted, or authed, or in the right room. + + Only populated where we may not already have persisted these events - + for example, when populating outliers, or the state for a backwards + extremity. + + backfilled: True if the event was backfilled. + + Returns: + The updated context object. + """ + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + if claimed_auth_event_map: + # if we have a copy of the auth events from the event, use that as the + # basis for auth. + auth_events = claimed_auth_event_map + else: + # otherwise, we calculate what the auth events *should* be, and use that + prev_state_ids = await context.get_prev_state_ids() + auth_events_ids = self._event_auth_handler.compute_auth_events( + event, prev_state_ids, for_verification=True + ) + auth_events_x = await self.store.get_events(auth_events_ids) + auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()} + + try: + ( + context, + auth_events_for_auth, + ) = await self._update_auth_events_and_context_for_auth( + origin, event, context, auth_events + ) + except Exception: + # We don't really mind if the above fails, so lets not fail + # processing if it does. However, it really shouldn't fail so + # let's still log as an exception since we'll still want to fix + # any bugs. + logger.exception( + "Failed to double check auth events for %s with remote. " + "Ignoring failure and continuing processing of event.", + event.event_id, + ) + auth_events_for_auth = auth_events + + try: + event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth) + except AuthError as e: + logger.warning("Failed auth resolution for %r because %s", event, e) + context.rejected = RejectedReason.AUTH_ERROR + + if not context.rejected: + await self._check_for_soft_fail(event, state, backfilled, origin=origin) + + if event.type == EventTypes.GuestAccess and not context.rejected: + await self.maybe_kick_guest_users(event) + + # If we are going to send this event over federation we precaclculate + # the joined hosts. + if event.internal_metadata.get_send_on_behalf_of(): + await self.event_creation_handler.cache_joined_hosts_for_event( + event, context + ) + + return context + + async def _check_for_soft_fail( + self, + event: EventBase, + state: Optional[Iterable[EventBase]], + backfilled: bool, + origin: str, + ) -> None: + """Checks if we should soft fail the event; if so, marks the event as + such. + + Args: + event + state: The state at the event if we don't have all the event's prev events + backfilled: Whether the event is from backfill + origin: The host the event originates from. + """ + # For new (non-backfilled and non-outlier) events we check if the event + # passes auth based on the current state. If it doesn't then we + # "soft-fail" the event. + if backfilled or event.internal_metadata.is_outlier(): + return + + extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id) + extrem_ids = set(extrem_ids_list) + prev_event_ids = set(event.prev_event_ids()) + + if extrem_ids == prev_event_ids: + # If they're the same then the current state is the same as the + # state at the event, so no point rechecking auth for soft fail. + return + + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + # Calculate the "current state". + if state is not None: + # If we're explicitly given the state then we won't have all the + # prev events, and so we have a gap in the graph. In this case + # we want to be a little careful as we might have been down for + # a while and have an incorrect view of the current state, + # however we still want to do checks as gaps are easy to + # maliciously manufacture. + # + # So we use a "current state" that is actually a state + # resolution across the current forward extremities and the + # given state at the event. This should correctly handle cases + # like bans, especially with state res v2. + + state_sets_d = await self.state_store.get_state_groups( + event.room_id, extrem_ids + ) + state_sets: List[Iterable[EventBase]] = list(state_sets_d.values()) + state_sets.append(state) + current_states = await self.state_handler.resolve_events( + room_version, state_sets, event + ) + current_state_ids: StateMap[str] = { + k: e.event_id for k, e in current_states.items() + } + else: + current_state_ids = await self.state_handler.get_current_state_ids( + event.room_id, latest_event_ids=extrem_ids + ) + + logger.debug( + "Doing soft-fail check for %s: state %s", + event.event_id, + current_state_ids, + ) + + # Now check if event pass auth against said current state + auth_types = auth_types_for_event(room_version_obj, event) + current_state_ids_list = [ + e for k, e in current_state_ids.items() if k in auth_types + ] + + auth_events_map = await self.store.get_events(current_state_ids_list) + current_auth_events = { + (e.type, e.state_key): e for e in auth_events_map.values() + } + + try: + event_auth.check(room_version_obj, event, auth_events=current_auth_events) + except AuthError as e: + logger.warning( + "Soft-failing %r (from %s) because %s", + event, + e, + origin, + extra={ + "room_id": event.room_id, + "mxid": event.sender, + "hs": origin, + }, + ) + soft_failed_event_counter.inc() + event.internal_metadata.soft_failed = True + + async def _update_auth_events_and_context_for_auth( + self, + origin: str, + event: EventBase, + context: EventContext, + input_auth_events: StateMap[EventBase], + ) -> Tuple[EventContext, StateMap[EventBase]]: + """Helper for _check_event_auth. See there for docs. + + Checks whether a given event has the expected auth events. If it + doesn't then we talk to the remote server to compare state to see if + we can come to a consensus (e.g. if one server missed some valid + state). + + This attempts to resolve any potential divergence of state between + servers, but is not essential and so failures should not block further + processing of the event. + + Args: + origin: + event: + context: + + input_auth_events: + Map from (event_type, state_key) to event + + Normally, our calculated auth_events based on the state of the room + at the event's position in the DAG, though occasionally (eg if the + event is an outlier), may be the auth events claimed by the remote + server. + + Returns: + updated context, updated auth event map + """ + # take a copy of input_auth_events before we modify it. + auth_events: MutableStateMap[EventBase] = dict(input_auth_events) + + event_auth_events = set(event.auth_event_ids()) + + # missing_auth is the set of the event's auth_events which we don't yet have + # in auth_events. + missing_auth = event_auth_events.difference( + e.event_id for e in auth_events.values() + ) + + # if we have missing events, we need to fetch those events from somewhere. + # + # we start by checking if they are in the store, and then try calling /event_auth/. + if missing_auth: + have_events = await self.store.have_seen_events(event.room_id, missing_auth) + logger.debug("Events %s are in the store", have_events) + missing_auth.difference_update(have_events) + + if missing_auth: + # If we don't have all the auth events, we need to get them. + logger.info("auth_events contains unknown events: %s", missing_auth) + try: + try: + remote_auth_chain = await self.federation_client.get_event_auth( + origin, event.room_id, event.event_id + ) + except RequestSendFailed as e1: + # The other side isn't around or doesn't implement the + # endpoint, so lets just bail out. + logger.info("Failed to get event auth from remote: %s", e1) + return context, auth_events + + seen_remotes = await self.store.have_seen_events( + event.room_id, [e.event_id for e in remote_auth_chain] + ) + + for e in remote_auth_chain: + if e.event_id in seen_remotes: + continue + + if e.event_id == event.event_id: + continue + + try: + auth_ids = e.auth_event_ids() + auth = { + (e.type, e.state_key): e + for e in remote_auth_chain + if e.event_id in auth_ids or e.type == EventTypes.Create + } + e.internal_metadata.outlier = True + + logger.debug( + "_check_event_auth %s missing_auth: %s", + event.event_id, + e.event_id, + ) + missing_auth_event_context = ( + await self.state_handler.compute_event_context(e) + ) + await self._auth_and_persist_event( + origin, + e, + missing_auth_event_context, + claimed_auth_event_map=auth, + ) + + if e.event_id in event_auth_events: + auth_events[(e.type, e.state_key)] = e + except AuthError: + pass + + except Exception: + logger.exception("Failed to get auth chain") + + if event.internal_metadata.is_outlier(): + # XXX: given that, for an outlier, we'll be working with the + # event's *claimed* auth events rather than those we calculated: + # (a) is there any point in this test, since different_auth below will + # obviously be empty + # (b) alternatively, why don't we do it earlier? + logger.info("Skipping auth_event fetch for outlier") + return context, auth_events + + different_auth = event_auth_events.difference( + e.event_id for e in auth_events.values() + ) + + if not different_auth: + return context, auth_events + + logger.info( + "auth_events refers to events which are not in our calculated auth " + "chain: %s", + different_auth, + ) + + # XXX: currently this checks for redactions but I'm not convinced that is + # necessary? + different_events = await self.store.get_events_as_list(different_auth) + + for d in different_events: + if d.room_id != event.room_id: + logger.warning( + "Event %s refers to auth_event %s which is in a different room", + event.event_id, + d.event_id, + ) + + # don't attempt to resolve the claimed auth events against our own + # in this case: just use our own auth events. + # + # XXX: should we reject the event in this case? It feels like we should, + # but then shouldn't we also do so if we've failed to fetch any of the + # auth events? + return context, auth_events + + # now we state-resolve between our own idea of the auth events, and the remote's + # idea of them. + + local_state = auth_events.values() + remote_auth_events = dict(auth_events) + remote_auth_events.update({(d.type, d.state_key): d for d in different_events}) + remote_state = remote_auth_events.values() + + room_version = await self.store.get_room_version_id(event.room_id) + new_state = await self.state_handler.resolve_events( + room_version, (local_state, remote_state), event + ) + + logger.info( + "After state res: updating auth_events with new state %s", + { + (d.type, d.state_key): d.event_id + for d in new_state.values() + if auth_events.get((d.type, d.state_key)) != d + }, + ) + + auth_events.update(new_state) + + context = await self._update_context_for_auth_events( + event, context, auth_events + ) + + return context, auth_events + + async def _update_context_for_auth_events( + self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase] + ) -> EventContext: + """Update the state_ids in an event context after auth event resolution, + storing the changes as a new state group. + + Args: + event: The event we're handling the context for + + context: initial event context + + auth_events: Events to update in the event context. + + Returns: + new event context + """ + # exclude the state key of the new event from the current_state in the context. + if event.is_state(): + event_key: Optional[Tuple[str, str]] = (event.type, event.state_key) + else: + event_key = None + state_updates = { + k: a.event_id for k, a in auth_events.items() if k != event_key + } + + current_state_ids = await context.get_current_state_ids() + current_state_ids = dict(current_state_ids) # type: ignore + + current_state_ids.update(state_updates) + + prev_state_ids = await context.get_prev_state_ids() + prev_state_ids = dict(prev_state_ids) + + prev_state_ids.update({k: a.event_id for k, a in auth_events.items()}) + + # create a new state group as a delta from the existing one. + prev_group = context.state_group + state_group = await self.state_store.store_state_group( + event.event_id, + event.room_id, + prev_group=prev_group, + delta_ids=state_updates, + current_state_ids=current_state_ids, + ) + + return EventContext.with_state( + state_group=state_group, + state_group_before_event=context.state_group_before_event, + current_state_ids=current_state_ids, + prev_state_ids=prev_state_ids, + prev_group=prev_group, + delta_ids=state_updates, + ) + + async def _run_push_actions_and_persist_event( + self, event: EventBase, context: EventContext, backfilled: bool = False + ): + """Run the push actions for a received event, and persist it. + + Args: + event: The event itself. + context: The event context. + backfilled: True if the event was backfilled. + """ + try: + if ( + not event.internal_metadata.is_outlier() + and not backfilled + and not context.rejected + and (await self.store.get_min_depth(event.room_id)) <= event.depth + ): + await self.action_generator.handle_push_actions_for_event( + event, context + ) + + await self.persist_events_and_notify( + event.room_id, [(event, context)], backfilled=backfilled + ) + except Exception: + run_in_background( + self.store.remove_push_actions_from_staging, event.event_id + ) + raise + + async def persist_events_and_notify( + self, + room_id: str, + event_and_contexts: Sequence[Tuple[EventBase, EventContext]], + backfilled: bool = False, + ) -> int: + """Persists events and tells the notifier/pushers about them, if + necessary. + + Args: + room_id: The room ID of events being persisted. + event_and_contexts: Sequence of events with their associated + context that should be persisted. All events must belong to + the same room. + backfilled: Whether these events are a result of + backfilling or not + + Returns: + The stream ID after which all events have been persisted. + """ + if not event_and_contexts: + return self.store.get_current_events_token() + + instance = self.config.worker.events_shard_config.get_instance(room_id) + if instance != self._instance_name: + # Limit the number of events sent over replication. We choose 200 + # here as that is what we default to in `max_request_body_size(..)` + for batch in batch_iter(event_and_contexts, 200): + result = await self._send_events( + instance_name=instance, + store=self.store, + room_id=room_id, + event_and_contexts=batch, + backfilled=backfilled, + ) + return result["max_stream_id"] + else: + assert self.storage.persistence + + # Note that this returns the events that were persisted, which may not be + # the same as were passed in if some were deduplicated due to transaction IDs. + events, max_stream_token = await self.storage.persistence.persist_events( + event_and_contexts, backfilled=backfilled + ) + + if self._ephemeral_messages_enabled: + for event in events: + # If there's an expiry timestamp on the event, schedule its expiry. + self._message_handler.maybe_schedule_expiry(event) + + if not backfilled: # Never notify for backfilled events + for event in events: + await self._notify_persisted_event(event, max_stream_token) + + return max_stream_token.stream + + async def _notify_persisted_event( + self, event: EventBase, max_stream_token: RoomStreamToken + ) -> None: + """Checks to see if notifier/pushers should be notified about the + event or not. + + Args: + event: + max_stream_token: The max_stream_id returned by persist_events + """ + + extra_users = [] + if event.type == EventTypes.Member: + target_user_id = event.state_key + + # We notify for memberships if its an invite for one of our + # users + if event.internal_metadata.is_outlier(): + if event.membership != Membership.INVITE: + if not self.is_mine_id(target_user_id): + return + + target_user = UserID.from_string(target_user_id) + extra_users.append(target_user) + elif event.internal_metadata.is_outlier(): + return + + # the event has been persisted so it should have a stream ordering. + assert event.internal_metadata.stream_ordering + + event_pos = PersistedEventPosition( + self._instance_name, event.internal_metadata.stream_ordering + ) + self.notifier.on_new_room_event( + event, event_pos, max_stream_token, extra_users=extra_users + ) + + def _sanity_check_event(self, ev: EventBase) -> None: + """ + Do some early sanity checks of a received event + + In particular, checks it doesn't have an excessive number of + prev_events or auth_events, which could cause a huge state resolution + or cascade of event fetches. + + Args: + ev: event to be checked + + Raises: + SynapseError if the event does not pass muster + """ + if len(ev.prev_event_ids()) > 20: + logger.warning( + "Rejecting event %s which has %i prev_events", + ev.event_id, + len(ev.prev_event_ids()), + ) + raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events") + + if len(ev.auth_event_ids()) > 10: + logger.warning( + "Rejecting event %s which has %i auth_events", + ev.event_id, + len(ev.auth_event_ids()), + ) + raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events") + + async def get_min_depth_for_context(self, context: str) -> int: + return await self.store.get_min_depth(context) diff --git a/synapse/replication/http/federation.py b/synapse/replication/http/federation.py index 79cadb7b57..a0b3145f4e 100644 --- a/synapse/replication/http/federation.py +++ b/synapse/replication/http/federation.py @@ -62,7 +62,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): self.store = hs.get_datastore() self.storage = hs.get_storage() self.clock = hs.get_clock() - self.federation_handler = hs.get_federation_handler() + self.federation_event_handler = hs.get_federation_event_handler() @staticmethod async def _serialize_payload(store, room_id, event_and_contexts, backfilled): @@ -127,7 +127,7 @@ class ReplicationFederationSendEventsRestServlet(ReplicationEndpoint): logger.info("Got %d events from federation", len(event_and_contexts)) - max_stream_id = await self.federation_handler.persist_events_and_notify( + max_stream_id = await self.federation_event_handler.persist_events_and_notify( room_id, event_and_contexts, backfilled ) diff --git a/synapse/server.py b/synapse/server.py index de6517663e..5adeeff61a 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -76,6 +76,7 @@ from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler from synapse.handlers.event_auth import EventAuthHandler from synapse.handlers.events import EventHandler, EventStreamHandler from synapse.handlers.federation import FederationHandler +from synapse.handlers.federation_event import FederationEventHandler from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler from synapse.handlers.identity import IdentityHandler from synapse.handlers.initial_sync import InitialSyncHandler @@ -546,6 +547,10 @@ class HomeServer(metaclass=abc.ABCMeta): def get_federation_handler(self) -> FederationHandler: return FederationHandler(self) + @cache_in_self + def get_federation_event_handler(self) -> FederationEventHandler: + return FederationEventHandler(self) + @cache_in_self def get_identity_handler(self) -> IdentityHandler: return IdentityHandler(self) diff --git a/tests/federation/transport/test_knocking.py b/tests/federation/transport/test_knocking.py index 383214ab50..663960ff53 100644 --- a/tests/federation/transport/test_knocking.py +++ b/tests/federation/transport/test_knocking.py @@ -208,7 +208,7 @@ class FederationKnockingTestCase( async def _check_event_auth(origin, event, context, *args, **kwargs): return context - homeserver.get_federation_handler()._check_event_auth = _check_event_auth + homeserver.get_federation_event_handler()._check_event_auth = _check_event_auth return super().prepare(reactor, clock, homeserver) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index c72a8972a3..6c67a16de9 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -130,7 +130,9 @@ class FederationTestCase(unittest.HomeserverTestCase): ) with LoggingContext("send_rejected"): - d = run_in_background(self.handler.on_receive_pdu, OTHER_SERVER, ev) + d = run_in_background( + self.hs.get_federation_event_handler().on_receive_pdu, OTHER_SERVER, ev + ) self.get_success(d) # that should have been rejected @@ -182,7 +184,9 @@ class FederationTestCase(unittest.HomeserverTestCase): ) with LoggingContext("send_rejected"): - d = run_in_background(self.handler.on_receive_pdu, OTHER_SERVER, ev) + d = run_in_background( + self.hs.get_federation_event_handler().on_receive_pdu, OTHER_SERVER, ev + ) self.get_success(d) # that should have been rejected @@ -311,7 +315,9 @@ class FederationTestCase(unittest.HomeserverTestCase): with LoggingContext("receive_pdu"): # Fake the OTHER_SERVER federating the message event over to our local homeserver d = run_in_background( - self.handler.on_receive_pdu, OTHER_SERVER, message_event + self.hs.get_federation_event_handler().on_receive_pdu, + OTHER_SERVER, + message_event, ) self.get_success(d) @@ -382,7 +388,9 @@ class FederationTestCase(unittest.HomeserverTestCase): join_event.signatures[other_server] = {"x": "y"} with LoggingContext("send_join"): d = run_in_background( - self.handler.on_send_membership_event, other_server, join_event + self.hs.get_federation_event_handler().on_send_membership_event, + other_server, + join_event, ) self.get_success(d) diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py index 0a52bc8b72..671dc7d083 100644 --- a/tests/handlers/test_presence.py +++ b/tests/handlers/test_presence.py @@ -885,7 +885,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): def prepare(self, reactor, clock, hs): self.federation_sender = hs.get_federation_sender() self.event_builder_factory = hs.get_event_builder_factory() - self.federation_handler = hs.get_federation_handler() + self.federation_event_handler = hs.get_federation_event_handler() self.presence_handler = hs.get_presence_handler() # self.event_builder_for_2 = EventBuilderFactory(hs) @@ -1026,7 +1026,7 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase): builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None) ) - self.get_success(self.federation_handler.on_receive_pdu(hostname, event)) + self.get_success(self.federation_event_handler.on_receive_pdu(hostname, event)) # Check that it was successfully persisted. self.get_success(self.store.get_event(event.event_id)) diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py index af5dfca752..92a5b53e11 100644 --- a/tests/replication/test_federation_sender_shard.py +++ b/tests/replication/test_federation_sender_shard.py @@ -205,7 +205,7 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase): def create_room_with_remote_server(self, user, token, remote_server="other_server"): room = self.helper.create_room_as(user, tok=token) store = self.hs.get_datastore() - federation = self.hs.get_federation_handler() + federation = self.hs.get_federation_event_handler() prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room)) room_version = self.get_success(store.get_room_version(room)) diff --git a/tests/test_federation.py b/tests/test_federation.py index 348fcb72a7..61c9d7c2ef 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -75,7 +75,8 @@ class MessageAcceptTests(unittest.HomeserverTestCase): ) self.handler = self.homeserver.get_federation_handler() - self.handler._check_event_auth = lambda origin, event, context, state, claimed_auth_event_map, backfilled: succeed( + federation_event_handler = self.homeserver.get_federation_event_handler() + federation_event_handler._check_event_auth = lambda origin, event, context, state, claimed_auth_event_map, backfilled: succeed( context ) self.client = self.homeserver.get_federation_client() @@ -85,7 +86,9 @@ class MessageAcceptTests(unittest.HomeserverTestCase): # Send the join, it should return None (which is not an error) self.assertEqual( - self.get_success(self.handler.on_receive_pdu("test.serv", join_event)), + self.get_success( + federation_event_handler.on_receive_pdu("test.serv", join_event) + ), None, ) @@ -129,9 +132,10 @@ class MessageAcceptTests(unittest.HomeserverTestCase): } ) + federation_event_handler = self.homeserver.get_federation_event_handler() with LoggingContext("test-context"): failure = self.get_failure( - self.handler.on_receive_pdu("test.serv", lying_event), + federation_event_handler.on_receive_pdu("test.serv", lying_event), FederationError, ) -- cgit 1.5.1 From dc75fb7f0552e6a9903a7c173672c96610219ec0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Wed, 1 Sep 2021 10:27:58 -0500 Subject: Populate `rooms.creator` field for easy lookup (#10697) Part of https://github.com/matrix-org/synapse/pull/10566 - Fill in creator whenever we insert into the rooms table - Add background update to backfill any missing creator values --- changelog.d/10697.misc | 1 + synapse/api/constants.py | 3 + synapse/handlers/federation.py | 1 + synapse/storage/databases/main/room.py | 97 ++++++++++++++++++++- .../main/delta/63/02populate-rooms-creator.sql | 17 ++++ tests/storage/databases/main/test_room.py | 98 ++++++++++++++++++++++ 6 files changed, 213 insertions(+), 4 deletions(-) create mode 100644 changelog.d/10697.misc create mode 100644 synapse/storage/schema/main/delta/63/02populate-rooms-creator.sql create mode 100644 tests/storage/databases/main/test_room.py (limited to 'synapse/handlers/federation.py') diff --git a/changelog.d/10697.misc b/changelog.d/10697.misc new file mode 100644 index 0000000000..a9ad17faf2 --- /dev/null +++ b/changelog.d/10697.misc @@ -0,0 +1 @@ +Ensure `rooms.creator` field is always populated for easy lookup in [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) usage later. diff --git a/synapse/api/constants.py b/synapse/api/constants.py index 829061c870..5e34eb7e13 100644 --- a/synapse/api/constants.py +++ b/synapse/api/constants.py @@ -198,6 +198,9 @@ class EventContentFields: # cf https://github.com/matrix-org/matrix-doc/pull/1772 ROOM_TYPE = "type" + # The creator of the room, as used in `m.room.create` events. + ROOM_CREATOR = "creator" + # Used on normal messages to indicate they were historically imported after the fact MSC2716_HISTORICAL = "org.matrix.msc2716.historical" # For "insertion" events to indicate what the next chunk ID should be in diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index daf1d3bfb3..77df9185f6 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -507,6 +507,7 @@ class FederationHandler(BaseHandler): await self.store.upsert_room_on_join( room_id=room_id, room_version=room_version_obj, + auth_events=auth_chain, ) max_stream_id = await self._persist_auth_tree( diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index f98b892598..6e7312266d 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -19,9 +19,10 @@ from abc import abstractmethod from enum import Enum from typing import Any, Dict, List, Optional, Tuple -from synapse.api.constants import EventTypes, JoinRules +from synapse.api.constants import EventContentFields, EventTypes, JoinRules from synapse.api.errors import StoreError from synapse.api.room_versions import RoomVersion, RoomVersions +from synapse.events import EventBase from synapse.storage._base import SQLBaseStore, db_to_json from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.databases.main.search import SearchStore @@ -1013,6 +1014,7 @@ class _BackgroundUpdates: ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column" POPULATE_ROOM_DEPTH_MIN_DEPTH2 = "populate_room_depth_min_depth2" REPLACE_ROOM_DEPTH_MIN_DEPTH = "replace_room_depth_min_depth" + POPULATE_ROOMS_CREATOR_COLUMN = "populate_rooms_creator_column" _REPLACE_ROOM_DEPTH_SQL_COMMANDS = ( @@ -1054,6 +1056,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore): self._background_replace_room_depth_min_depth, ) + self.db_pool.updates.register_background_update_handler( + _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN, + self._background_populate_rooms_creator_column, + ) + async def _background_insert_retention(self, progress, batch_size): """Retrieves a list of all rooms within a range and inserts an entry for each of them into the room_retention table. @@ -1273,7 +1280,7 @@ class RoomBackgroundUpdateStore(SQLBaseStore): keyvalues={"room_id": room_id}, retcol="MAX(stream_ordering)", allow_none=True, - desc="upsert_room_on_join", + desc="has_auth_chain_index_fallback", ) return max_ordering is None @@ -1343,6 +1350,65 @@ class RoomBackgroundUpdateStore(SQLBaseStore): return 0 + async def _background_populate_rooms_creator_column( + self, progress: dict, batch_size: int + ): + """Background update to go and add creator information to `rooms` + table from `current_state_events` table. + """ + + last_room_id = progress.get("room_id", "") + + def _background_populate_rooms_creator_column_txn(txn: LoggingTransaction): + sql = """ + SELECT room_id, json FROM event_json + INNER JOIN rooms AS room USING (room_id) + INNER JOIN current_state_events AS state_event USING (room_id, event_id) + WHERE room_id > ? AND (room.creator IS NULL OR room.creator = '') AND state_event.type = 'm.room.create' AND state_event.state_key = '' + ORDER BY room_id + LIMIT ? + """ + + txn.execute(sql, (last_room_id, batch_size)) + room_id_to_create_event_results = txn.fetchall() + + new_last_room_id = "" + for room_id, event_json in room_id_to_create_event_results: + event_dict = db_to_json(event_json) + + creator = event_dict.get("content").get(EventContentFields.ROOM_CREATOR) + + self.db_pool.simple_update_txn( + txn, + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"creator": creator}, + ) + new_last_room_id = room_id + + if new_last_room_id == "": + return True + + self.db_pool.updates._background_update_progress_txn( + txn, + _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN, + {"room_id": new_last_room_id}, + ) + + return False + + end = await self.db_pool.runInteraction( + "_background_populate_rooms_creator_column", + _background_populate_rooms_creator_column_txn, + ) + + if end: + await self.db_pool.updates._end_background_update( + _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN + ) + + return batch_size + class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): def __init__(self, database: DatabasePool, db_conn, hs): @@ -1350,7 +1416,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): self.config = hs.config - async def upsert_room_on_join(self, room_id: str, room_version: RoomVersion): + async def upsert_room_on_join( + self, room_id: str, room_version: RoomVersion, auth_events: List[EventBase] + ): """Ensure that the room is stored in the table Called when we join a room over federation, and overwrites any room version @@ -1361,6 +1429,24 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): # mark the room as having an auth chain cover index. has_auth_chain_index = await self.has_auth_chain_index(room_id) + create_event = None + for e in auth_events: + if (e.type, e.state_key) == (EventTypes.Create, ""): + create_event = e + break + + if create_event is None: + # If the state doesn't have a create event then the room is + # invalid, and it would fail auth checks anyway. + raise StoreError(400, "No create event in state") + + room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + + if not isinstance(room_creator, str): + # If the create event does not have a creator then the room is + # invalid, and it would fail auth checks anyway. + raise StoreError(400, "No creator defined on the create event") + await self.db_pool.simple_upsert( desc="upsert_room_on_join", table="rooms", @@ -1368,7 +1454,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): values={"room_version": room_version.identifier}, insertion_values={ "is_public": False, - "creator": "", + "creator": room_creator, "has_auth_chain_index": has_auth_chain_index, }, # rooms has a unique constraint on room_id, so no need to lock when doing an @@ -1396,6 +1482,9 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore): insertion_values={ "room_version": room_version.identifier, "is_public": False, + # We don't worry about setting the `creator` here because + # we don't process any messages in a room while a user is + # invited (only after the join). "creator": "", "has_auth_chain_index": has_auth_chain_index, }, diff --git a/synapse/storage/schema/main/delta/63/02populate-rooms-creator.sql b/synapse/storage/schema/main/delta/63/02populate-rooms-creator.sql new file mode 100644 index 0000000000..f7c0b31261 --- /dev/null +++ b/synapse/storage/schema/main/delta/63/02populate-rooms-creator.sql @@ -0,0 +1,17 @@ +/* Copyright 2021 The Matrix.org Foundation C.I.C + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +INSERT INTO background_updates (ordering, update_name, progress_json) + VALUES (6302, 'populate_rooms_creator_column', '{}'); diff --git a/tests/storage/databases/main/test_room.py b/tests/storage/databases/main/test_room.py new file mode 100644 index 0000000000..ffee707153 --- /dev/null +++ b/tests/storage/databases/main/test_room.py @@ -0,0 +1,98 @@ +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.rest import admin +from synapse.rest.client import login, room +from synapse.storage.databases.main.room import _BackgroundUpdates + +from tests.unittest import HomeserverTestCase + + +class RoomBackgroundUpdateStoreTestCase(HomeserverTestCase): + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor, clock, hs): + self.store = hs.get_datastore() + self.user_id = self.register_user("foo", "pass") + self.token = self.login("foo", "pass") + + def _generate_room(self) -> str: + room_id = self.helper.create_room_as(self.user_id, tok=self.token) + + return room_id + + def test_background_populate_rooms_creator_column(self): + """Test that the background update to populate the rooms creator column + works properly. + """ + + # Insert a room without the creator + room_id = self._generate_room() + self.get_success( + self.store.db_pool.simple_update( + table="rooms", + keyvalues={"room_id": room_id}, + updatevalues={"creator": None}, + desc="test", + ) + ) + + # Make sure the test is starting out with a room without a creator + room_creator_before = self.get_success( + self.store.db_pool.simple_select_one_onecol( + table="rooms", + keyvalues={"room_id": room_id}, + retcol="creator", + allow_none=True, + ) + ) + self.assertEqual(room_creator_before, None) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": _BackgroundUpdates.POPULATE_ROOMS_CREATOR_COLUMN, + "progress_json": "{}", + }, + ) + ) + + # ... and tell the DataStore that it hasn't finished all updates yet + self.store.db_pool.updates._all_done = False + + # Now let's actually drive the updates to completion + while not self.get_success( + self.store.db_pool.updates.has_completed_background_updates() + ): + self.get_success( + self.store.db_pool.updates.do_next_background_update(100), by=0.1 + ) + + # Make sure the background update filled in the room creator + room_creator_after = self.get_success( + self.store.db_pool.simple_select_one_onecol( + table="rooms", + keyvalues={"room_id": room_id}, + retcol="creator", + allow_none=True, + ) + ) + self.assertEqual(room_creator_after, self.user_id) -- cgit 1.5.1