From a7bdf98d01d2225a479753a85ba81adf02b16a32 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 5 Aug 2020 21:38:57 +0100 Subject: Rename database classes to make some sense (#8033) --- synapse/storage/databases/main/metrics.py | 130 ++++++++++++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 synapse/storage/databases/main/metrics.py (limited to 'synapse/storage/databases/main/metrics.py') diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py new file mode 100644 index 0000000000..baa7a5092a --- /dev/null +++ b/synapse/storage/databases/main/metrics.py @@ -0,0 +1,130 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import typing +from collections import Counter + +from twisted.internet import defer + +from synapse.metrics import BucketCollector +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.storage._base import SQLBaseStore +from synapse.storage.database import DatabasePool +from synapse.storage.databases.main.event_push_actions import ( + EventPushActionsWorkerStore, +) + + +class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): + """Functions to pull various metrics from the DB, for e.g. phone home + stats and prometheus metrics. + """ + + def __init__(self, database: DatabasePool, db_conn, hs): + super().__init__(database, db_conn, hs) + + # Collect metrics on the number of forward extremities that exist. + # Counter of number of extremities to count + self._current_forward_extremities_amount = ( + Counter() + ) # type: typing.Counter[int] + + BucketCollector( + "synapse_forward_extremities", + lambda: self._current_forward_extremities_amount, + buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"], + ) + + # Read the extrems every 60 minutes + def read_forward_extremities(): + # run as a background process to make sure that the database transactions + # have a logcontext to report to + return run_as_background_process( + "read_forward_extremities", self._read_forward_extremities + ) + + hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) + + async def _read_forward_extremities(self): + def fetch(txn): + txn.execute( + """ + select count(*) c from event_forward_extremities + group by room_id + """ + ) + return txn.fetchall() + + res = await self.db_pool.runInteraction("read_forward_extremities", fetch) + self._current_forward_extremities_amount = Counter([x[0] for x in res]) + + @defer.inlineCallbacks + def count_daily_messages(self): + """ + Returns an estimate of the number of messages sent in the last day. + + If it has been significantly less or more than one day since the last + call to this function, it will return None. + """ + + def _count_messages(txn): + sql = """ + SELECT COALESCE(COUNT(*), 0) FROM events + WHERE type = 'm.room.message' + AND stream_ordering > ? + """ + txn.execute(sql, (self.stream_ordering_day_ago,)) + (count,) = txn.fetchone() + return count + + ret = yield self.db_pool.runInteraction("count_messages", _count_messages) + return ret + + @defer.inlineCallbacks + def count_daily_sent_messages(self): + def _count_messages(txn): + # This is good enough as if you have silly characters in your own + # hostname then thats your own fault. + like_clause = "%:" + self.hs.hostname + + sql = """ + SELECT COALESCE(COUNT(*), 0) FROM events + WHERE type = 'm.room.message' + AND sender LIKE ? + AND stream_ordering > ? + """ + + txn.execute(sql, (like_clause, self.stream_ordering_day_ago)) + (count,) = txn.fetchone() + return count + + ret = yield self.db_pool.runInteraction( + "count_daily_sent_messages", _count_messages + ) + return ret + + @defer.inlineCallbacks + def count_daily_active_rooms(self): + def _count(txn): + sql = """ + SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events + WHERE type = 'm.room.message' + AND stream_ordering > ? + """ + txn.execute(sql, (self.stream_ordering_day_ago,)) + (count,) = txn.fetchone() + return count + + ret = yield self.db_pool.runInteraction("count_daily_active_rooms", _count) + return ret -- cgit 1.5.1 From 04faa0bfa960d9f0dc60e9cf4ec270221249b7ca Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Tue, 11 Aug 2020 17:21:20 -0400 Subject: Convert tags and metrics databases to async/await (#8062) --- changelog.d/8062.misc | 1 + synapse/storage/databases/main/metrics.py | 20 ++-- synapse/storage/databases/main/tags.py | 103 +++++++++++---------- .../test_resource_limits_server_notices.py | 5 +- 4 files changed, 64 insertions(+), 65 deletions(-) create mode 100644 changelog.d/8062.misc (limited to 'synapse/storage/databases/main/metrics.py') diff --git a/changelog.d/8062.misc b/changelog.d/8062.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8062.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index baa7a5092a..686052bd83 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -15,8 +15,6 @@ import typing from collections import Counter -from twisted.internet import defer - from synapse.metrics import BucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore @@ -69,8 +67,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): res = await self.db_pool.runInteraction("read_forward_extremities", fetch) self._current_forward_extremities_amount = Counter([x[0] for x in res]) - @defer.inlineCallbacks - def count_daily_messages(self): + async def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. @@ -88,11 +85,9 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): (count,) = txn.fetchone() return count - ret = yield self.db_pool.runInteraction("count_messages", _count_messages) - return ret + return await self.db_pool.runInteraction("count_messages", _count_messages) - @defer.inlineCallbacks - def count_daily_sent_messages(self): + async def count_daily_sent_messages(self): def _count_messages(txn): # This is good enough as if you have silly characters in your own # hostname then thats your own fault. @@ -109,13 +104,11 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): (count,) = txn.fetchone() return count - ret = yield self.db_pool.runInteraction( + return await self.db_pool.runInteraction( "count_daily_sent_messages", _count_messages ) - return ret - @defer.inlineCallbacks - def count_daily_active_rooms(self): + async def count_daily_active_rooms(self): def _count(txn): sql = """ SELECT COALESCE(COUNT(DISTINCT room_id), 0) FROM events @@ -126,5 +119,4 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): (count,) = txn.fetchone() return count - ret = yield self.db_pool.runInteraction("count_daily_active_rooms", _count) - return ret + return await self.db_pool.runInteraction("count_daily_active_rooms", _count) diff --git a/synapse/storage/databases/main/tags.py b/synapse/storage/databases/main/tags.py index eedd2d96c3..e4e0a0c433 100644 --- a/synapse/storage/databases/main/tags.py +++ b/synapse/storage/databases/main/tags.py @@ -15,14 +15,13 @@ # limitations under the License. import logging -from typing import List, Tuple +from typing import Dict, List, Tuple from canonicaljson import json -from twisted.internet import defer - from synapse.storage._base import db_to_json from synapse.storage.databases.main.account_data import AccountDataWorkerStore +from synapse.types import JsonDict from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -30,30 +29,26 @@ logger = logging.getLogger(__name__) class TagsWorkerStore(AccountDataWorkerStore): @cached() - def get_tags_for_user(self, user_id): + async def get_tags_for_user(self, user_id: str) -> Dict[str, Dict[str, JsonDict]]: """Get all the tags for a user. Args: - user_id(str): The user to get the tags for. + user_id: The user to get the tags for. Returns: - A deferred dict mapping from room_id strings to dicts mapping from - tag strings to tag content. + A mapping from room_id strings to dicts mapping from tag strings to + tag content. """ - deferred = self.db_pool.simple_select_list( + rows = await self.db_pool.simple_select_list( "room_tags", {"user_id": user_id}, ["room_id", "tag", "content"] ) - @deferred.addCallback - def tags_by_room(rows): - tags_by_room = {} - for row in rows: - room_tags = tags_by_room.setdefault(row["room_id"], {}) - room_tags[row["tag"]] = db_to_json(row["content"]) - return tags_by_room - - return deferred + tags_by_room = {} + for row in rows: + room_tags = tags_by_room.setdefault(row["room_id"], {}) + room_tags[row["tag"]] = db_to_json(row["content"]) + return tags_by_room async def get_all_updated_tags( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -127,17 +122,19 @@ class TagsWorkerStore(AccountDataWorkerStore): return results, upto_token, limited - @defer.inlineCallbacks - def get_updated_tags(self, user_id, stream_id): + async def get_updated_tags( + self, user_id: str, stream_id: int + ) -> Dict[str, List[str]]: """Get all the tags for the rooms where the tags have changed since the given version Args: user_id(str): The user to get the tags for. stream_id(int): The earliest update to get for the user. + Returns: - A deferred dict mapping from room_id strings to lists of tag - strings for all the rooms that changed since the stream_id token. + A mapping from room_id strings to lists of tag strings for all the + rooms that changed since the stream_id token. """ def get_updated_tags_txn(txn): @@ -155,47 +152,53 @@ class TagsWorkerStore(AccountDataWorkerStore): if not changed: return {} - room_ids = yield self.db_pool.runInteraction( + room_ids = await self.db_pool.runInteraction( "get_updated_tags", get_updated_tags_txn ) results = {} if room_ids: - tags_by_room = yield self.get_tags_for_user(user_id) + tags_by_room = await self.get_tags_for_user(user_id) for room_id in room_ids: results[room_id] = tags_by_room.get(room_id, {}) return results - def get_tags_for_room(self, user_id, room_id): + async def get_tags_for_room( + self, user_id: str, room_id: str + ) -> Dict[str, JsonDict]: """Get all the tags for the given room + Args: - user_id(str): The user to get tags for - room_id(str): The room to get tags for + user_id: The user to get tags for + room_id: The room to get tags for + Returns: - A deferred list of string tags. + A mapping of tags to tag content. """ - return self.db_pool.simple_select_list( + rows = await self.db_pool.simple_select_list( table="room_tags", keyvalues={"user_id": user_id, "room_id": room_id}, retcols=("tag", "content"), desc="get_tags_for_room", - ).addCallback( - lambda rows: {row["tag"]: db_to_json(row["content"]) for row in rows} ) + return {row["tag"]: db_to_json(row["content"]) for row in rows} class TagsStore(TagsWorkerStore): - @defer.inlineCallbacks - def add_tag_to_room(self, user_id, room_id, tag, content): + async def add_tag_to_room( + self, user_id: str, room_id: str, tag: str, content: JsonDict + ) -> int: """Add a tag to a room for a user. + Args: - user_id(str): The user to add a tag for. - room_id(str): The room to add a tag for. - tag(str): The tag name to add. - content(dict): A json object to associate with the tag. + user_id: The user to add a tag for. + room_id: The room to add a tag for. + tag: The tag name to add. + content: A json object to associate with the tag. + Returns: - A deferred that completes once the tag has been added. + The next account data ID. """ content_json = json.dumps(content) @@ -209,18 +212,17 @@ class TagsStore(TagsWorkerStore): self._update_revision_txn(txn, user_id, room_id, next_id) with self._account_data_id_gen.get_next() as next_id: - yield self.db_pool.runInteraction("add_tag", add_tag_txn, next_id) + await self.db_pool.runInteraction("add_tag", add_tag_txn, next_id) self.get_tags_for_user.invalidate((user_id,)) - result = self._account_data_id_gen.get_current_token() - return result + return self._account_data_id_gen.get_current_token() - @defer.inlineCallbacks - def remove_tag_from_room(self, user_id, room_id, tag): + async def remove_tag_from_room(self, user_id: str, room_id: str, tag: str) -> int: """Remove a tag from a room for a user. + Returns: - A deferred that completes once the tag has been removed + The next account data ID. """ def remove_tag_txn(txn, next_id): @@ -232,21 +234,22 @@ class TagsStore(TagsWorkerStore): self._update_revision_txn(txn, user_id, room_id, next_id) with self._account_data_id_gen.get_next() as next_id: - yield self.db_pool.runInteraction("remove_tag", remove_tag_txn, next_id) + await self.db_pool.runInteraction("remove_tag", remove_tag_txn, next_id) self.get_tags_for_user.invalidate((user_id,)) - result = self._account_data_id_gen.get_current_token() - return result + return self._account_data_id_gen.get_current_token() - def _update_revision_txn(self, txn, user_id, room_id, next_id): + def _update_revision_txn( + self, txn, user_id: str, room_id: str, next_id: int + ) -> None: """Update the latest revision of the tags for the given user and room. Args: txn: The database cursor - user_id(str): The ID of the user. - room_id(str): The ID of the room. - next_id(int): The the revision to advance to. + user_id: The ID of the user. + room_id: The ID of the room. + next_id: The the revision to advance to. """ txn.call_after( diff --git a/tests/server_notices/test_resource_limits_server_notices.py b/tests/server_notices/test_resource_limits_server_notices.py index 3f88abe3d2..2858d13558 100644 --- a/tests/server_notices/test_resource_limits_server_notices.py +++ b/tests/server_notices/test_resource_limits_server_notices.py @@ -27,6 +27,7 @@ from synapse.server_notices.resource_limits_server_notices import ( ) from tests import unittest +from tests.test_utils import make_awaitable from tests.unittest import override_config from tests.utils import default_config @@ -79,7 +80,9 @@ class TestResourceLimitsServerNotices(unittest.HomeserverTestCase): return_value=defer.succeed("!something:localhost") ) self._rlsn._store.add_tag_to_room = Mock(return_value=defer.succeed(None)) - self._rlsn._store.get_tags_for_room = Mock(return_value=defer.succeed({})) + self._rlsn._store.get_tags_for_room = Mock( + side_effect=lambda user_id, room_id: make_awaitable({}) + ) @override_config({"hs_disabled": True}) def test_maybe_send_server_notice_disabled_hs(self): -- cgit 1.5.1 From 6d2d42f8fb04599713d3e6e7fc3bc4c9b7063c9a Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 29 Sep 2020 22:26:28 +0100 Subject: Rewrite BucketCollector This was a bit unweildy for what I wanted: in particular, I wanted to assign each measurement straight into a bucket, rather than storing an intermediate Counter which didn't do any bucketing at all. I've replaced it with something that is hopefully a bit easier to use. (I'm not entirely sure what the difference between a HistogramMetricFamily and a GaugeHistogramMetricFamily is, but given our counters can go down as well as up the latter *sounds* more accurate?) --- synapse/metrics/__init__.py | 115 ++++++++++++++++++------------ synapse/storage/databases/main/metrics.py | 26 +++---- tests/storage/test_event_metrics.py | 19 ++--- 3 files changed, 89 insertions(+), 71 deletions(-) (limited to 'synapse/storage/databases/main/metrics.py') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a1f7ca3449..b8d2a8e8a9 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -15,6 +15,7 @@ import functools import gc +import itertools import logging import os import platform @@ -27,8 +28,8 @@ from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import ( REGISTRY, CounterMetricFamily, + GaugeHistogramMetricFamily, GaugeMetricFamily, - HistogramMetricFamily, ) from twisted.internet import reactor @@ -46,7 +47,7 @@ logger = logging.getLogger(__name__) METRICS_PREFIX = "/_synapse/metrics" running_on_pypy = platform.python_implementation() == "PyPy" -all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollector]] +all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge]] HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") @@ -205,63 +206,83 @@ class InFlightGauge: all_gauges[self.name] = self -@attr.s(slots=True, hash=True) -class BucketCollector: - """ - Like a Histogram, but allows buckets to be point-in-time instead of - incrementally added to. +class GaugeBucketCollector: + """Like a Histogram, but the buckets are Gauges which are updated atomically. - Args: - name (str): Base name of metric to be exported to Prometheus. - data_collector (callable -> dict): A synchronous callable that - returns a dict mapping bucket to number of items in the - bucket. If these buckets are not the same as the buckets - given to this class, they will be remapped into them. - buckets (list[float]): List of floats/ints of the buckets to - give to Prometheus. +Inf is ignored, if given. + The data is updated by calling `update_data` with an iterable of measurements. + We assume that the data is updated less frequently than it is reported to + Prometheus, and optimise for that case. """ - name = attr.ib() - data_collector = attr.ib() - buckets = attr.ib() + __slots__ = ("_name", "_documentation", "_bucket_bounds", "_metric") - def collect(self): + def __init__( + self, + name: str, + documentation: str, + buckets: Iterable[float], + registry=REGISTRY, + ): + """ + Args: + name: base name of metric to be exported to Prometheus. (a _bucket suffix + will be added.) + documentation: help text for the metric + buckets: The top bounds of the buckets to report + registry: metric registry to register with + """ + self._name = name + self._documentation = documentation - # Fetch the data -- this must be synchronous! - data = self.data_collector() + # the tops of the buckets + self._bucket_bounds = [float(b) for b in buckets] + if self._bucket_bounds != sorted(self._bucket_bounds): + raise ValueError("Buckets not in sorted order") - buckets = {} # type: Dict[float, int] + if self._bucket_bounds[-1] != float("inf"): + self._bucket_bounds.append(float("inf")) - res = [] - for x in data.keys(): - for i, bound in enumerate(self.buckets): - if x <= bound: - buckets[bound] = buckets.get(bound, 0) + data[x] + self._metric = self._values_to_metric([]) + registry.register(self) - for i in self.buckets: - res.append([str(i), buckets.get(i, 0)]) + def collect(self): + yield self._metric - res.append(["+Inf", sum(data.values())]) + def update_data(self, values: Iterable[float]): + """Update the data to be reported by the metric - metric = HistogramMetricFamily( - self.name, "", buckets=res, sum_value=sum(x * y for x, y in data.items()) + The existing data is cleared, and each measurement in the input is assigned + to the relevant bucket. + """ + self._metric = self._values_to_metric(values) + + def _values_to_metric(self, values: Iterable[float]) -> GaugeHistogramMetricFamily: + total = 0.0 + bucket_values = [0 for _ in self._bucket_bounds] + + for v in values: + # assign each value to a bucket + for i, bound in enumerate(self._bucket_bounds): + if v <= bound: + bucket_values[i] += 1 + break + + # ... and increment the sum + total += v + + # now, aggregate the bucket values so that they count the number of entries in + # that bucket or below. + accumulated_values = itertools.accumulate(bucket_values) + + return GaugeHistogramMetricFamily( + self._name, + self._documentation, + buckets=list( + zip((str(b) for b in self._bucket_bounds), accumulated_values) + ), + gsum_value=total, ) - yield metric - - def __attrs_post_init__(self): - self.buckets = [float(x) for x in self.buckets if x != "+Inf"] - if self.buckets != sorted(self.buckets): - raise ValueError("Buckets not sorted") - - self.buckets = tuple(self.buckets) - - if self.name in all_gauges.keys(): - logger.warning("%s already registered, reregistering" % (self.name,)) - REGISTRY.unregister(all_gauges.pop(self.name)) - - REGISTRY.register(self) - all_gauges[self.name] = self # diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 686052bd83..4efc093b9e 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -12,10 +12,8 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import typing -from collections import Counter -from synapse.metrics import BucketCollector +from synapse.metrics import GaugeBucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool @@ -23,6 +21,14 @@ from synapse.storage.databases.main.event_push_actions import ( EventPushActionsWorkerStore, ) +# Collect metrics on the number of forward extremities that exist. +_extremities_collecter = GaugeBucketCollector( + "synapse_forward_extremities", + "Number of rooms on the server with the given number of forward extremities" + " or fewer", + buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500], +) + class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): """Functions to pull various metrics from the DB, for e.g. phone home @@ -32,18 +38,6 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) - # Collect metrics on the number of forward extremities that exist. - # Counter of number of extremities to count - self._current_forward_extremities_amount = ( - Counter() - ) # type: typing.Counter[int] - - BucketCollector( - "synapse_forward_extremities", - lambda: self._current_forward_extremities_amount, - buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"], - ) - # Read the extrems every 60 minutes def read_forward_extremities(): # run as a background process to make sure that the database transactions @@ -65,7 +59,7 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): return txn.fetchall() res = await self.db_pool.runInteraction("read_forward_extremities", fetch) - self._current_forward_extremities_amount = Counter([x[0] for x in res]) + _extremities_collecter.update_data(x[0] for x in res) async def count_daily_messages(self): """ diff --git a/tests/storage/test_event_metrics.py b/tests/storage/test_event_metrics.py index 949846fe33..3957471f3f 100644 --- a/tests/storage/test_event_metrics.py +++ b/tests/storage/test_event_metrics.py @@ -52,14 +52,14 @@ class ExtremStatisticsTestCase(HomeserverTestCase): self.reactor.advance(60 * 60 * 1000) self.pump(1) - items = set( + items = list( filter( lambda x: b"synapse_forward_extremities_" in x, - generate_latest(REGISTRY).split(b"\n"), + generate_latest(REGISTRY, emit_help=False).split(b"\n"), ) ) - expected = { + expected = [ b'synapse_forward_extremities_bucket{le="1.0"} 0.0', b'synapse_forward_extremities_bucket{le="2.0"} 2.0', b'synapse_forward_extremities_bucket{le="3.0"} 2.0', @@ -72,9 +72,12 @@ class ExtremStatisticsTestCase(HomeserverTestCase): b'synapse_forward_extremities_bucket{le="100.0"} 3.0', b'synapse_forward_extremities_bucket{le="200.0"} 3.0', b'synapse_forward_extremities_bucket{le="500.0"} 3.0', - b'synapse_forward_extremities_bucket{le="+Inf"} 3.0', - b"synapse_forward_extremities_count 3.0", - b"synapse_forward_extremities_sum 10.0", - } - + # per https://docs.google.com/document/d/1KwV0mAXwwbvvifBvDKH_LU1YjyXE_wxCkHNoCGq1GX0/edit#heading=h.wghdjzzh72j9, + # "inf" is valid: "this includes variants such as inf" + b'synapse_forward_extremities_bucket{le="inf"} 3.0', + b"# TYPE synapse_forward_extremities_gcount gauge", + b"synapse_forward_extremities_gcount 3.0", + b"# TYPE synapse_forward_extremities_gsum gauge", + b"synapse_forward_extremities_gsum 10.0", + ] self.assertEqual(items, expected) -- cgit 1.5.1 From 20e7c4de262746479000ec507b7a3c37f1779a60 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Tue, 29 Sep 2020 22:30:00 +0100 Subject: Add an improved "forward extremities" metric Hopefully, N(extremities) * N(state_events) is a more realistic approximation to "how big a problem is this room?". --- synapse/storage/databases/main/metrics.py | 27 +++++++++++++++++++++++++-- 1 file changed, 25 insertions(+), 2 deletions(-) (limited to 'synapse/storage/databases/main/metrics.py') diff --git a/synapse/storage/databases/main/metrics.py b/synapse/storage/databases/main/metrics.py index 4efc093b9e..92099f95ce 100644 --- a/synapse/storage/databases/main/metrics.py +++ b/synapse/storage/databases/main/metrics.py @@ -29,6 +29,18 @@ _extremities_collecter = GaugeBucketCollector( buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500], ) +# we also expose metrics on the "number of excess extremity events", which is +# (E-1)*N, where E is the number of extremities and N is the number of state +# events in the room. This is an approximation to the number of state events +# we could remove from state resolution by reducing the graph to a single +# forward extremity. +_excess_state_events_collecter = GaugeBucketCollector( + "synapse_excess_extremity_events", + "Number of rooms on the server with the given number of excess extremity " + "events, or fewer", + buckets=[0] + [1 << n for n in range(12)], +) + class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): """Functions to pull various metrics from the DB, for e.g. phone home @@ -52,15 +64,26 @@ class ServerMetricsStore(EventPushActionsWorkerStore, SQLBaseStore): def fetch(txn): txn.execute( """ - select count(*) c from event_forward_extremities - group by room_id + SELECT t1.c, t2.c + FROM ( + SELECT room_id, COUNT(*) c FROM event_forward_extremities + GROUP BY room_id + ) t1 LEFT JOIN ( + SELECT room_id, COUNT(*) c FROM current_state_events + GROUP BY room_id + ) t2 ON t1.room_id = t2.room_id """ ) return txn.fetchall() res = await self.db_pool.runInteraction("read_forward_extremities", fetch) + _extremities_collecter.update_data(x[0] for x in res) + _excess_state_events_collecter.update_data( + (x[0] - 1) * x[1] for x in res if x[1] + ) + async def count_daily_messages(self): """ Returns an estimate of the number of messages sent in the last day. -- cgit 1.5.1