summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_server.py15
-rw-r--r--synapse/federation/sender/transaction_manager.py170
-rw-r--r--synapse/federation/units.py3
-rw-r--r--synapse/handlers/devicemessage.py27
-rw-r--r--synapse/logging/opentracing.py26
-rw-r--r--synapse/storage/devices.py39
-rw-r--r--synapse/storage/schema/delta/56/add_spans_to_device_lists.sql20
7 files changed, 208 insertions, 92 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9286ca3202..05fd49f3c1 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -43,7 +43,7 @@ from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Edu, Transaction
 from synapse.http.endpoint import parse_server_name
 from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import log_kv, trace
+from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace
 from synapse.logging.utils import log_function
 from synapse.replication.http.federation import (
     ReplicationFederationSendEduRestServlet,
@@ -811,12 +811,13 @@ class FederationHandlerRegistry(object):
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)
 
-        try:
-            yield handler(origin, content)
-        except SynapseError as e:
-            logger.info("Failed to handle edu %r: %r", edu_type, e)
-        except Exception:
-            logger.exception("Failed to handle edu %r", edu_type)
+        with start_active_span_from_edu(content, "handle_edu"):
+            try:
+                yield handler(origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception:
+                logger.exception("Failed to handle edu %r", edu_type)
 
     def on_query(self, query_type, args):
         handler = self.query_handlers.get(query_type)
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 52706302f2..62ca6a3e87 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -14,11 +14,19 @@
 # limitations under the License.
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import HttpResponseException
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Transaction
+from synapse.logging.opentracing import (
+    extract_text_map,
+    set_tag,
+    start_active_span_follows_from,
+    tags,
+)
 from synapse.util.metrics import measure_func
 
 logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
     @defer.inlineCallbacks
     def send_new_transaction(self, destination, pending_pdus, pending_edus):
 
-        # Sort based on the order field
-        pending_pdus.sort(key=lambda t: t[1])
-        pdus = [x[0] for x in pending_pdus]
-        edus = pending_edus
+        # Make a transaction-sending opentracing span. This span follows on from
+        # all the edus in that transaction. This needs to be done since there is
+        # no active span here, so if the edus were not received by the remote the
+        # span would have no causality and it would be forgotten.
+        # The span_contexts is a generator so that it won't be evaluated if
+        # opentracing is disabled. (Yay speed!)
 
-        success = True
+        span_contexts = (
+            extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
+        )
 
-        logger.debug("TX [%s] _attempt_new_transaction", destination)
+        with start_active_span_follows_from("send_transaction", span_contexts):
 
-        txn_id = str(self._next_txn_id)
+            # Sort based on the order field
+            pending_pdus.sort(key=lambda t: t[1])
+            pdus = [x[0] for x in pending_pdus]
+            edus = pending_edus
 
-        logger.debug(
-            "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
-            destination,
-            txn_id,
-            len(pdus),
-            len(edus),
-        )
+            success = True
 
-        transaction = Transaction.create_new(
-            origin_server_ts=int(self.clock.time_msec()),
-            transaction_id=txn_id,
-            origin=self._server_name,
-            destination=destination,
-            pdus=pdus,
-            edus=edus,
-        )
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
 
-        self._next_txn_id += 1
+            txn_id = str(self._next_txn_id)
 
-        logger.info(
-            "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
-            destination,
-            txn_id,
-            transaction.transaction_id,
-            len(pdus),
-            len(edus),
-        )
+            logger.debug(
+                "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+                destination,
+                txn_id,
+                len(pdus),
+                len(edus),
+            )
 
-        # Actually send the transaction
-
-        # FIXME (erikj): This is a bit of a hack to make the Pdu age
-        # keys work
-        def json_data_cb():
-            data = transaction.get_dict()
-            now = int(self.clock.time_msec())
-            if "pdus" in data:
-                for p in data["pdus"]:
-                    if "age_ts" in p:
-                        unsigned = p.setdefault("unsigned", {})
-                        unsigned["age"] = now - int(p["age_ts"])
-                        del p["age_ts"]
-            return data
-
-        try:
-            response = yield self._transport_layer.send_transaction(
-                transaction, json_data_cb
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self.clock.time_msec()),
+                transaction_id=txn_id,
+                origin=self._server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
             )
