summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13353.bugfix1
-rw-r--r--changelog.d/13355.misc1
-rw-r--r--changelog.d/13365.bugfix1
-rw-r--r--changelog.d/13383.misc1
-rw-r--r--synapse/handlers/federation_event.py130
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/relations.py3
-rw-r--r--synapse/handlers/room_summary.py1
-rw-r--r--synapse/state/__init__.py18
-rw-r--r--synapse/storage/controllers/state.py8
-rw-r--r--synapse/storage/databases/main/events_worker.py20
-rw-r--r--synapse/storage/databases/main/relations.py6
-rw-r--r--tests/handlers/test_federation.py1
-rw-r--r--tests/storage/test_events.py7
-rw-r--r--tests/test_state.py2
15 files changed, 153 insertions, 51 deletions
diff --git a/changelog.d/13353.bugfix b/changelog.d/13353.bugfix
new file mode 100644
index 0000000000..8e18bfae1f
--- /dev/null
+++ b/changelog.d/13353.bugfix
@@ -0,0 +1 @@
+Fix a bug in the experimental faster-room-joins support which could cause it to get stuck in an infinite loop.
diff --git a/changelog.d/13355.misc b/changelog.d/13355.misc
new file mode 100644
index 0000000000..7715075885
--- /dev/null
+++ b/changelog.d/13355.misc
@@ -0,0 +1 @@
+Faster room joins: avoid blocking when pulling events with partially missing prev events.
diff --git a/changelog.d/13365.bugfix b/changelog.d/13365.bugfix
new file mode 100644
index 0000000000..b915c3158c
--- /dev/null
+++ b/changelog.d/13365.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse v1.41.0 where the `/hierarchy` API returned non-standard information (a `room_id` field under each entry in `children_state`).
diff --git a/changelog.d/13383.misc b/changelog.d/13383.misc
new file mode 100644
index 0000000000..2236eced24
--- /dev/null
+++ b/changelog.d/13383.misc
@@ -0,0 +1 @@
+Remove an unused argument to `get_relations_for_event`.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 16f20c8be7..2ba2b1527e 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -278,7 +278,9 @@ class FederationEventHandler:
                 )
 
         try:
-            await self._process_received_pdu(origin, pdu, state_ids=None)
+            await self._process_received_pdu(
+                origin, pdu, state_ids=None, partial_state=None
+            )
         except PartialStateConflictError:
             # The room was un-partial stated while we were processing the PDU.
             # Try once more, with full state this time.
@@ -286,7 +288,9 @@ class FederationEventHandler:
                 "Room %s was un-partial stated while processing the PDU, trying again.",
                 room_id,
             )
