diff --git a/changelog.d/5384.feature b/changelog.d/5384.feature
new file mode 100644
index 0000000000..9497f521c8
--- /dev/null
+++ b/changelog.d/5384.feature
@@ -0,0 +1 @@
+Statistics on forward extremities per room are now exposed via Prometheus.
diff --git a/scripts/generate_signing_key.py b/scripts/generate_signing_key.py
index ba3ba97395..36e9140b50 100755
--- a/scripts/generate_signing_key.py
+++ b/scripts/generate_signing_key.py
@@ -16,7 +16,7 @@
import argparse
import sys
-from signedjson.key import write_signing_keys, generate_signing_key
+from signedjson.key import generate_signing_key, write_signing_keys
from synapse.util.stringutils import random_string
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index ef48984fdd..539c353528 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -25,7 +25,7 @@ import six
import attr
from prometheus_client import Counter, Gauge, Histogram
-from prometheus_client.core import REGISTRY, GaugeMetricFamily
+from prometheus_client.core import REGISTRY, GaugeMetricFamily, HistogramMetricFamily
from twisted.internet import reactor
@@ -40,7 +40,6 @@ HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")
class RegistryProxy(object):
-
@staticmethod
def collect():
for metric in REGISTRY.collect():
@@ -63,10 +62,7 @@ class LaterGauge(object):
try:
calls = self.caller()
except Exception:
- logger.exception(
- "Exception running callback for LaterGauge(%s)",
- self.name,
- )
+ logger.exception("Exception running callback for LaterGauge(%s)", self.name)
yield g
return
@@ -116,9 +112,7 @@ class InFlightGauge(object):
# Create a class which have the sub_metrics values as attributes, which
# default to 0 on initialization. Used to pass to registered callbacks.
self._metrics_class = attr.make_class(
- "_MetricsEntry",
- attrs={x: attr.ib(0) for x in sub_metrics},
- slots=True,
+ "_MetricsEntry", attrs={x: attr.ib(0) for x in sub_metrics}, slots=True
)
# Counts number of in flight blocks for a given set of label values
@@ -157,7 +151,9 @@ class InFlightGauge(object):
Note: may be called by a separate thread.
"""
- in_flight = GaugeMetricFamily(self.name + "_total", self.desc, labels=self.labels)
+ in_flight = GaugeMetricFamily(
+ self.name + "_total", self.desc, labels=self.labels
+ )
metrics_by_key = {}
@@ -179,7 +175,9 @@ class InFlightGauge(object):
yield in_flight
for name in self.sub_metrics:
- gauge = GaugeMetricFamily("_".join([self.name, name]), "", labels=self.labels)
+ gauge = GaugeMetricFamily(
+ "_".join([self.name, name]), "", labels=self.labels
+ )
for key, metrics in six.iteritems(metrics_by_key):
gauge.add_metric(key, getattr(metrics, name))
yield gauge
@@ -193,12 +191,75 @@ class InFlightGauge(object):
all_gauges[self.name] = self
+@attr.s(hash=True)
+class BucketCollector(object):
+ """
+ Like a Histogram, but allows buckets to be point-in-time instead of
+ incrementally added to.
+
+ Args:
+ name (str): Base name of metric to be exported to Prometheus.
+ data_collector (callable -> dict): A synchronous callable that
+ returns a dict mapping bucket to number of items in the
+ bucket. If these buckets are not the same as the buckets
+ given to this class, they will be remapped into them.
+ buckets (list[float]): List of floats/ints of the buckets to
+ give to Prometheus. +Inf is ignored, if given.
+
+ """
+
+ name = attr.ib()
+ data_collector = attr.ib()
+ buckets = attr.ib()
+
+ def collect(self):
+
+ # Fetch the data -- this must be synchronous!
+ data = self.data_collector()
+
+ buckets = {}
+
+ res = []
+ for x in data.keys():
+ for i, bound in enumerate(self.buckets):
+ if x <= bound:
+ buckets[bound] = buckets.get(bound, 0) + data[x]
+ break
+
+ for i in self.buckets:
+ res.append([i, buckets.get(i, 0)])
+
+ res.append(["+Inf", sum(data.values())])
+
+ metric = HistogramMetricFamily(
+ self.name,
+ "",
+ buckets=res,
+ sum_value=sum([x * y for x, y in data.items()]),
+ )
+ yield metric
+
+ def __attrs_post_init__(self):
+ self.buckets = [float(x) for x in self.buckets if x != "+Inf"]
+ if self.buckets != sorted(self.buckets):
+ raise ValueError("Buckets not sorted")
+
+ self.buckets = tuple(self.buckets)
+
+ if self.name in all_gauges.keys():
+ logger.warning("%s already registered, reregistering" % (self.name,))
+ REGISTRY.unregister(all_gauges.pop(self.name))
+
+ REGISTRY.register(self)
+ all_gauges[self.name] = self
+
+
#
# Detailed CPU metrics
#
-class CPUMetrics(object):
+class CPUMetrics(object):
def __init__(self):
ticks_per_sec = 100
try:
@@ -237,13 +298,28 @@ gc_time = Histogram(
"python_gc_time",
"Time taken to GC (sec)",
["gen"],
- buckets=[0.0025, 0.005, 0.01, 0.025, 0.05, 0.10, 0.25, 0.50, 1.00, 2.50,
- 5.00, 7.50, 15.00, 30.00, 45.00, 60.00],
+ buckets=[
+ 0.0025,
+ 0.005,
+ 0.01,
+ 0.025,
+ 0.05,
+ 0.10,
+ 0.25,
+ 0.50,
+ 1.00,
+ 2.50,
+ 5.00,
+ 7.50,
+ 15.00,
+ 30.00,
+ 45.00,
+ 60.00,
+ ],
)
class GCCounts(object):
-
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -279,9 +355,7 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
events_processed_counter = Counter("synapse_federation_client_events_processed", "")
event_processing_loop_counter = Counter(
- "synapse_event_processing_loop_count",
- "Event processing loop iterations",
- ["name"],
+ "synapse_event_processing_loop_count", "Event processing loop iterations", ["name"]
)
event_processing_loop_room_count = Counter(
@@ -311,7 +385,6 @@ last_ticked = time.time()
class ReactorLastSeenMetric(object):
-
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
@@ -325,7 +398,6 @@ REGISTRY.register(ReactorLastSeenMetric())
def runUntilCurrentTimer(func):
-
@functools.wraps(func)
def f(*args, **kwargs):
now = reactor.seconds()
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..1578403f79 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,7 +17,7 @@
import itertools
import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
from functools import wraps
from six import iteritems, text_type
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
from synapse.api.errors import SynapseError
from synapse.events import EventBase # noqa: F401
from synapse.events.snapshot import EventContext # noqa: F401
+from synapse.metrics import BucketCollector
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.state import StateResolutionStore
from synapse.storage.background_updates import BackgroundUpdateStore
@@ -220,13 +221,38 @@ class EventsStore(
EventsWorkerStore,
BackgroundUpdateStore,
):
-
def __init__(self, db_conn, hs):
super(EventsStore, self).__init__(db_conn, hs)
self._event_persist_queue = _EventPeristenceQueue()
self._state_resolution_handler = hs.get_state_resolution_handler()
+ # Collect metrics on the number of forward extremities that exist.
+ self._current_forward_extremities_amount = {}
+
+ BucketCollector(
+ "synapse_forward_extremities",
+ lambda: self._current_forward_extremities_amount,
+ buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"]
+ )
+
+ # Read the extrems every 60 minutes
+ hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+ @defer.inlineCallbacks
+ def _read_forward_extremities(self):
+ def fetch(txn):
+ txn.execute(
+ """
+ select count(*) c from event_forward_extremities
+ group by room_id
+ """
+ )
+ return txn.fetchall()
+
+ res = yield self.runInteraction("read_forward_extremities", fetch)
+ self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
@defer.inlineCallbacks
def persist_events(self, events_and_contexts, backfilled=False):
"""
@@ -568,17 +594,11 @@ class EventsStore(
)
txn.execute(sql, batch)
- results.extend(
- r[0]
- for r in txn
- if not json.loads(r[1]).get("soft_failed")
- )
+ results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_events_which_are_prevs",
- _get_events_which_are_prevs_txn,
- chunk,
+ "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
)
defer.returnValue(results)
@@ -640,9 +660,7 @@ class EventsStore(
for chunk in batch_iter(event_ids, 100):
yield self.runInteraction(
- "_get_prevs_before_rejected",
- _get_prevs_before_rejected_txn,
- chunk,
+ "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
)
defer.returnValue(existing_prevs)
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py
index 6aa8b8b3c6..f4c81ef77d 100644
--- a/tests/storage/test_cleanup_extrems.py
+++ b/tests/storage/test_cleanup_extrems.py
@@ -15,7 +15,6 @@
import os.path
-from synapse.api.constants import EventTypes
from synapse.storage import prepare_database
from synapse.types import Requester, UserID
@@ -23,17 +22,12 @@ from tests.unittest import HomeserverTestCase
class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
- """Test the background update to clean forward extremities table.
"""
- def make_homeserver(self, reactor, clock):
- # Hack until we understand why test_forked_graph_cleanup fails with v4
- config = self.default_config()
- config['default_room_version'] = '1'
- return self.setup_test_homeserver(config=config)
+ Test the background update to clean forward extremities table.
+ """
def prepare(self, reactor, clock, homeserver):
self.store = homeserver.get_datastore()
- self.event_creator = homeserver.get_event_creation_handler()
self.room_creator = homeserver.get_room_creation_handler()
# Create a test user and room
@@ -42,56 +36,6 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
info = self.get_success(self.room_creator.create_room(self.requester, {}))
self.room_id = info["room_id"]
- def create_and_send_event(self, soft_failed=False, prev_event_ids=None):
- """Create and send an event.
-
- Args:
- soft_failed (bool): Whether to create a soft failed event or not
- prev_event_ids (list[str]|None): Explicitly set the prev events,
- or if None just use the default
-
- Returns:
- str: The new event's ID.
- """
- prev_events_and_hashes = None
- if prev_event_ids:
- prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
-
- event, context = self.get_success(
- self.event_creator.create_event(
- self.requester,
- {
- "type": EventTypes.Message,
- "room_id": self.room_id,
- "sender": self.user.to_string(),
- "content": {"body": "", "msgtype": "m.text"},
- },
- prev_events_and_hashes=prev_events_and_hashes,
- )
- )
-
- if soft_failed:
- event.internal_metadata.soft_failed = True
-
- self.get_success(
- self.event_creator.send_nonmember_event(self.requester, event, context)
- )
-
- return event.event_id
-
- def add_extremity(self, event_id):
- """Add the given event as an extremity to the room.
- """
- self.get_success(
- self.store._simple_insert(
- table="event_forward_extremities",
- values={"room_id": self.room_id, "event_id": event_id},
- desc="test_add_extremity",
- )
- )
-
- self.store.get_latest_event_ids_in_room.invalidate((self.room_id,))
-
def run_background_update(self):
"""Re run the background update to clean up the extremities.
"""
@@ -131,10 +75,16 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
"""
# Create the room graph
- event_id_1 = self.create_and_send_event()
- event_id_2 = self.create_and_send_event(True, [event_id_1])
- event_id_3 = self.create_and_send_event(True, [event_id_2])
- event_id_4 = self.create_and_send_event(False, [event_id_3])
+ event_id_1 = self.create_and_send_event(self.room_id, self.user)
+ event_id_2 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_1]
+ )
+ event_id_3 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_2]
+ )
+ event_id_4 = self.create_and_send_event(
+ self.room_id, self.user, False, [event_id_3]
+ )
# Check the latest events are as expected
latest_event_ids = self.get_success(
@@ -154,12 +104,16 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
Where SF* are soft failed, and with extremities of A and B
"""
# Create the room graph
- event_id_a = self.create_and_send_event()
- event_id_sf1 = self.create_and_send_event(True, [event_id_a])
- event_id_b = self.create_and_send_event(False, [event_id_sf1])
+ event_id_a = self.create_and_send_event(self.room_id, self.user)
+ event_id_sf1 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_a]
+ )
+ event_id_b = self.create_and_send_event(
+ self.room_id, self.user, False, [event_id_sf1]
+ )
# Add the new extremity and check the latest events are as expected
- self.add_extremity(event_id_a)
+ self.add_extremity(self.room_id, event_id_a)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
@@ -185,13 +139,19 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
Where SF* are soft failed, and with extremities of A and B
"""
# Create the room graph
- event_id_a = self.create_and_send_event()
- event_id_sf1 = self.create_and_send_event(True, [event_id_a])
- event_id_sf2 = self.create_and_send_event(True, [event_id_sf1])
- event_id_b = self.create_and_send_event(False, [event_id_sf2])
+ event_id_a = self.create_and_send_event(self.room_id, self.user)
+ event_id_sf1 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_a]
+ )
+ event_id_sf2 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_sf1]
+ )
+ event_id_b = self.create_and_send_event(
+ self.room_id, self.user, False, [event_id_sf2]
+ )
# Add the new extremity and check the latest events are as expected
- self.add_extremity(event_id_a)
+ self.add_extremity(self.room_id, event_id_a)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
@@ -227,16 +187,26 @@ class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase):
"""
# Create the room graph
- event_id_a = self.create_and_send_event()
- event_id_b = self.create_and_send_event()
- event_id_sf1 = self.create_and_send_event(True, [event_id_a])
- event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b])
- event_id_sf3 = self.create_and_send_event(True, [event_id_sf1])
- self.create_and_send_event(True, [event_id_sf2, event_id_sf3]) # SF4
- event_id_c = self.create_and_send_event(False, [event_id_sf3])
+ event_id_a = self.create_and_send_event(self.room_id, self.user)
+ event_id_b = self.create_and_send_event(self.room_id, self.user)
+ event_id_sf1 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_a]
+ )
+ event_id_sf2 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_a, event_id_b]
+ )
+ event_id_sf3 = self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_sf1]
+ )
+ self.create_and_send_event(
+ self.room_id, self.user, True, [event_id_sf2, event_id_sf3]
+ ) # SF4
+ event_id_c = self.create_and_send_event(
+ self.room_id, self.user, False, [event_id_sf3]
+ )
# Add the new extremity and check the latest events are as expected
- self.add_extremity(event_id_a)
+ self.add_extremity(self.room_id, event_id_a)
latest_event_ids = self.get_success(
self.store.get_latest_event_ids_in_room(self.room_id)
diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py
new file mode 100644
index 0000000000..20a068f1fc
--- /dev/null
+++ b/tests/storage/test_event_metrics.py
@@ -0,0 +1,97 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the 'License');
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an 'AS IS' BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.metrics import REGISTRY
+from synapse.types import Requester, UserID
+
+from tests.unittest import HomeserverTestCase
+
+
+class ExtremStatisticsTestCase(HomeserverTestCase):
+ def test_exposed_to_prometheus(self):
+ """
+ Forward extremity counts are exposed via Prometheus.
+ """
+ room_creator = self.hs.get_room_creation_handler()
+
+ user = UserID("alice", "test")
+ requester = Requester(user, None, False, None, None)
+
+ # Real events, forward extremities
+ events = [(3, 2), (6, 2), (4, 6)]
+
+ for event_count, extrems in events:
+ info = self.get_success(room_creator.create_room(requester, {}))
+ room_id = info["room_id"]
+
+ last_event = None
+
+ # Make a real event chain
+ for i in range(event_count):
+ ev = self.create_and_send_event(room_id, user, False, last_event)
+ last_event = [ev]
+
+ # Sprinkle in some extremities
+ for i in range(extrems):
+ ev = self.create_and_send_event(room_id, user, False, last_event)
+
+ # Let it run for a while, then pull out the statistics from the
+ # Prometheus client registry
+ self.reactor.advance(60 * 60 * 1000)
+ self.pump(1)
+
+ items = list(
+ filter(
+ lambda x: x.name == "synapse_forward_extremities",
+ list(REGISTRY.collect()),
+ )
+ )
+
+ # Check the values are what we want
+ buckets = {}
+ _count = 0
+ _sum = 0
+
+ for i in items[0].samples:
+ if i[0].endswith("_bucket"):
+ buckets[i[1]['le']] = i[2]
+ elif i[0].endswith("_count"):
+ _count = i[2]
+ elif i[0].endswith("_sum"):
+ _sum = i[2]
+
+ # 3 buckets, 2 with 2 extrems, 1 with 6 extrems (bucketed as 7), and
+ # +Inf which is all
+ self.assertEqual(
+ buckets,
+ {
+ 1.0: 0,
+ 2.0: 2,
+ 3.0: 0,
+ 5.0: 0,
+ 7.0: 1,
+ 10.0: 0,
+ 15.0: 0,
+ 20.0: 0,
+ 50.0: 0,
+ 100.0: 0,
+ 200.0: 0,
+ 500.0: 0,
+ "+Inf": 3,
+ },
+ )
+ # 3 rooms, with 10 total events
+ self.assertEqual(_count, 3)
+ self.assertEqual(_sum, 10)
diff --git a/tests/unittest.py b/tests/unittest.py
index 7dbb64af59..b6dc7932ce 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -27,11 +27,12 @@ import twisted.logger
from twisted.internet.defer import Deferred
from twisted.trial import unittest
+from synapse.api.constants import EventTypes
from synapse.config.homeserver import HomeServerConfig
from synapse.http.server import JsonResource
from synapse.http.site import SynapseRequest
from synapse.server import HomeServer
-from synapse.types import UserID, create_requester
+from synapse.types import Requester, UserID, create_requester
from synapse.util.logcontext import LoggingContext
from tests.server import get_clock, make_request, render, setup_test_homeserver
@@ -442,6 +443,64 @@ class HomeserverTestCase(TestCase):
access_token = channel.json_body["access_token"]
return access_token
+ def create_and_send_event(
+ self, room_id, user, soft_failed=False, prev_event_ids=None
+ ):
+ """
+ Create and send an event.
+
+ Args:
+ soft_failed (bool): Whether to create a soft failed event or not
+ prev_event_ids (list[str]|None): Explicitly set the prev events,
+ or if None just use the default
+
+ Returns:
+ str: The new event's ID.
+ """
+ event_creator = self.hs.get_event_creation_handler()
+ secrets = self.hs.get_secrets()
+ requester = Requester(user, None, False, None, None)
+
+ prev_events_and_hashes = None
+ if prev_event_ids:
+ prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids]
+
+ event, context = self.get_success(
+ event_creator.create_event(
+ requester,
+ {
+ "type": EventTypes.Message,
+ "room_id": room_id,
+ "sender": user.to_string(),
+ "content": {"body": secrets.token_hex(), "msgtype": "m.text"},
+ },
+ prev_events_and_hashes=prev_events_and_hashes,
+ )
+ )
+
+ if soft_failed:
+ event.internal_metadata.soft_failed = True
+
+ self.get_success(
+ event_creator.send_nonmember_event(requester, event, context)
+ )
+
+ return event.event_id
+
+ def add_extremity(self, room_id, event_id):
+ """
+ Add the given event as an extremity to the room.
+ """
+ self.get_success(
+ self.hs.get_datastore()._simple_insert(
+ table="event_forward_extremities",
+ values={"room_id": room_id, "event_id": event_id},
+ desc="test_add_extremity",
+ )
+ )
+
+ self.hs.get_datastore().get_latest_event_ids_in_room.invalidate((room_id,))
+
def attempt_wrong_password_login(self, username, password):
"""Attempts to login as the user with the given password, asserting
that the attempt *fails*.
|