diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 32074b8ca6..cd39d4d111 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -45,6 +45,7 @@ from synapse.federation.units import Transaction
from synapse.http.matrixfederationclient import ByteParser
from synapse.http.types import QueryParams
from synapse.types import JsonDict
+from synapse.util import ExceptionBundle
logger = logging.getLogger(__name__)
@@ -926,8 +927,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
return len(data)
def finish(self) -> SendJoinResponse:
- for c in self._coros:
- c.close()
+ _close_coros(self._coros)
if self._response.event_dict:
self._response.event = make_event_from_dict(
@@ -970,6 +970,27 @@ class _StateParser(ByteParser[StateRequestResponse]):
return len(data)
def finish(self) -> StateRequestResponse:
- for c in self._coros:
- c.close()
+ _close_coros(self._coros)
return self._response
+
+
+def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
+ """Close each of the given coroutines.
+
+ Always calls .close() on each coroutine, even if doing so raises an exception.
+ Any exceptions raised are aggregated into an ExceptionBundle.
+
+ :raises ExceptionBundle: if at least one coroutine fails to close.
+ """
+ exceptions = []
+ for c in coros:
+ try:
+ c.close()
+ except Exception as e:
+ exceptions.append(e)
+
+ if exceptions:
+ # raise from the first exception so that the traceback has slightly more context
+ raise ExceptionBundle(
+ f"There were {len(exceptions)} errors closing coroutines", exceptions
+ ) from exceptions[0]
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index a90f08dd4c..7be9d5f113 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -15,7 +15,7 @@
import json
import logging
import typing
-from typing import Any, Callable, Dict, Generator, Optional
+from typing import Any, Callable, Dict, Generator, Optional, Sequence
import attr
from frozendict import frozendict
@@ -193,3 +193,15 @@ def log_failure(
# Version string with git info. Computed here once so that we don't invoke git multiple
# times.
SYNAPSE_VERSION = get_distribution_version_string("matrix-synapse", __file__)
+
+
+class ExceptionBundle(Exception):
+ # A poor stand-in for something like Python 3.11's ExceptionGroup.
+ # (A backport called `exceptiongroup` exists but seems overkill: we just want a
+ # container type here.)
+ def __init__(self, message: str, exceptions: Sequence[Exception]):
+ parts = [message]
+ for e in exceptions:
+ parts.append(str(e))
+ super().__init__("\n - ".join(parts))
+ self.exceptions = exceptions
|