diff options
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 43 |
1 files changed, 18 insertions, 25 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index ac0f2ccfb3..29619aeeb8 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -122,12 +122,12 @@ class FederationServer(FederationBase): # origins that we are currently processing a transaction from. # a dict from origin to txn id. - self._active_transactions = {} # type: Dict[str, str] + self._active_transactions: Dict[str, str] = {} # We cache results for transaction with the same ID - self._transaction_resp_cache = ResponseCache( + self._transaction_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache( hs.get_clock(), "fed_txn_handler", timeout_ms=30000 - ) # type: ResponseCache[Tuple[str, str]] + ) self.transaction_actions = TransactionActions(self.store) @@ -135,12 +135,12 @@ class FederationServer(FederationBase): # We cache responses to state queries, as they take a while and often # come in waves. - self._state_resp_cache = ResponseCache( - hs.get_clock(), "state_resp", timeout_ms=30000 - ) # type: ResponseCache[Tuple[str, Optional[str]]] - self._state_ids_resp_cache = ResponseCache( + self._state_resp_cache: ResponseCache[ + Tuple[str, Optional[str]] + ] = ResponseCache(hs.get_clock(), "state_resp", timeout_ms=30000) + self._state_ids_resp_cache: ResponseCache[Tuple[str, str]] = ResponseCache( hs.get_clock(), "state_ids_resp", timeout_ms=30000 - ) # type: ResponseCache[Tuple[str, str]] + ) self._federation_metrics_domains = ( hs.config.federation.federation_metrics_domains @@ -337,7 +337,7 @@ class FederationServer(FederationBase): origin_host, _ = parse_server_name(origin) - pdus_by_room = {} # type: Dict[str, List[EventBase]] + pdus_by_room: Dict[str, List[EventBase]] = {} newest_pdu_ts = 0 @@ -516,9 +516,9 @@ class FederationServer(FederationBase): self, room_id: str, event_id: Optional[str] ) -> Dict[str, list]: if event_id: - pdus = await self.handler.get_state_for_pdu( + pdus: Iterable[EventBase] = await self.handler.get_state_for_pdu( room_id, event_id - ) # type: Iterable[EventBase] + ) else: pdus = (await self.state.get_current_state(room_id)).values() @@ -562,8 +562,7 @@ class FederationServer(FederationBase): raise IncompatibleRoomVersionError(room_version=room_version) pdu = await self.handler.on_make_join_request(origin, room_id, user_id) - time_now = self._clock.time_msec() - return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} + return {"event": pdu.get_templated_pdu_json(), "room_version": room_version} async def on_invite_request( self, origin: str, content: JsonDict, room_version_id: str @@ -611,8 +610,7 @@ class FederationServer(FederationBase): room_version = await self.store.get_room_version_id(room_id) - time_now = self._clock.time_msec() - return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} + return {"event": pdu.get_templated_pdu_json(), "room_version": room_version} async def on_send_leave_request( self, origin: str, content: JsonDict, room_id: str @@ -659,9 +657,8 @@ class FederationServer(FederationBase): ) pdu = await self.handler.on_make_knock_request(origin, room_id, user_id) - time_now = self._clock.time_msec() return { - "event": pdu.get_pdu_json(time_now), + "event": pdu.get_templated_pdu_json(), "room_version": room_version.identifier, } @@ -791,7 +788,7 @@ class FederationServer(FederationBase): log_kv({"message": "Claiming one time keys.", "user, device pairs": query}) results = await self.store.claim_e2e_one_time_keys(query) - json_result = {} # type: Dict[str, Dict[str, dict]] + json_result: Dict[str, Dict[str, dict]] = {} for user_id, device_keys in results.items(): for device_id, keys in device_keys.items(): for key_id, json_str in keys.items(): @@ -1119,17 +1116,13 @@ class FederationHandlerRegistry: self._get_query_client = ReplicationGetQueryRestServlet.make_client(hs) self._send_edu = ReplicationFederationSendEduRestServlet.make_client(hs) - self.edu_handlers = ( - {} - ) # type: Dict[str, Callable[[str, dict], Awaitable[None]]] - self.query_handlers = ( - {} - ) # type: Dict[str, Callable[[dict], Awaitable[JsonDict]]] + self.edu_handlers: Dict[str, Callable[[str, dict], Awaitable[None]]] = {} + self.query_handlers: Dict[str, Callable[[dict], Awaitable[JsonDict]]] = {} # Map from type to instance names that we should route EDU handling to. # We randomly choose one instance from the list to route to for each new # EDU received. - self._edu_type_to_instance = {} # type: Dict[str, List[str]] + self._edu_type_to_instance: Dict[str, List[str]] = {} def register_edu_handler( self, edu_type: str, handler: Callable[[str, JsonDict], Awaitable[None]] |