summary refs log tree commit diff
path: root/synapse/replication
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication')
-rw-r--r--synapse/replication/tcp/protocol.py52
-rw-r--r--synapse/replication/tcp/redis.py14
2 files changed, 29 insertions, 37 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 7240acb0a2..e3f64eba8f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -50,10 +50,7 @@ import abc
 import fcntl
 import logging
 import struct
-from collections import defaultdict
-from typing import TYPE_CHECKING, DefaultDict, List
-
-from six import iteritems
+from typing import TYPE_CHECKING, List
 
 from prometheus_client import Counter
 
@@ -86,6 +83,18 @@ connection_close_counter = Counter(
     "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
 )
 
+tcp_inbound_commands_counter = Counter(
+    "synapse_replication_tcp_protocol_inbound_commands",
+    "Number of commands received from replication, by command and name of process connected to",
+    ["command", "name"],
+)
+
+tcp_outbound_commands_counter = Counter(
+    "synapse_replication_tcp_protocol_outbound_commands",
+    "Number of commands sent to replication, by command and name of process connected to",
+    ["command", "name"],
+)
+
 # A list of all connected protocols. This allows us to send metrics about the
 # connections.
 connected_connections = []
@@ -151,9 +160,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         # The LoopingCall for sending pings.
         self._send_ping_loop = None
 
-        self.inbound_commands_counter = defaultdict(int)  # type: DefaultDict[str, int]
-        self.outbound_commands_counter = defaultdict(int)  # type: DefaultDict[str, int]
-
     def connectionMade(self):
         logger.info("[%s] Connection established", self.id())
 
@@ -224,9 +230,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         self.last_received_command = self.clock.time_msec()
 
-        self.inbound_commands_counter[cmd.NAME] = (
-            self.inbound_commands_counter[cmd.NAME] + 1
-        )
+        tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()
 
         # Now lets try and call on_<CMD_NAME> function
         run_as_background_process(
@@ -292,9 +296,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             self._queue_command(cmd)
             return
 
-        self.outbound_commands_counter[cmd.NAME] = (
-            self.outbound_commands_counter[cmd.NAME] + 1
-        )
+        tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc()
+
         string = "%s %s" % (cmd.NAME, cmd.to_line())
         if "\n" in string:
             raise Exception("Unexpected newline in command: %r", string)
@@ -546,26 +549,3 @@ tcp_transport_kernel_read_buffer = LaterGauge(
         for p in connected_connections
     },
 )
-
-
-tcp_inbound_commands = LaterGauge(
-    "synapse_replication_tcp_protocol_inbound_commands",
-    "",
-    ["command", "name"],
-    lambda: {
-        (k, p.name): count
-        for p in connected_connections
-        for k, count in iteritems(p.inbound_commands_counter)
-    },
-)
-
-tcp_outbound_commands = LaterGauge(
-    "synapse_replication_tcp_protocol_outbound_commands",
-    "",
-    ["command", "name"],
-    lambda: {
-        (k, p.name): count
-        for p in connected_connections
-        for k, count in iteritems(p.outbound_commands_counter)
-    },
-)
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 4c08425735..49b3ed0c5e 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -25,7 +25,11 @@ from synapse.replication.tcp.commands import (
     ReplicateCommand,
     parse_command_from_line,
 )
-from synapse.replication.tcp.protocol import AbstractConnection
+from synapse.replication.tcp.protocol import (
+    AbstractConnection,
+    tcp_inbound_commands_counter,
+    tcp_outbound_commands_counter,
+)
 
 if TYPE_CHECKING:
     from synapse.replication.tcp.handler import ReplicationCommandHandler
@@ -79,6 +83,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
             )
             return
 
+        # We use "redis" as the name here as we don't have 1:1 connections to
+        # remote instances.
+        tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc()
+
         # Now lets try and call on_<CMD_NAME> function
         run_as_background_process(
             "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
@@ -126,6 +134,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
 
         encoded_string = string.encode("utf-8")
 
+        # We use "redis" as the name here as we don't have 1:1 connections to
+        # remote instances.
+        tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
+
         async def _send():
             with PreserveLoggingContext():
                 # Note that we use the other connection as we can't send