summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2018-08-20 17:16:58 +0100
committerErik Johnston <erik@matrix.org>2018-08-20 17:27:24 +0100
commit1058d141272f17435b3548a4b12abb51a6372383 (patch)
tree76aa31e52461be05a5b007f11996e090eef1d155 /synapse/metrics
parentPort over enough to get some sytests running on Python 3 (#3668) (diff)
downloadsynapse-1058d141272f17435b3548a4b12abb51a6372383.tar.xz
Make the in flight background process metrics thread safe
Diffstat (limited to 'synapse/metrics')
-rw-r--r--synapse/metrics/background_process_metrics.py25
1 files changed, 20 insertions, 5 deletions
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index ce678d5f75..264fb93892 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -14,6 +14,7 @@
 # limitations under the License.
 
 import six
+import threading
 
 from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
 
@@ -78,6 +79,9 @@ _background_process_counts = dict()  # type: dict[str, int]
 # of process descriptions that no longer have any active processes.
 _background_processes = dict()  # type: dict[str, set[_BackgroundProcess]]
 
+# A lock that covers the above dicts
+_bg_metrics_lock = threading.Lock()
+
 
 class _Collector(object):
     """A custom metrics collector for the background process metrics.
@@ -92,7 +96,11 @@ class _Collector(object):
             labels=["name"],
         )
 
-        for desc, processes in six.iteritems(_background_processes):
+        # We copy the dict so that it doesn't change from underneath us
+        with _bg_metrics_lock:
+            _background_processes_copy = dict(_background_processes)
+
+        for desc, processes in six.iteritems(_background_processes_copy):
             background_process_in_flight_count.add_metric(
                 (desc,), len(processes),
             )
@@ -167,19 +175,26 @@ def run_as_background_process(desc, func, *args, **kwargs):
     """
     @defer.inlineCallbacks
     def run():
-        count = _background_process_counts.get(desc, 0)
-        _background_process_counts[desc] = count + 1
+        with _bg_metrics_lock:
+            count = _background_process_counts.get(desc, 0)
+            _background_process_counts[desc] = count + 1
+
         _background_process_start_count.labels(desc).inc()
 
         with LoggingContext(desc) as context:
             context.request = "%s-%i" % (desc, count)
             proc = _BackgroundProcess(desc, context)
-            _background_processes.setdefault(desc, set()).add(proc)
+
+            with _bg_metrics_lock:
+                _background_processes.setdefault(desc, set()).add(proc)
+
             try:
                 yield func(*args, **kwargs)
             finally:
                 proc.update_metrics()
-                _background_processes[desc].remove(proc)
+
+                with _bg_metrics_lock:
+                    _background_processes[desc].remove(proc)
 
     with PreserveLoggingContext():
         return run()