diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index ded2b1871a..f0aeb5a0d3 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,23 +26,25 @@ from synapse.util.retryutils import NotRetryingDestination, get_retry_limiter
from synapse.util.metrics import measure_func
from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
import synapse.metrics
+from synapse.metrics import LaterGauge
+from synapse.metrics import (
+ sent_edus_counter,
+ sent_transactions_counter,
+ events_processed_counter,
+)
+
+from prometheus_client import Counter
+
+from six import itervalues
import logging
logger = logging.getLogger(__name__)
-metrics = synapse.metrics.get_metrics_for(__name__)
-
-client_metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
-sent_pdus_destination_dist = client_metrics.register_distribution(
- "sent_pdu_destinations"
+sent_pdus_destination_dist = Counter(
+ "synapse_federation_transaction_queue_sent_pdu_destinations", ""
)
-sent_edus_counter = client_metrics.register_counter("sent_edus")
-
-sent_transactions_counter = client_metrics.register_counter("sent_transactions")
-
-events_processed_counter = client_metrics.register_counter("events_processed")
class TransactionQueue(object):
@@ -69,8 +71,10 @@ class TransactionQueue(object):
# done
self.pending_transactions = {}
- metrics.register_callback(
- "pending_destinations",
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_destinations",
+ "",
+ [],
lambda: len(self.pending_transactions),
)
@@ -94,12 +98,16 @@ class TransactionQueue(object):
# Map of destination -> (edu_type, key) -> Edu
self.pending_edus_keyed_by_dest = edus_keyed = {}
- metrics.register_callback(
- "pending_pdus",
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_pdus",
+ "",
+ [],
lambda: sum(map(len, pdus.values())),
)
- metrics.register_callback(
- "pending_edus",
+ LaterGauge(
+ "synapse_federation_transaction_queue_pending_edus",
+ "",
+ [],
lambda: (
sum(map(len, edus.values()))
+ sum(map(len, presence.values()))
@@ -228,7 +236,7 @@ class TransactionQueue(object):
yield logcontext.make_deferred_yieldable(defer.gatherResults(
[
logcontext.run_in_background(handle_room_events, evs)
- for evs in events_by_room.itervalues()
+ for evs in itervalues(events_by_room)
],
consumeErrors=True
))
@@ -241,18 +249,15 @@ class TransactionQueue(object):
now = self.clock.time_msec()
ts = yield self.store.get_received_ts(events[-1].event_id)
- synapse.metrics.event_processing_lag.set(
- now - ts, "federation_sender",
- )
- synapse.metrics.event_processing_last_ts.set(
- ts, "federation_sender",
- )
+ synapse.metrics.event_processing_lag.labels(
+ "federation_sender").set(now - ts)
+ synapse.metrics.event_processing_last_ts.labels(
+ "federation_sender").set(ts)
- events_processed_counter.inc_by(len(events))
+ events_processed_counter.inc(len(events))
- synapse.metrics.event_processing_positions.set(
- next_token, "federation_sender",
- )
+ synapse.metrics.event_processing_positions.labels(
+ "federation_sender").set(next_token)
finally:
self._is_processing = False
@@ -275,7 +280,7 @@ class TransactionQueue(object):
if not destinations:
return
- sent_pdus_destination_dist.inc_by(len(destinations))
+ sent_pdus_destination_dist.inc(len(destinations))
for destination in destinations:
self.pending_pdus_by_dest.setdefault(destination, []).append(
@@ -322,7 +327,7 @@ class TransactionQueue(object):
if not states_map:
break
- yield self._process_presence_inner(states_map.values())
+ yield self._process_presence_inner(list(states_map.values()))
except Exception:
logger.exception("Error sending presence states to servers")
finally:
|