diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6eaa65071e..f00cd8c588 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,10 +18,13 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, ObservableDeferred
+from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import StreamToken
import synapse.metrics
+from collections import namedtuple
+
import logging
@@ -71,7 +74,8 @@ class _NotifierUserStream(object):
self.current_token = current_token
self.last_notified_ms = time_now_ms
- self.notify_deferred = ObservableDeferred(defer.Deferred())
+ with PreserveLoggingContext():
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
def notify(self, stream_key, stream_id, time_now_ms):
"""Notify any listeners for this user of a new event from an
@@ -86,8 +90,10 @@ class _NotifierUserStream(object):
)
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
- self.notify_deferred = ObservableDeferred(defer.Deferred())
- noify_deferred.callback(self.current_token)
+
+ with PreserveLoggingContext():
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
+ noify_deferred.callback(self.current_token)
def remove(self, notifier):
""" Remove this listener from all the indexes in the Notifier
@@ -118,6 +124,11 @@ class _NotifierUserStream(object):
return _NotificationListener(self.notify_deferred.observe())
+class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
+ def __nonzero__(self):
+ return bool(self.events)
+
+
class Notifier(object):
""" This class is responsible for notifying any listeners when there are
new events available for it.
@@ -148,6 +159,8 @@ class Notifier(object):
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
)
+ self.replication_deferred = ObservableDeferred(defer.Deferred())
+
# This is not a very cheap test to perform, but it's only executed
# when rendering the metrics page, which is likely once per minute at
# most when scraping it.
@@ -177,8 +190,6 @@ class Notifier(object):
lambda: count(bool, self.appservice_to_user_streams.values()),
)
- @log_function
- @defer.inlineCallbacks
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
""" Used by handlers to inform the notifier something has happened
@@ -192,12 +203,13 @@ class Notifier(object):
until all previous events have been persisted before notifying
the client streams.
"""
- yield run_on_reactor()
+ with PreserveLoggingContext():
+ self.pending_new_room_events.append((
+ room_stream_id, event, extra_users
+ ))
+ self._notify_pending_new_room_events(max_room_stream_id)
- self.pending_new_room_events.append((
- room_stream_id, event, extra_users
- ))
- self._notify_pending_new_room_events(max_room_stream_id)
+ self.notify_replication()
def _notify_pending_new_room_events(self, max_room_stream_id):
"""Notify for the room events that were queued waiting for a previous
@@ -244,35 +256,41 @@ class Notifier(object):
extra_streams=app_streams,
)
- @defer.inlineCallbacks
- @log_function
def on_new_event(self, stream_key, new_token, users=[], rooms=[],
extra_streams=set()):
""" Used to inform listeners that something has happend event wise.
Will wake up all listeners for the given users and rooms.
"""
- yield run_on_reactor()
- user_streams = set()
+ with PreserveLoggingContext():
+ user_streams = set()
- for user in users:
- user_stream = self.user_to_user_stream.get(str(user))
- if user_stream is not None:
- user_streams.add(user_stream)
+ for user in users:
+ user_stream = self.user_to_user_stream.get(str(user))
+ if user_stream is not None:
+ user_streams.add(user_stream)
- for room in rooms:
- user_streams |= self.room_to_user_streams.get(room, set())
+ for room in rooms:
+ user_streams |= self.room_to_user_streams.get(room, set())
- time_now_ms = self.clock.time_msec()
- for user_stream in user_streams:
- try:
- user_stream.notify(stream_key, new_token, time_now_ms)
- except:
- logger.exception("Failed to notify listener")
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
+ try:
+ user_stream.notify(stream_key, new_token, time_now_ms)
+ except:
+ logger.exception("Failed to notify listener")
+
+ self.notify_replication()
+
+ def on_new_replication_data(self):
+ """Used to inform replication listeners that something has happend
+ without waking up any of the normal user event streams"""
+ with PreserveLoggingContext():
+ self.notify_replication()
@defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
- from_token=StreamToken("s0", "0", "0", "0", "0")):
+ from_token=StreamToken.START):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
@@ -301,7 +319,7 @@ class Notifier(object):
def timed_out():
if listener:
listener.deferred.cancel()
- timer = self.clock.call_later(timeout/1000., timed_out)
+ timer = self.clock.call_later(timeout / 1000., timed_out)
prev_token = from_token
while not result:
@@ -318,7 +336,8 @@ class Notifier(object):
# that we don't miss any current_token updates.
prev_token = current_token
listener = user_stream.new_listener(prev_token)
- yield listener.deferred
+ with PreserveLoggingContext():
+ yield listener.deferred
except defer.CancelledError:
break
@@ -356,7 +375,7 @@ class Notifier(object):
@defer.inlineCallbacks
def check_for_updates(before_token, after_token):
if not after_token.is_after(before_token):
- defer.returnValue(None)
+ defer.returnValue(EventStreamResult([], (from_token, from_token)))
events = []
end_token = from_token
@@ -369,6 +388,7 @@ class Notifier(object):
continue
if only_keys and name not in only_keys:
continue
+
new_events, new_key = yield source.get_new_events(
user=user,
from_key=getattr(from_token, keyname),
@@ -388,10 +408,7 @@ class Notifier(object):
events.extend(new_events)
end_token = end_token.copy_and_replace(keyname, new_key)
- if events:
- defer.returnValue((events, (from_token, end_token)))
- else:
- defer.returnValue(None)
+ defer.returnValue(EventStreamResult(events, (from_token, end_token)))
user_id_for_stream = user.to_string()
if is_peeking:
@@ -415,9 +432,6 @@ class Notifier(object):
from_token=from_token,
)
- if result is None:
- result = ([], (from_token, from_token))
-
defer.returnValue(result)
@defer.inlineCallbacks
@@ -477,3 +491,45 @@ class Notifier(object):
room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream)
new_user_stream.rooms.add(room_id)
+
+ def notify_replication(self):
+ """Notify the any replication listeners that there's a new event"""
+ with PreserveLoggingContext():
+ deferred = self.replication_deferred
+ self.replication_deferred = ObservableDeferred(defer.Deferred())
+ deferred.callback(None)
+
+ @defer.inlineCallbacks
+ def wait_for_replication(self, callback, timeout):
+ """Wait for an event to happen.
+
+ :param callback:
+ Gets called whenever an event happens. If this returns a truthy
+ value then ``wait_for_replication`` returns, otherwise it waits
+ for another event.
+ :param int timeout:
+ How many milliseconds to wait for callback return a truthy value.
+ :returns:
+ A deferred that resolves with the value returned by the callback.
+ """
+ listener = _NotificationListener(None)
+
+ def timed_out():
+ listener.deferred.cancel()
+
+ timer = self.clock.call_later(timeout / 1000., timed_out)
+ while True:
+ listener.deferred = self.replication_deferred.observe()
+ result = yield callback()
+ if result:
+ break
+
+ try:
+ with PreserveLoggingContext():
+ yield listener.deferred
+ except defer.CancelledError:
+ break
+
+ self.clock.cancel_call_later(timer, ignore_errs=True)
+
+ defer.returnValue(result)
|