diff --git a/synapse/notifier.py b/synapse/notifier.py
index 1a90bd55cd..560866b26e 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,7 +18,8 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
-from synapse.util.async import run_on_reactor, ObservableDeferred
+from synapse.util.async import ObservableDeferred
+from synapse.util.logcontext import PreserveLoggingContext
from synapse.types import StreamToken
import synapse.metrics
@@ -73,7 +74,8 @@ class _NotifierUserStream(object):
self.current_token = current_token
self.last_notified_ms = time_now_ms
- self.notify_deferred = ObservableDeferred(defer.Deferred())
+ with PreserveLoggingContext():
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
def notify(self, stream_key, stream_id, time_now_ms):
"""Notify any listeners for this user of a new event from an
@@ -88,8 +90,10 @@ class _NotifierUserStream(object):
)
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
- self.notify_deferred = ObservableDeferred(defer.Deferred())
- noify_deferred.callback(self.current_token)
+
+ with PreserveLoggingContext():
+ self.notify_deferred = ObservableDeferred(defer.Deferred())
+ noify_deferred.callback(self.current_token)
def remove(self, notifier):
""" Remove this listener from all the indexes in the Notifier
@@ -184,8 +188,6 @@ class Notifier(object):
lambda: count(bool, self.appservice_to_user_streams.values()),
)
- @log_function
- @defer.inlineCallbacks
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
""" Used by handlers to inform the notifier something has happened
@@ -199,12 +201,11 @@ class Notifier(object):
until all previous events have been persisted before notifying
the client streams.
"""
- yield run_on_reactor()
-
- self.pending_new_room_events.append((
- room_stream_id, event, extra_users
- ))
- self._notify_pending_new_room_events(max_room_stream_id)
+ with PreserveLoggingContext():
+ self.pending_new_room_events.append((
+ room_stream_id, event, extra_users
+ ))
+ self._notify_pending_new_room_events(max_room_stream_id)
def _notify_pending_new_room_events(self, max_room_stream_id):
"""Notify for the room events that were queued waiting for a previous
@@ -251,31 +252,29 @@ class Notifier(object):
extra_streams=app_streams,
)
- @defer.inlineCallbacks
- @log_function
def on_new_event(self, stream_key, new_token, users=[], rooms=[],
extra_streams=set()):
""" Used to inform listeners that something has happend event wise.
Will wake up all listeners for the given users and rooms.
"""
- yield run_on_reactor()
- user_streams = set()
+ with PreserveLoggingContext():
+ user_streams = set()
- for user in users:
- user_stream = self.user_to_user_stream.get(str(user))
- if user_stream is not None:
- user_streams.add(user_stream)
+ for user in users:
+ user_stream = self.user_to_user_stream.get(str(user))
+ if user_stream is not None:
+ user_streams.add(user_stream)
- for room in rooms:
- user_streams |= self.room_to_user_streams.get(room, set())
+ for room in rooms:
+ user_streams |= self.room_to_user_streams.get(room, set())
- time_now_ms = self.clock.time_msec()
- for user_stream in user_streams:
- try:
- user_stream.notify(stream_key, new_token, time_now_ms)
- except:
- logger.exception("Failed to notify listener")
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
+ try:
+ user_stream.notify(stream_key, new_token, time_now_ms)
+ except:
+ logger.exception("Failed to notify listener")
@defer.inlineCallbacks
def wait_for_events(self, user_id, timeout, callback, room_ids=None,
@@ -325,7 +324,8 @@ class Notifier(object):
# that we don't miss any current_token updates.
prev_token = current_token
listener = user_stream.new_listener(prev_token)
- yield listener.deferred
+ with PreserveLoggingContext():
+ yield listener.deferred
except defer.CancelledError:
break
|