summary refs log tree commit diff
path: root/synapse/app/federation_sender.py
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/app/federation_sender.py46
1 files changed, 27 insertions, 19 deletions
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 1d43f2b075..969be58d0b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -52,8 +52,13 @@ logger = logging.getLogger("synapse.app.federation_sender")
 
 
 class FederationSenderSlaveStore(
-    SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
-    SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
+    SlavedDeviceInboxStore,
+    SlavedTransactionStore,
+    SlavedReceiptsStore,
+    SlavedEventStore,
+    SlavedRegistrationStore,
+    SlavedDeviceStore,
+    SlavedPresenceStore,
 ):
     def __init__(self, db_conn, hs):
         super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
@@ -65,10 +70,7 @@ class FederationSenderSlaveStore(
         self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
 
     def _get_federation_out_pos(self, db_conn):
-        sql = (
-            "SELECT stream_id FROM federation_stream_position"
-            " WHERE type = ?"
-        )
+        sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
         sql = self.database_engine.convert_param_style(sql)
 
         txn = db_conn.cursor()
@@ -103,7 +105,7 @@ class FederationSenderServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
-            )
+            ),
         )
 
         logger.info("Synapse federation_sender now listening on port %d", port)
@@ -117,18 +119,19 @@ class FederationSenderServer(HomeServer):
                     listener["bind_addresses"],
                     listener["port"],
                     manhole(
-                        username="matrix",
-                        password="rabbithole",
-                        globals={"hs": self},
-                    )
+                        username="matrix", password="rabbithole", globals={"hs": self}
+                    ),
                 )
             elif listener["type"] == "metrics":
                 if not self.get_config().enable_metrics:
-                    logger.warn(("Metrics listener configured, but "
-                                 "enable_metrics is not True!"))
+                    logger.warn(
+                        (
+                            "Metrics listener configured, but "
+                            "enable_metrics is not True!"
+                        )
+                    )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"],
-                                         listener["port"])
+                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
             else:
                 logger.warn("Unrecognized listener type: %s", listener["type"])
 
@@ -151,7 +154,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
         self.send_handler.process_replication_rows(stream_name, token, rows)
 
     def get_streams_to_replicate(self):
-        args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+        args = super(
+            FederationSenderReplicationHandler, self
+        ).get_streams_to_replicate()
         args.update(self.send_handler.stream_positions())
         return args
 
@@ -203,6 +208,7 @@ class FederationSenderHandler(object):
     """Processes the replication stream and forwards the appropriate entries
     to the federation sender.
     """
+
     def __init__(self, hs, replication_client):
         self.store = hs.get_datastore()
         self._is_mine_id = hs.is_mine_id
@@ -241,7 +247,7 @@ class FederationSenderHandler(object):
         # ... and when new receipts happen
         elif stream_name == ReceiptsStream.NAME:
             run_as_background_process(
-                "process_receipts_for_federation", self._on_new_receipts, rows,
+                "process_receipts_for_federation", self._on_new_receipts, rows
             )
 
     @defer.inlineCallbacks
@@ -278,12 +284,14 @@ class FederationSenderHandler(object):
 
                     # We ACK this token over replication so that the master can drop
                     # its in memory queues
-                    self.replication_client.send_federation_ack(self.federation_position)
+                    self.replication_client.send_federation_ack(
+                        self.federation_position
+                    )
                     self._last_ack = self.federation_position
         except Exception:
             logger.exception("Error updating federation stream position")
 
 
-if __name__ == '__main__':
+if __name__ == "__main__":
     with LoggingContext("main"):
         start(sys.argv[1:])