diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index c4474df975..d39368c20e 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -62,9 +62,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
)
# Cache of event ID to list of auth event IDs and their depths.
- self._event_auth_cache = LruCache(
+ self._event_auth_cache: LruCache[str, List[Tuple[str, int]]] = LruCache(
500000, "_event_auth_cache", size_callback=len
- ) # type: LruCache[str, List[Tuple[str, int]]]
+ )
self._clock.looping_call(self._get_stats_for_federation_staging, 30 * 1000)
@@ -137,10 +137,10 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
initial_events = set(event_ids)
# All the events that we've found that are reachable from the events.
- seen_events = set() # type: Set[str]
+ seen_events: Set[str] = set()
# A map from chain ID to max sequence number of the given events.
- event_chains = {} # type: Dict[int, int]
+ event_chains: Dict[int, int] = {}
sql = """
SELECT event_id, chain_id, sequence_number
@@ -182,7 +182,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"""
# A map from chain ID to max sequence number *reachable* from any event ID.
- chains = {} # type: Dict[int, int]
+ chains: Dict[int, int] = {}
# Add all linked chains reachable from initial set of chains.
for batch in batch_iter(event_chains, 1000):
@@ -353,14 +353,14 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
initial_events = set(state_sets[0]).union(*state_sets[1:])
# Map from event_id -> (chain ID, seq no)
- chain_info = {} # type: Dict[str, Tuple[int, int]]
+ chain_info: Dict[str, Tuple[int, int]] = {}
# Map from chain ID -> seq no -> event Id
- chain_to_event = {} # type: Dict[int, Dict[int, str]]
+ chain_to_event: Dict[int, Dict[int, str]] = {}
# All the chains that we've found that are reachable from the state
# sets.
- seen_chains = set() # type: Set[int]
+ seen_chains: Set[int] = set()
sql = """
SELECT event_id, chain_id, sequence_number
@@ -392,9 +392,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# Corresponds to `state_sets`, except as a map from chain ID to max
# sequence number reachable from the state set.
- set_to_chain = [] # type: List[Dict[int, int]]
+ set_to_chain: List[Dict[int, int]] = []
for state_set in state_sets:
- chains = {} # type: Dict[int, int]
+ chains: Dict[int, int] = {}
set_to_chain.append(chains)
for event_id in state_set:
@@ -446,7 +446,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
# Mapping from chain ID to the range of sequence numbers that should be
# pulled from the database.
- chain_to_gap = {} # type: Dict[int, Tuple[int, int]]
+ chain_to_gap: Dict[int, Tuple[int, int]] = {}
for chain_id in seen_chains:
min_seq_no = min(chains.get(chain_id, 0) for chains in set_to_chain)
@@ -555,7 +555,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
}
# The sorted list of events whose auth chains we should walk.
- search = [] # type: List[Tuple[int, str]]
+ search: List[Tuple[int, str]] = []
# We need to get the depth of the initial events for sorting purposes.
sql = """
@@ -578,7 +578,7 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
search.sort()
# Map from event to its auth events
- event_to_auth_events = {} # type: Dict[str, Set[str]]
+ event_to_auth_events: Dict[str, Set[str]] = {}
base_sql = """
SELECT a.event_id, auth_id, depth
@@ -1230,7 +1230,9 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
"SELECT coalesce(min(received_ts), 0) FROM federation_inbound_events_staging"
)
- (age,) = txn.fetchone()
+ (received_ts,) = txn.fetchone()
+
+ age = self._clock.time_msec() - received_ts
return count, age
|