summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/main/event_federation.py39
1 files changed, 39 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f2d27ee893..08d75b0d41 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,6 +16,8 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
 
+from prometheus_client import Gauge
+
 from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
 from synapse.api.room_versions import RoomVersion
@@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
 
+oldest_pdu_in_federation_staging = Gauge(
+    "synapse_federation_server_oldest_inbound_pdu_in_staging",
+    "The age in seconds since we received the oldest pdu in the federation staging area",
+)
+
+number_pdus_in_federation_queue = Gauge(
+    "synapse_federation_server_number_inbound_pdu_in_staging",
+    "The total number of events in the inbound federation staging",
+)
+
 logger = logging.getLogger(__name__)
 
 
@@ -54,6 +66,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             500000, "_event_auth_cache", size_callback=len
         )  # type: LruCache[str, List[Tuple[str, int]]]
 
+        self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
+
     async def get_auth_chain(
         self, room_id: str, event_ids: Collection[str], include_given: bool = False
     ) -> List[EventBase]:
@@ -1193,6 +1207,31 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return origin, event
 
+    @wrap_as_background_process("_get_stats_for_federation_staging")
+    async def _get_stats_for_federation_staging(self):
+        """Update the prometheus metrics for the inbound federation staging area."""
+
+        def _get_stats_for_federation_staging_txn(txn):
+            txn.execute(
+                "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging"
+            )
+            (count,) = txn.fetchone()
+
+            txn.execute(
+                "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
+            )
+
+            (age,) = txn.fetchone()
+
+            return count, age
+
+        count, age = await self.db_pool.runInteraction(
+            "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
+        )
+
+        number_pdus_in_federation_queue.set(count)
+        oldest_pdu_in_federation_staging.set(age)
+
 
 class EventFederationStore(EventFederationWorkerStore):
     """Responsible for storing and serving up the various graphs associated