diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b212ee2172..544d8b046f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -27,6 +27,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     Union,
 )
@@ -169,6 +170,9 @@ class FederationHandler:
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
+        # A set of room IDs with partial state which we are currently syncing.
+        self._active_partial_state_syncs: Set[str] = set()
+
         # if this is the main process, fire off a background process to resume
         # any partial-state-resync operations which were in flight when we
         # were shut down.
@@ -1534,81 +1538,90 @@
 
         # `destination` is the current remote homeserver we're pulling from.
         destination = next(destination_iter)
 
-        logger.info("Syncing state for room %s via %s", room_id, destination)
-
-        # we work through the queue in order of increasing stream ordering.
-        while True:
-            batch = await self.store.get_partial_state_events_batch(room_id)
-            if not batch:
-                # all the events are updated, so we can update current state and
-                # clear the lazy-loading flag.
-                logger.info("Updating current state for %s", room_id)
-                assert (
-                    self._storage_controllers.persistence is not None
-                ), "TODO(faster_joins): support for workers"
-                await self._storage_controllers.persistence.update_current_state(
-                    room_id
-                )
-                logger.info("Clearing partial-state flag for %s", room_id)
-                success = await self.store.clear_partial_state_room(room_id)
-                if success:
-                    logger.info("State resync complete for %s", room_id)
-                    self._storage_controllers.state.notify_room_un_partial_stated(
+        if room_id in self._active_partial_state_syncs:
+            logger.info("Syncing state for room %s is already in progress", room_id)
+            return
+        try:
+            self._active_partial_state_syncs.add(room_id)
+
+            logger.info("Syncing state for room %s via %s", room_id, destination)
+
+            # we work through the queue in order of increasing stream ordering.
+            while True:
+                batch = await self.store.get_partial_state_events_batch(room_id)
+                if not batch:
+                    # all the events are updated, so we can update current state and
+                    # clear the lazy-loading flag.
+                    logger.info("Updating current state for %s", room_id)
+                    assert (
+                        self._storage_controllers.persistence is not None
+                    ), "TODO(faster_joins): support for workers"
+                    await self._storage_controllers.persistence.update_current_state(
                         room_id
                     )
-                    # TODO(faster_joins) update room stats and user directory?
-                    return
+                    logger.info("Clearing partial-state flag for %s", room_id)
+                    success = await self.store.clear_partial_state_room(room_id)
+                    if success:
+                        logger.info("State resync complete for %s", room_id)
+                        self._storage_controllers.state.notify_room_un_partial_stated(
+                            room_id
+                        )
 
-                # we raced against more events arriving with partial state. Go round
-                # the loop again. We've already logged a warning, so no need for more.
-                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
-                # with us will fail to be persisted after the call to `clear_partial_state_room` due to
-                # having partial state.
-                continue
+                        # TODO(faster_joins) update room stats and user directory?
+                        return
 
-            events = await self.store.get_events_as_list(
-                batch,
-                redact_behaviour=EventRedactBehaviour.as_is,
-                allow_rejected=True,
-            )
-            for event in events:
-                for attempt in itertools.count():
-                    try:
-                        await self._federation_event_handler.update_state_for_partial_state_event(
-                            destination, event
-                        )
-                        break
-                    except FederationError as e:
-                        if attempt == len(destinations) - 1:
-                            # We have tried every remote server for this event. Give up.
-                            # TODO(faster_joins) giving up isn't the right thing to do
-                            # if there's a temporary network outage. retrying
-                            # indefinitely is also not the right thing to do if we can
-                            # reach all homeservers and they all claim they don't have
-                            # the state we want.
-                            logger.error(
-                                "Failed to get state for %s at %s from %s because %s, "
-                                "giving up!",
+                    # we raced against more events arriving with partial state. Go round
+                    # the loop again. We've already logged a warning, so no need for more.
+                    # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                    # with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                    # having partial state.
+                    continue
+
+                events = await self.store.get_events_as_list(
+                    batch,
+                    redact_behaviour=EventRedactBehaviour.as_is,
+                    allow_rejected=True,
+                )
+                for event in events:
+                    for attempt in itertools.count():
+                        try:
+                            await self._federation_event_handler.update_state_for_partial_state_event(
+                                destination, event
+                            )
+                            break
+                        except FederationError as e:
+                            if attempt == len(destinations) - 1:
+                                # We have tried every remote server for this event. Give up.
+                                # TODO(faster_joins) giving up isn't the right thing to do
+                                # if there's a temporary network outage. retrying
+                                # indefinitely is also not the right thing to do if we can
+                                # reach all homeservers and they all claim they don't have
+                                # the state we want.
+                                logger.error(
+                                    "Failed to get state for %s at %s from %s because %s, "
+                                    "giving up!",
+                                    room_id,
+                                    event,
+                                    destination,
+                                    e,
+                                )
+                                raise
+
+                            # Try the next remote server.
+                            logger.info(
+                                "Failed to get state for %s at %s from %s because %s",
                                 room_id,
                                 event,
                                 destination,
                                 e,
                             )
-                            raise
-
-                        # Try the next remote server.
-                        logger.info(
-                            "Failed to get state for %s at %s from %s because %s",
-                            room_id,
-                            event,
-                            destination,
-                            e,
-                        )
-                        destination = next(destination_iter)
-                        logger.info(
-                            "Syncing state for room %s via %s instead",
-                            room_id,
-                            destination,
-                        )
+                            destination = next(destination_iter)
+                            logger.info(
+                                "Syncing state for room %s via %s instead",
+                                room_id,
+                                destination,
+                            )
+        finally:
+            self._active_partial_state_syncs.remove(room_id)
 
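The heart of the patch is the guard around the resync loop: a plain `set` plus `try`/`finally` is enough to deduplicate concurrent resyncs, because no `await` separates the membership check from the `add`, so on a single-threaded event loop the check-then-add cannot interleave with another call. A minimal, self-contained sketch of the pattern (`asyncio` stands in for Synapse's Twisted reactor; `ResyncGuard` and its attribute names are invented for illustration, not Synapse code):

```python
import asyncio
from typing import Set


class ResyncGuard:
    """Toy stand-in for the handler's partial-state sync deduplication."""

    def __init__(self) -> None:
        # Mirrors `_active_partial_state_syncs`: rooms with a resync in flight.
        self._active_partial_state_syncs: Set[str] = set()
        self.syncs_started = 0

    async def sync_partial_state_room(self, room_id: str) -> None:
        if room_id in self._active_partial_state_syncs:
            # A resync for this room is already running; bail out early,
            # before the try block, so the finally below never runs for us.
            return
        try:
            # No await between the check above and this add, so on a
            # cooperative event loop the check-then-add is effectively atomic.
            self._active_partial_state_syncs.add(room_id)
            self.syncs_started += 1
            await asyncio.sleep(0.01)  # placeholder for the actual resync work
        finally:
            # Always release the guard, even if the resync raises.
            self._active_partial_state_syncs.remove(room_id)


async def main() -> None:
    guard = ResyncGuard()
    # Five concurrent calls for the same room: only the first does any work.
    await asyncio.gather(
        *(guard.sync_partial_state_room("!room:example.org") for _ in range(5))
    )
    print(guard.syncs_started)  # -> 1


asyncio.run(main())
```

Note that the early `return` sits before the `try`, so the `finally` only ever removes a room that this particular call added; had the membership check lived inside the `try`, a duplicate call would reach the `finally` and wrongly clear the first sync's guard.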
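The retry logic that the patch re-indents but leaves intact is also worth a standalone look: `itertools.cycle` rotates through the candidate homeservers, while `itertools.count` caps the attempts per event at `len(destinations)`. A rough sketch under assumed names (`FetchError` and `fetch_state` are hypothetical stand-ins for `FederationError` and `update_state_for_partial_state_event`):

```python
import itertools
from typing import List


class FetchError(Exception):
    """Hypothetical stand-in for synapse.api.errors.FederationError."""


def fetch_state(destination: str, event_id: str) -> str:
    # Pretend only hs3 can serve the state for this event.
    if destination != "hs3":
        raise FetchError(f"{destination} has no state for {event_id}")
    return f"state for {event_id} from {destination}"


destinations: List[str] = ["hs1", "hs2", "hs3"]
destination_iter = itertools.cycle(destinations)
destination = next(destination_iter)

for attempt in itertools.count():
    try:
        print(fetch_state(destination, "$some_event"))
        break
    except FetchError:
        if attempt == len(destinations) - 1:
            # Every server has had one try for this event: give up, as the
            # patched code does with its `raise`.
            raise
        # Rotate to the next remote homeserver and retry.
        destination = next(destination_iter)
```

Because `destination_iter` outlives the inner retry loop, a failover is sticky: later events start from the server that last worked rather than from the front of the list, matching how `destination` is carried across the `for event in events` loop in the patch.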