summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-15 15:27:35 +0100
committerGitHub <noreply@github.com>2020-07-15 15:27:35 +0100
commitf13061d5153eca9bd7054d5b89ade41f3a430f3b (patch)
tree827797d67d99b93f41e35fac8ff21cb886c01a66 /synapse
parentConvert E2E key and room key handlers to async/await. (#7851) (diff)
downloadsynapse-f13061d5153eca9bd7054d5b89ade41f3a430f3b.tar.xz
Fix client reader sharding tests (#7853)
* Fix client reader sharding tests

* Newsfile

* Fix typing

* Update changelog.d/7853.misc

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>

* Move mocking of http_client to tests

Co-authored-by: Patrick Cloke <clokep@users.noreply.github.com>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/http/client.py24
-rw-r--r--synapse/server.pyi5
2 files changed, 28 insertions, 1 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 505872ee90..b80681135e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -31,6 +31,7 @@ from twisted.internet.interfaces import (
     IReactorPluggableNameResolver,
     IResolutionReceiver,
 )
+from twisted.internet.task import Cooperator
 from twisted.python.failure import Failure
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import Agent, HTTPConnectionPool, readBody
@@ -69,6 +70,21 @@ def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
     return False
 
 
+_EPSILON = 0.00000001
+
+
+def _make_scheduler(reactor):
+    """Makes a schedular suitable for a Cooperator using the given reactor.
+
+    (This is effectively just a copy from `twisted.internet.task`)
+    """
+
+    def _scheduler(x):
+        return reactor.callLater(_EPSILON, x)
+
+    return _scheduler
+
+
 class IPBlacklistingResolver(object):
     """
     A proxy for reactor.nameResolver which only produces non-blacklisted IP
@@ -212,6 +228,10 @@ class SimpleHttpClient(object):
         if hs.config.user_agent_suffix:
             self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
 
+        # We use this for our body producers to ensure that they use the correct
+        # reactor.
+        self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor()))
+
         self.user_agent = self.user_agent.encode("ascii")
 
         if self._ip_blacklist:
@@ -292,7 +312,9 @@ class SimpleHttpClient(object):
             try:
                 body_producer = None
                 if data is not None:
-                    body_producer = QuieterFileBodyProducer(BytesIO(data))
+                    body_producer = QuieterFileBodyProducer(
+                        BytesIO(data), cooperator=self._cooperator,
+                    )
 
                 request_deferred = treq.request(
                     method,
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 58cd099e6d..cd50c721b8 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -20,6 +20,7 @@ import synapse.handlers.room
 import synapse.handlers.room_member
 import synapse.handlers.set_password
 import synapse.http.client
+import synapse.http.matrixfederationclient
 import synapse.notifier
 import synapse.push.pusherpool
 import synapse.replication.tcp.client
@@ -143,3 +144,7 @@ class HomeServer(object):
         pass
     def get_replication_streams(self) -> Dict[str, Stream]:
         pass
+    def get_http_client(
+        self,
+    ) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient:
+        pass