 changelog.d/13353.bugfix                        |   1
 changelog.d/13355.misc                          |   1
 changelog.d/13365.bugfix                        |   1
 changelog.d/13383.misc                          |   1
 synapse/handlers/federation_event.py            | 130
 synapse/handlers/message.py                     |   4
 synapse/handlers/relations.py                   |   3
 synapse/handlers/room_summary.py                |   1
 synapse/state/__init__.py                       |  18
 synapse/storage/controllers/state.py            |   8
 synapse/storage/databases/main/events_worker.py |  20
 synapse/storage/databases/main/relations.py     |   6
 tests/handlers/test_federation.py               |   1
 tests/storage/test_events.py                    |   7
 tests/test_state.py                             |   2
15 files changed, 153 insertions, 51 deletions
diff --git a/changelog.d/13353.bugfix b/changelog.d/13353.bugfix
new file mode 100644
index 0000000000..8e18bfae1f
--- /dev/null
+++ b/changelog.d/13353.bugfix
@@ -0,0 +1 @@
+Fix a bug in the experimental faster-room-joins support which could cause it to get stuck in an infinite loop.
diff --git a/changelog.d/13355.misc b/changelog.d/13355.misc
new file mode 100644
index 0000000000..7715075885
--- /dev/null
+++ b/changelog.d/13355.misc
@@ -0,0 +1 @@
+Faster room joins: avoid blocking when pulling events with partially missing prev events.
diff --git a/changelog.d/13365.bugfix b/changelog.d/13365.bugfix
new file mode 100644
index 0000000000..b915c3158c
--- /dev/null
+++ b/changelog.d/13365.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse v1.41.0 where the `/hierarchy` API returned non-standard information (a `room_id` field under each entry in `children_state`).
diff --git a/changelog.d/13383.misc b/changelog.d/13383.misc
new file mode 100644
index 0000000000..2236eced24
--- /dev/null
+++ b/changelog.d/13383.misc
@@ -0,0 +1 @@
+Remove an unused argument to `get_relations_for_event`.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 16f20c8be7..2ba2b1527e 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -278,7 +278,9 @@ class FederationEventHandler:
             )
 
         try:
-            await self._process_received_pdu(origin, pdu, state_ids=None)
+            await self._process_received_pdu(
+                origin, pdu, state_ids=None, partial_state=None
+            )
         except PartialStateConflictError:
             # The room was un-partial stated while we were processing the PDU.
             # Try once more, with full state this time.
@@ -286,7 +288,9 @@ class FederationEventHandler:
                 "Room %s was un-partial stated while processing the PDU, trying again.",
                 room_id,
             )
-            await self._process_received_pdu(origin, pdu, state_ids=None)
+            await self._process_received_pdu(
+                origin, pdu, state_ids=None, partial_state=None
+            )
 
     async def on_send_membership_event(
         self, origin: str, event: EventBase
@@ -534,24 +538,46 @@ class FederationEventHandler:
         #
         # This is the same operation as we do when we receive a regular event
         # over federation.
-        state_ids = await self._resolve_state_at_missing_prevs(destination, event)
-
-        # build a new state group for it if need be
-        context = await self._state_handler.compute_event_context(
-            event,
-            state_ids_before_event=state_ids,
+        state_ids, partial_state = await self._resolve_state_at_missing_prevs(
+            destination, event
         )
-        if context.partial_state:
+
+        # There are three possible cases for (state_ids, partial_state):
+        #   * `state_ids` and `partial_state` are both `None` if we had all the
+        #     prev_events. The prev_events may or may not have partial state and
+        #     we won't know until we compute the event context.
+        #   * `state_ids` is not `None` and `partial_state` is `False` if we were
+        #     missing some prev_events (but we have full state for any we did
+        #     have). We calculated the full state after the prev_events.
+        #   * `state_ids` is not `None` and `partial_state` is `True` if we were
+        #     missing some, but not all, prev_events. At least one of the
+        #     prev_events we did have had partial state, so we calculated a partial
+        #     state after the prev_events.
+
+        context = None
+        if state_ids is not None and partial_state:
+            # the state after the prev events is still partial. We can't de-partial
+            # state the event, so don't bother building the event context.
+            pass
+        else:
+            # build a new state group for it if need be
+            context = await self._state_handler.compute_event_context(
+                event,
+                state_ids_before_event=state_ids,
+                partial_state=partial_state,
+            )
+
+        if context is None or context.partial_state:
             # this can happen if some or all of the event's prev_events still have
-            # partial state - ie, an event has an earlier stream_ordering than one
-            # or more of its prev_events, so we de-partial-state it before its
-            # prev_events.
+            # partial state. We were careful to only pick events from the db without
+            # partial-state prev events, so that implies that a prev event has
+            # been persisted (with partial state) since we did the query.
             #
-            # TODO(faster_joins): we probably need to be more intelligent, and
-            # exclude partial-state prev_events from consideration
-            # https://github.com/matrix-org/synapse/issues/13001
+            # So, let's just ignore `event` for now; when we re-run the db query
+            # we should instead get its partial-state prev event, which we will
+            # de-partial-state, and then come back to event.
             logger.warning(
-                "%s still has partial state: can't de-partial-state it yet",
+                "%s still has prev_events with partial state: can't de-partial-state it yet",
                 event.event_id,
             )
             return
@@ -806,14 +832,39 @@ class FederationEventHandler:
             return
 
         try:
-            state_ids = await self._resolve_state_at_missing_prevs(origin, event)
-            # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
-            #   not return partial state
-            #   https://github.com/matrix-org/synapse/issues/13002
+            try:
+                state_ids, partial_state = await self._resolve_state_at_missing_prevs(
+                    origin, event
+                )
+                await self._process_received_pdu(
+                    origin,
+                    event,
+                    state_ids=state_ids,
+                    partial_state=partial_state,
+                    backfilled=backfilled,
+                )
+            except PartialStateConflictError:
+                # The room was un-partial stated while we were processing the event.
+                # Try once more, with full state this time.
+                state_ids, partial_state = await self._resolve_state_at_missing_prevs(
+                    origin, event
+                )
 
-            await self._process_received_pdu(
-                origin, event, state_ids=state_ids, backfilled=backfilled
-            )
+                # We ought to have full state now, barring some unlikely race where we
+                # left and rejoined the room in the background.
+                if state_ids is not None and partial_state:
+                    raise AssertionError(
+                        f"Event {event.event_id} still has a partial resolved state "
+                        f"after room {event.room_id} was un-partial stated"
+                    )
+
+                await self._process_received_pdu(
+                    origin,
+                    event,
+                    state_ids=state_ids,
+                    partial_state=partial_state,
+                    backfilled=backfilled,
+                )
         except FederationError as e:
             if e.code == 403:
                 logger.warning("Pulled event %s failed history check.", event_id)
@@ -822,7 +873,7 @@ class FederationEventHandler:
 
     async def _resolve_state_at_missing_prevs(
         self, dest: str, event: EventBase
-    ) -> Optional[StateMap[str]]:
+    ) -> Tuple[Optional[StateMap[str]], Optional[bool]]:
         """Calculate the state at an event with missing prev_events.
 
         This is used when we have pulled a batch of events from a remote server, and
@@ -849,8 +900,10 @@ class FederationEventHandler:
             event: an event to check for missing prevs.
 
         Returns:
-            if we already had all the prev events, `None`. Otherwise, returns
-            the event ids of the state at `event`.
+            if we already had all the prev events, `None, None`. Otherwise, returns a
+            tuple containing:
+                * the event ids of the state at `event`.
+                * a boolean indicating whether the state may be partial.
 
         Raises:
             FederationError if we fail to get the state from the remote server after any
@@ -864,7 +917,7 @@ class FederationEventHandler:
         missing_prevs = prevs - seen
 
         if not missing_prevs:
-            return None
+            return None, None
 
         logger.info(
             "Event %s is missing prev_events %s: calculating state for a "
@@ -876,9 +929,15 @@ class FederationEventHandler:
         # resolve them to find the correct state at the current event.
 
         try:
+            # Determine whether we may be about to retrieve partial state
+            # Events may be un-partial stated right after we compute the partial state
+            # flag, but that's okay, as long as the flag errs on the conservative side.
+            partial_state_flags = await self._store.get_partial_state_events(seen)
+            partial_state = any(partial_state_flags.values())
+
             # Get the state of the events we know about
             ours = await self._state_storage_controller.get_state_groups_ids(
-                room_id, seen
+                room_id, seen, await_full_state=False
             )
 
             # state_maps is a list of mappings from (type, state_key) to event_id
@@ -924,7 +983,7 @@ class FederationEventHandler:
                 "We can't get valid state history.",
                 affected=event_id,
             )
-        return state_map
+        return state_map, partial_state
 
     async def _get_state_ids_after_missing_prev_event(
         self,
@@ -1094,6 +1153,7 @@ class FederationEventHandler:
         origin: str,
         event: EventBase,
         state_ids: Optional[StateMap[str]],
+        partial_state: Optional[bool],
         backfilled: bool = False,
     ) -> None:
         """Called when we have a new non-outlier event.
@@ -1117,14 +1177,21 @@ class FederationEventHandler:
 
             state_ids: Normally None, but if we are handling a gap in the graph
                 (ie, we are missing one or more prev_events), the resolved state at the
-                event. Must not be partial state.
+                event
+
+            partial_state:
+                `True` if `state_ids` is partial and omits non-critical membership
+                events.
+                `False` if `state_ids` is the full state.
+                `None` if `state_ids` is not provided. In this case, the flag will be
+                calculated based on `event`'s prev events.
 
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
 
         PartialStateConflictError: if the room was un-partial stated in between
             computing the state at the event and persisting it. The caller should retry
-            exactly once in this case. Will never be raised if `state_ids` is provided.
+            exactly once in this case.
         """
         logger.debug("Processing event: %s", event)
         assert not event.internal_metadata.outlier
@@ -1132,6 +1199,7 @@ class FederationEventHandler:
         context = await self._state_handler.compute_event_context(
             event,
             state_ids_before_event=state_ids,
+            partial_state=partial_state,
         )
         try:
             await self._check_event_auth(origin, event, context)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index bd7baef051..e0bcc40b93 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1135,6 +1135,10 @@ class EventCreationHandler:
             context = await self.state.compute_event_context(
                 event,
                 state_ids_before_event=state_map_for_event,
+                # TODO(faster_joins): check how MSC2716 works and whether we can have
+                #   partial state here
+                #   https://github.com/matrix-org/synapse/issues/13003
+                partial_state=False,
             )
         else:
             context = await self.state.compute_event_context(event)
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 0b63cd2186..8f797e3ae9 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -73,7 +73,6 @@ class RelationsHandler:
         room_id: str,
         relation_type: Optional[str] = None,
         event_type: Optional[str] = None,
-        aggregation_key: Optional[str] = None,
         limit: int = 5,
         direction: str = "b",
         from_token: Optional[StreamToken] = None,
@@ -89,7 +88,6 @@ class RelationsHandler:
             room_id: The room the event belongs to.
             relation_type: Only fetch events with this relation type, if given.
             event_type: Only fetch events with this event type, if given.
-            aggregation_key: Only fetch events with this aggregation key, if given.
             limit: Only fetch the most recent `limit` events.
             direction: Whether to fetch the most recent first (`"b"`) or the
                 oldest first (`"f"`).
@@ -122,7 +120,6 @@ class RelationsHandler:
             room_id=room_id,
             relation_type=relation_type,
             event_type=event_type,
-            aggregation_key=aggregation_key,
             limit=limit,
             direction=direction,
             from_token=from_token,
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 13098f56ed..85811b5bde 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -452,7 +452,6 @@ class RoomSummaryHandler:
                 "type": e.type,
                 "state_key": e.state_key,
                 "content": e.content,
-                "room_id": e.room_id,
                 "sender": e.sender,
                 "origin_server_ts": e.origin_server_ts,
             }
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 87ccd52f0a..69834de0de 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -255,7 +255,7 @@ class StateHandler:
         self,
         event: EventBase,
         state_ids_before_event: Optional[StateMap[str]] = None,
-        partial_state: bool = False,
+        partial_state: Optional[bool] = None,
     ) -> EventContext:
         """Build an EventContext structure for a non-outlier event.
 
@@ -270,8 +270,12 @@ class StateHandler:
                 it can't be calculated from existing events. This is normally
                 only specified when receiving an event from federation where we
                 don't have the prev events, e.g. when backfilling.
-            partial_state: True if `state_ids_before_event` is partial and omits
-                non-critical membership events
+            partial_state:
+                `True` if `state_ids_before_event` is partial and omits non-critical
+                membership events.
+                `False` if `state_ids_before_event` is the full state.
+                `None` when `state_ids_before_event` is not provided. In this case, the
+                flag will be calculated based on `event`'s prev events.
 
         Returns:
             The event context.
@@ -298,12 +302,14 @@ class StateHandler:
                 )
             )
 
+            # the partial_state flag must be provided
+            assert partial_state is not None
         else:
             # otherwise, we'll need to resolve the state across the prev_events.
 
             # partial_state should not be set explicitly in this case:
             # we work it out dynamically
-            assert not partial_state
+            assert partial_state is None
 
             # if any of the prev-events have partial state, so do we.
             # (This is slightly racy - the prev-events might get fixed up before we use
@@ -313,13 +319,13 @@ class StateHandler:
             incomplete_prev_events = await self.store.get_partial_state_events(
                 prev_event_ids
             )
-            if any(incomplete_prev_events.values()):
+            partial_state = any(incomplete_prev_events.values())
+            if partial_state:
                 logger.debug(
                     "New/incoming event %s refers to prev_events %s with partial state",
                     event.event_id,
                     [k for (k, v) in incomplete_prev_events.items() if v],
                 )
-                partial_state = True
 
         logger.debug("calling resolve_state_groups from compute_event_context")
         # we've already taken into account partial state, so no need to wait for
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index e08f956e6e..20805c94fa 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -82,13 +82,15 @@ class StateStorageController:
         return state_group_delta.prev_group, state_group_delta.delta_ids
 
     async def get_state_groups_ids(
-        self, _room_id: str, event_ids: Collection[str]
+        self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
     ) -> Dict[int, MutableStateMap[str]]:
         """Get the event IDs of all the state for the state groups for the given events
 
         Args:
             _room_id: id of the room for these events
             event_ids: ids of the events
+            await_full_state: if `True`, will block if we do not yet have complete
+                state at these events.
 
         Returns:
             dict of state_group_id -> (dict of (type, state_key) -> event id)
@@ -100,7 +102,9 @@ class StateStorageController:
         if not event_ids:
             return {}
 
-        event_to_groups = await self.get_state_group_for_events(event_ids)
+        event_to_groups = await self.get_state_group_for_events(
+            event_ids, await_full_state=await_full_state
+        )
 
         groups = set(event_to_groups.values())
         group_to_state = await self.stores.state._get_state_for_groups(groups)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 5914a35420..29c99c6357 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -2110,11 +2110,29 @@ class EventsWorkerStore(SQLBaseStore):
         def _get_partial_state_events_batch_txn(
             txn: LoggingTransaction, room_id: str
         ) -> List[str]:
+            # we want to work through the events from oldest to newest, so
+            # we only want events whose prev_events do *not* have partial state - hence
+            # the 'NOT EXISTS' clause in the below.
+            #
+            # This is necessary because ordering by stream ordering isn't quite enough
+            # to ensure that we work from oldest to newest event (in particular,
+            # if an event is initially persisted as an outlier and later de-outliered,
+            # it can end up with a lower stream_ordering than its prev_events).
+            #
+            # Typically this means we'll only return one event per batch, but that's
+            # hard to do much about.
+            #
+            # See also: https://github.com/matrix-org/synapse/issues/13001
             txn.execute(
                 """
                 SELECT event_id FROM partial_state_events AS pse
                     JOIN events USING (event_id)
-                WHERE pse.room_id = ?
+                WHERE pse.room_id = ? AND
+                   NOT EXISTS(
+                      SELECT 1 FROM event_edges AS ee
+                         JOIN partial_state_events AS prev_pse ON (prev_pse.event_id=ee.prev_event_id)
+                      WHERE ee.event_id=pse.event_id
+                   )
                 ORDER BY events.stream_ordering
                 LIMIT 100
                 """,
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index b457bc189e..7bd27790eb 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -62,7 +62,6 @@ class RelationsWorkerStore(SQLBaseStore):
         room_id: str,
         relation_type: Optional[str] = None,
         event_type: Optional[str] = None,
-        aggregation_key: Optional[str] = None,
         limit: int = 5,
         direction: str = "b",
         from_token: Optional[StreamToken] = None,
@@ -76,7 +75,6 @@ class RelationsWorkerStore(SQLBaseStore):
             room_id: The room the event belongs to.
             relation_type: Only fetch events with this relation type, if given.
             event_type: Only fetch events with this event type, if given.
-            aggregation_key: Only fetch events with this aggregation key, if given.
             limit: Only fetch the most recent `limit` events.
             direction: Whether to fetch the most recent first (`"b"`) or the
                 oldest first (`"f"`).
@@ -105,10 +103,6 @@ class RelationsWorkerStore(SQLBaseStore):
             where_clause.append("type = ?")
             where_args.append(event_type)
 
-        if aggregation_key:
-            where_clause.append("aggregation_key = ?")
-            where_args.append(aggregation_key)
-
         pagination_clause = generate_pagination_where_clause(
             direction=direction,
             column_names=("topological_ordering", "stream_ordering"),
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 8a0bb91f40..fb06e5e812 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -287,6 +287,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
                 state_ids={
                     (e.type, e.state_key): e.event_id for e in current_state
                 },
+                partial_state=False,
             )
         )
 
diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py
index 2ff88e64a5..3ce4f35cb7 100644
--- a/tests/storage/test_events.py
+++ b/tests/storage/test_events.py
@@ -70,7 +70,11 @@ class ExtremPruneTestCase(HomeserverTestCase):
     def persist_event(self, event, state=None):
         """Persist the event, with optional state"""
         context = self.get_success(
-            self.state.compute_event_context(event, state_ids_before_event=state)
+            self.state.compute_event_context(
+                event,
+                state_ids_before_event=state,
+                partial_state=None if state is None else False,
+            )
         )
         self.get_success(self._persistence.persist_event(event, context))
 
@@ -148,6 +152,7 @@ class ExtremPruneTestCase(HomeserverTestCase):
             self.state.compute_event_context(
                 remote_event_2,
                 state_ids_before_event=state_before_gap,
+                partial_state=False,
             )
         )
 
diff --git a/tests/test_state.py b/tests/test_state.py
index bafd6d1750..504530b49a 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -462,6 +462,7 @@ class StateTestCase(unittest.TestCase):
                 state_ids_before_event={
                     (e.type, e.state_key): e.event_id for e in old_state
                 },
+                partial_state=False,
            )
         )
 
@@ -492,6 +493,7 @@ class StateTestCase(unittest.TestCase):
                 state_ids_before_event={
                     (e.type, e.state_key): e.event_id for e in old_state
                 },
+                partial_state=False,
             )
         )
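
As a reading aid for the `_resolve_state_at_missing_prevs` change: a minimal, runnable Python sketch (not part of the diff) of the three possible `(state_ids, partial_state)` outcomes described in the new comments. `get_partial_state_flags` and `resolve_state` are hypothetical stand-ins for the real store and state-resolution calls.

    import asyncio
    from typing import Dict, Optional, Set, Tuple

    StateMap = Dict[Tuple[str, str], str]  # (event type, state key) -> event id


    # hypothetical stand-ins for the real store and state-resolution calls
    async def get_partial_state_flags(event_ids: Set[str]) -> Dict[str, bool]:
        return {e: e == "$partial_prev" for e in event_ids}


    async def resolve_state(seen: Set[str], missing: Set[str]) -> StateMap:
        return {("m.room.create", ""): "$create"}


    async def resolve_state_at_missing_prevs(
        prevs: Set[str], seen: Set[str]
    ) -> Tuple[Optional[StateMap], Optional[bool]]:
        # Case 1: all prev_events present -> (None, None); the caller computes
        # the event context (and hence the partial-state flag) itself.
        missing_prevs = prevs - seen
        if not missing_prevs:
            return None, None

        # The flag errs on the conservative side: an event may be un-partial
        # stated right after this query, which only over-reports partial state.
        flags = await get_partial_state_flags(seen)
        partial_state = any(flags.values())

        # Case 2: some prevs missing, all known prevs fully stated -> (state, False)
        # Case 3: some prevs missing, a known prev partially stated -> (state, True)
        return await resolve_state(seen, missing_prevs), partial_state


    print(asyncio.run(resolve_state_at_missing_prevs({"$a"}, {"$a"})))
    # -> (None, None)
    print(asyncio.run(resolve_state_at_missing_prevs({"$a", "$b"}, {"$a"})))
    # -> ({('m.room.create', ''): '$create'}, False)
    print(asyncio.run(resolve_state_at_missing_prevs({"$p", "$b"}, {"$partial_prev"})))
    # -> ({('m.room.create', ''): '$create'}, True)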
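`compute_event_context` correspondingly gains a tri-state `partial_state` argument (`True`/`False`/`None`), and the new assertions in `synapse/state/__init__.py` enforce which values are legal in which mode: the flag is mandatory when the caller supplies `state_ids_before_event`, and forbidden (derived from the prev events) otherwise. A condensed, runnable sketch of that rule, with an in-memory dict as a hypothetical stand-in for the store:

    import asyncio
    from typing import Dict, Iterable, Optional

    # hypothetical stand-in for the store's get_partial_state_events
    PARTIAL_STATE_FLAGS: Dict[str, bool] = {"$prev1": False, "$prev2": True}


    async def compute_event_context(
        prev_event_ids: Iterable[str],
        state_ids_before_event: Optional[dict] = None,
        partial_state: Optional[bool] = None,
    ) -> bool:
        # Sketch: returns the effective partial-state flag rather than a full
        # EventContext.
        if state_ids_before_event is not None:
            # the caller supplied the state, so it must also say whether it is
            # partial
            assert partial_state is not None
        else:
            # the flag is derived from the prev events; callers must not pre-set it
            assert partial_state is None
            partial_state = any(PARTIAL_STATE_FLAGS[e] for e in prev_event_ids)
        return partial_state


    # derived mode: one prev event has partial state, so the event does too
    print(asyncio.run(compute_event_context(["$prev1", "$prev2"])))  # True
    # explicit mode: state supplied, so the flag must be supplied alongside it
    print(asyncio.run(compute_event_context([], {("m.room.create", ""): "$c"}, False)))  # False

This matches the test changes above: callers that pass explicit state now pass `partial_state=False`, while `tests/storage/test_events.py` passes `None` when no state is supplied.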
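The `NOT EXISTS` clause added in `events_worker.py` is what breaks the infinite loop described in changelog.d/13353.bugfix: each batch now returns only partial-state events whose prev_events are all fully stated, so events are repaired oldest-first even when stream orderings are misleading. A self-contained SQLite sketch (the real tables reduced to just the columns the query touches, with made-up event ids) demonstrating the filter:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        -- reduced, hypothetical versions of the real tables
        CREATE TABLE partial_state_events (event_id TEXT, room_id TEXT);
        CREATE TABLE events (event_id TEXT, stream_ordering INTEGER);
        CREATE TABLE event_edges (event_id TEXT, prev_event_id TEXT);

        -- $B is partial-state and so is its prev event $A: $B must wait.
        INSERT INTO partial_state_events VALUES ('$A', '!room'), ('$B', '!room');
        INSERT INTO events VALUES ('$A', 2), ('$B', 1);  -- note: $B sorts first!
        INSERT INTO event_edges VALUES ('$B', '$A');
        """
    )

    rows = conn.execute(
        """
        SELECT event_id FROM partial_state_events AS pse
            JOIN events USING (event_id)
        WHERE pse.room_id = ? AND
           NOT EXISTS(
              SELECT 1 FROM event_edges AS ee
                 JOIN partial_state_events AS prev_pse ON (prev_pse.event_id=ee.prev_event_id)
              WHERE ee.event_id=pse.event_id
           )
        ORDER BY events.stream_ordering
        LIMIT 100
        """,
        ("!room",),
    ).fetchall()

    # Without NOT EXISTS, $B (lower stream_ordering) would be returned first and
    # could never be de-partial-stated; with it, only $A comes back this batch.
    print(rows)  # [('$A',)]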