diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index deb40f4610..0960f033bc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -148,14 +148,14 @@ class FederationSender(AbstractFederationSender):
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
- self._presence_router = None # type: Optional[PresenceRouter]
+ self._presence_router: Optional["PresenceRouter"] = None
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
# map from destination to PerDestinationQueue
- self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
+ self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
@@ -192,9 +192,7 @@ class FederationSender(AbstractFederationSender):
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
- self._queues_awaiting_rr_flush_by_room = (
- {}
- ) # type: Dict[str, Set[PerDestinationQueue]]
+ self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
self._rr_txn_interval_per_room_ms = (
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
@@ -265,7 +263,7 @@ class FederationSender(AbstractFederationSender):
if not event.internal_metadata.should_proactively_send():
return
- destinations = None # type: Optional[Set[str]]
+ destinations: Optional[Set[str]] = None
if not event.prev_event_ids():
# If there are no prev event IDs then the state is empty
# and so no remote servers in the room
@@ -331,7 +329,7 @@ class FederationSender(AbstractFederationSender):
for event in events:
await handle_event(event)
- events_by_room = {} # type: Dict[str, List[EventBase]]
+ events_by_room: Dict[str, List[EventBase]] = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -628,7 +626,7 @@ class FederationSender(AbstractFederationSender):
In order to reduce load spikes, adds a delay between each destination.
"""
- last_processed = None # type: Optional[str]
+ last_processed: Optional[str] = None
while True:
destinations_to_wake = (
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3a2efd56ee..d06a3aff19 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -105,34 +105,34 @@ class PerDestinationQueue:
# catch-up at startup.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
- self._catching_up = True # type: bool
+ self._catching_up: bool = True
# The stream_ordering of the most recent PDU that was discarded due to
# being in catch-up mode.
- self._catchup_last_skipped = 0 # type: int
+ self._catchup_last_skipped: int = 0
# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
- self._last_successful_stream_ordering = None # type: Optional[int]
+ self._last_successful_stream_ordering: Optional[int] = None
# a queue of pending PDUs
- self._pending_pdus = [] # type: List[EventBase]
+ self._pending_pdus: List[EventBase] = []
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
- self._pending_edus = [] # type: List[Edu]
+ self._pending_edus: List[Edu] = []
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
- self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu]
+ self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {}
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
- self._pending_presence = {} # type: Dict[str, UserPresenceState]
+ self._pending_presence: Dict[str, UserPresenceState] = {}
# room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]]
+ self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -243,7 +243,7 @@ class PerDestinationQueue:
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[EventBase]
+ pending_pdus: List[EventBase] = []
try:
self.transmission_loop_running = True
|