summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorHubert Chathi <hubert@uhoreg.ca>2019-08-28 17:36:46 -0700
committerHubert Chathi <hubert@uhoreg.ca>2019-08-28 17:36:46 -0700
commite3d3fbf63f6b75d3e0adfd71012163a2c673833b (patch)
tree0711e2e9bbfabcd83867ff524050c05441187882 /synapse/federation
parentblack (diff)
parentMerge branch 'develop' into uhoreg/e2e_cross-signing_merged (diff)
downloadsynapse-e3d3fbf63f6b75d3e0adfd71012163a2c673833b.tar.xz
Merge branch 'uhoreg/e2e_cross-signing_merged' into cross-signing_keys
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_client.py7
-rw-r--r--synapse/federation/federation_server.py16
-rw-r--r--synapse/federation/sender/transaction_manager.py170
-rw-r--r--synapse/federation/transport/client.py46
-rw-r--r--synapse/federation/transport/server.py86
-rw-r--r--synapse/federation/units.py3
6 files changed, 215 insertions, 113 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index 6e03ce21af..bec3080895 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -511,9 +511,8 @@ class FederationClient(FederationBase):
             The [Deferred] result of callback, if it succeeds
 
         Raises:
-            SynapseError if the chosen remote server returns a 300/400 code.
-
-            RuntimeError if no servers were reachable.
+            SynapseError if the chosen remote server returns a 300/400 code, or
+            no servers were reachable.
         """
         for destination in destinations:
             if destination == self.server_name:
@@ -538,7 +537,7 @@ class FederationClient(FederationBase):
             except Exception:
                 logger.warn("Failed to %s via %s", description, destination, exc_info=1)
 
-        raise RuntimeError("Failed to %s via any server" % (description,))
+        raise SynapseError(502, "Failed to %s via any server" % (description,))
 
     def make_membership_event(
         self, destinations, room_id, user_id, membership, content, params
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index d216c46dfe..05fd49f3c1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -43,6 +43,7 @@ from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
 from synapse.logging.context import nested_logging_context
+from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
 from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -507,6 +508,7 @@ class FederationServer(FederationBase):
     def on_query_user_devices(self, origin, user_id):
         return self.on_query_request("user_devices", user_id)
 
+    @trace
     @defer.inlineCallbacks
     @log_function
     def on_claim_client_keys(self, origin, content):
@@ -515,6 +517,7 @@ class FederationServer(FederationBase):
             for device_id, algorithm in device_keys.items():
                 query.append((user_id, device_id, algorithm))
 
+        log_kv({"message": "Claiming one time keys.", "user, device pairs": query})
         results = yield self.store.claim_e2e_one_time_keys(query)
 
         json_result = {}
@@ -808,12 +811,13 @@ class FederationHandlerRegistry(object):
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)
 
-        try:
-            yield handler(origin, content)
-        except SynapseError as e:
-            logger.info("Failed to handle edu %r: %r", edu_type, e)
-        except Exception:
-            logger.exception("Failed to handle edu %r", edu_type)
+        with start_active_span_from_edu(content, "handle_edu"):
+            try:
+                yield handler(origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception:
+                logger.exception("Failed to handle edu %r", edu_type)
 
     def on_query(self, query_type, args):
         handler = self.query_handlers.get(query_type)
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 52706302f2..62ca6a3e87 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -14,11 +14,19 @@
 # limitations under the License.
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import HttpResponseException
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Transaction
+from synapse.logging.opentracing import (
+    extract_text_map,
+    set_tag,
+    start_active_span_follows_from,
+    tags,
+)
 from synapse.util.metrics import measure_func
 
 logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
     @defer.inlineCallbacks
     def send_new_transaction(self, destination, pending_pdus, pending_edus):
 
-        # Sort based on the order field
-        pending_pdus.sort(key=lambda t: t[1])
-        pdus = [x[0] for x in pending_pdus]
-        edus = pending_edus
+        # Make a transaction-sending opentracing span. This span follows on from
+        # all the edus in that transaction. This needs to be done since there is
+        # no active span here, so if the edus were not received by the remote the
+        # span would have no causality and it would be forgotten.
+        # The span_contexts is a generator so that it won't be evaluated if
+        # opentracing is disabled. (Yay speed!)
 
-        success = True
+        span_contexts = (
+            extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
+        )
 
-        logger.debug("TX [%s] _attempt_new_transaction", destination)
+        with start_active_span_follows_from("send_transaction", span_contexts):
 
-        txn_id = str(self._next_txn_id)
+            # Sort based on the order field
+            pending_pdus.sort(key=lambda t: t[1])
+            pdus = [x[0] for x in pending_pdus]
+            edus = pending_edus
 
-        logger.debug(
-            "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
-            destination,
-            txn_id,
-            len(pdus),
-            len(edus),
-        )
+            success = True
 
-        transaction = Transaction.create_new(
-            origin_server_ts=int(self.clock.time_msec()),
-            transaction_id=txn_id,
-            origin=self._server_name,
-            destination=destination,
-            pdus=pdus,
-            edus=edus,
-        )
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-        self._next_txn_id += 1
+            txn_id = str(self._next_txn_id)
 
-        logger.info(
-            "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
-            destination,
-            txn_id,
-            transaction.transaction_id,
-            len(pdus),
-            len(edus),
-        )
+            logger.debug(
+                "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+                destination,
+                txn_id,
+                len(pdus),
+                len(edus),
+            )
 
-        # Actually send the transaction
-
-        # FIXME (erikj): This is a bit of a hack to make the Pdu age
-        # keys work
-        def json_data_cb():
-            data = transaction.get_dict()
-            now = int(self.clock.time_msec())
-            if "pdus" in data:
-                for p in data["pdus"]:
-                    if "age_ts" in p:
-                        unsigned = p.setdefault("unsigned", {})
-                        unsigned["age"] = now - int(p["age_ts"])
-                        del p["age_ts"]
-            return data
-
-        try:
-            response = yield self._transport_layer.send_transaction(
-                transaction, json_data_cb
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self.clock.time_msec()),
+                transaction_id=txn_id,
+                origin=self._server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
             )
-            code = 200
-        except HttpResponseException as e:
-            code = e.code
-            response = e.response
 
-            if e.code in (401, 404, 429) or 500 <= e.code:
-                logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
-                raise e
+            self._next_txn_id += 1
 
-        logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+            logger.info(
+                "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+                destination,
+                txn_id,
+                transaction.transaction_id,
+                len(pdus),
+                len(edus),
+            )
 
-        if code == 200:
-            for e_id, r in response.get("pdus", {}).items():
-                if "error" in r:
+            # Actually send the transaction
+
+            # FIXME (erikj): This is a bit of a hack to make the Pdu age
+            # keys work
+            def json_data_cb():
+                data = transaction.get_dict()
+                now = int(self.clock.time_msec())
+                if "pdus" in data:
+                    for p in data["pdus"]:
+                        if "age_ts" in p:
+                            unsigned = p.setdefault("unsigned", {})
+                            unsigned["age"] = now - int(p["age_ts"])
+                            del p["age_ts"]
+                return data
+
+            try:
+                response = yield self._transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
+
+                if e.code in (401, 404, 429) or 500 <= e.code:
+                    logger.info(
+                        "TX [%s] {%s} got %d response", destination, txn_id, code
+                    )
+                    raise e
+
+            logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+
+            if code == 200:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "TX [%s] {%s} Remote returned error for %s: %s",
+                            destination,
+                            txn_id,
+                            e_id,
+                            r,
+                        )
+            else:
+                for p in pdus:
                     logger.warn(
-                        "TX [%s] {%s} Remote returned error for %s: %s",
+                        "TX [%s] {%s} Failed to send event %s",
                         destination,
                         txn_id,
-                        e_id,
-                        r,
+                        p.event_id,
                     )
-        else:
-            for p in pdus:
-                logger.warn(
-                    "TX [%s] {%s} Failed to send event %s",
-                    destination,
-                    txn_id,
-                    p.event_id,
-                )
-            success = False
+                success = False
 
-        return success
+            set_tag(tags.ERROR, not success)
+            return success
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 0cea0d2a10..482a101c09 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -327,21 +327,37 @@ class TransportLayerClient(object):
         include_all_networks=False,
         third_party_instance_id=None,
     ):
-        path = _create_v1_path("/publicRooms")
-
-        args = {"include_all_networks": "true" if include_all_networks else "false"}
-        if third_party_instance_id:
-            args["third_party_instance_id"] = (third_party_instance_id,)
-        if limit:
-            args["limit"] = [str(limit)]
-        if since_token:
-            args["since"] = [since_token]
-
-        # TODO(erikj): Actually send the search_filter across federation.
-
-        response = yield self.client.get_json(
-            destination=remote_server, path=path, args=args, ignore_backoff=True
-        )
+        if search_filter:
+            # this uses MSC2197 (Search Filtering over Federation)
+            path = _create_v1_path("/publicRooms")
+
+            data = {"include_all_networks": "true" if include_all_networks else "false"}
+            if third_party_instance_id:
+                data["third_party_instance_id"] = third_party_instance_id
+            if limit:
+                data["limit"] = str(limit)
+            if since_token:
+                data["since"] = since_token
+
+            data["filter"] = search_filter
+
+            response = yield self.client.post_json(
+                destination=remote_server, path=path, data=data, ignore_backoff=True
+            )
+        else:
+            path = _create_v1_path("/publicRooms")
+
+            args = {"include_all_networks": "true" if include_all_networks else "false"}
+            if third_party_instance_id:
+                args["third_party_instance_id"] = (third_party_instance_id,)
+            if limit:
+                args["limit"] = [str(limit)]
+            if since_token:
+                args["since"] = [since_token]
+
+            response = yield self.client.get_json(
+                destination=remote_server, path=path, args=args, ignore_backoff=True
+            )
 
         return response
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index ea4e1b6d0f..f9930b6460 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -19,8 +19,9 @@ import functools
 import logging
 import re
 
+from twisted.internet.defer import maybeDeferred
+
 import synapse
-import synapse.logging.opentracing as opentracing
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.room_versions import RoomVersions
 from synapse.api.urls import (
@@ -37,6 +38,12 @@ from synapse.http.servlet import (
     parse_string_from_args,
 )
 from synapse.logging.context import run_in_background
+from synapse.logging.opentracing import (
+    start_active_span,
+    start_active_span_from_request,
+    tags,
+    whitelisted_homeserver,
+)
 from synapse.types import ThirdPartyInstanceID, get_domain_from_id
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.versionstring import get_version_string
@@ -286,19 +293,28 @@ class BaseFederationServlet(object):
                 logger.warn("authenticate_request failed: %s", e)
                 raise
 
-            # Start an opentracing span
-            with opentracing.start_active_span_from_context(
-                request.requestHeaders,
-                "incoming-federation-request",
-                tags={
-                    "request_id": request.get_request_id(),
-                    opentracing.tags.SPAN_KIND: opentracing.tags.SPAN_KIND_RPC_SERVER,
-                    opentracing.tags.HTTP_METHOD: request.get_method(),
-                    opentracing.tags.HTTP_URL: request.get_redacted_uri(),
-                    opentracing.tags.PEER_HOST_IPV6: request.getClientIP(),
-                    "authenticated_entity": origin,
-                },
-            ):
+            request_tags = {
+                "request_id": request.get_request_id(),
+                tags.SPAN_KIND: tags.SPAN_KIND_RPC_SERVER,
+                tags.HTTP_METHOD: request.get_method(),
+                tags.HTTP_URL: request.get_redacted_uri(),
+                tags.PEER_HOST_IPV6: request.getClientIP(),
+                "authenticated_entity": origin,
+                "servlet_name": request.request_metrics.name,
+            }
+
+            # Only accept the span context if the origin is authenticated
+            # and whitelisted
+            if origin and whitelisted_homeserver(origin):
+                scope = start_active_span_from_request(
+                    request, "incoming-federation-request", tags=request_tags
+                )
+            else:
+                scope = start_active_span(
+                    "incoming-federation-request", tags=request_tags
+                )
+
+            with scope:
                 if origin:
                     with ratelimiter.ratelimit(origin) as d:
                         await d
@@ -745,9 +761,49 @@ class PublicRoomList(BaseFederationServlet):
         else:
             network_tuple = ThirdPartyInstanceID(None, None)
 
+        data = await maybeDeferred(
+            self.handler.get_local_public_room_list,
+            limit,
+            since_token,
+            network_tuple=network_tuple,
+            from_federation=True,
+        )
+        return 200, data
+
+    async def on_POST(self, origin, content, query):
+        # This implements MSC2197 (Search Filtering over Federation)
+        if not self.allow_access:
+            raise FederationDeniedError(origin)
+
+        limit = int(content.get("limit", 100))
+        since_token = content.get("since", None)
+        search_filter = content.get("filter", None)
+
+        include_all_networks = content.get("include_all_networks", False)
+        third_party_instance_id = content.get("third_party_instance_id", None)
+
+        if include_all_networks:
+            network_tuple = None
+            if third_party_instance_id is not None:
+                raise SynapseError(
+                    400, "Can't use include_all_networks with an explicit network"
+                )
+        elif third_party_instance_id is None:
+            network_tuple = ThirdPartyInstanceID(None, None)
+        else:
+            network_tuple = ThirdPartyInstanceID.from_string(third_party_instance_id)
+
+        if search_filter is None:
+            logger.warning("Nonefilter")
+
         data = await self.handler.get_local_public_room_list(
-            limit, since_token, network_tuple=network_tuple, from_federation=True
+            limit=limit,
+            since_token=since_token,
+            search_filter=search_filter,
+            network_tuple=network_tuple,
+            from_federation=True,
         )
+
         return 200, data
 
 
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 14aad8f09d..aa84621206 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
 
     internal_keys = ["origin", "destination"]
 
+    def get_context(self):
+        return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
+
 
 class Transaction(JsonEncodedObject):
     """ A transaction is a list of Pdus and Edus to be sent to a remote home