diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 6864204616..d4d672aafe 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -51,6 +51,7 @@ indicate which side is sending, these are *not* included on the wire::
from twisted.internet import defer
from twisted.protocols.basic import LineOnlyReceiver
+from twisted.python.failure import Failure
from commands import (
COMMAND_MAP, VALID_CLIENT_COMMANDS, VALID_SERVER_COMMANDS,
@@ -60,6 +61,7 @@ from commands import (
from streams import STREAMS_MAP
from synapse.util.stringutils import random_string
+from synapse.metrics.metric import CounterMetric
import logging
import synapse.metrics
@@ -69,11 +71,8 @@ import fcntl
metrics = synapse.metrics.get_metrics_for(__name__)
-inbound_commands_counter = metrics.register_counter(
- "inbound_commands", labels=["command", "name", "conn_id"],
-)
-outbound_commands_counter = metrics.register_counter(
- "outbound_commands", labels=["command", "name", "conn_id"],
+connection_close_counter = metrics.register_counter(
+ "close_reason", labels=["reason_type"],
)
@@ -135,6 +134,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
# The LoopingCall for sending pings.
self._send_ping_loop = None
+ self.inbound_commands_counter = CounterMetric(
+ "inbound_commands", labels=["command"],
+ )
+ self.outbound_commands_counter = CounterMetric(
+ "outbound_commands", labels=["command"],
+ )
+
def connectionMade(self):
logger.info("[%s] Connection established", self.id())
@@ -193,7 +199,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self.last_received_command = self.clock.time_msec()
- inbound_commands_counter.inc(cmd_name, self.name, self.conn_id)
+ self.inbound_commands_counter.inc(cmd_name)
cmd_cls = COMMAND_MAP[cmd_name]
try:
@@ -242,7 +248,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
self._queue_command(cmd)
return
- outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id)
+ self.outbound_commands_counter.inc(cmd.NAME)
string = "%s %s" % (cmd.NAME, cmd.to_line(),)
if "\n" in string:
@@ -307,6 +313,10 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
def connectionLost(self, reason):
logger.info("[%s] Replication connection closed: %r", self.id(), reason)
+ if isinstance(reason, Failure):
+ connection_close_counter.inc(reason.type.__name__)
+ else:
+ connection_close_counter.inc(reason.__class__.__name__)
try:
# Remove us from list of connections to be monitored
@@ -604,3 +614,24 @@ metrics.register_callback(
},
labels=["name", "conn_id"],
)
+
+
+metrics.register_callback(
+ "inbound_commands",
+ lambda: {
+ (k[0], p.name, p.conn_id): count
+ for p in connected_connections
+ for k, count in p.inbound_commands_counter.counts.iteritems()
+ },
+ labels=["command", "name", "conn_id"],
+)
+
+metrics.register_callback(
+ "outbound_commands",
+ lambda: {
+ (k[0], p.name, p.conn_id): count
+ for p in connected_connections
+ for k, count in p.outbound_commands_counter.counts.iteritems()
+ },
+ labels=["command", "name", "conn_id"],
+)
|