diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 8ec5ba2012..6116842764 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -12,7 +12,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from ._base import BaseHandler
from synapse.types import UserID, create_requester
@@ -39,7 +39,7 @@ class DeactivateAccountHandler(BaseHandler):
# Start the user parter loop so it can resume parting users from rooms where
# it left off (if it has work left to do).
- reactor.callWhenRunning(self._start_user_parting)
+ hs.get_reactor().callWhenRunning(self._start_user_parting)
@defer.inlineCallbacks
def deactivate_account(self, user_id):
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 7b9946ab91..a812117dea 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -20,7 +20,7 @@ import sys
from canonicaljson import encode_canonical_json
import six
from six import string_types, itervalues, iteritems
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.defer import succeed
from twisted.python.failure import Failure
@@ -157,7 +157,7 @@ class MessageHandler(BaseHandler):
# remove the purge from the list 24 hours after it completes
def clear_purge():
del self._purges_by_id[purge_id]
- reactor.callLater(24 * 3600, clear_purge)
+ self.hs.get_reactor().callLater(24 * 3600, clear_purge)
def get_purge_status(self, purge_id):
"""Get the current status of an active purge
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7fe568132f..7db59fba00 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -22,7 +22,7 @@ The methods that define policy are:
- should_notify
"""
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from contextlib import contextmanager
from six import itervalues, iteritems
@@ -179,7 +179,7 @@ class PresenceHandler(object):
# have not yet been persisted
self.unpersisted_users_changes = set()
- reactor.addSystemEventTrigger("before", "shutdown", self._on_shutdown)
+ hs.get_reactor().addSystemEventTrigger("before", "shutdown", self._on_shutdown)
self.serial_to_user = {}
self._next_serial = 1
diff --git a/synapse/http/endpoint.py b/synapse/http/endpoint.py
index 87a482650d..928c1c7407 100644
--- a/synapse/http/endpoint.py
+++ b/synapse/http/endpoint.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
from twisted.internet.endpoints import HostnameEndpoint, wrapClientTLS
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import ConnectError
from twisted.names import client, dns
from twisted.names.error import DNSNameError, DomainError
@@ -78,17 +78,18 @@ def matrix_federation_endpoint(reactor, destination, ssl_context_factory=None,
else:
return _WrappingEndpointFac(transport_endpoint(
reactor, domain, port, **endpoint_kw_args
- ))
+ ), reactor)
class _WrappingEndpointFac(object):
- def __init__(self, endpoint_fac):
+ def __init__(self, endpoint_fac, reactor):
self.endpoint_fac = endpoint_fac
+ self.reactor = reactor
@defer.inlineCallbacks
def connect(self, protocolFactory):
conn = yield self.endpoint_fac.connect(protocolFactory)
- conn = _WrappedConnection(conn)
+ conn = _WrappedConnection(conn, self.reactor)
defer.returnValue(conn)
@@ -98,9 +99,10 @@ class _WrappedConnection(object):
"""
__slots__ = ["conn", "last_request"]
- def __init__(self, conn):
+ def __init__(self, conn, reactor):
object.__setattr__(self, "conn", conn)
object.__setattr__(self, "last_request", time.time())
+ self._reactor = reactor
def __getattr__(self, name):
return getattr(self.conn, name)
@@ -131,14 +133,14 @@ class _WrappedConnection(object):
# Time this connection out if we haven't send a request in the last
# N minutes
# TODO: Cancel the previous callLater?
- reactor.callLater(3 * 60, self._time_things_out_maybe)
+ self._reactor.callLater(3 * 60, self._time_things_out_maybe)
d = self.conn.request(request)
def update_request_time(res):
self.last_request = time.time()
# TODO: Cancel the previous callLater?
- reactor.callLater(3 * 60, self._time_things_out_maybe)
+ self._reactor.callLater(3 * 60, self._time_things_out_maybe)
return res
d.addCallback(update_request_time)
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index ba7286cb72..52d4f087ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
import logging
@@ -199,7 +199,7 @@ class EmailPusher(object):
self.timed_call = None
if soonest_due_at is not None:
- self.timed_call = reactor.callLater(
+ self.timed_call = self.hs.get_reactor().callLater(
self.seconds_until(soonest_due_at), self.on_timer
)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index bf7ff74a1a..7a481b5a1e 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -15,7 +15,7 @@
# limitations under the License.
import logging
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from . import push_rule_evaluator
@@ -220,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
- self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.timed_call = self.hs.get_reactor().callLater(
+ self.backoff_delay, self.on_timer
+ )
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index 6d2513c4e2..bb852b00af 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -15,7 +15,7 @@
"""A replication client for use by synapse workers.
"""
-from twisted.internet import reactor, defer
+from twisted.internet import defer
from twisted.internet.protocol import ReconnectingClientFactory
from .commands import (
@@ -44,7 +44,7 @@ class ReplicationClientFactory(ReconnectingClientFactory):
self.server_name = hs.config.server_name
self._clock = hs.get_clock() # As self.clock is defined in super class
- reactor.addSystemEventTrigger("before", "shutdown", self.stopTrying)
+ hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.stopTrying)
def startedConnecting(self, connector):
logger.info("Connecting to replication: %r", connector.getDestination())
@@ -95,7 +95,7 @@ class ReplicationClientHandler(object):
factory = ReplicationClientFactory(hs, client_name, self)
host = hs.config.worker_replication_host
port = hs.config.worker_replication_port
- reactor.connectTCP(host, port, factory)
+ hs.get_reactor().connectTCP(host, port, factory)
def on_rdata(self, stream_name, token, rows):
"""Called when we get new replication data. By default this just pokes
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 63bd6d2652..95ad8c1b4c 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -15,7 +15,7 @@
"""The server side of the replication stream.
"""
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.protocol import Factory
from .streams import STREAMS_MAP, FederationStream
@@ -109,7 +109,7 @@ class ReplicationStreamer(object):
self.is_looping = False
self.pending_updates = False
- reactor.addSystemEventTrigger("before", "shutdown", self.on_shutdown)
+ hs.get_reactor().addSystemEventTrigger("before", "shutdown", self.on_shutdown)
def on_shutdown(self):
# close all connections on shutdown
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 2a3df7c71d..e9886ef299 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -34,6 +34,9 @@ def unwrapFirstError(failure):
class Clock(object):
"""
A Clock wraps a Twisted reactor and provides utilities on top of it.
+
+ Args:
+ reactor: The Twisted reactor to use.
"""
_reactor = attr.ib()
|