diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9cf31f96b3..2643380d9e 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -22,8 +22,6 @@ import threading
import time
from typing import Callable, Dict, Iterable, Optional, Tuple, Union

-import six
-
import attr
from prometheus_client import Counter, Gauge, Histogram
from prometheus_client.core import (
@@ -53,7 +51,7 @@ all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollec
HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat")


-class RegistryProxy(object):
+class RegistryProxy:
@staticmethod
def collect():
for metric in REGISTRY.collect():
@@ -62,7 +60,7 @@ class RegistryProxy(object):


@attr.s(hash=True)
-class LaterGauge(object):
+class LaterGauge:

name = attr.ib(type=str)
desc = attr.ib(type=str)
@@ -83,7 +81,7 @@ class LaterGauge(object):
return
if isinstance(calls, dict):
- for k, v in six.iteritems(calls):
+ for k, v in calls.items():
g.add_metric(k, v)
else:
g.add_metric([], calls)
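
Note for reviewers: the branch above means the gauge's callback may return either a dict keyed by label-value tuples or a bare number. A sketch of both shapes; the callback argument and constructor order are inferred from LaterGauge's call sites elsewhere in Synapse, not from this hunk:

```python
from synapse.metrics import LaterGauge

# Illustrative only; not part of this patch.
queue_sizes = {"pdu": 3, "edu": 7}

# dict form: each key is a tuple of label values, consumed by g.add_metric(k, v)
LaterGauge(
    "synapse_example_queue_size",
    "Queued items, by type",
    ["type"],
    lambda: {(kind,): size for kind, size in queue_sizes.items()},
)

# scalar form: exported as a single unlabelled sample via g.add_metric([], calls)
LaterGauge(
    "synapse_example_queue_total",
    "Total queued items",
    [],
    lambda: sum(queue_sizes.values()),
)
```
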
@@ -102,7 +100,7 @@ class LaterGauge(object):
all_gauges[self.name] = self


-class InFlightGauge(object):
+class InFlightGauge:
"""Tracks number of things (e.g. requests, Measure blocks, etc) in flight
at any given time.
@@ -194,7 +192,7 @@ class InFlightGauge(object):
gauge = GaugeMetricFamily(
"_".join([self.name, name]), "", labels=self.labels
)
- for key, metrics in six.iteritems(metrics_by_key):
+ for key, metrics in metrics_by_key.items():
gauge.add_metric(key, getattr(metrics, name))
yield gauge
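
For context on the collect() loop above, a hedged sketch of how an InFlightGauge is typically constructed; the constructor and sub_metrics names are taken from the class's existing call sites (e.g. in synapse.util.metrics), not from this diff:

```python
from synapse.metrics import InFlightGauge

# Mirrors an existing call site; treat the names as illustrative.
in_flight = InFlightGauge(
    "synapse_util_metrics_block_in_flight",
    "",
    labels=["block_name"],
    # each entry is exported as "<name>_<sub_metric>", matching the
    # "_".join([self.name, name]) seen in collect() above
    sub_metrics=["real_time_max", "real_time_sum"],
)
```
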
@@ -208,7 +206,7 @@ class InFlightGauge(object):


@attr.s(hash=True)
-class BucketCollector(object):
+class BucketCollector:
"""
Like a Histogram, but allows buckets to be point-in-time instead of
incrementally added to.
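
A hedged sketch of the point-in-time style this docstring describes: at scrape time the collector polls a callback for a bucket-to-count mapping, rather than accumulating observe() calls. Constructor arguments are inferred from the storage-layer call site, which is not part of this diff:

```python
from synapse.metrics import BucketCollector

current_extremities = {1: 10, 5: 2}  # bucket -> count, recomputed elsewhere

BucketCollector(
    "synapse_forward_extremities",
    lambda: current_extremities,  # polled afresh on every scrape
    buckets=[1, 2, 3, 5, 7, 10, "+Inf"],
)
```
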
@@ -271,7 +269,7 @@ class BucketCollector(object):
#


-class CPUMetrics(object):
+class CPUMetrics:
def __init__(self):
ticks_per_sec = 100
try:
@@ -331,7 +329,7 @@ gc_time = Histogram(
)


-class GCCounts(object):
+class GCCounts:
def collect(self):
cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"])
for n, m in enumerate(gc.get_count()):
@@ -349,7 +347,7 @@ if not running_on_pypy:
#


-class PyPyGCStats(object):
+class PyPyGCStats:
def collect(self):
# @stats is a pretty-printer object with __str__() returning a nice table,
@@ -465,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
# finished being processed.
event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])

+event_processing_lag_by_event = Histogram(
+ "synapse_event_processing_lag_by_event",
+ "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+ ["name"],
+)
+
# Build info of the running server.
build_info = Gauge(
"synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
@@ -478,7 +482,7 @@ build_info.labels(
last_ticked = time.time()


-class ReactorLastSeenMetric(object):
+class ReactorLastSeenMetric:
def collect(self):
cm = GaugeMetricFamily(
"python_twisted_reactor_last_seen",
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index ab7f948ed4..4304c60d56 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler):
raise
self.send_response(200)
self.send_header("Content-Type", CONTENT_TYPE_LATEST)
+ self.send_header("Content-Length", str(len(output)))
self.end_headers()
self.wfile.write(output)
@@ -261,4 +262,6 @@

def render_GET(self, request):
request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
- return generate_latest(self.registry)
+ response = generate_latest(self.registry)
+ request.setHeader(b"Content-Length", str(len(response)))
+ return response
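
Both endpoints now advertise an exact body size, so clients need not rely on connection close or chunked encoding to delimit the response. A quick sanity check against a running scrape endpoint (host and port are hypothetical):

```python
import http.client

conn = http.client.HTTPConnection("localhost", 9090)  # adjust to your listener
conn.request("GET", "/metrics")
resp = conn.getresponse()
body = resp.read()

# The advertised length should match the body exactly.
assert int(resp.getheader("Content-Length")) == len(body)
```
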
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 13785038ad..5b73463504 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,9 +13,9 @@
# See the License for the specific language governing permissions and
# limitations under the License.

+import inspect
import logging
import threading
-from asyncio import iscoroutine
from functools import wraps
from typing import TYPE_CHECKING, Dict, Optional, Set

@@ -105,7 +105,7 @@ _background_processes_active_since_last_scrape = set() # type: Set[_BackgroundP
_bg_metrics_lock = threading.Lock()


-class _Collector(object):
+class _Collector:
"""A custom metrics collector for the background process metrics.
Ensures that all of the metrics are up-to-date with any in-flight processes
@@ -140,7 +140,7 @@ class _Collector(object):
REGISTRY.register(_Collector())


-class _BackgroundProcess(object):
+class _BackgroundProcess:
def __init__(self, desc, ctx):
self.desc = desc
self._context = ctx
@@ -166,7 +166,7 @@ class _BackgroundProcess(object):
)


-def run_as_background_process(desc, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, **kwargs):
"""Run the given function in its own logcontext, with resource metrics
This should be used to wrap processes which are fired off to run in the
@@ -175,10 +175,10 @@ def run_as_background_process(desc, func, *args, **kwargs):
It returns a Deferred which completes when the function completes, but it doesn't
follow the synapse logcontext rules, which makes it appropriate for passing to
clock.looping_call and friends (or for firing-and-forgetting in the middle of a
- normal synapse inlineCallbacks function).
+ normal synapse async function).
Args:
- desc (str): a description for this background process type
+ desc: a description for this background process type
func: a function, which may return a Deferred or a coroutine
args: positional args for func
kwargs: keyword args for func
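
A minimal sketch of the documented contract (the coroutine and wrapper below are made up for illustration): the returned Deferred deliberately escapes the logcontext rules, so it can be dropped for fire-and-forget use or handed to a looping call.

```python
from synapse.metrics.background_process_metrics import run_as_background_process

async def prune_old_entries():
    ...  # periodic cleanup work

def start_pruner():
    # Fine to call without awaiting: the wrapper runs the coroutine in its
    # own logcontext and returns a Deferred that we are free to drop.
    run_as_background_process("prune_old_entries", prune_old_entries)
```
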
@@ -187,8 +187,7 @@ follow the synapse logcontext rules.
follow the synapse logcontext rules.
"""
- @defer.inlineCallbacks
- def run():
+ async def run():
with _bg_metrics_lock:
count = _background_process_counts.get(desc, 0)
_background_process_counts[desc] = count + 1
@@ -202,22 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs):
try:
result = func(*args, **kwargs)
- # We probably don't have an ensureDeferred in our call stack to handle
- # coroutine results, so we need to ensureDeferred here.
- #
- # But we need this check because ensureDeferred doesn't like being
- # called on immediate values (as opposed to Deferreds or coroutines).
- if iscoroutine(result):
- result = defer.ensureDeferred(result)
+ if inspect.isawaitable(result):
+ result = await result
- return (yield result)
+ return result
except Exception:
- logger.exception("Background process '%s' threw an exception", desc)
+ logger.exception(
+ "Background process '%s' threw an exception", desc,
+ )
finally:
_background_process_in_flight_count.labels(desc).dec()
with PreserveLoggingContext():
- return run()
+ # Note that we return a Deferred here so that it can be used in a
+ # looping_call and other places that expect a Deferred.
+ return defer.ensureDeferred(run())


def wrap_as_background_process(desc):
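
Why inspect.isawaitable rather than asyncio.iscoroutine: a Deferred implements __await__ but is not a coroutine, so the single isawaitable check now covers both result types, where the old code special-cased coroutines and left Deferreds to inlineCallbacks' yield. A small demonstration, assuming Twisted is installed:

```python
import inspect
from asyncio import iscoroutine

from twisted.internet import defer

async def coro():
    return 1

d = defer.succeed(1)
print(iscoroutine(d), inspect.isawaitable(d))  # False True

c = coro()
print(iscoroutine(c), inspect.isawaitable(c))  # True True
c.close()  # avoid a "coroutine was never awaited" warning
```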
|