diff --git a/synapse/federation/sender/transaction_manager.py b/synapse/federation/sender/transaction_manager.py
index 8280f8b900..3e07f925e0 100644
--- a/synapse/federation/sender/transaction_manager.py
+++ b/synapse/federation/sender/transaction_manager.py
@@ -15,7 +15,7 @@
import logging
from typing import TYPE_CHECKING, List
-from canonicaljson import json
+from prometheus_client import Gauge
from synapse.api.errors import HttpResponseException
from synapse.events import EventBase
@@ -28,6 +28,7 @@ from synapse.logging.opentracing import (
tags,
whitelisted_homeserver,
)
+from synapse.util import json_decoder
from synapse.util.metrics import measure_func
if TYPE_CHECKING:
@@ -35,8 +36,14 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
+last_pdu_age_metric = Gauge(
+ "synapse_federation_last_sent_pdu_age",
+ "The age (in seconds) of the last PDU successfully sent to the given domain",
+ labelnames=("server_name",),
+)
+
-class TransactionManager(object):
+class TransactionManager:
"""Helper class which handles building and sending transactions
shared between PerDestinationQueue objects
@@ -49,13 +56,26 @@ class TransactionManager(object):
self._transaction_actions = TransactionActions(self._store)
self._transport_layer = hs.get_federation_transport_client()
+ self._federation_metrics_domains = (
+ hs.get_config().federation.federation_metrics_domains
+ )
+
# HACK to get unique tx id
self._next_txn_id = int(self.clock.time_msec())
@measure_func("_send_new_transaction")
async def send_new_transaction(
- self, destination: str, pending_pdus: List[EventBase], pending_edus: List[Edu]
- ):
+ self, destination: str, pdus: List[EventBase], edus: List[Edu],
+ ) -> bool:
+ """
+ Args:
+ destination: The destination to send to (e.g. 'example.org')
+ pdus: In-order list of PDUs to send
+ edus: List of EDUs to send
+
+ Returns:
+ True iff the transaction was successful
+ """
# Make a transaction-sending opentracing span. This span follows on from
# all the edus in that transaction. This needs to be done since there is
@@ -65,20 +85,14 @@ class TransactionManager(object):
span_contexts = []
keep_destination = whitelisted_homeserver(destination)
- for edu in pending_edus:
+ for edu in edus:
context = edu.get_context()
if context:
- span_contexts.append(extract_text_map(json.loads(context)))
+ span_contexts.append(extract_text_map(json_decoder.decode(context)))
if keep_destination:
edu.strip_context()
with start_active_span_follows_from("send_transaction", span_contexts):
-
- # Sort based on the order field
- pending_pdus.sort(key=lambda t: t[1])
- pdus = [x[0] for x in pending_pdus]
- edus = pending_edus
-
success = True
logger.debug("TX [%s] _attempt_new_transaction", destination)
@@ -117,6 +131,9 @@ class TransactionManager(object):
# FIXME (erikj): This is a bit of a hack to make the Pdu age
# keys work
+ # FIXME (richardv): I also believe it no longer works. We (now?) store
+ # "age_ts" in "unsigned" rather than at the top level. See
+ # https://github.com/matrix-org/synapse/issues/8429.
def json_data_cb():
data = transaction.get_dict()
now = int(self.clock.time_msec())
@@ -165,5 +182,12 @@ class TransactionManager(object):
)
success = False
+ if success and pdus and destination in self._federation_metrics_domains:
+ last_pdu = pdus[-1]
+ last_pdu_age = self.clock.time_msec() - last_pdu.origin_server_ts
+ last_pdu_age_metric.labels(server_name=destination).set(
+ last_pdu_age / 1000
+ )
+
set_tag(tags.ERROR, not success)
return success
|