summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/http/send_event.py6
-rw-r--r--synapse/replication/tcp/protocol.py53
2 files changed, 38 insertions, 21 deletions
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index a9baa2c1c3..f080f96cc1 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -21,7 +21,6 @@ from synapse.api.errors import (
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
-from synapse.util.async import sleep
 from synapse.util.caches.response_cache import ResponseCache
 from synapse.util.metrics import Measure
 from synapse.types import Requester, UserID
@@ -33,11 +32,12 @@ logger = logging.getLogger(__name__)
 
 
 @defer.inlineCallbacks
-def send_event_to_master(client, host, port, requester, event, context,
+def send_event_to_master(clock, client, host, port, requester, event, context,
                          ratelimit, extra_users):
     """Send event to be handled on the master
 
     Args:
+        clock (synapse.util.Clock)
         client (SimpleHttpClient)
         host (str): host of master
         port (int): port on master listening for HTTP replication
@@ -77,7 +77,7 @@ def send_event_to_master(client, host, port, requester, event, context,
 
             # If we timed out we probably don't need to worry about backing
             # off too much, but lets just wait a little anyway.
-            yield sleep(1)
+            yield clock.sleep(1)
     except MatrixCodeMessageException as e:
         # We convert to SynapseError as we know that it was a SynapseError
         # on the master process that we should send to the client. (And
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index c870475cd1..171a698e14 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -564,11 +564,13 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 # The following simply registers metrics for the replication connections
 
 pending_commands = LaterGauge(
-    "pending_commands", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_pending_commands",
+    "",
+    ["name", "conn_id"],
     lambda: {
-        (p.name, p.conn_id): len(p.pending_commands)
-        for p in connected_connections
-    })
+        (p.name, p.conn_id): len(p.pending_commands) for p in connected_connections
+    },
+)
 
 
 def transport_buffer_size(protocol):
@@ -579,11 +581,13 @@ def transport_buffer_size(protocol):
 
 
 transport_send_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_send_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
-        (p.name, p.conn_id): transport_buffer_size(p)
-        for p in connected_connections
-    })
+        (p.name, p.conn_id): transport_buffer_size(p) for p in connected_connections
+    },
+)
 
 
 def transport_kernel_read_buffer_size(protocol, read=True):
@@ -602,37 +606,50 @@ def transport_kernel_read_buffer_size(protocol, read=True):
 
 
 tcp_transport_kernel_send_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_kernel_send_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
-    })
+    },
+)
 
 
 tcp_transport_kernel_read_buffer = LaterGauge(
-    "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
+    "synapse_replication_tcp_protocol_transport_kernel_read_buffer",
+    "",
+    ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
-    })
+    },
+)
 
 
 tcp_inbound_commands = LaterGauge(
-    "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
+    "synapse_replication_tcp_protocol_inbound_commands",
+    "",
+    ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
         for k, count in iteritems(p.inbound_commands_counter)
-    })
+    },
+)
 
 tcp_outbound_commands = LaterGauge(
-    "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
+    "synapse_replication_tcp_protocol_outbound_commands",
+    "",
+    ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
         for k, count in iteritems(p.outbound_commands_counter)
-    })
+    },
+)
 
 # number of updates received for each RDATA stream
-inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
-                              ["stream_name"])
+inbound_rdata_count = Counter(
+    "synapse_replication_tcp_protocol_inbound_rdata_count", "", ["stream_name"]
+)