summary refs log tree commit diff
path: root/synapse/federation
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-06-30 16:58:06 +0100
committerGitHub <noreply@github.com>2020-06-30 16:58:06 +0100
commita99658074dc3b2b0f6abcb4f98d56bc1386398aa (patch)
tree108e1d5a12f2e3e1860aacad4b2790e80209882e /synapse/federation
parentExplain the purpose of the "tests" conditional dependency requirement (#7751) (diff)
downloadsynapse-a99658074dc3b2b0f6abcb4f98d56bc1386398aa.tar.xz
Add some metrics for inbound and outbound federation processing times (#7755)
Diffstat (limited to 'synapse/federation')
-rw-r--r--synapse/federation/federation_server.py37
-rw-r--r--synapse/federation/sender/__init__.py10
2 files changed, 30 insertions, 17 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index afe0a8238b..e704cf2f44 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -18,7 +18,7 @@ import logging
 from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
 
 from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
@@ -70,6 +70,10 @@ received_queries_counter = Counter(
     "synapse_federation_server_received_queries", "", ["type"]
 )
 
+pdu_process_time = Histogram(
+    "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
 
 class FederationServer(FederationBase):
     def __init__(self, hs):
@@ -271,21 +275,22 @@ class FederationServer(FederationBase):
 
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        pdu_results[event_id] = {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        pdu_results[event_id] = {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        pdu_results[event_id] = {"error": str(e)}
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),
-                        )
+                with pdu_process_time.time():
+                    with nested_logging_context(event_id):
+                        try:
+                            await self._handle_received_pdu(origin, pdu)
+                            pdu_results[event_id] = {}
+                        except FederationError as e:
+                            logger.warning("Error handling PDU %s: %s", event_id, e)
+                            pdu_results[event_id] = {"error": str(e)}
+                        except Exception as e:
+                            f = failure.Failure()
+                            pdu_results[event_id] = {"error": str(e)}
+                            logger.error(
+                                "Failed to handle PDU %s",
+                                event_id,
+                                exc_info=(f.type, f.value, f.getTracebackObject()),
+                            )
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 5b8faea4e7..23fb515683 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -201,7 +201,15 @@ class FederationSender(object):
 
                     logger.debug("Sending %s to %r", event, destinations)
 
-                    self._send_pdu(event, destinations)
+                    if destinations:
+                        self._send_pdu(event, destinations)
+
+                        now = self.clock.time_msec()
+                        ts = await self.store.get_received_ts(event.event_id)
+
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "federation_sender"
+                        ).observe(now - ts)
 
                 async def handle_room_events(events: Iterable[EventBase]) -> None:
                     with Measure(self.clock, "handle_room_events"):