summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/metrics/metric.py3
-rw-r--r--synapse/replication/tcp/protocol.py47
2 files changed, 31 insertions, 19 deletions
diff --git a/synapse/metrics/metric.py b/synapse/metrics/metric.py
index 920cde1dd0..e87b2b80a7 100644
--- a/synapse/metrics/metric.py
+++ b/synapse/metrics/metric.py
@@ -83,9 +83,6 @@ class CounterMetric(BaseMetric):
     def render(self):
         return map_concat(self.render_item, sorted(self.counts.keys()))
 
-    def unregister_counter(self, *values):
-        self.counts.pop(values, None)
-
 
 class CallbackMetric(BaseMetric):
     """A metric that returns the numeric value returned by a callback whenever
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 95ea256e77..d4d672aafe 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -61,6 +61,7 @@ from commands import (
 from streams import STREAMS_MAP
 
 from synapse.util.stringutils import random_string
+from synapse.metrics.metric import CounterMetric
 
 import logging
 import synapse.metrics
@@ -70,12 +71,6 @@ import fcntl
 
 metrics = synapse.metrics.get_metrics_for(__name__)
 
-inbound_commands_counter = metrics.register_counter(
-    "inbound_commands", labels=["command", "name", "conn_id"],
-)
-outbound_commands_counter = metrics.register_counter(
-    "outbound_commands", labels=["command", "name", "conn_id"],
-)
 connection_close_counter = metrics.register_counter(
     "close_reason", labels=["reason_type"],
 )
@@ -139,6 +134,13 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         # The LoopingCall for sending pings.
         self._send_ping_loop = None
 
+        self.inbound_commands_counter = CounterMetric(
+            "inbound_commands", labels=["command"],
+        )
+        self.outbound_commands_counter = CounterMetric(
+            "outbound_commands", labels=["command"],
+        )
+
     def connectionMade(self):
         logger.info("[%s] Connection established", self.id())
 
@@ -197,7 +199,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         self.last_received_command = self.clock.time_msec()
 
-        inbound_commands_counter.inc(cmd_name, self.name, self.conn_id)
+        self.inbound_commands_counter.inc(cmd_name)
 
         cmd_cls = COMMAND_MAP[cmd_name]
         try:
@@ -246,7 +248,7 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             self._queue_command(cmd)
             return
 
-        outbound_commands_counter.inc(cmd.NAME, self.name, self.conn_id)
+        self.outbound_commands_counter.inc(cmd.NAME)
 
         string = "%s %s" % (cmd.NAME, cmd.to_line(),)
         if "\n" in string:
@@ -334,14 +336,6 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         self.state = ConnectionStates.CLOSED
         self.pending_commands = []
 
-        for cmd in COMMAND_MAP:
-            outbound_commands_counter.unregister_counter(
-                cmd, self.name, self.conn_id
-            )
-            inbound_commands_counter.unregister_counter(
-                cmd, self.name, self.conn_id
-            )
-
         if self.transport:
             self.transport.unregisterProducer()
 
@@ -620,3 +614,24 @@ metrics.register_callback(
     },
     labels=["name", "conn_id"],
 )
+
+
+metrics.register_callback(
+    "inbound_commands",
+    lambda: {
+        (k[0], p.name, p.conn_id): count
+        for p in connected_connections
+        for k, count in p.inbound_commands_counter.counts.iteritems()
+    },
+    labels=["command", "name", "conn_id"],
+)
+
+metrics.register_callback(
+    "outbound_commands",
+    lambda: {
+        (k[0], p.name, p.conn_id): count
+        for p in connected_connections
+        for k, count in p.outbound_commands_counter.counts.iteritems()
+    },
+    labels=["command", "name", "conn_id"],
+)