summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py10
-rw-r--r--synapse/federation/federation_server.py34
-rw-r--r--synapse/federation/send_queue.py26
-rw-r--r--synapse/federation/sender/__init__.py14
-rw-r--r--synapse/federation/sender/per_destination_queue.py18
-rw-r--r--synapse/federation/transport/client.py8
-rw-r--r--synapse/federation/transport/server.py24
7 files changed, 62 insertions, 72 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index ed09c6af1f..c767d30627 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -86,7 +86,7 @@ class FederationClient(FederationBase):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
-        self.pdu_destination_tried = {}  # type: Dict[str, Dict[str, int]]
+        self.pdu_destination_tried: Dict[str, Dict[str, int]] = {}
         self._clock.looping_call(self._clear_tried_cache, 60 * 1000)
         self.state = hs.get_state_handler()
         self.transport_layer = hs.get_federation_transport_client()
@@ -94,13 +94,13 @@ class FederationClient(FederationBase):
         self.hostname = hs.hostname
         self.signing_key = hs.signing_key
 
-        self._get_pdu_cache = ExpiringCache(
+        self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache(
             cache_name="get_pdu_cache",
             clock=self._clock,
             max_len=1000,
             expiry_ms=120 * 1000,
             reset_expiry_on_get=False,
-        )  # type: ExpiringCache[str, EventBase]
+        )
 
     def _clear_tried_cache(self):
         """Clear pdu_destination_tried cache"""
@@ -293,10 +293,10 @@ class FederationClient(FederationBase):
                     transaction_data,
                 )
 
-                pdu_list = [
+                pdu_list: List[EventBase] = [
                     event_from_pdu_json(p, room_version, outlier=outlier)
                     for p in transaction_data["pdus"]
-                ]  # type: List[EventBase]
+                ]
 
                 if pdu_list and pdu_list[0]:
                     pdu = pdu_list[0]
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index ac0f2ccfb3..d91f0ff32f 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -122,12 +122,12 @@ class FederationServer(FederationBase):
 
         # origins that we are currently processing a transaction from.
         # a dict from origin to txn id.
-        self._active_transactions = {}  # type: Dict[str, str]
+        self._active_transactions: Dict[str, str] = {}
 
         # We cache results for transaction with the same ID
-        self._transaction_resp_cache = ResponseCache(
+        self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
             hs.get_clock(), "fed_txn_handler", timeout_ms=30000
-        )  # type: ResponseCache[Tuple[str, str]]
+        )
 
         self.transaction_actions = TransactionActions(self.store)
 
@@ -135,12 +135,12 @@ class FederationServer(FederationBase):
 
         # We cache responses to state queries, as they take a while and often
         # come in waves.
-        self._state_resp_cache = ResponseCache(
-            hs.get_clock(), "state_resp", timeout_ms=30000
-        )  # type: ResponseCache[Tuple[str, Optional[str]]]
-        self._state_ids_resp_cache = ResponseCache(
+        self._state_resp_cache: ResponseCache[
+            Tuple[str, Optional[str]]
+        ] = ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000)
+        self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache(
             hs.get_clock(), "state_ids_resp", timeout_ms=30000
-        )  # type: ResponseCache[Tuple[str, str]]
+        )
 
         self._federation_metrics_domains = (
             hs.config.federation.federation_metrics_domains
@@ -337,7 +337,7 @@ class FederationServer(FederationBase):
 
         origin_host, _ = parse_server_name(origin)
 
-        pdus_by_room = {}  # type: Dict[str, List[EventBase]]
+        pdus_by_room: Dict[str, List[EventBase]] = {}
 
         newest_pdu_ts = 0
 
@@ -516,9 +516,9 @@ class FederationServer(FederationBase):
         self, room_id: str, event_id: Optional[str]
     ) -> Dict[str, list]:
         if event_id:
-            pdus = await self.handler.get_state_for_pdu(
+            pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu(
                 room_id, event_id
-            )  # type: Iterable[EventBase]
+            )
         else:
             pdus = (await self.state.get_current_state(room_id)).values()
 
