From 509e381afa8c656e72f5fef3d651a9819794174a Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 21 Feb 2020 07:15:07 -0500 Subject: Clarify list/set/dict/tuple comprehensions and enforce via flake8 (#6957) Ensure good comprehension hygiene using flake8-comprehensions. --- synapse/push/pusherpool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/push/pusherpool.py') diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index b9dca5bc63..01789a9fb4 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -191,7 +191,7 @@ class PusherPool: min_stream_id - 1, max_stream_id ) # This returns a tuple, user_id is at index 3 - users_affected = set([r[3] for r in updated_receipts]) + users_affected = {r[3] for r in updated_receipts} for u in users_affected: if u in self.pushers: -- cgit 1.5.1 From 8c75667ad7810b4c05e40f7665e724a40aaf4d64 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 19 Mar 2020 11:00:24 +0100 Subject: Add prometheus metrics for the number of active pushers (#7103) --- changelog.d/7103.feature | 1 + synapse/metrics/__init__.py | 12 +++++++----- synapse/metrics/background_process_metrics.py | 5 +++-- synapse/push/pusherpool.py | 24 +++++++++++++++++++++++- tox.ini | 2 ++ 5 files changed, 36 insertions(+), 8 deletions(-) create mode 100644 changelog.d/7103.feature (limited to 'synapse/push/pusherpool.py') diff --git a/changelog.d/7103.feature b/changelog.d/7103.feature new file mode 100644 index 0000000000..413e7f29d7 --- /dev/null +++ b/changelog.d/7103.feature @@ -0,0 +1 @@ +Add prometheus metrics for the number of active pushers. diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 0dba997a23..d2fd29acb4 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -20,7 +20,7 @@ import os import platform import threading import time -from typing import Dict, Union +from typing import Callable, Dict, Iterable, Optional, Tuple, Union import six @@ -59,10 +59,12 @@ class RegistryProxy(object): @attr.s(hash=True) class LaterGauge(object): - name = attr.ib() - desc = attr.ib() - labels = attr.ib(hash=False) - caller = attr.ib() + name = attr.ib(type=str) + desc = attr.ib(type=str) + labels = attr.ib(hash=False, type=Optional[Iterable[str]]) + # callback: should either return a value (if there are no labels for this metric), + # or dict mapping from a label tuple to a value + caller = attr.ib(type=Callable[[], Union[Dict[Tuple[str, ...], float], float]]) def collect(self): diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index b65bcd8806..8449ef82f7 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -17,6 +17,7 @@ import logging import threading from asyncio import iscoroutine from functools import wraps +from typing import Dict, Set import six @@ -80,13 +81,13 @@ _background_process_db_sched_duration = Counter( # map from description to a counter, so that we can name our logcontexts # incrementally. (It actually duplicates _background_process_start_count, but # it's much simpler to do so than to try to combine them.) -_background_process_counts = {} # type: dict[str, int] +_background_process_counts = {} # type: Dict[str, int] # map from description to the currently running background processes. # # it's kept as a dict of sets rather than a big set so that we can keep track # of process descriptions that no longer have any active processes. -_background_processes = {} # type: dict[str, set[_BackgroundProcess]] +_background_processes = {} # type: Dict[str, Set[_BackgroundProcess]] # A lock that covers the above dicts _bg_metrics_lock = threading.Lock() diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 01789a9fb4..bf721759df 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -15,11 +15,16 @@ # limitations under the License. import logging +from collections import defaultdict +from typing import Dict, Tuple, Union from twisted.internet import defer +from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException +from synapse.push.emailpusher import EmailPusher +from synapse.push.httppusher import HttpPusher from synapse.push.pusher import PusherFactory from synapse.util.async_helpers import concurrently_execute @@ -47,7 +52,24 @@ class PusherPool: self._should_start_pushers = _hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() - self.pushers = {} + + # map from user id to app_id:pushkey to pusher + self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] + + def count_pushers(): + results = defaultdict(int) # type: Dict[Tuple[str, str], int] + for pushers in self.pushers.values(): + for pusher in pushers.values(): + k = (type(pusher).__name__, pusher.app_id) + results[k] += 1 + return results + + LaterGauge( + name="synapse_pushers", + desc="the number of active pushers", + labels=["kind", "app_id"], + caller=count_pushers, + ) def start(self): """Starts the pushers off in a background process. diff --git a/tox.ini b/tox.ini index 8b4c37c2ee..8e3f09e638 100644 --- a/tox.ini +++ b/tox.ini @@ -191,7 +191,9 @@ commands = mypy \ synapse/handlers/sync.py \ synapse/handlers/ui_auth \ synapse/logging/ \ + synapse/metrics \ synapse/module_api \ + synapse/push/pusherpool.py \ synapse/replication \ synapse/rest \ synapse/spam_checker_api \ -- cgit 1.5.1 From e913823a220b89a205a09efe53116fab435dfdfb Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 19 Mar 2020 11:28:49 +0100 Subject: Fix concurrent modification errors in pusher metrics (#7106) add a lock to try to make this metric actually work --- changelog.d/7106.feature | 1 + synapse/push/pusherpool.py | 27 ++++++++++++++++++--------- 2 files changed, 19 insertions(+), 9 deletions(-) create mode 100644 changelog.d/7106.feature (limited to 'synapse/push/pusherpool.py') diff --git a/changelog.d/7106.feature b/changelog.d/7106.feature new file mode 100644 index 0000000000..413e7f29d7 --- /dev/null +++ b/changelog.d/7106.feature @@ -0,0 +1 @@ +Add prometheus metrics for the number of active pushers. diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index bf721759df..88d203aa44 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -16,6 +16,7 @@ import logging from collections import defaultdict +from threading import Lock from typing import Dict, Tuple, Union from twisted.internet import defer @@ -56,12 +57,17 @@ class PusherPool: # map from user id to app_id:pushkey to pusher self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] + # a lock for the pushers dict, since `count_pushers` is called from an different + # and we otherwise get concurrent modification errors + self._pushers_lock = Lock() + def count_pushers(): results = defaultdict(int) # type: Dict[Tuple[str, str], int] - for pushers in self.pushers.values(): - for pusher in pushers.values(): - k = (type(pusher).__name__, pusher.app_id) - results[k] += 1 + with self._pushers_lock: + for pushers in self.pushers.values(): + for pusher in pushers.values(): + k = (type(pusher).__name__, pusher.app_id) + results[k] += 1 return results LaterGauge( @@ -293,11 +299,12 @@ class PusherPool: return appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"]) - byuser = self.pushers.setdefault(pusherdict["user_name"], {}) - if appid_pushkey in byuser: - byuser[appid_pushkey].on_stop() - byuser[appid_pushkey] = p + with self._pushers_lock: + byuser = self.pushers.setdefault(pusherdict["user_name"], {}) + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p # Check if there *may* be push to process. We do this as this check is a # lot cheaper to do than actually fetching the exact rows we need to @@ -326,7 +333,9 @@ class PusherPool: if appid_pushkey in byuser: logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) byuser[appid_pushkey].on_stop() - del byuser[appid_pushkey] + with self._pushers_lock: + del byuser[appid_pushkey] + yield self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) -- cgit 1.5.1