summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py54
1 files changed, 34 insertions, 20 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index f6a38f5140..d1e98428bc 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -33,13 +33,15 @@ from .protocol import ServerReplicationStreamProtocol
 from .streams import STREAMS_MAP
 from .streams.federation import FederationStream
 
-stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
-                                 "", ["stream_name"])
+stream_updates_counter = Counter(
+    "synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
+)
 user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
 federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
 remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
-invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache",
-                                   "")
+invalidate_cache_counter = Counter(
+    "synapse_replication_tcp_resource_invalidate_cache", ""
+)
 user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
 
 logger = logging.getLogger(__name__)
@@ -48,6 +50,7 @@ logger = logging.getLogger(__name__)
 class ReplicationStreamProtocolFactory(Factory):
     """Factory for new replication connections.
     """
+
     def __init__(self, hs):
         self.streamer = ReplicationStreamer(hs)
         self.clock = hs.get_clock()
@@ -55,9 +58,7 @@ class ReplicationStreamProtocolFactory(Factory):
 
     def buildProtocol(self, addr):
         return ServerReplicationStreamProtocol(
-            self.server_name,
-            self.clock,
-            self.streamer,
+            self.server_name, self.clock, self.streamer
         )
 
 
@@ -80,29 +81,39 @@ class ReplicationStreamer(object):
         # Current connections.
         self.connections = []
 
-        LaterGauge("synapse_replication_tcp_resource_total_connections", "", [],
-                   lambda: len(self.connections))
+        LaterGauge(
+            "synapse_replication_tcp_resource_total_connections",
+            "",
+            [],
+            lambda: len(self.connections),
+        )
 
         # List of streams that clients can subscribe to.
         # We only support federation stream if federation sending hase been
         # disabled on the master.
         self.streams = [
-            stream(hs) for stream in itervalues(STREAMS_MAP)
+            stream(hs)
+            for stream in itervalues(STREAMS_MAP)
             if stream != FederationStream or not hs.config.send_federation
         ]
 
         self.streams_by_name = {stream.NAME: stream for stream in self.streams}
 
         LaterGauge(
-            "synapse_replication_tcp_resource_connections_per_stream", "",
+            "synapse_replication_tcp_resource_connections_per_stream",
+            "",
             ["stream_name"],
             lambda: {
-                (stream_name,): len([
-                    conn for conn in self.connections
-                    if stream_name in conn.replication_streams
-                ])
+                (stream_name,): len(
+                    [
+                        conn
+                        for conn in self.connections
+                        if stream_name in conn.replication_streams
+                    ]
+                )
                 for stream_name in self.streams_by_name
-            })
+            },
+        )
 
         self.federation_sender = None
         if not hs.config.send_federation:
@@ -179,7 +190,9 @@ class ReplicationStreamer(object):
 
                         logger.debug(
                             "Getting stream: %s: %s -> %s",
-                            stream.NAME, stream.last_token, stream.upto_token
+                            stream.NAME,
+                            stream.last_token,
+                            stream.upto_token,
                         )
                         try:
                             updates, current_token = yield stream.get_updates()
@@ -189,7 +202,8 @@ class ReplicationStreamer(object):
 
                         logger.debug(
                             "Sending %d updates to %d connections",
-                            len(updates), len(self.connections),
+                            len(updates),
+                            len(self.connections),
                         )
 
                         if updates:
@@ -243,7 +257,7 @@ class ReplicationStreamer(object):
         """
         user_sync_counter.inc()
         yield self.presence_handler.update_external_syncs_row(
-            conn_id, user_id, is_syncing, last_sync_ms,
+            conn_id, user_id, is_syncing, last_sync_ms
         )
 
     @measure_func("repl.on_remove_pusher")
@@ -272,7 +286,7 @@ class ReplicationStreamer(object):
         """
         user_ip_cache_counter.inc()
         yield self.store.insert_client_ip(
-            user_id, access_token, ip, user_agent, device_id, last_seen,
+            user_id, access_token, ip, user_agent, device_id, last_seen
         )
         yield self._server_notices_sender.on_user_ip(user_id)