diff options
Diffstat (limited to '')
-rw-r--r-- | synapse/notifier.py | 70 |
1 files changed, 36 insertions, 34 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py index ff589660da..d398078eed 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -37,7 +37,8 @@ logger = logging.getLogger(__name__) notified_events_counter = Counter("synapse_notifier_notified_events", "") users_woken_by_stream_counter = Counter( - "synapse_notifier_users_woken_by_stream", "", ["stream"]) + "synapse_notifier_users_woken_by_stream", "", ["stream"] +) # TODO(paul): Should be shared somewhere @@ -55,6 +56,7 @@ class _NotificationListener(object): The events stream handler will have yielded to the deferred, so to notify the handler it is sufficient to resolve the deferred. """ + __slots__ = ["deferred"] def __init__(self, deferred): @@ -95,9 +97,7 @@ class _NotifierUserStream(object): stream_id(str): The new id for the stream the event came from. time_now_ms(int): The current time in milliseconds. """ - self.current_token = self.current_token.copy_and_advance( - stream_key, stream_id - ) + self.current_token = self.current_token.copy_and_advance(stream_key, stream_id) self.last_notified_token = self.current_token self.last_notified_ms = time_now_ms noify_deferred = self.notify_deferred @@ -141,6 +141,7 @@ class _NotifierUserStream(object): class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))): def __nonzero__(self): return bool(self.events) + __bool__ = __nonzero__ # python3 @@ -190,15 +191,17 @@ class Notifier(object): all_user_streams.add(x) return sum(stream.count_listeners() for stream in all_user_streams) + LaterGauge("synapse_notifier_listeners", "", [], count_listeners) LaterGauge( - "synapse_notifier_rooms", "", [], + "synapse_notifier_rooms", + "", + [], lambda: count(bool, list(self.room_to_user_streams.values())), ) LaterGauge( - "synapse_notifier_users", "", [], - lambda: len(self.user_to_user_stream), + "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream) ) def add_replication_callback(self, cb): @@ -209,8 +212,9 @@ class Notifier(object): """ self.replication_callbacks.append(cb) - def on_new_room_event(self, event, room_stream_id, max_room_stream_id, - extra_users=[]): + def on_new_room_event( + self, event, room_stream_id, max_room_stream_id, extra_users=[] + ): """ Used by handlers to inform the notifier something has happened in the room, room event wise. @@ -222,9 +226,7 @@ class Notifier(object): until all previous events have been persisted before notifying the client streams. """ - self.pending_new_room_events.append(( - room_stream_id, event, extra_users - )) + self.pending_new_room_events.append((room_stream_id, event, extra_users)) self._notify_pending_new_room_events(max_room_stream_id) self.notify_replication() @@ -240,9 +242,9 @@ class Notifier(object): self.pending_new_room_events = [] for room_stream_id, event, extra_users in pending: if room_stream_id > max_room_stream_id: - self.pending_new_room_events.append(( - room_stream_id, event, extra_users - )) + self.pending_new_room_events.append( + (room_stream_id, event, extra_users) + ) else: self._on_new_room_event(event, room_stream_id, extra_users) @@ -250,8 +252,7 @@ class Notifier(object): """Notify any user streams that are interested in this room event""" # poke any interested application service. run_as_background_process( - "notify_app_services", - self._notify_app_services, room_stream_id, + "notify_app_services", self._notify_app_services, room_stream_id ) if self.federation_sender: @@ -261,9 +262,7 @@ class Notifier(object): self._user_joined_room(event.state_key, event.room_id) self.on_new_event( - "room_key", room_stream_id, - users=extra_users, - rooms=[event.room_id], + "room_key", room_stream_id, users=extra_users, rooms=[event.room_id] ) @defer.inlineCallbacks @@ -305,8 +304,9 @@ class Notifier(object): self.notify_replication() @defer.inlineCallbacks - def wait_for_events(self, user_id, timeout, callback, room_ids=None, - from_token=StreamToken.START): + def wait_for_events( + self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START + ): """Wait until the callback returns a non empty response or the timeout fires. """ @@ -339,7 +339,7 @@ class Notifier(object): listener = user_stream.new_listener(prev_token) listener.deferred = timeout_deferred( listener.deferred, - (end_time - now) / 1000., + (end_time - now) / 1000.0, self.hs.get_reactor(), ) with PreserveLoggingContext(): @@ -368,9 +368,15 @@ class Notifier(object): defer.returnValue(result) @defer.inlineCallbacks - def get_events_for(self, user, pagination_config, timeout, - only_keys=None, - is_guest=False, explicit_room_id=None): + def get_events_for( + self, + user, + pagination_config, + timeout, + only_keys=None, + is_guest=False, + explicit_room_id=None, + ): """ For the given user and rooms, return any new events for them. If there are no new events wait for up to `timeout` milliseconds for any new events to happen before returning. @@ -419,10 +425,7 @@ class Notifier(object): if name == "room": new_events = yield filter_events_for_client( - self.store, - user.to_string(), - new_events, - is_peeking=is_peeking, + self.store, user.to_string(), new_events, is_peeking=is_peeking ) elif name == "presence": now = self.clock.time_msec() @@ -450,7 +453,8 @@ class Notifier(object): # # I am sorry for what I have done. user_id_for_stream = "_PEEKING_%s_%s" % ( - explicit_room_id, user_id_for_stream + explicit_room_id, + user_id_for_stream, ) result = yield self.wait_for_events( @@ -477,9 +481,7 @@ class Notifier(object): @defer.inlineCallbacks def _is_world_readable(self, room_id): state = yield self.state_handler.get_current_state( - room_id, - EventTypes.RoomHistoryVisibility, - "", + room_id, EventTypes.RoomHistoryVisibility, "" ) if state and "history_visibility" in state.content: defer.returnValue(state.content["history_visibility"] == "world_readable") |