summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/federation.py75
-rw-r--r--synapse/handlers/federation_event.py39
2 files changed, 114 insertions, 0 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 78d149905f..1434e99056 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -466,6 +466,8 @@ class FederationHandler:
             )
 
             if ret.partial_state:
+                # TODO(faster_joins): roll this back if we don't manage to start the
+                #   background resync (eg process_remote_join fails)
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
             max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -478,6 +480,18 @@ class FederationHandler:
                 partial_state=ret.partial_state,
             )
 
+            if ret.partial_state:
+                # Kick off the process of asynchronously fetching the state for this
+                # room.
+                #
+                # TODO(faster_joins): pick this up again on restart
+                run_as_background_process(
+                    desc="sync_partial_state_room",
+                    func=self._sync_partial_state_room,
+                    destination=origin,
+                    room_id=room_id,
+                )
+
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
             await self._replication.wait_for_stream_position(
@@ -1370,3 +1384,64 @@ class FederationHandler:
         # We fell off the bottom, couldn't get the complexity from anyone. Oh
         # well.
         return None
+
+    async def _sync_partial_state_room(
+        self,
+        destination: str,
+        room_id: str,
+    ) -> None:
+        """Background process to resync the state of a partial-state room
+
+        Args:
+            destination: homeserver to pull the state from
+            room_id: room to be resynced
+        """
+
+        # TODO(faster_joins): do we need to lock to avoid races? What happens if other
+        #   worker processes kick off a resync in parallel? Perhaps we should just elect
+        #   a single worker to do the resync.
+        #
+        # TODO(faster_joins): what happens if we leave the room during a resync? if we
+        #   really leave, that might mean we have difficulty getting the room state over
+        #   federation.
+        #
+        # TODO(faster_joins): try other destinations if the one we have fails
+
+        logger.info("Syncing state for room %s via %s", room_id, destination)
+
+        # we work through the queue in order of increasing stream ordering.
+        while True:
+            batch = await self.store.get_partial_state_events_batch(room_id)
+            if not batch:
+                # all the events are updated, so we can update current state and
+                # clear the lazy-loading flag.
+                logger.info("Updating current state for %s", room_id)
+                assert (
+                    self.storage.persistence is not None
+                ), "TODO(faster_joins): support for workers"
+                await self.storage.persistence.update_current_state(room_id)
+
+                logger.info("Clearing partial-state flag for %s", room_id)
+                success = await self.store.clear_partial_state_room(room_id)
+                if success:
+                    logger.info("State resync complete for %s", room_id)
+
+                    # TODO(faster_joins) update room stats and user directory?
+                    return
+
+                # we raced against more events arriving with partial state. Go round
+                # the loop again. We've already logged a warning, so no need for more.
+                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                #   having partial state.
+                continue
+
+            events = await self.store.get_events_as_list(
+                batch,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
+                allow_rejected=True,
+            )
+            for event in events:
+                await self._federation_event_handler.update_state_for_partial_state_event(
+                    destination, event
+                )
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 03c1197c99..32bf02818c 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -477,6 +477,45 @@ class FederationEventHandler:
 
             return await self.persist_events_and_notify(room_id, [(event, context)])
 
+    async def update_state_for_partial_state_event(
+        self, destination: str, event: EventBase
+    ) -> None:
+        """Recalculate the state at an event as part of a de-partial-stating process
+
+        Args:
+            destination: server to request full state from
+            event: partial-state event to be de-partial-stated
+        """
+        logger.info("Updating state for %s", event.event_id)
+        with nested_logging_context(suffix=event.event_id):
+            # if we have all the event's prev_events, then we can work out the
+            # state based on their states. Otherwise, we request it from the destination
+            # server.
+            #
+            # This is the same operation as we do when we receive a regular event
+            # over federation.
+            state = await self._resolve_state_at_missing_prevs(destination, event)
+
+            # build a new state group for it if need be
+            context = await self._state_handler.compute_event_context(
+                event,
+                old_state=state,
+            )
+            if context.partial_state:
+                # this can happen if some or all of the event's prev_events still have
+                # partial state - ie, an event has an earlier stream_ordering than one
+                # or more of its prev_events, so we de-partial-state it before its
+                # prev_events.
+                #
+                # TODO(faster_joins): we probably need to be more intelligent, and
+                #    exclude partial-state prev_events from consideration
+                logger.warning(
+                    "%s still has partial state: can't de-partial-state it yet",
+                    event.event_id,
+                )
+                return
+            await self._store.update_state_for_partial_state_event(event, context)
+
     async def backfill(
         self, dest: str, room_id: str, limit: int, extremities: Collection[str]
     ) -> None: