summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/protocol.py14
-rw-r--r--synapse/replication/tcp/resource.py3
2 files changed, 9 insertions, 8 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 5995e5d553..a6280aae70 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -67,14 +67,14 @@ from prometheus_client import Counter
 
 from collections import defaultdict
 
+from six import iterkeys, iteritems
+
 import logging
 import struct
 import fcntl
 
 connection_close_counter = Counter(
-    "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"],
-)
-
+    "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
 
 # A list of all connected protocols. This allows us to send metrics about the
 # connections.
@@ -389,7 +389,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
         if stream_name == "ALL":
             # Subscribe to all streams we're publishing to.
-            for stream in self.streamer.streams_by_name.iterkeys():
+            for stream in iterkeys(self.streamer.streams_by_name):
                 self.subscribe_to_stream(stream, token)
         else:
             self.subscribe_to_stream(stream_name, token)
@@ -495,7 +495,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         BaseReplicationStreamProtocol.connectionMade(self)
 
         # Once we've connected subscribe to the necessary streams
-        for stream_name, token in self.handler.get_streams_to_replicate().iteritems():
+        for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
             self.replicate(stream_name, token)
 
         # Tell the server if we have any users currently syncing (should only
@@ -622,7 +622,7 @@ tcp_inbound_commands = LaterGauge(
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in p.inbound_commands_counter.items()
+        for k, count in iteritems(p.inbound_commands_counter.counts)
     })
 
 tcp_outbound_commands = LaterGauge(
@@ -630,7 +630,7 @@ tcp_outbound_commands = LaterGauge(
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
-        for k, count in p.outbound_commands_counter.items()
+        for k, count in iteritems(p.outbound_commands_counter.counts)
     })
 
 # number of updates received for each RDATA stream
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 19987c06f0..63bd6d2652 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -27,6 +27,7 @@ from synapse.metrics import LaterGauge
 import logging
 
 from prometheus_client import Counter
+from six import itervalues
 
 stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates",
                                  "", ["stream_name"])
@@ -81,7 +82,7 @@ class ReplicationStreamer(object):
         # We only support federation stream if federation sending hase been
         # disabled on the master.
         self.streams = [
-            stream(hs) for stream in STREAMS_MAP.itervalues()
+            stream(hs) for stream in itervalues(STREAMS_MAP)
             if stream != FederationStream or not hs.config.send_federation
         ]