diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 3dd53f2038..1f35063e9f 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -21,6 +21,7 @@
import datetime
import itertools
import logging
+import time
from queue import Empty, PriorityQueue
from typing import (
TYPE_CHECKING,
@@ -381,14 +382,23 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
BATCH_SIZE = 1000
chains_to_fetch_sorted = SortedSet(chains_to_fetch)
+ logger.info("CHAINS: Fetching chain links %d", len(chains_to_fetch_sorted))
+
+ start_block = time.monotonic()
+
while chains_to_fetch_sorted:
batch2 = list(chains_to_fetch_sorted.islice(-BATCH_SIZE))
chains_to_fetch_sorted.difference_update(batch2)
+ logger.info("CHAINS: batch2 %d", len(batch2))
clause, args = make_in_list_sql_clause(
txn.database_engine, "origin_chain_id", batch2
)
+ start_query = time.monotonic()
txn.execute(sql % (clause,), args)
+ end_query = time.monotonic()
+
+ logger.info("CHAINS: query took %d ms", (end_query - start_query) * 1000)
links: Dict[int, List[Tuple[int, int, int]]] = {}
@@ -404,8 +414,15 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
chains_to_fetch_sorted.difference_update(links)
+ logger.info("CHAINS: returned %d", len(links))
+ logger.info("CHAINS: remaining %d", len(chains_to_fetch_sorted))
+
yield links
+ end_block = time.monotonic()
+
+ logger.info("CHAINS: block took %d ms", (end_block - start_block) * 1000)
+
def _get_auth_chain_ids_txn(
self, txn: LoggingTransaction, event_ids: Collection[str], include_given: bool
) -> Set[str]:
@@ -592,6 +609,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# are reachable from any event.
# (We need to take a copy of `seen_chains` as the function mutates it)
+ logger.info("CHAINS: for room %s", room_id)
for links in self._get_chain_links(txn, seen_chains):
for chains in set_to_chain:
for chain_id in links:
@@ -602,6 +620,8 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
seen_chains.update(chains)
+ logger.info("CHAINS: materialized chains %d", len(chains))
+
# Now for each chain we figure out the maximum sequence number reachable
# from *any* state set and the minimum sequence number reachable from
# *all* state sets. Events in that range are in the auth chain
|