diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 5388def28a..858949910d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -98,10 +98,7 @@ class SynchrotronPresence(object):
self.notifier = hs.get_notifier()
active_presence = self.store.take_presence_startup_info()
- self.user_to_current_state = {
- state.user_id: state
- for state in active_presence
- }
+ self.user_to_current_state = {state.user_id: state for state in active_presence}
# user_id -> last_sync_ms. Lists the users that have stopped syncing
# but we haven't notified the master of that yet
@@ -196,17 +193,26 @@ class SynchrotronPresence(object):
room_ids_to_states, users_to_states = parties
self.notifier.on_new_event(
- "presence_key", stream_id, rooms=room_ids_to_states.keys(),
- users=users_to_states.keys()
+ "presence_key",
+ stream_id,
+ rooms=room_ids_to_states.keys(),
+ users=users_to_states.keys(),
)
@defer.inlineCallbacks
def process_replication_rows(self, token, rows):
- states = [UserPresenceState(
- row.user_id, row.state, row.last_active_ts,
- row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg,
- row.currently_active
- ) for row in rows]
+ states = [
+ UserPresenceState(
+ row.user_id,
+ row.state,
+ row.last_active_ts,
+ row.last_federation_update_ts,
+ row.last_user_sync_ts,
+ row.status_msg,
+ row.currently_active,
+ )
+ for row in rows
+ ]
for state in states:
self.user_to_current_state[state.user_id] = state
@@ -217,7 +223,8 @@ class SynchrotronPresence(object):
def get_currently_syncing_users(self):
if self.hs.config.use_presence:
return [
- user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+ user_id
+ for user_id, count in iteritems(self.user_to_num_current_syncs)
if count > 0
]
else:
@@ -281,12 +288,14 @@ class SynchrotronServer(HomeServer):
events.register_servlets(self, resource)
InitialSyncRestServlet(self).register(resource)
RoomInitialSyncRestServlet(self).register(resource)
- resources.update({
- "/_matrix/client/r0": resource,
- "/_matrix/client/unstable": resource,
- "/_matrix/client/v2_alpha": resource,
- "/_matrix/client/api/v1": resource,
- })
+ resources.update(
+ {
+ "/_matrix/client/r0": resource,
+ "/_matrix/client/unstable": resource,
+ "/_matrix/client/v2_alpha": resource,
+ "/_matrix/client/api/v1": resource,
+ }
+ )
root_resource = create_resource_tree(resources, NoResource())
@@ -299,7 +308,7 @@ class SynchrotronServer(HomeServer):
listener_config,
root_resource,
self.version_string,
- )
+ ),
)
logger.info("Synapse synchrotron now listening on port %d", port)
@@ -313,18 +322,19 @@ class SynchrotronServer(HomeServer):
listener["bind_addresses"],
listener["port"],
manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- )
+ username="matrix", password="rabbithole", globals={"hs": self}
+ ),
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
- logger.warn(("Metrics listener configured, but "
- "enable_metrics is not True!"))
+ logger.warn(
+ (
+ "Metrics listener configured, but "
+ "enable_metrics is not True!"
+ )
+ )
else:
- _base.listen_metrics(listener["bind_addresses"],
- listener["port"])
+ _base.listen_metrics(listener["bind_addresses"], listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -382,40 +392,36 @@ class SyncReplicationHandler(ReplicationClientHandler):
)
elif stream_name == "push_rules":
self.notifier.on_new_event(
- "push_rules_key", token, users=[row.user_id for row in rows],
+ "push_rules_key", token, users=[row.user_id for row in rows]
)
- elif stream_name in ("account_data", "tag_account_data",):
+ elif stream_name in ("account_data", "tag_account_data"):
self.notifier.on_new_event(
- "account_data_key", token, users=[row.user_id for row in rows],
+ "account_data_key", token, users=[row.user_id for row in rows]
)
elif stream_name == "receipts":
self.notifier.on_new_event(
- "receipt_key", token, rooms=[row.room_id for row in rows],
+ "receipt_key", token, rooms=[row.room_id for row in rows]
)
elif stream_name == "typing":
self.typing_handler.process_replication_rows(token, rows)
self.notifier.on_new_event(
- "typing_key", token, rooms=[row.room_id for row in rows],
+ "typing_key", token, rooms=[row.room_id for row in rows]
)
elif stream_name == "to_device":
entities = [row.entity for row in rows if row.entity.startswith("@")]
if entities:
- self.notifier.on_new_event(
- "to_device_key", token, users=entities,
- )
+ self.notifier.on_new_event("to_device_key", token, users=entities)
elif stream_name == "device_lists":
all_room_ids = set()
for row in rows:
room_ids = yield self.store.get_rooms_for_user(row.user_id)
all_room_ids.update(room_ids)
- self.notifier.on_new_event(
- "device_list_key", token, rooms=all_room_ids,
- )
+ self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
elif stream_name == "presence":
yield self.presence_handler.process_replication_rows(token, rows)
elif stream_name == "receipts":
self.notifier.on_new_event(
- "groups_key", token, users=[row.user_id for row in rows],
+ "groups_key", token, users=[row.user_id for row in rows]
)
except Exception:
logger.exception("Error processing replication")
@@ -423,9 +429,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
def start(config_options):
try:
- config = HomeServerConfig.load_config(
- "Synapse synchrotron", config_options
- )
+ config = HomeServerConfig.load_config("Synapse synchrotron", config_options)
except ConfigError as e:
sys.stderr.write("\n" + str(e) + "\n")
sys.exit(1)
@@ -453,6 +457,6 @@ def start(config_options):
_base.start_worker_reactor("synapse-synchrotron", config)
-if __name__ == '__main__':
+if __name__ == "__main__":
with LoggingContext("main"):
start(sys.argv[1:])
|