summary refs log tree commit diff
path: root/synapse/federation/sender/transaction_manager.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/sender/transaction_manager.py')
-rw-r--r--synapse/federation/sender/transaction_manager.py193
1 files changed, 112 insertions, 81 deletions
diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 52706302f2..5b6c79c51a 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -14,11 +14,20 @@
 # limitations under the License.
 import logging
 
+from canonicaljson import json
+
 from twisted.internet import defer
 
 from synapse.api.errors import HttpResponseException
 from synapse.federation.persistence import TransactionActions
 from synapse.federation.units import Transaction
+from synapse.logging.opentracing import (
+    extract_text_map,
+    set_tag,
+    start_active_span_follows_from,
+    tags,
+    whitelisted_homeserver,
+)
 from synapse.util.metrics import measure_func
 
 logger = logging.getLogger(__name__)
@@ -44,93 +53,115 @@ class TransactionManager(object):
     @defer.inlineCallbacks
     def send_new_transaction(self, destination, pending_pdus, pending_edus):
 
-        # Sort based on the order field
-        pending_pdus.sort(key=lambda t: t[1])
-        pdus = [x[0] for x in pending_pdus]
-        edus = pending_edus
-
-        success = True
-
-        logger.debug("TX [%s] _attempt_new_transaction", destination)
-
-        txn_id = str(self._next_txn_id)
-
-        logger.debug(
-            "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
-            destination,
-            txn_id,
-            len(pdus),
-            len(edus),
-        )
-
-        transaction = Transaction.create_new(
-            origin_server_ts=int(self.clock.time_msec()),
-            transaction_id=txn_id,
-            origin=self._server_name,
-            destination=destination,
-            pdus=pdus,
-            edus=edus,
-        )
-
-        self._next_txn_id += 1
-
-        logger.info(
-            "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
-            destination,
-            txn_id,
-            transaction.transaction_id,
-            len(pdus),
-            len(edus),
-        )
-
-        # Actually send the transaction
-
-        # FIXME (erikj): This is a bit of a hack to make the Pdu age
-        # keys work
-        def json_data_cb():
-            data = transaction.get_dict()
-            now = int(self.clock.time_msec())
-            if "pdus" in data:
-                for p in data["pdus"]:
-                    if "age_ts" in p:
-                        unsigned = p.setdefault("unsigned", {})
-                        unsigned["age"] = now - int(p["age_ts"])
-                        del p["age_ts"]
-            return data
-
-        try:
-            response = yield self._transport_layer.send_transaction(
-                transaction, json_data_cb
+        # Make a transaction-sending opentracing span. This span follows on from
+        # all the edus in that transaction. This needs to be done since there is
+        # no active span here, so if the edus were not received by the remote the
+        # span would have no causality and it would be forgotten.
+        # The span_contexts is a generator so that it won't be evaluated if
+        # opentracing is disabled. (Yay speed!)
+
+        span_contexts = []
+        keep_destination = whitelisted_homeserver(destination)
+
+        for edu in pending_edus:
+            context = edu.get_context()
+            if context:
+                span_contexts.append(extract_text_map(json.loads(context)))
+            if keep_destination:
+                edu.strip_context()
+
+        with start_active_span_follows_from("send_transaction", span_contexts):
+
+            # Sort based on the order field
+            pending_pdus.sort(key=lambda t: t[1])
+            pdus = [x[0] for x in pending_pdus]
+            edus = pending_edus
+
+            success = True
+
+            logger.debug("TX [%s] _attempt_new_transaction", destination)
+
+            txn_id = str(self._next_txn_id)
+
+            logger.debug(
+                "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+                destination,
+                txn_id,
+                len(pdus),
+                len(edus),
+            )
+
+            transaction = Transaction.create_new(
+                origin_server_ts=int(self.clock.time_msec()),
+                transaction_id=txn_id,
+                origin=self._server_name,
+                destination=destination,
+                pdus=pdus,
+                edus=edus,
             )
-            code = 200
-        except HttpResponseException as e:
-            code = e.code
-            response = e.response
 
-            if e.code in (401, 404, 429) or 500 <= e.code:
-                logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
-                raise e
+            self._next_txn_id += 1
 
-        logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+            logger.info(
+                "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+                destination,
+                txn_id,
+                transaction.transaction_id,
+                len(pdus),
+                len(edus),
+            )
 
-        if code == 200:
-            for e_id, r in response.get("pdus", {}).items():
-                if "error" in r:
+            # Actually send the transaction
+
+            # FIXME (erikj): This is a bit of a hack to make the Pdu age
+            # keys work
+            def json_data_cb():
+                data = transaction.get_dict()
+                now = int(self.clock.time_msec())
+                if "pdus" in data:
+                    for p in data["pdus"]:
+                        if "age_ts" in p:
+                            unsigned = p.setdefault("unsigned", {})
+                            unsigned["age"] = now - int(p["age_ts"])
+                            del p["age_ts"]
+                return data
+
+            try:
+                response = yield self._transport_layer.send_transaction(
+                    transaction, json_data_cb
+                )
+                code = 200
+            except HttpResponseException as e:
+                code = e.code
+                response = e.response
+
+                if e.code in (401, 404, 429) or 500 <= e.code:
+                    logger.info(
+                        "TX [%s] {%s} got %d response", destination, txn_id, code
+                    )
+                    raise e
+
+            logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+
+            if code == 200:
+                for e_id, r in response.get("pdus", {}).items():
+                    if "error" in r:
+                        logger.warn(
+                            "TX [%s] {%s} Remote returned error for %s: %s",
+                            destination,
+                            txn_id,
+                            e_id,
+                            r,
+                        )
+            else:
+                for p in pdus:
                     logger.warn(
-                        "TX [%s] {%s} Remote returned error for %s: %s",
+                        "TX [%s] {%s} Failed to send event %s",
                         destination,
                         txn_id,
-                        e_id,
-                        r,
+                        p.event_id,
                     )
-        else:
-            for p in pdus:
-                logger.warn(
-                    "TX [%s] {%s} Failed to send event %s",
-                    destination,
-                    txn_id,
-                    p.event_id,
-                )
-            success = False
+                success = False
 
-        return success
+            set_tag(tags.ERROR, not success)
+            return success