diff options
author | Erik Johnston <erik@matrix.org> | 2018-04-11 11:52:19 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-04-11 14:27:09 +0100 |
commit | 4dae4a97ed0e0b2cc9b5493172670ec7353ded2e (patch) | |
tree | c9cba610b5cd4985b46d50fd4fd8197179d48790 /synapse | |
parent | Track where event stream processing have gotten up to (diff) | |
download | synapse-4dae4a97ed0e0b2cc9b5493172670ec7353ded2e.tar.xz |
Track last processed event received_ts
Diffstat (limited to '')
-rw-r--r-- | synapse/federation/transaction_queue.py | 11 | ||||
-rw-r--r-- | synapse/handlers/appservice.py | 10 | ||||
-rw-r--r-- | synapse/metrics/__init__.py | 13 | ||||
-rw-r--r-- | synapse/storage/events_worker.py | 18 |
4 files changed, 52 insertions, 0 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py index d68f0d8950..1c466cbded 100644 --- a/synapse/federation/transaction_queue.py +++ b/synapse/federation/transaction_queue.py @@ -243,6 +243,17 @@ class TransactionQueue(object): next_token, "federation_sender", ) + if events: + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "federation_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "federation_sender", + ) + finally: self._is_processing = False diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 2a7e31e711..fcc1246bee 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -115,6 +115,16 @@ class ApplicationServicesHandler(object): synapse.metrics.event_processing_positions.set( upper_bound, "appservice_sender", ) + + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(events[-1].event_id) + + synapse.metrics.event_processing_lag.set( + now - ts, "appservice_sender", + ) + synapse.metrics.event_processing_last_ts.set( + ts, "appservice_sender", + ) finally: self.is_processing = False diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a6c0d7e1bf..e3b831db67 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -164,6 +164,19 @@ event_persisted_position = synapse_metrics.register_gauge( "event_persisted_position", ) +# Used to track the received_ts of the last event processed by various +# components +event_processing_last_ts = synapse_metrics.register_gauge( + "event_processing_last_ts", labels=["name"], +) + +# Used to track the lag processing events. This is the time difference +# between the last processed event's received_ts and the time it was +# finished being processed. +event_processing_lag = synapse_metrics.register_gauge( + "event_processing_lag", labels=["name"], +) + def runUntilCurrentTimer(func): diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 2e23dd78ba..769eb51489 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -51,6 +51,24 @@ _EventCacheEntry = namedtuple("_EventCacheEntry", ("event", "redacted_event")) class EventsWorkerStore(SQLBaseStore): + def get_received_ts(self, event_id): + """Get received_ts (when it was persisted) for the event + + Args: + event_id (str) + + Returns: + Deferred[int|None]: Timstamp in milliseconds, or None for events + that were persisted before received_ts was implemented. + """ + return self._simple_select_one_onecol( + table="events", + keyvalues={ + "event_id": event_id, + }, + retcol="received_ts", + desc="get_received_ts", + ) @defer.inlineCallbacks def get_event(self, event_id, check_redacted=True, |