diff options
author | Erik Johnston <erik@matrix.org> | 2018-08-20 17:16:58 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2018-08-20 17:27:24 +0100 |
commit | 1058d141272f17435b3548a4b12abb51a6372383 (patch) | |
tree | 76aa31e52461be05a5b007f11996e090eef1d155 /synapse/metrics | |
parent | Port over enough to get some sytests running on Python 3 (#3668) (diff) | |
download | synapse-1058d141272f17435b3548a4b12abb51a6372383.tar.xz |
Make the in flight background process metrics thread safe
Diffstat (limited to 'synapse/metrics')
-rw-r--r-- | synapse/metrics/background_process_metrics.py | 25 |
1 files changed, 20 insertions, 5 deletions
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index ce678d5f75..264fb93892 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -14,6 +14,7 @@ # limitations under the License. import six +import threading from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily @@ -78,6 +79,9 @@ _background_process_counts = dict() # type: dict[str, int] # of process descriptions that no longer have any active processes. _background_processes = dict() # type: dict[str, set[_BackgroundProcess]] +# A lock that covers the above dicts +_bg_metrics_lock = threading.Lock() + class _Collector(object): """A custom metrics collector for the background process metrics. @@ -92,7 +96,11 @@ class _Collector(object): labels=["name"], ) - for desc, processes in six.iteritems(_background_processes): + # We copy the dict so that it doesn't change from underneath us + with _bg_metrics_lock: + _background_processes_copy = dict(_background_processes) + + for desc, processes in six.iteritems(_background_processes_copy): background_process_in_flight_count.add_metric( (desc,), len(processes), ) @@ -167,19 +175,26 @@ def run_as_background_process(desc, func, *args, **kwargs): """ @defer.inlineCallbacks def run(): - count = _background_process_counts.get(desc, 0) - _background_process_counts[desc] = count + 1 + with _bg_metrics_lock: + count = _background_process_counts.get(desc, 0) + _background_process_counts[desc] = count + 1 + _background_process_start_count.labels(desc).inc() with LoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) proc = _BackgroundProcess(desc, context) - _background_processes.setdefault(desc, set()).add(proc) + + with _bg_metrics_lock: + _background_processes.setdefault(desc, set()).add(proc) + try: yield func(*args, **kwargs) finally: proc.update_metrics() - _background_processes[desc].remove(proc) + + with _bg_metrics_lock: + _background_processes[desc].remove(proc) with PreserveLoggingContext(): return run() |