diff --git a/synapse/notifier.py b/synapse/notifier.py
index ff589660da..d398078eed 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -37,7 +37,8 @@ logger = logging.getLogger(__name__)
notified_events_counter = Counter("synapse_notifier_notified_events", "")
users_woken_by_stream_counter = Counter(
- "synapse_notifier_users_woken_by_stream", "", ["stream"])
+ "synapse_notifier_users_woken_by_stream", "", ["stream"]
+)
# TODO(paul): Should be shared somewhere
@@ -55,6 +56,7 @@ class _NotificationListener(object):
The events stream handler will have yielded to the deferred, so to
notify the handler it is sufficient to resolve the deferred.
"""
+
__slots__ = ["deferred"]
def __init__(self, deferred):
@@ -95,9 +97,7 @@ class _NotifierUserStream(object):
stream_id(str): The new id for the stream the event came from.
time_now_ms(int): The current time in milliseconds.
"""
- self.current_token = self.current_token.copy_and_advance(
- stream_key, stream_id
- )
+ self.current_token = self.current_token.copy_and_advance(stream_key, stream_id)
self.last_notified_token = self.current_token
self.last_notified_ms = time_now_ms
noify_deferred = self.notify_deferred
@@ -141,6 +141,7 @@ class _NotifierUserStream(object):
class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
def __nonzero__(self):
return bool(self.events)
+
__bool__ = __nonzero__ # python3
@@ -190,15 +191,17 @@ class Notifier(object):
all_user_streams.add(x)
return sum(stream.count_listeners() for stream in all_user_streams)
+
LaterGauge("synapse_notifier_listeners", "", [], count_listeners)
LaterGauge(
- "synapse_notifier_rooms", "", [],
+ "synapse_notifier_rooms",
+ "",
+ [],
lambda: count(bool, list(self.room_to_user_streams.values())),
)
LaterGauge(
- "synapse_notifier_users", "", [],
- lambda: len(self.user_to_user_stream),
+ "synapse_notifier_users", "", [], lambda: len(self.user_to_user_stream)
)
def add_replication_callback(self, cb):
@@ -209,8 +212,9 @@ class Notifier(object):
"""
self.replication_callbacks.append(cb)
- def on_new_room_event(self, event, room_stream_id, max_room_stream_id,
- extra_users=[]):
+ def on_new_room_event(
+ self, event, room_stream_id, max_room_stream_id, extra_users=[]
+ ):
""" Used by handlers to inform the notifier something has happened
in the room, room event wise.
@@ -222,9 +226,7 @@ class Notifier(object):
until all previous events have been persisted before notifying
the client streams.
"""
- self.pending_new_room_events.append((
- room_stream_id, event, extra_users
- ))
+ self.pending_new_room_events.append((room_stream_id, event, extra_users))
self._notify_pending_new_room_events(max_room_stream_id)
self.notify_replication()
@@ -240,9 +242,9 @@ class Notifier(object):
self.pending_new_room_events = []
for room_stream_id, event, extra_users in pending:
if room_stream_id > max_room_stream_id:
- self.pending_new_room_events.append((
- room_stream_id, event, extra_users
- ))
+ self.pending_new_room_events.append(
+ (room_stream_id, event, extra_users)
+ )
else:
self._on_new_room_event(event, room_stream_id, extra_users)
@@ -250,8 +252,7 @@ class Notifier(object):
"""Notify any user streams that are interested in this room event"""
# poke any interested application service.
run_as_background_process(
- "notify_app_services",
- self._notify_app_services, room_stream_id,
+ "notify_app_services", self._notify_app_services, room_stream_id
)
if self.federation_sender:
@@ -261,9 +262,7 @@ class Notifier(object):
self._user_joined_room(event.state_key, event.room_id)
self.on_new_event(
- "room_key", room_stream_id,
- users=extra_users,
- rooms=[event.room_id],
+ "room_key", room_stream_id, users=extra_users, rooms=[event.room_id]
)
@defer.inlineCallbacks
@@ -305,8 +304,9 @@ class Notifier(object):
self.notify_replication()
@defer.inlineCallbacks
- def wait_for_events(self, user_id, timeout, callback, room_ids=None,
- from_token=StreamToken.START):
+ def wait_for_events(
+ self, user_id, timeout, callback, room_ids=None, from_token=StreamToken.START
+ ):
"""Wait until the callback returns a non empty response or the
timeout fires.
"""
@@ -339,7 +339,7 @@ class Notifier(object):
listener = user_stream.new_listener(prev_token)
listener.deferred = timeout_deferred(
listener.deferred,
- (end_time - now) / 1000.,
+ (end_time - now) / 1000.0,
self.hs.get_reactor(),
)
with PreserveLoggingContext():
@@ -368,9 +368,15 @@ class Notifier(object):
defer.returnValue(result)
@defer.inlineCallbacks
- def get_events_for(self, user, pagination_config, timeout,
- only_keys=None,
- is_guest=False, explicit_room_id=None):
+ def get_events_for(
+ self,
+ user,
+ pagination_config,
+ timeout,
+ only_keys=None,
+ is_guest=False,
+ explicit_room_id=None,
+ ):
""" For the given user and rooms, return any new events for them. If
there are no new events wait for up to `timeout` milliseconds for any
new events to happen before returning.
@@ -419,10 +425,7 @@ class Notifier(object):
if name == "room":
new_events = yield filter_events_for_client(
- self.store,
- user.to_string(),
- new_events,
- is_peeking=is_peeking,
+ self.store, user.to_string(), new_events, is_peeking=is_peeking
)
elif name == "presence":
now = self.clock.time_msec()
@@ -450,7 +453,8 @@ class Notifier(object):
#
# I am sorry for what I have done.
user_id_for_stream = "_PEEKING_%s_%s" % (
- explicit_room_id, user_id_for_stream
+ explicit_room_id,
+ user_id_for_stream,
)
result = yield self.wait_for_events(
@@ -477,9 +481,7 @@ class Notifier(object):
@defer.inlineCallbacks
def _is_world_readable(self, room_id):
state = yield self.state_handler.get_current_state(
- room_id,
- EventTypes.RoomHistoryVisibility,
- "",
+ room_id, EventTypes.RoomHistoryVisibility, ""
)
if state and "history_visibility" in state.content:
defer.returnValue(state.content["history_visibility"] == "world_readable")
|