summary refs log tree commit diff
path: root/synapse/federation/transaction_queue.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/federation/transaction_queue.py')
-rw-r--r--synapse/federation/transaction_queue.py31
1 files changed, 24 insertions, 7 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 741a4e7a1a..4dccd93d0e 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -25,12 +25,15 @@ from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.retryutils import (
     get_retry_limiter, NotRetryingDestination,
 )
+import synapse.metrics
 
 import logging
 
 
 logger = logging.getLogger(__name__)
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -54,11 +57,25 @@ class TransactionQueue(object):
         # done
         self.pending_transactions = {}
 
+        metrics.register_callback(
+            "pending_destinations",
+            lambda: len(self.pending_transactions),
+        )
+
         # Is a mapping from destination -> list of
         # tuple(pending pdus, deferred, order)
-        self.pending_pdus_by_dest = {}
+        self.pending_pdus_by_dest = pdus = {}
         # destination -> list of tuple(edu, deferred)
-        self.pending_edus_by_dest = {}
+        self.pending_edus_by_dest = edus = {}
+
+        metrics.register_callback(
+            "pending_pdus",
+            lambda: sum(map(len, pdus.values())),
+        )
+        metrics.register_callback(
+            "pending_edus",
+            lambda: sum(map(len, edus.values())),
+        )
 
         # destination -> list of tuple(failure, deferred)
         self.pending_failures_by_dest = {}
@@ -115,8 +132,8 @@ class TransactionQueue(object):
                 if not deferred.called:
                     deferred.errback(failure)
 
-            def log_failure(failure):
-                logger.warn("Failed to send pdu", failure.value)
+            def log_failure(f):
+                logger.warn("Failed to send pdu to %s: %s", destination, f.value)
 
             deferred.addErrback(log_failure)
 
@@ -143,8 +160,8 @@ class TransactionQueue(object):
             if not deferred.called:
                 deferred.errback(failure)
 
-        def log_failure(failure):
-            logger.warn("Failed to send pdu", failure.value)
+        def log_failure(f):
+            logger.warn("Failed to send edu to %s: %s", destination, f.value)
 
         deferred.addErrback(log_failure)
 
@@ -174,7 +191,7 @@ class TransactionQueue(object):
                 deferred.errback(f)
 
         def log_failure(f):
-            logger.warn("Failed to send pdu", f.value)
+            logger.warn("Failed to send failure to %s: %s", destination, f.value)
 
         deferred.addErrback(log_failure)