-            code = 200
-        except HttpResponseException as e:
-            code = e.code
-            response = e.response
 
-            if e.code in (401, 404, 429) or 500 <= e.code:
-                logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
-                raise e
+            self._next_txn_id += 1
 
-        logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+            logger.info(
+                "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+                destination,
+                txn_id,
+                transaction.transaction_id,
+                len(pdus),
+                len(edus),
+            )
 
-        if code == 200:
-            for e_id, r in response.get("pdus", {}).items():
-                if "error" in r:
+            # Actually send the transaction
+
+            # FIXME (erikj): This is a bit of a hack to make the Pdu age
+            # keys work
+            def json_data_cb():
+                data = transaction.get_dict()
+                now = int(self.clock.time_msec())
+                if "pdus" in data:
+                    for p in data["pdus"]:
+                        if "age_ts" in p:
+                            unsigned = p.setdefault("unsigned", {})
+                            unsigned["age"] = now - int(p["age_ts"])
+                            del p["age_ts"]
+                return data
+
+            try:
+                response = yield self._transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
+
+                if e.code in (401, 404, 429) or 500 <= e.code:
+                    logger.info(
+                        "TX [%s] {%s} got %d response", destination, txn_id, code
+                    )
+                    raise e
+
+            logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+
+            if code == 200:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "TX [%s] {%s} Remote returned error for %s: %s",
+                            destination,
+                            txn_id,
+                            e_id,
+                            r,
+                        )
+            else:
+                for p in pdus:
                     logger.warn(
-                        "TX [%s] {%s} Remote returned error for %s: %s",
+                        "TX [%s] {%s} Failed to send event %s",
                         destination,
                         txn_id,
-                        e_id,
-                        r,
+                        p.event_id,
                     )
-        else:
-            for p in pdus:
-                logger.warn(
-                    "TX [%s] {%s} Failed to send event %s",
-                    destination,
-                    txn_id,
-                    p.event_id,
-                )
-            success = False
+                success = False
 
-        return success
+            set_tag(tags.ERROR, not success)
+            return success
diff --git a/synapse/federation/units.py b/synapse/federation/units.py
index 14aad8f09d..aa84621206 100644
--- a/synapse/federation/units.py
+++ b/synapse/federation/units.py
@@ -38,6 +38,9 @@ class Edu(JsonEncodedObject):
 
     internal_keys = ["origin", "destination"]
 
