summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics')
-rw-r--r--synapse/metrics/__init__.py13
-rw-r--r--synapse/metrics/background_process_metrics.py26
2 files changed, 34 insertions, 5 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index a9158fc066..550f8443f7 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions
 
 events_processed_counter = Counter("synapse_federation_client_events_processed", "")
 
+event_processing_loop_counter = Counter(
+    "synapse_event_processing_loop_count",
+    "Event processing loop iterations",
+    ["name"],
+)
+
+event_processing_loop_room_count = Counter(
+    "synapse_event_processing_loop_room_count",
+    "Rooms seen per event processing loop iteration",
+    ["name"],
+)
+
+
 # Used to track where various components have processed in the event stream,
 # e.g. federation sending, appservice sending, etc.
 event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"])
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index ce678d5f75..167167be0a 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import threading
+
 import six
 
 from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
@@ -78,6 +80,9 @@ _background_process_counts = dict()  # type: dict[str, int]
 # of process descriptions that no longer have any active processes.
 _background_processes = dict()  # type: dict[str, set[_BackgroundProcess]]
 
+# A lock that covers the above dicts
+_bg_metrics_lock = threading.Lock()
+
 
 class _Collector(object):
     """A custom metrics collector for the background process metrics.
@@ -92,7 +97,11 @@ class _Collector(object):
             labels=["name"],
         )
 
-        for desc, processes in six.iteritems(_background_processes):
+        # We copy the dict so that it doesn't change from underneath us
+        with _bg_metrics_lock:
+            _background_processes_copy = dict(_background_processes)
+
+        for desc, processes in six.iteritems(_background_processes_copy):
             background_process_in_flight_count.add_metric(
                 (desc,), len(processes),
             )
@@ -167,19 +176,26 @@ def run_as_background_process(desc, func, *args, **kwargs):
     """
     @defer.inlineCallbacks
     def run():
-        count = _background_process_counts.get(desc, 0)
-        _background_process_counts[desc] = count + 1
+        with _bg_metrics_lock:
+            count = _background_process_counts.get(desc, 0)
+            _background_process_counts[desc] = count + 1
+
         _background_process_start_count.labels(desc).inc()
 
         with LoggingContext(desc) as context:
             context.request = "%s-%i" % (desc, count)
             proc = _BackgroundProcess(desc, context)
-            _background_processes.setdefault(desc, set()).add(proc)
+
+            with _bg_metrics_lock:
+                _background_processes.setdefault(desc, set()).add(proc)
+
             try:
                 yield func(*args, **kwargs)
             finally:
                 proc.update_metrics()
-                _background_processes[desc].remove(proc)
+
+                with _bg_metrics_lock:
+                    _background_processes[desc].remove(proc)
 
     with PreserveLoggingContext():
         return run()