diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7c03198313..341a516da2 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -992,7 +992,7 @@ class PresenceHandler(BaseHandler):
room_ids([str]): List of room_ids to notify.
"""
with PreserveLoggingContext():
- self.notifier.on_new_user_event(
+ self.notifier.on_new_event(
"presence_key",
self._user_cachemap_latest_serial,
users_to_push,
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index f3f7050633..f0d12d35f4 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -88,7 +88,7 @@ class ReceiptsHandler(BaseHandler):
self._latest_serial = max(self._latest_serial, stream_id)
with PreserveLoggingContext():
- self.notifier.on_new_user_event(
+ self.notifier.on_new_event(
"recei[t_key", self._latest_serial, rooms=[room_id]
)
@@ -102,7 +102,7 @@ class ReceiptsHandler(BaseHandler):
receipt["remotedomains"] = remotedomains
- self.notifier.on_new_user_event(
+ self.notifier.on_new_event(
"receipt_key", self._latest_room_serial, rooms=[room_id]
)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index a9895292c2..026bd2b9d4 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -218,7 +218,7 @@ class TypingNotificationHandler(BaseHandler):
self._room_serials[room_id] = self._latest_room_serial
with PreserveLoggingContext():
- self.notifier.on_new_user_event(
+ self.notifier.on_new_event(
"typing_key", self._latest_room_serial, rooms=[room_id]
)
diff --git a/synapse/notifier.py b/synapse/notifier.py
index f13164dbdc..85ae343135 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -221,16 +221,7 @@ class Notifier(object):
event
)
- room_id = event.room_id
-
- room_user_streams = self.room_to_user_streams.get(room_id, set())
-
- user_streams = room_user_streams.copy()
-
- for user in extra_users:
- user_stream = self.user_to_user_stream.get(str(user))
- if user_stream is not None:
- user_streams.add(user_stream)
+ app_streams = set()
for appservice in self.appservice_to_user_streams:
# TODO (kegan): Redundant appservice listener checks?
@@ -242,24 +233,20 @@ class Notifier(object):
app_user_streams = self.appservice_to_user_streams.get(
appservice, set()
)
- user_streams |= app_user_streams
-
- logger.debug("on_new_room_event listeners %s", user_streams)
+ app_streams |= app_user_streams
- time_now_ms = self.clock.time_msec()
- for user_stream in user_streams:
- try:
- user_stream.notify(
- "room_key", "s%d" % (room_stream_id,), time_now_ms
- )
- except:
- logger.exception("Failed to notify listener")
+ self.on_new_event(
+ "room_key", room_stream_id,
+ users=extra_users,
+ rooms=[event.room_id],
+ extra_streams=app_streams,
+ )
@defer.inlineCallbacks
@log_function
- def on_new_user_event(self, stream_key, new_token, users=[], rooms=[]):
- """ Used to inform listeners that something has happend
- presence/user event wise.
+ def on_new_event(self, stream_key, new_token, users=[], rooms=[],
+ extra_streams=set()):
+ """ Used to inform listeners that something has happend event wise.
Will wake up all listeners for the given users and rooms.
"""
|