diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index ce678d5f75..037f1c490e 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,6 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
+import threading
+
import six
from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
@@ -21,6 +24,9 @@ from twisted.internet import defer
from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+logger = logging.getLogger(__name__)
+
+
_background_process_start_count = Counter(
"synapse_background_process_start_count",
"Number of background processes started",
@@ -78,6 +84,9 @@ _background_process_counts = dict() # type: dict[str, int]
# of process descriptions that no longer have any active processes.
_background_processes = dict() # type: dict[str, set[_BackgroundProcess]]
+# A lock that covers the above dicts
+_bg_metrics_lock = threading.Lock()
+
class _Collector(object):
"""A custom metrics collector for the background process metrics.
@@ -92,7 +101,15 @@ class _Collector(object):
labels=["name"],
)
- for desc, processes in six.iteritems(_background_processes):
+ # We copy the dict so that it doesn't change from underneath us.
+ # We also copy the process lists as that can also change
+ with _bg_metrics_lock:
+ _background_processes_copy = {
+ k: list(v)
+ for k, v in six.iteritems(_background_processes)
+ }
+
+ for desc, processes in six.iteritems(_background_processes_copy):
background_process_in_flight_count.add_metric(
(desc,), len(processes),
)
@@ -167,19 +184,28 @@ def run_as_background_process(desc, func, *args, **kwargs):
"""
@defer.inlineCallbacks
def run():
- count = _background_process_counts.get(desc, 0)
- _background_process_counts[desc] = count + 1
+ with _bg_metrics_lock:
+ count = _background_process_counts.get(desc, 0)
+ _background_process_counts[desc] = count + 1
+
_background_process_start_count.labels(desc).inc()
with LoggingContext(desc) as context:
context.request = "%s-%i" % (desc, count)
proc = _BackgroundProcess(desc, context)
- _background_processes.setdefault(desc, set()).add(proc)
+
+ with _bg_metrics_lock:
+ _background_processes.setdefault(desc, set()).add(proc)
+
try:
yield func(*args, **kwargs)
+ except Exception:
+ logger.exception("Background process '%s' threw an exception", desc)
finally:
proc.update_metrics()
- _background_processes[desc].remove(proc)
+
+ with _bg_metrics_lock:
+ _background_processes[desc].remove(proc)
with PreserveLoggingContext():
return run()
|