summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-08-09 10:16:16 +0100
committerErik Johnston <erik@matrix.org>2018-08-09 10:16:16 +0100
commit5785b93711da5e61cb6d446d090bb7daac5d61c8 (patch)
treee7dc98431b7f52526f85439e785f58a51df89553 /synapse/federation
parentPull in necessary stores in federation_reader (diff)
parentMerge pull request #3632 from matrix-org/erikj/refactor_repl_servlet (diff)
downloadsynapse-5785b93711da5e61cb6d446d090bb7daac5d61c8.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_federation
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py38
-rw-r--r--synapse/federation/federation_server.py21
-rw-r--r--synapse/federation/transaction_queue.py10
-rw-r--r--synapse/federation/transport/client.py5
-rw-r--r--synapse/federation/transport/server.py71
5 files changed, 132 insertions, 13 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 7550e11b6e..c9f3c2d352 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,7 +25,7 @@ from prometheus_client import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import Membership
+from synapse.api.constants import KNOWN_ROOM_VERSIONS, EventTypes, Membership
 from synapse.api.errors import (
     CodeMessageException,
     FederationDeniedError,
@@ -518,10 +518,10 @@ class FederationClient(FederationBase):
                     description, destination, exc_info=1,
                 )
 
-        raise RuntimeError("Failed to %s via any server", description)
+        raise RuntimeError("Failed to %s via any server" % (description, ))
 
     def make_membership_event(self, destinations, room_id, user_id, membership,
-                              content={},):
+                              content, params):
         """
         Creates an m.room.member event, with context, without participating in the room.
 
@@ -537,8 +537,10 @@ class FederationClient(FederationBase):
             user_id (str): The user whose membership is being evented.
             membership (str): The "membership" property of the event. Must be
                 one of "join" or "leave".
-            content (object): Any additional data to put into the content field
+            content (dict): Any additional data to put into the content field
                 of the event.
+            params (dict[str, str|Iterable[str]]): Query parameters to include in the
+                request.
         Return:
             Deferred: resolves to a tuple of (origin (str), event (object))
             where origin is the remote homeserver which generated the event.
@@ -558,10 +560,12 @@ class FederationClient(FederationBase):
         @defer.inlineCallbacks
         def send_request(destination):
             ret = yield self.transport_layer.make_membership_event(
-                destination, room_id, user_id, membership
+                destination, room_id, user_id, membership, params,
             )
 
-            pdu_dict = ret["event"]
+            pdu_dict = ret.get("event", None)
+            if not isinstance(pdu_dict, dict):
+                raise InvalidResponseError("Bad 'event' field in response")
 
             logger.debug("Got response to make_%s: %s", membership, pdu_dict)
 
@@ -605,6 +609,26 @@ class FederationClient(FederationBase):
             Fails with a ``RuntimeError`` if no servers were reachable.
         """
 
+        def check_authchain_validity(signed_auth_chain):
+            for e in signed_auth_chain:
+                if e.type == EventTypes.Create:
+                    create_event = e
+                    break
+            else:
+                raise InvalidResponseError(
+                    "no %s in auth chain" % (EventTypes.Create,),
+                )
+
+            # the room version should be sane.
+            room_version = create_event.content.get("room_version", "1")
+            if room_version not in KNOWN_ROOM_VERSIONS:
+                # This shouldn't be possible, because the remote server should have
+                # rejected the join attempt during make_join.
+                raise InvalidResponseError(
+                    "room appears to have unsupported version %s" % (
+                        room_version,
+                    ))
+
         @defer.inlineCallbacks
         def send_request(destination):
             time_now = self._clock.time_msec()
@@ -661,7 +685,7 @@ class FederationClient(FederationBase):
             for s in signed_state:
                 s.internal_metadata = copy.deepcopy(s.internal_metadata)
 
