summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/14065.misc1
-rw-r--r--synapse/federation/transport/client.py29
-rw-r--r--synapse/util/__init__.py14
-rw-r--r--tests/federation/transport/test_client.py37
4 files changed, 76 insertions, 5 deletions
diff --git a/changelog.d/14065.misc b/changelog.d/14065.misc
new file mode 100644
index 0000000000..98998b0015
--- /dev/null
+++ b/changelog.d/14065.misc
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.35.0 where errors parsing a `/send_join` or `/state` response would produce excessive, low-quality Sentry events.
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 32074b8ca6..cd39d4d111 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -45,6 +45,7 @@ from synapse.federation.units import Transaction
 from synapse.http.matrixfederationclient import ByteParser
 from synapse.http.types import QueryParams
 from synapse.types import JsonDict
+from synapse.util import ExceptionBundle
 
 logger = logging.getLogger(__name__)
 
@@ -926,8 +927,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
         return len(data)
 
     def finish(self) -> SendJoinResponse:
-        for c in self._coros:
-            c.close()
+        _close_coros(self._coros)
 
         if self._response.event_dict:
             self._response.event = make_event_from_dict(
@@ -970,6 +970,27 @@ class _StateParser(ByteParser[StateRequestResponse]):
         return len(data)
 
     def finish(self) -> StateRequestResponse:
-        for c in self._coros:
-            c.close()
+        _close_coros(self._coros)
         return self._response
+
+
+def _close_coros(coros: Iterable[Generator[None, bytes, None]]) -> None:
+    """Close each of the given coroutines.
+
+    Always calls .close() on each coroutine, even if doing so raises an exception.
+    Any exceptions raised are aggregated into an ExceptionBundle.
+
+    :raises ExceptionBundle: if at least one coroutine fails to close.
+    """
+    exceptions = []
+    for c in coros:
+        try:
+            c.close()
+        except Exception as e:
+            exceptions.append(e)
+
+    if exceptions:
+        # raise from the first exception so that the traceback has slightly more context
+        raise ExceptionBundle(
+            f"There were {len(exceptions)} errors closing coroutines", exceptions
+        ) from exceptions[0]
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index a90f08dd4c..7be9d5f113 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -15,7 +15,7 @@
 import json
 import logging
 import typing
-from typing import Any, Callable, Dict, Generator, Optional
+from typing import Any, Callable, Dict, Generator, Optional, Sequence
 
 import attr
 from frozendict import frozendict
@@ -193,3 +193,15 @@ def log_failure(
 # Version string with git info. Computed here once so that we don't invoke git multiple
 # times.
 SYNAPSE_VERSION = get_distribution_version_string("matrix-synapse", __file__)
+
+
+class ExceptionBundle(Exception):
+    # A poor stand-in for something like Python 3.11's ExceptionGroup.
+    # (A backport called `exceptiongroup` exists but seems overkill: we just want a
+    # container type here.)
+    def __init__(self, message: str, exceptions: Sequence[Exception]):
+        parts = [message]
+        for e in exceptions:
+            parts.append(str(e))
+        super().__init__("\n  - ".join(parts))
+        self.exceptions = exceptions
diff --git a/tests/federation/transport/test_client.py b/tests/federation/transport/test_client.py
index c2320ce133..0926e0583d 100644
--- a/tests/federation/transport/test_client.py
+++ b/tests/federation/transport/test_client.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import json
+from unittest.mock import Mock
 
 from synapse.api.room_versions import RoomVersions
 from synapse.federation.transport.client import SendJoinParser
@@ -94,3 +95,39 @@ class SendJoinParserTestCase(TestCase):
         # Retrieve and check the parsed SendJoinResponse
         parsed_response = parser.finish()
         self.assertEqual(parsed_response.servers_in_room, ["hs1", "hs2"])
+
+    def test_errors_closing_coroutines(self) -> None:
+        """Check we close all coroutines, even if closing the first raises an Exception.
+
+        We also check that an Exception of some kind is raised, but we don't make any
+        assertions about its attributes or type.
+        """
+        parser = SendJoinParser(RoomVersions.V1, False)
+        response = {"org.matrix.msc3706.servers_in_room": ["hs1", "hs2"]}
+        serialisation = json.dumps(response).encode()
+
+        # Mock the coroutines managed by this parser.
+        # The first one will error when we try to close it.
+        coro_1 = Mock()
+        coro_1.close = Mock(side_effect=RuntimeError("Couldn't close coro 1"))
+
+        coro_2 = Mock()
+
+        coro_3 = Mock()
+        coro_3.close = Mock(side_effect=RuntimeError("Couldn't close coro 3"))
+
+        parser._coros = [coro_1, coro_2, coro_3]
+
+        # Send half of the data to the parser
+        parser.write(serialisation[: len(serialisation) // 2])
+
+        # Close the parser. There should be _some_ kind of exception, but it need not
+        # be that RuntimeError directly. E.g. we might want to raise a wrapper
+        # encompassing multiple errors from multiple coroutines.
+        with self.assertRaises(Exception):
+            parser.finish()
+
+        # In any case, we should have tried to close both coros.
+        coro_1.close.assert_called()
+        coro_2.close.assert_called()
+        coro_3.close.assert_called()