Use callbacks to notify tcp replication rather than deferreds
1 files changed, 1 insertions, 14 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 243a81d488..b70fa7334f 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -21,7 +21,6 @@ from twisted.internet.protocol import Factory
from streams import STREAMS_MAP, FederationStream
from protocol import ServerReplicationStreamProtocol
-from synapse.util.logcontext import preserve_fn
from synapse.util.metrics import Measure, measure_func
import logging
@@ -66,7 +65,6 @@ class ReplicationStreamer(object):
def __init__(self, hs):
self.store = hs.get_datastore()
- self.notifier = hs.get_notifier()
self.presence_handler = hs.get_presence_handler()
self.clock = hs.get_clock()
@@ -101,8 +99,7 @@ class ReplicationStreamer(object):
if not hs.config.send_federation:
self.federation_sender = hs.get_federation_sender()
- # Start listening for updates from the notifier
- preserve_fn(self.notifier_listener)()
+ hs.get_notifier().add_replication_callback(self.on_notifier_poke)
# Keeps track of whether we are currently checking for updates
self.is_looping = False
@@ -116,16 +113,6 @@ class ReplicationStreamer(object):
conn.send_error("server shutting down")
@defer.inlineCallbacks
- def notifier_listener(self):
- """Sits forever looping on the notifier waiting for new data.
- """
- while True:
- yield self.notifier.wait_once_for_replication()
- logger.debug("Woken up by notifier")
- # We need to call this each time we get woken up, as per docstring
- preserve_fn(self.on_notifier_poke)()
-
- @defer.inlineCallbacks
def on_notifier_poke(self):
"""Checks if there is actually any new data and sends it to the
connections if there are.
|