summary refs log tree commit diff
path: root/synapse/metrics/background_process_metrics.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics/background_process_metrics.py')
-rw-r--r--synapse/metrics/background_process_metrics.py36
1 files changed, 31 insertions, 5 deletions
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py

index ce678d5f75..037f1c490e 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py
@@ -13,6 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging +import threading + import six from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily @@ -21,6 +24,9 @@ from twisted.internet import defer from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +logger = logging.getLogger(__name__) + + _background_process_start_count = Counter( "synapse_background_process_start_count", "Number of background processes started", @@ -78,6 +84,9 @@ _background_process_counts = dict() # type: dict[str, int] # of process descriptions that no longer have any active processes. _background_processes = dict() # type: dict[str, set[_BackgroundProcess]] +# A lock that covers the above dicts +_bg_metrics_lock = threading.Lock() + class _Collector(object): """A custom metrics collector for the background process metrics. @@ -92,7 +101,15 @@ class _Collector(object): labels=["name"], ) - for desc, processes in six.iteritems(_background_processes): + # We copy the dict so that it doesn't change from underneath us. + # We also copy the process lists as that can also change + with _bg_metrics_lock: + _background_processes_copy = { + k: list(v) + for k, v in six.iteritems(_background_processes) + } + + for desc, processes in six.iteritems(_background_processes_copy): background_process_in_flight_count.add_metric( (desc,), len(processes), ) @@ -167,19 +184,28 @@ def run_as_background_process(desc, func, *args, **kwargs): """ @defer.inlineCallbacks def run(): - count = _background_process_counts.get(desc, 0) - _background_process_counts[desc] = count + 1 + with _bg_metrics_lock: + count = _background_process_counts.get(desc, 0) + _background_process_counts[desc] = count + 1 + _background_process_start_count.labels(desc).inc() with LoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) proc = _BackgroundProcess(desc, context) - _background_processes.setdefault(desc, set()).add(proc) + + with _bg_metrics_lock: + _background_processes.setdefault(desc, set()).add(proc) + try: yield func(*args, **kwargs) + except Exception: + logger.exception("Background process '%s' threw an exception", desc) finally: proc.update_metrics() - _background_processes[desc].remove(proc) + + with _bg_metrics_lock: + _background_processes[desc].remove(proc) with PreserveLoggingContext(): return run()