diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 2ab599a334..53c508af91 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -15,19 +15,37 @@
import logging
import threading
from functools import wraps
-from typing import TYPE_CHECKING, Dict, Optional, Set, Union
+from types import TracebackType
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ Iterable,
+ Optional,
+ Set,
+ Type,
+ TypeVar,
+ Union,
+ cast,
+)
+from prometheus_client import Metric
from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
-from synapse.logging.context import LoggingContext, PreserveLoggingContext
+from synapse.logging.context import (
+ ContextResourceUsage,
+ LoggingContext,
+ PreserveLoggingContext,
+)
from synapse.logging.opentracing import (
SynapseTags,
noop_context_manager,
start_active_span,
)
-from synapse.util.async_helpers import maybe_awaitable
if TYPE_CHECKING:
import resource
@@ -116,7 +134,7 @@ class _Collector:
before they are returned.
"""
- def collect(self):
+ def collect(self) -> Iterable[Metric]:
global _background_processes_active_since_last_scrape
# We swap out the _background_processes set with an empty one so that
@@ -144,12 +162,12 @@ REGISTRY.register(_Collector())
class _BackgroundProcess:
- def __init__(self, desc, ctx):
+ def __init__(self, desc: str, ctx: LoggingContext):
self.desc = desc
self._context = ctx
- self._reported_stats = None
+ self._reported_stats: Optional[ContextResourceUsage] = None
- def update_metrics(self):
+ def update_metrics(self) -> None:
"""Updates the metrics with values from this process."""
new_stats = self._context.get_resource_usage()
if self._reported_stats is None:
@@ -169,7 +187,16 @@ class _BackgroundProcess:
)
-def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
+R = TypeVar("R")
+
+
+def run_as_background_process(
+ desc: str,
+ func: Callable[..., Awaitable[Optional[R]]],
+ *args: Any,
+ bg_start_span: bool = True,
+ **kwargs: Any,
+) -> "defer.Deferred[Optional[R]]":
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -189,11 +216,13 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar
args: positional args for func
kwargs: keyword args for func
- Returns: Deferred which returns the result of func, but note that it does not
- follow the synapse logcontext rules.
+ Returns:
+ Deferred which returns the result of func, or `None` if func raises.
+ Note that the returned Deferred does not follow the synapse logcontext
+ rules.
"""
- async def run():
+ async def run() -> Optional[R]:
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
@@ -210,12 +239,13 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar
else:
ctx = noop_context_manager()
with ctx:
- return await maybe_awaitable(func(*args, **kwargs))
+ return await func(*args, **kwargs)
except Exception:
logger.exception(
"Background process '%s' threw an exception",
desc,
)
+ return None
finally:
_background_process_in_flight_count.labels(desc).dec()
@@ -225,19 +255,24 @@ def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwar
return defer.ensureDeferred(run())
-def wrap_as_background_process(desc):
+F = TypeVar("F", bound=Callable[..., Awaitable[Optional[Any]]])
+
+
+def wrap_as_background_process(desc: str) -> Callable[[F], F]:
"""Decorator that wraps a function that gets called as a background
process.
- Equivalent of calling the function with `run_as_background_process`
+ Equivalent to calling the function with `run_as_background_process`
"""
- def wrap_as_background_process_inner(func):
+ def wrap_as_background_process_inner(func: F) -> F:
@wraps(func)
- def wrap_as_background_process_inner_2(*args, **kwargs):
+ def wrap_as_background_process_inner_2(
+ *args: Any, **kwargs: Any
+ ) -> "defer.Deferred[Optional[R]]":
return run_as_background_process(desc, func, *args, **kwargs)
- return wrap_as_background_process_inner_2
+ return cast(F, wrap_as_background_process_inner_2)
return wrap_as_background_process_inner
@@ -265,7 +300,7 @@ class BackgroundProcessLoggingContext(LoggingContext):
super().__init__("%s-%s" % (name, instance_id))
self._proc = _BackgroundProcess(name, self)
- def start(self, rusage: "Optional[resource.struct_rusage]"):
+ def start(self, rusage: "Optional[resource.struct_rusage]") -> None:
"""Log context has started running (again)."""
super().start(rusage)
@@ -276,7 +311,12 @@ class BackgroundProcessLoggingContext(LoggingContext):
with _bg_metrics_lock:
_background_processes_active_since_last_scrape.add(self._proc)
- def __exit__(self, type, value, traceback) -> None:
+ def __exit__(
+ self,
+ type: Optional[Type[BaseException]],
+ value: Optional[BaseException],
+ traceback: Optional[TracebackType],
+ ) -> None:
"""Log context has finished."""
super().__exit__(type, value, traceback)
|