summary refs log tree commit diff
path: root/synapse/storage/databases/main/event_federation.py
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2021-08-03 10:34:44 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2021-08-03 10:34:44 +0100
commit11dda97e86579d807528392df7ba3c3efdd03c01 (patch)
tree003e6d55563e78ccca0e9f4a2b1c798231038370 /synapse/storage/databases/main/event_federation.py
parentMerge branch 'release-v1.39' of github.com:matrix-org/synapse into matrix-org... (diff)
parentFix the `tests-done` github actions step, again (#10512) (diff)
downloadsynapse-11dda97e86579d807528392df7ba3c3efdd03c01.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/storage/databases/main/event_federation.py')
-rw-r--r--synapse/storage/databases/main/event_federation.py199
1 files changed, 186 insertions, 13 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py

index d39368c20e..44018c1c31 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -16,11 +16,11 @@ import logging from queue import Empty, PriorityQueue from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple -from prometheus_client import Gauge +from prometheus_client import Counter, Gauge from synapse.api.constants import MAX_DEPTH from synapse.api.errors import StoreError -from synapse.api.room_versions import RoomVersion +from synapse.api.room_versions import EventFormatVersions, RoomVersion from synapse.events import EventBase, make_event_from_dict from synapse.metrics.background_process_metrics import wrap_as_background_process from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause @@ -44,6 +44,12 @@ number_pdus_in_federation_queue = Gauge( "The total number of events in the inbound federation staging", ) +pdus_pruned_from_federation_queue = Counter( + "synapse_federation_server_number_inbound_pdu_pruned", + "The number of events in the inbound federation staging that have been " + "pruned due to the queue getting too long", +) + logger = logging.getLogger(__name__) @@ -936,15 +942,46 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas # We want to make sure that we do a breadth-first, "depth" ordered # search. - query = ( - "SELECT depth, prev_event_id FROM event_edges" - " INNER JOIN events" - " ON prev_event_id = events.event_id" - " WHERE event_edges.event_id = ?" - " AND event_edges.is_state = ?" - " LIMIT ?" - ) + # Look for the prev_event_id connected to the given event_id + query = """ + SELECT depth, prev_event_id FROM event_edges + /* Get the depth of the prev_event_id from the events table */ + INNER JOIN events + ON prev_event_id = events.event_id + /* Find an event which matches the given event_id */ + WHERE event_edges.event_id = ? + AND event_edges.is_state = ? + LIMIT ? + """ + + # Look for the "insertion" events connected to the given event_id + connected_insertion_event_query = """ + SELECT e.depth, i.event_id FROM insertion_event_edges AS i + /* Get the depth of the insertion event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which points via prev_events to the given event_id */ + WHERE i.insertion_prev_event_id = ? + LIMIT ? + """ + + # Find any chunk connections of a given insertion event + chunk_connection_query = """ + SELECT e.depth, c.event_id FROM insertion_events AS i + /* Find the chunk that connects to the given insertion event */ + INNER JOIN chunk_events AS c + ON i.next_chunk_id = c.chunk_id + /* Get the depth of the chunk start event from the events table */ + INNER JOIN events AS e USING (event_id) + /* Find an insertion event which matches the given event_id */ + WHERE i.event_id = ? + LIMIT ? + """ + # In a PriorityQueue, the lowest valued entries are retrieved first. + # We're using depth as the priority in the queue. + # Depth is lowest at the oldest-in-time message and highest and + # newest-in-time message. We add events to the queue with a negative depth so that + # we process the newest-in-time messages first going backwards in time. queue = PriorityQueue() for event_id in event_list: @@ -970,9 +1007,48 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas event_results.add(event_id) + # Try and find any potential historical chunks of message history. + # + # First we look for an insertion event connected to the current + # event (by prev_event). If we find any, we need to go and try to + # find any chunk events connected to the insertion event (by + # chunk_id). If we find any, we'll add them to the queue and + # navigate up the DAG like normal in the next iteration of the loop. + txn.execute( + connected_insertion_event_query, (event_id, limit - len(event_results)) + ) + connected_insertion_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: connected_insertion_event_query %s", + connected_insertion_event_id_results, + ) + for row in connected_insertion_event_id_results: + connected_insertion_event_depth = row[0] + connected_insertion_event = row[1] + queue.put((-connected_insertion_event_depth, connected_insertion_event)) + + # Find any chunk connections for the given insertion event + txn.execute( + chunk_connection_query, + (connected_insertion_event, limit - len(event_results)), + ) + chunk_start_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: chunk_start_event_id_results %s", + chunk_start_event_id_results, + ) + for row in chunk_start_event_id_results: + if row[1] not in event_results: + queue.put((-row[0], row[1])) + + # Navigate up the DAG by prev_event txn.execute(query, (event_id, False, limit - len(event_results))) + prev_event_id_results = txn.fetchall() + logger.debug( + "_get_backfill_events: prev_event_ids %s", prev_event_id_results + ) - for row in txn: + for row in prev_event_id_results: if row[1] not in event_results: queue.put((-row[0], row[1])) @@ -1207,6 +1283,100 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas return origin, event + async def prune_staged_events_in_room( + self, + room_id: str, + room_version: RoomVersion, + ) -> bool: + """Checks if there are lots of staged events for the room, and if so + prune them down. + + Returns: + Whether any events were pruned + """ + + # First check the size of the queue. + count = await self.db_pool.simple_select_one_onecol( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + retcol="COALESCE(COUNT(*), 0)", + desc="prune_staged_events_in_room_count", + ) + + if count < 100: + return False + + # If the queue is too large, then we want clear the entire queue, + # keeping only the forward extremities (i.e. the events not referenced + # by other events in the queue). We do this so that we can always + # backpaginate in all the events we have dropped. + rows = await self.db_pool.simple_select_list( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + retcols=("event_id", "event_json"), + desc="prune_staged_events_in_room_fetch", + ) + + # Find the set of events referenced by those in the queue, as well as + # collecting all the event IDs in the queue. + referenced_events: Set[str] = set() + seen_events: Set[str] = set() + for row in rows: + event_id = row["event_id"] + seen_events.add(event_id) + event_d = db_to_json(row["event_json"]) + + # We don't bother parsing the dicts into full blown event objects, + # as that is needlessly expensive. + + # We haven't checked that the `prev_events` have the right format + # yet, so we check as we go. + prev_events = event_d.get("prev_events", []) + if not isinstance(prev_events, list): + logger.info("Invalid prev_events for %s", event_id) + continue + + if room_version.event_format == EventFormatVersions.V1: + for prev_event_tuple in prev_events: + if not isinstance(prev_event_tuple, list) or len(prev_events) != 2: + logger.info("Invalid prev_events for %s", event_id) + break + + prev_event_id = prev_event_tuple[0] + if not isinstance(prev_event_id, str): + logger.info("Invalid prev_events for %s", event_id) + break + + referenced_events.add(prev_event_id) + else: + for prev_event_id in prev_events: + if not isinstance(prev_event_id, str): + logger.info("Invalid prev_events for %s", event_id) + break + + referenced_events.add(prev_event_id) + + to_delete = referenced_events & seen_events + if not to_delete: + return False + + pdus_pruned_from_federation_queue.inc(len(to_delete)) + logger.info( + "Pruning %d events in room %s from federation queue", + len(to_delete), + room_id, + ) + + await self.db_pool.simple_delete_many( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + iterable=to_delete, + column="event_id", + desc="prune_staged_events_in_room_delete", + ) + + return True + async def get_all_rooms_with_staged_incoming_events(self) -> List[str]: """Get the room IDs of all events currently staged.""" return await self.db_pool.simple_select_onecol( @@ -1227,12 +1397,15 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas (count,) = txn.fetchone() txn.execute( - "SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging" + "SELECT min(received_ts) FROM federation_inbound_events_staging" ) (received_ts,) = txn.fetchone() - age = self._clock.time_msec() - received_ts + # If there is nothing in the staging area default it to 0. + age = 0 + if received_ts is not None: + age = self._clock.time_msec() - received_ts return count, age