diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 37 |
1 files changed, 19 insertions, 18 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index a603c520ea..63bd6d2652 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -22,20 +22,21 @@ from .streams import STREAMS_MAP, FederationStream from .protocol import ServerReplicationStreamProtocol from synapse.util.metrics import Measure, measure_func +from synapse.metrics import LaterGauge import logging -import synapse.metrics +from prometheus_client import Counter +from six import itervalues -metrics = synapse.metrics.get_metrics_for(__name__) -stream_updates_counter = metrics.register_counter( - "stream_updates", labels=["stream_name"] -) -user_sync_counter = metrics.register_counter("user_sync") -federation_ack_counter = metrics.register_counter("federation_ack") -remove_pusher_counter = metrics.register_counter("remove_pusher") -invalidate_cache_counter = metrics.register_counter("invalidate_cache") -user_ip_cache_counter = metrics.register_counter("user_ip_cache") +stream_updates_counter = Counter("synapse_replication_tcp_resource_stream_updates", + "", ["stream_name"]) +user_sync_counter = Counter("synapse_replication_tcp_resource_user_sync", "") +federation_ack_counter = Counter("synapse_replication_tcp_resource_federation_ack", "") +remove_pusher_counter = Counter("synapse_replication_tcp_resource_remove_pusher", "") +invalidate_cache_counter = Counter("synapse_replication_tcp_resource_invalidate_cache", + "") +user_ip_cache_counter = Counter("synapse_replication_tcp_resource_user_ip_cache", "") logger = logging.getLogger(__name__) @@ -74,29 +75,29 @@ class ReplicationStreamer(object): # Current connections. self.connections = [] - metrics.register_callback("total_connections", lambda: len(self.connections)) + LaterGauge("synapse_replication_tcp_resource_total_connections", "", [], + lambda: len(self.connections)) # List of streams that clients can subscribe to. # We only support federation stream if federation sending hase been # disabled on the master. self.streams = [ - stream(hs) for stream in STREAMS_MAP.itervalues() + stream(hs) for stream in itervalues(STREAMS_MAP) if stream != FederationStream or not hs.config.send_federation ] self.streams_by_name = {stream.NAME: stream for stream in self.streams} - metrics.register_callback( - "connections_per_stream", + LaterGauge( + "synapse_replication_tcp_resource_connections_per_stream", "", + ["stream_name"], lambda: { (stream_name,): len([ conn for conn in self.connections if stream_name in conn.replication_streams ]) for stream_name in self.streams_by_name - }, - labels=["stream_name"], - ) + }) self.federation_sender = None if not hs.config.send_federation: @@ -176,7 +177,7 @@ class ReplicationStreamer(object): logger.info( "Streaming: %s -> %s", stream.NAME, updates[-1][0] ) - stream_updates_counter.inc_by(len(updates), stream.NAME) + stream_updates_counter.labels(stream.NAME).inc(len(updates)) # Some streams return multiple rows with the same stream IDs, # we need to make sure they get sent out in batches. We do |