diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 32074b8ca6..a3cfc701cd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -45,6 +45,7 @@ from synapse.federation.units import Transaction
from synapse.http.matrixfederationclient import ByteParser
from synapse.http.types import QueryParams
from synapse.types import JsonDict
+from synapse.util import ExceptionBundle
logger = logging.getLogger(__name__)
@@ -279,12 +280,11 @@ class TransportLayerClient:
Note that this does not append any events to any graphs.
Args:
- destination (str): address of remote homeserver
- room_id (str): room to join/leave
- user_id (str): user to be joined/left
- membership (str): one of join/leave
- params (dict[str, str|Iterable[str]]): Query parameters to include in the
- request.
+ destination: address of remote homeserver
+ room_id: room to join/leave
+ user_id: user to be joined/left
+ membership: one of join/leave
+ params: Query parameters to include in the request.
Returns:
Succeeds when we get a 2xx HTTP response. The result
@@ -926,8 +926,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
return len(data)
def finish(self) -> SendJoinResponse:
- for c in self._coros:
- c.close()
+ _close_coros(self._coros)
if self._response.event_dict:
self._response.event = make_event_from_dict(
@@ -970,6 +969,27 @@ class _StateParser(ByteParser[StateRequestResponse]):
return len(data)
def finish(self) -> StateRequestResponse:
- for c in self._coros:
- c.close()
+ _close_coros(self._coros)
return self._response
+
+
+def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
+ """Close each of the given coroutines.
+
+ Always calls .close() on each coroutine, even if doing so raises an exception.
+ Any exceptions raised are aggregated into an ExceptionBundle.
+
+ :raises ExceptionBundle: if at least one coroutine fails to close.
+ """
+ exceptions = []
+ for c in coros:
+ try:
+ c.close()
+ except Exception as e:
+ exceptions.append(e)
+
+ if exceptions:
+ # raise from the first exception so that the traceback has slightly more context
+ raise ExceptionBundle(
+ f"There were {len(exceptions)} errors closing coroutines", exceptions
+ ) from exceptions[0]
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index bb0f8d6b7b..cdaf0d5de7 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Awaitable, Callable, Dict, Optional, Tupl
from synapse.api.errors import Codes, FederationDeniedError, SynapseError
from synapse.api.urls import FEDERATION_V1_PREFIX
-from synapse.http.server import HttpServer, ServletCallback, is_method_cancellable
+from synapse.http.server import HttpServer, ServletCallback
from synapse.http.servlet import parse_json_object_from_request
from synapse.http.site import SynapseRequest
from synapse.logging.context import run_in_background
@@ -34,6 +34,7 @@ from synapse.logging.opentracing import (
whitelisted_homeserver,
)
from synapse.types import JsonDict
+from synapse.util.cancellation import is_function_cancellable
from synapse.util.ratelimitutils import FederationRateLimiter
from synapse.util.stringutils import parse_and_validate_server_name
@@ -223,10 +224,10 @@ class BaseFederationServlet:
With arguments:
- origin (unicode|None): The authenticated server_name of the calling server,
+ origin (str|None): The authenticated server_name of the calling server,
unless REQUIRE_AUTH is set to False and authentication failed.
- content (unicode|None): decoded json body of the request. None if the
+ content (str|None): decoded json body of the request. None if the
request was a GET.
query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
@@ -375,7 +376,7 @@ class BaseFederationServlet:
if code is None:
continue
- if is_method_cancellable(code):
+ if is_function_cancellable(code):
# The wrapper added by `self._wrap` will inherit the cancellable flag,
# but the wrapper itself does not support cancellation yet.
# Once resolved, the cancellation tests in
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index f7884bfbe0..205fd16daa 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -489,7 +489,7 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
room_version = content["room_version"]
event = content["event"]
- invite_room_state = content["invite_room_state"]
+ invite_room_state = content.get("invite_room_state", [])
# Synapse expects invite_room_state to be in unsigned, as it is in v1
# API
@@ -499,6 +499,11 @@ class FederationV2InviteServlet(BaseFederationServerServlet):
result = await self.handler.on_invite_request(
origin, event, room_version_id=room_version
)
+
+ # We only store invite_room_state for internal use, so remove it before
+ # returning the event to the remote homeserver.
+ result["event"].get("unsigned", {}).pop("invite_room_state", None)
+
return 200, result
@@ -549,8 +554,7 @@ class FederationClientKeysClaimServlet(BaseFederationServerServlet):
class FederationGetMissingEventsServlet(BaseFederationServerServlet):
- # TODO(paul): Why does this path alone end with "/?" optional?
- PATH = "/get_missing_events/(?P<room_id>[^/]*)/?"
+ PATH = "/get_missing_events/(?P<room_id>[^/]*)"
async def on_POST(
self,
|