summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/federation/federation_client.py19
-rw-r--r--synapse/federation/federation_server.py16
-rw-r--r--synapse/federation/transaction_queue.py21
-rw-r--r--synapse/federation/transport/server.py4
4 files changed, 56 insertions, 4 deletions
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f131941f45..6811a0e3d1 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -25,6 +25,7 @@ from synapse.api.errors import (
 from synapse.util.expiringcache import ExpiringCache
 from synapse.util.logutils import log_function
 from synapse.events import FrozenEvent
+import synapse.metrics
 
 from synapse.util.retryutils import get_retry_limiter, NotRetryingDestination
 
@@ -36,9 +37,17 @@ import random
 logger = logging.getLogger(__name__)
 
 
+# synapse.federation.federation_client is a silly name
+metrics = synapse.metrics.get_metrics_for("synapse.federation.client")
+
+sent_pdus_destination_dist = metrics.register_distribution("sent_pdu_destinations")
+
+sent_edus_counter = metrics.register_counter("sent_edus")
+
+sent_queries_counter = metrics.register_counter("sent_queries", labels=["type"])
+
+
 class FederationClient(FederationBase):
-    def __init__(self):
-        self._get_pdu_cache = None
 
     def start_get_pdu_cache(self):
         self._get_pdu_cache = ExpiringCache(
@@ -68,6 +77,8 @@ class FederationClient(FederationBase):
         order = self._order
         self._order += 1
 
+        sent_pdus_destination_dist.inc_by(len(destinations))
+
         logger.debug("[%s] transaction_layer.enqueue_pdu... ", pdu.event_id)
 
         # TODO, add errback, etc.
@@ -87,6 +98,8 @@ class FederationClient(FederationBase):
             content=content,
         )
 
+        sent_edus_counter.inc()
+
         # TODO, add errback, etc.
         self._transaction_queue.enqueue_edu(edu)
         return defer.succeed(None)
@@ -113,6 +126,8 @@ class FederationClient(FederationBase):
             a Deferred which will eventually yield a JSON object from the
             response
         """
+        sent_queries_counter.inc(query_type)
+
         return self.transport_layer.make_query(
             destination, query_type, args, retry_on_dns_fail=retry_on_dns_fail
         )
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 9c7dcdba96..25c0014f97 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -22,6 +22,7 @@ from .units import Transaction, Edu
 from synapse.util.logutils import log_function
 from synapse.util.logcontext import PreserveLoggingContext
 from synapse.events import FrozenEvent
+import synapse.metrics
 
 from synapse.api.errors import FederationError, SynapseError
 
@@ -32,6 +33,15 @@ import logging
 
 logger = logging.getLogger(__name__)
 
+# synapse.federation.federation_server is a silly name
+metrics = synapse.metrics.get_metrics_for("synapse.federation.server")
+
+received_pdus_counter = metrics.register_counter("received_pdus")
+
+received_edus_counter = metrics.register_counter("received_edus")
+
+received_queries_counter = metrics.register_counter("received_queries", labels=["type"])
+
 
 class FederationServer(FederationBase):
     def set_handler(self, handler):
@@ -84,6 +94,8 @@ class FederationServer(FederationBase):
     def on_incoming_transaction(self, transaction_data):
         transaction = Transaction(**transaction_data)
 
+        received_pdus_counter.inc_by(len(transaction.pdus))
+
         for p in transaction.pdus:
             if "unsigned" in p:
                 unsigned = p["unsigned"]
@@ -153,6 +165,8 @@ class FederationServer(FederationBase):
         defer.returnValue((200, response))
 
     def received_edu(self, origin, edu_type, content):
+        received_edus_counter.inc()
+
         if edu_type in self.edu_handlers:
             self.edu_handlers[edu_type](origin, content)
         else:
@@ -204,6 +218,8 @@ class FederationServer(FederationBase):
 
     @defer.inlineCallbacks
     def on_query_request(self, query_type, args):
+        received_queries_counter.inc(query_type)
+
         if query_type in self.query_handlers:
             response = yield self.query_handlers[query_type](args)
             defer.returnValue((200, response))
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 9dc7849b17..4dccd93d0e 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -25,12 +25,15 @@ from synapse.util.logcontext import PreserveLoggingContext
 from synapse.util.retryutils import (
     get_retry_limiter, NotRetryingDestination,
 )
+import synapse.metrics
 
 import logging
 
 
 logger = logging.getLogger(__name__)
 
+metrics = synapse.metrics.get_metrics_for(__name__)
+
 
 class TransactionQueue(object):
     """This class makes sure we only have one transaction in flight at
@@ -54,11 +57,25 @@ class TransactionQueue(object):
         # done
         self.pending_transactions = {}
 
+        metrics.register_callback(
+            "pending_destinations",
+            lambda: len(self.pending_transactions),
+        )
+
         # Is a mapping from destination -> list of
         # tuple(pending pdus, deferred, order)
-        self.pending_pdus_by_dest = {}
+        self.pending_pdus_by_dest = pdus = {}
         # destination -> list of tuple(edu, deferred)
-        self.pending_edus_by_dest = {}
+        self.pending_edus_by_dest = edus = {}
+
+        metrics.register_callback(
+            "pending_pdus",
+            lambda: sum(map(len, pdus.values())),
+        )
+        metrics.register_callback(
+            "pending_edus",
+            lambda: sum(map(len, edus.values())),
+        )
 
         # destination -> list of tuple(failure, deferred)
         self.pending_failures_by_dest = {}
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 6c624977d7..7838a81362 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -148,6 +148,10 @@ class BaseFederationServlet(object):
                 logger.exception("authenticate_request failed")
                 raise
             defer.returnValue(response)
+
+        # Extra logic that functools.wraps() doesn't finish
+        new_code.__self__ = code.__self__
+
         return new_code
 
     def register(self, server):