synapse/metrics/background_process_metrics.py
# -*- coding: utf-8 -*-
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import logging
import threading
from asyncio import iscoroutine
from functools import wraps
from typing import Dict, Set

import six

from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily

from twisted.internet import defer

from synapse.logging.context import LoggingContext, PreserveLoggingContext

logger = logging.getLogger(__name__)


_background_process_start_count = Counter(
    "synapse_background_process_start_count",
    "Number of background processes started",
    ["name"],
)

# We set registry=None in all of the counters below to stop them getting
# registered with the default registry. Instead we collect them all via the
# _Collector defined below, which ensures that we can update them with the
# latest values from in-flight processes before they are collected.
#
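# As a rough illustration of the pattern (hypothetical names, not part of this
# module): a counter created with registry=None stays invisible to the default
# REGISTRY until a custom collector yields its metrics:
#
#     class MyCollector(object):
#         def collect(self):
#             refresh_counter_values()  # bring the counters up to date first
#             for metric in my_counter.collect():
#                 yield metric
#
#     REGISTRY.register(MyCollector())
#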
_background_process_ru_utime = Counter(
    "synapse_background_process_ru_utime_seconds",
    "User CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_ru_stime = Counter(
    "synapse_background_process_ru_stime_seconds",
    "System CPU time used by background processes, in seconds",
    ["name"],
    registry=None,
)

_background_process_db_txn_count = Counter(
    "synapse_background_process_db_txn_count",
    "Number of database transactions done by background processes",
    ["name"],
    registry=None,
)

_background_process_db_txn_duration = Counter(
    "synapse_background_process_db_txn_duration_seconds",
    (
        "Seconds spent by background processes waiting for database "
        "transactions, excluding scheduling time"
    ),
    ["name"],
    registry=None,
)

_background_process_db_sched_duration = Counter(
    "synapse_background_process_db_sched_duration_seconds",
    "Seconds spent by background processes waiting for database connections",
    ["name"],
    registry=None,
)

# map from description to a counter, so that we can give each logcontext for a
# given process type a distinct, incrementing name. (It effectively duplicates
# _background_process_start_count, but it's much simpler to do so than to try
# to combine them.)
_background_process_counts = dict()  # type: Dict[str, int]

# map from description to the currently running background processes.
#
# it's kept as a dict of sets rather than a big set so that we can keep track
# of process descriptions that no longer have any active processes.
_background_processes = dict()  # type: Dict[str, Set[_BackgroundProcess]]

# A lock that covers the above dicts
_bg_metrics_lock = threading.Lock()


class _Collector(object):
    """A custom metrics collector for the background process metrics.

    Ensures that all of the metrics are up-to-date with any in-flight processes
    before they are returned.
    """

    def collect(self):
        background_process_in_flight_count = GaugeMetricFamily(
            "synapse_background_process_in_flight_count",
            "Number of background processes in flight",
            labels=["name"],
        )

        # We copy the dict so that it doesn't change from underneath us.
        # We also take copies of the process sets, since they too can change
        # while we iterate.
        with _bg_metrics_lock:
            _background_processes_copy = {
                k: list(v) for k, v in six.iteritems(_background_processes)
            }

        for desc, processes in six.iteritems(_background_processes_copy):
            background_process_in_flight_count.add_metric((desc,), len(processes))
            for process in processes:
                process.update_metrics()

        yield background_process_in_flight_count

        # now we need to run collect() over each of the static Counters, and
        # yield each metric they return.
        for m in (
            _background_process_ru_utime,
            _background_process_ru_stime,
            _background_process_db_txn_count,
            _background_process_db_txn_duration,
            _background_process_db_sched_duration,
        ):
            for r in m.collect():
                yield r


REGISTRY.register(_Collector())


class _BackgroundProcess(object):
    def __init__(self, desc, ctx):
        self.desc = desc
        self._context = ctx
        self._reported_stats = None

    def update_metrics(self):
        """Updates the metrics with values from this process."""
        new_stats = self._context.get_resource_usage()
        if self._reported_stats is None:
            diff = new_stats
        else:
            diff = new_stats - self._reported_stats
        self._reported_stats = new_stats

        _background_process_ru_utime.labels(self.desc).inc(diff.ru_utime)
        _background_process_ru_stime.labels(self.desc).inc(diff.ru_stime)
        _background_process_db_txn_count.labels(self.desc).inc(diff.db_txn_count)
        _background_process_db_txn_duration.labels(self.desc).inc(
            diff.db_txn_duration_sec
        )
        _background_process_db_sched_duration.labels(self.desc).inc(
            diff.db_sched_duration_sec
        )


def run_as_background_process(desc, func, *args, **kwargs):
    """Run the given function in its own logcontext, with resource metrics

    This should be used to wrap processes which are fired off to run in the
    background, instead of being associated with a particular request.

    It returns a Deferred which completes when the function completes. That
    Deferred does not follow the synapse logcontext rules, which makes it
    appropriate for passing to clock.looping_call and friends (or for
    firing-and-forgetting in the middle of a normal synapse inlineCallbacks
    function).

    Args:
        desc (str): a description for this background process type
        func: a function, which may return a Deferred or a coroutine
        args: positional args for func
        kwargs: keyword args for func

    Returns: Deferred which returns the result of func. Note that it does not
        follow the synapse logcontext rules, and that if func raises, the
        exception is caught and logged rather than propagated, so the Deferred
        completes with None instead of failing.
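
    Example (an illustrative sketch: ``clock`` is assumed to be a synapse
    Clock, and ``clean_expired_events`` a hypothetical function returning a
    Deferred)::

        def start_cleanup_loop(clock):
            def do_cleanup():
                return run_as_background_process(
                    "clean_expired_events", clean_expired_events
                )

            # fire-and-forget: each run gets its own logcontext and metrics
            clock.looping_call(do_cleanup, 60 * 1000)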
    """

    @defer.inlineCallbacks
    def run():
        with _bg_metrics_lock:
            count = _background_process_counts.get(desc, 0)
            _background_process_counts[desc] = count + 1

        _background_process_start_count.labels(desc).inc()

        with LoggingContext(desc) as context:
            context.request = "%s-%i" % (desc, count)
            proc = _BackgroundProcess(desc, context)

            with _bg_metrics_lock:
                _background_processes.setdefault(desc, set()).add(proc)

            try:
                result = func(*args, **kwargs)

                # We probably don't have an ensureDeferred in our call stack to handle
                # coroutine results, so we need to ensureDeferred here.
                #
                # But we need this check because ensureDeferred doesn't like being
                # called on immediate values (as opposed to Deferreds or coroutines).
                if iscoroutine(result):
                    result = defer.ensureDeferred(result)

                return (yield result)
            except Exception:
                logger.exception("Background process '%s' threw an exception", desc)
            finally:
                proc.update_metrics()

                with _bg_metrics_lock:
                    _background_processes[desc].remove(proc)

    with PreserveLoggingContext():
        # We deliberately start `run` from the sentinel logcontext, so that
        # the returned Deferred does not follow the synapse logcontext rules.
        return run()


def wrap_as_background_process(desc):
    """Decorator that wraps a function that gets called as a background
    process.

    Equivalent to calling the function with `run_as_background_process`.
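
    Example (illustrative; ``flush_stats`` is a hypothetical coroutine)::

        @wrap_as_background_process("flush_stats")
        async def flush_stats():
            ...

    Each call to the decorated function then runs via
    `run_as_background_process`, returning a Deferred rather than a coroutine.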
    """

    def wrap_as_background_process_inner(func):
        @wraps(func)
        def wrap_as_background_process_inner_2(*args, **kwargs):
            return run_as_background_process(desc, func, *args, **kwargs)

        return wrap_as_background_process_inner_2

    return wrap_as_background_process_inner