summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/metrics/__init__.py28
-rw-r--r--synapse/metrics/_exposition.py5
-rw-r--r--synapse/metrics/background_process_metrics.py34
3 files changed, 36 insertions, 31 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py

index 9cf31f96b3..2643380d9e 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py
@@ -22,8 +22,6 @@ import threading import time from typing import Callable, Dict, Iterable, Optional, Tuple, Union -import six - import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import ( @@ -53,7 +51,7 @@ all_gauges = {} # type: Dict[str, Union[LaterGauge, InFlightGauge, BucketCollec HAVE_PROC_SELF_STAT = os.path.exists("/proc/self/stat") -class RegistryProxy(object): +class RegistryProxy: @staticmethod def collect(): for metric in REGISTRY.collect(): @@ -62,7 +60,7 @@ class RegistryProxy(object): @attr.s(hash=True) -class LaterGauge(object): +class LaterGauge: name = attr.ib(type=str) desc = attr.ib(type=str) @@ -83,7 +81,7 @@ class LaterGauge(object): return if isinstance(calls, dict): - for k, v in six.iteritems(calls): + for k, v in calls.items(): g.add_metric(k, v) else: g.add_metric([], calls) @@ -102,7 +100,7 @@ class LaterGauge(object): all_gauges[self.name] = self -class InFlightGauge(object): +class InFlightGauge: """Tracks number of things (e.g. requests, Measure blocks, etc) in flight at any given time. @@ -194,7 +192,7 @@ class InFlightGauge(object): gauge = GaugeMetricFamily( "_".join([self.name, name]), "", labels=self.labels ) - for key, metrics in six.iteritems(metrics_by_key): + for key, metrics in metrics_by_key.items(): gauge.add_metric(key, getattr(metrics, name)) yield gauge @@ -208,7 +206,7 @@ class InFlightGauge(object): @attr.s(hash=True) -class BucketCollector(object): +class BucketCollector: """ Like a Histogram, but allows buckets to be point-in-time instead of incrementally added to. @@ -271,7 +269,7 @@ class BucketCollector(object): # -class CPUMetrics(object): +class CPUMetrics: def __init__(self): ticks_per_sec = 100 try: @@ -331,7 +329,7 @@ gc_time = Histogram( ) -class GCCounts(object): +class GCCounts: def collect(self): cm = GaugeMetricFamily("python_gc_counts", "GC object counts", labels=["gen"]) for n, m in enumerate(gc.get_count()): @@ -349,7 +347,7 @@ if not running_on_pypy: # -class PyPyGCStats(object): +class PyPyGCStats: def collect(self): # @stats is a pretty-printer object with __str__() returning a nice table, @@ -465,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name" # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) +event_processing_lag_by_event = Histogram( + "synapse_event_processing_lag_by_event", + "Time between an event being persisted and it being queued up to be sent to the relevant remote servers", + ["name"], +) + # Build info of the running server. build_info = Gauge( "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"] @@ -478,7 +482,7 @@ build_info.labels( last_ticked = time.time() -class ReactorLastSeenMetric(object): +class ReactorLastSeenMetric: def collect(self): cm = GaugeMetricFamily( "python_twisted_reactor_last_seen", diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index ab7f948ed4..4304c60d56 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py
@@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler): raise self.send_response(200) self.send_header("Content-Type", CONTENT_TYPE_LATEST) + self.send_header("Content-Length", str(len(output))) self.end_headers() self.wfile.write(output) @@ -261,4 +262,6 @@ class MetricsResource(Resource): def render_GET(self, request): request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii")) - return generate_latest(self.registry) + response = generate_latest(self.registry) + request.setHeader(b"Content-Length", str(len(response))) + return response diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 13785038ad..5b73463504 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py
@@ -13,9 +13,9 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import logging import threading -from asyncio import iscoroutine from functools import wraps from typing import TYPE_CHECKING, Dict, Optional, Set @@ -105,7 +105,7 @@ _background_processes_active_since_last_scrape = set() # type: Set[_BackgroundP _bg_metrics_lock = threading.Lock() -class _Collector(object): +class _Collector: """A custom metrics collector for the background process metrics. Ensures that all of the metrics are up-to-date with any in-flight processes @@ -140,7 +140,7 @@ class _Collector(object): REGISTRY.register(_Collector()) -class _BackgroundProcess(object): +class _BackgroundProcess: def __init__(self, desc, ctx): self.desc = desc self._context = ctx @@ -166,7 +166,7 @@ class _BackgroundProcess(object): ) -def run_as_background_process(desc, func, *args, **kwargs): +def run_as_background_process(desc: str, func, *args, **kwargs): """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -175,10 +175,10 @@ def run_as_background_process(desc, func, *args, **kwargs): It returns a Deferred which completes when the function completes, but it doesn't follow the synapse logcontext rules, which makes it appropriate for passing to clock.looping_call and friends (or for firing-and-forgetting in the middle of a - normal synapse inlineCallbacks function). + normal synapse async function). Args: - desc (str): a description for this background process type + desc: a description for this background process type func: a function, which may return a Deferred or a coroutine args: positional args for func kwargs: keyword args for func @@ -187,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs): follow the synapse logcontext rules. """ - @defer.inlineCallbacks - def run(): + async def run(): with _bg_metrics_lock: count = _background_process_counts.get(desc, 0) _background_process_counts[desc] = count + 1 @@ -202,22 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs): try: result = func(*args, **kwargs) - # We probably don't have an ensureDeferred in our call stack to handle - # coroutine results, so we need to ensureDeferred here. - # - # But we need this check because ensureDeferred doesn't like being - # called on immediate values (as opposed to Deferreds or coroutines). - if iscoroutine(result): - result = defer.ensureDeferred(result) + if inspect.isawaitable(result): + result = await result - return (yield result) + return result except Exception: - logger.exception("Background process '%s' threw an exception", desc) + logger.exception( + "Background process '%s' threw an exception", desc, + ) finally: _background_process_in_flight_count.labels(desc).dec() with PreserveLoggingContext(): - return run() + # Note that we return a Deferred here so that it can be used in a + # looping_call and other places that expect a Deferred. + return defer.ensureDeferred(run()) def wrap_as_background_process(desc):