diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd..539c353528 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@ import six
import attr
from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
from twisted.internet import reactor
@@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class RegistryProxy(object):
-
@staticmethod
def collect():
for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ class LaterGauge(object):
try:
calls = self.caller()
except Exception:
- logger.exception(
- "Exception running callback for LaterGauge(%s)",
- self.name,
- )
+ logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
@@ -116,9 +112,7 @@ class InFlightGauge(object):
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
- "_MetricsEntry",
- attrs={x: attr.ib(0) for x in sub_metrics},
- slots=True,
+ "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)
# Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ class InFlightGauge(object):
Note: may be called by a separate thread.
"""
- in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+ in_flight = GaugeMetricFamily(
+ self.name + "_total", self.desc, labels=self.labels
+ )
metrics_by_key = {}
@@ -179,7 +175,9 @@ class InFlightGauge(object):
yield in_flight
for name in self.sub_metrics:
- gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+ gauge = GaugeMetricFamily(
+ "_".join([self.name, name]), "", labels=self.labels
+ )
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
@@ -193,12 +191,75 @@ class InFlightGauge(object):
all_gauges[self.name] = self
+@attr.s(hash=True)
+class BucketCollector(object):
+ """
+ Like a Histogram, but allows buckets to be point-in-time instead of
+ incrementally added to.
+
+ Args:
+ name (str): Base name of metric to be exported to Prometheus.
+ data_collector (callable -> dict): A synchronous callable that
+ returns a dict mapping bucket to number of items in the
+ bucket. If these buckets are not the same as the buckets
+ given to this class, they will be remapped into them.
+ buckets (list[float]): List of floats/ints of the buckets to
+ give to Prometheus. +Inf is ignored, if given.
+
+ """
+
+ name = attr.ib()
+ data_collector = attr.ib()
+ buckets = attr.ib()
+
+ def collect(self):
+
+ # Fetch the data -- this must be synchronous!
+ data = self.data_collector()
+
+ buckets = {}
+
+ res = []
+ for x in data.keys():
+ for i, bound in enumerate(self.buckets):
+ if x <= bound:
+ buckets[bound] = buckets.get(bound, 0) + data[x]
+ break
+
+ for i in self.buckets:
+ res.append([i, buckets.get(i, 0)])
+
+ res.append(["+Inf", sum(data.values())])
+
+ metric = HistogramMetricFamily(
+ self.name,
+ "",
+ buckets=res,
+ sum_value=sum([x * y for x, y in data.items()]),
+ )
+ yield metric
+
+ def __attrs_post_init__(self):
+ self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
+ if self.buckets != sorted(self.buckets):
+ raise ValueError("Buckets not sorted")
+
+ self.buckets = tuple(self.buckets)
+
+ if self.name in all_gauges.keys():
+ logger.warning("%s already registered, reregistering" % (self.name,))
+ REGISTRY.unregister(all_gauges.pop(self.name))
+
+ REGISTRY.register(self)
+ all_gauges[self.name] = self
+
+
#
# Detailed CPU metrics
#
-class CPUMetrics(object):
+class CPUMetrics(object):
def __init__(self):
ticks_per_sec = 100
try:
@@ -237,13 +298,28 @@ gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
- buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
- 5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+ buckets=[
+ 0.0025,
+ 0.005,
+ 0.01,
+ 0.025,
+ 0.05,
+ 0.10,
+ 0.25,
+ 0.50,
+ 1.00,
+ 2.50,
+ 5.00,
+ 7.50,
+ 15.00,
+ 30.00,
+ 45.00,
+ 60.00,
+ ],
)
class GCCounts(object):
-
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -279,9 +355,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
event_processing_loop_counter = Counter(
- "synapse_event_processing_loop_count",
- "Event processing loop iterations",
- ["name"],
+ "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)
event_processing_loop_room_count = Counter(
@@ -311,7 +385,6 @@ last_ticked = time.time()
class ReactorLastSeenMetric(object):
-
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
@@ -325,7 +398,6 @@ REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func):
-
@functools.wraps(func)
def f(*args, **kwargs):
now = reactor.seconds()
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..1578403f79 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,38 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
+ # Collect metrics on the number of forward extremities that exist.
+ self._current_forward_extremities_amount = {}
+
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+ )
+
+ # Read the extrems every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
+ def fetch(txn):
+ txn.execute(
+ """
+ select count(*) c from event_forward_extremities
+ group by room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
+ self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -568,17 +594,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(
- r[0]
- for r in txn
- if not json.loads(r[1]).get("soft_failed")
- )
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events_which_are_prevs_txn,
- chunk,
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
defer.returnValue(results)
@@ -640,9 +660,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_prevs_before_rejected",
- _get_prevs_before_rejected_txn,
- chunk,
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
defer.returnValue(existing_prevs)
|