diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_client.py | 10 | ||||
-rw-r--r-- | synapse/federation/federation_server.py | 34 | ||||
-rw-r--r-- | synapse/federation/send_queue.py | 26 | ||||
-rw-r--r-- | synapse/federation/sender/__init__.py | 14 | ||||
-rw-r--r-- | synapse/federation/sender/per_destination_queue.py | 18 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 8 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 24 |
7 files changed, 62 insertions, 72 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index ed09c6af1f..c767d30627 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -86,7 +86,7 @@ class FederationClient(FederationBase): def __init__(self, hs: "HomeServer"): super().__init__(hs) - self.pdu_destination_tried = {} # type: Dict[str, Dict[str, int]] + self.pdu_destination_tried: Dict[str, Dict[str, int]] = {} self._clock.looping_call(self._clear_tried_cache, 60 * 1000) self.state = hs.get_state_handler() self.transport_layer = hs.get_federation_transport_client() @@ -94,13 +94,13 @@ class FederationClient(FederationBase): self.hostname = hs.hostname self.signing_key = hs.signing_key - self._get_pdu_cache = ExpiringCache( + self._get_pdu_cache: ExpiringCache[str, EventBase] = ExpiringCache( cache_name="get_pdu_cache", clock=self._clock, max_len=1000, expiry_ms=120 * 1000, reset_expiry_on_get=False, - ) # type: ExpiringCache[str, EventBase] + ) def _clear_tried_cache(self): """Clear pdu_destination_tried cache""" @@ -293,10 +293,10 @@ class FederationClient(FederationBase): transaction_data, ) - pdu_list = [ + pdu_list: List[EventBase] = [ event_from_pdu_json(p, room_version, outlier=outlier) for p in transaction_data["pdus"] - ] # type: List[EventBase] + ] if pdu_list and pdu_list[0]: pdu = pdu_list[0] diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index ac0f2ccfb3..d91f0ff32f 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -122,12 +122,12 @@ class FederationServer(FederationBase): # origins that we are currently processing a transaction from. # a dict from origin to txn id. - self._active_transactions = {} # type: Dict[str, str] + self._active_transactions: Dict[str, str] = {} # We cache results for transaction with the same ID - self._transaction_resp_cache = ResponseCache( + self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache( hs.get_clock(), "fed_txn_handler", timeout_ms=30000 - ) # type: ResponseCache[Tuple[str, str]] + ) self.transaction_actions = TransactionActions(self.store) @@ -135,12 +135,12 @@ class FederationServer(FederationBase): # We cache responses to state queries, as they take a while and often # come in waves. - self._state_resp_cache = ResponseCache( - hs.get_clock(), "state_resp", timeout_ms=30000 - ) # type: ResponseCache[Tuple[str, Optional[str]]] - self._state_ids_resp_cache = ResponseCache( + self._state_resp_cache: ResponseCache[ + Tuple[str, Optional[str]] + ] = ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000) + self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache( hs.get_clock(), "state_ids_resp", timeout_ms=30000 - ) # type: ResponseCache[Tuple[str, str]] + ) self._federation_metrics_domains = ( hs.config.federation.federation_metrics_domains @@ -337,7 +337,7 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) - pdus_by_room = {} # type: Dict[str, List[EventBase]] + pdus_by_room: Dict[str, List[EventBase]] = {} newest_pdu_ts = 0 @@ -516,9 +516,9 @@ class FederationServer(FederationBase): self, room_id: str, event_id: Optional[str] ) -> Dict[str, list]: if event_id: - pdus = await self.handler.get_state_for_pdu( + pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu( room_id, event_id - ) # type: Iterable[EventBase] + ) else: pdus = (await self.state.get_current_state(room_id)).values() @@ -791,7 +791,7 @@ class FederationServer(FederationBase): log_kv({"message": "Claiming one time keys.", "user, device pairs": query}) results = await self.store.claim_e2e_one_time_keys(query) - json_result = {} # type: Dict[str, Dict[str, dict]] + json_result: Dict[str, Dict[str, dict]] = {} for user_id, device_keys in results.items(): for device_id, keys in device_keys.items(): for key_id, json_str in keys.items(): @@ -1119,17 +1119,13 @@ class FederationHandlerRegistry: self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) - self.edu_handlers = ( - {} - ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] - self.query_handlers = ( - {} - ) # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]] + self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {} + self.query_handlers: Dict[str, Callable[[dict], Awaitable[JsonDict]]] = {} # Map from type to instance names that we should route EDU handling to. # We randomly choose one instance from the list to route to for each new # EDU received. - self._edu_type_to_instance = {} # type: Dict[str, List[str]] + self._edu_type_to_instance: Dict[str, List[str]] = {} def register_edu_handler( self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py index 65d76ea974..1fbf325fdc 100644 --- a/synapse/federation/send_queue.py +++ b/synapse/federation/send_queue.py @@ -71,34 +71,32 @@ class FederationRemoteSendQueue(AbstractFederationSender): # We may have multiple federation sender instances, so we need to track # their positions separately. self._sender_instances = hs.config.worker.federation_shard_config.instances - self._sender_positions = {} # type: Dict[str, int] + self._sender_positions: Dict[str, int] = {} # Pending presence map user_id -> UserPresenceState - self.presence_map = {} # type: Dict[str, UserPresenceState] + self.presence_map: Dict[str, UserPresenceState] = {} # Stores the destinations we need to explicitly send presence to about a # given user. # Stream position -> (user_id, destinations) - self.presence_destinations = ( - SortedDict() - ) # type: SortedDict[int, Tuple[str, Iterable[str]]] + self.presence_destinations: SortedDict[ + int, Tuple[str, Iterable[str]] + ] = SortedDict() # (destination, key) -> EDU - self.keyed_edu = {} # type: Dict[Tuple[str, tuple], Edu] + self.keyed_edu: Dict[Tuple[str, tuple], Edu] = {} # stream position -> (destination, key) - self.keyed_edu_changed = ( - SortedDict() - ) # type: SortedDict[int, Tuple[str, tuple]] + self.keyed_edu_changed: SortedDict[int, Tuple[str, tuple]] = SortedDict() - self.edus = SortedDict() # type: SortedDict[int, Edu] + self.edus: SortedDict[int, Edu] = SortedDict() # stream ID for the next entry into keyed_edu_changed/edus. self.pos = 1 # map from stream ID to the time that stream entry was generated, so that we # can clear out entries after a while - self.pos_time = SortedDict() # type: SortedDict[int, int] + self.pos_time: SortedDict[int, int] = SortedDict() # EVERYTHING IS SAD. In particular, python only makes new scopes when # we make a new function, so we need to make a new function so the inner @@ -291,7 +289,7 @@ class FederationRemoteSendQueue(AbstractFederationSender): # list of tuple(int, BaseFederationRow), where the first is the position # of the federation stream. - rows = [] # type: List[Tuple[int, BaseFederationRow]] + rows: List[Tuple[int, BaseFederationRow]] = [] # Fetch presence to send to destinations i = self.presence_destinations.bisect_right(from_token) @@ -445,11 +443,11 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))): # Edu buff.edus.setdefault(self.edu.destination, []).append(self.edu) -_rowtypes = ( +_rowtypes: Tuple[Type[BaseFederationRow], ...] = ( PresenceDestinationsRow, KeyedEduRow, EduRow, -) # type: Tuple[Type[BaseFederationRow], ...] +) TypeToRow = {Row.TypeId: Row for Row in _rowtypes} diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index deb40f4610..0960f033bc 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -148,14 +148,14 @@ class FederationSender(AbstractFederationSender): self.clock = hs.get_clock() self.is_mine_id = hs.is_mine_id - self._presence_router = None # type: Optional[PresenceRouter] + self._presence_router: Optional["PresenceRouter"] = None self._transaction_manager = TransactionManager(hs) self._instance_name = hs.get_instance_name() self._federation_shard_config = hs.config.worker.federation_shard_config # map from destination to PerDestinationQueue - self._per_destination_queues = {} # type: Dict[str, PerDestinationQueue] + self._per_destination_queues: Dict[str, PerDestinationQueue] = {} LaterGauge( "synapse_federation_transaction_queue_pending_destinations", @@ -192,9 +192,7 @@ class FederationSender(AbstractFederationSender): # awaiting a call to flush_read_receipts_for_room. The presence of an entry # here for a given room means that we are rate-limiting RR flushes to that room, # and that there is a pending call to _flush_rrs_for_room in the system. - self._queues_awaiting_rr_flush_by_room = ( - {} - ) # type: Dict[str, Set[PerDestinationQueue]] + self._queues_awaiting_rr_flush_by_room: Dict[str, Set[PerDestinationQueue]] = {} self._rr_txn_interval_per_room_ms = ( 1000.0 / hs.config.federation_rr_transactions_per_room_per_second @@ -265,7 +263,7 @@ class FederationSender(AbstractFederationSender): if not event.internal_metadata.should_proactively_send(): return - destinations = None # type: Optional[Set[str]] + destinations: Optional[Set[str]] = None if not event.prev_event_ids(): # If there are no prev event IDs then the state is empty # and so no remote servers in the room @@ -331,7 +329,7 @@ class FederationSender(AbstractFederationSender): for event in events: await handle_event(event) - events_by_room = {} # type: Dict[str, List[EventBase]] + events_by_room: Dict[str, List[EventBase]] = {} for event in events: events_by_room.setdefault(event.room_id, []).append(event) @@ -628,7 +626,7 @@ class FederationSender(AbstractFederationSender): In order to reduce load spikes, adds a delay between each destination. """ - last_processed = None # type: Optional[str] + last_processed: Optional[str] = None while True: destinations_to_wake = ( diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py index 3a2efd56ee..d06a3aff19 100644 --- a/synapse/federation/sender/per_destination_queue.py +++ b/synapse/federation/sender/per_destination_queue.py @@ -105,34 +105,34 @@ class PerDestinationQueue: # catch-up at startup. # New events will only be sent once this is finished, at which point # _catching_up is flipped to False. - self._catching_up = True # type: bool + self._catching_up: bool = True # The stream_ordering of the most recent PDU that was discarded due to # being in catch-up mode. - self._catchup_last_skipped = 0 # type: int + self._catchup_last_skipped: int = 0 # Cache of the last successfully-transmitted stream ordering for this # destination (we are the only updater so this is safe) - self._last_successful_stream_ordering = None # type: Optional[int] + self._last_successful_stream_ordering: Optional[int] = None # a queue of pending PDUs - self._pending_pdus = [] # type: List[EventBase] + self._pending_pdus: List[EventBase] = [] # XXX this is never actually used: see # https://github.com/matrix-org/synapse/issues/7549 - self._pending_edus = [] # type: List[Edu] + self._pending_edus: List[Edu] = [] # Pending EDUs by their "key". Keyed EDUs are EDUs that get clobbered # based on their key (e.g. typing events by room_id) # Map of (edu_type, key) -> Edu - self._pending_edus_keyed = {} # type: Dict[Tuple[str, Hashable], Edu] + self._pending_edus_keyed: Dict[Tuple[str, Hashable], Edu] = {} # Map of user_id -> UserPresenceState of pending presence to be sent to this # destination - self._pending_presence = {} # type: Dict[str, UserPresenceState] + self._pending_presence: Dict[str, UserPresenceState] = {} # room_id -> receipt_type -> user_id -> receipt_dict - self._pending_rrs = {} # type: Dict[str, Dict[str, Dict[str, dict]]] + self._pending_rrs: Dict[str, Dict[str, Dict[str, dict]]] = {} self._rrs_pending_flush = False # stream_id of last successfully sent to-device message. @@ -243,7 +243,7 @@ class PerDestinationQueue: ) async def _transaction_transmission_loop(self) -> None: - pending_pdus = [] # type: List[EventBase] + pending_pdus: List[EventBase] = [] try: self.transmission_loop_running = True diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index c9e7c57461..98b1bf77fd 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -395,9 +395,9 @@ class TransportLayerClient: # this uses MSC2197 (Search Filtering over Federation) path = _create_v1_path("/publicRooms") - data = { + data: Dict[str, Any] = { "include_all_networks": "true" if include_all_networks else "false" - } # type: Dict[str, Any] + } if third_party_instance_id: data["third_party_instance_id"] = third_party_instance_id if limit: @@ -423,9 +423,9 @@ class TransportLayerClient: else: path = _create_v1_path("/publicRooms") - args = { + args: Dict[str, Any] = { "include_all_networks": "true" if include_all_networks else "false" - } # type: Dict[str, Any] + } if third_party_instance_id: args["third_party_instance_id"] = (third_party_instance_id,) if limit: diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 0b21b375ee..2974d4d0cc 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -1013,7 +1013,7 @@ class PublicRoomList(BaseFederationServlet): if not self.allow_access: raise FederationDeniedError(origin) - limit = int(content.get("limit", 100)) # type: Optional[int] + limit: Optional[int] = int(content.get("limit", 100)) since_token = content.get("since", None) search_filter = content.get("filter", None) @@ -1991,7 +1991,7 @@ class RoomComplexityServlet(BaseFederationServlet): return 200, complexity -FEDERATION_SERVLET_CLASSES = ( +FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationSendServlet, FederationEventServlet, FederationStateV1Servlet, @@ -2019,15 +2019,13 @@ FEDERATION_SERVLET_CLASSES = ( FederationSpaceSummaryServlet, FederationV1SendKnockServlet, FederationMakeKnockServlet, -) # type: Tuple[Type[BaseFederationServlet], ...] +) -OPENID_SERVLET_CLASSES = ( - OpenIdUserInfo, -) # type: Tuple[Type[BaseFederationServlet], ...] +OPENID_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (OpenIdUserInfo,) -ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...] +ROOM_LIST_CLASSES: Tuple[Type[PublicRoomList], ...] = (PublicRoomList,) -GROUP_SERVER_SERVLET_CLASSES = ( +GROUP_SERVER_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationGroupsProfileServlet, FederationGroupsSummaryServlet, FederationGroupsRoomsServlet, @@ -2046,19 +2044,19 @@ GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsAddRoomsServlet, FederationGroupsAddRoomsConfigServlet, FederationGroupsSettingJoinPolicyServlet, -) # type: Tuple[Type[BaseFederationServlet], ...] +) -GROUP_LOCAL_SERVLET_CLASSES = ( +GROUP_LOCAL_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationGroupsLocalInviteServlet, FederationGroupsRemoveLocalUserServlet, FederationGroupsBulkPublicisedServlet, -) # type: Tuple[Type[BaseFederationServlet], ...] +) -GROUP_ATTESTATION_SERVLET_CLASSES = ( +GROUP_ATTESTATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = ( FederationGroupsRenewAttestaionServlet, -) # type: Tuple[Type[BaseFederationServlet], ...] +) DEFAULT_SERVLET_GROUPS = ( |