diff options
Diffstat (limited to 'synapse/federation/federation_server.py')
-rw-r--r-- | synapse/federation/federation_server.py | 181 |
1 files changed, 111 insertions, 70 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 275b9c99d7..32a8a2ee46 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -15,7 +15,7 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Dict +from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union import six from six import iteritems @@ -38,6 +38,7 @@ from synapse.api.errors import ( UnsupportedRoomVersionError, ) from synapse.api.room_versions import KNOWN_ROOM_VERSIONS +from synapse.events import EventBase from synapse.federation.federation_base import FederationBase, event_from_pdu_json from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction @@ -94,7 +95,9 @@ class FederationServer(FederationBase): # come in waves. self._state_resp_cache = ResponseCache(hs, "state_resp", timeout_ms=30000) - async def on_backfill_request(self, origin, room_id, versions, limit): + async def on_backfill_request( + self, origin: str, room_id: str, versions: List[str], limit: int + ) -> Tuple[int, Dict[str, Any]]: with (await self._server_linearizer.queue((origin, room_id))): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -107,23 +110,25 @@ class FederationServer(FederationBase): return 200, res - async def on_incoming_transaction(self, origin, transaction_data): + async def on_incoming_transaction( + self, origin: str, transaction_data: JsonDict + ) -> Tuple[int, Dict[str, Any]]: # keep this as early as possible to make the calculated origin ts as # accurate as possible. request_time = self._clock.time_msec() transaction = Transaction(**transaction_data) - if not transaction.transaction_id: + if not transaction.transaction_id: # type: ignore raise Exception("Transaction missing transaction_id") - logger.debug("[%s] Got transaction", transaction.transaction_id) + logger.debug("[%s] Got transaction", transaction.transaction_id) # type: ignore # use a linearizer to ensure that we don't process the same transaction # multiple times in parallel. with ( await self._transaction_linearizer.queue( - (origin, transaction.transaction_id) + (origin, transaction.transaction_id) # type: ignore ) ): result = await self._handle_incoming_transaction( @@ -132,31 +137,33 @@ class FederationServer(FederationBase): return result - async def _handle_incoming_transaction(self, origin, transaction, request_time): + async def _handle_incoming_transaction( + self, origin: str, transaction: Transaction, request_time: int + ) -> Tuple[int, Dict[str, Any]]: """ Process an incoming transaction and return the HTTP response Args: - origin (unicode): the server making the request - transaction (Transaction): incoming transaction - request_time (int): timestamp that the HTTP request arrived at + origin: the server making the request + transaction: incoming transaction + request_time: timestamp that the HTTP request arrived at Returns: - Deferred[(int, object)]: http response code and body + HTTP response code and body """ response = await self.transaction_actions.have_responded(origin, transaction) if response: logger.debug( "[%s] We've already responded to this request", - transaction.transaction_id, + transaction.transaction_id, # type: ignore ) return response - logger.debug("[%s] Transaction is new", transaction.transaction_id) + logger.debug("[%s] Transaction is new", transaction.transaction_id) # type: ignore # Reject if PDU count > 50 or EDU count > 100 - if len(transaction.pdus) > 50 or ( - hasattr(transaction, "edus") and len(transaction.edus) > 100 + if len(transaction.pdus) > 50 or ( # type: ignore + hasattr(transaction, "edus") and len(transaction.edus) > 100 # type: ignore ): logger.info("Transaction PDU or EDU count too large. Returning 400") @@ -204,13 +211,13 @@ class FederationServer(FederationBase): report back to the sending server. """ - received_pdus_counter.inc(len(transaction.pdus)) + received_pdus_counter.inc(len(transaction.pdus)) # type: ignore origin_host, _ = parse_server_name(origin) - pdus_by_room = {} + pdus_by_room = {} # type: Dict[str, List[EventBase]] - for p in transaction.pdus: + for p in transaction.pdus: # type: ignore if "unsigned" in p: unsigned = p["unsigned"] if "age" in unsigned: @@ -254,7 +261,7 @@ class FederationServer(FederationBase): # require callouts to other servers to fetch missing events), but # impose a limit to avoid going too crazy with ram/cpu. - async def process_pdus_for_room(room_id): + async def process_pdus_for_room(room_id: str): logger.debug("Processing PDUs for %s", room_id) try: await self.check_server_matches_acl(origin_host, room_id) @@ -310,7 +317,9 @@ class FederationServer(FederationBase): TRANSACTION_CONCURRENCY_LIMIT, ) - async def on_context_state_request(self, origin, room_id, event_id): + async def on_context_state_request( + self, origin: str, room_id: str, event_id: str + ) -> Tuple[int, Dict[str, Any]]: origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -338,7 +347,9 @@ class FederationServer(FederationBase): return 200, resp - async def on_state_ids_request(self, origin, room_id, event_id): + async def on_state_ids_request( + self, origin: str, room_id: str, event_id: str + ) -> Tuple[int, Dict[str, Any]]: if not event_id: raise NotImplementedError("Specify an event") @@ -354,7 +365,9 @@ class FederationServer(FederationBase): return 200, {"pdu_ids": state_ids, "auth_chain_ids": auth_chain_ids} - async def _on_context_state_request_compute(self, room_id, event_id): + async def _on_context_state_request_compute( + self, room_id: str, event_id: str + ) -> Dict[str, list]: if event_id: pdus = await self.handler.get_state_for_pdu(room_id, event_id) else: @@ -367,7 +380,9 @@ class FederationServer(FederationBase): "auth_chain": [pdu.get_pdu_json() for pdu in auth_chain], } - async def on_pdu_request(self, origin, event_id): + async def on_pdu_request( + self, origin: str, event_id: str + ) -> Tuple[int, Union[JsonDict, str]]: pdu = await self.handler.get_persisted_pdu(origin, event_id) if pdu: @@ -375,12 +390,16 @@ class FederationServer(FederationBase): else: return 404, "" - async def on_query_request(self, query_type, args): + async def on_query_request( + self, query_type: str, args: Dict[str, str] + ) -> Tuple[int, Dict[str, Any]]: received_queries_counter.labels(query_type).inc() resp = await self.registry.on_query(query_type, args) return 200, resp - async def on_make_join_request(self, origin, room_id, user_id, supported_versions): + async def on_make_join_request( + self, origin: str, room_id: str, user_id: str, supported_versions: List[str] + ) -> Dict[str, Any]: origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -397,7 +416,7 @@ class FederationServer(FederationBase): async def on_invite_request( self, origin: str, content: JsonDict, room_version_id: str - ): + ) -> Dict[str, Any]: room_version = KNOWN_ROOM_VERSIONS.get(room_version_id) if not room_version: raise SynapseError( @@ -409,12 +428,14 @@ class FederationServer(FederationBase): pdu = event_from_pdu_json(content, room_version) origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, pdu.room_id) - pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version) time_now = self._clock.time_msec() return {"event": ret_pdu.get_pdu_json(time_now)} - async def on_send_join_request(self, origin, content, room_id): + async def on_send_join_request( + self, origin: str, content: JsonDict, room_id: str + ) -> Dict[str, Any]: logger.debug("on_send_join_request: content: %s", content) room_version = await self.store.get_room_version(room_id) @@ -425,7 +446,7 @@ class FederationServer(FederationBase): logger.debug("on_send_join_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) res_pdus = await self.handler.on_send_join_request(origin, pdu) time_now = self._clock.time_msec() @@ -434,7 +455,9 @@ class FederationServer(FederationBase): "auth_chain": [p.get_pdu_json(time_now) for p in res_pdus["auth_chain"]], } - async def on_make_leave_request(self, origin, room_id, user_id): + async def on_make_leave_request( + self, origin: str, room_id: str, user_id: str + ) -> Dict[str, Any]: origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) pdu = await self.handler.on_make_leave_request(origin, room_id, user_id) @@ -444,7 +467,9 @@ class FederationServer(FederationBase): time_now = self._clock.time_msec() return {"event": pdu.get_pdu_json(time_now), "room_version": room_version} - async def on_send_leave_request(self, origin, content, room_id): + async def on_send_leave_request( + self, origin: str, content: JsonDict, room_id: str + ) -> dict: logger.debug("on_send_leave_request: content: %s", content) room_version = await self.store.get_room_version(room_id) @@ -455,12 +480,14 @@ class FederationServer(FederationBase): logger.debug("on_send_leave_request: pdu sigs: %s", pdu.signatures) - pdu = await self._check_sigs_and_hash(room_version.identifier, pdu) + pdu = await self._check_sigs_and_hash(room_version, pdu) await self.handler.on_send_leave_request(origin, pdu) return {} - async def on_event_auth(self, origin, room_id, event_id): + async def on_event_auth( + self, origin: str, room_id: str, event_id: str + ) -> Tuple[int, Dict[str, Any]]: with (await self._server_linearizer.queue((origin, room_id))): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -471,15 +498,21 @@ class FederationServer(FederationBase): return 200, res @log_function - def on_query_client_keys(self, origin, content): - return self.on_query_request("client_keys", content) - - async def on_query_user_devices(self, origin: str, user_id: str): + async def on_query_client_keys( + self, origin: str, content: Dict[str, str] + ) -> Tuple[int, Dict[str, Any]]: + return await self.on_query_request("client_keys", content) + + async def on_query_user_devices( + self, origin: str, user_id: str + ) -> Tuple[int, Dict[str, Any]]: keys = await self.device_handler.on_federation_query_user_devices(user_id) return 200, keys @trace - async def on_claim_client_keys(self, origin, content): + async def on_claim_client_keys( + self, origin: str, content: JsonDict + ) -> Dict[str, Any]: query = [] for user_id, device_keys in content.get("one_time_keys", {}).items(): for device_id, algorithm in device_keys.items(): @@ -488,7 +521,7 @@ class FederationServer(FederationBase): log_kv({"message": "Claiming one time keys.", "user, device pairs": query}) results = await self.store.claim_e2e_one_time_keys(query) - json_result = {} + json_result = {} # type: Dict[str, Dict[str, dict]] for user_id, device_keys in results.items(): for device_id, keys in device_keys.items(): for key_id, json_bytes in keys.items(): @@ -511,8 +544,13 @@ class FederationServer(FederationBase): return {"one_time_keys": json_result} async def on_get_missing_events( - self, origin, room_id, earliest_events, latest_events, limit - ): + self, + origin: str, + room_id: str, + earliest_events: List[str], + latest_events: List[str], + limit: int, + ) -> Dict[str, list]: with (await self._server_linearizer.queue((origin, room_id))): origin_host, _ = parse_server_name(origin) await self.check_server_matches_acl(origin_host, room_id) @@ -541,11 +579,11 @@ class FederationServer(FederationBase): return {"events": [ev.get_pdu_json(time_now) for ev in missing_events]} @log_function - def on_openid_userinfo(self, token): + async def on_openid_userinfo(self, token: str) -> Optional[str]: ts_now_ms = self._clock.time_msec() - return self.store.get_user_id_for_open_id_token(token, ts_now_ms) + return await self.store.get_user_id_for_open_id_token(token, ts_now_ms) - def _transaction_from_pdus(self, pdu_list): + def _transaction_from_pdus(self, pdu_list: List[EventBase]) -> Transaction: """Returns a new Transaction containing the given PDUs suitable for transmission. """ @@ -558,7 +596,7 @@ class FederationServer(FederationBase): destination=None, ) - async def _handle_received_pdu(self, origin, pdu): + async def _handle_received_pdu(self, origin: str, pdu: EventBase) -> None: """ Process a PDU received in a federation /send/ transaction. If the event is invalid, then this method throws a FederationError. @@ -579,10 +617,8 @@ class FederationServer(FederationBase): until we try to backfill across the discontinuity. Args: - origin (str): server which sent the pdu - pdu (FrozenEvent): received pdu - - Returns (Deferred): completes with None + origin: server which sent the pdu + pdu: received pdu Raises: FederationError if the signatures / hash do not match, or if the event was unacceptable for any other reason (eg, too large, @@ -611,7 +647,7 @@ class FederationServer(FederationBase): logger.info("Accepting join PDU %s from %s", pdu.event_id, origin) # We've already checked that we know the room version by this point - room_version = await self.store.get_room_version_id(pdu.room_id) + room_version = await self.store.get_room_version(pdu.room_id) # Check signature. try: @@ -625,25 +661,27 @@ class FederationServer(FederationBase): return "<ReplicationLayer(%s)>" % self.server_name async def exchange_third_party_invite( - self, sender_user_id, target_user_id, room_id, signed + self, sender_user_id: str, target_user_id: str, room_id: str, signed: Dict ): ret = await self.handler.exchange_third_party_invite( sender_user_id, target_user_id, room_id, signed ) return ret - async def on_exchange_third_party_invite_request(self, room_id, event_dict): + async def on_exchange_third_party_invite_request( + self, room_id: str, event_dict: Dict + ): ret = await self.handler.on_exchange_third_party_invite_request( room_id, event_dict ) return ret - async def check_server_matches_acl(self, server_name, room_id): + async def check_server_matches_acl(self, server_name: str, room_id: str): """Check if the given server is allowed by the server ACLs in the room Args: - server_name (str): name of server, *without any port part* - room_id (str): ID of the room to check + server_name: name of server, *without any port part* + room_id: ID of the room to check Raises: AuthError if the server does not match the ACL @@ -661,15 +699,15 @@ class FederationServer(FederationBase): raise AuthError(code=403, msg="Server is banned from room") -def server_matches_acl_event(server_name, acl_event): +def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool: """Check if the given server is allowed by the ACL event Args: - server_name (str): name of server, without any port part - acl_event (EventBase): m.room.server_acl event + server_name: name of server, without any port part + acl_event: m.room.server_acl event Returns: - bool: True if this server is allowed by the ACLs + True if this server is allowed by the ACLs """ logger.debug("Checking %s against acl %s", server_name, acl_event.content) @@ -713,7 +751,7 @@ def server_matches_acl_event(server_name, acl_event): return False -def _acl_entry_matches(server_name, acl_entry): +def _acl_entry_matches(server_name: str, acl_entry: str) -> Match: if not isinstance(acl_entry, six.string_types): logger.warning( "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry) @@ -732,13 +770,13 @@ class FederationHandlerRegistry(object): self.edu_handlers = {} self.query_handlers = {} - def register_edu_handler(self, edu_type, handler): + def register_edu_handler(self, edu_type: str, handler: Callable[[str, dict], None]): """Sets the handler callable that will be used to handle an incoming federation EDU of the given type. Args: - edu_type (str): The type of the incoming EDU to register handler for - handler (Callable[[str, dict]]): A callable invoked on incoming EDU + edu_type: The type of the incoming EDU to register handler for + handler: A callable invoked on incoming EDU of the given type. The arguments are the origin server name and the EDU contents. """ @@ -749,14 +787,16 @@ class FederationHandlerRegistry(object): self.edu_handlers[edu_type] = handler - def register_query_handler(self, query_type, handler): + def register_query_handler( + self, query_type: str, handler: Callable[[dict], defer.Deferred] + ): """Sets the handler callable that will be used to handle an incoming federation query of the given type. Args: - query_type (str): Category name of the query, which should match + query_type: Category name of the query, which should match the string used by make_query. - handler (Callable[[dict], Deferred[dict]]): Invoked to handle + handler: Invoked to handle incoming queries of this type. The return will be yielded on and the result used as the response to the query request. """ @@ -767,10 +807,11 @@ class FederationHandlerRegistry(object): self.query_handlers[query_type] = handler - async def on_edu(self, edu_type, origin, content): + async def on_edu(self, edu_type: str, origin: str, content: dict): handler = self.edu_handlers.get(edu_type) if not handler: logger.warning("No handler registered for EDU type %s", edu_type) + return with start_active_span_from_edu(content, "handle_edu"): try: @@ -780,7 +821,7 @@ class FederationHandlerRegistry(object): except Exception: logger.exception("Failed to handle edu %r", edu_type) - def on_query(self, query_type, args): + def on_query(self, query_type: str, args: dict) -> defer.Deferred: handler = self.query_handlers.get(query_type) if not handler: logger.warning("No handler registered for query type %s", query_type) @@ -807,7 +848,7 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): super(ReplicationFederationHandlerRegistry, self).__init__() - async def on_edu(self, edu_type, origin, content): + async def on_edu(self, edu_type: str, origin: str, content: dict): """Overrides FederationHandlerRegistry """ if not self.config.use_presence and edu_type == "m.presence": @@ -821,7 +862,7 @@ class ReplicationFederationHandlerRegistry(FederationHandlerRegistry): return await self._send_edu(edu_type=edu_type, origin=origin, content=content) - async def on_query(self, query_type, args): + async def on_query(self, query_type: str, args: dict): """Overrides FederationHandlerRegistry """ handler = self.query_handlers.get(query_type) |