diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eca75f1108..e386f77de6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -27,6 +27,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     Union,
 )
@@ -171,12 +172,23 @@ class FederationHandler:
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
+        # Tracks running partial state syncs by room ID.
+        # Partial state syncs currently only run on the main process, so it's okay to
+        # track them in-memory for now.
+        self._active_partial_state_syncs: Set[str] = set()
+        # Tracks partial state syncs we may want to restart.
+        # A dictionary mapping room IDs to (initial destination, other destinations)
+        # tuples.
+        self._partial_state_syncs_maybe_needing_restart: Dict[
+            str, Tuple[Optional[str], Collection[str]]
+        ] = {}
+
         # if this is the main process, fire off a background process to resume
         # any partial-state-resync operations which were in flight when we
         # were shut down.
         if not hs.config.worker.worker_app:
             run_as_background_process(
-                "resume_sync_partial_state_room", self._resume_sync_partial_state_room
+                "resume_sync_partial_state_room", self._resume_partial_state_room_sync
             )
 
     @trace
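
The two attributes added above are the whole of the bookkeeping: a set recording which rooms currently have a sync running, and a dictionary recording, per room, only the most recent (initial destination, other destinations) pair to retry with. A minimal stand-alone sketch of those invariants follows; it is not part of the patch, and `request_sync`/`finish_sync` are hypothetical stand-ins for the handler methods added in the last hunk below.

```python
from typing import Collection, Dict, Optional, Set, Tuple

active_syncs: Set[str] = set()
maybe_needing_restart: Dict[str, Tuple[Optional[str], Collection[str]]] = {}

def request_sync(room_id: str, initial: Optional[str], others: Collection[str]) -> bool:
    """Returns True if the caller should launch a new sync for the room."""
    if room_id in active_syncs:
        # A sync is already running: keep only the *latest* destinations,
        # overwriting any earlier entry, so a restart can use them later.
        maybe_needing_restart[room_id] = (initial, others)
        return False
    active_syncs.add(room_id)
    return True

def finish_sync(
    room_id: str, still_partial: bool
) -> Optional[Tuple[Optional[str], Collection[str]]]:
    """Returns destinations to restart with, if a restart is warranted."""
    active_syncs.discard(room_id)
    restart = maybe_needing_restart.pop(room_id, None)
    # Restart only if the room still has partial state after the sync ended.
    return restart if still_partial else None
```

Only the latest destinations are kept per room, matching the plain dictionary assignment in the wrapper added further down.
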
@@ -679,9 +691,7 @@ class FederationHandler:
             if ret.partial_state:
                 # Kick off the process of asynchronously fetching the state for this
                 # room.
-                run_as_background_process(
-                    desc="sync_partial_state_room",
-                    func=self._sync_partial_state_room,
+                self._start_partial_state_room_sync(
                     initial_destination=origin,
                     other_destinations=ret.servers_in_room,
                     room_id=room_id,
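
The hunk below introduces `_start_partial_state_room_sync` and the wrapper that implements the deduplicate-and-maybe-restart behaviour. As a preview of that control flow in isolation, here is a minimal, self-contained sketch that is not part of the patch. Synapse schedules the real work with `run_as_background_process` on Twisted; plain `asyncio` is used here only to keep the example runnable, and `Resyncer`, `do_sync` and `partial_rooms` are hypothetical stand-ins for the handler, `_sync_partial_state_room` and the database's partial-state flag. Unlike the real code, the sketch swallows the failure itself rather than leaving it to the background-process machinery to log.

```python
import asyncio
from typing import Collection, Dict, Optional, Set, Tuple


class Resyncer:
    """Toy model of the dedupe-and-restart logic added in this patch."""

    def __init__(self) -> None:
        self.active: Set[str] = set()
        self.maybe_restart: Dict[str, Tuple[Optional[str], Collection[str]]] = {}
        # Stand-in for the database's per-room partial state flag.
        self.partial_rooms: Set[str] = set()

    async def do_sync(
        self, room_id: str, initial: Optional[str], others: Collection[str]
    ) -> None:
        """Stand-in for _sync_partial_state_room: clears the flag unless the destination fails."""
        await asyncio.sleep(0.01)
        if initial == "unreachable.example.org":
            raise RuntimeError("failed to pull state")
        self.partial_rooms.discard(room_id)

    def start(self, room_id: str, initial: Optional[str], others: Collection[str]) -> None:
        async def wrapper() -> None:
            if room_id in self.active:
                # A sync is already running for this room: just record the
                # latest destinations in case a restart is needed.
                self.maybe_restart[room_id] = (initial, others)
                return

            self.active.add(room_id)
            try:
                await self.do_sync(room_id, initial, others)
            except RuntimeError:
                pass  # the real code lets run_as_background_process log failures
            finally:
                # Check the flag while we still own the room's sync slot.
                still_partial = room_id in self.partial_rooms
                self.active.remove(room_id)

                if room_id in self.maybe_restart:
                    restart_initial, restart_others = self.maybe_restart.pop(room_id)
                    if still_partial:
                        # Scenarios 2a/2b from the comment in the patch: the room
                        # still has partial state, so go around again.
                        self.start(room_id, restart_initial, restart_others)

        # Fire-and-forget, mirroring run_as_background_process.
        asyncio.ensure_future(wrapper())


async def main() -> None:
    resyncer = Resyncer()
    resyncer.partial_rooms.add("!room:example.org")
    # First sync targets a destination that will refuse us (e.g. we had left the room).
    resyncer.start("!room:example.org", "unreachable.example.org", [])
    # A second local join arrives while that sync is running: only recorded for later.
    resyncer.start("!room:example.org", "reachable.example.org", [])
    await asyncio.sleep(0.1)
    # The failed sync was restarted with the new destination and has now succeeded.
    assert "!room:example.org" not in resyncer.partial_rooms


asyncio.run(main())
```
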
@@ -1660,20 +1670,100 @@ class FederationHandler:
         # well.
         return None
 
-    async def _resume_sync_partial_state_room(self) -> None:
+    async def _resume_partial_state_room_sync(self) -> None:
         """Resumes resyncing of all partial-state rooms after a restart."""
         assert not self.config.worker.worker_app
 
         partial_state_rooms = await self.store.get_partial_state_room_resync_info()
         for room_id, resync_info in partial_state_rooms.items():
-            run_as_background_process(
-                desc="sync_partial_state_room",
-                func=self._sync_partial_state_room,
+            self._start_partial_state_room_sync(
                 initial_destination=resync_info.joined_via,
                 other_destinations=resync_info.servers_in_room,
                 room_id=room_id,
             )
 
+    def _start_partial_state_room_sync(
+        self,
+        initial_destination: Optional[str],
+        other_destinations: Collection[str],
+        room_id: str,
+    ) -> None:
+        """Starts the background process to resync the state of a partial state room,
+        if it is not already running.
+
+        Args:
+            initial_destination: the initial homeserver to pull the state from
+            other_destinations: other homeservers to try to pull the state from, if
+                `initial_destination` is unavailable
+            room_id: room to be resynced
+        """
+
+        async def _sync_partial_state_room_wrapper() -> None:
+            if room_id in self._active_partial_state_syncs:
+                # Another local user has joined the room while there is already a
+                # partial state sync running. This implies that there is a new join
+                # event to un-partial state. We might find ourselves in one of a few
+                # scenarios:
+                #  1. There is an existing partial state sync. The partial state sync
+                #     un-partial states the new join event before completing and all is
+                #     well.
+                #  2. Before the latest join, the homeserver was no longer in the room
+                #     and there is an existing partial state sync from our previous
+                #     membership of the room. The partial state sync may have:
+                #      a) succeeded, but not yet terminated. The room will not be
+                #         un-partial stated again unless we restart the partial state
+                #         sync.
+                #      b) failed, because we were no longer in the room and remote
+                #         homeservers were refusing our requests, but not yet
+                #         terminated. After the latest join, remote homeservers may
+                #         start answering our requests again, so we should restart the
+                #         partial state sync.
+                # In both cases where we would want to restart the partial state sync,
+                # the room will still have the partial state flag when the existing
+                # partial state sync terminates.
+                self._partial_state_syncs_maybe_needing_restart[room_id] = (
+                    initial_destination,
+                    other_destinations,
+                )
+                return
+
+            self._active_partial_state_syncs.add(room_id)
+
+            try:
+                await self._sync_partial_state_room(
+                    initial_destination=initial_destination,
+                    other_destinations=other_destinations,
+                    room_id=room_id,
+                )
+            finally:
+                # Read the room's partial state flag while we still hold the claim to
+                # being the active partial state sync (so that another partial state
+                # sync can't come along and mess with it under us).
+                # Normally, the partial state flag will be gone. If it isn't, then we
+                # may find ourselves in scenario 2a or 2b as described in the comment
+                # above, where we want to restart the partial state sync.
+                is_still_partial_state_room = await self.store.is_partial_state_room(
+                    room_id
+                )
+                self._active_partial_state_syncs.remove(room_id)
+
+                if room_id in self._partial_state_syncs_maybe_needing_restart:
+                    (
+                        restart_initial_destination,
+                        restart_other_destinations,
+                    ) = self._partial_state_syncs_maybe_needing_restart.pop(room_id)
+
+                    if is_still_partial_state_room:
+                        self._start_partial_state_room_sync(
+                            initial_destination=restart_initial_destination,
+                            other_destinations=restart_other_destinations,
+                            room_id=room_id,
+                        )
+
+        run_as_background_process(
+            desc="sync_partial_state_room", func=_sync_partial_state_room_wrapper
+        )
+
     async def _sync_partial_state_room(
         self,
         initial_destination: Optional[str],