-            await self._process_received_pdu(origin, pdu, state_ids=None)
+            await self._process_received_pdu(
+                origin, pdu, state_ids=None, partial_state=None
+            )
 
     async def on_send_membership_event(
         self, origin: str, event: EventBase
@@ -534,24 +538,46 @@ class FederationEventHandler:
             #
             # This is the same operation as we do when we receive a regular event
             # over federation.
-            state_ids = await self._resolve_state_at_missing_prevs(destination, event)
-
-            # build a new state group for it if need be
-            context = await self._state_handler.compute_event_context(
-                event,
-                state_ids_before_event=state_ids,
+            state_ids, partial_state = await self._resolve_state_at_missing_prevs(
+                destination, event
             )
-            if context.partial_state:
+
+            # There are three possible cases for (state_ids, partial_state):
+            #   * `state_ids` and `partial_state` are both `None` if we had all the
+            #     prev_events. The prev_events may or may not have partial state and
+            #     we won't know until we compute the event context.
+            #   * `state_ids` is not `None` and `partial_state` is `False` if we were
+            #     missing some prev_events (but we have full state for any we did
+            #     have). We calculated the full state after the prev_events.
+            #   * `state_ids` is not `None` and `partial_state` is `True` if we were
+            #     missing some, but not all, prev_events. At least one of the
+            #     prev_events we did have had partial state, so we calculated a partial
+            #     state after the prev_events.
+
+            context = None
+            if state_ids is not None and partial_state:
+                # the state after the prev events is still partial. We can't de-partial
+                # state the event, so don't bother building the event context.
+                pass
+            else:
+                # build a new state group for it if need be
+                context = await self._state_handler.compute_event_context(
+                    event,
+                    state_ids_before_event=state_ids,
+                    partial_state=partial_state,
+                )
+
+            if context is None or context.partial_state:
                 # this can happen if some or all of the event's prev_events still have
-                # partial state - ie, an event has an earlier stream_ordering than one
-                # or more of its prev_events, so we de-partial-state it before its
-                # prev_events.
+                # partial state. We were careful to only pick events from the db without
+                # partial-state prev events, so that implies that a prev event has
+                # been persisted (with partial state) since we did the query.
                 #
-                # TODO(faster_joins): we probably need to be more intelligent, and
-                #    exclude partial-state prev_events from consideration
-                #    https://github.com/matrix-org/synapse/issues/13001
+                # So, let's just ignore `event` for now; when we re-run the db query
+                # we should instead get its partial-state prev event, which we will
+                # de-partial-state, and then come back to event.
                 logger.warning(
-                    "%s still has partial state: can't de-partial-state it yet",
+                    "%s still has prev_events with partial state: can't de-partial-state it yet",
                     event.event_id,
                 )
                 return
@@ -806,14 +832,39 @@ class FederationEventHandler:
             return
 
         try:
-            state_ids = await self._resolve_state_at_missing_prevs(origin, event)
-            # TODO(faster_joins): make sure that _resolve_state_at_missing_prevs does
-            #   not return partial state
-            #   https://github.com/matrix-org/synapse/issues/13002
+            try:
+                state_ids, partial_state = await self._resolve_state_at_missing_prevs(
+                    origin, event
+                )
+                await self._process_received_pdu(
+                    origin,
+                    event,
+                    state_ids=state_ids,
+                    partial_state=partial_state,
+                    backfilled=backfilled,
+                )
+            except PartialStateConflictError:
+                # The room was un-partial stated while we were processing the event.
+                # Try once more, with full state this time.
+                state_ids, partial_state = await self._resolve_state_at_missing_prevs(
+                    origin, event
+                )
 
-            await self._process_received_pdu(
-                origin, event, state_ids=state_ids, backfilled=backfilled
-            )
+                # We ought to have full state now, barring some unlikely race where we left and
+                # rejoned the room in the background.
+                if state_ids is not None and partial_state:
+                    raise AssertionError(
+                        f"Event {event.event_id} still has a partial resolved state "
+                        f"after room {event.room_id} was un-partial stated"
+                    )
+
+                await self._process_received_pdu(
+                    origin,
+                    event,
+                    state_ids=state_ids,
+                    partial_state=partial_state,
+                    backfilled=backfilled,
+                )
         except FederationError as e:
             if e.code == 403:
                 logger.warning("Pulled event %s failed history check.", event_id)
@@ -822,7 +873,7 @@ class FederationEventHandler:
 
     async def _resolve_state_at_missing_prevs(
         self, dest: str, event: EventBase
-    ) -> Optional[StateMap[str]]:
+    ) -> Tuple[Optional[StateMap[str]], Optional[bool]]:
         """Calculate the state at an event with missing prev_events.
 
         This is used when we have pulled a batch of events from a remote server, and
@@ -849,8 +900,10 @@ class FederationEventHandler:
             event: an event to check for missing prevs.
 
         Returns:
-            if we already had all the prev events, `None`. Otherwise, returns
-            the event ids of the state at `event`.
+            if we already had all the prev events, `None, None`. Otherwise, returns a
+            tuple containing:
+             * the event ids of the state at `event`.
+             * a boolean indicating whether the state may be partial.
 
         Raises:
             FederationError if we fail to get the state from the remote server after any
@@ -864,7 +917,7 @@ class FederationEventHandler:
         missing_prevs = prevs - seen
 
         if not missing_prevs:
-            return None
+            return None, None
 
         logger.info(
             "Event %s is missing prev_events %s: calculating state for a "
@@ -876,9 +929,15 @@ class FederationEventHandler:
         # resolve them to find the correct state at the current event.
 
         try:
+            # Determine whether we may be about to retrieve partial state
+            # Events may be un-partial stated right after we compute the partial state
+            # flag, but that's okay, as long as the flag errs on the conservative side.
+            partial_state_flags = await self._store.get_partial_state_events(seen)
+            partial_state = any(partial_state_flags.values())
+
             # Get the state of the events we know about
             ours = await self._state_storage_controller.get_state_groups_ids(
-                room_id, seen
+                room_id, seen, await_full_state=False
             )
 
             # state_maps is a list of mappings from (type, state_key) to event_id
@@ -924,7 +983,7 @@ class FederationEventHandler:
                 "We can't get valid state history.",
                 affected=event_id,
             )
-        return state_map
+        return state_map, partial_state
 
     async def _get_state_ids_after_missing_prev_event(
         self,
@@ -1094,6 +1153,7 @@ class FederationEventHandler:
         origin: str,
         event: EventBase,
         state_ids: Optional[StateMap[str]],
+        partial_state: Optional[bool],
         backfilled: bool = False,
     ) -> None:
         """Called when we have a new non-outlier event.
@@ -1117,14 +1177,21 @@ class FederationEventHandler:
 
             state_ids: Normally None, but if we are handling a gap in the graph
                 (ie, we are missing one or more prev_events), the resolved state at the
-                event. Must not be partial state.
+                event
+
+            partial_state:
+                `True` if `state_ids` is partial and omits non-critical membership
+                events.
+                `False` if `state_ids` is the full state.
+                `None` if `state_ids` is not provided. In this case, the flag will be
+                calculated based on `event`'s prev events.
 
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
 
         PartialStateConflictError: if the room was un-partial stated in between
             computing the state at the event and persisting it. The caller should retry
-            exactly once in this case. Will never be raised if `state_ids` is provided.
+            exactly once in this case.
         """
         logger.debug("Processing event: %s", event)
         assert not event.internal_metadata.outlier
@@ -1132,6 +1199,7 @@ class FederationEventHandler:
         context = await self._state_handler.compute_event_context(
             event,
             state_ids_before_event=state_ids,
+            partial_state=partial_state,
         )
         try:
             await self._check_event_auth(origin, event, context)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index bd7baef051..e0bcc40b93 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -1135,6 +1135,10 @@ class EventCreationHandler:
             context = await self.state.compute_event_context(
                 event,
                 state_ids_before_event=state_map_for_event,
+                # TODO(faster_joins): check how MSC2716 works and whether we can have
+                #   partial state here
+                #   https://github.com/matrix-org/synapse/issues/13003
+                partial_state=False,
             )
         else:
             context = await self.state.compute_event_context(event)
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 0b63cd2186..8f797e3ae9 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -73,7 +73,6 @@ class RelationsHandler:
         room_id: str,
         relation_type: Optional[str] = None,
         event_type: Optional[str] = None,
-        aggregation_key: Optional[str] = None,
         limit: int = 5,
         direction: str = "b",
         from_token: Optional[StreamToken] = None,
@@ -89,7 +88,6 @@ class RelationsHandler:
             room_id: The room the event belongs to.
             relation_type: Only fetch events with this relation type, if given.
             event_type: Only fetch events with this event type, if given.
-            aggregation_key: Only fetch events with this aggregation key, if given.
             limit: Only fetch the most recent `limit` events.
             direction: Whether to fetch the most recent first (`"b"`) or the
                 oldest first (`"f"`).
@@ -122,7 +120,6 @@ class RelationsHandler:
             room_id=room_id,
             relation_type=relation_type,
             event_type=event_type,
-            aggregation_key=aggregation_key,
             limit=limit,
             direction=direction,
             from_token=from_token,
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 13098f56ed..85811b5bde 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -452,7 +452,6 @@ class RoomSummaryHandler:
                 "type": e.type,
                 "state_key": e.state_key,
                 "content": e.content,
-                "room_id": e.room_id,
                 "sender": e.sender,
                 "origin_server_ts": e.origin_server_ts,
             }
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 87ccd52f0a..69834de0de 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -255,7 +255,7 @@ class StateHandler:
         self,
         event: EventBase,
         state_ids_before_event: Optional[StateMap[str]] = None,
-        partial_state: bool = False,
+        partial_state: Optional[bool] = None,
     ) -> EventContext:
         """Build an EventContext structure for a non-outlier event.
 
@@ -270,8 +270,12 @@ class StateHandler:
                 it can't be calculated from existing events. This is normally
                 only specified when receiving an event from federation where we
                 don't have the prev events, e.g. when backfilling.
-            partial_state: True if `state_ids_before_event` is partial and omits
-                non-critical membership events
+            partial_state:
+                `True` if `state_ids_before_event` is partial and omits non-critical
+                membership events.
+                `False` if `state_ids_before_event` is the full state.
+                `None` when `state_ids_before_event` is not provided. In this case, the
+                flag will be calculated based on `event`'s prev events.
         Returns:
             The event context.
         """
@@ -298,12 +302,14 @@ class StateHandler:
                 )
             )
 
+            # the partial_state flag must be provided
+            assert partial_state is not None
         else:
             # otherwise, we'll need to resolve the state across the prev_events.
 
             # partial_state should not be set explicitly in this case:
             # we work it out dynamically
-            assert not partial_state
+            assert partial_state is None
 
             # if any of the prev-events have partial state, so do we.
             # (This is slightly racy - the prev-events might get fixed up before we use
@@ -313,13 +319,13 @@ class StateHandler:
             incomplete_prev_events = await self.store.get_partial_state_events(
                 prev_event_ids
             )
-            if any(incomplete_prev_events.values()):
+            partial_state = any(incomplete_prev_events.values())
+            if partial_state:
                 logger.debug(
                     "New/incoming event %s refers to prev_events %s with partial state",
                     event.event_id,
                     [k for (k, v) in incomplete_prev_events.items() if v],
                 )
-                partial_state = True
 
             logger.debug("calling resolve_state_groups from compute_event_context")
             # we've already taken into account partial state, so no need to wait for
diff --git a/synapse/storage/controllers/state.py b/synapse/storage/controllers/state.py
index e08f956e6e..20805c94fa 100644
--- a/synapse/storage/controllers/state.py
+++ b/synapse/storage/controllers/state.py
@@ -82,13 +82,15 @@ class StateStorageController:
         return state_group_delta.prev_group, state_group_delta.delta_ids
 
     async def get_state_groups_ids(
-        self, _room_id: str, event_ids: Collection[str]
+        self, _room_id: str, event_ids: Collection[str], await_full_state: bool = True
     ) -> Dict[int, MutableStateMap[str]]:
         """Get the event IDs of all the state for the state groups for the given events
 
         Args:
             _room_id: id of the room for these events
             event_ids: ids of the events
+            await_full_state: if `True`, will block if we do not yet have complete
+               state at these events.
 
         Returns:
             dict of state_group_id -> (dict of (type, state_key) -> event id)
@@ -100,7 +102,9 @@ class StateStorageController:
         if not event_ids:
             return {}
 
-        event_to_groups = await self.get_state_group_for_events(event_ids)
+        event_to_groups = await self.get_state_group_for_events(
+            event_ids, await_full_state=await_full_state
+        )
 
         groups = set(event_to_groups.values())
         group_to_state = await self.stores.state._get_state_for_groups(groups)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 5914a35420..29c99c6357 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -2110,11 +2110,29 @@ class EventsWorkerStore(SQLBaseStore):
     def _get_partial_state_events_batch_txn(
         txn: LoggingTransaction, room_id: str
     ) -> List[str]:
+        # we want to work through the events from oldest to newest, so
+        # we only want events whose prev_events do *not* have partial state - hence
+        # the 'NOT EXISTS' clause in the below.
+        #
+        # This is necessary because ordering by stream ordering isn't quite enough
+        # to ensure that we work from oldest to newest event (in particular,
+        # if an event is initially persisted as an outlier and later de-outliered,
+        # it can end up with a lower stream_ordering than its prev_events).
+        #
+        # Typically this means we'll only return one event per batch, but that's
+        # hard to do much about.
+        #
+        # See also: https://github.com/matrix-org/synapse/issues/13001
         txn.execute(
             """
             SELECT event_id FROM partial_state_events AS pse
                 JOIN events USING (event_id)
-            WHERE pse.room_id = ?
+            WHERE pse.room_id = ? AND
+               NOT EXISTS(
+                  SELECT 1 FROM event_edges AS ee
+                     JOIN partial_state_events AS prev_pse ON (prev_pse.event_id=ee.prev_event_id)
+                     WHERE ee.event_id=pse.event_id
+               )
             ORDER BY events.stream_ordering
             LIMIT 100
             """,
diff --git a/synapse/storage/databases/main/relations.py b/synapse/storage/databases/main/relations.py
index b457bc189e..7bd27790eb 100644
--- a/synapse/storage/databases/main/relations.py
+++ b/synapse/storage/databases/main/relations.py
@@ -62,7 +62,6 @@ class RelationsWorkerStore(SQLBaseStore):
         room_id: str,
         relation_type: Optional[str] = None,
         event_type: Optional[str] = None,
-        aggregation_key: Optional[str] = None,
         limit: int = 5,
         direction: str = "b",
         from_token: Optional[StreamToken] = None,
@@ -76,7 +75,6 @@ class RelationsWorkerStore(SQLBaseStore):
             room_id: The room the event belongs to.
             relation_type: Only fetch events with this relation type, if given.
             event_type: Only fetch events with this event type, if given.
-            aggregation_key: Only fetch events with this aggregation key, if given.
             limit: Only fetch the most recent `limit` events.
             direction: Whether to fetch the most recent first (`"b"`) or the
                 oldest first (`"f"`).
@@ -105,10 +103,6 @@ class RelationsWorkerStore(SQLBaseStore):
             where_clause.append("type = ?")
             where_args.append(event_type)
 
-        if aggregation_key:
-            where_clause.append("aggregation_key = ?")
-            where_args.append(aggregation_key)
-
         pagination_clause = generate_pagination_where_clause(
             direction=direction,
             column_names=("topological_ordering", "stream_ordering"),
diff --git a/tests/handlers/test_federation.py b/tests/handlers/test_federation.py
index 8a0bb91f40..fb06e5e812 100644
--- a/tests/handlers/test_federation.py
+++ b/tests/handlers/test_federation.py
@@ -287,6 +287,7 @@ class FederationTestCase(unittest.FederatingHomeserverTestCase):
                     state_ids={
                         (e.type, e.state_key): e.event_id for e in current_state
                     },
+                    partial_state=False,
                 )
             )
 
diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py
index 2ff88e64a5..3ce4f35cb7 100644
--- a/tests/storage/test_events.py
+++ b/tests/storage/test_events.py
@@ -70,7 +70,11 @@ class ExtremPruneTestCase(HomeserverTestCase):
     def persist_event(self, event, state=None):
         """Persist the event, with optional state"""
         context = self.get_success(
-            self.state.compute_event_context(event, state_ids_before_event=state)
+            self.state.compute_event_context(
+                event,
+                state_ids_before_event=state,
+                partial_state=None if state is None else False,
+            )
         )
         self.get_success(self._persistence.persist_event(event, context))
 
@@ -148,6 +152,7 @@ class ExtremPruneTestCase(HomeserverTestCase):
             self.state.compute_event_context(
                 remote_event_2,
                 state_ids_before_event=state_before_gap,
+                partial_state=False,
             )
         )
 
diff --git a/tests/test_state.py b/tests/test_state.py
index bafd6d1750..504530b49a 100644
--- a/tests/test_state.py
+++ b/tests/test_state.py
@@ -462,6 +462,7 @@ class StateTestCase(unittest.TestCase):
                 state_ids_before_event={
                     (e.type, e.state_key): e.event_id for e in old_state
                 },
+                partial_state=False,
             )
         )
 
@@ -492,6 +493,7 @@ class StateTestCase(unittest.TestCase):
                 state_ids_before_event={
                     (e.type, e.state_key): e.event_id for e in old_state
                 },
+                partial_state=False,
             )
         )