diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 78d149905f..1434e99056 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -466,6 +466,8 @@ class FederationHandler:
)
if ret.partial_state:
+ # TODO(faster_joins): roll this back if we don't manage to start the
+ # background resync (eg process_remote_join fails)
await self.store.store_partial_state_room(room_id, ret.servers_in_room)
max_stream_id = await self._federation_event_handler.process_remote_join(
@@ -478,6 +480,18 @@ class FederationHandler:
partial_state=ret.partial_state,
)
+ if ret.partial_state:
+ # Kick off the process of asynchronously fetching the state for this
+ # room.
+ #
+ # TODO(faster_joins): pick this up again on restart
+ run_as_background_process(
+ desc="sync_partial_state_room",
+ func=self._sync_partial_state_room,
+ destination=origin,
+ room_id=room_id,
+ )
+
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
await self._replication.wait_for_stream_position(
@@ -1370,3 +1384,64 @@ class FederationHandler:
# We fell off the bottom, couldn't get the complexity from anyone. Oh
# well.
return None
+
+ async def _sync_partial_state_room(
+ self,
+ destination: str,
+ room_id: str,
+ ) -> None:
+ """Background process to resync the state of a partial-state room
+
+ Args:
+ destination: homeserver to pull the state from
+ room_id: room to be resynced
+ """
+
+ # TODO(faster_joins): do we need to lock to avoid races? What happens if other
+ # worker processes kick off a resync in parallel? Perhaps we should just elect
+ # a single worker to do the resync.
+ #
+ # TODO(faster_joins): what happens if we leave the room during a resync? if we
+ # really leave, that might mean we have difficulty getting the room state over
+ # federation.
+ #
+ # TODO(faster_joins): try other destinations if the one we have fails
+
+ logger.info("Syncing state for room %s via %s", room_id, destination)
+
+ # we work through the queue in order of increasing stream ordering.
+ while True:
+ batch = await self.store.get_partial_state_events_batch(room_id)
+ if not batch:
+ # all the events are updated, so we can update current state and
+ # clear the lazy-loading flag.
+ logger.info("Updating current state for %s", room_id)
+ assert (
+ self.storage.persistence is not None
+ ), "TODO(faster_joins): support for workers"
+ await self.storage.persistence.update_current_state(room_id)
+
+ logger.info("Clearing partial-state flag for %s", room_id)
+ success = await self.store.clear_partial_state_room(room_id)
+ if success:
+ logger.info("State resync complete for %s", room_id)
+
+ # TODO(faster_joins) update room stats and user directory?
+ return
+
+ # we raced against more events arriving with partial state. Go round
+ # the loop again. We've already logged a warning, so no need for more.
+ # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+ # with us will fail to be persisted after the call to `clear_partial_state_room` due to
+ # having partial state.
+ continue
+
+ events = await self.store.get_events_as_list(
+ batch,
+ redact_behaviour=EventRedactBehaviour.AS_IS,
+ allow_rejected=True,
+ )
+ for event in events:
+ await self._federation_event_handler.update_state_for_partial_state_event(
+ destination, event
+ )
|