diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index e386f77de6..2123ace8a6 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -48,7 +48,6 @@ from synapse.api.errors import (
FederationError,
FederationPullAttemptBackoffError,
HttpResponseException,
- LimitExceededError,
NotFoundError,
RequestSendFailed,
SynapseError,
@@ -182,6 +181,12 @@ class FederationHandler:
self._partial_state_syncs_maybe_needing_restart: Dict[
str, Tuple[Optional[str], Collection[str]]
] = {}
+ # A lock guarding the partial state flag for rooms.
+ # When the lock is held for a given room, no other concurrent code may
+ # mark the room as partial-stated or un-partial-stated.
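+ # The lock is taken when performing a remote join and when clearing
+ # the partial state flag once a state resync completes.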
+ self._is_partial_state_room_linearizer = Linearizer(
+ name="_is_partial_state_room_linearizer"
+ )
# if this is the main process, fire off a background process to resume
# any partial-state-resync operations which were in flight when we
@@ -599,7 +604,23 @@ class FederationHandler:
self._federation_event_handler.room_queues[room_id] = []
- await self._clean_room_for_join(room_id)
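+ # Check whether the homeserver is already in the room: this decides
+ # both whether stale forward extremities need clearing out and whether
+ # a partial state join may be performed (see `partial_state=` below).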
+ is_host_joined = await self.store.is_host_joined(room_id, self.server_name)
+
+ if not is_host_joined:
+ # We may have old forward extremities lying around if the homeserver left
+ # the room completely in the past. Clear them out.
+ #
+ # Note that this check-then-clear is subject to races where
+ #  * the homeserver is in the room and stops being in the room just
+ #    after the check. We won't reset the forward extremities, but
+ #    that's okay, since they will be almost up to date.
+ #  * the homeserver is not in the room and starts being in the room
+ #    just after the check. This can't happen, since `RoomMemberHandler`
+ #    has a linearizer lock which prevents concurrent remote joins into
+ #    the same room.
+ # In short, the races either have an acceptable outcome or should be
+ # impossible.
+ await self._clean_room_for_join(room_id)
try:
# Try the host we successfully got a response to /make_join/
@@ -611,91 +632,115 @@ class FederationHandler:
except ValueError:
pass
- ret = await self.federation_client.send_join(
- host_list, event, room_version_obj
- )
-
- event = ret.event
- origin = ret.origin
- state = ret.state
- auth_chain = ret.auth_chain
- auth_chain.sort(key=lambda e: e.depth)
-
- logger.debug("do_invite_join auth_chain: %s", auth_chain)
- logger.debug("do_invite_join state: %s", state)
-
- logger.debug("do_invite_join event: %s", event)
+ async with self._is_partial_state_room_linearizer.queue(room_id):
+ already_partial_state_room = await self.store.is_partial_state_room(
+ room_id
+ )
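+ # Remember whether the room already had partial state: if it did, we
+ # may do another partial state join, but we must not overwrite the
+ # existing partial state info (see the checks below).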
- # if this is the first time we've joined this room, it's time to add
- # a row to `rooms` with the correct room version. If there's already a
- # row there, we should override it, since it may have been populated
- # based on an invite request which lied about the room version.
- #
- # federation_client.send_join has already checked that the room
- # version in the received create event is the same as room_version_obj,
- # so we can rely on it now.
- #
- await self.store.upsert_room_on_join(
- room_id=room_id,
- room_version=room_version_obj,
- state_events=state,
- )
+ ret = await self.federation_client.send_join(
+ host_list,
+ event,
+ room_version_obj,
+ # Perform a full join when we are already in the room and it is a
+ # full state room, since we are not allowed to persist a partial
+ # state join event in a full state room. In the future, we could
+ # optimize this by always performing a partial state join and
+ # computing the state ourselves or retrieving it from the remote
+ # homeserver if necessary.
+ #
+ # There's a race where we leave the room, then perform a full join
+ # anyway. That full join should still be fast, since we would already
+ # have the full room state and auth chain persisted.
+ partial_state=not is_host_joined or already_partial_state_room,
+ )
- if ret.partial_state:
- # Mark the room as having partial state.
- # The background process is responsible for unmarking this flag,
- # even if the join fails.
- await self.store.store_partial_state_room(
+ event = ret.event
+ origin = ret.origin
+ state = ret.state
+ auth_chain = ret.auth_chain
+ auth_chain.sort(key=lambda e: e.depth)
+
+ logger.debug("do_invite_join auth_chain: %s", auth_chain)
+ logger.debug("do_invite_join state: %s", state)
+
+ logger.debug("do_invite_join event: %s", event)
+
+ # if this is the first time we've joined this room, it's time to add
+ # a row to `rooms` with the correct room version. If there's already a
+ # row there, we should override it, since it may have been populated
+ # based on an invite request which lied about the room version.
+ #
+ # federation_client.send_join has already checked that the room
+ # version in the received create event is the same as room_version_obj,
+ # so we can rely on it now.
+ #
+ await self.store.upsert_room_on_join(
room_id=room_id,
- servers=ret.servers_in_room,
- device_lists_stream_id=self.store.get_device_stream_token(),
- joined_via=origin,
+ room_version=room_version_obj,
+ state_events=state,
)
- try:
- max_stream_id = (
- await self._federation_event_handler.process_remote_join(
- origin,
- room_id,
- auth_chain,
- state,
- event,
- room_version_obj,
- partial_state=ret.partial_state,
+ if ret.partial_state and not already_partial_state_room:
+ # Mark the room as having partial state.
+ # The background process is responsible for unmarking this flag,
+ # even if the join fails.
+ # TODO(faster_joins):
+ # We may want to reset the partial state info if it's from an
+ # old, failed partial state join.
+ # https://github.com/matrix-org/synapse/issues/13000
+ await self.store.store_partial_state_room(
+ room_id=room_id,
+ servers=ret.servers_in_room,
+ device_lists_stream_id=self.store.get_device_stream_token(),
+ joined_via=origin,
)
- )
- except PartialStateConflictError as e:
- # The homeserver was already in the room and it is no longer partial
- # stated. We ought to be doing a local join instead. Turn the error into
- # a 429, as a hint to the client to try again.
- # TODO(faster_joins): `_should_perform_remote_join` suggests that we may
- # do a remote join for restricted rooms even if we have full state.
- logger.error(
- "Room %s was un-partial stated while processing remote join.",
- room_id,
- )
- raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
- else:
- # Record the join event id for future use (when we finish the full
- # join). We have to do this after persisting the event to keep foreign
- # key constraints intact.
- if ret.partial_state:
- await self.store.write_partial_state_rooms_join_event_id(
- room_id, event.event_id
+
+ try:
+ max_stream_id = (
+ await self._federation_event_handler.process_remote_join(
+ origin,
+ room_id,
+ auth_chain,
+ state,
+ event,
+ room_version_obj,
+ partial_state=ret.partial_state,
+ )
)
- finally:
- # Always kick off the background process that asynchronously fetches
- # state for the room.
- # If the join failed, the background process is responsible for
- # cleaning up — including unmarking the room as a partial state room.
- if ret.partial_state:
- # Kick off the process of asynchronously fetching the state for this
- # room.
- self._start_partial_state_room_sync(
- initial_destination=origin,
- other_destinations=ret.servers_in_room,
- room_id=room_id,
+ except PartialStateConflictError:
+ # This should be impossible, since we hold the lock on the room's
+ # partial state flag for the duration of the join.
+ logger.error(
+ "Room %s was un-partial stated while processing remote join.",
+ room_id,
)
+ raise
+ else:
+ # Record the join event id for future use (when we finish the full
+ # join). We have to do this after persisting the event to keep
+ # foreign key constraints intact.
+ if ret.partial_state and not already_partial_state_room:
+ # TODO(faster_joins):
+ # We may want to reset the partial state info if it's from
+ # an old, failed partial state join.
+ # https://github.com/matrix-org/synapse/issues/13000
+ await self.store.write_partial_state_rooms_join_event_id(
+ room_id, event.event_id
+ )
+ finally:
+ # Always kick off the background process that asynchronously fetches
+ # state for the room.
+ # If the join failed, the background process is responsible for
+ # cleaning up — including unmarking the room as a partial state
+ # room.
+ if ret.partial_state:
+ # Kick off the process of asynchronously fetching the state for
+ # this room.
+ self._start_partial_state_room_sync(
+ initial_destination=origin,
+ other_destinations=ret.servers_in_room,
+ room_id=room_id,
+ )
# We wait here until this instance has seen the events come down
# replication (if we're using replication) as the below uses caches.
@@ -1778,6 +1823,12 @@ class FederationHandler:
`initial_destination` is unavailable
room_id: room to be resynced
"""
+ # Assume that we run on the main process for now.
+ # TODO(faster_joins, multiple workers)
+ # When moving the sync to workers, we need to ensure that
+ #  * `_start_partial_state_room_sync` still prevents duplicate resyncs
+ #  * `_is_partial_state_room_linearizer` correctly guards partial state
+ #    flags for rooms between the workers doing remote joins and resync.
assert not self.config.worker.worker_app
# TODO(faster_joins): do we need to lock to avoid races? What happens if other
@@ -1815,8 +1866,10 @@ class FederationHandler:
logger.info("Handling any pending device list updates")
await self._device_handler.handle_room_un_partial_stated(room_id)
- logger.info("Clearing partial-state flag for %s", room_id)
- success = await self.store.clear_partial_state_room(room_id)
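+ # Hold the partial state lock while clearing the flag, so that remote
+ # joins see a consistent value when deciding whether to perform a
+ # partial state join.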
+ async with self._is_partial_state_room_linearizer.queue(room_id):
+ logger.info("Clearing partial-state flag for %s", room_id)
+ success = await self.store.clear_partial_state_room(room_id)
+
if success:
logger.info("State resync complete for %s", room_id)
self._storage_controllers.state.notify_room_un_partial_stated(