diff options
Diffstat (limited to 'synapse/federation/transport')
-rw-r--r-- | synapse/federation/transport/__init__.py | 4 | ||||
-rw-r--r-- | synapse/federation/transport/client.py | 184 | ||||
-rw-r--r-- | synapse/federation/transport/server.py | 121 |
3 files changed, 204 insertions, 105 deletions
diff --git a/synapse/federation/transport/__init__.py b/synapse/federation/transport/__init__.py index d9fcc520a0..5db733af98 100644 --- a/synapse/federation/transport/__init__.py +++ b/synapse/federation/transport/__init__.py @@ -14,9 +14,9 @@ # limitations under the License. """The transport layer is responsible for both sending transactions to remote -home servers and receiving a variety of requests from other home servers. +homeservers and receiving a variety of requests from other homeservers. -By default this is done over HTTPS (and all home servers are required to +By default this is done over HTTPS (and all homeservers are required to support HTTPS), however individual pairings of servers may decide to communicate over a different (albeit still reliable) protocol. """ diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index 482a101c09..060bf07197 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -15,12 +15,14 @@ # limitations under the License. import logging +from typing import Any, Dict, Optional from six.moves import urllib from twisted.internet import defer from synapse.api.constants import Membership +from synapse.api.errors import Codes, HttpResponseException, SynapseError from synapse.api.urls import ( FEDERATION_UNSTABLE_PREFIX, FEDERATION_V1_PREFIX, @@ -39,36 +41,12 @@ class TransportLayerClient(object): self.client = hs.get_http_client() @log_function - def get_room_state(self, destination, room_id, event_id): - """ Requests all state for a given room from the given server at the - given event. - - Args: - destination (str): The host name of the remote home server we want - to get the state from. - context (str): The name of the context we want the state of - event_id (str): The event we want the context at. - - Returns: - Deferred: Results in a dict received from the remote homeserver. - """ - logger.debug("get_room_state dest=%s, room=%s", destination, room_id) - - path = _create_v1_path("/state/%s", room_id) - return self.client.get_json( - destination, - path=path, - args={"event_id": event_id}, - try_trailing_slash_on_400=True, - ) - - @log_function def get_room_state_ids(self, destination, room_id, event_id): """ Requests all state for a given room from the given server at the given event. Returns the state's event_id's Args: - destination (str): The host name of the remote home server we want + destination (str): The host name of the remote homeserver we want to get the state from. context (str): The name of the context we want the state of event_id (str): The event we want the context at. @@ -91,7 +69,7 @@ class TransportLayerClient(object): """ Requests the pdu with give id and origin from the given server. Args: - destination (str): The host name of the remote home server we want + destination (str): The host name of the remote homeserver we want to get the state from. event_id (str): The id of the event being requested. timeout (int): How long to try (in ms) the destination for before @@ -122,10 +100,10 @@ class TransportLayerClient(object): Deferred: Results in a dict received from the remote homeserver. """ logger.debug( - "backfill dest=%s, room_id=%s, event_tuples=%s, limit=%s", + "backfill dest=%s, room_id=%s, event_tuples=%r, limit=%s", destination, room_id, - repr(event_tuples), + event_tuples, str(limit), ) @@ -267,7 +245,7 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_join(self, destination, room_id, event_id, content): + def send_join_v1(self, destination, room_id, event_id, content): path = _create_v1_path("/send_join/%s/%s", room_id, event_id) response = yield self.client.put_json( @@ -278,7 +256,18 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_leave(self, destination, room_id, event_id, content): + def send_join_v2(self, destination, room_id, event_id, content): + path = _create_v2_path("/send_join/%s/%s", room_id, event_id) + + response = yield self.client.put_json( + destination=destination, path=path, data=content + ) + + return response + + @defer.inlineCallbacks + @log_function + def send_leave_v1(self, destination, room_id, event_id, content): path = _create_v1_path("/send_leave/%s/%s", room_id, event_id) response = yield self.client.put_json( @@ -296,6 +285,24 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function + def send_leave_v2(self, destination, room_id, event_id, content): + path = _create_v2_path("/send_leave/%s/%s", room_id, event_id) + + response = yield self.client.put_json( + destination=destination, + path=path, + data=content, + # we want to do our best to send this through. The problem is + # that if it fails, we won't retry it later, so if the remote + # server was just having a momentary blip, the room will be out of + # sync. + ignore_backoff=True, + ) + + return response + + @defer.inlineCallbacks + @log_function def send_invite_v1(self, destination, room_id, event_id, content): path = _create_v1_path("/invite/%s/%s", room_id, event_id) @@ -320,18 +327,25 @@ class TransportLayerClient(object): @log_function def get_public_rooms( self, - remote_server, - limit, - since_token, - search_filter=None, - include_all_networks=False, - third_party_instance_id=None, + remote_server: str, + limit: Optional[int] = None, + since_token: Optional[str] = None, + search_filter: Optional[Dict] = None, + include_all_networks: bool = False, + third_party_instance_id: Optional[str] = None, ): + """Get the list of public rooms from a remote homeserver + + See synapse.federation.federation_client.FederationClient.get_public_rooms for + more information. + """ if search_filter: # this uses MSC2197 (Search Filtering over Federation) path = _create_v1_path("/publicRooms") - data = {"include_all_networks": "true" if include_all_networks else "false"} + data = { + "include_all_networks": "true" if include_all_networks else "false" + } # type: Dict[str, Any] if third_party_instance_id: data["third_party_instance_id"] = third_party_instance_id if limit: @@ -341,13 +355,25 @@ class TransportLayerClient(object): data["filter"] = search_filter - response = yield self.client.post_json( - destination=remote_server, path=path, data=data, ignore_backoff=True - ) + try: + response = yield self.client.post_json( + destination=remote_server, path=path, data=data, ignore_backoff=True + ) + except HttpResponseException as e: + if e.code == 403: + raise SynapseError( + 403, + "You are not allowed to view the public rooms list of %s" + % (remote_server,), + errcode=Codes.FORBIDDEN, + ) + raise else: path = _create_v1_path("/publicRooms") - args = {"include_all_networks": "true" if include_all_networks else "false"} + args = { + "include_all_networks": "true" if include_all_networks else "false" + } # type: Dict[str, Any] if third_party_instance_id: args["third_party_instance_id"] = (third_party_instance_id,) if limit: @@ -355,9 +381,19 @@ class TransportLayerClient(object): if since_token: args["since"] = [since_token] - response = yield self.client.get_json( - destination=remote_server, path=path, args=args, ignore_backoff=True - ) + try: + response = yield self.client.get_json( + destination=remote_server, path=path, args=args, ignore_backoff=True + ) + except HttpResponseException as e: + if e.code == 403: + raise SynapseError( + 403, + "You are not allowed to view the public rooms list of %s" + % (remote_server,), + errcode=Codes.FORBIDDEN, + ) + raise return response @@ -383,17 +419,6 @@ class TransportLayerClient(object): @defer.inlineCallbacks @log_function - def send_query_auth(self, destination, room_id, event_id, content): - path = _create_v1_path("/query_auth/%s/%s", room_id, event_id) - - content = yield self.client.post_json( - destination=destination, path=path, data=content - ) - - return content - - @defer.inlineCallbacks - @log_function def query_client_keys(self, destination, query_content, timeout): """Query the device keys for a list of user ids hosted on a remote server. @@ -402,20 +427,30 @@ class TransportLayerClient(object): { "device_keys": { "<user_id>": ["<device_id>"] - } } + } + } Response: { "device_keys": { "<user_id>": { "<device_id>": {...} - } } } + } + }, + "master_key": { + "<user_id>": {...} + } + }, + "self_signing_key": { + "<user_id>": {...} + } + } Args: destination(str): The server to query. query_content(dict): The user ids to query. Returns: - A dict containg the device keys. + A dict containing device and cross-signing keys. """ path = _create_v1_path("/user/keys/query") @@ -432,14 +467,30 @@ class TransportLayerClient(object): Response: { "stream_id": "...", - "devices": [ { ... } ] + "devices": [ { ... } ], + "master_key": { + "user_id": "<user_id>", + "usage": [...], + "keys": {...}, + "signatures": { + "<user_id>": {...} + } + }, + "self_signing_key": { + "user_id": "<user_id>", + "usage": [...], + "keys": {...}, + "signatures": { + "<user_id>": {...} + } + } } Args: destination(str): The server to query. query_content(dict): The user ids to query. Returns: - A dict containg the device keys. + A dict containing device and cross-signing keys. """ path = _create_v1_path("/user/devices/%s", user_id) @@ -457,8 +508,10 @@ class TransportLayerClient(object): { "one_time_keys": { "<user_id>": { - "<device_id>": "<algorithm>" - } } } + "<device_id>": "<algorithm>" + } + } + } Response: { @@ -466,13 +519,16 @@ class TransportLayerClient(object): "<user_id>": { "<device_id>": { "<algorithm>:<key_id>": "<key_base64>" - } } } } + } + } + } + } Args: destination(str): The server to query. query_content(dict): The user ids to query. Returns: - A dict containg the one-time keys. + A dict containing the one-time keys. """ path = _create_v1_path("/user/keys/claim") diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 7dc696c7ae..af4595498c 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -18,6 +18,7 @@ import functools import logging import re +from typing import Optional, Tuple, Type from twisted.internet.defer import maybeDeferred @@ -44,6 +45,7 @@ from synapse.logging.opentracing import ( tags, whitelisted_homeserver, ) +from synapse.server import HomeServer from synapse.types import ThirdPartyInstanceID, get_domain_from_id from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string @@ -101,12 +103,17 @@ class NoAuthenticationError(AuthenticationError): class Authenticator(object): - def __init__(self, hs): + def __init__(self, hs: HomeServer): self._clock = hs.get_clock() self.keyring = hs.get_keyring() self.server_name = hs.hostname self.store = hs.get_datastore() self.federation_domain_whitelist = hs.config.federation_domain_whitelist + self.notifer = hs.get_notifier() + + self.replication_client = None + if hs.config.worker.worker_app: + self.replication_client = hs.get_tcp_replication() # A method just so we can pass 'self' as the authenticator to the Servlets async def authenticate_request(self, request, content): @@ -151,7 +158,7 @@ class Authenticator(object): origin, json_request, now, "Incoming request" ) - logger.info("Request from %s", origin) + logger.debug("Request from %s", origin) request.authenticated_entity = origin # If we get a valid signed request from the other side, its probably @@ -165,7 +172,18 @@ class Authenticator(object): async def _reset_retry_timings(self, origin): try: logger.info("Marking origin %r as up", origin) - await self.store.set_destination_retry_timings(origin, 0, 0) + await self.store.set_destination_retry_timings(origin, None, 0, 0) + + # Inform the relevant places that the remote server is back up. + self.notifer.notify_remote_server_up(origin) + if self.replication_client: + # If we're on a worker we try and inform master about this. The + # replication client doesn't hook into the notifier to avoid + # infinite loops where we send a `REMOTE_SERVER_UP` command to + # master, which then echoes it back to us which in turn pokes + # the notifier. + self.replication_client.send_remote_server_up(origin) + except Exception: logger.exception("Error resetting retry timings on %s", origin) @@ -202,7 +220,7 @@ def _parse_auth_header(header_bytes): sig = strip_quotes(param_dict["sig"]) return origin, key, sig except Exception as e: - logger.warn( + logger.warning( "Error parsing auth header '%s': %s", header_bytes.decode("ascii", "replace"), e, @@ -250,6 +268,8 @@ class BaseFederationServlet(object): returned. """ + PATH = "" # Overridden in subclasses, the regex to match against the path. + REQUIRE_AUTH = True PREFIX = FEDERATION_V1_PREFIX # Allows specifying the API version @@ -287,10 +307,12 @@ class BaseFederationServlet(object): except NoAuthenticationError: origin = None if self.REQUIRE_AUTH: - logger.warn("authenticate_request failed: missing authentication") + logger.warning( + "authenticate_request failed: missing authentication" + ) raise except Exception as e: - logger.warn("authenticate_request failed: %s", e) + logger.warning("authenticate_request failed: %s", e) raise request_tags = { @@ -328,9 +350,6 @@ class BaseFederationServlet(object): return response - # Extra logic that functools.wraps() doesn't finish - new_func.__self__ = func.__self__ - return new_func def register(self, server): @@ -419,7 +438,7 @@ class FederationEventServlet(BaseFederationServlet): return await self.handler.on_pdu_request(origin, event_id) -class FederationStateServlet(BaseFederationServlet): +class FederationStateV1Servlet(BaseFederationServlet): PATH = "/state/(?P<context>[^/]*)/?" # This is when someone asks for all data for a given context. @@ -427,7 +446,7 @@ class FederationStateServlet(BaseFederationServlet): return await self.handler.on_context_state_request( origin, context, - parse_string_from_args(query, "event_id", None, required=True), + parse_string_from_args(query, "event_id", None, required=False), ) @@ -504,11 +523,21 @@ class FederationMakeLeaveServlet(BaseFederationServlet): return 200, content -class FederationSendLeaveServlet(BaseFederationServlet): +class FederationV1SendLeaveServlet(BaseFederationServlet): PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)" async def on_PUT(self, origin, content, query, room_id, event_id): content = await self.handler.on_send_leave_request(origin, content, room_id) + return 200, (200, content) + + +class FederationV2SendLeaveServlet(BaseFederationServlet): + PATH = "/send_leave/(?P<room_id>[^/]*)/(?P<event_id>[^/]*)" + + PREFIX = FEDERATION_V2_PREFIX + + async def on_PUT(self, origin, content, query, room_id, event_id): + content = await self.handler.on_send_leave_request(origin, content, room_id) return 200, content @@ -519,13 +548,25 @@ class FederationEventAuthServlet(BaseFederationServlet): return await self.handler.on_event_auth(origin, context, event_id) -class FederationSendJoinServlet(BaseFederationServlet): +class FederationV1SendJoinServlet(BaseFederationServlet): PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)" async def on_PUT(self, origin, content, query, context, event_id): # TODO(paul): assert that context/event_id parsed from path actually # match those given in content content = await self.handler.on_send_join_request(origin, content, context) + return 200, (200, content) + + +class FederationV2SendJoinServlet(BaseFederationServlet): + PATH = "/send_join/(?P<context>[^/]*)/(?P<event_id>[^/]*)" + + PREFIX = FEDERATION_V2_PREFIX + + async def on_PUT(self, origin, content, query, context, event_id): + # TODO(paul): assert that context/event_id parsed from path actually + # match those given in content + content = await self.handler.on_send_join_request(origin, content, context) return 200, content @@ -538,7 +579,7 @@ class FederationV1InviteServlet(BaseFederationServlet): # state resolution algorithm, and we don't use that for processing # invites content = await self.handler.on_invite_request( - origin, content, room_version=RoomVersions.V1.identifier + origin, content, room_version_id=RoomVersions.V1.identifier ) # V1 federation API is defined to return a content of `[200, {...}]` @@ -565,7 +606,7 @@ class FederationV2InviteServlet(BaseFederationServlet): event.setdefault("unsigned", {})["invite_room_state"] = invite_room_state content = await self.handler.on_invite_request( - origin, event, room_version=room_version + origin, event, room_version_id=room_version ) return 200, content @@ -602,17 +643,6 @@ class FederationClientKeysClaimServlet(BaseFederationServlet): return 200, response -class FederationQueryAuthServlet(BaseFederationServlet): - PATH = "/query_auth/(?P<context>[^/]*)/(?P<event_id>[^/]*)" - - async def on_POST(self, origin, content, query, context, event_id): - new_content = await self.handler.on_query_auth_request( - origin, content, context, event_id - ) - - return 200, new_content - - class FederationGetMissingEventsServlet(BaseFederationServlet): # TODO(paul): Why does this path alone end with "/?" optional? PATH = "/get_missing_events/(?P<room_id>[^/]*)/?" @@ -712,7 +742,7 @@ class PublicRoomList(BaseFederationServlet): This API returns information in the same format as /publicRooms on the client API, but will only ever include local public rooms and hence is - intended for consumption by other home servers. + intended for consumption by other homeservers. GET /publicRooms HTTP/1.1 @@ -765,6 +795,10 @@ class PublicRoomList(BaseFederationServlet): else: network_tuple = ThirdPartyInstanceID(None, None) + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + data = await maybeDeferred( self.handler.get_local_public_room_list, limit, @@ -779,7 +813,7 @@ class PublicRoomList(BaseFederationServlet): if not self.allow_access: raise FederationDeniedError(origin) - limit = int(content.get("limit", 100)) + limit = int(content.get("limit", 100)) # type: Optional[int] since_token = content.get("since", None) search_filter = content.get("filter", None) @@ -800,6 +834,10 @@ class PublicRoomList(BaseFederationServlet): if search_filter is None: logger.warning("Nonefilter") + if limit == 0: + # zero is a special value which corresponds to no limit. + limit = None + data = await self.handler.get_local_public_room_list( limit=limit, since_token=since_token, @@ -922,7 +960,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet): if get_domain_from_id(requester_user_id) != origin: raise SynapseError(403, "requester_user_id doesn't match origin") - result = await self.groups_handler.update_room_in_group( + result = await self.handler.update_room_in_group( group_id, requester_user_id, room_id, config_key, content ) @@ -1350,18 +1388,19 @@ class RoomComplexityServlet(BaseFederationServlet): FEDERATION_SERVLET_CLASSES = ( FederationSendServlet, FederationEventServlet, - FederationStateServlet, + FederationStateV1Servlet, FederationStateIdsServlet, FederationBackfillServlet, FederationQueryServlet, FederationMakeJoinServlet, FederationMakeLeaveServlet, FederationEventServlet, - FederationSendJoinServlet, - FederationSendLeaveServlet, + FederationV1SendJoinServlet, + FederationV2SendJoinServlet, + FederationV1SendLeaveServlet, + FederationV2SendLeaveServlet, FederationV1InviteServlet, FederationV2InviteServlet, - FederationQueryAuthServlet, FederationGetMissingEventsServlet, FederationEventAuthServlet, FederationClientKeysQueryServlet, @@ -1371,11 +1410,13 @@ FEDERATION_SERVLET_CLASSES = ( On3pidBindServlet, FederationVersionServlet, RoomComplexityServlet, -) +) # type: Tuple[Type[BaseFederationServlet], ...] -OPENID_SERVLET_CLASSES = (OpenIdUserInfo,) +OPENID_SERVLET_CLASSES = ( + OpenIdUserInfo, +) # type: Tuple[Type[BaseFederationServlet], ...] -ROOM_LIST_CLASSES = (PublicRoomList,) +ROOM_LIST_CLASSES = (PublicRoomList,) # type: Tuple[Type[PublicRoomList], ...] GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsProfileServlet, @@ -1396,17 +1437,19 @@ GROUP_SERVER_SERVLET_CLASSES = ( FederationGroupsAddRoomsServlet, FederationGroupsAddRoomsConfigServlet, FederationGroupsSettingJoinPolicyServlet, -) +) # type: Tuple[Type[BaseFederationServlet], ...] GROUP_LOCAL_SERVLET_CLASSES = ( FederationGroupsLocalInviteServlet, FederationGroupsRemoveLocalUserServlet, FederationGroupsBulkPublicisedServlet, -) +) # type: Tuple[Type[BaseFederationServlet], ...] -GROUP_ATTESTATION_SERVLET_CLASSES = (FederationGroupsRenewAttestaionServlet,) +GROUP_ATTESTATION_SERVLET_CLASSES = ( + FederationGroupsRenewAttestaionServlet, +) # type: Tuple[Type[BaseFederationServlet], ...] DEFAULT_SERVLET_GROUPS = ( "federation", |