diff options
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r-- | synapse/storage/events.py | 71 |
1 files changed, 57 insertions, 14 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py index f9162be9b9..fefba39ea1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -17,14 +17,14 @@ import itertools import logging -from collections import OrderedDict, deque, namedtuple +from collections import Counter as c_counter, OrderedDict, deque, namedtuple from functools import wraps from six import iteritems, text_type from six.moves import range from canonicaljson import json -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from twisted.internet import defer @@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.metrics import BucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateResolutionStore from synapse.storage.background_updates import BackgroundUpdateStore @@ -73,6 +74,21 @@ state_delta_reuse_delta_counter = Counter( "synapse_storage_events_state_delta_reuse_delta", "" ) +# The number of forward extremities for each new event. +forward_extremities_counter = Histogram( + "synapse_storage_events_forward_extremities_persisted", + "Number of forward extremities for each new event", + buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), +) + +# The number of stale forward extremities for each new event. Stale extremities +# are those that were in the previous set of extremities as well as the new. +stale_forward_extremities_counter = Histogram( + "synapse_storage_events_stale_forward_extremities_persisted", + "Number of unchanged forward extremities for each new event", + buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"), +) + def encode_json(json_object): """ @@ -220,13 +236,39 @@ class EventsStore( EventsWorkerStore, BackgroundUpdateStore, ): - def __init__(self, db_conn, hs): super(EventsStore, self).__init__(db_conn, hs) self._event_persist_queue = _EventPeristenceQueue() self._state_resolution_handler = hs.get_state_resolution_handler() + # Collect metrics on the number of forward extremities that exist. + # Counter of number of extremities to count + self._current_forward_extremities_amount = c_counter() + + BucketCollector( + "synapse_forward_extremities", + lambda: self._current_forward_extremities_amount, + buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"], + ) + + # Read the extrems every 60 minutes + hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000) + + @defer.inlineCallbacks + def _read_forward_extremities(self): + def fetch(txn): + txn.execute( + """ + select count(*) c from event_forward_extremities + group by room_id + """ + ) + return txn.fetchall() + + res = yield self.runInteraction("read_forward_extremities", fetch) + self._current_forward_extremities_amount = c_counter(list(x[0] for x in res)) + @defer.inlineCallbacks def persist_events(self, events_and_contexts, backfilled=False): """ @@ -514,6 +556,8 @@ class EventsStore( and not event.internal_metadata.is_soft_failed() ] + latest_event_ids = set(latest_event_ids) + # start with the existing forward extremities result = set(latest_event_ids) @@ -537,6 +581,13 @@ class EventsStore( ) result.difference_update(existing_prevs) + # We only update metrics for events that change forward extremities + # (e.g. we ignore backfill/outliers/etc) + if result != latest_event_ids: + forward_extremities_counter.observe(len(result)) + stale = latest_event_ids & result + stale_forward_extremities_counter.observe(len(stale)) + defer.returnValue(result) @defer.inlineCallbacks @@ -568,17 +619,11 @@ class EventsStore( ) txn.execute(sql, batch) - results.extend( - r[0] - for r in txn - if not json.loads(r[1]).get("soft_failed") - ) + results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed")) for chunk in batch_iter(event_ids, 100): yield self.runInteraction( - "_get_events_which_are_prevs", - _get_events_which_are_prevs_txn, - chunk, + "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk ) defer.returnValue(results) @@ -640,9 +685,7 @@ class EventsStore( for chunk in batch_iter(event_ids, 100): yield self.runInteraction( - "_get_prevs_before_rejected", - _get_prevs_before_rejected_txn, - chunk, + "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk ) defer.returnValue(existing_prevs) |