diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 68f30d893c..3dd53f2038 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -39,6 +39,7 @@ from typing import (
import attr
from prometheus_client import Counter, Gauge
+from sortedcontainers import SortedSet
from synapse.api.constants import MAX_DEPTH
from synapse.api.errors import StoreError
@@ -373,24 +374,16 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# We fetch the links in batches. Separate batches will likely fetch the
# same set of links (e.g. they'll always pull in the links to create
- # event). To try and minimize the amount of redundant links, we sort the
- # chain IDs in reverse, as there will be a correlation between the order
- # of chain IDs and links (i.e., higher chain IDs are more likely to
- # depend on lower chain IDs than vice versa).
+ # event). To try and minimize the amount of redundant links, we query
+ # the chain IDs in reverse order, as there will be a correlation between
+ # the order of chain IDs and links (i.e., higher chain IDs are more
+ # likely to depend on lower chain IDs than vice versa).
BATCH_SIZE = 1000
- chains_to_fetch_list = list(chains_to_fetch)
- chains_to_fetch_list.sort(reverse=True)
+ chains_to_fetch_sorted = SortedSet(chains_to_fetch)
- seen_chains: Set[int] = set()
- while chains_to_fetch_list:
- batch2 = [
- c for c in chains_to_fetch_list[-BATCH_SIZE:] if c not in seen_chains
- ]
- chains_to_fetch_list = chains_to_fetch_list[:-BATCH_SIZE]
- while len(batch2) < BATCH_SIZE and chains_to_fetch_list:
- chain_id = chains_to_fetch_list.pop()
- if chain_id not in seen_chains:
- batch2.append(chain_id)
+ while chains_to_fetch_sorted:
+ batch2 = list(chains_to_fetch_sorted.islice(-BATCH_SIZE))
+ chains_to_fetch_sorted.difference_update(batch2)
clause, args = make_in_list_sql_clause(
txn.database_engine, "origin_chain_id", batch2
@@ -409,8 +402,7 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
(origin_sequence_number, target_chain_id, target_sequence_number)
)
- seen_chains.update(links)
- seen_chains.update(batch2)
+ chains_to_fetch_sorted.difference_update(links)
yield links
|