summary refs log tree commit diff
path: root/synapse/storage/databases/main/event_federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/event_federation.py')
-rw-r--r--synapse/storage/databases/main/event_federation.py125
1 files changed, 67 insertions, 58 deletions
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py

index d4251be7e7..b8bbd1eccd 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py
@@ -1048,15 +1048,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas Args: event_ids: The event IDs to calculate the max depth of. """ - rows = await self.db_pool.simple_select_many_batch( - table="events", - column="event_id", - iterable=event_ids, - retcols=( - "event_id", - "depth", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=( + "event_id", + "depth", + ), + desc="get_max_depth_of", ), - desc="get_max_depth_of", ) if not rows: @@ -1064,10 +1067,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas else: max_depth_event_id = "" current_max_depth = 0 - for row in rows: - if row["depth"] > current_max_depth: - max_depth_event_id = row["event_id"] - current_max_depth = row["depth"] + for event_id, depth in rows: + if depth > current_max_depth: + max_depth_event_id = event_id + current_max_depth = depth return max_depth_event_id, current_max_depth @@ -1077,15 +1080,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas Args: event_ids: The event IDs to calculate the max depth of. """ - rows = await self.db_pool.simple_select_many_batch( - table="events", - column="event_id", - iterable=event_ids, - retcols=( - "event_id", - "depth", + rows = cast( + List[Tuple[str, int]], + await self.db_pool.simple_select_many_batch( + table="events", + column="event_id", + iterable=event_ids, + retcols=( + "event_id", + "depth", + ), + desc="get_min_depth_of", ), - desc="get_min_depth_of", ) if not rows: @@ -1093,10 +1099,10 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas else: min_depth_event_id = "" current_min_depth = MAX_DEPTH - for row in rows: - if row["depth"] < current_min_depth: - min_depth_event_id = row["event_id"] - current_min_depth = row["depth"] + for event_id, depth in rows: + if depth < current_min_depth: + min_depth_event_id = event_id + current_min_depth = depth return min_depth_event_id, current_min_depth @@ -1552,19 +1558,18 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas A filtered down list of `event_ids` that have previous failed pull attempts. """ - rows = await self.db_pool.simple_select_many_batch( - table="event_failed_pull_attempts", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=("event_id",), - desc="get_event_ids_with_failed_pull_attempts", + rows = cast( + List[Tuple[str]], + await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=("event_id",), + desc="get_event_ids_with_failed_pull_attempts", + ), ) - event_ids_with_failed_pull_attempts: Set[str] = { - row["event_id"] for row in rows - } - - return event_ids_with_failed_pull_attempts + return {row[0] for row in rows} @trace async def get_event_ids_to_not_pull_from_backoff( @@ -1584,32 +1589,34 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas A dictionary of event_ids that should not be attempted to be pulled and the next timestamp at which we may try pulling them again. """ - event_failed_pull_attempts = await self.db_pool.simple_select_many_batch( - table="event_failed_pull_attempts", - column="event_id", - iterable=event_ids, - keyvalues={}, - retcols=( - "event_id", - "last_attempt_ts", - "num_attempts", + event_failed_pull_attempts = cast( + List[Tuple[str, int, int]], + await self.db_pool.simple_select_many_batch( + table="event_failed_pull_attempts", + column="event_id", + iterable=event_ids, + keyvalues={}, + retcols=( + "event_id", + "last_attempt_ts", + "num_attempts", + ), + desc="get_event_ids_to_not_pull_from_backoff", ), - desc="get_event_ids_to_not_pull_from_backoff", ) current_time = self._clock.time_msec() event_ids_with_backoff = {} - for event_failed_pull_attempt in event_failed_pull_attempts: - event_id = event_failed_pull_attempt["event_id"] + for event_id, last_attempt_ts, num_attempts in event_failed_pull_attempts: # Exponential back-off (up to the upper bound) so we don't try to # pull the same event over and over. ex. 2hr, 4hr, 8hr, 16hr, etc. backoff_end_time = ( - event_failed_pull_attempt["last_attempt_ts"] + last_attempt_ts + ( 2 ** min( - event_failed_pull_attempt["num_attempts"], + num_attempts, BACKFILL_EVENT_EXPONENTIAL_BACKOFF_MAXIMUM_DOUBLING_STEPS, ) ) @@ -1890,21 +1897,23 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas # keeping only the forward extremities (i.e. the events not referenced # by other events in the queue). We do this so that we can always # backpaginate in all the events we have dropped. - rows = await self.db_pool.simple_select_list( - table="federation_inbound_events_staging", - keyvalues={"room_id": room_id}, - retcols=("event_id", "event_json"), - desc="prune_staged_events_in_room_fetch", + rows = cast( + List[Tuple[str, str]], + await self.db_pool.simple_select_list( + table="federation_inbound_events_staging", + keyvalues={"room_id": room_id}, + retcols=("event_id", "event_json"), + desc="prune_staged_events_in_room_fetch", + ), ) # Find the set of events referenced by those in the queue, as well as # collecting all the event IDs in the queue. referenced_events: Set[str] = set() seen_events: Set[str] = set() - for row in rows: - event_id = row["event_id"] + for event_id, event_json in rows: seen_events.add(event_id) - event_d = db_to_json(row["event_json"]) + event_d = db_to_json(event_json) # We don't bother parsing the dicts into full blown event objects, # as that is needlessly expensive.