summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/deactivate_account.py4
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/presence.py4
-rw-r--r--synapse/http/endpoint.py16
-rw-r--r--synapse/push/emailpusher.py4
-rw-r--r--synapse/push/httppusher.py6
-rw-r--r--synapse/replication/tcp/client.py6
-rw-r--r--synapse/replication/tcp/resource.py4
-rw-r--r--synapse/util/__init__.py3
9 files changed, 29 insertions, 22 deletions
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 8ec5ba2012..6116842764 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,7 +12,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 
 from ._base import BaseHandler
 from synapse.types import UserID, create_requester
@@ -39,7 +39,7 @@ class DeactivateAccountHandler(BaseHandler):
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
-        reactor.callWhenRunning(self._start_user_parting)
+        hs.get_reactor().callWhenRunning(self._start_user_parting)
 
     @defer.inlineCallbacks
     def deactivate_account(self, user_id):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b9946ab91..a812117dea 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,7 +20,7 @@ import sys
 from canonicaljson import encode_canonical_json
 import six
 from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.defer import succeed
 from twisted.python.failure import Failure
 
@@ -157,7 +157,7 @@ class MessageHandler(BaseHandler):
             # remove the purge from the list 24 hours after it completes
             def clear_purge():
                 del self._purges_by_id[purge_id]
-            reactor.callLater(24 * 3600, clear_purge)
+            self.hs.get_reactor().callLater(24 * 3600, clear_purge)
 
     def get_purge_status(self, purge_id):
         """Get the current status of an active purge
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..7db59fba00 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,7 +22,7 @@ The methods that define policy are:
     - should_notify
 """
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from contextlib import contextmanager
 
 from six import itervalues, iteritems
@@ -179,7 +179,7 @@ class PresenceHandler(object):
         # have not yet been persisted
         self.unpersisted_users_changes = set()
 
-        reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
 
         self.serial_to_user = {}
         self._next_serial = 1
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 87a482650d..928c1c7407 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.error import ConnectError
 from twisted.names import client, dns
 from twisted.names.error import DNSNameError, DomainError
@@ -78,17 +78,18 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
     else:
         return _WrappingEndpointFac(transport_endpoint(
             reactor, domain, port, **endpoint_kw_args
-        ))
+        ), reactor)
 
 
 class _WrappingEndpointFac(object):
-    def __init__(self, endpoint_fac):
+    def __init__(self, endpoint_fac, reactor):
         self.endpoint_fac = endpoint_fac
+        self.reactor = reactor
 
     @defer.inlineCallbacks
     def connect(self, protocolFactory):
         conn = yield self.endpoint_fac.connect(protocolFactory)
-        conn = _WrappedConnection(conn)
+        conn = _WrappedConnection(conn, self.reactor)
         defer.returnValue(conn)
 
 
@@ -98,9 +99,10 @@ class _WrappedConnection(object):
     """
     __slots__ = ["conn", "last_request"]
 
-    def __init__(self, conn):
+    def __init__(self, conn, reactor):
         object.__setattr__(self, "conn", conn)
         object.__setattr__(self, "last_request", time.time())
+        self._reactor = reactor
 
     def __getattr__(self, name):
         return getattr(self.conn, name)
@@ -131,14 +133,14 @@ class _WrappedConnection(object):
         # Time this connection out if we haven't send a request in the last
         # N minutes
         # TODO: Cancel the previous callLater?
-        reactor.callLater(3 * 60, self._time_things_out_maybe)
+        self._reactor.callLater(3 * 60, self._time_things_out_maybe)
 
         d = self.conn.request(request)
 
         def update_request_time(res):
             self.last_request = time.time()
             # TODO: Cancel the previous callLater?
-            reactor.callLater(3 * 60, self._time_things_out_maybe)
+            self._reactor.callLater(3 * 60, self._time_things_out_maybe)
             return res
 
         d.addCallback(update_request_time)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index ba7286cb72..52d4f087ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 import logging
@@ -199,7 +199,7 @@ class EmailPusher(object):
                     self.timed_call = None
 
         if soonest_due_at is not None:
-            self.timed_call = reactor.callLater(
+            self.timed_call = self.hs.get_reactor().callLater(
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bf7ff74a1a..7a481b5a1e 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from . import push_rule_evaluator
@@ -220,7 +220,9 @@ class HttpPusher(object):
                     )
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
-                    self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+                    self.timed_call = self.hs.get_reactor().callLater(
+                        self.backoff_delay, self.on_timer
+                    )
                     self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
                     break
 
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6d2513c4e2..bb852b00af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -15,7 +15,7 @@
 """A replication client for use by synapse workers.
 """
 
-from twisted.internet import reactor, defer
+from twisted.internet import defer
 from twisted.internet.protocol import ReconnectingClientFactory
 
 from .commands import (
@@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
         self.server_name = hs.config.server_name
         self._clock = hs.get_clock()  # As self.clock is defined in super class
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
 
     def startedConnecting(self, connector):
         logger.info("Connecting to replication: %r", connector.getDestination())
@@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
         factory = ReplicationClientFactory(hs, client_name, self)
         host = hs.config.worker_replication_host
         port = hs.config.worker_replication_port
-        reactor.connectTCP(host, port, factory)
+        hs.get_reactor().connectTCP(host, port, factory)
 
     def on_rdata(self, stream_name, token, rows):
         """Called when we get new replication data. By default this just pokes
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 63bd6d2652..95ad8c1b4c 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -15,7 +15,7 @@
 """The server side of the replication stream.
 """
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.protocol import Factory
 
 from .streams import STREAMS_MAP, FederationStream
@@ -109,7 +109,7 @@ class ReplicationStreamer(object):
         self.is_looping = False
         self.pending_updates = False
 
-        reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
+        hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
 
     def on_shutdown(self):
         # close all connections on shutdown
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 2a3df7c71d..e9886ef299 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -34,6 +34,9 @@ def unwrapFirstError(failure):
 class Clock(object):
     """
     A Clock wraps a Twisted reactor and provides utilities on top of it.
+
+    Args:
+        reactor: The Twisted reactor to use.
     """
     _reactor = attr.ib()