summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-03-31 13:36:38 +0100
committerErik Johnston <erik@matrix.org>2017-03-31 15:42:51 +0100
commit1df7c28661207df8575fd519ce9c23690b9156ec (patch)
tree552fe2ad1e12d81baec550c1dc47c2eeed71639e /synapse/replication/tcp/resource.py
parentAdd a timestamp to USER_SYNC command (diff)
downloadsynapse-1df7c28661207df8575fd519ce9c23690b9156ec.tar.xz
Use callbacks to notify tcp replication rather than deferreds
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py15
1 files changed, 1 insertions, 14 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 243a81d488..b70fa7334f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -21,7 +21,6 @@ from twisted.internet.protocol import Factory
 from streams import STREAMS_MAP, FederationStream
 from protocol import ServerReplicationStreamProtocol
 
-from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import Measure, measure_func
 
 import logging
@@ -66,7 +65,6 @@ class ReplicationStreamer(object):
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
 
@@ -101,8 +99,7 @@ class ReplicationStreamer(object):
         if not hs.config.send_federation:
             self.federation_sender = hs.get_federation_sender()
 
-        # Start listening for updates from the notifier
-        preserve_fn(self.notifier_listener)()
+        hs.get_notifier().add_replication_callback(self.on_notifier_poke)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False
@@ -116,16 +113,6 @@ class ReplicationStreamer(object):
             conn.send_error("server shutting down")
 
     @defer.inlineCallbacks
-    def notifier_listener(self):
-        """Sits forever looping on the notifier waiting for new data.
-        """
-        while True:
-            yield self.notifier.wait_once_for_replication()
-            logger.debug("Woken up by notifier")
-            # We need to call this each time we get woken up, as per docstring
-            preserve_fn(self.on_notifier_poke)()
-
-    @defer.inlineCallbacks
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.