summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py130
1 files changed, 54 insertions, 76 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py

index f00cd8c588..054ca59ad2 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py
@@ -14,13 +14,15 @@ # limitations under the License. from twisted.internet import defer -from synapse.api.constants import EventTypes +from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.util.logutils import log_function from synapse.util.async import ObservableDeferred -from synapse.util.logcontext import PreserveLoggingContext +from synapse.util.logcontext import PreserveLoggingContext, preserve_fn +from synapse.util.metrics import Measure from synapse.types import StreamToken +from synapse.visibility import filter_events_for_client import synapse.metrics from collections import namedtuple @@ -66,10 +68,8 @@ class _NotifierUserStream(object): so that it can remove itself from the indexes in the Notifier class. """ - def __init__(self, user_id, rooms, current_token, time_now_ms, - appservice=None): + def __init__(self, user_id, rooms, current_token, time_now_ms): self.user_id = user_id - self.appservice = appservice self.rooms = set(rooms) self.current_token = current_token self.last_notified_ms = time_now_ms @@ -106,11 +106,6 @@ class _NotifierUserStream(object): notifier.user_to_user_stream.pop(self.user_id) - if self.appservice: - notifier.appservice_to_user_streams.get( - self.appservice, set() - ).discard(self) - def count_listeners(self): return len(self.notify_deferred.observers()) @@ -139,21 +134,22 @@ class Notifier(object): UNUSED_STREAM_EXPIRY_MS = 10 * 60 * 1000 def __init__(self, hs): - self.hs = hs - self.user_to_user_stream = {} self.room_to_user_streams = {} - self.appservice_to_user_streams = {} self.event_sources = hs.get_event_sources() self.store = hs.get_datastore() self.pending_new_room_events = [] self.clock = hs.get_clock() + self.appservice_handler = hs.get_application_service_handler() - hs.get_distributor().observe( - "user_joined_room", self._user_joined_room - ) + if hs.should_send_federation(): + self.federation_sender = hs.get_federation_sender() + else: + self.federation_sender = None + + self.state_handler = hs.get_state_handler() self.clock.looping_call( self.remove_expired_streams, self.UNUSED_STREAM_EXPIRY_MS @@ -171,8 +167,6 @@ class Notifier(object): all_user_streams |= x for x in self.user_to_user_stream.values(): all_user_streams.add(x) - for x in self.appservice_to_user_streams.values(): - all_user_streams |= x return sum(stream.count_listeners() for stream in all_user_streams) metrics.register_callback("listeners", count_listeners) @@ -185,11 +179,8 @@ class Notifier(object): "users", lambda: len(self.user_to_user_stream), ) - metrics.register_callback( - "appservices", - lambda: count(bool, self.appservice_to_user_streams.values()), - ) + @preserve_fn def on_new_room_event(self, event, room_stream_id, max_room_stream_id, extra_users=[]): """ Used by handlers to inform the notifier something has happened @@ -211,6 +202,7 @@ class Notifier(object): self.notify_replication() + @preserve_fn def _notify_pending_new_room_events(self, max_room_stream_id): """Notify for the room events that were queued waiting for a previous event to be persisted. @@ -228,60 +220,52 @@ class Notifier(object): else: self._on_new_room_event(event, room_stream_id, extra_users) + @preserve_fn def _on_new_room_event(self, event, room_stream_id, extra_users=[]): """Notify any user streams that are interested in this room event""" # poke any interested application service. - self.hs.get_handlers().appservice_handler.notify_interested_services( - event - ) + self.appservice_handler.notify_interested_services(room_stream_id) - app_streams = set() - - for appservice in self.appservice_to_user_streams: - # TODO (kegan): Redundant appservice listener checks? - # App services will already be in the room_to_user_streams set, but - # that isn't enough. They need to be checked here in order to - # receive *invites* for users they are interested in. Does this - # make the room_to_user_streams check somewhat obselete? - if appservice.is_interested(event): - app_user_streams = self.appservice_to_user_streams.get( - appservice, set() - ) - app_streams |= app_user_streams + if self.federation_sender: + self.federation_sender.notify_new_events(room_stream_id) + + if event.type == EventTypes.Member and event.membership == Membership.JOIN: + self._user_joined_room(event.state_key, event.room_id) self.on_new_event( "room_key", room_stream_id, users=extra_users, rooms=[event.room_id], - extra_streams=app_streams, ) - def on_new_event(self, stream_key, new_token, users=[], rooms=[], - extra_streams=set()): + @preserve_fn + def on_new_event(self, stream_key, new_token, users=[], rooms=[]): """ Used to inform listeners that something has happend event wise. Will wake up all listeners for the given users and rooms. """ with PreserveLoggingContext(): - user_streams = set() + with Measure(self.clock, "on_new_event"): + user_streams = set() - for user in users: - user_stream = self.user_to_user_stream.get(str(user)) - if user_stream is not None: - user_streams.add(user_stream) + for user in users: + user_stream = self.user_to_user_stream.get(str(user)) + if user_stream is not None: + user_streams.add(user_stream) - for room in rooms: - user_streams |= self.room_to_user_streams.get(room, set()) + for room in rooms: + user_streams |= self.room_to_user_streams.get(room, set()) - time_now_ms = self.clock.time_msec() - for user_stream in user_streams: - try: - user_stream.notify(stream_key, new_token, time_now_ms) - except: - logger.exception("Failed to notify listener") + time_now_ms = self.clock.time_msec() + for user_stream in user_streams: + try: + user_stream.notify(stream_key, new_token, time_now_ms) + except: + logger.exception("Failed to notify listener") - self.notify_replication() + self.notify_replication() + @preserve_fn def on_new_replication_data(self): """Used to inform replication listeners that something has happend without waking up any of the normal user event streams""" @@ -296,7 +280,6 @@ class Notifier(object): """ user_stream = self.user_to_user_stream.get(user_id) if user_stream is None: - appservice = yield self.store.get_app_service_by_user_id(user_id) current_token = yield self.event_sources.get_current_token() if room_ids is None: rooms = yield self.store.get_rooms_for_user(user_id) @@ -304,7 +287,6 @@ class Notifier(object): user_stream = _NotifierUserStream( user_id=user_id, rooms=room_ids, - appservice=appservice, current_token=current_token, time_now_ms=self.clock.time_msec(), ) @@ -398,8 +380,8 @@ class Notifier(object): ) if name == "room": - room_member_handler = self.hs.get_handlers().room_member_handler - new_events = yield room_member_handler._filter_events_for_client( + new_events = yield filter_events_for_client( + self.store, user.to_string(), new_events, is_peeking=is_peeking, @@ -448,9 +430,10 @@ class Notifier(object): @defer.inlineCallbacks def _is_world_readable(self, room_id): - state = yield self.hs.get_state_handler().get_current_state( + state = yield self.state_handler.get_current_state( room_id, - EventTypes.RoomHistoryVisibility + EventTypes.RoomHistoryVisibility, + "", ) if state and "history_visibility" in state.content: defer.returnValue(state.content["history_visibility"] == "world_readable") @@ -479,14 +462,8 @@ class Notifier(object): s = self.room_to_user_streams.setdefault(room, set()) s.add(user_stream) - if user_stream.appservice: - self.appservice_to_user_stream.setdefault( - user_stream.appservice, set() - ).add(user_stream) - - def _user_joined_room(self, user, room_id): - user = str(user) - new_user_stream = self.user_to_user_stream.get(user) + def _user_joined_room(self, user_id, room_id): + new_user_stream = self.user_to_user_stream.get(user_id) if new_user_stream is not None: room_streams = self.room_to_user_streams.setdefault(room_id, set()) room_streams.add(new_user_stream) @@ -503,13 +480,14 @@ class Notifier(object): def wait_for_replication(self, callback, timeout): """Wait for an event to happen. - :param callback: - Gets called whenever an event happens. If this returns a truthy - value then ``wait_for_replication`` returns, otherwise it waits - for another event. - :param int timeout: - How many milliseconds to wait for callback return a truthy value. - :returns: + Args: + callback: Gets called whenever an event happens. If this returns a + truthy value then ``wait_for_replication`` returns, otherwise + it waits for another event. + timeout: How many milliseconds to wait for callback return a truthy + value. + + Returns: A deferred that resolves with the value returned by the callback. """ listener = _NotificationListener(None)