diff --git a/synapse/http/client.py b/synapse/http/client.py
index 505872ee90..b80681135e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -31,6 +31,7 @@ from twisted.internet.interfaces import (
IReactorPluggableNameResolver,
IResolutionReceiver,
)
+from twisted.internet.task import Cooperator
from twisted.python.failure import Failure
from twisted.web._newclient import ResponseDone
from twisted.web.client import Agent, HTTPConnectionPool, readBody
@@ -69,6 +70,21 @@ def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
return False
+_EPSILON = 0.00000001
+
+
+def _make_scheduler(reactor):
+ """Makes a schedular suitable for a Cooperator using the given reactor.
+
+ (This is effectively just a copy from `twisted.internet.task`)
+ """
+
+ def _scheduler(x):
+ return reactor.callLater(_EPSILON, x)
+
+ return _scheduler
+
+
class IPBlacklistingResolver(object):
"""
A proxy for reactor.nameResolver which only produces non-blacklisted IP
@@ -212,6 +228,10 @@ class SimpleHttpClient(object):
if hs.config.user_agent_suffix:
self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
+ # We use this for our body producers to ensure that they use the correct
+ # reactor.
+ self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor()))
+
self.user_agent = self.user_agent.encode("ascii")
if self._ip_blacklist:
@@ -292,7 +312,9 @@ class SimpleHttpClient(object):
try:
body_producer = None
if data is not None:
- body_producer = QuieterFileBodyProducer(BytesIO(data))
+ body_producer = QuieterFileBodyProducer(
+ BytesIO(data), cooperator=self._cooperator,
+ )
request_deferred = treq.request(
method,
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 58cd099e6d..cd50c721b8 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -20,6 +20,7 @@ import synapse.handlers.room
import synapse.handlers.room_member
import synapse.handlers.set_password
import synapse.http.client
+import synapse.http.matrixfederationclient
import synapse.notifier
import synapse.push.pusherpool
import synapse.replication.tcp.client
@@ -143,3 +144,7 @@ class HomeServer(object):
pass
def get_replication_streams(self) -> Dict[str, Stream]:
pass
+ def get_http_client(
+ self,
+ ) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient:
+ pass
|