diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ed09c6af1f..c767d30627 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -86,7 +86,7 @@ class FederationClient(FederationBase):
def __init__(self, hs: "HomeServer"):
super().__init__(hs)
- self.pdu_destination_tried = {} # type: Dict[str, Dict[str, int]]
+ self.pdu_destination_tried: Dict[str, Dict[str, int]] = {}
self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
self.state = hs.get_state_handler()
self.transport_layer = hs.get_federation_transport_client()
@@ -94,13 +94,13 @@ class FederationClient(FederationBase):
self.hostname = hs.hostname
self.signing_key = hs.signing_key
- self._get_pdu_cache = ExpiringCache(
+ self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
cache_name="get_pdu_cache",
clock=self._clock,
max_len=1000,
expiry_ms=120 * 1000,
reset_expiry_on_get=False,
- ) # type: ExpiringCache[str, EventBase]
+ )
def _clear_tried_cache(self):
"""Clear pdu_destination_tried cache"""
@@ -293,10 +293,10 @@ class FederationClient(FederationBase):
transaction_data,
)
- pdu_list = [
+ pdu_list: List[EventBase] = [
event_from_pdu_json(p, room_version, outlier=outlier)
for p in transaction_data["pdus"]
- ] # type: List[EventBase]
+ ]
if pdu_list and pdu_list[0]:
pdu = pdu_list[0]
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ac0f2ccfb3..d91f0ff32f 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -122,12 +122,12 @@ class FederationServer(FederationBase):
# origins that we are currently processing a transaction from.
# a dict from origin to txn id.
- self._active_transactions = {} # type: Dict[str, str]
+ self._active_transactions: Dict[str, str] = {}
# We cache results for transaction with the same ID
- self._transaction_resp_cache = ResponseCache(
+ self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "fed_txn_handler", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, str]]
+ )
self.transaction_actions = TransactionActions(self.store)
@@ -135,12 +135,12 @@ class FederationServer(FederationBase):
# We cache responses to state queries, as they take a while and often
# come in waves.
- self._state_resp_cache = ResponseCache(
- hs.get_clock(), "state_resp", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, Optional[str]]]
- self._state_ids_resp_cache = ResponseCache(
+ self._state_resp_cache: ResponseCache[
+ Tuple[str, Optional[str]]
+ ] = ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000)
+ self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
hs.get_clock(), "state_ids_resp", timeout_ms=30000
- ) # type: ResponseCache[Tuple[str, str]]
+ )
self._federation_metrics_domains = (
hs.config.federation.federation_metrics_domains
@@ -337,7 +337,7 @@ class FederationServer(FederationBase):
origin_host, _ = parse_server_name(origin)
- pdus_by_room = {} # type: Dict[str, List[EventBase]]
+ pdus_by_room: Dict[str, List[EventBase]] = {}
newest_pdu_ts = 0
@@ -516,9 +516,9 @@ class FederationServer(FederationBase):
self, room_id: str, event_id: Optional[str]
) -> Dict[str, list]:
if event_id:
- pdus = await self.handler.get_state_for_pdu(
+ pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu(
room_id, event_id
- ) # type: Iterable[EventBase]
+ )
else:
pdus = (await self.state.get_current_state(room_id)).values()
@@ -791,7 +791,7 @@ class FederationServer(FederationBase):
log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
results = await self.store.claim_e2e_one_time_keys(query)
- json_result = {} # type: Dict[str, Dict[str, dict]]
+ json_result: Dict[str, Dict[str, dict]] = {}
for user_id, device_keys in results.items():
for device_id, keys in device_keys.items():
for key_id, json_str in keys.items():
@@ -1119,17 +1119,13 @@ class FederationHandlerRegistry:
self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
- self.edu_handlers = (
- {}
- ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
- self.query_handlers = (
- {}
- ) # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]]
+ self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
+ self.query_handlers: Dict[str, Callable[[dict], Awaitable[JsonDict]]] = {}
# Map from type to instance names that we should route EDU handling to.
# We randomly choose one instance from the list to route to for each new
# EDU received.
- self._edu_type_to_instance = {} # type: Dict[str, List[str]]
+ self._edu_type_to_instance: Dict[str, List[str]] = {}
def register_edu_handler(
self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 65d76ea974..1fbf325fdc 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -71,34 +71,32 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# We may have multiple federation sender instances, so we need to track
# their positions separately.
self._sender_instances = hs.config.worker.federation_shard_config.instances
- self._sender_positions = {} # type: Dict[str, int]
+ self._sender_positions: Dict[str, int] = {}
# Pending presence map user_id -> UserPresenceState
- self.presence_map = {} # type: Dict[str, UserPresenceState]
+ self.presence_map: Dict[str, UserPresenceState] = {}
# Stores the destinations we need to explicitly send presence to about a
# given user.
# Stream position -> (user_id, destinations)
- self.presence_destinations = (
- SortedDict()
- ) # type: SortedDict[int, Tuple[str, Iterable[str]]]
+ self.presence_destinations: SortedDict[
+ int, Tuple[str, Iterable[str]]
+ ] = SortedDict()
# (destination, key) -> EDU
- self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu]
+ self.keyed_edu: Dict[Tuple[str, tuple], Edu] = {}
# stream position -> (destination, key)
- self.keyed_edu_changed = (
- SortedDict()
- ) # type: SortedDict[int, Tuple[str, tuple]]
+ self.keyed_edu_changed: SortedDict[int, Tuple[str, tuple]] = SortedDict()
- self.edus = SortedDict() # type: SortedDict[int, Edu]
+ self.edus: SortedDict[int, Edu] = SortedDict()
# stream ID for the next entry into keyed_edu_changed/edus.
self.pos = 1
# map from stream ID to the time that stream entry was generated, so that we
# can clear out entries after a while
- self.pos_time = SortedDict() # type: SortedDict[int, int]
+ self.pos_time: SortedDict[int, int] = SortedDict()
# EVERYTHING IS SAD. In particular, python only makes new scopes when
# we make a new function, so we need to make a new function so the inner
@@ -291,7 +289,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
# list of tuple(int, BaseFederationRow), where the first is the position
# of the federation stream.
- rows = [] # type: List[Tuple[int, BaseFederationRow]]
+ rows: List[Tuple[int, BaseFederationRow]] = []
# Fetch presence to send to destinations
i = self.presence_destinations.bisect_right(from_token)
@@ -445,11 +443,11 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu
buff.edus.setdefault(self.edu.destination, []).append(self.edu)
-_rowtypes = (
+_rowtypes: Tuple[Type[BaseFederationRow], ...] = (
PresenceDestinationsRow,
KeyedEduRow,
EduRow,
-) # type: Tuple[Type[BaseFederationRow], ...]
+)
TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index deb40f4610..0960f033bc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -148,14 +148,14 @@ class FederationSender(AbstractFederationSender):
self.clock = hs.get_clock()
self.is_mine_id = hs.is_mine_id
- self._presence_router = None # type: Optional[PresenceRouter]
+ self._presence_router: Optional["PresenceRouter"] = None
self._transaction_manager = TransactionManager(hs)
self._instance_name = hs.get_instance_name()
self._federation_shard_config = hs.config.worker.federation_shard_config
# map from destination to PerDestinationQueue
- self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue]
+ self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
LaterGauge(
"synapse_federation_transaction_queue_pending_destinations",
@@ -192,9 +192,7 @@ class FederationSender(AbstractFederationSender):
# awaiting a call to flush_read_receipts_for_room. The presence of an entry
# here for a given room means that we are rate-limiting RR flushes to that room,
# and that there is a pending call to _flush_rrs_for_room in the system.
- self._queues_awaiting_rr_flush_by_room = (
- {}
- ) # type: Dict[str, Set[PerDestinationQueue]]
+ self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
self._rr_txn_interval_per_room_ms = (
1000.0 / hs.config.federation_rr_transactions_per_room_per_second
@@ -265,7 +263,7 @@ class FederationSender(AbstractFederationSender):
if not event.internal_metadata.should_proactively_send():
return
- destinations = None # type: Optional[Set[str]]
+ destinations: Optional[Set[str]] = None
if not event.prev_event_ids():
# If there are no prev event IDs then the state is empty
# and so no remote servers in the room
@@ -331,7 +329,7 @@ class FederationSender(AbstractFederationSender):
for event in events:
await handle_event(event)
- events_by_room = {} # type: Dict[str, List[EventBase]]
+ events_by_room: Dict[str, List[EventBase]] = {}
for event in events:
events_by_room.setdefault(event.room_id, []).append(event)
@@ -628,7 +626,7 @@ class FederationSender(AbstractFederationSender):
In order to reduce load spikes, adds a delay between each destination.
"""
- last_processed = None # type: Optional[str]
+ last_processed: Optional[str] = None
while True:
destinations_to_wake = (
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3a2efd56ee..d06a3aff19 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -105,34 +105,34 @@ class PerDestinationQueue:
# catch-up at startup.
# New events will only be sent once this is finished, at which point
# _catching_up is flipped to False.
- self._catching_up = True # type: bool
+ self._catching_up: bool = True
# The stream_ordering of the most recent PDU that was discarded due to
# being in catch-up mode.
- self._catchup_last_skipped = 0 # type: int
+ self._catchup_last_skipped: int = 0
# Cache of the last successfully-transmitted stream ordering for this
# destination (we are the only updater so this is safe)
- self._last_successful_stream_ordering = None # type: Optional[int]
+ self._last_successful_stream_ordering: Optional[int] = None
# a queue of pending PDUs
- self._pending_pdus = [] # type: List[EventBase]
+ self._pending_pdus: List[EventBase] = []
# XXX this is never actually used: see
# https://github.com/matrix-org/synapse/issues/7549
- self._pending_edus = [] # type: List[Edu]
+ self._pending_edus: List[Edu] = []
# Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
# based on their key (e.g. typing events by room_id)
# Map of (edu_type, key) -> Edu
- self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu]
+ self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {}
# Map of user_id -> UserPresenceState of pending presence to be sent to this
# destination
- self._pending_presence = {} # type: Dict[str, UserPresenceState]
+ self._pending_presence: Dict[str, UserPresenceState] = {}
# room_id -> receipt_type -> user_id -> receipt_dict
- self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]]
+ self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
self._rrs_pending_flush = False
# stream_id of last successfully sent to-device message.
@@ -243,7 +243,7 @@ class PerDestinationQueue:
)
async def _transaction_transmission_loop(self) -> None:
- pending_pdus = [] # type: List[EventBase]
+ pending_pdus: List[EventBase] = []
try:
self.transmission_loop_running = True
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index c9e7c57461..98b1bf77fd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -395,9 +395,9 @@ class TransportLayerClient:
# this uses MSC2197 (Search Filtering over Federation)
path = _create_v1_path("/publicRooms")
- data = {
+ data: Dict[str, Any] = {
"include_all_networks": "true" if include_all_networks else "false"
- } # type: Dict[str, Any]
+ }
if third_party_instance_id:
data["third_party_instance_id"] = third_party_instance_id
if limit:
@@ -423,9 +423,9 @@ class TransportLayerClient:
else:
path = _create_v1_path("/publicRooms")
- args = {
+ args: Dict[str, Any] = {
"include_all_networks": "true" if include_all_networks else "false"
- } # type: Dict[str, Any]
+ }
if third_party_instance_id:
args["third_party_instance_id"] = (third_party_instance_id,)
if limit:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 0b21b375ee..2974d4d0cc 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1013,7 +1013,7 @@ class PublicRoomList(BaseFederationServlet):
if not self.allow_access:
raise FederationDeniedError(origin)
- limit = int(content.get("limit", 100)) # type: Optional[int]
+ limit: Optional[int] = int(content.get("limit", 100))
since_token = content.get("since", None)
search_filter = content.get("filter", None)
@@ -1991,7 +1991,7 @@ class RoomComplexityServlet(BaseFederationServlet):
return 200, complexity
-FEDERATION_SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationSendServlet,
FederationEventServlet,
FederationStateV1Servlet,
@@ -2019,15 +2019,13 @@ FEDERATION_SERVLET_CLASSES = (
FederationSpaceSummaryServlet,
FederationV1SendKnockServlet,
FederationMakeKnockServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
-OPENID_SERVLET_CLASSES = (
- OpenIdUserInfo,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+OPENID_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (OpenIdUserInfo,)
-ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...]
+ROOM_LIST_CLASSES: Tuple[Type[PublicRoomList], ...] = (PublicRoomList,)
-GROUP_SERVER_SERVLET_CLASSES = (
+GROUP_SERVER_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationGroupsProfileServlet,
FederationGroupsSummaryServlet,
FederationGroupsRoomsServlet,
@@ -2046,19 +2044,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
FederationGroupsAddRoomsServlet,
FederationGroupsAddRoomsConfigServlet,
FederationGroupsSettingJoinPolicyServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
-GROUP_LOCAL_SERVLET_CLASSES = (
+GROUP_LOCAL_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationGroupsLocalInviteServlet,
FederationGroupsRemoveLocalUserServlet,
FederationGroupsBulkPublicisedServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
-GROUP_ATTESTATION_SERVLET_CLASSES = (
+GROUP_ATTESTATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
FederationGroupsRenewAttestaionServlet,
-) # type: Tuple[Type[BaseFederationServlet], ...]
+)
DEFAULT_SERVLET_GROUPS = (
|