1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
|
# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
import threading
from contextlib import nullcontext
from functools import wraps
from types import TracebackType
from typing import (
TYPE_CHECKING,
Any,
Awaitable,
Callable,
Dict,
Iterable,
Optional,
Set,
Type,
TypeVar,
Union,
)
from prometheus_client import Metric
from prometheus_client.core import REGISTRY, Counter, Gauge
from typing_extensions import ParamSpec
from twisted.internet import defer
from synapse.logging.context import (
ContextResourceUsage,
LoggingContext,
PreserveLoggingContext,
)
from synapse.logging.opentracing import SynapseTags, start_active_span
from synapse.metrics._types import Collector
if TYPE_CHECKING:
import resource
# Module-level logger used by the background-process helpers in this file.
logger = logging.getLogger(__name__)

# Bumped once every time a background process with the given name is started.
_background_process_start_count = Counter(
    "synapse_background_process_start_count",
    "Number of background processes started",
    labelnames=["name"],
)

# Incremented when a background process starts and decremented when it ends,
# so it reflects how many processes of each name are currently running.
_background_process_in_flight_count = Gauge(
    "synapse_background_process_in_flight_count",
    "Number of background processes in flight",
    labelnames=["name"],
)
# None of the counters below are attached to the default registry (hence
# registry=None). They are exposed through the custom collector in this
# module instead, which gets the chance to bring them up to date with any
# in-flight processes before their samples are returned.
#
_background_process_ru_utime = Counter(
    "synapse_background_process_ru_utime_seconds",
    "User CPU time used by background processes, in seconds",
    labelnames=["name"],
    registry=None,
)

_background_process_ru_stime = Counter(
    "synapse_background_process_ru_stime_seconds",
    "System CPU time used by background processes, in seconds",
    labelnames=["name"],
    registry=None,
)

_background_process_db_txn_count = Counter(
    "synapse_background_process_db_txn_count",
    "Number of database transactions done by background processes",
    labelnames=["name"],
    registry=None,
)

_background_process_db_txn_duration = Counter(
    "synapse_background_process_db_txn_duration_seconds",
    (
        "Seconds spent by background processes waiting for database "
        "transactions, excluding scheduling time"
    ),
    labelnames=["name"],
    registry=None,
)

_background_process_db_sched_duration = Counter(
    "synapse_background_process_db_sched_duration_seconds",
    "Seconds spent by background processes waiting for database connections",
    labelnames=["name"],
    registry=None,
)
# Map from description to a counter, so that we can name our logcontexts
# incrementally. (It actually duplicates _background_process_start_count, but
# it's much simpler to do so than to try to combine them.)
_background_process_counts: Dict[str, int] = {}
# Set of all running background processes that became active since the last
# time metrics were scraped (i.e. background processes that performed some
# work since the last scrape).
#
# We do it like this to handle the case where we have a large number of
# background processes stacking up behind a lock or linearizer, where we then
# only need to iterate over and update metrics for the processes that have
# actually been active and can ignore the idle ones.
_background_processes_active_since_last_scrape: "Set[_BackgroundProcess]" = set()
# A lock that covers the above set and dict: it is held whenever either of
# them is read, modified or swapped out.
_bg_metrics_lock = threading.Lock()
class _Collector(Collector):
    """Metrics collector for the unregistered background process counters.

    Before any samples are handed back, the resource usage of every background
    process that has been active since the previous scrape is folded into the
    counters, so work done by processes that are still in flight is included.
    """

    def collect(self) -> Iterable[Metric]:
        global _background_processes_active_since_last_scrape

        # Take ownership of the current "active" set and leave a fresh, empty
        # one in its place. Only the swap happens under the lock; the
        # iteration below runs on our private reference without holding it.
        with _bg_metrics_lock:
            recently_active = _background_processes_active_since_last_scrape
            _background_processes_active_since_last_scrape = set()

        for proc in recently_active:
            proc.update_metrics()

        # These counters were created with registry=None, so they are only
        # exposed here: delegate to each one's own collect() in turn and pass
        # on whatever it yields.
        unregistered_counters = (
            _background_process_ru_utime,
            _background_process_ru_stime,
            _background_process_db_txn_count,
            _background_process_db_txn_duration,
            _background_process_db_sched_duration,
        )
        for counter in unregistered_counters:
            yield from counter.collect()


