summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/federation/federation_server.py37
-rw-r--r--synapse/federation/sender/__init__.py10
-rw-r--r--synapse/handlers/appservice.py6
-rw-r--r--synapse/metrics/__init__.py6
4 files changed, 42 insertions, 17 deletions
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index afe0a8238b..e704cf2f44 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -18,7 +18,7 @@ import logging
 from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
 
 from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
@@ -70,6 +70,10 @@ received_queries_counter = Counter(
     "synapse_federation_server_received_queries", "", ["type"]
 )
 
+pdu_process_time = Histogram(
+    "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
 
 class FederationServer(FederationBase):
     def __init__(self, hs):
@@ -271,21 +275,22 @@ class FederationServer(FederationBase):
 
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        pdu_results[event_id] = {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        pdu_results[event_id] = {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        pdu_results[event_id] = {"error": str(e)}
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),
-                        )
+                with pdu_process_time.time():
+                    with nested_logging_context(event_id):
+                        try:
+                            await self._handle_received_pdu(origin, pdu)
+                            pdu_results[event_id] = {}
+                        except FederationError as e:
+                            logger.warning("Error handling PDU %s: %s", event_id, e)
+                            pdu_results[event_id] = {"error": str(e)}
+                        except Exception as e:
+                            f = failure.Failure()
+                            pdu_results[event_id] = {"error": str(e)}
+                            logger.error(
+                                "Failed to handle PDU %s",
+                                event_id,
+                                exc_info=(f.type, f.value, f.getTracebackObject()),
+                            )
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 5b8faea4e7..23fb515683 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -201,7 +201,15 @@ class FederationSender(object):
 
                     logger.debug("Sending %s to %r", event, destinations)
 
-                    self._send_pdu(event, destinations)
+                    if destinations:
+                        self._send_pdu(event, destinations)
+
+                        now = self.clock.time_msec()
+                        ts = await self.store.get_received_ts(event.event_id)
+
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "federation_sender"
+                        ).observe(now - ts)
 
                 async def handle_room_events(events: Iterable[EventBase]) -> None:
                     with Measure(self.clock, "handle_room_events"):
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index ac1b64caff..f7d9fd621e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -114,6 +114,12 @@ class ApplicationServicesHandler(object):
                         for service in services:
                             self.scheduler.submit_event_for_as(service, event)
 
+                        now = self.clock.time_msec()
+                        ts = yield self.store.get_received_ts(event.event_id)
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "appservice_sender"
+                        ).observe(now - ts)
+
                     @defer.inlineCallbacks
                     def handle_room_events(events):
                         for event in events:
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 087a49d65d..6035672698 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -463,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+event_processing_lag_by_event = Histogram(
+    "synapse_event_processing_lag_by_event",
+    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+    ["name"],
+)
+
 # Build info of the running server.
 build_info = Gauge(
     "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]