summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/federation.py215
1 files changed, 134 insertions, 81 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e386f77de6..2123ace8a6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -48,7 +48,6 @@ from synapse.api.errors import (
     FederationError,
     FederationPullAttemptBackoffError,
     HttpResponseException,
-    LimitExceededError,
     NotFoundError,
     RequestSendFailed,
     SynapseError,
@@ -182,6 +181,12 @@ class FederationHandler:
         self._partial_state_syncs_maybe_needing_restart: Dict[
             str, Tuple[Optional[str], Collection[str]]
         ] = {}
+        # A lock guarding the partial state flag for rooms.
+        # When the lock is held for a given room, no other concurrent code may
+        # partial state or un-partial state the room.
+        self._is_partial_state_room_linearizer = Linearizer(
+            name="_is_partial_state_room_linearizer"
+        )
 
         # if this is the main process, fire off a background process to resume
         # any partial-state-resync operations which were in flight when we
@@ -599,7 +604,23 @@ class FederationHandler:
 
         self._federation_event_handler.room_queues[room_id] = []
 
-        await self._clean_room_for_join(room_id)
+        is_host_joined = await self.store.is_host_joined(room_id, self.server_name)
+
+        if not is_host_joined:
+            # We may have old forward extremities lying around if the homeserver left
+            # the room completely in the past. Clear them out.
+            #
+            # Note that this check-then-clear is subject to races where
+            #  * the homeserver is in the room and stops being in the room just after
+            #    the check. We won't reset the forward extremities, but that's okay,
+            #    since they will be almost up to date.
+            #  * the homeserver is not in the room and starts being in the room just
+            #    after the check. This can't happen, since `RoomMemberHandler` has a
+            #    linearizer lock which prevents concurrent remote joins into the same
+            #    room.
+            # In short, the races either have an acceptable outcome or should be
+            # impossible.
+            await self._clean_room_for_join(room_id)
 
         try:
             # Try the host we successfully got a response to /make_join/
@@ -611,91 +632,115 @@ class FederationHandler:
             except ValueError:
                 pass
 
-            ret = await self.federation_client.send_join(
-                host_list, event, room_version_obj
-            )
-
-            event = ret.event
-            origin = ret.origin
-            state = ret.state
-            auth_chain = ret.auth_chain
-            auth_chain.sort(key=lambda e: e.depth)
-
-            logger.debug("do_invite_join auth_chain: %s", auth_chain)
-            logger.debug("do_invite_join state: %s", state)
-
-            logger.debug("do_invite_join event: %s", event)
+            async with self._is_partial_state_room_linearizer.queue(room_id):
+                already_partial_state_room = await self.store.is_partial_state_room(
+                    room_id
+                )
 
-            # if this is the first time we've joined this room, it's time to add
-            # a row to `rooms` with the correct room version. If there's already a
-            # row there, we should override it, since it may have been populated
-            # based on an invite request which lied about the room version.
-            #
-            # federation_client.send_join has already checked that the room
-            # version in the received create event is the same as room_version_obj,
-            # so we can rely on it now.
-            #
-            await self.store.upsert_room_on_join(
-                room_id=room_id,
-                room_version=room_version_obj,
-                state_events=state,
-            )
+                ret = await self.federation_client.send_join(
+                    host_list,
+                    event,
+                    room_version_obj,
+                    # Perform a full join when we are already in the room and it is a
+                    # full state room, since we are not allowed to persist a partial
+                    # state join event in a full state room. In the future, we could
+                    # optimize this by always performing a partial state join and
+                    # computing the state ourselves or retrieving it from the remote
+                    # homeserver if necessary.
+                    #
+                    # There's a race where we leave the room, then perform a full join
+                    # anyway. This should end up being fast anyway, since we would
+                    # already have the full room state and auth chain persisted.
+                    partial_state=not is_host_joined or already_partial_state_room,
+                )
 
-            if ret.partial_state:
-                # Mark the room as having partial state.
-                # The background process is responsible for unmarking this flag,
-                # even if the join fails.
-                await self.store.store_partial_state_room(
+                event = ret.event
+                origin = ret.origin
+                state = ret.state
+                auth_chain = ret.auth_chain
+                auth_chain.sort(key=lambda e: e.depth)
+
+                logger.debug("do_invite_join auth_chain: %s", auth_chain)
+                logger.debug("do_invite_join state: %s", state)
+
+                logger.debug("do_invite_join event: %s", event)
+
+                # if this is the first time we've joined this room, it's time to add
+                # a row to `rooms` with the correct room version. If there's already a
+                # row there, we should override it, since it may have been populated
+                # based on an invite request which lied about the room version.
+                #
+                # federation_client.send_join has already checked that the room
+                # version in the received create event is the same as room_version_obj,
+                # so we can rely on it now.
+                #
+                await self.store.upsert_room_on_join(
                     room_id=room_id,
-                    servers=ret.servers_in_room,
-                    device_lists_stream_id=self.store.get_device_stream_token(),
-                    joined_via=origin,
+                    room_version=room_version_obj,
+                    state_events=state,
                 )
 
-            try:
-                max_stream_id = (
-                    await self._federation_event_handler.process_remote_join(
-                        origin,
-                        room_id,
-                        auth_chain,
-                        state,
-                        event,
-                        room_version_obj,
-                        partial_state=ret.partial_state,
+                if ret.partial_state and not already_partial_state_room:
+                    # Mark the room as having partial state.
+                    # The background process is responsible for unmarking this flag,
+                    # even if the join fails.
+                    # TODO(faster_joins):
+                    #     We may want to reset the partial state info if it's from an
+                    #     old, failed partial state join.
+                    #     https://github.com/matrix-org/synapse/issues/13000
+                    await self.store.store_partial_state_room(
+                        room_id=room_id,
+                        servers=ret.servers_in_room,
+                        device_lists_stream_id=self.store.get_device_stream_token(),
+                        joined_via=origin,
                     )
-                )
-            except PartialStateConflictError as e:
-                # The homeserver was already in the room and it is no longer partial
-                # stated. We ought to be doing a local join instead. Turn the error into
-                # a 429, as a hint to the client to try again.
-                # TODO(faster_joins): `_should_perform_remote_join` suggests that we may
-                #   do a remote join for restricted rooms even if we have full state.
-                logger.error(
-                    "Room %s was un-partial stated while processing remote join.",
-                    room_id,
-                )
-                raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
-            else:
-                # Record the join event id for future use (when we finish the full
-                # join). We have to do this after persisting the event to keep foreign
-                # key constraints intact.
-                if ret.partial_state:
-                    await self.store.write_partial_state_rooms_join_event_id(
-                        room_id, event.event_id
+
+                try:
+                    max_stream_id = (
+                        await self._federation_event_handler.process_remote_join(
+                            origin,
+                            room_id,
+                            auth_chain,
+                            state,
+                            event,
+                            room_version_obj,
+                            partial_state=ret.partial_state,
+                        )
                     )
-            finally:
-                # Always kick off the background process that asynchronously fetches
-                # state for the room.
-                # If the join failed, the background process is responsible for
-                # cleaning up — including unmarking the room as a partial state room.
-                if ret.partial_state:
-                    # Kick off the process of asynchronously fetching the state for this
-                    # room.
-                    self._start_partial_state_room_sync(
-                        initial_destination=origin,
-                        other_destinations=ret.servers_in_room,
-                        room_id=room_id,
+                except PartialStateConflictError:
+                    # This should be impossible, since we hold the lock on the room's
+                    # partial statedness.
+                    logger.error(
+                        "Room %s was un-partial stated while processing remote join.",
+                        room_id,
                     )
+                    raise
+                else:
+                    # Record the join event id for future use (when we finish the full
+                    # join). We have to do this after persisting the event to keep
+                    # foreign key constraints intact.
+                    if ret.partial_state and not already_partial_state_room:
+                        # TODO(faster_joins):
+                        #     We may want to reset the partial state info if it's from
+                        #     an old, failed partial state join.
+                        #     https://github.com/matrix-org/synapse/issues/13000
+                        await self.store.write_partial_state_rooms_join_event_id(
+                            room_id, event.event_id
+                        )
+                finally:
+                    # Always kick off the background process that asynchronously fetches
+                    # state for the room.
+                    # If the join failed, the background process is responsible for
+                    # cleaning up — including unmarking the room as a partial state
+                    # room.
+                    if ret.partial_state:
+                        # Kick off the process of asynchronously fetching the state for
+                        # this room.
+                        self._start_partial_state_room_sync(
+                            initial_destination=origin,
+                            other_destinations=ret.servers_in_room,
+                            room_id=room_id,
+                        )
 
             # We wait here until this instance has seen the events come down
             # replication (if we're using replication) as the below uses caches.
@@ -1778,6 +1823,12 @@ class FederationHandler:
                 `initial_destination` is unavailable
             room_id: room to be resynced
         """
+        # Assume that we run on the main process for now.
+        # TODO(faster_joins,multiple workers)
+        # When moving the sync to workers, we need to ensure that
+        #  * `_start_partial_state_room_sync` still prevents duplicate resyncs
+        #  * `_is_partial_state_room_linearizer` correctly guards partial state flags
+        #    for rooms between the workers doing remote joins and resync.
         assert not self.config.worker.worker_app
 
         # TODO(faster_joins): do we need to lock to avoid races? What happens if other
@@ -1815,8 +1866,10 @@ class FederationHandler:
                 logger.info("Handling any pending device list updates")
                 await self._device_handler.handle_room_un_partial_stated(room_id)
 
-                logger.info("Clearing partial-state flag for %s", room_id)
-                success = await self.store.clear_partial_state_room(room_id)
+                async with self._is_partial_state_room_linearizer.queue(room_id):
+                    logger.info("Clearing partial-state flag for %s", room_id)
+                    success = await self.store.clear_partial_state_room(room_id)
+
                 if success:
                     logger.info("State resync complete for %s", room_id)
                     self._storage_controllers.state.notify_room_un_partial_stated(