summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-04-22 16:26:19 +0100
committerGitHub <noreply@github.com>2020-04-22 16:26:19 +0100
commit841c581c401e1435c2c7820a803b3f0c574eb8b6 (patch)
tree40e3408f218dd375e5d199e1cb83a69cd7beb9ea /synapse/replication/tcp/protocol.py
parentMerge tag 'v1.12.4rc1' into develop (diff)
downloadsynapse-841c581c401e1435c2c7820a803b3f0c574eb8b6.tar.xz
Fix replication metrics when using redis (#7325)
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py52
1 files changed, 16 insertions, 36 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 7240acb0a2..e3f64eba8f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -50,10 +50,7 @@ import abc
 import fcntl
 import logging
 import struct
-from collections import defaultdict
-from typing import TYPE_CHECKING, DefaultDict, List
-
-from six import iteritems
+from typing import TYPE_CHECKING, List
 
 from prometheus_client import Counter
 
@@ -86,6 +83,18 @@ connection_close_counter = Counter(
     "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"]
 )
 
+tcp_inbound_commands_counter = Counter(
+    "synapse_replication_tcp_protocol_inbound_commands",
+    "Number of commands received from replication, by command and name of process connected to",
+    ["command", "name"],
+)
+
+tcp_outbound_commands_counter = Counter(
+    "synapse_replication_tcp_protocol_outbound_commands",
+    "Number of commands sent to replication, by command and name of process connected to",
+    ["command", "name"],
+)
+
 # A list of all connected protocols. This allows us to send metrics about the
 # connections.
 connected_connections = []
@@ -151,9 +160,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         # The LoopingCall for sending pings.
         self._send_ping_loop = None
 
-        self.inbound_commands_counter = defaultdict(int)  # type: DefaultDict[str, int]
-        self.outbound_commands_counter = defaultdict(int)  # type: DefaultDict[str, int]
-
     def connectionMade(self):
         logger.info("[%s] Connection established", self.id())
 
@@ -224,9 +230,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         self.last_received_command = self.clock.time_msec()
 
-        self.inbound_commands_counter[cmd.NAME] = (
-            self.inbound_commands_counter[cmd.NAME] + 1
-        )
+        tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc()
 
         # Now lets try and call on_<CMD_NAME> function
         run_as_background_process(
@@ -292,9 +296,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             self._queue_command(cmd)
             return
 
-        self.outbound_commands_counter[cmd.NAME] = (
-            self.outbound_commands_counter[cmd.NAME] + 1
-        )
+        tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc()
+
         string = "%s %s" % (cmd.NAME, cmd.to_line())
         if "\n" in string:
             raise Exception("Unexpected newline in command: %r", string)
@@ -546,26 +549,3 @@ tcp_transport_kernel_read_buffer = LaterGauge(
         for p in connected_connections
     },
 )
-
-
-tcp_inbound_commands = LaterGauge(
-    "synapse_replication_tcp_protocol_inbound_commands",
-    "",
-    ["command", "name"],
-    lambda: {
-        (k, p.name): count
-        for p in connected_connections
-        for k, count in iteritems(p.inbound_commands_counter)
-    },
-)
-
-tcp_outbound_commands = LaterGauge(
-    "synapse_replication_tcp_protocol_outbound_commands",
-    "",
-    ["command", "name"],
-    lambda: {
-        (k, p.name): count
-        for p in connected_connections
-        for k, count in iteritems(p.outbound_commands_counter)
-    },
-)