summary refs log tree commit diff
path: root/synapse/storage/events.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/events.py')
-rw-r--r--synapse/storage/events.py71
1 files changed, 57 insertions, 14 deletions
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index f9162be9b9..fefba39ea1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -17,14 +17,14 @@
 
 import itertools
 import logging
-from collections import OrderedDict, deque, namedtuple
+from collections import Counter as c_counter, OrderedDict, deque, namedtuple
 from functools import wraps
 
 from six import iteritems, text_type
 from six.moves import range
 
 from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
@@ -33,6 +33,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
+from synapse.metrics import BucketCollector
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.state import StateResolutionStore
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +74,21 @@ state_delta_reuse_delta_counter = Counter(
     "synapse_storage_events_state_delta_reuse_delta", ""
 )
 
+# The number of forward extremities for each new event.
+forward_extremities_counter = Histogram(
+    "synapse_storage_events_forward_extremities_persisted",
+    "Number of forward extremities for each new event",
+    buckets=(1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
+# The number of stale forward extremities for each new event. Stale extremities
+# are those that were in the previous set of extremities as well as the new.
+stale_forward_extremities_counter = Histogram(
+    "synapse_storage_events_stale_forward_extremities_persisted",
+    "Number of unchanged forward extremities for each new event",
+    buckets=(0, 1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"),
+)
+
 
 def encode_json(json_object):
     """
@@ -220,13 +236,39 @@ class EventsStore(
     EventsWorkerStore,
     BackgroundUpdateStore,
 ):
-
     def __init__(self, db_conn, hs):
         super(EventsStore, self).__init__(db_conn, hs)
 
         self._event_persist_queue = _EventPeristenceQueue()
         self._state_resolution_handler = hs.get_state_resolution_handler()
 
+        # Collect metrics on the number of forward extremities that exist.
+        # Counter of number of extremities to count
+        self._current_forward_extremities_amount = c_counter()
+
+        BucketCollector(
+            "synapse_forward_extremities",
+            lambda: self._current_forward_extremities_amount,
+            buckets=[1, 2, 3, 5, 7, 10, 15, 20, 50, 100, 200, 500, "+Inf"],
+        )
+
+        # Read the extrems every 60 minutes
+        hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000)
+
+    @defer.inlineCallbacks
+    def _read_forward_extremities(self):
+        def fetch(txn):
+            txn.execute(
+                """
+                select count(*) c from event_forward_extremities
+                group by room_id
+                """
+            )
+            return txn.fetchall()
+
+        res = yield self.runInteraction("read_forward_extremities", fetch)
+        self._current_forward_extremities_amount = c_counter(list(x[0] for x in res))
+
     @defer.inlineCallbacks
     def persist_events(self, events_and_contexts, backfilled=False):
         """
@@ -514,6 +556,8 @@ class EventsStore(
             and not event.internal_metadata.is_soft_failed()
         ]
 
+        latest_event_ids = set(latest_event_ids)
+
         # start with the existing forward extremities
         result = set(latest_event_ids)
 
@@ -537,6 +581,13 @@ class EventsStore(
         )
         result.difference_update(existing_prevs)
 
+        # We only update metrics for events that change forward extremities
+        # (e.g. we ignore backfill/outliers/etc)
+        if result != latest_event_ids:
+            forward_extremities_counter.observe(len(result))
+            stale = latest_event_ids & result
+            stale_forward_extremities_counter.observe(len(stale))
+
         defer.returnValue(result)
 
     @defer.inlineCallbacks
@@ -568,17 +619,11 @@ class EventsStore(
             )
 
             txn.execute(sql, batch)
-            results.extend(
-                r[0]
-                for r in txn
-                if not json.loads(r[1]).get("soft_failed")
-            )
+            results.extend(r[0] for r in txn if not json.loads(r[1]).get("soft_failed"))
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_events_which_are_prevs",
-                _get_events_which_are_prevs_txn,
-                chunk,
+                "_get_events_which_are_prevs", _get_events_which_are_prevs_txn, chunk
             )
 
         defer.returnValue(results)
@@ -640,9 +685,7 @@ class EventsStore(
 
         for chunk in batch_iter(event_ids, 100):
             yield self.runInteraction(
-                "_get_prevs_before_rejected",
-                _get_prevs_before_rejected_txn,
-                chunk,
+                "_get_prevs_before_rejected", _get_prevs_before_rejected_txn, chunk
             )
 
         defer.returnValue(existing_prevs)