diff options
author | Sean Quah <seanq@matrix.org> | 2022-06-01 16:41:59 +0100 |
---|---|---|
committer | Sean Quah <seanq@matrix.org> | 2022-06-01 16:41:59 +0100 |
commit | e237ff91bb6be7415cacf5511284162e577be93d (patch) | |
tree | 6c033a3adf2c5e79c132db82ac51113d0d9a3b47 | |
parent | Wait for lazy join to complete when getting current state (#12872) (diff) | |
download | synapse-e237ff91bb6be7415cacf5511284162e577be93d.tar.xz |
WIP github/squah/faster_room_joins_handle_second_join_while_resyncing squah/faster_room_joins_handle_second_join_while_resyncing
-rw-r--r-- | synapse/handlers/federation.py | 147 |
1 files changed, 80 insertions, 67 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b212ee2172..544d8b046f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -27,6 +27,7 @@ from typing import ( Iterable, List, Optional, + Set, Tuple, Union, ) @@ -169,6 +170,9 @@ class FederationHandler: self.third_party_event_rules = hs.get_third_party_event_rules() + # A set of room IDs with partial state which we are currently syncing. + self._active_partial_state_syncs: Set[str] = set() + # if this is the main process, fire off a background process to resume # any partial-state-resync operations which were in flight when we # were shut down. @@ -1534,81 +1538,90 @@ class FederationHandler: # `destination` is the current remote homeserver we're pulling from. destination = next(destination_iter) - logger.info("Syncing state for room %s via %s", room_id, destination) - - # we work through the queue in order of increasing stream ordering. - while True: - batch = await self.store.get_partial_state_events_batch(room_id) - if not batch: - # all the events are updated, so we can update current state and - # clear the lazy-loading flag. - logger.info("Updating current state for %s", room_id) - assert ( - self._storage_controllers.persistence is not None - ), "TODO(faster_joins): support for workers" - await self._storage_controllers.persistence.update_current_state( - room_id - ) - logger.info("Clearing partial-state flag for %s", room_id) - success = await self.store.clear_partial_state_room(room_id) - if success: - logger.info("State resync complete for %s", room_id) - self._storage_controllers.state.notify_room_un_partial_stated( + if room_id in self._active_partial_state_syncs: + logger.info("Syncing state for room %s is already in progress", room_id) + return + try: + self._active_partial_state_syncs.add(room_id) + + logger.info("Syncing state for room %s via %s", room_id, destination) + + # we work through the queue in order of increasing stream ordering. + while True: + batch = await self.store.get_partial_state_events_batch(room_id) + if not batch: + # all the events are updated, so we can update current state and + # clear the lazy-loading flag. + logger.info("Updating current state for %s", room_id) + assert ( + self._storage_controllers.persistence is not None + ), "TODO(faster_joins): support for workers" + await self._storage_controllers.persistence.update_current_state( room_id ) - # TODO(faster_joins) update room stats and user directory? - return + logger.info("Clearing partial-state flag for %s", room_id) + success = await self.store.clear_partial_state_room(room_id) + if success: + logger.info("State resync complete for %s", room_id) + self._storage_controllers.state.notify_room_un_partial_stated( + room_id + ) - # we raced against more events arriving with partial state. Go round - # the loop again. We've already logged a warning, so no need for more. - # TODO(faster_joins): there is still a race here, whereby incoming events which raced - # with us will fail to be persisted after the call to `clear_partial_state_room` due to - # having partial state. - continue + # TODO(faster_joins) update room stats and user directory? + return - events = await self.store.get_events_as_list( - batch, - redact_behaviour=EventRedactBehaviour.as_is, - allow_rejected=True, - ) - for event in events: - for attempt in itertools.count(): - try: - await self._federation_event_handler.update_state_for_partial_state_event( - destination, event - ) - break - except FederationError as e: - if attempt == len(destinations) - 1: - # We have tried every remote server for this event. Give up. - # TODO(faster_joins) giving up isn't the right thing to do - # if there's a temporary network outage. retrying - # indefinitely is also not the right thing to do if we can - # reach all homeservers and they all claim they don't have - # the state we want. - logger.error( - "Failed to get state for %s at %s from %s because %s, " - "giving up!", + # we raced against more events arriving with partial state. Go round + # the loop again. We've already logged a warning, so no need for more. + # TODO(faster_joins): there is still a race here, whereby incoming events which raced + # with us will fail to be persisted after the call to `clear_partial_state_room` due to + # having partial state. + continue + + events = await self.store.get_events_as_list( + batch, + redact_behaviour=EventRedactBehaviour.as_is, + allow_rejected=True, + ) + for event in events: + for attempt in itertools.count(): + try: + await self._federation_event_handler.update_state_for_partial_state_event( + destination, event + ) + break + except FederationError as e: + if attempt == len(destinations) - 1: + # We have tried every remote server for this event. Give up. + # TODO(faster_joins) giving up isn't the right thing to do + # if there's a temporary network outage. retrying + # indefinitely is also not the right thing to do if we can + # reach all homeservers and they all claim they don't have + # the state we want. + logger.error( + "Failed to get state for %s at %s from %s because %s, " + "giving up!", + room_id, + event, + destination, + e, + ) + raise + + # Try the next remote server. + logger.info( + "Failed to get state for %s at %s from %s because %s", room_id, event, destination, e, ) - raise - - # Try the next remote server. - logger.info( - "Failed to get state for %s at %s from %s because %s", - room_id, - event, - destination, - e, - ) - destination = next(destination_iter) - logger.info( - "Syncing state for room %s via %s instead", - room_id, - destination, - ) + destination = next(destination_iter) + logger.info( + "Syncing state for room %s via %s instead", + room_id, + destination, + ) + finally: + self._active_partial_state_syncs.remove(room_id) |