From 335ebb21ccc0ae906169f21dcfc456c869bdd301 Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Tue, 26 Jul 2022 12:39:23 +0100 Subject: Faster room joins: avoid blocking when pulling events with missing prevs (#13355) Avoid blocking on full state in `_resolve_state_at_missing_prevs` and return a new flag indicating whether the resolved state is partial. Thread that flag around so that it makes it into the event context. Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> --- tests/handlers/test_federation.py | 1 + 1 file changed, 1 insertion(+) (limited to 'tests/handlers/test_federation.py') diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index 8a0bb91f40..fb06e5e812 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -287,6 +287,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): state_ids={ (e.type, e.state_key): e.event_id for e in current_state }, + partial_state=False, ) ) -- cgit 1.5.1 From 224d792dd7827fb53b9ed3b73a99f4acefb456ba Mon Sep 17 00:00:00 2001 From: Sean Quah <8349537+squahtx@users.noreply.github.com> Date: Mon, 1 Aug 2022 13:53:56 +0100 Subject: Refactor `_resolve_state_at_missing_prevs` to return an `EventContext` (#13404) Previously, `_resolve_state_at_missing_prevs` returned the resolved state before an event and a partial state flag. These were unwieldy to carry around would only ever be used to build an event context. Build the event context directly instead. Signed-off-by: Sean Quah --- changelog.d/13404.misc | 1 + synapse/handlers/federation_event.py | 126 ++++++++++++----------------------- synapse/state/__init__.py | 8 +++ synapse/storage/controllers/state.py | 4 ++ tests/handlers/test_federation.py | 15 +++-- 5 files changed, 68 insertions(+), 86 deletions(-) create mode 100644 changelog.d/13404.misc (limited to 'tests/handlers/test_federation.py') diff --git a/changelog.d/13404.misc b/changelog.d/13404.misc new file mode 100644 index 0000000000..655be4061b --- /dev/null +++ b/changelog.d/13404.misc @@ -0,0 +1 @@ +Refactor `_resolve_state_at_missing_prevs` to compute an `EventContext` instead. diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index bcc755a376..612e5aaa5b 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -23,7 +23,6 @@ from typing import ( Dict, Iterable, List, - Optional, Sequence, Set, Tuple, @@ -278,9 +277,8 @@ class FederationEventHandler: ) try: - await self._process_received_pdu( - origin, pdu, state_ids=None, partial_state=None - ) + context = await self._state_handler.compute_event_context(pdu) + await self._process_received_pdu(origin, pdu, context) except PartialStateConflictError: # The room was un-partial stated while we were processing the PDU. # Try once more, with full state this time. @@ -288,9 +286,8 @@ class FederationEventHandler: "Room %s was un-partial stated while processing the PDU, trying again.", room_id, ) - await self._process_received_pdu( - origin, pdu, state_ids=None, partial_state=None - ) + context = await self._state_handler.compute_event_context(pdu) + await self._process_received_pdu(origin, pdu, context) async def on_send_membership_event( self, origin: str, event: EventBase @@ -320,6 +317,7 @@ class FederationEventHandler: The event and context of the event after inserting it into the room graph. Raises: + RuntimeError if any prev_events are missing SynapseError if the event is not accepted into the room PartialStateConflictError if the room was un-partial stated in between computing the state at the event and persisting it. The caller should @@ -380,7 +378,7 @@ class FederationEventHandler: # need to. await self._event_creation_handler.cache_joined_hosts_for_event(event, context) - await self._check_for_soft_fail(event, None, origin=origin) + await self._check_for_soft_fail(event, context=context, origin=origin) await self._run_push_actions_and_persist_event(event, context) return event, context @@ -538,36 +536,10 @@ class FederationEventHandler: # # This is the same operation as we do when we receive a regular event # over federation. - state_ids, partial_state = await self._resolve_state_at_missing_prevs( + context = await self._compute_event_context_with_maybe_missing_prevs( destination, event ) - - # There are three possible cases for (state_ids, partial_state): - # * `state_ids` and `partial_state` are both `None` if we had all the - # prev_events. The prev_events may or may not have partial state and - # we won't know until we compute the event context. - # * `state_ids` is not `None` and `partial_state` is `False` if we were - # missing some prev_events (but we have full state for any we did - # have). We calculated the full state after the prev_events. - # * `state_ids` is not `None` and `partial_state` is `True` if we were - # missing some, but not all, prev_events. At least one of the - # prev_events we did have had partial state, so we calculated a partial - # state after the prev_events. - - context = None - if state_ids is not None and partial_state: - # the state after the prev events is still partial. We can't de-partial - # state the event, so don't bother building the event context. - pass - else: - # build a new state group for it if need be - context = await self._state_handler.compute_event_context( - event, - state_ids_before_event=state_ids, - partial_state=partial_state, - ) - - if context is None or context.partial_state: + if context.partial_state: # this can happen if some or all of the event's prev_events still have # partial state. We were careful to only pick events from the db without # partial-state prev events, so that implies that a prev event has @@ -840,26 +812,25 @@ class FederationEventHandler: try: try: - state_ids, partial_state = await self._resolve_state_at_missing_prevs( + context = await self._compute_event_context_with_maybe_missing_prevs( origin, event ) await self._process_received_pdu( origin, event, - state_ids=state_ids, - partial_state=partial_state, + context, backfilled=backfilled, ) except PartialStateConflictError: # The room was un-partial stated while we were processing the event. # Try once more, with full state this time. - state_ids, partial_state = await self._resolve_state_at_missing_prevs( + context = await self._compute_event_context_with_maybe_missing_prevs( origin, event ) # We ought to have full state now, barring some unlikely race where we left and # rejoned the room in the background. - if state_ids is not None and partial_state: + if context.partial_state: raise AssertionError( f"Event {event.event_id} still has a partial resolved state " f"after room {event.room_id} was un-partial stated" @@ -868,8 +839,7 @@ class FederationEventHandler: await self._process_received_pdu( origin, event, - state_ids=state_ids, - partial_state=partial_state, + context, backfilled=backfilled, ) except FederationError as e: @@ -878,15 +848,18 @@ class FederationEventHandler: else: raise - async def _resolve_state_at_missing_prevs( + async def _compute_event_context_with_maybe_missing_prevs( self, dest: str, event: EventBase - ) -> Tuple[Optional[StateMap[str]], Optional[bool]]: - """Calculate the state at an event with missing prev_events. + ) -> EventContext: + """Build an EventContext structure for a non-outlier event whose prev_events may + be missing. - This is used when we have pulled a batch of events from a remote server, and - still don't have all the prev_events. + This is used when we have pulled a batch of events from a remote server, and may + not have all the prev_events. - If we already have all the prev_events for `event`, this method does nothing. + To build an EventContext, we need to calculate the state before the event. If we + already have all the prev_events for `event`, we can simply use the state after + the prev_events to calculate the state before `event`. Otherwise, the missing prevs become new backwards extremities, and we fall back to asking the remote server for the state after each missing `prev_event`, @@ -907,10 +880,7 @@ class FederationEventHandler: event: an event to check for missing prevs. Returns: - if we already had all the prev events, `None, None`. Otherwise, returns a - tuple containing: - * the event ids of the state at `event`. - * a boolean indicating whether the state may be partial. + The event context. Raises: FederationError if we fail to get the state from the remote server after any @@ -924,7 +894,7 @@ class FederationEventHandler: missing_prevs = prevs - seen if not missing_prevs: - return None, None + return await self._state_handler.compute_event_context(event) logger.info( "Event %s is missing prev_events %s: calculating state for a " @@ -990,7 +960,9 @@ class FederationEventHandler: "We can't get valid state history.", affected=event_id, ) - return state_map, partial_state + return await self._state_handler.compute_event_context( + event, state_ids_before_event=state_map, partial_state=partial_state + ) async def _get_state_ids_after_missing_prev_event( self, @@ -1159,8 +1131,7 @@ class FederationEventHandler: self, origin: str, event: EventBase, - state_ids: Optional[StateMap[str]], - partial_state: Optional[bool], + context: EventContext, backfilled: bool = False, ) -> None: """Called when we have a new non-outlier event. @@ -1182,32 +1153,18 @@ class FederationEventHandler: event: event to be persisted - state_ids: Normally None, but if we are handling a gap in the graph - (ie, we are missing one or more prev_events), the resolved state at the - event - - partial_state: - `True` if `state_ids` is partial and omits non-critical membership - events. - `False` if `state_ids` is the full state. - `None` if `state_ids` is not provided. In this case, the flag will be - calculated based on `event`'s prev events. + context: The `EventContext` to persist the event with. backfilled: True if this is part of a historical batch of events (inhibits notification to clients, and validation of device keys.) PartialStateConflictError: if the room was un-partial stated in between - computing the state at the event and persisting it. The caller should retry - exactly once in this case. + computing the state at the event and persisting it. The caller should + recompute `context` and retry exactly once when this happens. """ logger.debug("Processing event: %s", event) assert not event.internal_metadata.outlier - context = await self._state_handler.compute_event_context( - event, - state_ids_before_event=state_ids, - partial_state=partial_state, - ) try: await self._check_event_auth(origin, event, context) except AuthError as e: @@ -1219,7 +1176,7 @@ class FederationEventHandler: # For new (non-backfilled and non-outlier) events we check if the event # passes auth based on the current state. If it doesn't then we # "soft-fail" the event. - await self._check_for_soft_fail(event, state_ids, origin=origin) + await self._check_for_soft_fail(event, context=context, origin=origin) await self._run_push_actions_and_persist_event(event, context, backfilled) @@ -1782,7 +1739,7 @@ class FederationEventHandler: async def _check_for_soft_fail( self, event: EventBase, - state_ids: Optional[StateMap[str]], + context: EventContext, origin: str, ) -> None: """Checks if we should soft fail the event; if so, marks the event as @@ -1793,7 +1750,7 @@ class FederationEventHandler: Args: event - state_ids: The state at the event if we don't have all the event's prev events + context: The `EventContext` which we are about to persist the event with. origin: The host the event originates from. """ if await self._store.is_partial_state_room(event.room_id): @@ -1819,11 +1776,15 @@ class FederationEventHandler: auth_types = auth_types_for_event(room_version_obj, event) # Calculate the "current state". - if state_ids is not None: - # If we're explicitly given the state then we won't have all the - # prev events, and so we have a gap in the graph. In this case - # we want to be a little careful as we might have been down for - # a while and have an incorrect view of the current state, + seen_event_ids = await self._store.have_events_in_timeline(prev_event_ids) + has_missing_prevs = bool(prev_event_ids - seen_event_ids) + if has_missing_prevs: + # We don't have all the prev_events of this event, which means we have a + # gap in the graph, and the new event is going to become a new backwards + # extremity. + # + # In this case we want to be a little careful as we might have been + # down for a while and have an incorrect view of the current state, # however we still want to do checks as gaps are easy to # maliciously manufacture. # @@ -1836,6 +1797,7 @@ class FederationEventHandler: event.room_id, extrem_ids ) state_sets: List[StateMap[str]] = list(state_sets_d.values()) + state_ids = await context.get_prev_state_ids() state_sets.append(state_ids) current_state_ids = ( await self._state_resolution_handler.resolve_events_with_store( diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 69834de0de..c355e4f98a 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -278,6 +278,10 @@ class StateHandler: flag will be calculated based on `event`'s prev events. Returns: The event context. + + Raises: + RuntimeError if `state_ids_before_event` is not provided and one or more + prev events are missing or outliers. """ assert not event.internal_metadata.is_outlier() @@ -432,6 +436,10 @@ class StateHandler: Returns: The resolved state + + Raises: + RuntimeError if we don't have a state group for one or more of the events + (ie. they are outliers or unknown) """ logger.debug("resolve_state_groups event_ids %s", event_ids) diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py index 20805c94fa..1e35046e07 100644 --- a/synapse/storage/controllers/state.py +++ b/synapse/storage/controllers/state.py @@ -338,6 +338,10 @@ class StateStorageController: event_ids: events to get state groups for await_full_state: if true, will block if we do not yet have complete state at these events. + + Raises: + RuntimeError if we don't have a state group for one or more of the events + (ie. they are outliers or unknown) """ if await_full_state: await self._partial_state_events_tracker.await_full_state(event_ids) diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index fb06e5e812..aea96a0986 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -280,16 +280,23 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase): # we poke this directly into _process_received_pdu, to avoid the # federation handler wanting to backfill the fake event. - self.get_success( - federation_event_handler._process_received_pdu( - self.OTHER_SERVER_NAME, + state_handler = self.hs.get_state_handler() + context = self.get_success( + state_handler.compute_event_context( event, - state_ids={ + state_ids_before_event={ (e.type, e.state_key): e.event_id for e in current_state }, partial_state=False, ) ) + self.get_success( + federation_event_handler._process_received_pdu( + self.OTHER_SERVER_NAME, + event, + context, + ) + ) # we should now have 8 backwards extremities. backwards_extremities = self.get_success( -- cgit 1.5.1 From e17e5c97e0d2be7a82c782e8ef9774e682968c7b Mon Sep 17 00:00:00 2001 From: reivilibre Date: Mon, 1 Aug 2022 16:45:39 +0000 Subject: Faster Room Joins: don't leave a stuck room partial state flag if the join fails. (#13403) --- changelog.d/13403.misc | 1 + synapse/handlers/federation.py | 32 +++++----- tests/handlers/test_federation.py | 122 +++++++++++++++++++++++++++++++++++++- 3 files changed, 140 insertions(+), 15 deletions(-) create mode 100644 changelog.d/13403.misc (limited to 'tests/handlers/test_federation.py') diff --git a/changelog.d/13403.misc b/changelog.d/13403.misc new file mode 100644 index 0000000000..cb7b38153c --- /dev/null +++ b/changelog.d/13403.misc @@ -0,0 +1 @@ +Faster Room Joins: don't leave a stuck room partial state flag if the join fails. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 3b5eaf5156..1cf6cb32e3 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -546,9 +546,9 @@ class FederationHandler: ) if ret.partial_state: - # TODO(faster_joins): roll this back if we don't manage to start the - # background resync (eg process_remote_join fails) - # https://github.com/matrix-org/synapse/issues/12998 + # Mark the room as having partial state. + # The background process is responsible for unmarking this flag, + # even if the join fails. await self.store.store_partial_state_room(room_id, ret.servers_in_room) try: @@ -574,17 +574,21 @@ class FederationHandler: room_id, ) raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) - - if ret.partial_state: - # Kick off the process of asynchronously fetching the state for this - # room. - run_as_background_process( - desc="sync_partial_state_room", - func=self._sync_partial_state_room, - initial_destination=origin, - other_destinations=ret.servers_in_room, - room_id=room_id, - ) + finally: + # Always kick off the background process that asynchronously fetches + # state for the room. + # If the join failed, the background process is responsible for + # cleaning up — including unmarking the room as a partial state room. + if ret.partial_state: + # Kick off the process of asynchronously fetching the state for this + # room. + run_as_background_process( + desc="sync_partial_state_room", + func=self._sync_partial_state_room, + initial_destination=origin, + other_destinations=ret.servers_in_room, + room_id=room_id, + ) # We wait here until this instance has seen the events come down # replication (if we're using replication) as the below uses caches. diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py index aea96a0986..745750b1d7 100644 --- a/tests/handlers/test_federation.py +++ b/tests/handlers/test_federation.py @@ -14,6 +14,7 @@ import logging from typing import cast from unittest import TestCase +from unittest.mock import Mock, patch from twisted.test.proto_helpers import MemoryReactor @@ -22,6 +23,7 @@ from synapse.api.errors import AuthError, Codes, LimitExceededError, SynapseErro from synapse.api.room_versions import RoomVersions from synapse.events import EventBase, make_event_from_dict from synapse.federation.federation_base import event_from_pdu_json +from synapse.federation.federation_client import SendJoinResult from synapse.logging.context import LoggingContext, run_in_background from synapse.rest import admin from synapse.rest.client import login, room @@ -30,7 +32,7 @@ from synapse.util import Clock from synapse.util.stringutils import random_string from tests import unittest -from tests.test_utils import event_injection +from tests.test_utils import event_injection, make_awaitable logger = logging.getLogger(__name__) @@ -456,3 +458,121 @@ class EventFromPduTestCase(TestCase): }, RoomVersions.V6, ) + + +class PartialJoinTestCase(unittest.FederatingHomeserverTestCase): + def test_failed_partial_join_is_clean(self) -> None: + """ + Tests that, when failing to partial-join a room, we don't get stuck with + a partial-state flag on a room. + """ + + fed_handler = self.hs.get_federation_handler() + fed_client = fed_handler.federation_client + + room_id = "!room:example.com" + membership_event = make_event_from_dict( + { + "room_id": room_id, + "type": "m.room.member", + "sender": "@alice:test", + "state_key": "@alice:test", + "content": {"membership": "join"}, + }, + RoomVersions.V10, + ) + + mock_make_membership_event = Mock( + return_value=make_awaitable( + ( + "example.com", + membership_event, + RoomVersions.V10, + ) + ) + ) + + EVENT_CREATE = make_event_from_dict( + { + "room_id": room_id, + "type": "m.room.create", + "sender": "@kristina:example.com", + "state_key": "", + "depth": 0, + "content": {"creator": "@kristina:example.com", "room_version": "10"}, + "auth_events": [], + "origin_server_ts": 1, + }, + room_version=RoomVersions.V10, + ) + EVENT_CREATOR_MEMBERSHIP = make_event_from_dict( + { + "room_id": room_id, + "type": "m.room.member", + "sender": "@kristina:example.com", + "state_key": "@kristina:example.com", + "content": {"membership": "join"}, + "depth": 1, + "prev_events": [EVENT_CREATE.event_id], + "auth_events": [EVENT_CREATE.event_id], + "origin_server_ts": 1, + }, + room_version=RoomVersions.V10, + ) + EVENT_INVITATION_MEMBERSHIP = make_event_from_dict( + { + "room_id": room_id, + "type": "m.room.member", + "sender": "@kristina:example.com", + "state_key": "@alice:test", + "content": {"membership": "invite"}, + "depth": 2, + "prev_events": [EVENT_CREATOR_MEMBERSHIP.event_id], + "auth_events": [ + EVENT_CREATE.event_id, + EVENT_CREATOR_MEMBERSHIP.event_id, + ], + "origin_server_ts": 1, + }, + room_version=RoomVersions.V10, + ) + mock_send_join = Mock( + return_value=make_awaitable( + SendJoinResult( + membership_event, + "example.com", + state=[ + EVENT_CREATE, + EVENT_CREATOR_MEMBERSHIP, + EVENT_INVITATION_MEMBERSHIP, + ], + auth_chain=[ + EVENT_CREATE, + EVENT_CREATOR_MEMBERSHIP, + EVENT_INVITATION_MEMBERSHIP, + ], + partial_state=True, + servers_in_room=["example.com"], + ) + ) + ) + + with patch.object( + fed_client, "make_membership_event", mock_make_membership_event + ), patch.object(fed_client, "send_join", mock_send_join): + # Join and check that our join event is rejected + # (The join event is rejected because it doesn't have any signatures) + join_exc = self.get_failure( + fed_handler.do_invite_join(["example.com"], room_id, "@alice:test", {}), + SynapseError, + ) + self.assertIn("Join event was rejected", str(join_exc)) + + store = self.hs.get_datastores().main + + # Check that we don't have a left-over partial_state entry. + self.assertFalse( + self.get_success(store.is_partial_state_room(room_id)), + f"Stale partial-stated room flag left over for {room_id} after a" + f" failed do_invite_join!", + ) -- cgit 1.5.1