summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-04-11 11:07:51 +0100
committerErik Johnston <erik@matrix.org>2018-04-11 12:13:40 +0100
commit92e34615c5c9ed5df2b0072a2686d3e42239e420 (patch)
tree53fe39bfa8245da8fa1e73690ac18a4fab9c28e7
parentAdd GaugeMetric (diff)
downloadsynapse-92e34615c5c9ed5df2b0072a2686d3e42239e420.tar.xz
Track where event stream processing have gotten up to
-rw-r--r--synapse/federation/transaction_queue.py4
-rw-r--r--synapse/handlers/appservice.py4
-rw-r--r--synapse/metrics/__init__.py13
-rw-r--r--synapse/storage/events.py3
4 files changed, 24 insertions, 0 deletions
diff --git a/synapse/federation/transaction_queue.py b/synapse/federation/transaction_queue.py
index 5b0b798e57..d68f0d8950 100644
--- a/synapse/federation/transaction_queue.py
+++ b/synapse/federation/transaction_queue.py
@@ -239,6 +239,10 @@ class TransactionQueue(object):
                     "events", next_token
                 )
 
+                synapse.metrics.event_processing_positions.set(
+                    next_token, "federation_sender",
+                )
+
         finally:
             self._is_processing = False
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 3dd3fa2a27..2a7e31e711 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -111,6 +111,10 @@ class ApplicationServicesHandler(object):
                     events_processed_counter.inc_by(len(events))
 
                     yield self.store.set_appservice_last_pos(upper_bound)
+
+                    synapse.metrics.event_processing_positions.set(
+                        upper_bound, "appservice_sender",
+                    )
             finally:
                 self.is_processing = False
 
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 2666f982a6..a6c0d7e1bf 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -151,6 +151,19 @@ reactor_metrics = get_metrics_for("python.twisted.reactor")
 tick_time = reactor_metrics.register_distribution("tick_time")
 pending_calls_metric = reactor_metrics.register_distribution("pending_calls")
 
+synapse_metrics = get_metrics_for("synapse")
+
+# Used to track where various components have processed in the event stream,
+# e.g. federation sending, appservice sending, etc.
+event_processing_positions = synapse_metrics.register_gauge(
+    "event_processing_positions", labels=["name"],
+)
+
+# Used to track the current max events stream position
+event_persisted_position = synapse_metrics.register_gauge(
+    "event_persisted_position",
+)
+
 
 def runUntilCurrentTimer(func):
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index ece5e6c41f..da44b52fd6 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -444,6 +444,9 @@ class EventsStore(EventsWorkerStore):
                     new_forward_extremeties=new_forward_extremeties,
                 )
                 persist_event_counter.inc_by(len(chunk))
+                synapse.metrics.event_persisted_position.set(
+                    chunk[-1][0].internal_metadata.stream_ordering,
+                )
                 for event, context in chunk:
                     if context.app_service:
                         origin_type = "local"