diff options
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r-- | synapse/replication/tcp/protocol.py | 52 |
1 files changed, 16 insertions, 36 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 7240acb0a2..e3f64eba8f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -50,10 +50,7 @@ import abc import fcntl import logging import struct -from collections import defaultdict -from typing import TYPE_CHECKING, DefaultDict, List - -from six import iteritems +from typing import TYPE_CHECKING, List from prometheus_client import Counter @@ -86,6 +83,18 @@ connection_close_counter = Counter( "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"] ) +tcp_inbound_commands_counter = Counter( + "synapse_replication_tcp_protocol_inbound_commands", + "Number of commands received from replication, by command and name of process connected to", + ["command", "name"], +) + +tcp_outbound_commands_counter = Counter( + "synapse_replication_tcp_protocol_outbound_commands", + "Number of commands sent to replication, by command and name of process connected to", + ["command", "name"], +) + # A list of all connected protocols. This allows us to send metrics about the # connections. connected_connections = [] @@ -151,9 +160,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # The LoopingCall for sending pings. self._send_ping_loop = None - self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int] - self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int] - def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -224,9 +230,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - self.inbound_commands_counter[cmd.NAME] = ( - self.inbound_commands_counter[cmd.NAME] + 1 - ) + tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc() # Now lets try and call on_<CMD_NAME> function run_as_background_process( @@ -292,9 +296,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - self.outbound_commands_counter[cmd.NAME] = ( - self.outbound_commands_counter[cmd.NAME] + 1 - ) + tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc() + string = "%s %s" % (cmd.NAME, cmd.to_line()) if "\n" in string: raise Exception("Unexpected newline in command: %r", string) @@ -546,26 +549,3 @@ tcp_transport_kernel_read_buffer = LaterGauge( for p in connected_connections }, ) - - -tcp_inbound_commands = LaterGauge( - "synapse_replication_tcp_protocol_inbound_commands", - "", - ["command", "name"], - lambda: { - (k, p.name): count - for p in connected_connections - for k, count in iteritems(p.inbound_commands_counter) - }, -) - -tcp_outbound_commands = LaterGauge( - "synapse_replication_tcp_protocol_outbound_commands", - "", - ["command", "name"], - lambda: { - (k, p.name): count - for p in connected_connections - for k, count in iteritems(p.outbound_commands_counter) - }, -) |