summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2018-05-28 23:30:09 +1000
committerGitHub <noreply@github.com>2018-05-28 23:30:09 +1000
commit81717e85156abd68d70a68638a5f75149cc573f9 (patch)
tree8c302a1c18de1d46c95eeb2184d51be6482731fe /synapse/replication/tcp/protocol.py
parentMerge pull request #3288 from matrix-org/rav/no_spam_guests (diff)
parentfix up tests (diff)
downloadsynapse-81717e85156abd68d70a68638a5f75149cc573f9.tar.xz
Merge pull request #3256 from matrix-org/3218-official-prom
Switch to the Python Prometheus library
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py94
1 files changed, 38 insertions, 56 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 7ca1588f6a..a6280aae70 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -60,22 +60,21 @@ from .commands import (
 )
 from .streams import STREAMS_MAP
 
+from synapse.metrics import LaterGauge
 from synapse.util.stringutils import random_string
-from synapse.metrics.metric import CounterMetric
 
-import logging
-import synapse.metrics
-import struct
-import fcntl
+from prometheus_client import Counter
 
-from six import iterkeys, iteritems
+from collections import defaultdict
 
-metrics = synapse.metrics.get_metrics_for(__name__)
+from six import iterkeys, iteritems
 
-connection_close_counter = metrics.register_counter(
-    "close_reason", labels=["reason_type"],
-)
+import logging
+import struct
+import fcntl
 
+connection_close_counter = Counter(
+    "synapse_replication_tcp_protocol_close_reason", "", ["reason_type"])
 
 # A list of all connected protocols. This allows us to send metrics about the
 # connections.
@@ -137,12 +136,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
         # The LoopingCall for sending pings.
         self._send_ping_loop = None
 
-        self.inbound_commands_counter = CounterMetric(
-            "inbound_commands", labels=["command"],
-        )
-        self.outbound_commands_counter = CounterMetric(
-            "outbound_commands", labels=["command"],
-        )
+        self.inbound_commands_counter = defaultdict(int)
+        self.outbound_commands_counter = defaultdict(int)
 
     def connectionMade(self):
         logger.info("[%s] Connection established", self.id())
@@ -202,7 +197,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
 
         self.last_received_command = self.clock.time_msec()
 
-        self.inbound_commands_counter.inc(cmd_name)
+        self.inbound_commands_counter[cmd_name] = (
+            self.inbound_commands_counter[cmd_name] + 1)
 
         cmd_cls = COMMAND_MAP[cmd_name]
         try:
@@ -252,8 +248,8 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
             self._queue_command(cmd)
             return
 
-        self.outbound_commands_counter.inc(cmd.NAME)
-
+        self.outbound_commands_counter[cmd.NAME] = (
+            self.outbound_commands_counter[cmd.NAME] + 1)
         string = "%s %s" % (cmd.NAME, cmd.to_line(),)
         if "\n" in string:
             raise Exception("Unexpected newline in command: %r", string)
@@ -318,9 +314,9 @@ class BaseReplicationStreamProtocol(LineOnlyReceiver):
     def connectionLost(self, reason):
         logger.info("[%s] Replication connection closed: %r", self.id(), reason)
         if isinstance(reason, Failure):
-            connection_close_counter.inc(reason.type.__name__)
+            connection_close_counter.labels(reason.type.__name__).inc()
         else:
-            connection_close_counter.inc(reason.__class__.__name__)
+            connection_close_counter.labels(reason.__class__.__name__).inc()
 
         try:
             # Remove us from list of connections to be monitored
@@ -519,7 +515,7 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
     def on_RDATA(self, cmd):
         stream_name = cmd.stream_name
-        inbound_rdata_count.inc(stream_name)
+        inbound_rdata_count.labels(stream_name).inc()
 
         try:
             row = STREAMS_MAP[stream_name].ROW_TYPE(*cmd.row)
@@ -567,14 +563,12 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
 # The following simply registers metrics for the replication connections
 
-metrics.register_callback(
-    "pending_commands",
+pending_commands = LaterGauge(
+    "pending_commands", "", ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): len(p.pending_commands)
         for p in connected_connections
-    },
-    labels=["name", "conn_id"],
-)
+    })
 
 
 def transport_buffer_size(protocol):
@@ -584,14 +578,12 @@ def transport_buffer_size(protocol):
     return 0
 
 
-metrics.register_callback(
-    "transport_send_buffer",
+transport_send_buffer = LaterGauge(
+    "synapse_replication_tcp_transport_send_buffer", "", ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_buffer_size(p)
         for p in connected_connections
-    },
-    labels=["name", "conn_id"],
-)
+    })
 
 
 def transport_kernel_read_buffer_size(protocol, read=True):
@@ -609,48 +601,38 @@ def transport_kernel_read_buffer_size(protocol, read=True):
     return 0
 
 
-metrics.register_callback(
-    "transport_kernel_send_buffer",
+tcp_transport_kernel_send_buffer = LaterGauge(
+    "synapse_replication_tcp_transport_kernel_send_buffer", "", ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, False)
         for p in connected_connections
-    },
-    labels=["name", "conn_id"],
-)
+    })
 
 
-metrics.register_callback(
-    "transport_kernel_read_buffer",
+tcp_transport_kernel_read_buffer = LaterGauge(
+    "synapse_replication_tcp_transport_kernel_read_buffer", "", ["name", "conn_id"],
     lambda: {
         (p.name, p.conn_id): transport_kernel_read_buffer_size(p, True)
         for p in connected_connections
-    },
-    labels=["name", "conn_id"],
-)
+    })
 
 
-metrics.register_callback(
-    "inbound_commands",
+tcp_inbound_commands = LaterGauge(
+    "synapse_replication_tcp_inbound_commands", "", ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
         for k, count in iteritems(p.inbound_commands_counter.counts)
-    },
-    labels=["command", "name", "conn_id"],
-)
+    })
 
-metrics.register_callback(
-    "outbound_commands",
+tcp_outbound_commands = LaterGauge(
+    "synapse_replication_tcp_outbound_commands", "", ["command", "name", "conn_id"],
     lambda: {
         (k[0], p.name, p.conn_id): count
         for p in connected_connections
         for k, count in iteritems(p.outbound_commands_counter.counts)
-    },
-    labels=["command", "name", "conn_id"],
-)
+    })
 
 # number of updates received for each RDATA stream
-inbound_rdata_count = metrics.register_counter(
-    "inbound_rdata_count",
-    labels=["stream_name"],
-)
+inbound_rdata_count = Counter("synapse_replication_tcp_inbound_rdata_count", "",
+                              ["stream_name"])