diff options
Diffstat (limited to 'synapse/federation')
-rw-r--r-- | synapse/federation/federation_server.py | 15 | ||||
-rw-r--r-- | synapse/federation/sender/transaction_manager.py | 170 | ||||
-rw-r--r-- | synapse/federation/units.py | 3 |
3 files changed, 108 insertions, 80 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 9286ca3202..05fd49f3c1 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -43,7 +43,7 @@ from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name from synapse.logging.context import nested_logging_context -from synapse.logging.opentracing import log_kv, trace +from synapse.logging.opentracing import log_kv, start_active_span_from_edu, trace from synapse.logging.utils import log_function from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, @@ -811,12 +811,13 @@ class FederationHandlerRegistry(object): if not handler: logger.warn("No handler registered for EDU type %s", edu_type) - try: - yield handler(origin, content) - except SynapseError as e: - logger.info("Failed to handle edu %r: %r", edu_type, e) - except Exception: - logger.exception("Failed to handle edu %r", edu_type) + with start_active_span_from_edu(content, "handle_edu"): + try: + yield handler(origin, content) + except SynapseError as e: + logger.info("Failed to handle edu %r: %r", edu_type, e) + except Exception: + logger.exception("Failed to handle edu %r", edu_type) def on_query(self, query_type, args): handler = self.query_handlers.get(query_type) diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py index 52706302f2..62ca6a3e87 100644 --- a/synapse/federation/sender/transaction_manager.py +++ b/synapse/federation/sender/transaction_manager.py @@ -14,11 +14,19 @@ # limitations under the License. import logging +from canonicaljson import json + from twisted.internet import defer from synapse.api.errors import HttpResponseException from synapse.federation.persistence import TransactionActions from synapse.federation.units import Transaction +from synapse.logging.opentracing import ( + extract_text_map, + set_tag, + start_active_span_follows_from, + tags, +) from synapse.util.metrics import measure_func logger = logging.getLogger(__name__) @@ -44,93 +52,109 @@ class TransactionManager(object): @defer.inlineCallbacks def send_new_transaction(self, destination, pending_pdus, pending_edus): - # Sort based on the order field - pending_pdus.sort(key=lambda t: t[1]) - pdus = [x[0] for x in pending_pdus] - edus = pending_edus + # Make a transaction-sending opentracing span. This span follows on from + # all the edus in that transaction. This needs to be done since there is + # no active span here, so if the edus were not received by the remote the + # span would have no causality and it would be forgotten. + # The span_contexts is a generator so that it won't be evaluated if + # opentracing is disabled. (Yay speed!) - success = True + span_contexts = ( + extract_text_map(json.loads(edu.get_context())) for edu in pending_edus + ) - logger.debug("TX [%s] _attempt_new_transaction", destination) + with start_active_span_follows_from("send_transaction", span_contexts): - txn_id = str(self._next_txn_id) + # Sort based on the order field + pending_pdus.sort(key=lambda t: t[1]) + pdus = [x[0] for x in pending_pdus] + edus = pending_edus - logger.debug( - "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", - destination, - txn_id, - len(pdus), - len(edus), - ) + success = True - transaction = Transaction.create_new( - origin_server_ts=int(self.clock.time_msec()), - transaction_id=txn_id, - origin=self._server_name, - destination=destination, - pdus=pdus, - edus=edus, - ) + logger.debug("TX [%s] _attempt_new_transaction", destination) - self._next_txn_id += 1 + txn_id = str(self._next_txn_id) - logger.info( - "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", - destination, - txn_id, - transaction.transaction_id, - len(pdus), - len(edus), - ) + logger.debug( + "TX [%s] {%s} Attempting new transaction" " (pdus: %d, edus: %d)", + destination, + txn_id, + len(pdus), + len(edus), + ) - # Actually send the transaction - - # FIXME (erikj): This is a bit of a hack to make the Pdu age - # keys work - def json_data_cb(): - data = transaction.get_dict() - now = int(self.clock.time_msec()) - if "pdus" in data: - for p in data["pdus"]: - if "age_ts" in p: - unsigned = p.setdefault("unsigned", {}) - unsigned["age"] = now - int(p["age_ts"]) - del p["age_ts"] - return data - - try: - response = yield self._transport_layer.send_transaction( - transaction, json_data_cb + transaction = Transaction.create_new( + origin_server_ts=int(self.clock.time_msec()), + transaction_id=txn_id, + origin=self._server_name, + destination=destination, + pdus=pdus, + edus=edus, ) - code = 200 - except HttpResponseException as e: - code = e.code - response = e.response - if e.code in (401, 404, 429) or 500 <= e.code: - logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) - raise e + self._next_txn_id += 1 - logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) + logger.info( + "TX [%s] {%s} Sending transaction [%s]," " (PDUs: %d, EDUs: %d)", + destination, + txn_id, + transaction.transaction_id, + len(pdus), + len(edus), + ) - if code == 200: - for e_id, r in response.get("pdus", {}).items(): - if "error" in r: + # Actually send the transaction + + # FIXME (erikj): This is a bit of a hack to make the Pdu age + # keys work + def json_data_cb(): + data = transaction.get_dict() + now = int(self.clock.time_msec()) + if "pdus" in data: + for p in data["pdus"]: + if "age_ts" in p: + unsigned = p.setdefault("unsigned", {}) + unsigned["age"] = now - int(p["age_ts"]) + del p["age_ts"] + return data + + try: + response = yield self._transport_layer.send_transaction( + transaction, json_data_cb + ) + code = 200 + except HttpResponseException as e: + code = e.code + response = e.response + + if e.code in (401, 404, 429) or 500 <= e.code: + logger.info( + "TX [%s] {%s} got %d response", destination, txn_id, code + ) + raise e + + logger.info("TX [%s] {%s} got %d response", destination, txn_id, code) + + if code == 200: + for e_id, r in response.get("pdus", {}).items(): + if "error" in r: + logger.warn( + "TX [%s] {%s} Remote returned error for %s: %s", + destination, + txn_id, + e_id, + r, + ) + else: + for p in pdus: logger.warn( - "TX [%s] {%s} Remote returned error for %s: %s", + "TX [%s] {%s} Failed to send event %s", destination, txn_id, - e_id, - r, + p.event_id, ) - else: - for p in pdus: - logger.warn( - "TX [%s] {%s} Failed to send event %s", - destination, - txn_id, - p.event_id, - ) - success = False + success = False - return success + set_tag(tags.ERROR, not success) + return success diff --git a/synapse/federation/units.py b/synapse/federation/units.py index 14aad8f09d..aa84621206 100644 --- a/synapse/federation/units.py +++ b/synapse/federation/units.py @@ -38,6 +38,9 @@ class Edu(JsonEncodedObject): internal_keys = ["origin", "destination"] + def get_context(self): + return getattr(self, "content", {}).get("org.matrix.opentracing_context", "{}") + class Transaction(JsonEncodedObject): """ A transaction is a list of Pdus and Edus to be sent to a remote home |