diff --git a/synapse/notifier.py b/synapse/notifier.py
index 560866b26e..3c36a20868 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -159,6 +159,8 @@ class Notifier(object):
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
)
+ self.replication_deferred = ObservableDeferred(defer.Deferred())
+
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
@@ -207,6 +209,8 @@ class Notifier(object):
))
self._notify_pending_new_room_events(max_room_stream_id)
+ self.notify_replication()
+
def _notify_pending_new_room_events(self, max_room_stream_id):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
@@ -276,6 +280,8 @@ class Notifier(object):
except:
logger.exception("Failed to notify listener")
+ self.notify_replication()
+
@defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
from_token=StreamToken("s0", "0", "0", "0", "0")):
@@ -479,3 +485,45 @@ class Notifier(object):
room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream)
new_user_stream.rooms.add(room_id)
+
+ def notify_replication(self):
+ """Notify the any replication listeners that there's a new event"""
+ with PreserveLoggingContext():
+ deferred = self.replication_deferred
+ self.replication_deferred = ObservableDeferred(defer.Deferred())
+ deferred.callback(None)
+
+ @defer.inlineCallbacks
+ def wait_for_replication(self, callback, timeout):
+ """Wait for an event to happen.
+
+ :param callback:
+ Gets called whenever an event happens. If this returns a truthy
+ value then ``wait_for_replication`` returns, otherwise it waits
+ for another event.
+ :param int timeout:
+ How many milliseconds to wait for callback return a truthy value.
+ :returns:
+ A deferred that resolves with the value returned by the callback.
+ """
+ listener = _NotificationListener(None)
+
+ def timed_out():
+ listener.deferred.cancel()
+
+ timer = self.clock.call_later(timeout / 1000., timed_out)
+ while True:
+ listener.deferred = self.replication_deferred.observe()
+ result = yield callback()
+ if result:
+ break
+
+ try:
+ with PreserveLoggingContext():
+ yield listener.deferred
+ except defer.CancelledError:
+ break
+
+ self.clock.cancel_call_later(timer, ignore_errs=True)
+
+ defer.returnValue(result)
|