summary refs log tree commit diff
path: root/synapse/http
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/http')
-rw-r--r--synapse/http/client.py24
1 files changed, 23 insertions, 1 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 505872ee90..b80681135e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -31,6 +31,7 @@ from twisted.internet.interfaces import (
     IReactorPluggableNameResolver,
     IResolutionReceiver,
 )
+from twisted.internet.task import Cooperator
 from twisted.python.failure import Failure
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import Agent, HTTPConnectionPool, readBody
@@ -69,6 +70,21 @@ def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
     return False
 
 
+_EPSILON = 0.00000001
+
+
+def _make_scheduler(reactor):
+    """Makes a schedular suitable for a Cooperator using the given reactor.
+
+    (This is effectively just a copy from `twisted.internet.task`)
+    """
+
+    def _scheduler(x):
+        return reactor.callLater(_EPSILON, x)
+
+    return _scheduler
+
+
 class IPBlacklistingResolver(object):
     """
     A proxy for reactor.nameResolver which only produces non-blacklisted IP
@@ -212,6 +228,10 @@ class SimpleHttpClient(object):
         if hs.config.user_agent_suffix:
             self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
 
+        # We use this for our body producers to ensure that they use the correct
+        # reactor.
+        self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor()))
+
         self.user_agent = self.user_agent.encode("ascii")
 
         if self._ip_blacklist:
@@ -292,7 +312,9 @@ class SimpleHttpClient(object):
             try:
                 body_producer = None
                 if data is not None:
-                    body_producer = QuieterFileBodyProducer(BytesIO(data))
+                    body_producer = QuieterFileBodyProducer(
+                        BytesIO(data), cooperator=self._cooperator,
+                    )
 
                 request_deferred = treq.request(
                     method,