summary refs log tree commit diff
path: root/synapse/app/synchrotron.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/synchrotron.py')
-rw-r--r--synapse/app/synchrotron.py90
1 files changed, 47 insertions, 43 deletions
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index 5388def28a..858949910d 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -98,10 +98,7 @@ class SynchrotronPresence(object):
         self.notifier = hs.get_notifier()
 
         active_presence = self.store.take_presence_startup_info()
-        self.user_to_current_state = {
-            state.user_id: state
-            for state in active_presence
-        }
+        self.user_to_current_state = {state.user_id: state for state in active_presence}
 
         # user_id -> last_sync_ms. Lists the users that have stopped syncing
         # but we haven't notified the master of that yet
@@ -196,17 +193,26 @@ class SynchrotronPresence(object):
         room_ids_to_states, users_to_states = parties
 
         self.notifier.on_new_event(
-            "presence_key", stream_id, rooms=room_ids_to_states.keys(),
-            users=users_to_states.keys()
+            "presence_key",
+            stream_id,
+            rooms=room_ids_to_states.keys(),
+            users=users_to_states.keys(),
         )
 
     @defer.inlineCallbacks
     def process_replication_rows(self, token, rows):
-        states = [UserPresenceState(
-            row.user_id, row.state, row.last_active_ts,
-            row.last_federation_update_ts, row.last_user_sync_ts, row.status_msg,
-            row.currently_active
-        ) for row in rows]
+        states = [
+            UserPresenceState(
+                row.user_id,
+                row.state,
+                row.last_active_ts,
+                row.last_federation_update_ts,
+                row.last_user_sync_ts,
+                row.status_msg,
+                row.currently_active,
+            )
+            for row in rows
+        ]
 
         for state in states:
             self.user_to_current_state[state.user_id] = state
@@ -217,7 +223,8 @@ class SynchrotronPresence(object):
     def get_currently_syncing_users(self):
         if self.hs.config.use_presence:
             return [
-                user_id for user_id, count in iteritems(self.user_to_num_current_syncs)
+                user_id
+                for user_id, count in iteritems(self.user_to_num_current_syncs)
                 if count > 0
             ]
         else:
@@ -281,12 +288,14 @@ class SynchrotronServer(HomeServer):
                     events.register_servlets(self, resource)
                     InitialSyncRestServlet(self).register(resource)
                     RoomInitialSyncRestServlet(self).register(resource)
-                    resources.update({
-                        "/_matrix/client/r0": resource,
-                        "/_matrix/client/unstable": resource,
-                        "/_matrix/client/v2_alpha": resource,
-                        "/_matrix/client/api/v1": resource,
-                    })
+                    resources.update(
+                        {
+                            "/_matrix/client/r0": resource,
+                            "/_matrix/client/unstable": resource,
+                            "/_matrix/client/v2_alpha": resource,
+                            "/_matrix/client/api/v1": resource,
+                        }
+                    )
 
         root_resource = create_resource_tree(resources, NoResource())
 
@@ -299,7 +308,7 @@ class SynchrotronServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse synchrotron now listening on port %d", port)
@@ -313,18 +322,19 @@ class SynchrotronServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -382,40 +392,36 @@ class SyncReplicationHandler(ReplicationClientHandler):
                     )
             elif stream_name == "push_rules":
                 self.notifier.on_new_event(
-                    "push_rules_key", token, users=[row.user_id for row in rows],
+                    "push_rules_key", token, users=[row.user_id for row in rows]
                 )
-            elif stream_name in ("account_data", "tag_account_data",):
+            elif stream_name in ("account_data", "tag_account_data"):
                 self.notifier.on_new_event(
-                    "account_data_key", token, users=[row.user_id for row in rows],
+                    "account_data_key", token, users=[row.user_id for row in rows]
                 )
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
-                    "receipt_key", token, rooms=[row.room_id for row in rows],
+                    "receipt_key", token, rooms=[row.room_id for row in rows]
                 )
             elif stream_name == "typing":
                 self.typing_handler.process_replication_rows(token, rows)
                 self.notifier.on_new_event(
-                    "typing_key", token, rooms=[row.room_id for row in rows],
+                    "typing_key", token, rooms=[row.room_id for row in rows]
                 )
             elif stream_name == "to_device":
                 entities = [row.entity for row in rows if row.entity.startswith("@")]
                 if entities:
-                    self.notifier.on_new_event(
-                        "to_device_key", token, users=entities,
-                    )
+                    self.notifier.on_new_event("to_device_key", token, users=entities)
             elif stream_name == "device_lists":
                 all_room_ids = set()
                 for row in rows:
                     room_ids = yield self.store.get_rooms_for_user(row.user_id)
                     all_room_ids.update(room_ids)
-                self.notifier.on_new_event(
-                    "device_list_key", token, rooms=all_room_ids,
-                )
+                self.notifier.on_new_event("device_list_key", token, rooms=all_room_ids)
             elif stream_name == "presence":
                 yield self.presence_handler.process_replication_rows(token, rows)
             elif stream_name == "receipts":
                 self.notifier.on_new_event(
-                    "groups_key", token, users=[row.user_id for row in rows],
+                    "groups_key", token, users=[row.user_id for row in rows]
                 )
         except Exception:
             logger.exception("Error processing replication")
@@ -423,9 +429,7 @@ class SyncReplicationHandler(ReplicationClientHandler):
 
 def start(config_options):
     try:
-        config = HomeServerConfig.load_config(
-            "Synapse synchrotron", config_options
-        )
+        config = HomeServerConfig.load_config("Synapse synchrotron", config_options)
     except ConfigError as e:
         sys.stderr.write("\n" + str(e) + "\n")
         sys.exit(1)
@@ -453,6 +457,6 @@ def start(config_options):
     _base.start_worker_reactor("synapse-synchrotron", config)
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])