diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 52706302f2..62ca6a3e87 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -14,11 +14,19 @@
# limitations under the License.
import logging
+from canonicaljson import json
+
from twisted.internet import defer
from synapse.api.errors import HttpResponseException
from synapse.federation.persistence import TransactionActions
from synapse.federation.units import Transaction
+from synapse.logging.opentracing import (
+ extract_text_map,
+ set_tag,
+ start_active_span_follows_from,
+ tags,
+)
from synapse.util.metrics import measure_func
logger = logging.getLogger(__name__)
@@ -44,93 +52,109 @@ class TransactionManager(object):
@defer.inlineCallbacks
def send_new_transaction(self, destination, pending_pdus, pending_edus):
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[1])
- pdus = [x[0] for x in pending_pdus]
- edus = pending_edus
+ # Make a transaction-sending opentracing span. This span follows on from
+ # all the edus in that transaction. This needs to be done since there is
+ # no active span here, so if the edus were not received by the remote the
+ # span would have no causality and it would be forgotten.
+ # The span_contexts is a generator so that it won't be evaluated if
+ # opentracing is disabled. (Yay speed!)
- success = True
+ span_contexts = (
+ extract_text_map(json.loads(edu.get_context())) for edu in pending_edus
+ )
- logger.debug("TX [%s] _attempt_new_transaction", destination)
+ with start_active_span_follows_from("send_transaction", span_contexts):
- txn_id = str(self._next_txn_id)
+ # Sort based on the order field
+ pending_pdus.sort(key=lambda t: t[1])
+ pdus = [x[0] for x in pending_pdus]
+ edus = pending_edus
- logger.debug(
- "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
- destination,
- txn_id,
- len(pdus),
- len(edus),
- )
+ success = True
- transaction = Transaction.create_new(
- origin_server_ts=int(self.clock.time_msec()),
- transaction_id=txn_id,
- origin=self._server_name,
- destination=destination,
- pdus=pdus,
- edus=edus,
- )
+ logger.debug("TX [%s] _attempt_new_transaction", destination)
- self._next_txn_id += 1
+ txn_id = str(self._next_txn_id)
- logger.info(
- "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
- destination,
- txn_id,
- transaction.transaction_id,
- len(pdus),
- len(edus),
- )
+ logger.debug(
+ "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)",
+ destination,
+ txn_id,
+ len(pdus),
+ len(edus),
+ )
- # Actually send the transaction
-
- # FIXME (erikj): This is a bit of a hack to make the Pdu age
- # keys work
- def json_data_cb():
- data = transaction.get_dict()
- now = int(self.clock.time_msec())
- if "pdus" in data:
- for p in data["pdus"]:
- if "age_ts" in p:
- unsigned = p.setdefault("unsigned", {})
- unsigned["age"] = now - int(p["age_ts"])
- del p["age_ts"]
- return data
-
- try:
- response = yield self._transport_layer.send_transaction(
- transaction, json_data_cb
+ transaction = Transaction.create_new(
+ origin_server_ts=int(self.clock.time_msec()),
+ transaction_id=txn_id,
+ origin=self._server_name,
+ destination=destination,
+ pdus=pdus,
+ edus=edus,
)
- code = 200
- except HttpResponseException as e:
- code = e.code
- response = e.response
- if e.code in (401, 404, 429) or 500 <= e.code:
- logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
- raise e
+ self._next_txn_id += 1
- logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+ logger.info(
+ "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)",
+ destination,
+ txn_id,
+ transaction.transaction_id,
+ len(pdus),
+ len(edus),
+ )
- if code == 200:
- for e_id, r in response.get("pdus", {}).items():
- if "error" in r:
+ # Actually send the transaction
+
+ # FIXME (erikj): This is a bit of a hack to make the Pdu age
+ # keys work
+ def json_data_cb():
+ data = transaction.get_dict()
+ now = int(self.clock.time_msec())
+ if "pdus" in data:
+ for p in data["pdus"]:
+ if "age_ts" in p:
+ unsigned = p.setdefault("unsigned", {})
+ unsigned["age"] = now - int(p["age_ts"])
+ del p["age_ts"]
+ return data
+
+ try:
+ response = yield self._transport_layer.send_transaction(
+ transaction, json_data_cb
+ )
+ code = 200
+ except HttpResponseException as e:
+ code = e.code
+ response = e.response
+
+ if e.code in (401, 404, 429) or 500 <= e.code:
+ logger.info(
+ "TX [%s] {%s} got %d response", destination, txn_id, code
+ )
+ raise e
+
+ logger.info("TX [%s] {%s} got %d response", destination, txn_id, code)
+
+ if code == 200:
+ for e_id, r in response.get("pdus", {}).items():
+ if "error" in r:
+ logger.warn(
+ "TX [%s] {%s} Remote returned error for %s: %s",
+ destination,
+ txn_id,
+ e_id,
+ r,
+ )
+ else:
+ for p in pdus:
logger.warn(
- "TX [%s] {%s} Remote returned error for %s: %s",
+ "TX [%s] {%s} Failed to send event %s",
destination,
txn_id,
- e_id,
- r,
+ p.event_id,
)
- else:
- for p in pdus:
- logger.warn(
- "TX [%s] {%s} Failed to send event %s",
- destination,
- txn_id,
- p.event_id,
- )
- success = False
+ success = False
- return success
+ set_tag(tags.ERROR, not success)
+ return success
|