summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/server.py75
1 files changed, 43 insertions, 32 deletions
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 37fdf14405..8d791bd2ca 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -500,7 +500,7 @@ class RootOptionsRedirectResource(OptionsResource, RootRedirect):
     pass
 
 
-@implementer(interfaces.IPullProducer)
+@implementer(interfaces.IPushProducer)
 class _ByteProducer:
     """
     Iteratively write bytes to the request.
@@ -515,52 +515,64 @@ class _ByteProducer:
     ):
         self._request = request
         self._iterator = iterator
+        self._paused = False
 
-    def start(self) -> None:
-        self._request.registerProducer(self, False)
+        # Register the producer and start producing data.
+        self._request.registerProducer(self, True)
+        self.resumeProducing()
 
     def _send_data(self, data: List[bytes]) -> None:
         """
-        Send a list of strings as a response to the request.
+        Send a list of bytes as a chunk of a response.
         """
         if not data:
             return
         self._request.write(b"".join(data))
 
+    def pauseProducing(self) -> None:
+        self._paused = True
+
     def resumeProducing(self) -> None:
         # We've stopped producing in the meantime (note that this might be
         # re-entrant after calling write).
         if not self._request:
             return
 
-        # Get the next chunk and write it to the request.
-        #
-        # The output of the JSON encoder is coalesced until min_chunk_size is
-        # reached. (This is because JSON encoders produce a very small output
-        # per iteration.)
-        #
-        # Note that buffer stores a list of bytes (instead of appending to
-        # bytes) to hopefully avoid many allocations.
-        buffer = []
-        buffered_bytes = 0
-        while buffered_bytes < self.min_chunk_size:
-            try:
-                data = next(self._iterator)
-                buffer.append(data)
-                buffered_bytes += len(data)
-            except StopIteration:
-                # The entire JSON object has been serialized, write any
-                # remaining data, finalize the producer and the request, and
-                # clean-up any references.
-                self._send_data(buffer)
-                self._request.unregisterProducer()
-                self._request.finish()
-                self.stopProducing()
-                return
-
-        self._send_data(buffer)
+        self._paused = False
+
+        # Write until there's backpressure telling us to stop.
+        while not self._paused:
+            # Get the next chunk and write it to the request.
+            #
+            # The output of the JSON encoder is buffered and coalesced until
+            # min_chunk_size is reached. This is because JSON encoders produce
+            # very small output per iteration and the Request object converts
+            # each call to write() to a separate chunk. Without this there would
+            # be an explosion in bytes written (e.g. b"{" becoming "1\r\n{\r\n").
+            #
+            # Note that buffer stores a list of bytes (instead of appending to
+            # bytes) to hopefully avoid many allocations.
+            buffer = []
+            buffered_bytes = 0
+            while buffered_bytes < self.min_chunk_size:
+                try:
+                    data = next(self._iterator)
+                    buffer.append(data)
+                    buffered_bytes += len(data)
+                except StopIteration:
+                    # The entire JSON object has been serialized, write any
+                    # remaining data, finalize the producer and the request, and
+                    # clean-up any references.
+                    self._send_data(buffer)
+                    self._request.unregisterProducer()
+                    self._request.finish()
+                    self.stopProducing()
+                    return
+
+            self._send_data(buffer)
 
     def stopProducing(self) -> None:
+        # Clear a circular reference.
         self._request = None
 
 
@@ -620,8 +632,7 @@ def respond_with_json(
     if send_cors:
         set_cors_headers(request)
 
-    producer = _ByteProducer(request, encoder(json_object))
-    producer.start()
+    _ByteProducer(request, encoder(json_object))
     return NOT_DONE_YET