summary refs log tree commit diff
diff options
context:
space:
mode:
authorJorik Schellekens <joriks@matrix.org>2019-07-17 13:53:22 +0100
committerJorik Schellekens <joriks@matrix.org>2019-07-17 14:33:00 +0100
commit783ddc417f529407d10c3d3843ea973dc643f695 (patch)
tree3321c842694c6e22bdd8c236ed36dd46f9eb73db
parentThe great logging/ migration (diff)
downloadsynapse-783ddc417f529407d10c3d3843ea973dc643f695.tar.xz
Opentracing across streams
-rw-r--r--synapse/federation/federation_server.py15
-rw-r--r--synapse/federation/sender/per_destination_queue.py209
-rw-r--r--synapse/handlers/devicemessage.py24
-rw-r--r--synapse/storage/devices.py20
-rw-r--r--synapse/storage/schema/delta/55/add_spans_to_device_lists.sql16
5 files changed, 186 insertions, 98 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8c0a18b120..c83df02a70 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -53,6 +53,8 @@ from synapse.util import glob_to_regex
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.caches.response_cache import ResponseCache
 
+import synapse.logging.opentracing as opentracing
+
 # when processing incoming transactions, we try to handle multiple rooms in
 # parallel, up to this limit.
 TRANSACTION_CONCURRENCY_LIMIT = 10
@@ -821,12 +823,13 @@ class FederationHandlerRegistry(object):
         if not handler:
             logger.warn("No handler registered for EDU type %s", edu_type)
 
-        try:
-            yield handler(origin, content)
-        except SynapseError as e:
-            logger.info("Failed to handle edu %r: %r", edu_type, e)
-        except Exception:
-            logger.exception("Failed to handle edu %r", edu_type)
+        with opentracing.start_active_span_from_edu(content, "handle_edu"):
+            try:
+                yield handler(origin, content)
+            except SynapseError as e:
+                logger.info("Failed to handle edu %r: %r", edu_type, e)
+            except Exception:
+                logger.exception("Failed to handle edu %r", edu_type)
 
     def on_query(self, query_type, args):
         handler = self.query_handlers.get(query_type)
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 9aab12c0d3..c83e1d4baf 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -16,10 +16,12 @@
 import datetime
 import logging
 
+from canonicaljson import json
 from prometheus_client import Counter
 
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
@@ -204,97 +206,142 @@ class PerDestinationQueue(object):
 
                 pending_edus = device_update_edus + to_device_edus
 
