diff --git a/synapse/notifier.py b/synapse/notifier.py
index e16a4608e9..abe12b1434 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.async import run_on_reactor
from synapse.types import StreamToken
import synapse.metrics
@@ -50,13 +51,9 @@ class _NotificationListener(object):
so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user, rooms, from_token, limit, timeout, deferred,
- appservice=None):
+ def __init__(self, user, rooms, deferred, appservice=None):
self.user = user
self.appservice = appservice
- self.from_token = from_token
- self.limit = limit
- self.timeout = timeout
self.deferred = deferred
self.rooms = rooms
self.timer = None
@@ -64,17 +61,14 @@ class _NotificationListener(object):
def notified(self):
return self.deferred.called
- def notify(self, notifier, events, start_token, end_token):
+ def notify(self, notifier):
""" Inform whoever is listening about the new events. This will
also remove this listener from all the indexes in the Notifier
it knows about.
"""
- result = (events, (start_token, end_token))
-
try:
- self.deferred.callback(result)
- notified_events_counter.inc_by(len(events))
+ self.deferred.callback(None)
except defer.AlreadyCalledError:
pass
@@ -161,6 +155,7 @@ class Notifier(object):
listening to the room, and any listeners for the users in the
`extra_users` param.
"""
+ yield run_on_reactor()
# poke any interested application service.
self.hs.get_handlers().appservice_handler.notify_interested_services(
event
@@ -168,8 +163,6 @@ class Notifier(object):
room_id = event.room_id
- room_source = self.event_sources.sources["room"]
-
room_listeners = self.room_to_listeners.get(room_id, set())
_discard_if_notified(room_listeners)
@@ -200,34 +193,12 @@ class Notifier(object):
logger.debug("on_new_room_event listeners %s", listeners)
- # TODO (erikj): Can we make this more efficient by hitting the
- # db once?
-
- @defer.inlineCallbacks
- def notify(listener):
- events, end_key = yield room_source.get_new_events_for_user(
- listener.user,
- listener.from_token.room_key,
- listener.limit,
- )
-
- if events:
- end_token = listener.from_token.copy_and_replace(
- "room_key", end_key
- )
-
- listener.notify(
- self, events, listener.from_token, end_token
- )
-
- def eb(failure):
- logger.exception("Failed to notify listener", failure)
-
with PreserveLoggingContext():
- yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners],
- consumeErrors=True,
- )
+ for listener in listeners:
+ try:
+ listener.notify(self)
+ except:
+ logger.exception("Failed to notify listener")
@defer.inlineCallbacks
@log_function
@@ -237,11 +208,7 @@ class Notifier(object):
Will wake up all listeners for the given users and rooms.
"""
- # TODO(paul): This is horrible, having to manually list every event
- # source here individually
- presence_source = self.event_sources.sources["presence"]
- typing_source = self.event_sources.sources["typing"]
-
+ yield run_on_reactor()
listeners = set()
for user in users:
@@ -258,51 +225,12 @@ class Notifier(object):
listeners |= room_listeners
- @defer.inlineCallbacks
- def notify(listener):
- presence_events, presence_end_key = (
- yield presence_source.get_new_events_for_user(
- listener.user,
- listener.from_token.presence_key,
- listener.limit,
- )
- )
- typing_events, typing_end_key = (
- yield typing_source.get_new_events_for_user(
- listener.user,
- listener.from_token.typing_key,
- listener.limit,
- )
- )
-
- if presence_events or typing_events:
- end_token = listener.from_token.copy_and_replace(
- "presence_key", presence_end_key
- ).copy_and_replace(
- "typing_key", typing_end_key
- )
-
- listener.notify(
- self,
- presence_events + typing_events,
- listener.from_token,
- end_token
- )
-
- def eb(failure):
- logger.error(
- "Failed to notify listener",
- exc_info=(
- failure.type,
- failure.value,
- failure.getTracebackObject())
- )
-
with PreserveLoggingContext():
- yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners],
- consumeErrors=True,
- )
+ for listener in listeners:
+ try:
+ listener.notify(self)
+ except:
+ logger.exception("Failed to notify listener")
@defer.inlineCallbacks
def wait_for_events(self, user, rooms, timeout, callback,
@@ -319,9 +247,6 @@ class Notifier(object):
listener = [_NotificationListener(
user=user,
rooms=rooms,
- from_token=from_token,
- limit=1,
- timeout=timeout,
deferred=deferred,
appservice=appservice,
)]
@@ -338,7 +263,7 @@ class Notifier(object):
def _timeout_listener():
timed_out[0] = True
timer[0] = None
- listener[0].notify(self, [], from_token, from_token)
+ listener[0].notify(self)
# We create multiple notification listeners so we have to manage
# canceling the timeout ourselves.
@@ -350,10 +275,8 @@ class Notifier(object):
listener[0] = _NotificationListener(
user=user,
rooms=rooms,
- from_token=from_token,
- limit=1,
- timeout=timeout,
deferred=deferred,
+ appservice=appservice,
)
self._register_with_keys(listener[0])
result = yield callback()
|