diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index d68f0d8950..1c466cbded 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -243,6 +243,17 @@ class TransactionQueue(object):
next_token, "federation_sender",
)
+ if events:
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_lag.set(
+ now - ts, "federation_sender",
+ )
+ synapse.metrics.event_processing_last_ts.set(
+ ts, "federation_sender",
+ )
+
finally:
self._is_processing = False
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 2a7e31e711..fcc1246bee 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -115,6 +115,16 @@ class ApplicationServicesHandler(object):
synapse.metrics.event_processing_positions.set(
upper_bound, "appservice_sender",
)
+
+ now = self.clock.time_msec()
+ ts = yield self.store.get_received_ts(events[-1].event_id)
+
+ synapse.metrics.event_processing_lag.set(
+ now - ts, "appservice_sender",
+ )
+ synapse.metrics.event_processing_last_ts.set(
+ ts, "appservice_sender",
+ )
finally:
self.is_processing = False
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a6c0d7e1bf..e3b831db67 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge(
"event_persisted_position",
)
+# Used to track the received_ts of the last event processed by various
+# components
+event_processing_last_ts = synapse_metrics.register_gauge(
+ "event_processing_last_ts", labels=["name"],
+)
+
+# Used to track the lag processing events. This is the time difference
+# between the last processed event's received_ts and the time it was
+# finished being processed.
+event_processing_lag = synapse_metrics.register_gauge(
+ "event_processing_lag", labels=["name"],
+)
+
def runUntilCurrentTimer(func):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index 2e23dd78ba..769eb51489 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event"))
class EventsWorkerStore(SQLBaseStore):
+ def get_received_ts(self, event_id):
+ """Get received_ts (when it was persisted) for the event
+
+ Args:
+ event_id (str)
+
+ Returns:
+ Deferred[int|None]: Timstamp in milliseconds, or None for events
+ that were persisted before received_ts was implemented.
+ """
+ return self._simple_select_one_onecol(
+ table="events",
+ keyvalues={
+ "event_id": event_id,
+ },
+ retcol="received_ts",
+ desc="get_received_ts",
+ )
@defer.inlineCallbacks
def get_event(self, event_id, check_redacted=True,
|