diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index d39368c20e..44018c1c31 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,11 +16,11 @@ import logging
from queue import Empty, PriorityQueue
from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
-from prometheus_client import Gauge
+from prometheus_client import Counter, Gauge
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
-from synapse.api.room_versions import RoomVersion
+from synapse.api.room_versions import EventFormatVersions, RoomVersion
from synapse.events import EventBase, make_event_from_dict
from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
@@ -44,6 +44,12 @@ number_pdus_in_federation_queue = Gauge(
"The total number of events in the inbound federation staging",
)
+pdus_pruned_from_federation_queue = Counter(
+ "synapse_federation_server_number_inbound_pdu_pruned",
+ "The number of events in the inbound federation staging that have been "
+ "pruned due to the queue getting too long",
+)
+
logger = logging.getLogger(__name__)
@@ -936,15 +942,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# We want to make sure that we do a breadth-first, "depth" ordered
# search.
- query = (
- "SELECT depth, prev_event_id FROM event_edges"
- " INNER JOIN events"
- " ON prev_event_id = events.event_id"
- " WHERE event_edges.event_id = ?"
- " AND event_edges.is_state = ?"
- " LIMIT ?"
- )
+ # Look for the prev_event_id connected to the given event_id
+ query = """
+ SELECT depth, prev_event_id FROM event_edges
+ /* Get the depth of the prev_event_id from the events table */
+ INNER JOIN events
+ ON prev_event_id = events.event_id
+ /* Find an event which matches the given event_id */
+ WHERE event_edges.event_id = ?
+ AND event_edges.is_state = ?
+ LIMIT ?
+ """
+
+ # Look for the "insertion" events connected to the given event_id
+ connected_insertion_event_query = """
+ SELECT e.depth, i.event_id FROM insertion_event_edges AS i
+ /* Get the depth of the insertion event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which points via prev_events to the given event_id */
+ WHERE i.insertion_prev_event_id = ?
+ LIMIT ?
+ """
+
+ # Find any chunk connections of a given insertion event
+ chunk_connection_query = """
+ SELECT e.depth, c.event_id FROM insertion_events AS i
+ /* Find the chunk that connects to the given insertion event */
+ INNER JOIN chunk_events AS c
+ ON i.next_chunk_id = c.chunk_id
+ /* Get the depth of the chunk start event from the events table */
+ INNER JOIN events AS e USING (event_id)
+ /* Find an insertion event which matches the given event_id */
+ WHERE i.event_id = ?
+ LIMIT ?
+ """
+ # In a PriorityQueue, the lowest valued entries are retrieved first.
+ # We're using depth as the priority in the queue.
+ # Depth is lowest at the oldest-in-time message and highest and
+ # newest-in-time message. We add events to the queue with a negative depth so that
+ # we process the newest-in-time messages first going backwards in time.
queue = PriorityQueue()
for event_id in event_list:
@@ -970,9 +1007,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
event_results.add(event_id)
+ # Try and find any potential historical chunks of message history.
+ #
+ # First we look for an insertion event connected to the current
+ # event (by prev_event). If we find any, we need to go and try to
+ # find any chunk events connected to the insertion event (by
+ # chunk_id). If we find any, we'll add them to the queue and
+ # navigate up the DAG like normal in the next iteration of the loop.
+ txn.execute(
+ connected_insertion_event_query, (event_id, limit - len(event_results))
+ )
+ connected_insertion_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: connected_insertion_event_query %s",
+ connected_insertion_event_id_results,
+ )
+ for row in connected_insertion_event_id_results:
+ connected_insertion_event_depth = row[0]
+ connected_insertion_event = row[1]
+ queue.put((-connected_insertion_event_depth, connected_insertion_event))
+
+ # Find any chunk connections for the given insertion event
+ txn.execute(
+ chunk_connection_query,
+ (connected_insertion_event, limit - len(event_results)),
+ )
+ chunk_start_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: chunk_start_event_id_results %s",
+ chunk_start_event_id_results,
+ )
+ for row in chunk_start_event_id_results:
+ if row[1] not in event_results:
+ queue.put((-row[0], row[1]))
+
+ # Navigate up the DAG by prev_event
txn.execute(query, (event_id, False, limit - len(event_results)))
+ prev_event_id_results = txn.fetchall()
+ logger.debug(
+ "_get_backfill_events: prev_event_ids %s", prev_event_id_results
+ )
- for row in txn:
+ for row in prev_event_id_results:
if row[1] not in event_results:
queue.put((-row[0], row[1]))
@@ -1207,6 +1283,100 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
return origin, event
+ async def prune_staged_events_in_room(
+ self,
+ room_id: str,
+ room_version: RoomVersion,
+ ) -> bool:
+ """Checks if there are lots of staged events for the room, and if so
+ prune them down.
+
+ Returns:
+ Whether any events were pruned
+ """
+
+ # First check the size of the queue.
+ count = await self.db_pool.simple_select_one_onecol(
+ table="federation_inbound_events_staging",
+ keyvalues={"room_id": room_id},
+ retcol="COALESCE(COUNT(*), 0)",
+ desc="prune_staged_events_in_room_count",
+ )
+
+ if count < 100:
+ return False
+
+ # If the queue is too large, then we want clear the entire queue,
+ # keeping only the forward extremities (i.e. the events not referenced
+ # by other events in the queue). We do this so that we can always
+ # backpaginate in all the events we have dropped.
+ rows = await self.db_pool.simple_select_list(
+ table="federation_inbound_events_staging",
+ keyvalues={"room_id": room_id},
+ retcols=("event_id", "event_json"),
+ desc="prune_staged_events_in_room_fetch",
+ )
+
+ # Find the set of events referenced by those in the queue, as well as
+ # collecting all the event IDs in the queue.
+ referenced_events: Set[str] = set()
+ seen_events: Set[str] = set()
+ for row in rows:
+ event_id = row["event_id"]
+ seen_events.add(event_id)
+ event_d = db_to_json(row["event_json"])
+
+ # We don't bother parsing the dicts into full blown event objects,
+ # as that is needlessly expensive.
+
+ # We haven't checked that the `prev_events` have the right format
+ # yet, so we check as we go.
+ prev_events = event_d.get("prev_events", [])
+ if not isinstance(prev_events, list):
+ logger.info("Invalid prev_events for %s", event_id)
+ continue
+
+ if room_version.event_format == EventFormatVersions.V1:
+ for prev_event_tuple in prev_events:
+ if not isinstance(prev_event_tuple, list) or len(prev_events) != 2:
+ logger.info("Invalid prev_events for %s", event_id)
+ break
+
+ prev_event_id = prev_event_tuple[0]
+ if not isinstance(prev_event_id, str):
+ logger.info("Invalid prev_events for %s", event_id)
+ break
+
+ referenced_events.add(prev_event_id)
+ else:
+ for prev_event_id in prev_events:
+ if not isinstance(prev_event_id, str):
+ logger.info("Invalid prev_events for %s", event_id)
+ break
+
+ referenced_events.add(prev_event_id)
+
+ to_delete = referenced_events & seen_events
+ if not to_delete:
+ return False
+
+ pdus_pruned_from_federation_queue.inc(len(to_delete))
+ logger.info(
+ "Pruning %d events in room %s from federation queue",
+ len(to_delete),
+ room_id,
+ )
+
+ await self.db_pool.simple_delete_many(
+ table="federation_inbound_events_staging",
+ keyvalues={"room_id": room_id},
+ iterable=to_delete,
+ column="event_id",
+ desc="prune_staged_events_in_room_delete",
+ )
+
+ return True
+
async def get_all_rooms_with_staged_incoming_events(self) -> List[str]:
"""Get the room IDs of all events currently staged."""
return await self.db_pool.simple_select_onecol(
@@ -1227,12 +1397,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
(count,) = txn.fetchone()
txn.execute(
- "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
+ "SELECT min(received_ts) FROM federation_inbound_events_staging"
)
(received_ts,) = txn.fetchone()
- age = self._clock.time_msec() - received_ts
+ # If there is nothing in the staging area default it to 0.
+ age = 0
+ if received_ts is not None:
+ age = self._clock.time_msec() - received_ts
return count, age
|