summary refs log tree commit diff
diff options
context:
space:
mode:
authorRichard van der Hoff <1389908+richvdh@users.noreply.github.com>2020-03-19 11:00:24 +0100
committerGitHub <noreply@github.com>2020-03-19 10:00:24 +0000
commit8c75667ad7810b4c05e40f7665e724a40aaf4d64 (patch)
tree151260d3d74a739c2d658df8d651e73a0a6ff06d
parentMove pusherpool startup into _base.setup (#7104) (diff)
downloadsynapse-8c75667ad7810b4c05e40f7665e724a40aaf4d64.tar.xz
Add prometheus metrics for the number of active pushers (#7103)
-rw-r--r--changelog.d/7103.feature1
-rw-r--r--synapse/metrics/__init__.py12
-rw-r--r--synapse/metrics/background_process_metrics.py5
-rw-r--r--synapse/push/pusherpool.py24
-rw-r--r--tox.ini2
5 files changed, 36 insertions, 8 deletions
diff --git a/changelog.d/7103.feature b/changelog.d/7103.feature
new file mode 100644
index 0000000000..413e7f29d7
--- /dev/null
+++ b/changelog.d/7103.feature
@@ -0,0 +1 @@
+Add prometheus metrics for the number of active pushers.
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 0dba997a23..d2fd29acb4 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -20,7 +20,7 @@ import os
 import platform
 import threading
 import time
-from typing import Dict, Union
+from typing import Callable, Dict, Iterable, Optional, Tuple, Union
 
 import six
 
@@ -59,10 +59,12 @@ class RegistryProxy(object):
 @attr.s(hash=True)
 class LaterGauge(object):
 
-    name = attr.ib()
-    desc = attr.ib()
-    labels = attr.ib(hash=False)
-    caller = attr.ib()
+    name = attr.ib(type=str)
+    desc = attr.ib(type=str)
+    labels = attr.ib(hash=False, type=Optional[Iterable[str]])
+    # callback: should either return a value (if there are no labels for this metric),
+    # or dict mapping from a label tuple to a value
+    caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]])
 
     def collect(self):
 
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index b65bcd8806..8449ef82f7 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -17,6 +17,7 @@ import logging
 import threading
 from asyncio import iscoroutine
 from functools import wraps
+from typing import Dict, Set
 
 import six
 
@@ -80,13 +81,13 @@ _background_process_db_sched_duration = Counter(
 # map from description to a counter, so that we can name our logcontexts
 # incrementally. (It actually duplicates _background_process_start_count, but
 # it's much simpler to do so than to try to combine them.)
-_background_process_counts = {}  # type: dict[str, int]
+_background_process_counts = {}  # type: Dict[str, int]
 
 # map from description to the currently running background processes.
 #
 # it's kept as a dict of sets rather than a big set so that we can keep track
 # of process descriptions that no longer have any active processes.
-_background_processes = {}  # type: dict[str, set[_BackgroundProcess]]
+_background_processes = {}  # type: Dict[str, Set[_BackgroundProcess]]
 
 # A lock that covers the above dicts
 _bg_metrics_lock = threading.Lock()
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 01789a9fb4..bf721759df 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -15,11 +15,16 @@
 # limitations under the License.
 
 import logging
+from collections import defaultdict
+from typing import Dict, Tuple, Union
 
 from twisted.internet import defer
 
+from synapse.metrics import LaterGauge
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
+from synapse.push.emailpusher import EmailPusher
+from synapse.push.httppusher import HttpPusher
 from synapse.push.pusher import PusherFactory
 from synapse.util.async_helpers import concurrently_execute
 
@@ -47,7 +52,24 @@ class PusherPool:
         self._should_start_pushers = _hs.config.start_pushers
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
-        self.pushers = {}
+
+        # map from user id to app_id:pushkey to pusher
+        self.pushers = {}  # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]]
+
+        def count_pushers():
+            results = defaultdict(int)  # type: Dict[Tuple[str, str], int]
+            for pushers in self.pushers.values():
+                for pusher in pushers.values():
+                    k = (type(pusher).__name__, pusher.app_id)
+                    results[k] += 1
+            return results
+
+        LaterGauge(
+            name="synapse_pushers",
+            desc="the number of active pushers",
+            labels=["kind", "app_id"],
+            caller=count_pushers,
+        )
 
     def start(self):
         """Starts the pushers off in a background process.
diff --git a/tox.ini b/tox.ini
index 8b4c37c2ee..8e3f09e638 100644
--- a/tox.ini
+++ b/tox.ini
@@ -191,7 +191,9 @@ commands = mypy \
             synapse/handlers/sync.py \
             synapse/handlers/ui_auth \
             synapse/logging/ \
+            synapse/metrics \
             synapse/module_api \
+            synapse/push/pusherpool.py \
             synapse/replication \
             synapse/rest \
             synapse/spam_checker_api \