summary refs log tree commit diff
diff options
context:
space:
mode:
authorSean Quah <seanq@matrix.org>2022-06-01 16:41:59 +0100
committerSean Quah <seanq@matrix.org>2022-06-01 16:41:59 +0100
commite237ff91bb6be7415cacf5511284162e577be93d (patch)
tree6c033a3adf2c5e79c132db82ac51113d0d9a3b47
parentWait for lazy join to complete when getting current state (#12872) (diff)
downloadsynapse-e237ff91bb6be7415cacf5511284162e577be93d.tar.xz
-rw-r--r--synapse/handlers/federation.py147
1 files changed, 80 insertions, 67 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b212ee2172..544d8b046f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -27,6 +27,7 @@ from typing import (
     Iterable,
     List,
     Optional,
+    Set,
     Tuple,
     Union,
 )
@@ -169,6 +170,9 @@ class FederationHandler:
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
+        # A set of room IDs with partial state which we are currently syncing.
+        self._active_partial_state_syncs: Set[str] = set()
+
         # if this is the main process, fire off a background process to resume
         # any partial-state-resync operations which were in flight when we
         # were shut down.
@@ -1534,81 +1538,90 @@ class FederationHandler:
 
         # `destination` is the current remote homeserver we're pulling from.
         destination = next(destination_iter)
-        logger.info("Syncing state for room %s via %s", room_id, destination)
-
-        # we work through the queue in order of increasing stream ordering.
-        while True:
-            batch = await self.store.get_partial_state_events_batch(room_id)
-            if not batch:
-                # all the events are updated, so we can update current state and
-                # clear the lazy-loading flag.
-                logger.info("Updating current state for %s", room_id)
-                assert (
-                    self._storage_controllers.persistence is not None
-                ), "TODO(faster_joins): support for workers"
-                await self._storage_controllers.persistence.update_current_state(
-                    room_id
-                )
 
-                logger.info("Clearing partial-state flag for %s", room_id)
-                success = await self.store.clear_partial_state_room(room_id)
-                if success:
-                    logger.info("State resync complete for %s", room_id)
-                    self._storage_controllers.state.notify_room_un_partial_stated(
+        if room_id in self._active_partial_state_syncs:
+            logger.info("Syncing state for room %s is already in progress", room_id)
+            return
+        try:
+            self._active_partial_state_syncs.add(room_id)
+
+            logger.info("Syncing state for room %s via %s", room_id, destination)
+
+            # we work through the queue in order of increasing stream ordering.
+            while True:
+                batch = await self.store.get_partial_state_events_batch(room_id)
+                if not batch:
+                    # all the events are updated, so we can update current state and
+                    # clear the lazy-loading flag.
+                    logger.info("Updating current state for %s", room_id)
+                    assert (
+                        self._storage_controllers.persistence is not None
+                    ), "TODO(faster_joins): support for workers"
+                    await self._storage_controllers.persistence.update_current_state(
                         room_id
                     )
 
-                    # TODO(faster_joins) update room stats and user directory?
-                    return
+                    logger.info("Clearing partial-state flag for %s", room_id)
+                    success = await self.store.clear_partial_state_room(room_id)
+                    if success:
+                        logger.info("State resync complete for %s", room_id)
+                        self._storage_controllers.state.notify_room_un_partial_stated(
+                            room_id
+                        )
 
-                # we raced against more events arriving with partial state. Go round
-                # the loop again. We've already logged a warning, so no need for more.
-                # TODO(faster_joins): there is still a race here, whereby incoming events which raced
-                #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
-                #   having partial state.
-                continue
+                        # TODO(faster_joins) update room stats and user directory?
+                        return
 
-            events = await self.store.get_events_as_list(
-                batch,
-                redact_behaviour=EventRedactBehaviour.as_is,
-                allow_rejected=True,
-            )
-            for event in events:
-                for attempt in itertools.count():
-                    try:
-                        await self._federation_event_handler.update_state_for_partial_state_event(
-                            destination, event
-                        )
-                        break
-                    except FederationError as e:
-                        if attempt == len(destinations) - 1:
-                            # We have tried every remote server for this event. Give up.
-                            # TODO(faster_joins) giving up isn't the right thing to do
-                            #   if there's a temporary network outage. retrying
-                            #   indefinitely is also not the right thing to do if we can
-                            #   reach all homeservers and they all claim they don't have
-                            #   the state we want.
-                            logger.error(
-                                "Failed to get state for %s at %s from %s because %s, "
-                                "giving up!",
+                    # we raced against more events arriving with partial state. Go round
+                    # the loop again. We've already logged a warning, so no need for more.
+                    # TODO(faster_joins): there is still a race here, whereby incoming events which raced
+                    #   with us will fail to be persisted after the call to `clear_partial_state_room` due to
+                    #   having partial state.
+                    continue
+
+                events = await self.store.get_events_as_list(
+                    batch,
+                    redact_behaviour=EventRedactBehaviour.as_is,
+                    allow_rejected=True,
+                )
+                for event in events:
+                    for attempt in itertools.count():
+                        try:
+                            await self._federation_event_handler.update_state_for_partial_state_event(
+                                destination, event
+                            )
+                            break
+                        except FederationError as e:
+                            if attempt == len(destinations) - 1:
+                                # We have tried every remote server for this event. Give up.
+                                # TODO(faster_joins) giving up isn't the right thing to do
+                                #   if there's a temporary network outage. retrying
+                                #   indefinitely is also not the right thing to do if we can
+                                #   reach all homeservers and they all claim they don't have
+                                #   the state we want.
+                                logger.error(
+                                    "Failed to get state for %s at %s from %s because %s, "
+                                    "giving up!",
+                                    room_id,
+                                    event,
+                                    destination,
+                                    e,
+                                )
+                                raise
+
+                            # Try the next remote server.
+                            logger.info(
+                                "Failed to get state for %s at %s from %s because %s",
                                 room_id,
                                 event,
                                 destination,
                                 e,
                             )
-                            raise
-
-                        # Try the next remote server.
-                        logger.info(
-                            "Failed to get state for %s at %s from %s because %s",
-                            room_id,
-                            event,
-                            destination,
-                            e,
-                        )
-                        destination = next(destination_iter)
-                        logger.info(
-                            "Syncing state for room %s via %s instead",
-                            room_id,
-                            destination,
-                        )
+                            destination = next(destination_iter)
+                            logger.info(
+                                "Syncing state for room %s via %s instead",
+                                room_id,
+                                destination,
+                            )
+        finally:
+            self._active_partial_state_syncs.remove(room_id)