summary refs log tree commit diff
path: root/synapse/metrics
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/metrics')
-rw-r--r--synapse/metrics/__init__.py12
-rw-r--r--synapse/metrics/_exposition.py5
-rw-r--r--synapse/metrics/background_process_metrics.py30
3 files changed, 26 insertions, 21 deletions
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9cf31f96b3..6035672698 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -22,8 +22,6 @@ import threading
 import time
 from typing import Callable, Dict, Iterable, Optional, Tuple, Union
 
-import six
-
 import attr
 from prometheus_client import Counter, Gauge, Histogram
 from prometheus_client.core import (
@@ -83,7 +81,7 @@ class LaterGauge(object):
             return
 
         if isinstance(calls, dict):
-            for k, v in six.iteritems(calls):
+            for k, v in calls.items():
                 g.add_metric(k, v)
         else:
             g.add_metric([], calls)
@@ -194,7 +192,7 @@ class InFlightGauge(object):
             gauge = GaugeMetricFamily(
                 "_".join([self.name, name]), "", labels=self.labels
             )
-            for key, metrics in six.iteritems(metrics_by_key):
+            for key, metrics in metrics_by_key.items():
                 gauge.add_metric(key, getattr(metrics, name))
             yield gauge
 
@@ -465,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+event_processing_lag_by_event = Histogram(
+    "synapse_event_processing_lag_by_event",
+    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+    ["name"],
+)
+
 # Build info of the running server.
 build_info = Gauge(
     "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index ab7f948ed4..4304c60d56 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler):
             raise
         self.send_response(200)
         self.send_header("Content-Type", CONTENT_TYPE_LATEST)
+        self.send_header("Content-Length", str(len(output)))
         self.end_headers()
         self.wfile.write(output)
 
@@ -261,4 +262,6 @@ class MetricsResource(Resource):
 
     def render_GET(self, request):
         request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
-        return generate_latest(self.registry)
+        response = generate_latest(self.registry)
+        request.setHeader(b"Content-Length", str(len(response)))
+        return response
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index 13785038ad..4cd7932e5b 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -13,9 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import inspect
 import logging
 import threading
-from asyncio import iscoroutine
 from functools import wraps
 from typing import TYPE_CHECKING, Dict, Optional, Set
 
@@ -166,7 +166,7 @@ class _BackgroundProcess(object):
         )
 
 
-def run_as_background_process(desc, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, **kwargs):
     """Run the given function in its own logcontext, with resource metrics
 
     This should be used to wrap processes which are fired off to run in the
@@ -175,10 +175,10 @@ def run_as_background_process(desc, func, *args, **kwargs):
     It returns a Deferred which completes when the function completes, but it doesn't
     follow the synapse logcontext rules, which makes it appropriate for passing to
     clock.looping_call and friends (or for firing-and-forgetting in the middle of a
-    normal synapse inlineCallbacks function).
+    normal synapse async function).
 
     Args:
-        desc (str): a description for this background process type
+        desc: a description for this background process type
         func: a function, which may return a Deferred or a coroutine
         args: positional args for func
         kwargs: keyword args for func
@@ -187,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs):
         follow the synapse logcontext rules.
     """
 
-    @defer.inlineCallbacks
-    def run():
+    async def run():
         with _bg_metrics_lock:
             count = _background_process_counts.get(desc, 0)
             _background_process_counts[desc] = count + 1
@@ -202,22 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs):
             try:
                 result = func(*args, **kwargs)
 
-                # We probably don't have an ensureDeferred in our call stack to handle
-                # coroutine results, so we need to ensureDeferred here.
-                #
-                # But we need this check because ensureDeferred doesn't like being
-                # called on immediate values (as opposed to Deferreds or coroutines).
-                if iscoroutine(result):
-                    result = defer.ensureDeferred(result)
+                if inspect.isawaitable(result):
+                    result = await result
 
-                return (yield result)
+                return result
             except Exception:
-                logger.exception("Background process '%s' threw an exception", desc)
+                logger.exception(
+                    "Background process '%s' threw an exception", desc,
+                )
             finally:
                 _background_process_in_flight_count.labels(desc).dec()
 
     with PreserveLoggingContext():
-        return run()
+        # Note that we return a Deferred here so that it can be used in a
+        # looping_call and other places that expect a Deferred.
+        return defer.ensureDeferred(run())
 
 
 def wrap_as_background_process(desc):