From bd6dc17221741d4ceae05ae769a70696ae939336 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Mon, 15 Jun 2020 07:03:36 -0400 Subject: Replace iteritems/itervalues/iterkeys with native versions. (#7692) --- synapse/metrics/__init__.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) (limited to 'synapse/metrics') diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 9cf31f96b3..087a49d65d 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -22,8 +22,6 @@ import threading import time from typing import Callable, Dict, Iterable, Optional, Tuple, Union -import six - import attr from prometheus_client import Counter, Gauge, Histogram from prometheus_client.core import ( @@ -83,7 +81,7 @@ class LaterGauge(object): return if isinstance(calls, dict): - for k, v in six.iteritems(calls): + for k, v in calls.items(): g.add_metric(k, v) else: g.add_metric([], calls) @@ -194,7 +192,7 @@ class InFlightGauge(object): gauge = GaugeMetricFamily( "_".join([self.name, name]), "", labels=self.labels ) - for key, metrics in six.iteritems(metrics_by_key): + for key, metrics in metrics_by_key.items(): gauge.add_metric(key, getattr(metrics, name)) yield gauge -- cgit 1.5.1 From 8bbe87f42d7736b3f11db0fcfb4557b214e0356d Mon Sep 17 00:00:00 2001 From: Christian Svensson Date: Tue, 23 Jun 2020 19:06:01 +0200 Subject: Set Content-Length for Metrics requests (#7730) HTTP requires the response to contain a Content-Length header unless chunked encoding is being used. Prometheus metrics endpoint did not set this, causing software such as prometheus-proxy to not be able to scrape synapse for metrics. Signed-off-by: Christian Svensson --- changelog.d/7730.bugfix | 1 + synapse/metrics/_exposition.py | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7730.bugfix (limited to 'synapse/metrics') diff --git a/changelog.d/7730.bugfix b/changelog.d/7730.bugfix new file mode 100644 index 0000000000..9da254b56c --- /dev/null +++ b/changelog.d/7730.bugfix @@ -0,0 +1 @@ +Fix missing `Content-Length` on HTTP responses from the metrics handler. diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py index ab7f948ed4..4304c60d56 100644 --- a/synapse/metrics/_exposition.py +++ b/synapse/metrics/_exposition.py @@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler): raise self.send_response(200) self.send_header("Content-Type", CONTENT_TYPE_LATEST) + self.send_header("Content-Length", str(len(output))) self.end_headers() self.wfile.write(output) @@ -261,4 +262,6 @@ class MetricsResource(Resource): def render_GET(self, request): request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii")) - return generate_latest(self.registry) + response = generate_latest(self.registry) + request.setHeader(b"Content-Length", str(len(response))) + return response -- cgit 1.5.1 From a99658074dc3b2b0f6abcb4f98d56bc1386398aa Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 30 Jun 2020 16:58:06 +0100 Subject: Add some metrics for inbound and outbound federation processing times (#7755) --- changelog.d/7755.misc | 1 + synapse/federation/federation_server.py | 37 +++++++++++++++++++-------------- synapse/federation/sender/__init__.py | 10 ++++++++- synapse/handlers/appservice.py | 6 ++++++ synapse/metrics/__init__.py | 6 ++++++ 5 files changed, 43 insertions(+), 17 deletions(-) create mode 100644 changelog.d/7755.misc (limited to 'synapse/metrics') diff --git a/changelog.d/7755.misc b/changelog.d/7755.misc new file mode 100644 index 0000000000..1fc29206ac --- /dev/null +++ b/changelog.d/7755.misc @@ -0,0 +1 @@ +Add some metrics for inbound and outbound federation latencies: `synapse_federation_server_pdu_process_time` and `synapse_event_processing_lag_by_event`. diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index afe0a8238b..e704cf2f44 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -18,7 +18,7 @@ import logging from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union from canonicaljson import json -from prometheus_client import Counter +from prometheus_client import Counter, Histogram from twisted.internet import defer from twisted.internet.abstract import isIPAddress @@ -70,6 +70,10 @@ received_queries_counter = Counter( "synapse_federation_server_received_queries", "", ["type"] ) +pdu_process_time = Histogram( + "synapse_federation_server_pdu_process_time", "Time taken to process an event", +) + class FederationServer(FederationBase): def __init__(self, hs): @@ -271,21 +275,22 @@ class FederationServer(FederationBase): for pdu in pdus_by_room[room_id]: event_id = pdu.event_id - with nested_logging_context(event_id): - try: - await self._handle_received_pdu(origin, pdu) - pdu_results[event_id] = {} - except FederationError as e: - logger.warning("Error handling PDU %s: %s", event_id, e) - pdu_results[event_id] = {"error": str(e)} - except Exception as e: - f = failure.Failure() - pdu_results[event_id] = {"error": str(e)} - logger.error( - "Failed to handle PDU %s", - event_id, - exc_info=(f.type, f.value, f.getTracebackObject()), - ) + with pdu_process_time.time(): + with nested_logging_context(event_id): + try: + await self._handle_received_pdu(origin, pdu) + pdu_results[event_id] = {} + except FederationError as e: + logger.warning("Error handling PDU %s: %s", event_id, e) + pdu_results[event_id] = {"error": str(e)} + except Exception as e: + f = failure.Failure() + pdu_results[event_id] = {"error": str(e)} + logger.error( + "Failed to handle PDU %s", + event_id, + exc_info=(f.type, f.value, f.getTracebackObject()), + ) await concurrently_execute( process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 5b8faea4e7..23fb515683 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -201,7 +201,15 @@ class FederationSender(object): logger.debug("Sending %s to %r", event, destinations) - self._send_pdu(event, destinations) + if destinations: + self._send_pdu(event, destinations) + + now = self.clock.time_msec() + ts = await self.store.get_received_ts(event.event_id) + + synapse.metrics.event_processing_lag_by_event.labels( + "federation_sender" + ).observe(now - ts) async def handle_room_events(events: Iterable[EventBase]) -> None: with Measure(self.clock, "handle_room_events"): diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index ac1b64caff..f7d9fd621e 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -114,6 +114,12 @@ class ApplicationServicesHandler(object): for service in services: self.scheduler.submit_event_for_as(service, event) + now = self.clock.time_msec() + ts = yield self.store.get_received_ts(event.event_id) + synapse.metrics.event_processing_lag_by_event.labels( + "appservice_sender" + ).observe(now - ts) + @defer.inlineCallbacks def handle_room_events(events): for event in events: diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py index 087a49d65d..6035672698 100644 --- a/synapse/metrics/__init__.py +++ b/synapse/metrics/__init__.py @@ -463,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name" # finished being processed. event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"]) +event_processing_lag_by_event = Histogram( + "synapse_event_processing_lag_by_event", + "Time between an event being persisted and it being queued up to be sent to the relevant remote servers", + ["name"], +) + # Build info of the running server. build_info = Gauge( "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"] -- cgit 1.5.1 From 8ca39bd2c39b81cc909796c57fe5bca45fcd358c Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Thu, 9 Jul 2020 13:01:33 +0100 Subject: Improve stacktraces from exceptions in background processes (#7808) use `Failure()` to fish out the real exception. --- changelog.d/7808.misc | 1 + synapse/metrics/background_process_metrics.py | 10 +++++++++- 2 files changed, 10 insertions(+), 1 deletion(-) create mode 100644 changelog.d/7808.misc (limited to 'synapse/metrics') diff --git a/changelog.d/7808.misc b/changelog.d/7808.misc new file mode 100644 index 0000000000..c2acca56ec --- /dev/null +++ b/changelog.d/7808.misc @@ -0,0 +1 @@ +Improve stacktraces from exceptions in background processes. diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 13785038ad..a9269196b3 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -22,6 +22,7 @@ from typing import TYPE_CHECKING, Dict, Optional, Set from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer +from twisted.python.failure import Failure from synapse.logging.context import LoggingContext, PreserveLoggingContext @@ -212,7 +213,14 @@ def run_as_background_process(desc, func, *args, **kwargs): return (yield result) except Exception: - logger.exception("Background process '%s' threw an exception", desc) + # failure.Failure() fishes the original Failure out of our stack, and + # thus gives us a sensible stack trace. + f = Failure() + logger.error( + "Background process '%s' threw an exception", + desc, + exc_info=(f.type, f.value, f.getTracebackObject()), + ) finally: _background_process_in_flight_count.labels(desc).dec() -- cgit 1.5.1 From c36228c40340f521ad52591ac3eab14946db4be2 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 6 Aug 2020 08:20:42 -0400 Subject: Convert run_as_background_process inner function to async. (#8032) --- changelog.d/8032.misc | 1 + synapse/handlers/appservice.py | 2 +- synapse/http/site.py | 5 ++-- synapse/metrics/background_process_metrics.py | 34 ++++++++++----------------- 4 files changed, 16 insertions(+), 26 deletions(-) create mode 100644 changelog.d/8032.misc (limited to 'synapse/metrics') diff --git a/changelog.d/8032.misc b/changelog.d/8032.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8032.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index fbc56c351b..c9044a5019 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -101,7 +101,7 @@ class ApplicationServicesHandler(object): async def start_scheduler(): try: - return self.scheduler.start() + return await self.scheduler.start() except Exception: logger.error("Application Services Failure") diff --git a/synapse/http/site.py b/synapse/http/site.py index 6f3b2258cc..f506152fea 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -146,10 +146,9 @@ class SynapseRequest(Request): Returns a context manager; the correct way to use this is: - @defer.inlineCallbacks - def handle_request(request): + async def handle_request(request): with request.processing("FooServlet"): - yield really_handle_the_request() + await really_handle_the_request() Once the context manager is closed, the completion of the request will be logged, and the various metrics will be updated. diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index a9269196b3..f766d16db6 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -13,16 +13,15 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import logging import threading -from asyncio import iscoroutine from functools import wraps from typing import TYPE_CHECKING, Dict, Optional, Set from prometheus_client.core import REGISTRY, Counter, Gauge from twisted.internet import defer -from twisted.python.failure import Failure from synapse.logging.context import LoggingContext, PreserveLoggingContext @@ -167,7 +166,7 @@ class _BackgroundProcess(object): ) -def run_as_background_process(desc, func, *args, **kwargs): +def run_as_background_process(desc: str, func, *args, **kwargs): """Run the given function in its own logcontext, with resource metrics This should be used to wrap processes which are fired off to run in the @@ -179,7 +178,7 @@ def run_as_background_process(desc, func, *args, **kwargs): normal synapse inlineCallbacks function). Args: - desc (str): a description for this background process type + desc: a description for this background process type func: a function, which may return a Deferred or a coroutine args: positional args for func kwargs: keyword args for func @@ -188,8 +187,7 @@ def run_as_background_process(desc, func, *args, **kwargs): follow the synapse logcontext rules. """ - @defer.inlineCallbacks - def run(): + async def run(): with _bg_metrics_lock: count = _background_process_counts.get(desc, 0) _background_process_counts[desc] = count + 1 @@ -203,29 +201,21 @@ def run_as_background_process(desc, func, *args, **kwargs): try: result = func(*args, **kwargs) - # We probably don't have an ensureDeferred in our call stack to handle - # coroutine results, so we need to ensureDeferred here. - # - # But we need this check because ensureDeferred doesn't like being - # called on immediate values (as opposed to Deferreds or coroutines). - if iscoroutine(result): - result = defer.ensureDeferred(result) + if inspect.isawaitable(result): + result = await result - return (yield result) + return result except Exception: - # failure.Failure() fishes the original Failure out of our stack, and - # thus gives us a sensible stack trace. - f = Failure() - logger.error( - "Background process '%s' threw an exception", - desc, - exc_info=(f.type, f.value, f.getTracebackObject()), + logger.exception( + "Background process '%s' threw an exception", desc, ) finally: _background_process_in_flight_count.labels(desc).dec() with PreserveLoggingContext(): - return run() + # Note that we return a Deferred here so that it can be used in a + # looping_call and other places that expect a Deferred. + return defer.ensureDeferred(run()) def wrap_as_background_process(desc): -- cgit 1.5.1 From d89692ea843f5c67c87ac4c992575015b359c5a0 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Wed, 19 Aug 2020 07:09:24 -0400 Subject: Convert runWithConnection to async. (#8121) --- changelog.d/8121.misc | 1 + synapse/metrics/background_process_metrics.py | 2 +- synapse/storage/database.py | 27 +++++++++++++-------------- 3 files changed, 15 insertions(+), 15 deletions(-) create mode 100644 changelog.d/8121.misc (limited to 'synapse/metrics') diff --git a/changelog.d/8121.misc b/changelog.d/8121.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8121.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index f766d16db6..4cd7932e5b 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -175,7 +175,7 @@ def run_as_background_process(desc: str, func, *args, **kwargs): It returns a Deferred which completes when the function completes, but it doesn't follow the synapse logcontext rules, which makes it appropriate for passing to clock.looping_call and friends (or for firing-and-forgetting in the middle of a - normal synapse inlineCallbacks function). + normal synapse async function). Args: desc: a description for this background process type diff --git a/synapse/storage/database.py b/synapse/storage/database.py index 8a9e06efcf..b9aef96b08 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -516,14 +516,16 @@ class DatabasePool(object): logger.warning("Starting db txn '%s' from sentinel context", desc) try: - result = yield self.runWithConnection( - self.new_transaction, - desc, - after_callbacks, - exception_callbacks, - func, - *args, - **kwargs + result = yield defer.ensureDeferred( + self.runWithConnection( + self.new_transaction, + desc, + after_callbacks, + exception_callbacks, + func, + *args, + **kwargs + ) ) for after_callback, after_args, after_kwargs in after_callbacks: @@ -535,8 +537,7 @@ class DatabasePool(object): return result - @defer.inlineCallbacks - def runWithConnection(self, func: Callable, *args: Any, **kwargs: Any): + async def runWithConnection(self, func: Callable, *args: Any, **kwargs: Any) -> Any: """Wraps the .runWithConnection() method on the underlying db_pool. Arguments: @@ -547,7 +548,7 @@ class DatabasePool(object): kwargs: named args to pass to `func` Returns: - Deferred: The result of func + The result of func """ parent_context = current_context() # type: Optional[LoggingContextOrSentinel] if not parent_context: @@ -570,12 +571,10 @@ class DatabasePool(object): return func(conn, *args, **kwargs) - result = yield make_deferred_yieldable( + return await make_deferred_yieldable( self._db_pool.runWithConnection(inner_func, *args, **kwargs) ) - return result - @staticmethod def cursor_to_dict(cursor): """Converts a SQL cursor into an list of dicts. -- cgit 1.5.1