diff --git a/synapse/notifier.py b/synapse/notifier.py
index f00cd8c588..054ca59ad2 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -14,13 +14,15 @@
# limitations under the License.
from twisted.internet import defer
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import AuthError
from synapse.util.logutils import log_function
from synapse.util.async import ObservableDeferred
-from synapse.util.logcontext import PreserveLoggingContext
+from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.metrics import Measure
from synapse.types import StreamToken
+from synapse.visibility import filter_events_for_client
import synapse.metrics
from collections import namedtuple
@@ -66,10 +68,8 @@ class _NotifierUserStream(object):
so that it can remove itself from the indexes in the Notifier class.
"""
- def __init__(self, user_id, rooms, current_token, time_now_ms,
- appservice=None):
+ def __init__(self, user_id, rooms, current_token, time_now_ms):
self.user_id = user_id
- self.appservice = appservice
self.rooms = set(rooms)
self.current_token = current_token
self.last_notified_ms = time_now_ms
@@ -106,11 +106,6 @@ class _NotifierUserStream(object):
notifier.user_to_user_stream.pop(self.user_id)
- if self.appservice:
- notifier.appservice_to_user_streams.get(
- self.appservice, set()
- ).discard(self)
-
def count_listeners(self):
return len(self.notify_deferred.observers())
@@ -139,21 +134,22 @@ class Notifier(object):
UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000
def __init__(self, hs):
- self.hs = hs
-
self.user_to_user_stream = {}
self.room_to_user_streams = {}
- self.appservice_to_user_streams = {}
self.event_sources = hs.get_event_sources()
self.store = hs.get_datastore()
self.pending_new_room_events = []
self.clock = hs.get_clock()
+ self.appservice_handler = hs.get_application_service_handler()
- hs.get_distributor().observe(
- "user_joined_room", self._user_joined_room
- )
+ if hs.should_send_federation():
+ self.federation_sender = hs.get_federation_sender()
+ else:
+ self.federation_sender = None
+
+ self.state_handler = hs.get_state_handler()
self.clock.looping_call(
self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS
@@ -171,8 +167,6 @@ class Notifier(object):
all_user_streams |= x
for x in self.user_to_user_stream.values():
all_user_streams.add(x)
- for x in self.appservice_to_user_streams.values():
- all_user_streams |= x
return sum(stream.count_listeners() for stream in all_user_streams)
metrics.register_callback("listeners", count_listeners)
@@ -185,11 +179,8 @@ class Notifier(object):
"users",
lambda: len(self.user_to_user_stream),
)
- metrics.register_callback(
- "appservices",
- lambda: count(bool, self.appservice_to_user_streams.values()),
- )
+ @preserve_fn
def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
extra_users=[]):
""" Used by handlers to inform the notifier something has happened
@@ -211,6 +202,7 @@ class Notifier(object):
self.notify_replication()
+ @preserve_fn
def _notify_pending_new_room_events(self, max_room_stream_id):
"""Notify for the room events that were queued waiting for a previous
event to be persisted.
@@ -228,60 +220,52 @@ class Notifier(object):
else:
self._on_new_room_event(event, room_stream_id, extra_users)
+ @preserve_fn
def _on_new_room_event(self, event, room_stream_id, extra_users=[]):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
- self.hs.get_handlers().appservice_handler.notify_interested_services(
- event
- )
+ self.appservice_handler.notify_interested_services(room_stream_id)
- app_streams = set()
-
- for appservice in self.appservice_to_user_streams:
- # TODO (kegan): Redundant appservice listener checks?
- # App services will already be in the room_to_user_streams set, but
- # that isn't enough. They need to be checked here in order to
- # receive *invites* for users they are interested in. Does this
- # make the room_to_user_streams check somewhat obselete?
- if appservice.is_interested(event):
- app_user_streams = self.appservice_to_user_streams.get(
- appservice, set()
- )
- app_streams |= app_user_streams
+ if self.federation_sender:
+ self.federation_sender.notify_new_events(room_stream_id)
+
+ if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+ self._user_joined_room(event.state_key, event.room_id)
self.on_new_event(
"room_key", room_stream_id,
users=extra_users,
rooms=[event.room_id],
- extra_streams=app_streams,
)
- def on_new_event(self, stream_key, new_token, users=[], rooms=[],
- extra_streams=set()):
+ @preserve_fn
+ def on_new_event(self, stream_key, new_token, users=[], rooms=[]):
""" Used to inform listeners that something has happend event wise.
Will wake up all listeners for the given users and rooms.
"""
with PreserveLoggingContext():
- user_streams = set()
+ with Measure(self.clock, "on_new_event"):
+ user_streams = set()
- for user in users:
- user_stream = self.user_to_user_stream.get(str(user))
- if user_stream is not None:
- user_streams.add(user_stream)
+ for user in users:
+ user_stream = self.user_to_user_stream.get(str(user))
+ if user_stream is not None:
+ user_streams.add(user_stream)
- for room in rooms:
- user_streams |= self.room_to_user_streams.get(room, set())
+ for room in rooms:
+ user_streams |= self.room_to_user_streams.get(room, set())
- time_now_ms = self.clock.time_msec()
- for user_stream in user_streams:
- try:
- user_stream.notify(stream_key, new_token, time_now_ms)
- except:
- logger.exception("Failed to notify listener")
+ time_now_ms = self.clock.time_msec()
+ for user_stream in user_streams:
+ try:
+ user_stream.notify(stream_key, new_token, time_now_ms)
+ except:
+ logger.exception("Failed to notify listener")
- self.notify_replication()
+ self.notify_replication()
+ @preserve_fn
def on_new_replication_data(self):
"""Used to inform replication listeners that something has happend
without waking up any of the normal user event streams"""
@@ -296,7 +280,6 @@ class Notifier(object):
"""
user_stream = self.user_to_user_stream.get(user_id)
if user_stream is None:
- appservice = yield self.store.get_app_service_by_user_id(user_id)
current_token = yield self.event_sources.get_current_token()
if room_ids is None:
rooms = yield self.store.get_rooms_for_user(user_id)
@@ -304,7 +287,6 @@ class Notifier(object):
user_stream = _NotifierUserStream(
user_id=user_id,
rooms=room_ids,
- appservice=appservice,
current_token=current_token,
time_now_ms=self.clock.time_msec(),
)
@@ -398,8 +380,8 @@ class Notifier(object):
)
if name == "room":
- room_member_handler = self.hs.get_handlers().room_member_handler
- new_events = yield room_member_handler._filter_events_for_client(
+ new_events = yield filter_events_for_client(
+ self.store,
user.to_string(),
new_events,
is_peeking=is_peeking,
@@ -448,9 +430,10 @@ class Notifier(object):
@defer.inlineCallbacks
def _is_world_readable(self, room_id):
- state = yield self.hs.get_state_handler().get_current_state(
+ state = yield self.state_handler.get_current_state(
room_id,
- EventTypes.RoomHistoryVisibility
+ EventTypes.RoomHistoryVisibility,
+ "",
)
if state and "history_visibility" in state.content:
defer.returnValue(state.content["history_visibility"] == "world_readable")
@@ -479,14 +462,8 @@ class Notifier(object):
s = self.room_to_user_streams.setdefault(room, set())
s.add(user_stream)
- if user_stream.appservice:
- self.appservice_to_user_stream.setdefault(
- user_stream.appservice, set()
- ).add(user_stream)
-
- def _user_joined_room(self, user, room_id):
- user = str(user)
- new_user_stream = self.user_to_user_stream.get(user)
+ def _user_joined_room(self, user_id, room_id):
+ new_user_stream = self.user_to_user_stream.get(user_id)
if new_user_stream is not None:
room_streams = self.room_to_user_streams.setdefault(room_id, set())
room_streams.add(new_user_stream)
@@ -503,13 +480,14 @@ class Notifier(object):
def wait_for_replication(self, callback, timeout):
"""Wait for an event to happen.
- :param callback:
- Gets called whenever an event happens. If this returns a truthy
- value then ``wait_for_replication`` returns, otherwise it waits
- for another event.
- :param int timeout:
- How many milliseconds to wait for callback return a truthy value.
- :returns:
+ Args:
+ callback: Gets called whenever an event happens. If this returns a
+ truthy value then ``wait_for_replication`` returns, otherwise
+ it waits for another event.
+ timeout: How many milliseconds to wait for callback return a truthy
+ value.
+
+ Returns:
A deferred that resolves with the value returned by the callback.
"""
listener = _NotificationListener(None)
|