diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 1d43f2b075..969be58d0b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -52,8 +52,13 @@ logger = logging.getLogger("synapse.app.federation_sender")
class FederationSenderSlaveStore(
- SlavedDeviceInboxStore, SlavedTransactionStore, SlavedReceiptsStore, SlavedEventStore,
- SlavedRegistrationStore, SlavedDeviceStore, SlavedPresenceStore,
+ SlavedDeviceInboxStore,
+ SlavedTransactionStore,
+ SlavedReceiptsStore,
+ SlavedEventStore,
+ SlavedRegistrationStore,
+ SlavedDeviceStore,
+ SlavedPresenceStore,
):
def __init__(self, db_conn, hs):
super(FederationSenderSlaveStore, self).__init__(db_conn, hs)
@@ -65,10 +70,7 @@ class FederationSenderSlaveStore(
self.federation_out_pos_startup = self._get_federation_out_pos(db_conn)
def _get_federation_out_pos(self, db_conn):
- sql = (
- "SELECT stream_id FROM federation_stream_position"
- " WHERE type = ?"
- )
+ sql = "SELECT stream_id FROM federation_stream_position" " WHERE type = ?"
sql = self.database_engine.convert_param_style(sql)
txn = db_conn.cursor()
@@ -103,7 +105,7 @@ class FederationSenderServer(HomeServer):
listener_config,
root_resource,
self.version_string,
- )
+ ),
)
logger.info("Synapse federation_sender now listening on port %d", port)
@@ -117,18 +119,19 @@ class FederationSenderServer(HomeServer):
listener["bind_addresses"],
listener["port"],
manhole(
- username="matrix",
- password="rabbithole",
- globals={"hs": self},
- )
+ username="matrix", password="rabbithole", globals={"hs": self}
+ ),
)
elif listener["type"] == "metrics":
if not self.get_config().enable_metrics:
- logger.warn(("Metrics listener configured, but "
- "enable_metrics is not True!"))
+ logger.warn(
+ (
+ "Metrics listener configured, but "
+ "enable_metrics is not True!"
+ )
+ )
else:
- _base.listen_metrics(listener["bind_addresses"],
- listener["port"])
+ _base.listen_metrics(listener["bind_addresses"], listener["port"])
else:
logger.warn("Unrecognized listener type: %s", listener["type"])
@@ -151,7 +154,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
self.send_handler.process_replication_rows(stream_name, token, rows)
def get_streams_to_replicate(self):
- args = super(FederationSenderReplicationHandler, self).get_streams_to_replicate()
+ args = super(
+ FederationSenderReplicationHandler, self
+ ).get_streams_to_replicate()
args.update(self.send_handler.stream_positions())
return args
@@ -203,6 +208,7 @@ class FederationSenderHandler(object):
"""Processes the replication stream and forwards the appropriate entries
to the federation sender.
"""
+
def __init__(self, hs, replication_client):
self.store = hs.get_datastore()
self._is_mine_id = hs.is_mine_id
@@ -241,7 +247,7 @@ class FederationSenderHandler(object):
# ... and when new receipts happen
elif stream_name == ReceiptsStream.NAME:
run_as_background_process(
- "process_receipts_for_federation", self._on_new_receipts, rows,
+ "process_receipts_for_federation", self._on_new_receipts, rows
)
@defer.inlineCallbacks
@@ -278,12 +284,14 @@ class FederationSenderHandler(object):
# We ACK this token over replication so that the master can drop
# its in memory queues
- self.replication_client.send_federation_ack(self.federation_position)
+ self.replication_client.send_federation_ack(
+ self.federation_position
+ )
self._last_ack = self.federation_position
except Exception:
logger.exception("Error updating federation stream position")
-if __name__ == '__main__':
+if __name__ == "__main__":
with LoggingContext("main"):
start(sys.argv[1:])
|