diff options
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r-- | synapse/app/synchrotron.py | 90 |
1 files changed, 47 insertions, 43 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 5388def28a..858949910d 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -98,10 +98,7 @@ class SynchrotronPresence(object): self.notifier = hs.get_notifier() active_presence = self.store.take_presence_startup_info() - self.user_to_current_state = { - state.user_id: state - for state in active_presence - } + self.user_to_current_state = {state.user_id: state for state in active_presence} # user_id -> last_sync_ms. Lists the users that have stopped syncing # but we haven't notified the master of that yet @@ -196,17 +193,26 @@ class SynchrotronPresence(object): room_ids_to_states, users_to_states = parties self.notifier.on_new_event( - "presence_key", stream_id, rooms=room_ids_to_states.keys(), - users=users_to_states.keys() + "presence_key", + stream_id, + rooms=room_ids_to_states.keys(), + users=users_to_states.keys(), ) @defer.inlineCallbacks def process_replication_rows(self, token, rows): - states = [UserPresenceState( - row.user_id, row.state, row.last_active_ts, - row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg, - row.currently_active - ) for row in rows] + states = [ + UserPresenceState( + row.user_id, + row.state, + row.last_active_ts, + row.last_federation_update_ts, + row.last_user_sync_ts, + row.status_msg, + row.currently_active, + ) + for row in rows + ] for state in states: self.user_to_current_state[state.user_id] = state @@ -217,7 +223,8 @@ class SynchrotronPresence(object): def get_currently_syncing_users(self): if self.hs.config.use_presence: return [ - user_id for user_id, count in iteritems(self.user_to_num_current_syncs) + user_id + for user_id, count in iteritems(self.user_to_num_current_syncs) if count > 0 ] else: @@ -281,12 +288,14 @@ class SynchrotronServer(HomeServer): events.register_servlets(self, resource) InitialSyncRestServlet(self).register(resource) RoomInitialSyncRestServlet(self).register(resource) - resources.update({ - "/_matrix/client/r0": resource, - "/_matrix/client/unstable": resource, - "/_matrix/client/v2_alpha": resource, - "/_matrix/client/api/v1": resource, - }) + resources.update( + { + "/_matrix/client/r0": resource, + "/_matrix/client/unstable": resource, + "/_matrix/client/v2_alpha": resource, + "/_matrix/client/api/v1": resource, + } + ) root_resource = create_resource_tree(resources, NoResource()) @@ -299,7 +308,7 @@ class SynchrotronServer(HomeServer): listener_config, root_resource, self.version_string, - ) + ), ) logger.info("Synapse synchrotron now listening on port %d", port) @@ -313,18 +322,19 @@ class SynchrotronServer(HomeServer): listener["bind_addresses"], listener["port"], manhole( - username="matrix", - password="rabbithole", - globals={"hs": self}, - ) + username="matrix", password="rabbithole", globals={"hs": self} + ), ) elif listener["type"] == "metrics": if not self.get_config().enable_metrics: - logger.warn(("Metrics listener configured, but " - "enable_metrics is not True!")) + logger.warn( + ( + "Metrics listener configured, but " + "enable_metrics is not True!" + ) + ) else: - _base.listen_metrics(listener["bind_addresses"], - listener["port"]) + _base.listen_metrics(listener["bind_addresses"], listener["port"]) else: logger.warn("Unrecognized listener type: %s", listener["type"]) @@ -382,40 +392,36 @@ class SyncReplicationHandler(ReplicationClientHandler): ) elif stream_name == "push_rules": self.notifier.on_new_event( - "push_rules_key", token, users=[row.user_id for row in rows], + "push_rules_key", token, users=[row.user_id for row in rows] ) - elif stream_name in ("account_data", "tag_account_data",): + elif stream_name in ("account_data", "tag_account_data"): self.notifier.on_new_event( - "account_data_key", token, users=[row.user_id for row in rows], + "account_data_key", token, users=[row.user_id for row in rows] ) elif stream_name == "receipts": self.notifier.on_new_event( - "receipt_key", token, rooms=[row.room_id for row in rows], + "receipt_key", token, rooms=[row.room_id for row in rows] ) elif stream_name == "typing": self.typing_handler.process_replication_rows(token, rows) self.notifier.on_new_event( - "typing_key", token, rooms=[row.room_id for row in rows], + "typing_key", token, rooms=[row.room_id for row in rows] ) elif stream_name == "to_device": entities = [row.entity for row in rows if row.entity.startswith("@")] if entities: - self.notifier.on_new_event( - "to_device_key", token, users=entities, - ) + self.notifier.on_new_event("to_device_key", token, users=entities) elif stream_name == "device_lists": all_room_ids = set() for row in rows: room_ids = yield self.store.get_rooms_for_user(row.user_id) all_room_ids.update(room_ids) - self.notifier.on_new_event( - "device_list_key", token, rooms=all_room_ids, - ) + self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids) elif stream_name == "presence": yield self.presence_handler.process_replication_rows(token, rows) elif stream_name == "receipts": self.notifier.on_new_event( - "groups_key", token, users=[row.user_id for row in rows], + "groups_key", token, users=[row.user_id for row in rows] ) except Exception: logger.exception("Error processing replication") @@ -423,9 +429,7 @@ class SyncReplicationHandler(ReplicationClientHandler): def start(config_options): try: - config = HomeServerConfig.load_config( - "Synapse synchrotron", config_options - ) + config = HomeServerConfig.load_config("Synapse synchrotron", config_options) except ConfigError as e: sys.stderr.write("\n" + str(e) + "\n") sys.exit(1) @@ -453,6 +457,6 @@ def start(config_options): _base.start_worker_reactor("synapse-synchrotron", config) -if __name__ == '__main__': +if __name__ == "__main__": with LoggingContext("main"): start(sys.argv[1:]) |