@@ -791,7 +791,7 @@ class FederationServer(FederationBase):
         log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
         results = await self.store.claim_e2e_one_time_keys(query)
 
-        json_result = {}  # type: Dict[str, Dict[str, dict]]
+        json_result: Dict[str, Dict[str, dict]] = {}
         for user_id, device_keys in results.items():
             for device_id, keys in device_keys.items():
                 for key_id, json_str in keys.items():
@@ -1119,17 +1119,13 @@ class FederationHandlerRegistry:
         self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs)
         self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs)
 
-        self.edu_handlers = (
-            {}
-        )  # type: Dict[str, Callable[[str, dict], Awaitable[None]]]
-        self.query_handlers = (
-            {}
-        )  # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]]
+        self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {}
+        self.query_handlers: Dict[str, Callable[[dict], Awaitable[JsonDict]]] = {}
 
         # Map from type to instance names that we should route EDU handling to.
         # We randomly choose one instance from the list to route to for each new
         # EDU received.
-        self._edu_type_to_instance = {}  # type: Dict[str, List[str]]
+        self._edu_type_to_instance: Dict[str, List[str]] = {}
 
     def register_edu_handler(
         self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]]
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 65d76ea974..1fbf325fdc 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -71,34 +71,32 @@ class FederationRemoteSendQueue(AbstractFederationSender):
         # We may have multiple federation sender instances, so we need to track
         # their positions separately.
         self._sender_instances = hs.config.worker.federation_shard_config.instances
-        self._sender_positions = {}  # type: Dict[str, int]
+        self._sender_positions: Dict[str, int] = {}
 
         # Pending presence map user_id -> UserPresenceState
-        self.presence_map = {}  # type: Dict[str, UserPresenceState]
+        self.presence_map: Dict[str, UserPresenceState] = {}
 
         # Stores the destinations we need to explicitly send presence to about a
         # given user.
         # Stream position -> (user_id, destinations)
-        self.presence_destinations = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, Iterable[str]]]
+        self.presence_destinations: SortedDict[
+            int, Tuple[str, Iterable[str]]
+        ] = SortedDict()
 
         # (destination, key) -> EDU
-        self.keyed_edu = {}  # type: Dict[Tuple[str, tuple], Edu]
+        self.keyed_edu: Dict[Tuple[str, tuple], Edu] = {}
 
         # stream position -> (destination, key)
-        self.keyed_edu_changed = (
-            SortedDict()
-        )  # type: SortedDict[int, Tuple[str, tuple]]
+        self.keyed_edu_changed: SortedDict[int, Tuple[str, tuple]] = SortedDict()
 
-        self.edus = SortedDict()  # type: SortedDict[int, Edu]
+        self.edus: SortedDict[int, Edu] = SortedDict()
 
         # stream ID for the next entry into keyed_edu_changed/edus.
         self.pos = 1
 
         # map from stream ID to the time that stream entry was generated, so that we
         # can clear out entries after a while
-        self.pos_time = SortedDict()  # type: SortedDict[int, int]
+        self.pos_time: SortedDict[int, int] = SortedDict()
 
         # EVERYTHING IS SAD. In particular, python only makes new scopes when
         # we make a new function, so we need to make a new function so the inner
@@ -291,7 +289,7 @@ class FederationRemoteSendQueue(AbstractFederationSender):
 
         # list of tuple(int, BaseFederationRow), where the first is the position
         # of the federation stream.
-        rows = []  # type: List[Tuple[int, BaseFederationRow]]
+        rows: List[Tuple[int, BaseFederationRow]] = []
 
         # Fetch presence to send to destinations
         i = self.presence_destinations.bisect_right(from_token)
