From 6e3fc657b4c8db10e872ba1c7264e82b7fed5ed0 Mon Sep 17 00:00:00 2001
From: Richard van der Hoff
Date: Wed, 18 Jul 2018 10:50:33 +0100
Subject: Resource tracking for background processes

This introduces a mechanism for tracking resource usage by background
processes, along with an example of how it will be used.

This will help address #3518, but more importantly will give us better
insights into things which are happening but not showing up in the request
metrics.

We *could* do this with Measure blocks, but:
 - I think having them pulled out as a completely separate metric class will
   make it easier to distinguish top-level processes from those which are
   nested.
 - I want to be able to report on in-flight background processes, and I don't
   think we want to do this for *all* Measure blocks.
---
 synapse/metrics/background_process_metrics.py | 179 ++++++++++++++++++++++++++
 1 file changed, 179 insertions(+)
 create mode 100644 synapse/metrics/background_process_metrics.py

(limited to 'synapse/metrics/background_process_metrics.py')

diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
new file mode 100644
index 0000000000..9d820e44a6
--- /dev/null
+++ b/synapse/metrics/background_process_metrics.py
@@ -0,0 +1,179 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import six
+
+from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily
+
+from twisted.internet import defer
+
+from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
+
+_background_process_start_count = Counter(
+    "synapse_background_process_start_count",
+    "Number of background processes started",
+    ["name"],
+)
+
+# we set registry=None in all of these to stop them getting registered with
+# the default registry. Instead we collect them all via the _Collector below,
+# which ensures that we can update them before they are collected.
+# +_background_process_ru_utime = Counter( + "synapse_background_process_ru_utime_seconds", + "User CPU time used by background processes, in seconds", + ["name"], + registry=None, +) + +_background_process_ru_stime = Counter( + "synapse_background_process_ru_stime_seconds", + "System CPU time used by background processes, in seconds", + ["name"], + registry=None, +) + +_background_process_db_txn_count = Counter( + "synapse_background_process_db_txn_count", + "Number of database transactions done by background processes", + ["name"], + registry=None, +) + +_background_process_db_txn_duration = Counter( + "synapse_background_process_db_txn_duration_seconds", + ("Seconds spent by background processes waiting for database " + "transactions, excluding scheduling time"), + ["name"], + registry=None, +) + +_background_process_db_sched_duration = Counter( + "synapse_background_process_db_sched_duration_seconds", + "Seconds spent by background processes waiting for database connections", + ["name"], + registry=None, +) + +# map from description to a counter, so that we can name our logcontexts +# incrementally. (It actually duplicates _background_process_start_count, but +# it's much simpler to do so than to try to combine them.) +_background_process_counts = dict() # type: dict[str, int] + +# map from description to the currently running background processes. +# +# it's kept as a dict of sets rather than a big set so that we can keep track +# of process descriptions that no longer have any active processes. +_background_processes = dict() # type: dict[str, set[_BackgroundProcess]] + + +class _Collector(object): + """A custom metrics collector for the background process metrics. + + Ensures that all of the metrics are up-to-date with any in-flight processes + before they are returned. + """ + def collect(self): + background_process_in_flight_count = GaugeMetricFamily( + "synapse_background_process_in_flight_count", + "Number of background processes in flight", + labels=["name"], + ) + + for desc, processes in six.iteritems(_background_processes): + background_process_in_flight_count.add_metric( + (desc,), len(processes), + ) + for process in processes: + process.update_metrics() + + yield background_process_in_flight_count + + # now we need to run collect() over each of the static Counters, and + # yield each metric they return. 
+ for m in ( + _background_process_ru_utime, + _background_process_ru_stime, + _background_process_db_txn_count, + _background_process_db_txn_duration, + _background_process_db_sched_duration, + ): + for r in m.collect(): + yield r + + +REGISTRY.register(_Collector()) + + +class _BackgroundProcess(object): + def __init__(self, desc, ctx): + self.desc = desc + self._context = ctx + self._reported_stats = None + + def update_metrics(self): + """Updates the metrics with values from this process.""" + new_stats = self._context.get_resource_usage() + if self._reported_stats is None: + diff = new_stats + else: + diff = new_stats - self._reported_stats + self._reported_stats = new_stats + + _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime) + _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime) + _background_process_db_txn_count.labels(self.desc).inc( + diff.db_txn_count, + ) + _background_process_db_txn_duration.labels(self.desc).inc( + diff.db_txn_duration_sec, + ) + _background_process_db_sched_duration.labels(self.desc).inc( + diff.db_sched_duration_sec, + ) + + +def run_as_background_process(desc, func, *args, **kwargs): + """Run the given function in its own logcontext, with resource metrics + + This should be used to wrap processes which are fired off to run in the + background, instead of being associated with a particular request. + + Args: + desc (str): a description for this background process type + func: a function, which may return a Deferred + args: positional args for func + kwargs: keyword args for func + + Returns: None + """ + @defer.inlineCallbacks + def run(): + count = _background_process_counts.get(desc, 0) + _background_process_counts[desc] = count + 1 + _background_process_start_count.labels(desc).inc() + + with LoggingContext(desc) as context: + context.request = "%s-%i" % (desc, count) + proc = _BackgroundProcess(desc, context) + _background_processes.setdefault(desc, set()).add(proc) + try: + yield func(*args, **kwargs) + finally: + proc.update_metrics() + _background_processes[desc].remove(proc) + + with PreserveLoggingContext(): + run() -- cgit 1.4.1 From 03751a64203b169cbf33b636b6d940ca6d414c31 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff Date: Thu, 26 Jul 2018 11:44:26 +0100 Subject: Fix some looping_call calls which were broken in #3604 It turns out that looping_call does check the deferred returned by its callback, and (at least in the case of client_ips), we were relying on this, and I broke it in #3604. Update run_as_background_process to return the deferred, and make sure we return it to clock.looping_call. 
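To make the fix concrete, here is a minimal sketch of the pattern being restored (run_as_background_process is the helper introduced above; the task name "prune_old_entries", the 60-second interval, and the body of the task are invented for illustration). Twisted's LoopingCall (which synapse's clock.looping_call wraps) waits for a Deferred returned by its callback before scheduling the next run and surfaces any failure, so the wrapper must return the Deferred rather than dropping it:

    from twisted.internet import defer, reactor, task

    from synapse.metrics.background_process_metrics import run_as_background_process


    @defer.inlineCallbacks
    def _prune_old_entries():
        # Stand-in for real work; the real callers mostly run database
        # interactions here.
        yield task.deferLater(reactor, 0, lambda: None)


    def _start_prune_old_entries():
        # Returning the Deferred lets LoopingCall wait for this run to finish
        # and see any failure, instead of the error being silently dropped.
        return run_as_background_process("prune_old_entries", _prune_old_entries)


    loop = task.LoopingCall(_start_prune_old_entries)
    loop.start(60.0, now=True)
    reactor.run()
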
--- changelog.d/3610.feature | 1 + synapse/app/homeserver.py | 4 ++-- synapse/groups/attestations.py | 2 +- synapse/handlers/profile.py | 2 +- synapse/metrics/background_process_metrics.py | 10 ++++++++-- synapse/rest/media/v1/media_repository.py | 2 +- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/storage/client_ips.py | 2 +- synapse/storage/devices.py | 2 +- synapse/storage/event_federation.py | 2 +- synapse/storage/event_push_actions.py | 4 ++-- synapse/storage/transactions.py | 4 +++- synapse/util/caches/expiringcache.py | 2 +- 13 files changed, 24 insertions(+), 15 deletions(-) create mode 100644 changelog.d/3610.feature (limited to 'synapse/metrics/background_process_metrics.py') diff --git a/changelog.d/3610.feature b/changelog.d/3610.feature new file mode 100644 index 0000000000..77a294cb9f --- /dev/null +++ b/changelog.d/3610.feature @@ -0,0 +1 @@ +Add metrics to track resource usage by background processes diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index b7e7718290..57b815d777 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -429,7 +429,7 @@ def run(hs): stats_process = [] def start_phone_stats_home(): - run_as_background_process("phone_stats_home", phone_stats_home) + return run_as_background_process("phone_stats_home", phone_stats_home) @defer.inlineCallbacks def phone_stats_home(): @@ -502,7 +502,7 @@ def run(hs): ) def generate_user_daily_visit_stats(): - run_as_background_process( + return run_as_background_process( "generate_user_daily_visits", hs.get_datastore().generate_user_daily_visits, ) diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index 4216af0a27..b04f4234ca 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -153,7 +153,7 @@ class GroupAttestionRenewer(object): defer.returnValue({}) def _start_renew_attestations(self): - run_as_background_process("renew_attestations", self._renew_attestations) + return run_as_background_process("renew_attestations", self._renew_attestations) @defer.inlineCallbacks def _renew_attestations(self): diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py index 43692b83a8..cb5c6d587e 100644 --- a/synapse/handlers/profile.py +++ b/synapse/handlers/profile.py @@ -256,7 +256,7 @@ class ProfileHandler(BaseHandler): ) def _start_update_remote_profile_cache(self): - run_as_background_process( + return run_as_background_process( "Update remote profile", self._update_remote_profile_cache, ) diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 9d820e44a6..ce678d5f75 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -151,13 +151,19 @@ def run_as_background_process(desc, func, *args, **kwargs): This should be used to wrap processes which are fired off to run in the background, instead of being associated with a particular request. + It returns a Deferred which completes when the function completes, but it doesn't + follow the synapse logcontext rules, which makes it appropriate for passing to + clock.looping_call and friends (or for firing-and-forgetting in the middle of a + normal synapse inlineCallbacks function). 
+ Args: desc (str): a description for this background process type func: a function, which may return a Deferred args: positional args for func kwargs: keyword args for func - Returns: None + Returns: Deferred which returns the result of func, but note that it does not + follow the synapse logcontext rules. """ @defer.inlineCallbacks def run(): @@ -176,4 +182,4 @@ def run_as_background_process(desc, func, *args, **kwargs): _background_processes[desc].remove(proc) with PreserveLoggingContext(): - run() + return run() diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 5b13378caa..174ad20123 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -106,7 +106,7 @@ class MediaRepository(object): ) def _start_update_recently_accessed(self): - run_as_background_process( + return run_as_background_process( "update_recently_accessed_media", self._update_recently_accessed, ) diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 4efd5339a4..27aa0def2f 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -373,7 +373,7 @@ class PreviewUrlResource(Resource): }) def _start_expire_url_cache_data(self): - run_as_background_process( + return run_as_background_process( "expire_url_cache_data", self._expire_url_cache_data, ) diff --git a/synapse/storage/client_ips.py b/synapse/storage/client_ips.py index 77ae10da3d..b8cefd43d6 100644 --- a/synapse/storage/client_ips.py +++ b/synapse/storage/client_ips.py @@ -102,7 +102,7 @@ class ClientIpStore(background_updates.BackgroundUpdateStore): to_update, ) - run_as_background_process( + return run_as_background_process( "update_client_ips", update, ) diff --git a/synapse/storage/devices.py b/synapse/storage/devices.py index 52dccb1507..c0943ecf91 100644 --- a/synapse/storage/devices.py +++ b/synapse/storage/devices.py @@ -712,7 +712,7 @@ class DeviceStore(SQLBaseStore): logger.info("Pruned %d device list outbound pokes", txn.rowcount) - run_as_background_process( + return run_as_background_process( "prune_old_outbound_device_pokes", self.runInteraction, "_prune_old_outbound_device_pokes", diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py index 65f2d19e20..f269ec6fb3 100644 --- a/synapse/storage/event_federation.py +++ b/synapse/storage/event_federation.py @@ -549,7 +549,7 @@ class EventFederationStore(EventFederationWorkerStore): sql, (self.stream_ordering_month_ago, self.stream_ordering_month_ago,) ) - run_as_background_process( + return run_as_background_process( "delete_old_forward_extrem_cache", self.runInteraction, "_delete_old_forward_extrem_cache", diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4f44b0ad47..6840320641 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -460,7 +460,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): ) def _find_stream_orderings_for_times(self): - run_as_background_process( + return run_as_background_process( "event_push_action_stream_orderings", self.runInteraction, "_find_stream_orderings_for_times", @@ -790,7 +790,7 @@ class EventPushActionsStore(EventPushActionsWorkerStore): """, (room_id, user_id, stream_ordering)) def _start_rotate_notifs(self): - run_as_background_process("rotate_notifs", self._rotate_notifs) + return run_as_background_process("rotate_notifs", 
self._rotate_notifs) @defer.inlineCallbacks def _rotate_notifs(self): diff --git a/synapse/storage/transactions.py b/synapse/storage/transactions.py index b4b479d94c..428e7fa36e 100644 --- a/synapse/storage/transactions.py +++ b/synapse/storage/transactions.py @@ -273,7 +273,9 @@ class TransactionStore(SQLBaseStore): return self.cursor_to_dict(txn) def _start_cleanup_transactions(self): - run_as_background_process("cleanup_transactions", self._cleanup_transactions) + return run_as_background_process( + "cleanup_transactions", self._cleanup_transactions, + ) def _cleanup_transactions(self): now = self._clock.time_msec() diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index 465adc54a8..ce85b2ae11 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -64,7 +64,7 @@ class ExpiringCache(object): return def f(): - run_as_background_process( + return run_as_background_process( "prune_cache_%s" % self._cache_name, self._prune_cache, ) -- cgit 1.4.1
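
Taken together, the two patches give the following usage pattern. This is a minimal sketch rather than code from the repository; the task name "clean_expired_media" and the store.delete_old_media() call are invented for illustration:

    from twisted.internet import defer

    from synapse.metrics.background_process_metrics import run_as_background_process


    @defer.inlineCallbacks
    def clean_expired_media(store):
        # Resource usage accrued here (CPU time, database transaction counts
        # and durations) is added to the synapse_background_process_* counters
        # with the label name="clean_expired_media", and the task is counted
        # in synapse_background_process_in_flight_count while it runs.
        yield store.delete_old_media()


    def start_cleanup(store):
        # Fire-and-forget from the caller's point of view: the returned
        # Deferred does not follow the synapse logcontext rules, so it should
        # not be yielded on from inside another logcontext, but it can be
        # handed back to clock.looping_call as in the earlier sketch.
        return run_as_background_process(
            "clean_expired_media", clean_expired_media, store,
        )

On the Prometheus side, the per-task CPU cost can then be graphed with something like rate(synapse_background_process_ru_utime_seconds{name="clean_expired_media"}[5m]).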