diff --git a/synapse/notifier.py b/synapse/notifier.py
index 3aec1d4af2..df13e8ddb6 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -18,6 +18,7 @@ from twisted.internet import defer
from synapse.util.logutils import log_function
from synapse.util.logcontext import PreserveLoggingContext
from synapse.util.async import run_on_reactor
+from synapse.types import StreamToken
import logging
@@ -35,8 +36,10 @@ class _NotificationListener(object):
so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user, rooms, from_token, limit, timeout, deferred):
+ def __init__(self, user, rooms, from_token, limit, timeout, deferred,
+ appservice=None):
self.user = user
+ self.appservice = appservice
self.from_token = from_token
self.limit = limit
self.timeout = timeout
@@ -60,10 +63,14 @@ class _NotificationListener(object):
pass
for room in self.rooms:
- lst = notifier.rooms_to_listeners.get(room, set())
+ lst = notifier.room_to_listeners.get(room, set())
lst.discard(self)
notifier.user_to_listeners.get(self.user, set()).discard(self)
+ if self.appservice:
+ notifier.appservice_to_listeners.get(
+ self.appservice, set()
+ ).discard(self)
class Notifier(object):
@@ -76,8 +83,9 @@ class Notifier(object):
def __init__(self, hs):
self.hs = hs
- self.rooms_to_listeners = {}
+ self.room_to_listeners = {}
self.user_to_listeners = {}
+ self.appservice_to_listeners = {}
self.event_sources = hs.get_event_sources()
@@ -98,15 +106,32 @@ class Notifier(object):
`extra_users` param.
"""
yield run_on_reactor()
+
+ # poke any interested application service.
+ self.hs.get_handlers().appservice_handler.notify_interested_services(
+ event
+ )
+
room_id = event.room_id
room_source = self.event_sources.sources["room"]
- listeners = self.rooms_to_listeners.get(room_id, set()).copy()
+ listeners = self.room_to_listeners.get(room_id, set()).copy()
for user in extra_users:
listeners |= self.user_to_listeners.get(user, set()).copy()
+ for appservice in self.appservice_to_listeners:
+ # TODO (kegan): Redundant appservice listener checks?
+ # App services will already be in the room_to_listeners set, but
+ # that isn't enough. They need to be checked here in order to
+ # receive *invites* for users they are interested in. Does this
+ # make the room_to_listeners check somewhat obselete?
+ if appservice.is_interested(event):
+ listeners |= self.appservice_to_listeners.get(
+ appservice, set()
+ ).copy()
+
logger.debug("on_new_room_event listeners %s", listeners)
# TODO (erikj): Can we make this more efficient by hitting the
@@ -134,7 +159,8 @@ class Notifier(object):
with PreserveLoggingContext():
yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners]
+ [notify(l).addErrback(eb) for l in listeners],
+ consumeErrors=True,
)
@defer.inlineCallbacks
@@ -158,7 +184,7 @@ class Notifier(object):
listeners |= self.user_to_listeners.get(user, set()).copy()
for room in rooms:
- listeners |= self.rooms_to_listeners.get(room, set()).copy()
+ listeners |= self.room_to_listeners.get(room, set()).copy()
@defer.inlineCallbacks
def notify(listener):
@@ -202,9 +228,57 @@ class Notifier(object):
with PreserveLoggingContext():
yield defer.DeferredList(
- [notify(l).addErrback(eb) for l in listeners]
+ [notify(l).addErrback(eb) for l in listeners],
+ consumeErrors=True,
)
+ @defer.inlineCallbacks
+ def wait_for_events(self, user, rooms, filter, timeout, callback):
+ """Wait until the callback returns a non empty response or the
+ timeout fires.
+ """
+
+ deferred = defer.Deferred()
+
+ from_token = StreamToken("s0", "0", "0")
+
+ listener = [_NotificationListener(
+ user=user,
+ rooms=rooms,
+ from_token=from_token,
+ limit=1,
+ timeout=timeout,
+ deferred=deferred,
+ )]
+
+ if timeout:
+ self._register_with_keys(listener[0])
+
+ result = yield callback()
+ if timeout:
+ timed_out = [False]
+
+ def _timeout_listener():
+ timed_out[0] = True
+ listener[0].notify(self, [], from_token, from_token)
+
+ self.clock.call_later(timeout/1000., _timeout_listener)
+ while not result and not timed_out[0]:
+ yield deferred
+ deferred = defer.Deferred()
+ listener[0] = _NotificationListener(
+ user=user,
+ rooms=rooms,
+ from_token=from_token,
+ limit=1,
+ timeout=timeout,
+ deferred=deferred,
+ )
+ self._register_with_keys(listener[0])
+ result = yield callback()
+
+ defer.returnValue(result)
+
def get_events_for(self, user, rooms, pagination_config, timeout):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
@@ -224,6 +298,10 @@ class Notifier(object):
if not from_token:
from_token = yield self.event_sources.get_current_token()
+ appservice = yield self.hs.get_datastore().get_app_service_by_user_id(
+ user.to_string()
+ )
+
listener = _NotificationListener(
user,
rooms,
@@ -231,6 +309,7 @@ class Notifier(object):
limit,
timeout,
deferred,
+ appservice=appservice
)
def _timeout_listener():
@@ -258,11 +337,16 @@ class Notifier(object):
@log_function
def _register_with_keys(self, listener):
for room in listener.rooms:
- s = self.rooms_to_listeners.setdefault(room, set())
+ s = self.room_to_listeners.setdefault(room, set())
s.add(listener)
self.user_to_listeners.setdefault(listener.user, set()).add(listener)
+ if listener.appservice:
+ self.appservice_to_listeners.setdefault(
+ listener.appservice, set()
+ ).add(listener)
+
@defer.inlineCallbacks
@log_function
def _check_for_updates(self, listener):
@@ -296,5 +380,5 @@ class Notifier(object):
def _user_joined_room(self, user, room_id):
new_listeners = self.user_to_listeners.get(user, set())
- listeners = self.rooms_to_listeners.setdefault(room_id, set())
+ listeners = self.room_to_listeners.setdefault(room_id, set())
listeners |= new_listeners
|