summary refs log tree commit diff
path: root/synapse/notifier.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/notifier.py')
-rw-r--r--synapse/notifier.py70
1 files changed, 36 insertions, 34 deletions
diff --git a/synapse/notifier.py b/synapse/notifier.py
index ff589660da..d398078eed 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -37,7 +37,8 @@ logger = logging.getLogger(__name__)
 notified_events_counter = Counter("synapse_notifier_notified_events", "")
 
 users_woken_by_stream_counter = Counter(
-    "synapse_notifier_users_woken_by_stream", "", ["stream"])
+    "synapse_notifier_users_woken_by_stream", "", ["stream"]
+)
 
 
 # TODO(paul): Should be shared somewhere
@@ -55,6 +56,7 @@ class _NotificationListener(object):
     The events stream handler will have yielded to the deferred, so to
     notify the handler it is sufficient to resolve the deferred.
     """
+
     __slots__ = ["deferred"]
 
     def __init__(self, deferred):
@@ -95,9 +97,7 @@ class _NotifierUserStream(object):
             stream_id(str): The new id for the stream the event came from.
             time_now_ms(int): The current time in milliseconds.
         """
-        self.current_token = self.current_token.copy_and_advance(
-            stream_key, stream_id
-        )
+        self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
         self.last_notified_token = self.current_token
         self.last_notified_ms = time_now_ms
         noify_deferred = self.notify_deferred
@@ -141,6 +141,7 @@ class _NotifierUserStream(object):
 class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
     def __nonzero__(self):
         return bool(self.events)
+
     __bool__ = __nonzero__  # python3
 
 
@@ -190,15 +191,17 @@ class Notifier(object):
                 all_user_streams.add(x)
 
             return sum(stream.count_listeners() for stream in all_user_streams)
+
         LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
 
         LaterGauge(
-            "synapse_notifier_rooms", "", [],
+            "synapse_notifier_rooms",
+            "",
+            [],
             lambda: count(bool, list(self.room_to_user_streams.values())),
         )
         LaterGauge(
-            "synapse_notifier_users", "", [],
-            lambda: len(self.user_to_user_stream),
+            "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
         )
 
     def add_replication_callback(self, cb):
@@ -209,8 +212,9 @@ class Notifier(object):
         """
         self.replication_callbacks.append(cb)
 
-    def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
-                          extra_users=[]):
+    def on_new_room_event(
+        self, event, room_stream_id, max_room_stream_id, extra_users=[]
+    ):
         """ Used by handlers to inform the notifier something has happened
         in the room, room event wise.
 
@@ -222,9 +226,7 @@ class Notifier(object):
         until all previous events have been persisted before notifying
         the client streams.
         """
-        self.pending_new_room_events.append((
-            room_stream_id, event, extra_users
-        ))
+        self.pending_new_room_events.append((room_stream_id, event, extra_users))
         self._notify_pending_new_room_events(max_room_stream_id)
 
         self.notify_replication()
@@ -240,9 +242,9 @@ class Notifier(object):
         self.pending_new_room_events = []
         for room_stream_id, event, extra_users in pending:
             if room_stream_id > max_room_stream_id:
-                self.pending_new_room_events.append((
-                    room_stream_id, event, extra_users
-                ))
+                self.pending_new_room_events.append(
+                    (room_stream_id, event, extra_users)
+                )
             else:
                 self._on_new_room_event(event, room_stream_id, extra_users)
 
@@ -250,8 +252,7 @@ class Notifier(object):
         """Notify any user streams that are interested in this room event"""
         # poke any interested application service.
         run_as_background_process(
-            "notify_app_services",
-            self._notify_app_services, room_stream_id,
+            "notify_app_services", self._notify_app_services, room_stream_id
         )
 
         if self.federation_sender:
@@ -261,9 +262,7 @@ class Notifier(object):
             self._user_joined_room(event.state_key, event.room_id)
 
         self.on_new_event(
-            "room_key", room_stream_id,
-            users=extra_users,
-            rooms=[event.room_id],
+            "room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
         )
 
     @defer.inlineCallbacks
@@ -305,8 +304,9 @@ class Notifier(object):
         self.notify_replication()
 
     @defer.inlineCallbacks
-    def wait_for_events(self, user_id, timeout, callback, room_ids=None,
-                        from_token=StreamToken.START):
+    def wait_for_events(
+        self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START
+    ):
         """Wait until the callback returns a non empty response or the
         timeout fires.
         """
@@ -339,7 +339,7 @@ class Notifier(object):
                     listener = user_stream.new_listener(prev_token)
                     listener.deferred = timeout_deferred(
                         listener.deferred,
-                        (end_time - now) / 1000.,
+                        (end_time - now) / 1000.0,
                         self.hs.get_reactor(),
                     )
                     with PreserveLoggingContext():
@@ -368,9 +368,15 @@ class Notifier(object):
         defer.returnValue(result)
 
     @defer.inlineCallbacks
-    def get_events_for(self, user, pagination_config, timeout,
-                       only_keys=None,
-                       is_guest=False, explicit_room_id=None):
+    def get_events_for(
+        self,
+        user,
+        pagination_config,
+        timeout,
+        only_keys=None,
+        is_guest=False,
+        explicit_room_id=None,
+    ):
         """ For the given user and rooms, return any new events for them. If
         there are no new events wait for up to `timeout` milliseconds for any
         new events to happen before returning.
@@ -419,10 +425,7 @@ class Notifier(object):
 
                 if name == "room":
                     new_events = yield filter_events_for_client(
-                        self.store,
-                        user.to_string(),
-                        new_events,
-                        is_peeking=is_peeking,
+                        self.store, user.to_string(), new_events, is_peeking=is_peeking
                     )
                 elif name == "presence":
                     now = self.clock.time_msec()
@@ -450,7 +453,8 @@ class Notifier(object):
             #
             # I am sorry for what I have done.
             user_id_for_stream = "_PEEKING_%s_%s" % (
-                explicit_room_id, user_id_for_stream
+                explicit_room_id,
+                user_id_for_stream,
             )
 
         result = yield self.wait_for_events(
@@ -477,9 +481,7 @@ class Notifier(object):
     @defer.inlineCallbacks
     def _is_world_readable(self, room_id):
         state = yield self.state_handler.get_current_state(
-            room_id,
-            EventTypes.RoomHistoryVisibility,
-            "",
+            room_id, EventTypes.RoomHistoryVisibility, ""
         )
         if state and "history_visibility" in state.content:
             defer.returnValue(state.content["history_visibility"] == "world_readable")