summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/notifier.py17
-rw-r--r--synapse/replication/tcp/resource.py15
2 files changed, 12 insertions, 20 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f9fcc0ca25..4fda184b7a 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -163,6 +163,8 @@ class Notifier(object):
         self.store = hs.get_datastore()
         self.pending_new_room_events = []
 
+        self.replication_callbacks = []
+
         self.clock = hs.get_clock()
         self.appservice_handler = hs.get_application_service_handler()
 
@@ -202,6 +204,12 @@ class Notifier(object):
             lambda: len(self.user_to_user_stream),
         )
 
+    def add_replication_callback(self, cb):
+        """Add a callback that will be called when some new data is available.
+        Callback is not given any arguments.
+        """
+        self.replication_callbacks.append(cb)
+
     @preserve_fn
     def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
                           extra_users=[]):
@@ -510,6 +518,9 @@ class Notifier(object):
             self.replication_deferred = ObservableDeferred(defer.Deferred())
             deferred.callback(None)
 
+        for cb in self.replication_callbacks:
+            preserve_fn(cb)()
+
     @defer.inlineCallbacks
     def wait_for_replication(self, callback, timeout):
         """Wait for an event to happen.
@@ -550,9 +561,3 @@ class Notifier(object):
                 break
 
         defer.returnValue(result)
-
-    def wait_once_for_replication(self):
-        """Returns a deferred which resolves when there is new data for
-        replication to handle.
-        """
-        return self.replication_deferred.observe()
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 243a81d488..b70fa7334f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -21,7 +21,6 @@ from twisted.internet.protocol import Factory
 from streams import STREAMS_MAP, FederationStream
 from protocol import ServerReplicationStreamProtocol
 
-from synapse.util.logcontext import preserve_fn
 from synapse.util.metrics import Measure, measure_func
 
 import logging
@@ -66,7 +65,6 @@ class ReplicationStreamer(object):
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
-        self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
         self.clock = hs.get_clock()
 
@@ -101,8 +99,7 @@ class ReplicationStreamer(object):
         if not hs.config.send_federation:
             self.federation_sender = hs.get_federation_sender()
 
-        # Start listening for updates from the notifier
-        preserve_fn(self.notifier_listener)()
+        hs.get_notifier().add_replication_callback(self.on_notifier_poke)
 
         # Keeps track of whether we are currently checking for updates
         self.is_looping = False
@@ -116,16 +113,6 @@ class ReplicationStreamer(object):
             conn.send_error("server shutting down")
 
     @defer.inlineCallbacks
-    def notifier_listener(self):
-        """Sits forever looping on the notifier waiting for new data.
-        """
-        while True:
-            yield self.notifier.wait_once_for_replication()
-            logger.debug("Woken up by notifier")
-            # We need to call this each time we get woken up, as per docstring
-            preserve_fn(self.on_notifier_poke)()
-
-    @defer.inlineCallbacks
     def on_notifier_poke(self):
         """Checks if there is actually any new data and sends it to the
         connections if there are.