diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 7240acb0a2..e3f64eba8f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -50,10 +50,7 @@ import abc
import fcntl
import logging
import struct
-from collections import defaultdict
-from typing import TYPE_CHECKING, DefaultDict, List
-
-from six import iteritems
+from typing import TYPE_CHECKING, List
from prometheus_client import Counter
@@ -86,6 +83,18 @@ connection_close_counter = Counter(
"synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
)
+tcp_inbound_commands_counter = Counter(
+ "synapse_replication_tcp_protocol_inbound_commands",
+ "Number of commands received from replication, by command and name of process connected to",
+ ["command", "name"],
+)
+
+tcp_outbound_commands_counter = Counter(
+ "synapse_replication_tcp_protocol_outbound_commands",
+ "Number of commands sent to replication, by command and name of process connected to",
+ ["command", "name"],
+)
+
# A list of all connected protocols. This allows us to send metrics about the
# connections.
connected_connections = []
@@ -151,9 +160,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings.
self._send_ping_loop = None
- self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
- self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int]
-
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@@ -224,9 +230,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec()
- self.inbound_commands_counter[cmd.NAME] = (
- self.inbound_commands_counter[cmd.NAME] + 1
- )
+ tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()
# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
@@ -292,9 +296,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd)
return
- self.outbound_commands_counter[cmd.NAME] = (
- self.outbound_commands_counter[cmd.NAME] + 1
- )
+ tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc()
+
string = "%s %s" % (cmd.NAME, cmd.to_line())
if "\n" in string:
raise Exception("Unexpected newline in command: %r", string)
@@ -546,26 +549,3 @@ tcp_transport_kernel_read_buffer = LaterGauge(
for p in connected_connections
},
)
-
-
-tcp_inbound_commands = LaterGauge(
- "synapse_replication_tcp_protocol_inbound_commands",
- "",
- ["command", "name"],
- lambda: {
- (k, p.name): count
- for p in connected_connections
- for k, count in iteritems(p.inbound_commands_counter)
- },
-)
-
-tcp_outbound_commands = LaterGauge(
- "synapse_replication_tcp_protocol_outbound_commands",
- "",
- ["command", "name"],
- lambda: {
- (k, p.name): count
- for p in connected_connections
- for k, count in iteritems(p.outbound_commands_counter)
- },
-)
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 4c08425735..49b3ed0c5e 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -25,7 +25,11 @@ from synapse.replication.tcp.commands import (
ReplicateCommand,
parse_command_from_line,
)
-from synapse.replication.tcp.protocol import AbstractConnection
+from synapse.replication.tcp.protocol import (
+ AbstractConnection,
+ tcp_inbound_commands_counter,
+ tcp_outbound_commands_counter,
+)
if TYPE_CHECKING:
from synapse.replication.tcp.handler import ReplicationCommandHandler
@@ -79,6 +83,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
)
return
+ # We use "redis" as the name here as we don't have 1:1 connections to
+ # remote instances.
+ tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc()
+
# Now lets try and call on_<CMD_NAME> function
run_as_background_process(
"replication-" + cmd.get_logcontext_id(), self.handle_command, cmd
@@ -126,6 +134,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
encoded_string = string.encode("utf-8")
+ # We use "redis" as the name here as we don't have 1:1 connections to
+ # remote instances.
+ tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc()
+
async def _send():
with PreserveLoggingContext():
# Note that we use the other connection as we can't send
|