diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index f23f8c6ecf..c4474df975 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,6 +16,8 @@ import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
+from prometheus_client import Gauge
+
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
from synapse.api.room_versions import RoomVersion
@@ -32,6 +34,16 @@ from synapse.util.caches.descriptors import cached
from synapse.util.caches.lrucache import LruCache
from synapse.util.iterutils import batch_iter
+oldest_pdu_in_federation_staging = Gauge(
+ "synapse_federation_server_oldest_inbound_pdu_in_staging",
+ "The age in seconds since we received the oldest pdu in the federation staging area",
+)
+
+number_pdus_in_federation_queue = Gauge(
+ "synapse_federation_server_number_inbound_pdu_in_staging",
+ "The total number of events in the inbound federation staging",
+)
+
logger = logging.getLogger(__name__)
@@ -54,6 +66,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
500000, "_event_auth_cache", size_callback=len
) # type: LruCache[str, List[Tuple[str, int]]]
+ self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
+
async def get_auth_chain(
self, room_id: str, event_ids: Collection[str], include_given: bool = False
) -> List[EventBase]:
@@ -1075,16 +1089,62 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
self,
origin: str,
event_id: str,
- ) -> None:
- """Remove the given event from the staging area"""
- await self.db_pool.simple_delete(
- table="federation_inbound_events_staging",
- keyvalues={
- "origin": origin,
- "event_id": event_id,
- },
- desc="remove_received_event_from_staging",
- )
+ ) -> Optional[int]:
+ """Remove the given event from the staging area.
+
+ Returns:
+ The received_ts of the row that was deleted, if any.
+ """
+ if self.db_pool.engine.supports_returning:
+
+ def _remove_received_event_from_staging_txn(txn):
+ sql = """
+ DELETE FROM federation_inbound_events_staging
+ WHERE origin = ? AND event_id = ?
+ RETURNING received_ts
+ """
+
+ txn.execute(sql, (origin, event_id))
+ return txn.fetchone()
+
+ row = await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ db_autocommit=True,
+ )
+ if row is None:
+ return None
+
+ return row[0]
+
+ else:
+
+ def _remove_received_event_from_staging_txn(txn):
+ received_ts = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ retcol="received_ts",
+ allow_none=True,
+ )
+ self.db_pool.simple_delete_txn(
+ txn,
+ table="federation_inbound_events_staging",
+ keyvalues={
+ "origin": origin,
+ "event_id": event_id,
+ },
+ )
+
+ return received_ts
+
+ return await self.db_pool.runInteraction(
+ "remove_received_event_from_staging",
+ _remove_received_event_from_staging_txn,
+ )
async def get_next_staged_event_id_for_room(
self,
@@ -1147,6 +1207,40 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return origin, event
+ async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
+ """Get the room IDs of all events currently staged."""
+ return await self.db_pool.simple_select_onecol(
+ table="federation_inbound_events_staging",
+ keyvalues={},
+ retcol="DISTINCT room_id",
+ desc="get_all_rooms_with_staged_incoming_events",
+ )
+
+ @wrap_as_background_process("_get_stats_for_federation_staging")
+ async def _get_stats_for_federation_staging(self):
+ """Update the prometheus metrics for the inbound federation staging area."""
+
+ def _get_stats_for_federation_staging_txn(txn):
+ txn.execute(
+ "SELECT coalesce(count(*), 0) FROM federation_inbound_events_staging"
+ )
+ (count,) = txn.fetchone()
+
+ txn.execute(
+ "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
+ )
+
+ (age,) = txn.fetchone()
+
+ return count, age
+
+ count, age = await self.db_pool.runInteraction(
+ "_get_stats_for_federation_staging", _get_stats_for_federation_staging_txn
+ )
+
+ number_pdus_in_federation_queue.set(count)
+ oldest_pdu_in_federation_staging.set(age)
+
class EventFederationStore(EventFederationWorkerStore):
"""Responsible for storing and serving up the various graphs associated
|