summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2022-04-12 14:23:43 +0100
committerGitHub <noreply@github.com>2022-04-12 13:23:43 +0000
commit320186319ac4f1d16f8f964d92db8921a4b1073e (patch)
tree965d970fdea98a16bd2c23af3aabb0c9493eceb8 /synapse/handlers/federation.py
parentRemove references to unstable identifiers from MSC3440. (#12382) (diff)
downloadsynapse-320186319ac4f1d16f8f964d92db8921a4b1073e.tar.xz
Resync state after partial-state join (#12394)
We work through all the events with partial state, updating the state at each
of them. Once it's done, we recalculate the state for the whole room, and then
mark the room as having complete state.
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py75
1 files changed, 75 insertions, 0 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 78d149905f..1434e99056 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -466,6 +466,8 @@ class FederationHandler:
             )
 
             if ret.partial_state:
+                # TODO(faster_joins): roll this back if we don't manage to start the
+                #   background resync (eg process_remote_join fails)
                 await self.store.store_partial_state_room(room_id, ret.servers_in_room)
 
             max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -478,6 +480,18 @@ class FederationHandler:
                 partial_state=ret.partial_state,
             )
 
+            if ret.partial_state:
+                # Kick off the process of asynchronously fetching the state for this
+                # room.
+                #
+                # TODO(faster_joins): pick this up again on restart
+                run_as_background_process(
+                    desc="sync_partial_state_room",
+                    func=self._sync_partial_state_room,
+                    destination=origin,
+                    room_id=room_id,
+                )
+
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
             await self._replication.wait_for_stream_position(
@@ -1370,3 +1384,64 @@ class FederationHandler:
         # We fell off the bottom, couldn't get the complexity from anyone. Oh
         # well.
         return None
+
+    async def _sync_partial_state_room(
+        self,
+        destination: str,
+        room_id: str,
+    ) -> None:
+        """Background process to resync the state of a partial-state room
+
+        Args:
+            destination: homeserver to pull the state from
+            room_id: room to be resynced
+        """
+
+        # TODO(faster_joins): do we need to lock to avoid races? What happens if other
+        #   worker processes kick off a resync in parallel? Perhaps we should just elect
+        #   a single worker to do the resync.
+        #
+        # TODO(faster_joins): what happens if we leave the room during a resync? if we
+        #   really leave, that might mean we have difficulty getting the room state over
+        #   federation.
+        #
+        # TODO(faster_joins): try other destinations if the one we have fails
+
+        logger.info("Syncing state for room %s via %s", room_id, destination)
+
+        # we work through the queue in order of increasing stream ordering.
+        while True:
+            batch = await self.store.get_partial_state_events_batch(room_id)
+            if not batch:
+                # all the events are updated, so we can update current state and
+                # clear the lazy-loading flag.
+                logger.info("Updating current state for %s", room_id)
+                assert (
+                    self.storage.persistence is not None
+                ), "TODO(faster_joins): support for workers"
+                await self.storage.persistence.update_current_state(room_id)
+
+                logger.info("Clearing partial-state flag for %s", room_id)
+                success = await self.store.clear_partial_state_room(room_id)
+                if success:
+                    logger.info("State resync complete for %s", room_id)
+
+                    # TODO(faster_joins) update room stats and user directory?
+                    return
+
+                # we raced against more events arriving with partial state. Go round
+                # the loop again. We've already logged a warning, so no need for more.
+                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                #   having partial state.
+                continue
+
+            events = await self.store.get_events_as_list(
+                batch,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
+                allow_rejected=True,
+            )
+            for event in events:
+                await self._federation_event_handler.update_state_for_partial_state_event(
+                    destination, event
+                )