summary refs log tree commit diff
diff options
context:
space:
mode:
authorSean Quah <8349537+squahtx@users.noreply.github.com>2022-05-27 11:03:05 +0100
committerGitHub <noreply@github.com>2022-05-27 11:03:05 +0100
commitbb7a6377652755584c327d28b131c467d999280d (patch)
treeb7ef4629948d813cba7f0127ad1fd46c838c81d1
parentAdd an option allowing users to use their password to reauthenticate even tho... (diff)
downloadsynapse-bb7a6377652755584c327d28b131c467d999280d.tar.xz
Close `ijson` coroutines ourselves instead of letting the GC close them (#12875)
Hopefully this means that exceptions raised due to truncated JSON
get a sensible logging context and stack.

Signed-off-by: Sean Quah <seanq@matrix.org>
-rw-r--r--changelog.d/12875.bugfix1
-rw-r--r--synapse/federation/transport/client.py9
-rw-r--r--synapse/http/matrixfederationclient.py11
3 files changed, 19 insertions, 2 deletions
diff --git a/changelog.d/12875.bugfix b/changelog.d/12875.bugfix
new file mode 100644
index 0000000000..7b011e0f90
--- /dev/null
+++ b/changelog.d/12875.bugfix
@@ -0,0 +1 @@
+Explicitly close `ijson` coroutines once we are done with them, instead of leaving the garbage collector to close them.
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 9ce06dfa28..2686ee2e51 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -1363,7 +1363,7 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
     def __init__(self, room_version: RoomVersion, v1_api: bool):
         self._response = SendJoinResponse([], [], event_dict={})
         self._room_version = room_version
-        self._coros = []
+        self._coros: List[Generator[None, bytes, None]] = []
 
         # The V1 API has the shape of `[200, {...}]`, which we handle by
         # prefixing with `item.*`.
@@ -1411,6 +1411,9 @@ class SendJoinParser(ByteParser[SendJoinResponse]):
         return len(data)
 
     def finish(self) -> SendJoinResponse:
+        for c in self._coros:
+            c.close()
+
         if self._response.event_dict:
             self._response.event = make_event_from_dict(
                 self._response.event_dict, self._room_version
@@ -1430,7 +1433,7 @@ class _StateParser(ByteParser[StateRequestResponse]):
     def __init__(self, room_version: RoomVersion):
         self._response = StateRequestResponse([], [])
         self._room_version = room_version
-        self._coros = [
+        self._coros: List[Generator[None, bytes, None]] = [
             ijson.items_coro(
                 _event_list_parser(room_version, self._response.state),
                 "pdus.item",
@@ -1449,4 +1452,6 @@ class _StateParser(ByteParser[StateRequestResponse]):
         return len(data)
 
     def finish(self) -> StateRequestResponse:
+        for c in self._coros:
+            c.close()
         return self._response
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 0b9475debd..901c47f756 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -225,6 +225,7 @@ async def _handle_response(
     if max_response_size is None:
         max_response_size = MAX_RESPONSE_SIZE
 
+    finished = False
     try:
         check_content_type_is(response.headers, parser.CONTENT_TYPE)
 
@@ -233,6 +234,7 @@ async def _handle_response(
 
         length = await make_deferred_yieldable(d)
 
+        finished = True
         value = parser.finish()
     except BodyExceededMaxSize as e:
         # The response was too big.
@@ -283,6 +285,15 @@ async def _handle_response(
             e,
         )
         raise
+    finally:
+        if not finished:
+            # There was an exception and we didn't `finish()` the parse.
+            # Let the parser know that it can free up any resources.
+            try:
+                parser.finish()
+            except Exception:
+                # Ignore any additional exceptions.
+                pass
 
     time_taken_secs = reactor.seconds() - start_ms / 1000