-            auth_chain.sort(key=lambda e: e.depth)
+            check_authchain_validity(signed_auth)
 
             defer.returnValue({
                 "state": signed_state,
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 941e30a596..d23c1cf13b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -27,7 +27,13 @@ from twisted.internet.abstract import isIPAddress
 from twisted.python import failure
 
 from synapse.api.constants import EventTypes
-from synapse.api.errors import AuthError, FederationError, NotFoundError, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    FederationError,
+    IncompatibleRoomVersionError,
+    NotFoundError,
+    SynapseError,
+)
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.federation.federation_base import FederationBase, event_from_pdu_json
 from synapse.federation.persistence import TransactionActions
@@ -327,12 +333,21 @@ class FederationServer(FederationBase):
         defer.returnValue((200, resp))
 
     @defer.inlineCallbacks
-    def on_make_join_request(self, origin, room_id, user_id):
+    def on_make_join_request(self, origin, room_id, user_id, supported_versions):
         origin_host, _ = parse_server_name(origin)
         yield self.check_server_matches_acl(origin_host, room_id)
+
+        room_version = yield self.store.get_room_version(room_id)
+        if room_version not in supported_versions:
+            logger.warn("Room version %s not in %s", room_version, supported_versions)
+            raise IncompatibleRoomVersionError(room_version=room_version)
+
         pdu = yield self.handler.on_make_join_request(room_id, user_id)
         time_now = self._clock.time_msec()
-        defer.returnValue({"event": pdu.get_pdu_json(time_now)})
+        defer.returnValue({
+            "event": pdu.get_pdu_json(time_now),
+            "room_version": room_version,
+        })
 
     @defer.inlineCallbacks
     def on_invite_request(self, origin, content):
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 78f9d40a3a..f603c8a368 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -26,6 +26,8 @@ from synapse.api.errors import FederationDeniedError, HttpResponseException
 from synapse.handlers.presence import format_user_presence_state, get_interested_remotes
 from synapse.metrics import (
     LaterGauge,
+    event_processing_loop_counter,
+    event_processing_loop_room_count,
     events_processed_counter,
     sent_edus_counter,
     sent_transactions_counter,
@@ -253,7 +255,13 @@ class TransactionQueue(object):
                     synapse.metrics.event_processing_last_ts.labels(
                         "federation_sender").set(ts)
 
-                events_processed_counter.inc(len(events))
+                    events_processed_counter.inc(len(events))
+
+                    event_processing_loop_room_count.labels(
+                        "federation_sender"
+                    ).inc(len(events_by_room))
+
+                event_processing_loop_counter.labels("federation_sender").inc()
 
                 synapse.metrics.event_processing_positions.labels(
                     "federation_sender").set(next_token)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 4529d454af..b4fbe2c9d5 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -195,7 +195,7 @@ class TransportLayerClient(object):
 
     @defer.inlineCallbacks
     @log_function
-    def make_membership_event(self, destination, room_id, user_id, membership):
+    def make_membership_event(self, destination, room_id, user_id, membership, params):
         """Asks a remote server to build and sign us a membership event
 
         Note that this does not append any events to any graphs.
@@ -205,6 +205,8 @@ class TransportLayerClient(object):
             room_id (str): room to join/leave
             user_id (str): user to be joined/left
             membership (str): one of join/leave
+            params (dict[str, str|Iterable[str]]): Query parameters to include in the
+                request.
 
         Returns:
             Deferred: Succeeds when we get a 2xx HTTP response. The result
@@ -241,6 +243,7 @@ class TransportLayerClient(object):
         content = yield self.client.get_json(
             destination=destination,
             path=path,
+            args=params,
             retry_on_dns_fail=retry_on_dns_fail,
             timeout=20000,
             ignore_backoff=ignore_backoff,
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index eae5f2b427..77969a4f38 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -190,6 +190,41 @@ def _parse_auth_header(header_bytes):
 
 
 class BaseFederationServlet(object):
+    """Abstract base class for federation servlet classes.
+
+    The servlet object should have a PATH attribute which takes the form of a regexp to
+    match against the request path (excluding the /federation/v1 prefix).
+
+    The servlet should also implement one or more of on_GET, on_POST, on_PUT, to match
+    the appropriate HTTP method. These methods have the signature:
+
+        on_<METHOD>(self, origin, content, query, **kwargs)
+
+        With arguments:
+
+            origin (unicode|None): The authenticated server_name of the calling server,
+                unless REQUIRE_AUTH is set to False and authentication failed.
+
+            content (unicode|None): decoded json body of the request. None if the
+                request was a GET.
+
+            query (dict[bytes, list[bytes]]): Query params from the request. url-decoded
+                (ie, '+' and '%xx' are decoded) but note that it is *not* utf8-decoded
+                yet.
+
+            **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+                components as specified in the path match regexp.
+
+        Returns:
+            Deferred[(int, object)|None]: either (response code, response object) to
+                 return a JSON response, or None if the request has already been handled.
+
+        Raises:
+            SynapseError: to return an error code
+
+            Exception: other exceptions will be caught, logged, and a 500 will be
+                returned.
+    """
     REQUIRE_AUTH = True
 
     def __init__(self, handler, authenticator, ratelimiter, server_name):
@@ -204,6 +239,18 @@ class BaseFederationServlet(object):
         @defer.inlineCallbacks
         @functools.wraps(func)
         def new_func(request, *args, **kwargs):
+            """ A callback which can be passed to HttpServer.RegisterPaths
+
+            Args:
+                request (twisted.web.http.Request):
+                *args: unused?
+                **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+                    components as specified in the path match regexp.
+
+            Returns:
+                Deferred[(int, object)|None]: (response code, response object) as returned
+                    by the callback method. None if the request has already been handled.
+            """
             content = None
             if request.method in ["PUT", "POST"]:
                 # TODO: Handle other method types? other content types?
@@ -384,9 +431,31 @@ class FederationMakeJoinServlet(BaseFederationServlet):
     PATH = "/make_join/(?P<context>[^/]*)/(?P<user_id>[^/]*)"
 
     @defer.inlineCallbacks
-    def on_GET(self, origin, content, query, context, user_id):
+    def on_GET(self, origin, _content, query, context, user_id):
+        """
+        Args:
+            origin (unicode): The authenticated server_name of the calling server
+
+            _content (None): (GETs don't have bodies)
+
+            query (dict[bytes, list[bytes]]): Query params from the request.
+
+            **kwargs (dict[unicode, unicode]): the dict mapping keys to path
+                components as specified in the path match regexp.
+
+        Returns:
+            Deferred[(int, object)|None]: either (response code, response object) to
+                 return a JSON response, or None if the request has already been handled.
+        """
+        versions = query.get(b'ver')
+        if versions is not None:
+            supported_versions = [v.decode("utf-8") for v in versions]
+        else:
+            supported_versions = ["1"]
+
         content = yield self.handler.on_make_join_request(
             origin, context, user_id,
+            supported_versions=supported_versions,
         )
         defer.returnValue((200, content))