-                # BEGIN CRITICAL SECTION
-                #
-                # In order to avoid a race condition, we need to make sure that
-                # the following code (from popping the queues up to the point
-                # where we decide if we actually have any pending messages) is
-                # atomic - otherwise new PDUs or EDUs might arrive in the
-                # meantime, but not get sent because we hold the
-                # transmission_loop_running flag.
-
-                pending_pdus = self._pending_pdus
-
-                # We can only include at most 50 PDUs per transactions
-                pending_pdus, self._pending_pdus = pending_pdus[:50], pending_pdus[50:]
-
-                pending_edus.extend(self._get_rr_edus(force_flush=False))
-                pending_presence = self._pending_presence
-                self._pending_presence = {}
-                if pending_presence:
-                    pending_edus.append(
-                        Edu(
-                            origin=self._server_name,
-                            destination=self._destination,
-                            edu_type="m.presence",
-                            content={
-                                "push": [
-                                    format_user_presence_state(
-                                        presence, self._clock.time_msec()
-                                    )
-                                    for presence in pending_presence.values()
-                                ]
-                            },
-                        )
+                # Make a transaction sending span, this span follows on from all the
+                # edus in that transaction. This needs to be done because if the edus
+                # are never received on the remote the span effectively has no causality.
+
+                span_contexts = [
+                    opentracing.extract_text_map(
+                        json.loads(
+                            edu.get_dict().get("content", {}).get("context", "{}")
+                        ).get("opentracing", {})
                     )
+                    for edu in pending_edus
+                ]
 
-                pending_edus.extend(
-                    self._pop_pending_edus(MAX_EDUS_PER_TRANSACTION - len(pending_edus))
-                )
-                while (
-                    len(pending_edus) < MAX_EDUS_PER_TRANSACTION
-                    and self._pending_edus_keyed
+                with opentracing.start_active_span_follows_from(
+                    "send_transaction", span_contexts
                 ):
-                    _, val = self._pending_edus_keyed.popitem()
-                    pending_edus.append(val)
-
-                if pending_pdus:
-                    logger.debug(
-                        "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
-                        self._destination,
-                        len(pending_pdus),
+                    # Link each sent edu to this transaction's span
+                    _pending_edus = []
+                    for edu in pending_edus:
+                        edu_dict = edu.get_dict()
+                        span_context = json.loads(
+                            edu_dict.get("content", {}).get("context", "{}")
+                        ).get("opentracing", {})
+                        # If there is no span context then we are either blacklisting
+                        # this destination or we are not tracing
+                        if not span_context == {}:
+                            if not "references" in span_context:
+                                span_context["references"] = [
+                                    opentracing.active_span_context_as_string()
+                                ]
+                            else:
+                                span_context["references"].append(
+                                    opentracing.active_span_context_as_string()
+                                )
+                            edu_dict["content"]["context"] = json.dumps(
+                                {"opentracing": span_context}
+                            )
+                        _pending_edus.append(Edu(**edu_dict))
+                    pending_edus = _pending_edus
+
+                    # BEGIN CRITICAL SECTION
+                    #
+                    # In order to avoid a race condition, we need to make sure that
+                    # the following code (from popping the queues up to the point
+                    # where we decide if we actually have any pending messages) is
+                    # atomic - otherwise new PDUs or EDUs might arrive in the
+                    # meantime, but not get sent because we hold the
+                    # transmission_loop_running flag.
+
+                    pending_pdus = self._pending_pdus
+
+                    # We can only include at most 50 PDUs per transactions
+                    pending_pdus, self._pending_pdus = (
+                        pending_pdus[:50],
+                        pending_pdus[50:],
                     )
 
-                if not pending_pdus and not pending_edus:
-                    logger.debug("TX [%s] Nothing to send", self._destination)
-                    self._last_device_stream_id = device_stream_id
-                    return
-
-                # if we've decided to send a transaction anyway, and we have room, we
-                # may as well send any pending RRs
-                if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
-                    pending_edus.extend(self._get_rr_edus(force_flush=True))
-
-                # END CRITICAL SECTION
-
-                success = yield self._transaction_manager.send_new_transaction(
-                    self._destination, pending_pdus, pending_edus
-                )
-                if success:
-                    sent_transactions_counter.inc()
-                    sent_edus_counter.inc(len(pending_edus))
-                    for edu in pending_edus:
-                        sent_edus_by_type.labels(edu.edu_type).inc()
-                    # Remove the acknowledged device messages from the database
-                    # Only bother if we actually sent some device messages
-                    if to_device_edus:
-                        yield self._store.delete_device_msgs_for_remote(
-                            self._destination, device_stream_id
+                    pending_edus.extend(self._get_rr_edus(force_flush=False))
+                    pending_presence = self._pending_presence
+                    self._pending_presence = {}
+                    if pending_presence:
+                        pending_edus.append(
+                            Edu(
+                                origin=self._server_name,
+                                destination=self._destination,
+                                edu_type="m.presence",
+                                content={
+                                    "push": [
+                                        format_user_presence_state(
+                                            presence, self._clock.time_msec()
+                                        )
+                                        for presence in pending_presence.values()
+                                    ]
+                                },
+                            )
                         )
 
-                    # also mark the device updates as sent
-                    if device_update_edus:
-                        logger.info(
-                            "Marking as sent %r %r", self._destination, dev_list_id
+                    pending_edus.extend(
+                        self._pop_pending_edus(
+                            MAX_EDUS_PER_TRANSACTION - len(pending_edus)
                         )
-                        yield self._store.mark_as_sent_devices_by_remote(
-                            self._destination, dev_list_id
+                    )
+                    while (
+                        len(pending_edus) < MAX_EDUS_PER_TRANSACTION
+                        and self._pending_edus_keyed
+                    ):
+                        _, val = self._pending_edus_keyed.popitem()
+                        pending_edus.append(val)
+
+                    if pending_pdus:
+                        logger.debug(
+                            "TX [%s] len(pending_pdus_by_dest[dest]) = %d",
+                            self._destination,
+                            len(pending_pdus),
                         )
 
-                    self._last_device_stream_id = device_stream_id
-                    self._last_device_list_stream_id = dev_list_id
-                else:
-                    break
+                    if not pending_pdus and not pending_edus:
+                        logger.debug("TX [%s] Nothing to send", self._destination)
+                        self._last_device_stream_id = device_stream_id
+                        return
+
+                    # if we've decided to send a transaction anyway, and we have room, we
+                    # may as well send any pending RRs
+                    if len(pending_edus) < MAX_EDUS_PER_TRANSACTION:
+                        pending_edus.extend(self._get_rr_edus(force_flush=True))
+
+                    # END CRITICAL SECTION
+
+                    success = yield self._transaction_manager.send_new_transaction(
+                        self._destination, pending_pdus, pending_edus
+                    )
+                    if success:
+                        sent_transactions_counter.inc()
+                        sent_edus_counter.inc(len(pending_edus))
+                        for edu in pending_edus:
+                            sent_edus_by_type.labels(edu.edu_type).inc()
+                        # Remove the acknowledged device messages from the database
+                        # Only bother if we actually sent some device messages
+                        if to_device_edus:
+                            yield self._store.delete_device_msgs_for_remote(
+                                self._destination, device_stream_id
+                            )
+
+                        # also mark the device updates as sent
+                        if device_update_edus:
+                            logger.info(
+                                "Marking as sent %r %r", self._destination, dev_list_id
+                            )
+                            yield self._store.mark_as_sent_devices_by_remote(
+                                self._destination, dev_list_id
+                            )
+
+                        self._last_device_stream_id = device_stream_id
+                        self._last_device_list_stream_id = dev_list_id
+                    else:
+                        break
         except NotRetryingDestination as e:
             logger.debug(
                 "TX [%s] not ready for retry yet (next retry at %s) - "
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 6463d900cd..bed564e87b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -15,12 +15,14 @@
 
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
+import synapse.logging.opentracing as opentracing
 from synapse.api.errors import SynapseError
 from synapse.types import UserID, get_domain_from_id
 from synapse.util.stringutils import random_string
-import synapse.logging.opentracing as opentracing
 
 logger = logging.getLogger(__name__)
 
@@ -102,14 +104,22 @@ class DeviceMessageHandler(object):
 
         message_id = random_string(16)
 
+        context = {"opentracing": {}}
+        opentracing.inject_active_span_text_map(context["opentracing"])
+
         remote_edu_contents = {}
         for destination, messages in remote_messages.items():
-            remote_edu_contents[destination] = {
-                "messages": messages,
-                "sender": sender_user_id,
-                "type": message_type,
-                "message_id": message_id,
-            }
+            with opentracing.start_active_span("to_device_for_user"):
+                opentracing.set_tag("destination", destination)
+                remote_edu_contents[destination] = {
+                    "messages": messages,
+                    "sender": sender_user_id,
+                    "type": message_type,
+                    "message_id": message_id,
+                    "context": json.dumps(context)
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "",
+                }
 
         opentracing.log_kv(local_messages)
         stream_id = yield self.store.add_messages_to_device_inbox(
diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py
index 315c2ba595..332c562848 100644
--- a/synapse/storage/devices.py
+++ b/synapse/storage/devices.py
@@ -74,6 +74,7 @@ class DeviceWorkerStore(SQLBaseStore):
 
         defer.returnValue({d["device_id"]: d for d in devices})
 
+    @opentracing.trace_defered_function
     @defer.inlineCallbacks
     def get_devices_by_remote(self, destination, from_stream_id, limit):
         """Get stream of updates to send to remote servers
@@ -128,8 +129,10 @@ class DeviceWorkerStore(SQLBaseStore):
         # (user_id, device_id) entries into a map, with the value being
         # the max stream_id across each set of duplicate entries
         #
-        # maps (user_id, device_id) -> stream_id
+        # maps (user_id, device_id) -> (stream_id, context)
         # as long as their stream_id does not match that of the last row
+        # where context is any metadata about the message's context such as
+        # opentracing data
         query_map = {}
         for update in updates:
             if stream_id_cutoff is not None and update[2] >= stream_id_cutoff:
@@ -137,7 +140,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 break
 
             key = (update[0], update[1])
-            query_map[key] = max(query_map.get(key, 0), update[2])
+            query_map[key] = (max(query_map.get(key, 0), update[2]), update[3])
 
         # If we didn't find any updates with a stream_id lower than the cutoff, it
         # means that there are more than limit updates all of which have the same
@@ -172,7 +175,7 @@ class DeviceWorkerStore(SQLBaseStore):
             List: List of device updates
         """
         sql = """
-            SELECT user_id, device_id, stream_id FROM device_lists_outbound_pokes
+            SELECT user_id, device_id, stream_id, context FROM device_lists_outbound_pokes
             WHERE destination = ? AND ? < stream_id AND stream_id <= ? AND sent = ?
             ORDER BY stream_id
             LIMIT ?
@@ -211,12 +214,15 @@ class DeviceWorkerStore(SQLBaseStore):
                 destination, user_id, from_stream_id
             )
             for device_id, device in iteritems(user_devices):
-                stream_id = query_map[(user_id, device_id)]
+                stream_id = query_map[(user_id, device_id)][0]
                 result = {
                     "user_id": user_id,
                     "device_id": device_id,
                     "prev_id": [prev_id] if prev_id else [],
                     "stream_id": stream_id,
+                    "context": query_map[(user_id, device_id)][1]
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "",
                 }
 
                 prev_id = stream_id
@@ -817,6 +823,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
             ],
         )
 
+        context = {"opentracing": {}}
+        opentracing.inject_active_span_text_map(context["opentracing"])
+
         self._simple_insert_many_txn(
             txn,
             table="device_lists_outbound_pokes",
@@ -828,6 +837,9 @@ class DeviceStore(DeviceWorkerStore, BackgroundUpdateStore):
                     "device_id": device_id,
                     "sent": False,
                     "ts": now,
+                    "context": json.dumps(context)
+                    if opentracing.whitelisted_homeserver(destination)
+                    else "",
                 }
                 for destination in hosts
                 for device_id in device_ids
diff --git a/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql b/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql
new file mode 100644
index 0000000000..a4c6b917f7
--- /dev/null
+++ b/synapse/storage/schema/delta/55/add_spans_to_device_lists.sql
@@ -0,0 +1,16 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.d
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ALTER TABLE device_lists_outbound_pokes ADD context TEXT;
\ No newline at end of file