summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py132
1 files changed, 94 insertions, 38 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 6eaa65071e..f00cd8c588 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,10 +18,13 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
 
 from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, ObservableDeferred
+from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import PreserveLoggingContext
 from synapse.types import StreamToken
 import synapse.metrics
 
+from collections import namedtuple
+
 import logging
 
 
@@ -71,7 +74,8 @@ class _NotifierUserStream(object):
         self.current_token = current_token
         self.last_notified_ms = time_now_ms
 
-        self.notify_deferred = ObservableDeferred(defer.Deferred())
+        with PreserveLoggingContext():
+            self.notify_deferred = ObservableDeferred(defer.Deferred())
 
     def notify(self, stream_key, stream_id, time_now_ms):
         """Notify any listeners for this user of a new event from an
@@ -86,8 +90,10 @@ class _NotifierUserStream(object):
         )
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
-        self.notify_deferred = ObservableDeferred(defer.Deferred())
-        noify_deferred.callback(self.current_token)
+
+        with PreserveLoggingContext():
+            self.notify_deferred = ObservableDeferred(defer.Deferred())
+            noify_deferred.callback(self.current_token)
 
     def remove(self, notifier):
         """ Remove this listener from all the indexes in the Notifier
@@ -118,6 +124,11 @@ class _NotifierUserStream(object):
             return _NotificationListener(self.notify_deferred.observe())
 
 
+class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
+    def __nonzero__(self):
+        return bool(self.events)
+
+
 class Notifier(object):
     """ This class is responsible for notifying any listeners when there are
     new events available for it.
@@ -148,6 +159,8 @@ class Notifier(object):
             self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
         )
 
+        self.replication_deferred = ObservableDeferred(defer.Deferred())
+
         # This is not a very cheap test to perform, but it's only executed
         # when rendering the metrics page, which is likely once per minute at
         # most when scraping it.
@@ -177,8 +190,6 @@ class Notifier(object):
             lambda: count(bool, self.appservice_to_user_streams.values()),
         )
 
-    @log_function
-    @defer.inlineCallbacks
     def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
                           extra_users=[]):
         """ Used by handlers to inform the notifier something has happened
@@ -192,12 +203,13 @@ class Notifier(object):
         until all previous events have been persisted before notifying
         the client streams.
         """
-        yield run_on_reactor()
+        with PreserveLoggingContext():
+            self.pending_new_room_events.append((
+                room_stream_id, event, extra_users
+            ))
+            self._notify_pending_new_room_events(max_room_stream_id)
 
-        self.pending_new_room_events.append((
-            room_stream_id, event, extra_users
-        ))
-        self._notify_pending_new_room_events(max_room_stream_id)
+            self.notify_replication()
 
     def _notify_pending_new_room_events(self, max_room_stream_id):
         """Notify for the room events that were queued waiting for a previous
@@ -244,35 +256,41 @@ class Notifier(object):
             extra_streams=app_streams,
         )
 
-    @defer.inlineCallbacks
-    @log_function
     def on_new_event(self, stream_key, new_token, users=[], rooms=[],
                      extra_streams=set()):
         """ Used to inform listeners that something has happend event wise.
 
         Will wake up all listeners for the given users and rooms.
         """
-        yield run_on_reactor()
-        user_streams = set()
+        with PreserveLoggingContext():
+            user_streams = set()
 
-        for user in users:
-            user_stream = self.user_to_user_stream.get(str(user))
-            if user_stream is not None:
-                user_streams.add(user_stream)
+            for user in users:
+                user_stream = self.user_to_user_stream.get(str(user))
+                if user_stream is not None:
+                    user_streams.add(user_stream)
 
-        for room in rooms:
-            user_streams |= self.room_to_user_streams.get(room, set())
+            for room in rooms:
+                user_streams |= self.room_to_user_streams.get(room, set())
 
