diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index a9269196b3..5b73463504 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,16 +13,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import inspect
import logging
import threading
-from asyncio import iscoroutine
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Set
from prometheus_client.core import REGISTRY, Counter, Gauge
from twisted.internet import defer
-from twisted.python.failure import Failure
from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -106,7 +105,7 @@ _background_processes_active_since_last_scrape = set() # type: Set[_BackgroundP
_bg_metrics_lock = threading.Lock()
-class _Collector(object):
+class _Collector:
"""A custom metrics collector for the background process metrics.
Ensures that all of the metrics are up-to-date with any in-flight processes
@@ -141,7 +140,7 @@ class _Collector(object):
REGISTRY.register(_Collector())
-class _BackgroundProcess(object):
+class _BackgroundProcess:
def __init__(self, desc, ctx):
self.desc = desc
self._context = ctx
@@ -167,7 +166,7 @@ class _BackgroundProcess(object):
)
-def run_as_background_process(desc, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -176,10 +175,10 @@ def run_as_background_process(desc, func, *args, **kwargs):
It returns a Deferred which completes when the function completes, but it doesn't
follow the synapse logcontext rules, which makes it appropriate for passing to
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
- normal synapse inlineCallbacks function).
+ normal synapse async function).
Args:
- desc (str): a description for this background process type
+ desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
@@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
follow the synapse logcontext rules.
"""
- @defer.inlineCallbacks
- def run():
+ async def run():
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
@@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs):
try:
result = func(*args, **kwargs)
- # We probably don't have an ensureDeferred in our call stack to handle
- # coroutine results, so we need to ensureDeferred here.
- #
- # But we need this check because ensureDeferred doesn't like being
- # called on immediate values (as opposed to Deferreds or coroutines).
- if iscoroutine(result):
- result = defer.ensureDeferred(result)
+ if inspect.isawaitable(result):
+ result = await result
- return (yield result)
+ return result
except Exception:
- # failure.Failure() fishes the original Failure out of our stack, and
- # thus gives us a sensible stack trace.
- f = Failure()
- logger.error(
- "Background process '%s' threw an exception",
- desc,
- exc_info=(f.type, f.value, f.getTracebackObject()),
+ logger.exception(
+ "Background process '%s' threw an exception", desc,
)
finally:
_background_process_in_flight_count.labels(desc).dec()
with PreserveLoggingContext():
- return run()
+ # Note that we return a Deferred here so that it can be used in a
+ # looping_call and other places that expect a Deferred.
+ return defer.ensureDeferred(run())
def wrap_as_background_process(desc):