REGISTRY.register(_Collector())
class _BackgroundProcess:
    """Pairs a background process description with its logging context.

    Remembers the resource usage that has already been pushed into the
    prometheus counters, so that each update only adds what is new.
    """

    def __init__(self, desc: str, ctx: LoggingContext):
        # Used as the "name" label value on every metric.
        self.desc = desc
        # The logging context whose resource usage is being reported.
        self._context = ctx
        # Usage snapshot taken at the previous update; None until the first one.
        self._reported_stats: Optional[ContextResourceUsage] = None

    def update_metrics(self) -> None:
        """Add the usage accrued since the previous call to the counters."""
        current = self._context.get_resource_usage()
        previous = self._reported_stats

        # The first update reports everything used so far; later ones report
        # only the change relative to the last snapshot.
        delta = current if previous is None else current - previous
        self._reported_stats = current

        label = self.desc
        increments = (
            # For unknown reasons, the difference in times can be negative. See
            # comment in synapse.http.request_metrics.RequestMetrics.update_metrics.
            (_background_process_ru_utime, max(delta.ru_utime, 0)),
            (_background_process_ru_stime, max(delta.ru_stime, 0)),
            (_background_process_db_txn_count, delta.db_txn_count),
            (_background_process_db_txn_duration, delta.db_txn_duration_sec),
            (_background_process_db_sched_duration, delta.db_sched_duration_sec),
        )
        for counter, amount in increments:
            counter.labels(label).inc(amount)
R = TypeVar("R")


def run_as_background_process(
    desc: str,
    func: Callable[..., Awaitable[Optional[R]]],
    *args: Any,
    bg_start_span: bool = True,
    **kwargs: Any,
) -> "defer.Deferred[Optional[R]]":
    """Run `func` in a logcontext of its own, recording resource metrics for it.

    Intended for work that is kicked off to run in the background rather than
    on behalf of a particular request.

    The returned Deferred fires once `func` has finished. It does not follow
    the synapse logcontext rules, which is what makes it suitable for handing
    to clock.looping_call and friends, or for fire-and-forget use from within
    an ordinary synapse async function.

    Args:
        desc: a description for this type of background process; it is used as
            the label on the metrics and as part of the logcontext name.
        func: the callable to run; its result is awaited, so it should return
            an awaitable such as a coroutine or a Deferred.
        bg_start_span: whether to start an opentracing span around the call.
            Defaults to True. Should only be turned off for processes that
            will not log to or tag a span.
        args: positional arguments passed on to `func`.
        kwargs: keyword arguments passed on to `func`.

    Returns:
        A Deferred resolving to whatever `func` returned, or to `None` if
        `func` raised (the exception is logged, not propagated). Note that
        this Deferred does not follow the synapse logcontext rules.
    """

    async def run() -> Optional[R]:
        # Claim the next sequence number for this description; it becomes the
        # instance id in the logcontext's name.
        with _bg_metrics_lock:
            instance_number = _background_process_counts.setdefault(desc, 0)
            _background_process_counts[desc] += 1

        _background_process_start_count.labels(desc).inc()
        _background_process_in_flight_count.labels(desc).inc()

        with BackgroundProcessLoggingContext(desc, instance_number) as logcontext:
            try:
                if bg_start_span:
                    span = start_active_span(
                        f"bgproc.{desc}",
                        tags={SynapseTags.REQUEST_ID: str(logcontext)},
                    )
                else:
                    span = nullcontext()  # type: ignore[assignment]

                with span:
                    return await func(*args, **kwargs)
            except Exception:
                # Failures are reported here and swallowed: the caller just
                # sees the Deferred resolve to None.
                logger.exception(
                    "Background process '%s' threw an exception",
                    desc,
                )
                return None
            finally:
                # Balance the inc() above however the function finished.
                _background_process_in_flight_count.labels(desc).dec()

    with PreserveLoggingContext():
        # A Deferred (rather than the bare coroutine) is returned so that the
        # result can be used with looping_call and anything else that expects
        # a Deferred.
        return defer.ensureDeferred(run())