-        time_now_ms = self.clock.time_msec()
-        for user_stream in user_streams:
-            try:
-                user_stream.notify(stream_key, new_token, time_now_ms)
-            except:
-                logger.exception("Failed to notify listener")
+            time_now_ms = self.clock.time_msec()
+            for user_stream in user_streams:
+                try:
+                    user_stream.notify(stream_key, new_token, time_now_ms)
+                except:
+                    logger.exception("Failed to notify listener")
+
+            self.notify_replication()
+
+    def on_new_replication_data(self):
+        """Used to inform replication listeners that something has happend
+        without waking up any of the normal user event streams"""
+        with PreserveLoggingContext():
+            self.notify_replication()
 
     @defer.inlineCallbacks
     def wait_for_events(self, user_id, timeout, callback, room_ids=None,
-                        from_token=StreamToken("s0", "0", "0", "0", "0")):
+                        from_token=StreamToken.START):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
@@ -301,7 +319,7 @@ class Notifier(object):
             def timed_out():
                 if listener:
                     listener.deferred.cancel()
-            timer = self.clock.call_later(timeout/1000., timed_out)
+            timer = self.clock.call_later(timeout / 1000., timed_out)
 
             prev_token = from_token
             while not result:
@@ -318,7 +336,8 @@ class Notifier(object):
                     # that we don't miss any current_token updates.
                     prev_token = current_token
                     listener = user_stream.new_listener(prev_token)
-                    yield listener.deferred
+                    with PreserveLoggingContext():
+                        yield listener.deferred
                 except defer.CancelledError:
                     break
 
@@ -356,7 +375,7 @@ class Notifier(object):
         @defer.inlineCallbacks
         def check_for_updates(before_token, after_token):
             if not after_token.is_after(before_token):
-                defer.returnValue(None)
+                defer.returnValue(EventStreamResult([], (from_token, from_token)))
 
             events = []
             end_token = from_token
@@ -369,6 +388,7 @@ class Notifier(object):
                     continue
                 if only_keys and name not in only_keys:
                     continue
+
                 new_events, new_key = yield source.get_new_events(
                     user=user,
                     from_key=getattr(from_token, keyname),
@@ -388,10 +408,7 @@ class Notifier(object):
                 events.extend(new_events)
                 end_token = end_token.copy_and_replace(keyname, new_key)
 
-            if events:
-                defer.returnValue((events, (from_token, end_token)))
-            else:
-                defer.returnValue(None)
+            defer.returnValue(EventStreamResult(events, (from_token, end_token)))
 
         user_id_for_stream = user.to_string()
         if is_peeking:
@@ -415,9 +432,6 @@ class Notifier(object):
             from_token=from_token,
         )
 
-        if result is None:
-            result = ([], (from_token, from_token))
-
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -477,3 +491,45 @@ class Notifier(object):
             room_streams = self.room_to_user_streams.setdefault(room_id, set())
             room_streams.add(new_user_stream)
             new_user_stream.rooms.add(room_id)
+
+    def notify_replication(self):
+        """Notify the any replication listeners that there's a new event"""
+        with PreserveLoggingContext():
+            deferred = self.replication_deferred
+            self.replication_deferred = ObservableDeferred(defer.Deferred())
+            deferred.callback(None)
+
+    @defer.inlineCallbacks
+    def wait_for_replication(self, callback, timeout):
+        """Wait for an event to happen.
+
+        :param callback:
+            Gets called whenever an event happens. If this returns a truthy
+            value then ``wait_for_replication`` returns, otherwise it waits
+            for another event.
+        :param int timeout:
+            How many milliseconds to wait for callback return a truthy value.
+        :returns:
+            A deferred that resolves with the value returned by the callback.
+        """
+        listener = _NotificationListener(None)
+
+        def timed_out():
+            listener.deferred.cancel()
+
+        timer = self.clock.call_later(timeout / 1000., timed_out)
+        while True:
+            listener.deferred = self.replication_deferred.observe()
+            result = yield callback()
+            if result:
+                break
+
+            try:
+                with PreserveLoggingContext():
+                    yield listener.deferred
+            except defer.CancelledError:
+                break
+
+        self.clock.cancel_call_later(timer, ignore_errs=True)
+
+        defer.returnValue(result)