From 841c581c401e1435c2c7820a803b3f0c574eb8b6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Apr 2020 16:26:19 +0100 Subject: Fix replication metrics when using redis (#7325) --- synapse/replication/tcp/protocol.py | 52 ++++++++++++------------------------- synapse/replication/tcp/redis.py | 14 +++++++++- 2 files changed, 29 insertions(+), 37 deletions(-) (limited to 'synapse/replication') diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 7240acb0a2..e3f64eba8f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -50,10 +50,7 @@ import abc import fcntl import logging import struct -from collections import defaultdict -from typing import TYPE_CHECKING, DefaultDict, List - -from six import iteritems +from typing import TYPE_CHECKING, List from prometheus_client import Counter @@ -86,6 +83,18 @@ connection_close_counter = Counter( "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"] ) +tcp_inbound_commands_counter = Counter( + "synapse_replication_tcp_protocol_inbound_commands", + "Number of commands received from replication, by command and name of process connected to", + ["command", "name"], +) + +tcp_outbound_commands_counter = Counter( + "synapse_replication_tcp_protocol_outbound_commands", + "Number of commands sent to replication, by command and name of process connected to", + ["command", "name"], +) + # A list of all connected protocols. This allows us to send metrics about the # connections. connected_connections = [] @@ -151,9 +160,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): # The LoopingCall for sending pings. self._send_ping_loop = None - self.inbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int] - self.outbound_commands_counter = defaultdict(int) # type: DefaultDict[str, int] - def connectionMade(self): logger.info("[%s] Connection established", self.id()) @@ -224,9 +230,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self.last_received_command = self.clock.time_msec() - self.inbound_commands_counter[cmd.NAME] = ( - self.inbound_commands_counter[cmd.NAME] + 1 - ) + tcp_inbound_commands_counter.labels(cmd.NAME, self.name).inc() # Now lets try and call on_ function run_as_background_process( @@ -292,9 +296,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver): self._queue_command(cmd) return - self.outbound_commands_counter[cmd.NAME] = ( - self.outbound_commands_counter[cmd.NAME] + 1 - ) + tcp_outbound_commands_counter.labels(cmd.NAME, self.name).inc() + string = "%s %s" % (cmd.NAME, cmd.to_line()) if "\n" in string: raise Exception("Unexpected newline in command: %r", string) @@ -546,26 +549,3 @@ tcp_transport_kernel_read_buffer = LaterGauge( for p in connected_connections }, ) - - -tcp_inbound_commands = LaterGauge( - "synapse_replication_tcp_protocol_inbound_commands", - "", - ["command", "name"], - lambda: { - (k, p.name): count - for p in connected_connections - for k, count in iteritems(p.inbound_commands_counter) - }, -) - -tcp_outbound_commands = LaterGauge( - "synapse_replication_tcp_protocol_outbound_commands", - "", - ["command", "name"], - lambda: { - (k, p.name): count - for p in connected_connections - for k, count in iteritems(p.outbound_commands_counter) - }, -) diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 4c08425735..49b3ed0c5e 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -25,7 +25,11 @@ from synapse.replication.tcp.commands import ( ReplicateCommand, parse_command_from_line, ) -from synapse.replication.tcp.protocol import AbstractConnection +from synapse.replication.tcp.protocol import ( + AbstractConnection, + tcp_inbound_commands_counter, + tcp_outbound_commands_counter, +) if TYPE_CHECKING: from synapse.replication.tcp.handler import ReplicationCommandHandler @@ -79,6 +83,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): ) return + # We use "redis" as the name here as we don't have 1:1 connections to + # remote instances. + tcp_inbound_commands_counter.labels(cmd.NAME, "redis").inc() + # Now lets try and call on_ function run_as_background_process( "replication-" + cmd.get_logcontext_id(), self.handle_command, cmd @@ -126,6 +134,10 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection): encoded_string = string.encode("utf-8") + # We use "redis" as the name here as we don't have 1:1 connections to + # remote instances. + tcp_outbound_commands_counter.labels(cmd.NAME, "redis").inc() + async def _send(): with PreserveLoggingContext(): # Note that we use the other connection as we can't send -- cgit 1.4.1