P = ParamSpec("P")


def wrap_as_background_process(
    desc: str,
) -> Callable[
    [Callable[P, Awaitable[Optional[R]]]],
    Callable[P, "defer.Deferred[Optional[R]]"],
]:
    """Build a decorator that turns an async function into a background process.

    The decorated function is synchronous: every call passes `func`, along with
    the call's arguments (forwarded verbatim), to `run_as_background_process`
    under the description `desc`, and returns the resulting Deferred.

    That is,

        @wrap_as_background_process("my_desc")
        async def func(*args): ...
        func(1, 2, third=3)

    is equivalent to:

        async def func(*args): ...
        run_as_background_process("my_desc", func, 1, 2, third=3)

    The decorator form is convenient when `func` has to be run as a background
    process from several places.

    Args:
        desc: the description handed to `run_as_background_process` on each call.

    Returns:
        A decorator taking the async function and returning its synchronous,
        Deferred-returning wrapper.
    """

    def decorator(
        func: Callable[P, Awaitable[Optional[R]]]
    ) -> Callable[P, "defer.Deferred[Optional[R]]"]:
        @wraps(func)
        def launch(*args: P.args, **kwargs: P.kwargs) -> "defer.Deferred[Optional[R]]":
            # type-ignore: mypy is confusing kwargs with the bg_start_span kwarg.
            #   Argument 4 to "run_as_background_process" has incompatible type
            #   "**P.kwargs"; expected "bool"
            # See https://github.com/python/mypy/issues/8862
            return run_as_background_process(desc, func, *args, **kwargs)  # type: ignore[arg-type]

        return launch

    return decorator
class BackgroundProcessLoggingContext(LoggingContext):
    """Logging context for a background process.

    Keeps the module's record of recently active background processes up to
    date, and flushes the process's resource usage into the metrics when the
    context exits.
    """

    __slots__ = ["_proc"]

    def __init__(self, name: str, instance_id: Optional[Union[int, str]] = None):
        """
        Args:
            name: The name of the background process. Each distinct `name` gets a
                separate prometheus time series.
            instance_id: an identifier appended to `name` to tell this instance of
                the named background process apart in the logs. When `None`,
                id(self) is used instead.
        """
        suffix = id(self) if instance_id is None else instance_id
        super().__init__("%s-%s" % (name, suffix))
        self._proc: Optional[_BackgroundProcess] = _BackgroundProcess(name, self)

    def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
        """Log context has started running (again)."""
        super().start(rusage)

        proc = self._proc
        if proc is not None:
            # We are active again, so make sure we are in the set of active
            # procs. ("start" here means becoming active, not necessarily
            # starting for the very first time.)
            with _bg_metrics_lock:
                _background_processes_active_since_last_scrape.add(proc)
            return

        # The proc is only cleared by __exit__, so reaching this point means
        # the context was entered again after it had already finished.
        logger.error(
            "Background process re-entered without a proc: %s",
            self.name,
            stack_info=True,
        )

    def __exit__(
        self,
        type: Optional[Type[BaseException]],
        value: Optional[BaseException],
        traceback: Optional[TracebackType],
    ) -> None:
        """Log context has finished."""
        super().__exit__(type, value, traceback)

        proc = self._proc
        if proc is not None:
            # The background process is done. Drop it from the active set and
            # report its metrics by hand, so that the set cannot keep growing
            # when nothing is scraping metrics.
            with _bg_metrics_lock:
                _background_processes_active_since_last_scrape.discard(proc)
            proc.update_metrics()

            # The proc refers back to this context; clear our side to break
            # the reference cycle.
            self._proc = None
            return

        logger.error(
            "Background process exited without a proc: %s",
            self.name,
            stack_info=True,
        )
|