diff options
Diffstat (limited to 'synapse/metrics')
-rw-r--r-- | synapse/metrics/__init__.py | 13 | ||||
-rw-r--r-- | synapse/metrics/background_process_metrics.py | 26 |
2 files changed, 34 insertions, 5 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index a9158fc066..550f8443f7 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -174,6 +174,19 @@ sent_transactions_counter = Counter("synapse_federation_client_sent_transactions events_processed_counter = Counter("synapse_federation_client_events_processed", "") +event_processing_loop_counter = Counter( + "synapse_event_processing_loop_count", + "Event processing loop iterations", + ["name"], +) + +event_processing_loop_room_count = Counter( + "synapse_event_processing_loop_room_count", + "Rooms seen per event processing loop iteration", + ["name"], +) + + # Used to track where various components have processed in the event stream, # e.g. federation sending, appservice sending, etc. event_processing_positions = Gauge("synapse_event_processing_positions", "", ["name"]) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index ce678d5f75..167167be0a 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -13,6 +13,8 @@ # See the License for the specific language governing permissions and # limitations under the License. +import threading + import six from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily @@ -78,6 +80,9 @@ _background_process_counts = dict() # type: dict[str, int] # of process descriptions that no longer have any active processes. _background_processes = dict() # type: dict[str, set[_BackgroundProcess]] +# A lock that covers the above dicts +_bg_metrics_lock = threading.Lock() + class _Collector(object): """A custom metrics collector for the background process metrics. @@ -92,7 +97,11 @@ class _Collector(object): labels=["name"], ) - for desc, processes in six.iteritems(_background_processes): + # We copy the dict so that it doesn't change from underneath us + with _bg_metrics_lock: + _background_processes_copy = dict(_background_processes) + + for desc, processes in six.iteritems(_background_processes_copy): background_process_in_flight_count.add_metric( (desc,), len(processes), ) @@ -167,19 +176,26 @@ def run_as_background_process(desc, func, *args, **kwargs): """ @defer.inlineCallbacks def run(): - count = _background_process_counts.get(desc, 0) - _background_process_counts[desc] = count + 1 + with _bg_metrics_lock: + count = _background_process_counts.get(desc, 0) + _background_process_counts[desc] = count + 1 + _background_process_start_count.labels(desc).inc() with LoggingContext(desc) as context: context.request = "%s-%i" % (desc, count) proc = _BackgroundProcess(desc, context) - _background_processes.setdefault(desc, set()).add(proc) + + with _bg_metrics_lock: + _background_processes.setdefault(desc, set()).add(proc) + try: yield func(*args, **kwargs) finally: proc.update_metrics() - _background_processes[desc].remove(proc) + + with _bg_metrics_lock: + _background_processes[desc].remove(proc) with PreserveLoggingContext(): return run() |