diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index a41af4fd6c..0e6b1957c6 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -22,20 +22,19 @@ from .streams import STREAMS_MAP, FederationStream
from .protocol import ServerReplicationStreamProtocol
from synapse.util.metrics import Measure, measure_func
+from synapse.metrics import LaterGauge
import logging
-import synapse.metrics
+from prometheus_client import Counter
-metrics = synapse.metrics.get_metrics_for(__name__)
-stream_updates_counter = metrics.register_counter(
- "stream_updates", labels=["stream_name"]
+stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", "", ["stream_name"]
)
-user_sync_counter = metrics.register_counter("user_sync")
-federation_ack_counter = metrics.register_counter("federation_ack")
-remove_pusher_counter = metrics.register_counter("remove_pusher")
-invalidate_cache_counter = metrics.register_counter("invalidate_cache")
-user_ip_cache_counter = metrics.register_counter("user_ip_cache")
+user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "")
+federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "")
+remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "")
+invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", "")
+user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "")
logger = logging.getLogger(__name__)
@@ -73,7 +72,8 @@ class ReplicationStreamer(object):
# Current connections.
self.connections = []
- metrics.register_callback("total_connections", lambda: len(self.connections))
+ l = LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], lambda: len(self.connections))
+ l.register()
# List of streams that clients can subscribe to.
# We only support federation stream if federation sending hase been
@@ -85,17 +85,15 @@ class ReplicationStreamer(object):
self.streams_by_name = {stream.NAME: stream for stream in self.streams}
- metrics.register_callback(
- "connections_per_stream",
+ LaterGauge(
+ "synapse_replication_tcp_resource_connections_per_stream", "", ["stream_name"],
lambda: {
(stream_name,): len([
conn for conn in self.connections
if stream_name in conn.replication_streams
])
for stream_name in self.streams_by_name
- },
- labels=["stream_name"],
- )
+ }).register()
self.federation_sender = None
if not hs.config.send_federation:
@@ -175,7 +173,7 @@ class ReplicationStreamer(object):
logger.info(
"Streaming: %s -> %s", stream.NAME, updates[-1][0]
)
- stream_updates_counter.inc_by(len(updates), stream.NAME)
+ stream_updates_counter.labels(stream.NAME).inc(len(updates))
# Some streams return multiple rows with the same stream IDs,
# we need to make sure they get sent out in batches. We do
|