@@ -445,11 +443,11 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-_rowtypes = (
+_rowtypes: Tuple[Type[BaseFederationRow], ...] = (
     PresenceDestinationsRow,
     KeyedEduRow,
     EduRow,
-)  # type: Tuple[Type[BaseFederationRow], ...]
+)
 
 TypeToRow = {Row.TypeId: Row for Row in _rowtypes}
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index deb40f4610..0960f033bc 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -148,14 +148,14 @@ class FederationSender(AbstractFederationSender):
         self.clock = hs.get_clock()
         self.is_mine_id = hs.is_mine_id
 
-        self._presence_router = None  # type: Optional[PresenceRouter]
+        self._presence_router: Optional["PresenceRouter"] = None
         self._transaction_manager = TransactionManager(hs)
 
         self._instance_name = hs.get_instance_name()
         self._federation_shard_config = hs.config.worker.federation_shard_config
 
         # map from destination to PerDestinationQueue
-        self._per_destination_queues = {}  # type: Dict[str, PerDestinationQueue]
+        self._per_destination_queues: Dict[str, PerDestinationQueue] = {}
 
         LaterGauge(
             "synapse_federation_transaction_queue_pending_destinations",
@@ -192,9 +192,7 @@ class FederationSender(AbstractFederationSender):
         # awaiting a call to flush_read_receipts_for_room. The presence of an entry
         # here for a given room means that we are rate-limiting RR flushes to that room,
         # and that there is a pending call to _flush_rrs_for_room in the system.
-        self._queues_awaiting_rr_flush_by_room = (
-            {}
-        )  # type: Dict[str, Set[PerDestinationQueue]]
+        self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {}
 
         self._rr_txn_interval_per_room_ms = (
             1000.0 / hs.config.federation_rr_transactions_per_room_per_second
@@ -265,7 +263,7 @@ class FederationSender(AbstractFederationSender):
                     if not event.internal_metadata.should_proactively_send():
                         return
 
-                    destinations = None  # type: Optional[Set[str]]
+                    destinations: Optional[Set[str]] = None
                     if not event.prev_event_ids():
                         # If there are no prev event IDs then the state is empty
                         # and so no remote servers in the room
@@ -331,7 +329,7 @@ class FederationSender(AbstractFederationSender):
                         for event in events:
                             await handle_event(event)
 
-                events_by_room = {}  # type: Dict[str, List[EventBase]]
+                events_by_room: Dict[str, List[EventBase]] = {}
                 for event in events:
                     events_by_room.setdefault(event.room_id, []).append(event)
 
@@ -628,7 +626,7 @@ class FederationSender(AbstractFederationSender):
         In order to reduce load spikes, adds a delay between each destination.
         """
 
-        last_processed = None  # type: Optional[str]
+        last_processed: Optional[str] = None
 
         while True:
             destinations_to_wake = (
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3a2efd56ee..d06a3aff19 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -105,34 +105,34 @@ class PerDestinationQueue:
         # catch-up at startup.
         # New events will only be sent once this is finished, at which point
         # _catching_up is flipped to False.
-        self._catching_up = True  # type: bool
+        self._catching_up: bool = True
 
         # The stream_ordering of the most recent PDU that was discarded due to
         # being in catch-up mode.
-        self._catchup_last_skipped = 0  # type: int
+        self._catchup_last_skipped: int = 0
 
         # Cache of the last successfully-transmitted stream ordering for this
         # destination (we are the only updater so this is safe)
-        self._last_successful_stream_ordering = None  # type: Optional[int]
+        self._last_successful_stream_ordering: Optional[int] = None
 
         # a queue of pending PDUs
-        self._pending_pdus = []  # type: List[EventBase]
+        self._pending_pdus: List[EventBase] = []
 
         # XXX this is never actually used: see
         # https://github.com/matrix-org/synapse/issues/7549
-        self._pending_edus = []  # type: List[Edu]
+        self._pending_edus: List[Edu] = []
 
         # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered
         # based on their key (e.g. typing events by room_id)
         # Map of (edu_type, key) -> Edu
-        self._pending_edus_keyed = {}  # type: Dict[Tuple[str, Hashable], Edu]
+        self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {}
 
         # Map of user_id -> UserPresenceState of pending presence to be sent to this
         # destination
-        self._pending_presence = {}  # type: Dict[str, UserPresenceState]
+        self._pending_presence: Dict[str, UserPresenceState] = {}
 
         # room_id -> receipt_type -> user_id -> receipt_dict
-        self._pending_rrs = {}  # type: Dict[str, Dict[str, Dict[str, dict]]]
+        self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {}
         self._rrs_pending_flush = False
 
         # stream_id of last successfully sent to-device message.
@@ -243,7 +243,7 @@ class PerDestinationQueue:
         )
 
     async def _transaction_transmission_loop(self) -> None:
-        pending_pdus = []  # type: List[EventBase]
+        pending_pdus: List[EventBase] = []
         try:
             self.transmission_loop_running = True
 
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index c9e7c57461..98b1bf77fd 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -395,9 +395,9 @@ class TransportLayerClient:
             # this uses MSC2197 (Search Filtering over Federation)
             path = _create_v1_path("/publicRooms")
 
-            data = {
+            data: Dict[str, Any] = {
                 "include_all_networks": "true" if include_all_networks else "false"
-            }  # type: Dict[str, Any]
+            }
             if third_party_instance_id:
                 data["third_party_instance_id"] = third_party_instance_id
             if limit:
@@ -423,9 +423,9 @@ class TransportLayerClient:
         else:
             path = _create_v1_path("/publicRooms")
 
-            args = {
+            args: Dict[str, Any] = {
                 "include_all_networks": "true" if include_all_networks else "false"
-            }  # type: Dict[str, Any]
+            }
             if third_party_instance_id:
                 args["third_party_instance_id"] = (third_party_instance_id,)
             if limit:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 0b21b375ee..2974d4d0cc 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -1013,7 +1013,7 @@ class PublicRoomList(BaseFederationServlet):
         if not self.allow_access:
             raise FederationDeniedError(origin)
 
-        limit = int(content.get("limit", 100))  # type: Optional[int]
+        limit: Optional[int] = int(content.get("limit", 100))
         since_token = content.get("since", None)
         search_filter = content.get("filter", None)
 
@@ -1991,7 +1991,7 @@ class RoomComplexityServlet(BaseFederationServlet):
         return 200, complexity
 
 
-FEDERATION_SERVLET_CLASSES = (
+FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationSendServlet,
     FederationEventServlet,
     FederationStateV1Servlet,
@@ -2019,15 +2019,13 @@ FEDERATION_SERVLET_CLASSES = (
     FederationSpaceSummaryServlet,
     FederationV1SendKnockServlet,
     FederationMakeKnockServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
-OPENID_SERVLET_CLASSES = (
-    OpenIdUserInfo,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+OPENID_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (OpenIdUserInfo,)
 
-ROOM_LIST_CLASSES = (PublicRoomList,)  # type: Tuple[Type[PublicRoomList], ...]
+ROOM_LIST_CLASSES: Tuple[Type[PublicRoomList], ...] = (PublicRoomList,)
 
-GROUP_SERVER_SERVLET_CLASSES = (
+GROUP_SERVER_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationGroupsProfileServlet,
     FederationGroupsSummaryServlet,
     FederationGroupsRoomsServlet,
@@ -2046,19 +2044,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsAddRoomsServlet,
     FederationGroupsAddRoomsConfigServlet,
     FederationGroupsSettingJoinPolicyServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
 
-GROUP_LOCAL_SERVLET_CLASSES = (
+GROUP_LOCAL_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationGroupsLocalInviteServlet,
     FederationGroupsRemoveLocalUserServlet,
     FederationGroupsBulkPublicisedServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
 
-GROUP_ATTESTATION_SERVLET_CLASSES = (
+GROUP_ATTESTATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationGroupsRenewAttestaionServlet,
-)  # type: Tuple[Type[BaseFederationServlet], ...]
+)
 
 
 DEFAULT_SERVLET_GROUPS = (