diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index d4251be7e7..b8bbd1eccd 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1048,15 +1048,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
Args:
event_ids: The event IDs to calculate the max depth of.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="events",
- column="event_id",
- iterable=event_ids,
- retcols=(
- "event_id",
- "depth",
+ rows = cast(
+ List[Tuple[str, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=(
+ "event_id",
+ "depth",
+ ),
+ desc="get_max_depth_of",
),
- desc="get_max_depth_of",
)
if not rows:
@@ -1064,10 +1067,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
else:
max_depth_event_id = ""
current_max_depth = 0
- for row in rows:
- if row["depth"] > current_max_depth:
- max_depth_event_id = row["event_id"]
- current_max_depth = row["depth"]
+ for event_id, depth in rows:
+ if depth > current_max_depth:
+ max_depth_event_id = event_id
+ current_max_depth = depth
return max_depth_event_id, current_max_depth
@@ -1077,15 +1080,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
Args:
event_ids: The event IDs to calculate the min depth of.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="events",
- column="event_id",
- iterable=event_ids,
- retcols=(
- "event_id",
- "depth",
+ rows = cast(
+ List[Tuple[str, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="events",
+ column="event_id",
+ iterable=event_ids,
+ retcols=(
+ "event_id",
+ "depth",
+ ),
+ desc="get_min_depth_of",
),
- desc="get_min_depth_of",
)
if not rows:
@@ -1093,10 +1099,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
else:
min_depth_event_id = ""
current_min_depth = MAX_DEPTH
- for row in rows:
- if row["depth"] < current_min_depth:
- min_depth_event_id = row["event_id"]
- current_min_depth = row["depth"]
+ for event_id, depth in rows:
+ if depth < current_min_depth:
+ min_depth_event_id = event_id
+ current_min_depth = depth
return min_depth_event_id, current_min_depth
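
Both hunks above make the same mechanical change: simple_select_many_batch is
now typed as returning plain tuples, so the result is narrowed with cast() and
unpacked positionally instead of by column name. A minimal standalone sketch of
the pattern (the names and sample rows are illustrative, not Synapse APIs):

    from typing import Any, List, Tuple, cast

    def max_depth_of(raw_rows: List[Any]) -> Tuple[str, int]:
        # cast() is a type-checker hint only; it does no runtime conversion,
        # so it relies on the rows really being (event_id, depth) tuples.
        rows = cast(List[Tuple[str, int]], raw_rows)
        max_depth_event_id = ""
        current_max_depth = 0
        for event_id, depth in rows:
            if depth > current_max_depth:
                max_depth_event_id = event_id
                current_max_depth = depth
        return max_depth_event_id, current_max_depth

    print(max_depth_of([("$a:hs", 3), ("$b:hs", 7)]))  # ('$b:hs', 7)
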
@@ -1552,19 +1558,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
A filtered down list of `event_ids` that have previous failed pull attempts.
"""
- rows = await self.db_pool.simple_select_many_batch(
- table="event_failed_pull_attempts",
- column="event_id",
- iterable=event_ids,
- keyvalues={},
- retcols=("event_id",),
- desc="get_event_ids_with_failed_pull_attempts",
+ rows = cast(
+ List[Tuple[str]],
+ await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=("event_id",),
+ desc="get_event_ids_with_failed_pull_attempts",
+ ),
)
- event_ids_with_failed_pull_attempts: Set[str] = {
- row["event_id"] for row in rows
- }
-
- return event_ids_with_failed_pull_attempts
+ return {row[0] for row in rows}
@trace
async def get_event_ids_to_not_pull_from_backoff(
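
In the hunk above each row is a one-element tuple, because only event_id is
selected; hence the row[0] indexing. A tiny illustrative sketch (the function
name and sample IDs are made up for the example):

    from typing import List, Set, Tuple

    def ids_with_failed_attempts(rows: List[Tuple[str]]) -> Set[str]:
        # rows look like [("$x:hs",), ("$y:hs",)] -- note the trailing
        # commas: each element is a 1-tuple, not a bare string.
        return {row[0] for row in rows}

    print(ids_with_failed_attempts([("$x:hs",), ("$y:hs",)]))
    # {'$x:hs', '$y:hs'} (set order may vary)
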
@@ -1584,32 +1589,34 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
A dictionary mapping each event_id that should not be pulled to the
next timestamp at which we may try pulling it again.
"""
- event_failed_pull_attempts = await self.db_pool.simple_select_many_batch(
- table="event_failed_pull_attempts",
- column="event_id",
- iterable=event_ids,
- keyvalues={},
- retcols=(
- "event_id",
- "last_attempt_ts",
- "num_attempts",
+ event_failed_pull_attempts = cast(
+ List[Tuple[str, int, int]],
+ await self.db_pool.simple_select_many_batch(
+ table="event_failed_pull_attempts",
+ column="event_id",
+ iterable=event_ids,
+ keyvalues={},
+ retcols=(
+ "event_id",
+ "last_attempt_ts",
+ "num_attempts",
+ ),
+ desc="get_event_ids_to_not_pull_from_backoff",
),
- desc="get_event_ids_to_not_pull_from_backoff",
)
current_time = self._clock.time_msec()
event_ids_with_backoff = {}
- for event_failed_pull_attempt in event_failed_pull_attempts:
- event_id = event_failed_pull_attempt["event_id"]
+ for event_id, last_attempt_ts, num_attempts in event_failed_pull_attempts:
# Exponential back-off (up to the upper bound) so we don't try to
# pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc.
backoff_end_time = (
- event_failed_pull_attempt["last_attempt_ts"]
+ last_attempt_ts
+ (
2
** min(
- event_failed_pull_attempt["num_attempts"],
+ num_attempts,
BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS,
)
)
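
The back-off arithmetic above doubles per attempt up to a capped exponent. The
step constant and the multiplication by it fall outside this hunk's context, so
the following worked example assumes a 1-hour step and a cap of 8 doubling
steps, matching the "2hr, 4hr, 8hr, 16hr" comment:

    STEP_MS = 60 * 60 * 1000  # assumed 1-hour base step
    MAX_DOUBLING_STEPS = 8    # assumed cap on the exponent

    def backoff_end_time(last_attempt_ts: int, num_attempts: int) -> int:
        # 1 attempt -> 2h, 2 -> 4h, 3 -> 8h, ..., capped at 2**8 hours.
        return last_attempt_ts + (2 ** min(num_attempts, MAX_DOUBLING_STEPS)) * STEP_MS

    print(backoff_end_time(0, 1) / 3_600_000)  # 2.0 (hours)
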
@@ -1890,21 +1897,23 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
# keeping only the forward extremities (i.e. the events not referenced
# by other events in the queue). We do this so that we can always
# backpaginate into all the events we have dropped.
- rows = await self.db_pool.simple_select_list(
- table="federation_inbound_events_staging",
- keyvalues={"room_id": room_id},
- retcols=("event_id", "event_json"),
- desc="prune_staged_events_in_room_fetch",
+ rows = cast(
+ List[Tuple[str, str]],
+ await self.db_pool.simple_select_list(
+ table="federation_inbound_events_staging",
+ keyvalues={"room_id": room_id},
+ retcols=("event_id", "event_json"),
+ desc="prune_staged_events_in_room_fetch",
+ ),
)
# Find the set of events referenced by those in the queue, as well as
# collecting all the event IDs in the queue.
referenced_events: Set[str] = set()
seen_events: Set[str] = set()
- for row in rows:
- event_id = row["event_id"]
+ for event_id, event_json in rows:
seen_events.add(event_id)
- event_d = db_to_json(row["event_json"])
+ event_d = db_to_json(event_json)
# We don't bother parsing the dicts into full blown event objects,
# as that is needlessly expensive.
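
A standalone sketch of the loop this last hunk rewrites: rows now arrive as
(event_id, event_json) tuples, and the JSON is decoded just far enough to
collect referenced event IDs. Here json.loads stands in for Synapse's
db_to_json, and the flat prev_events list is an assumption about the event
format, since that handling lies beyond the hunk shown:

    import json
    from typing import List, Set, Tuple

    def seen_and_referenced(rows: List[Tuple[str, str]]) -> Tuple[Set[str], Set[str]]:
        referenced: Set[str] = set()
        seen: Set[str] = set()
        for event_id, event_json in rows:
            seen.add(event_id)
            event_d = json.loads(event_json)  # stand-in for db_to_json
            # Assumes room-v3+ style prev_events: a flat list of event IDs.
            referenced.update(event_d.get("prev_events", []))
        return seen, referenced

    rows = [("$b:hs", json.dumps({"prev_events": ["$a:hs"]}))]
    print(seen_and_referenced(rows))  # ({'$b:hs'}, {'$a:hs'})
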