+    def get_context(self):
+        return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}")
+
 
 class Transaction(JsonEncodedObject):
     """ A transaction is a list of Pdus and Edus to be sent to a remote home
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index e1ebb6346c..c7d56779b8 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,9 +15,17 @@
 
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    set_tag,
+    start_active_span,
+    whitelisted_homeserver,
+)
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
 
@@ -100,14 +108,21 @@ class DeviceMessageHandler(object):
 
         message_id = random_string(16)
 
+        context = get_active_span_text_map()
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with start_active_span("to_device_for_user"):
+                set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "org.matrix.opentracing_context": json.dumps(context)
+                    if whitelisted_homeserver(destination)
+                    else None,
+                }
 
         stream_id = yield self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 4abea4474b..dd296027a1 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -149,6 +149,9 @@ unchartered waters will require the enforcement of the whitelist.
 ``logging/opentracing.py`` has a ``whitelisted_homeserver`` method which takes
 in a destination and compares it to the whitelist.
 
+Most injection methods take a 'destination' arg. The context will only be injected
+if the destination matches the whitelist or the destination is None.
+
 =======
 Gotchas
 =======
@@ -576,6 +579,29 @@ def inject_active_span_text_map(carrier, destination, check_destination=True):
     )
 
 
+def get_active_span_text_map(destination=None):
+    """
+    Gets a span context as a dict. This can be used instead of manually
+    injecting a span into an empty carrier.
+
+    Args:
+        destination (str): the name of the remote server.
+
+    Returns:
+        dict: the active span's context if opentracing is enabled, otherwise empty.
+    """
+
+    if not opentracing or (destination and not whitelisted_homeserver(destination)):
+        return {}
+
+    carrier = {}
+    opentracing.tracer.inject(
+        opentracing.tracer.active_span, opentracing.Format.TEXT_MAP, carrier
+    )
+
+    return carrier
+
+
 def active_span_context_as_string():
     """
     Returns:
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 8f72d92895..e11881161d 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -21,6 +21,11 @@ from canonicaljson import json
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
+from synapse.logging.opentracing import (
+    get_active_span_text_map,
+    trace,
+    whitelisted_homeserver,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import Cache, SQLBaseStore, db_to_json
 from synapse.storage.background_updates import BackgroundUpdateStore
@@ -73,6 +78,7 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return {d["device_id"]: d for d in devices}
 
+    @trace
     @defer.inlineCallbacks
     def get_devices_by_remote(self, destination, from_stream_id, limit):
         """Get stream of updates to send to remote servers
@@ -127,8 +133,15 @@ class DeviceWorkerStore(SQLBaseStore):
         # (user_id, device_id) entries into a map, with the value being
         # the max stream_id across each set of duplicate entries
         #
-        # maps (user_id, device_id) -> stream_id
+        # maps (user_id, device_id) -> (stream_id, opentracing_context)
         # as long as their stream_id does not match that of the last row
+        #
+        # opentracing_context contains the opentracing metadata for the request
+        # that created the poke
+        #
+        # The most recent request's opentracing_context is used as the
+        # context which created the Edu.
+
         query_map = {}
         for update in updates:
             if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -136,7 +149,14 @@ class DeviceWorkerStore(SQLBaseStore):
                 break
 
             key = (update[0], update[1])
-            query_map[key] = max(query_map.get(key, 0), update[2])
+
+            update_context = update[3]
+            update_stream_id = update[2]
+
+            previous_update_stream_id, _ = query_map.get(key, (0, None))
+
+            if update_stream_id > previous_update_stream_id:
+                query_map[key] = (update_stream_id, update_context)
 
         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -171,7 +191,7 @@ class DeviceWorkerStore(SQLBaseStore):
             List: List of device updates
         """
         sql = """
-            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+            SELECT user_id, device_id, stream_id, opentracing_context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             ORDER BY stream_id
             LIMIT ?
@@ -187,8 +207,9 @@ class DeviceWorkerStore(SQLBaseStore):
         Args:
             destination (str): The host the device updates are intended for
             from_stream_id (int): The minimum stream_id to filter updates by, exclusive
-            query_map (Dict[(str, str): int]): Dictionary mapping
-                user_id/device_id to update stream_id
+            query_map (Dict[(str, str): (int, str|None)]): Dictionary mapping
+                user_id/device_id to update stream_id and the relevent json-encoded
+                opentracing context
 
         Returns:
             List[Dict]: List of objects representing an device update EDU
@@ -210,12 +231,13 @@ class DeviceWorkerStore(SQLBaseStore):
                 destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
-                stream_id = query_map[(user_id, device_id)]
+                stream_id, opentracing_context = query_map[(user_id, device_id)]
                 result = {
                     "user_id": user_id,
                     "device_id": device_id,
                     "prev_id": [prev_id] if prev_id else [],
                     "stream_id": stream_id,
+                    "org.matrix.opentracing_context": opentracing_context,
                 }
 
                 prev_id = stream_id
@@ -814,6 +836,8 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             ],
         )
 
+        context = get_active_span_text_map()
+
         self._simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
@@ -825,6 +849,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
+                    "opentracing_context": json.dumps(context)
+                    if whitelisted_homeserver(destination)
+                    else None,
                 }
                 for destination in hosts
                 for device_id in device_ids
diff --git a/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
new file mode 100644
index 0000000000..41807eb1e7
--- /dev/null
+++ b/synapse/storage/schema/delta/56/add_spans_to_device_lists.sql
@@ -0,0 +1,20 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/*
+ * Opentracing context data for inclusion in the device_list_update EDUs, as a
+ * json-encoded dictionary. NULL if opentracing is disabled (or not enabled for this destination).
+ */
+ALTER TABLE device_lists_outbound_pokes ADD opentracing_context TEXT;