diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 1d60137411..bb20af6e91 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -61,7 +61,12 @@ from synapse.logging.context import (
nested_logging_context,
run_in_background,
)
-from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
+from synapse.logging.opentracing import (
+ log_kv,
+ start_active_span_from_edu,
+ tag_args,
+ trace,
+)
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.http.federation import (
ReplicationFederationSendEduRestServlet,
@@ -69,6 +74,8 @@ from synapse.replication.http.federation import (
)
from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.storage.databases.main.lock import Lock
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.roommember import MemberSummary
from synapse.types import JsonDict, StateMap, get_domain_from_id
from synapse.util import json_decoder, unwrapFirstError
from synapse.util.async_helpers import Linearizer, concurrently_execute, gather_results
@@ -476,6 +483,14 @@ class FederationServer(FederationBase):
pdu_results[pdu.event_id] = await process_pdu(pdu)
async def process_pdu(pdu: EventBase) -> JsonDict:
+ """
+ Processes a pushed PDU sent to us via a `/send` transaction
+
+ Returns:
+ JsonDict representing a "PDU Processing Result" that will be bundled up
+ with the other processed PDU's in the `/send` transaction and sent back
+ to remote homeserver.
+ """
event_id = pdu.event_id
with nested_logging_context(event_id):
try:
@@ -525,13 +540,10 @@ class FederationServer(FederationBase):
async def on_room_state_request(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, JsonDict]:
+ await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
# we grab the linearizer to protect ourselves from servers which hammer
# us. In theory we might already have the response to this query
# in the cache so we could return it without waiting for the linearizer
@@ -547,19 +559,18 @@ class FederationServer(FederationBase):
return 200, resp
+ @trace
+ @tag_args
async def on_state_ids_request(
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, JsonDict]:
if not event_id:
raise NotImplementedError("Specify an event")
+ await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
- in_room = await self._event_auth_handler.check_host_in_room(room_id, origin)
- if not in_room:
- raise AuthError(403, "Host not in room.")
-
resp = await self._state_ids_resp_cache.wrap(
(room_id, event_id),
self._on_state_ids_request_compute,
@@ -569,6 +580,8 @@ class FederationServer(FederationBase):
return 200, resp
+ @trace
+ @tag_args
async def _on_state_ids_request_compute(
self, room_id: str, event_id: str
) -> JsonDict:
@@ -680,8 +693,9 @@ class FederationServer(FederationBase):
state_event_ids: Collection[str]
servers_in_room: Optional[Collection[str]]
if caller_supports_partial_state:
+ summary = await self.store.get_room_summary(room_id)
state_event_ids = _get_event_ids_for_partial_state_join(
- event, prev_state_ids
+ event, prev_state_ids, summary
)
servers_in_room = await self.state.get_hosts_in_room_at_events(
room_id, event_ids=event.prev_event_ids()
@@ -754,6 +768,17 @@ class FederationServer(FederationBase):
The partial knock event.
"""
origin_host, _ = parse_server_name(origin)
+
+ if await self.store.is_partial_state_room(room_id):
+ # Before we do anything: check if the room is partial-stated.
+ # Note that at the time this check was added, `on_make_knock_request` would
+ # block due to https://github.com/matrix-org/synapse/issues/12997.
+ raise SynapseError(
+ 404,
+ "Unable to handle /make_knock right now; this server is not fully joined.",
+ errcode=Codes.NOT_FOUND,
+ )
+
await self.check_server_matches_acl(origin_host, room_id)
room_version = await self.store.get_room_version(room_id)
@@ -810,7 +835,14 @@ class FederationServer(FederationBase):
context, self._room_prejoin_state_types
)
)
- return {"knock_state_events": stripped_room_state}
+ return {
+ "knock_room_state": stripped_room_state,
+ # Since v1.37, Synapse incorrectly used "knock_state_events" for this field.
+ # Thus, we also populate a 'knock_state_events' with the same content to
+ # support old instances.
+ # See https://github.com/matrix-org/synapse/issues/14088.
+ "knock_state_events": stripped_room_state,
+ }
async def _on_send_membership_event(
self, origin: str, content: JsonDict, membership_type: str, room_id: str
@@ -843,8 +875,25 @@ class FederationServer(FederationBase):
Codes.BAD_JSON,
)
+ # Note that get_room_version throws if the room does not exist here.
room_version = await self.store.get_room_version(room_id)
+ if await self.store.is_partial_state_room(room_id):
+ # If our server is still only partially joined, we can't give a complete
+ # response to /send_join, /send_knock or /send_leave.
+ # This is because we will not be able to provide the server list (for partial
+ # joins) or the full state (for full joins).
+ # Return a 404 as we would if we weren't in the room at all.
+ logger.info(
+ f"Rejecting /send_{membership_type} to %s because it's a partial state room",
+ room_id,
+ )
+ raise SynapseError(
+ 404,
+ f"Unable to handle /send_{membership_type} right now; this server is not fully joined.",
+ errcode=Codes.NOT_FOUND,
+ )
+
if membership_type == Membership.KNOCK and not room_version.msc2403_knocking:
raise SynapseError(
403,
@@ -918,6 +967,7 @@ class FederationServer(FederationBase):
self, origin: str, room_id: str, event_id: str
) -> Tuple[int, Dict[str, Any]]:
async with self._server_linearizer.queue((origin, room_id)):
+ await self._event_auth_handler.assert_host_in_room(room_id, origin)
origin_host, _ = parse_server_name(origin)
await self.check_server_matches_acl(origin_host, room_id)
@@ -1448,6 +1498,7 @@ class FederationHandlerRegistry:
def _get_event_ids_for_partial_state_join(
join_event: EventBase,
prev_state_ids: StateMap[str],
+ summary: Dict[str, MemberSummary],
) -> Collection[str]:
"""Calculate state to be retuned in a partial_state send_join
@@ -1474,8 +1525,19 @@ def _get_event_ids_for_partial_state_join(
if current_membership_event_id is not None:
state_event_ids.add(current_membership_event_id)
- # TODO: return a few more members:
- # - those with invites
- # - those that are kicked? / banned
+ name_id = prev_state_ids.get((EventTypes.Name, ""))
+ canonical_alias_id = prev_state_ids.get((EventTypes.CanonicalAlias, ""))
+ if not name_id and not canonical_alias_id:
+ # Also include the hero members of the room (for DM rooms without a title).
+ # To do this properly, we should select the correct subset of membership events
+ # from `prev_state_ids`. Instead, we are lazier and use the (cached)
+ # `get_room_summary` function, which is based on the current state of the room.
+ # This introduces races; we choose to ignore them because a) they should be rare
+ # and b) even if it's wrong, joining servers will get the full state eventually.
+ heroes = extract_heroes_from_room_summary(summary, join_event.state_key)
+ for hero in heroes:
+ membership_event_id = prev_state_ids.get((EventTypes.Member, hero))
+ if membership_event_id:
+ state_event_ids.add(membership_event_id)
return state_event_ids
|