From 0ee9076ffe40140db5e86763af077efbdb5d86b0 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Wed, 3 Jul 2019 04:01:28 +1000 Subject: Fix media repo breaking (#5593) --- synapse/util/logcontext.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) (limited to 'synapse/util') diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 6b0d2deea0..9e1b537804 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -24,6 +24,7 @@ See doc/log_contexts.rst for details on how this works. import logging import threading +import types from twisted.internet import defer, threads @@ -528,8 +529,9 @@ def run_in_background(f, *args, **kwargs): return from the function, and that the sentinel context is set once the deferred returned by the function completes. - Useful for wrapping functions that return a deferred which you don't yield - on (for instance because you want to pass it to deferred.gatherResults()). + Useful for wrapping functions that return a deferred or coroutine, which you don't + yield or await on (for instance because you want to pass it to + deferred.gatherResults()). Note that if you completely discard the result, you should make sure that `f` doesn't raise any deferred exceptions, otherwise a scary-looking @@ -544,6 +546,9 @@ def run_in_background(f, *args, **kwargs): # by synchronous exceptions, so let's turn them into Failures. return defer.fail() + if isinstance(res, types.CoroutineType): + res = defer.ensureDeferred(res) + if not isinstance(res, defer.Deferred): return res -- cgit 1.4.1 From 91753cae5962a56ce0440b06891f1040ba7582e9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 3 Jul 2019 09:31:27 +0100 Subject: Fix a number of "Starting txn from sentinel context" warnings (#5605) Fixes #5602, #5603 --- changelog.d/5605.bugfix | 1 + synapse/handlers/account_validity.py | 10 +++++++++- synapse/storage/events.py | 9 ++++++++- synapse/storage/registration.py | 13 ++++++++++--- synapse/util/__init__.py | 8 +++++++- 5 files changed, 35 insertions(+), 6 deletions(-) create mode 100644 changelog.d/5605.bugfix (limited to 'synapse/util') diff --git a/changelog.d/5605.bugfix b/changelog.d/5605.bugfix new file mode 100644 index 0000000000..4995ba9f68 --- /dev/null +++ b/changelog.d/5605.bugfix @@ -0,0 +1 @@ +Fix a number of "Starting txn from sentinel context" warnings. diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index 0719da3ab7..edb48054a0 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -22,6 +22,7 @@ from email.mime.text import MIMEText from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID from synapse.util import stringutils from synapse.util.logcontext import make_deferred_yieldable @@ -67,7 +68,14 @@ class AccountValidityHandler(object): ) # Check the renewal emails to send and send them every 30min. - self.clock.looping_call(self.send_renewal_emails, 30 * 60 * 1000) + def send_emails(): + # run as a background process to make sure that the database transactions + # have a logcontext to report to + return run_as_background_process( + "send_renewals", self.send_renewal_emails + ) + + self.clock.looping_call(send_emails, 30 * 60 * 1000) @defer.inlineCallbacks def send_renewal_emails(self): diff --git a/synapse/storage/events.py b/synapse/storage/events.py index fefba39ea1..86f8485704 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -253,7 +253,14 @@ class EventsStore( ) # Read the extrems every 60 minutes - hs.get_clock().looping_call(self._read_forward_extremities, 60 * 60 * 1000) + def read_forward_extremities(): + # run as a background process to make sure that the database transactions + # have a logcontext to report to + return run_as_background_process( + "read_forward_extremities", self._read_forward_extremities + ) + + hs.get_clock().looping_call(read_forward_extremities, 60 * 60 * 1000) @defer.inlineCallbacks def _read_forward_extremities(self): diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 983ce13291..13a3d5208b 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -25,6 +25,7 @@ from twisted.internet import defer from synapse.api.constants import UserTypes from synapse.api.errors import Codes, StoreError, ThreepidValidationError +from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage import background_updates from synapse.storage._base import SQLBaseStore from synapse.types import UserID @@ -619,9 +620,15 @@ class RegistrationStore( ) # Create a background job for culling expired 3PID validity tokens - hs.get_clock().looping_call( - self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS - ) + def start_cull(): + # run as a background process to make sure that the database transactions + # have a logcontext to report to + return run_as_background_process( + "cull_expired_threepid_validation_tokens", + self.cull_expired_threepid_validation_tokens, + ) + + hs.get_clock().looping_call(start_cull, THIRTY_MINUTES_IN_MS) @defer.inlineCallbacks def _backgroud_update_set_deactivated_flag(self, progress, batch_size): diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index dcc747cac1..954e32fb2a 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -62,7 +62,10 @@ class Clock(object): def looping_call(self, f, msec): """Call a function repeatedly. - Waits `msec` initially before calling `f` for the first time. + Waits `msec` initially before calling `f` for the first time. + + Note that the function will be called with no logcontext, so if it is anything + other than trivial, you probably want to wrap it in run_as_background_process. Args: f(function): The function to call repeatedly. @@ -77,6 +80,9 @@ class Clock(object): def call_later(self, delay, callback, *args, **kwargs): """Call something later + Note that the function will be called with no logcontext, so if it is anything + other than trivial, you probably want to wrap it in run_as_background_process. + Args: delay(float): How long to wait in seconds. callback(function): Function to call -- cgit 1.4.1 From cb8d568cf90bb0f5f07ee9e7e6796ad7cd83361f Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Wed, 3 Jul 2019 13:40:45 +0100 Subject: Fix 'utime went backwards' errors on daemonization. (#5609) * Fix 'utime went backwards' errors on daemonization. Fixes #5608 * remove spurious debug --- changelog.d/5609.bugfix | 1 + synapse/app/_base.py | 57 ++++++++++++++++++++++++---------------------- synapse/util/logcontext.py | 17 ++++++++++---- 3 files changed, 44 insertions(+), 31 deletions(-) create mode 100644 changelog.d/5609.bugfix (limited to 'synapse/util') diff --git a/changelog.d/5609.bugfix b/changelog.d/5609.bugfix new file mode 100644 index 0000000000..534ee22a1b --- /dev/null +++ b/changelog.d/5609.bugfix @@ -0,0 +1 @@ +Fix 'utime went backwards' errors on daemonization. diff --git a/synapse/app/_base.py b/synapse/app/_base.py index d50a9840d4..9b5c30f506 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -93,33 +93,36 @@ def start_reactor( install_dns_limiter(reactor) def run(): - # make sure that we run the reactor with the sentinel log context, - # otherwise other PreserveLoggingContext instances will get confused - # and complain when they see the logcontext arbitrarily swapping - # between the sentinel and `run` logcontexts. - with PreserveLoggingContext(): - logger.info("Running") - - change_resource_limit(soft_file_limit) - if gc_thresholds: - gc.set_threshold(*gc_thresholds) - reactor.run() - - if daemonize: - if print_pidfile: - print(pid_file) - - daemon = Daemonize( - app=appname, - pid=pid_file, - action=run, - auto_close_fds=False, - verbose=True, - logger=logger, - ) - daemon.start() - else: - run() + logger.info("Running") + change_resource_limit(soft_file_limit) + if gc_thresholds: + gc.set_threshold(*gc_thresholds) + reactor.run() + + # make sure that we run the reactor with the sentinel log context, + # otherwise other PreserveLoggingContext instances will get confused + # and complain when they see the logcontext arbitrarily swapping + # between the sentinel and `run` logcontexts. + # + # We also need to drop the logcontext before forking if we're daemonizing, + # otherwise the cputime metrics get confused about the per-thread resource usage + # appearing to go backwards. + with PreserveLoggingContext(): + if daemonize: + if print_pidfile: + print(pid_file) + + daemon = Daemonize( + app=appname, + pid=pid_file, + action=run, + auto_close_fds=False, + verbose=True, + logger=logger, + ) + daemon.start() + else: + run() def quit_with_error(error_string): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py index 9e1b537804..30dfa1d6b2 100644 --- a/synapse/util/logcontext.py +++ b/synapse/util/logcontext.py @@ -52,6 +52,15 @@ except Exception: return None +# get an id for the current thread. +# +# threading.get_ident doesn't actually return an OS-level tid, and annoyingly, +# on Linux it actually returns the same value either side of a fork() call. However +# we only fork in one place, so it's not worth the hoop-jumping to get a real tid. +# +get_thread_id = threading.get_ident + + class ContextResourceUsage(object): """Object for tracking the resources used by a log context @@ -225,7 +234,7 @@ class LoggingContext(object): # became active. self.usage_start = None - self.main_thread = threading.current_thread() + self.main_thread = get_thread_id() self.request = None self.tag = "" self.alive = True @@ -318,7 +327,7 @@ class LoggingContext(object): record.request = self.request def start(self): - if threading.current_thread() is not self.main_thread: + if get_thread_id() != self.main_thread: logger.warning("Started logcontext %s on different thread", self) return @@ -328,7 +337,7 @@ class LoggingContext(object): self.usage_start = get_thread_resource_usage() def stop(self): - if threading.current_thread() is not self.main_thread: + if get_thread_id() != self.main_thread: logger.warning("Stopped logcontext %s on different thread", self) return @@ -355,7 +364,7 @@ class LoggingContext(object): # If we are on the correct thread and we're currently running then we # can include resource usage so far. - is_main_thread = threading.current_thread() is self.main_thread + is_main_thread = get_thread_id() == self.main_thread if self.alive and self.usage_start and is_main_thread: utime_delta, stime_delta = self._get_cputime() res.ru_utime += utime_delta -- cgit 1.4.1 From 463b072b1290a8cb75bc1d7b6688fa76bbfdb14f Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Thu, 4 Jul 2019 00:07:04 +1000 Subject: Move logging utilities out of the side drawer of util/ and into logging/ (#5606) --- changelog.d/5606.misc | 1 + contrib/example_log_config.yaml | 8 +- contrib/experiments/test_messaging.py | 2 +- contrib/systemd/log_config.yaml | 2 +- debian/changelog | 3 + debian/log.yaml | 2 +- docker/conf/log.config | 2 +- docs/log_contexts.rst | 38 +- synapse/app/_base.py | 2 +- synapse/app/appservice.py | 2 +- synapse/app/client_reader.py | 2 +- synapse/app/event_creator.py | 2 +- synapse/app/federation_reader.py | 2 +- synapse/app/federation_sender.py | 2 +- synapse/app/frontend_proxy.py | 2 +- synapse/app/homeserver.py | 2 +- synapse/app/media_repository.py | 2 +- synapse/app/pusher.py | 2 +- synapse/app/synchrotron.py | 2 +- synapse/app/user_dir.py | 2 +- synapse/appservice/scheduler.py | 2 +- synapse/config/logger.py | 4 +- synapse/crypto/keyring.py | 15 +- synapse/events/snapshot.py | 2 +- synapse/federation/federation_base.py | 24 +- synapse/federation/federation_client.py | 8 +- synapse/federation/federation_server.py | 4 +- synapse/federation/persistence.py | 2 +- synapse/federation/sender/__init__.py | 12 +- synapse/federation/transport/client.py | 2 +- synapse/federation/transport/server.py | 2 +- synapse/groups/attestations.py | 2 +- synapse/handlers/account_validity.py | 2 +- synapse/handlers/appservice.py | 2 +- synapse/handlers/auth.py | 6 +- synapse/handlers/e2e_keys.py | 2 +- synapse/handlers/events.py | 2 +- synapse/handlers/federation.py | 43 +- synapse/handlers/initial_sync.py | 2 +- synapse/handlers/message.py | 2 +- synapse/handlers/pagination.py | 2 +- synapse/handlers/presence.py | 4 +- synapse/handlers/sync.py | 2 +- synapse/handlers/typing.py | 2 +- synapse/http/client.py | 2 +- synapse/http/federation/matrix_federation_agent.py | 2 +- synapse/http/federation/srv_resolver.py | 2 +- synapse/http/matrixfederationclient.py | 2 +- synapse/http/request_metrics.py | 2 +- synapse/http/server.py | 2 +- synapse/http/site.py | 2 +- synapse/logging/__init__.py | 0 synapse/logging/context.py | 693 +++++++++++++++++++++ synapse/logging/formatter.py | 53 ++ synapse/logging/utils.py | 194 ++++++ synapse/metrics/background_process_metrics.py | 2 +- synapse/notifier.py | 4 +- synapse/push/mailer.py | 2 +- synapse/replication/tcp/protocol.py | 2 +- synapse/rest/client/transactions.py | 2 +- synapse/rest/media/v1/_base.py | 6 +- synapse/rest/media/v1/media_repository.py | 12 +- synapse/rest/media/v1/media_storage.py | 5 +- synapse/rest/media/v1/preview_url_resource.py | 2 +- synapse/rest/media/v1/storage_provider.py | 5 +- synapse/state/__init__.py | 2 +- synapse/storage/_base.py | 2 +- synapse/storage/events.py | 4 +- synapse/storage/events_worker.py | 6 +- synapse/storage/stream.py | 2 +- synapse/util/__init__.py | 12 +- synapse/util/async_helpers.py | 9 +- synapse/util/caches/descriptors.py | 11 +- synapse/util/caches/response_cache.py | 4 +- synapse/util/distributor.py | 2 +- synapse/util/file_consumer.py | 2 +- synapse/util/logcontext.py | 693 --------------------- synapse/util/logformatter.py | 53 -- synapse/util/logutils.py | 194 ------ synapse/util/metrics.py | 2 +- synapse/util/ratelimitutils.py | 2 +- synapse/util/retryutils.py | 4 +- tests/appservice/test_scheduler.py | 2 +- tests/crypto/test_keyring.py | 13 +- .../federation/test_matrix_federation_agent.py | 2 +- tests/http/federation/test_srv_resolver.py | 2 +- tests/http/test_fedclient.py | 2 +- tests/patch_inline_callbacks.py | 2 +- tests/push/test_http.py | 2 +- tests/rest/client/test_transactions.py | 2 +- tests/rest/media/v1/test_media_storage.py | 2 +- tests/test_federation.py | 2 +- tests/test_server.py | 2 +- tests/test_utils/logging_setup.py | 2 +- tests/unittest.py | 2 +- tests/util/caches/test_descriptors.py | 49 +- tests/util/test_async_utils.py | 7 +- tests/util/test_linearizer.py | 9 +- tests/util/test_logcontext.py | 24 +- tests/util/test_logformatter.py | 2 +- tests/utils.py | 2 +- 101 files changed, 1189 insertions(+), 1173 deletions(-) create mode 100644 changelog.d/5606.misc create mode 100644 synapse/logging/__init__.py create mode 100644 synapse/logging/context.py create mode 100644 synapse/logging/formatter.py create mode 100644 synapse/logging/utils.py delete mode 100644 synapse/util/logcontext.py delete mode 100644 synapse/util/logformatter.py delete mode 100644 synapse/util/logutils.py (limited to 'synapse/util') diff --git a/changelog.d/5606.misc b/changelog.d/5606.misc new file mode 100644 index 0000000000..bb3c028167 --- /dev/null +++ b/changelog.d/5606.misc @@ -0,0 +1 @@ +Move logging code out of `synapse.util` and into `synapse.logging`. diff --git a/contrib/example_log_config.yaml b/contrib/example_log_config.yaml index 06592963da..3a76a7a33f 100644 --- a/contrib/example_log_config.yaml +++ b/contrib/example_log_config.yaml @@ -1,7 +1,7 @@ -# Example log_config file for synapse. To enable, point `log_config` to it in +# Example log_config file for synapse. To enable, point `log_config` to it in # `homeserver.yaml`, and restart synapse. # -# This configuration will produce similar results to the defaults within +# This configuration will produce similar results to the defaults within # synapse, but can be edited to give more flexibility. version: 1 @@ -12,7 +12,7 @@ formatters: filters: context: - (): synapse.util.logcontext.LoggingContextFilter + (): synapse.logging.context.LoggingContextFilter request: "" handlers: @@ -35,7 +35,7 @@ handlers: root: level: INFO handlers: [console] # to use file handler instead, switch to [file] - + loggers: synapse: level: INFO diff --git a/contrib/experiments/test_messaging.py b/contrib/experiments/test_messaging.py index c7e55d8aa7..5ef140ae48 100644 --- a/contrib/experiments/test_messaging.py +++ b/contrib/experiments/test_messaging.py @@ -36,7 +36,7 @@ from synapse.util import origin_from_ucid from synapse.app.homeserver import SynapseHomeServer -# from synapse.util.logutils import log_function +# from synapse.logging.utils import log_function from twisted.internet import reactor, defer from twisted.python import log diff --git a/contrib/systemd/log_config.yaml b/contrib/systemd/log_config.yaml index d85bdd1208..22f67a50ce 100644 --- a/contrib/systemd/log_config.yaml +++ b/contrib/systemd/log_config.yaml @@ -8,7 +8,7 @@ formatters: filters: context: - (): synapse.util.logcontext.LoggingContextFilter + (): synapse.logging.context.LoggingContextFilter request: "" handlers: diff --git a/debian/changelog b/debian/changelog index 91653e7243..bf5a59b335 100644 --- a/debian/changelog +++ b/debian/changelog @@ -3,6 +3,9 @@ matrix-synapse-py3 (1.0.0+nmu1) UNRELEASED; urgency=medium [ Silke Hofstra ] * Include systemd-python to allow logging to the systemd journal. + [ Amber Brown ] + * Update logging config defaults to match API changes in Synapse. + -- Silke Hofstra Wed, 29 May 2019 09:45:29 +0200 matrix-synapse-py3 (1.0.0) stable; urgency=medium diff --git a/debian/log.yaml b/debian/log.yaml index 206b65f1ac..95b655dd35 100644 --- a/debian/log.yaml +++ b/debian/log.yaml @@ -7,7 +7,7 @@ formatters: filters: context: - (): synapse.util.logcontext.LoggingContextFilter + (): synapse.logging.context.LoggingContextFilter request: "" handlers: diff --git a/docker/conf/log.config b/docker/conf/log.config index 895e45d20b..ea5ccfd68b 100644 --- a/docker/conf/log.config +++ b/docker/conf/log.config @@ -6,7 +6,7 @@ formatters: filters: context: - (): synapse.util.logcontext.LoggingContextFilter + (): synapse.logging.context.LoggingContextFilter request: "" handlers: diff --git a/docs/log_contexts.rst b/docs/log_contexts.rst index 27cde11cf7..f5cd5de8ab 100644 --- a/docs/log_contexts.rst +++ b/docs/log_contexts.rst @@ -1,4 +1,4 @@ -Log contexts +Log Contexts ============ .. contents:: @@ -12,7 +12,7 @@ record. Logcontexts are also used for CPU and database accounting, so that we can track which requests were responsible for high CPU use or database activity. -The ``synapse.util.logcontext`` module provides a facilities for managing the +The ``synapse.logging.context`` module provides a facilities for managing the current log context (as well as providing the ``LoggingContextFilter`` class). Deferreds make the whole thing complicated, so this document describes how it @@ -27,19 +27,19 @@ found them: .. code:: python - from synapse.util import logcontext # omitted from future snippets + from synapse.logging import context # omitted from future snippets def handle_request(request_id): - request_context = logcontext.LoggingContext() + request_context = context.LoggingContext() - calling_context = logcontext.LoggingContext.current_context() - logcontext.LoggingContext.set_current_context(request_context) + calling_context = context.LoggingContext.current_context() + context.LoggingContext.set_current_context(request_context) try: request_context.request = request_id do_request_handling() logger.debug("finished") finally: - logcontext.LoggingContext.set_current_context(calling_context) + context.LoggingContext.set_current_context(calling_context) def do_request_handling(): logger.debug("phew") # this will be logged against request_id @@ -51,7 +51,7 @@ written much more succinctly as: .. code:: python def handle_request(request_id): - with logcontext.LoggingContext() as request_context: + with context.LoggingContext() as request_context: request_context.request = request_id do_request_handling() logger.debug("finished") @@ -74,7 +74,7 @@ blocking operation, and returns a deferred: @defer.inlineCallbacks def handle_request(request_id): - with logcontext.LoggingContext() as request_context: + with context.LoggingContext() as request_context: request_context.request = request_id yield do_request_handling() logger.debug("finished") @@ -179,7 +179,7 @@ though, we need to make up a new Deferred, or we get a Deferred back from external code. We need to make it follow our rules. The easy way to do it is with a combination of ``defer.inlineCallbacks``, and -``logcontext.PreserveLoggingContext``. Suppose we want to implement ``sleep``, +``context.PreserveLoggingContext``. Suppose we want to implement ``sleep``, which returns a deferred which will run its callbacks after a given number of seconds. That might look like: @@ -204,13 +204,13 @@ That doesn't follow the rules, but we can fix it by wrapping it with This technique works equally for external functions which return deferreds, or deferreds we have made ourselves. -You can also use ``logcontext.make_deferred_yieldable``, which just does the +You can also use ``context.make_deferred_yieldable``, which just does the boilerplate for you, so the above could be written: .. code:: python def sleep(seconds): - return logcontext.make_deferred_yieldable(get_sleep_deferred(seconds)) + return context.make_deferred_yieldable(get_sleep_deferred(seconds)) Fire-and-forget @@ -279,7 +279,7 @@ Obviously that option means that the operations done in that might be fixed by setting a different logcontext via a ``with LoggingContext(...)`` in ``background_operation``). -The second option is to use ``logcontext.run_in_background``, which wraps a +The second option is to use ``context.run_in_background``, which wraps a function so that it doesn't reset the logcontext even when it returns an incomplete deferred, and adds a callback to the returned deferred to reset the logcontext. In other words, it turns a function that follows the Synapse rules @@ -293,7 +293,7 @@ It can be used like this: def do_request_handling(): yield foreground_operation() - logcontext.run_in_background(background_operation) + context.run_in_background(background_operation) # this will now be logged against the request context logger.debug("Request handling complete") @@ -332,7 +332,7 @@ gathered: result = yield defer.gatherResults([d1, d2]) In this case particularly, though, option two, of using -``logcontext.preserve_fn`` almost certainly makes more sense, so that +``context.preserve_fn`` almost certainly makes more sense, so that ``operation1`` and ``operation2`` are both logged against the original logcontext. This looks like: @@ -340,8 +340,8 @@ logcontext. This looks like: @defer.inlineCallbacks def do_request_handling(): - d1 = logcontext.preserve_fn(operation1)() - d2 = logcontext.preserve_fn(operation2)() + d1 = context.preserve_fn(operation1)() + d2 = context.preserve_fn(operation2)() with PreserveLoggingContext(): result = yield defer.gatherResults([d1, d2]) @@ -381,7 +381,7 @@ off the background process, and then leave the ``with`` block to wait for it: .. code:: python def handle_request(request_id): - with logcontext.LoggingContext() as request_context: + with context.LoggingContext() as request_context: request_context.request = request_id d = do_request_handling() @@ -414,7 +414,7 @@ runs its callbacks in the original logcontext, all is happy. The business of a Deferred which runs its callbacks in the original logcontext isn't hard to achieve — we have it today, in the shape of -``logcontext._PreservingContextDeferred``: +``context._PreservingContextDeferred``: .. code:: python diff --git a/synapse/app/_base.py b/synapse/app/_base.py index 9b5c30f506..1ebb7ae539 100644 --- a/synapse/app/_base.py +++ b/synapse/app/_base.py @@ -27,7 +27,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory import synapse from synapse.app import check_bind_error from synapse.crypto import context_factory -from synapse.util import PreserveLoggingContext +from synapse.logging.context import PreserveLoggingContext from synapse.util.async_helpers import Linearizer from synapse.util.rlimit import change_resource_limit from synapse.util.versionstring import get_version_string diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py index 9120bdb143..be44249ed6 100644 --- a/synapse/app/appservice.py +++ b/synapse/app/appservice.py @@ -26,6 +26,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext, run_in_background from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage.appservice import SlavedApplicationServiceStore @@ -36,7 +37,6 @@ from synapse.replication.tcp.client import ReplicationClientHandler from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py index 90bc79cdda..ff11beca82 100644 --- a/synapse/app/client_reader.py +++ b/synapse/app/client_reader.py @@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore @@ -64,7 +65,6 @@ from synapse.rest.client.versions import VersionsRestServlet from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py index ff522e4499..cacad25eac 100644 --- a/synapse/app/event_creator.py +++ b/synapse/app/event_creator.py @@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore @@ -59,7 +60,6 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py index 9421420930..11e80dbae0 100644 --- a/synapse/app/federation_reader.py +++ b/synapse/app/federation_reader.py @@ -28,6 +28,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.federation.transport.server import TransportLayerServer from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore @@ -48,7 +49,6 @@ from synapse.rest.key.v2 import KeyApiV2Resource from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py index 969be58d0b..97da7bdcbf 100644 --- a/synapse/app/federation_sender.py +++ b/synapse/app/federation_sender.py @@ -27,6 +27,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.federation import send_queue from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext, run_in_background from synapse.metrics import RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource @@ -44,7 +45,6 @@ from synapse.storage.engines import create_engine from synapse.types import ReadReceipt from synapse.util.async_helpers import Linearizer from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py index 2fd7d57ebf..417a10bbd2 100644 --- a/synapse/app/frontend_proxy.py +++ b/synapse/app/frontend_proxy.py @@ -29,6 +29,7 @@ from synapse.config.logger import setup_logging from synapse.http.server import JsonResource from synapse.http.servlet import RestServlet, parse_json_object_from_request from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore @@ -41,7 +42,6 @@ from synapse.rest.client.v2_alpha._base import client_patterns from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 49da105cf6..639b1429c0 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -54,6 +54,7 @@ from synapse.federation.transport.server import TransportLayerServer from synapse.http.additional_resource import AdditionalResource from synapse.http.server import RootRedirect from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext from synapse.metrics import RegistryProxy from synapse.metrics.background_process_metrics import run_as_background_process from synapse.metrics.resource import METRICS_PREFIX, MetricsResource @@ -72,7 +73,6 @@ from synapse.storage.engines import IncorrectDatabaseSetup, create_engine from synapse.storage.prepare_database import UpgradeDatabaseException, prepare_database from synapse.util.caches import CACHE_SIZE_FACTOR from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.module_loader import load_module from synapse.util.rlimit import change_resource_limit diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py index cf0e2036c3..f23b9b6eda 100644 --- a/synapse/app/media_repository.py +++ b/synapse/app/media_repository.py @@ -27,6 +27,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore @@ -40,7 +41,6 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.media_repository import MediaRepositoryStore from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index df29ea5ecb..4f929edf86 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -26,6 +26,7 @@ from synapse.config._base import ConfigError from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext, run_in_background from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import __func__ @@ -38,7 +39,6 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py index 858949910d..de4797fddc 100644 --- a/synapse/app/synchrotron.py +++ b/synapse/app/synchrotron.py @@ -31,6 +31,7 @@ from synapse.config.logger import setup_logging from synapse.handlers.presence import PresenceHandler, get_interested_parties from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext, run_in_background from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore, __func__ @@ -57,7 +58,6 @@ from synapse.server import HomeServer from synapse.storage.engines import create_engine from synapse.storage.presence import UserPresenceState from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.stringutils import random_string from synapse.util.versionstring import get_version_string diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py index 2d9d2e1bbc..1177ddd72e 100644 --- a/synapse/app/user_dir.py +++ b/synapse/app/user_dir.py @@ -28,6 +28,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.logger import setup_logging from synapse.http.server import JsonResource from synapse.http.site import SynapseSite +from synapse.logging.context import LoggingContext, run_in_background from synapse.metrics import RegistryProxy from synapse.metrics.resource import METRICS_PREFIX, MetricsResource from synapse.replication.slave.storage._base import BaseSlavedStore @@ -46,7 +47,6 @@ from synapse.storage.engines import create_engine from synapse.storage.user_directory import UserDirectoryStore from synapse.util.caches.stream_change_cache import StreamChangeCache from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string diff --git a/synapse/appservice/scheduler.py b/synapse/appservice/scheduler.py index b54bf5411f..e5b36494f5 100644 --- a/synapse/appservice/scheduler.py +++ b/synapse/appservice/scheduler.py @@ -53,8 +53,8 @@ import logging from twisted.internet import defer from synapse.appservice import ApplicationServiceState +from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.logcontext import run_in_background logger = logging.getLogger(__name__) diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 931aec41c0..0f5554211c 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -24,7 +24,7 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner import synapse from synapse.app import _base as appbase -from synapse.util.logcontext import LoggingContextFilter +from synapse.logging.context import LoggingContextFilter from synapse.util.versionstring import get_version_string from ._base import Config @@ -40,7 +40,7 @@ formatters: filters: context: - (): synapse.util.logcontext.LoggingContextFilter + (): synapse.logging.context.LoggingContextFilter request: "" handlers: diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py index 10c2eb7f0f..341c863152 100644 --- a/synapse/crypto/keyring.py +++ b/synapse/crypto/keyring.py @@ -44,15 +44,16 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) -from synapse.storage.keys import FetchKeyResult -from synapse.util import logcontext, unwrapFirstError -from synapse.util.async_helpers import yieldable_gather_results -from synapse.util.logcontext import ( +from synapse.logging.context import ( LoggingContext, PreserveLoggingContext, + make_deferred_yieldable, preserve_fn, run_in_background, ) +from synapse.storage.keys import FetchKeyResult +from synapse.util import unwrapFirstError +from synapse.util.async_helpers import yieldable_gather_results from synapse.util.metrics import Measure from synapse.util.retryutils import NotRetryingDestination @@ -140,7 +141,7 @@ class Keyring(object): """ req = VerifyJsonRequest(server_name, json_object, validity_time, request_name) requests = (req,) - return logcontext.make_deferred_yieldable(self._verify_objects(requests)[0]) + return make_deferred_yieldable(self._verify_objects(requests)[0]) def verify_json_objects_for_server(self, server_and_json): """Bulk verifies signatures of json objects, bulk fetching keys as @@ -557,7 +558,7 @@ class BaseV2KeyFetcher(object): signed_key_json_bytes = encode_canonical_json(signed_key_json) - yield logcontext.make_deferred_yieldable( + yield make_deferred_yieldable( defer.gatherResults( [ run_in_background( @@ -612,7 +613,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher): defer.returnValue({}) - results = yield logcontext.make_deferred_yieldable( + results = yield make_deferred_yieldable( defer.gatherResults( [run_in_background(get_key, server) for server in self.key_servers], consumeErrors=True, diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index a96cdada3d..a9545e6c1b 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -19,7 +19,7 @@ from frozendict import frozendict from twisted.internet import defer -from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from synapse.logging.context import make_deferred_yieldable, run_in_background class EventContext(object): diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py index 1e925b19e7..f7bb806ae7 100644 --- a/synapse/federation/federation_base.py +++ b/synapse/federation/federation_base.py @@ -27,8 +27,14 @@ from synapse.crypto.event_signing import check_event_content_hash from synapse.events import event_type_from_format_version from synapse.events.utils import prune_event from synapse.http.servlet import assert_params_in_dict +from synapse.logging.context import ( + LoggingContext, + PreserveLoggingContext, + make_deferred_yieldable, + preserve_fn, +) from synapse.types import get_domain_from_id -from synapse.util import logcontext, unwrapFirstError +from synapse.util import unwrapFirstError logger = logging.getLogger(__name__) @@ -73,7 +79,7 @@ class FederationBase(object): @defer.inlineCallbacks def handle_check_result(pdu, deferred): try: - res = yield logcontext.make_deferred_yieldable(deferred) + res = yield make_deferred_yieldable(deferred) except SynapseError: res = None @@ -102,10 +108,10 @@ class FederationBase(object): defer.returnValue(res) - handle = logcontext.preserve_fn(handle_check_result) + handle = preserve_fn(handle_check_result) deferreds2 = [handle(pdu, deferred) for pdu, deferred in zip(pdus, deferreds)] - valid_pdus = yield logcontext.make_deferred_yieldable( + valid_pdus = yield make_deferred_yieldable( defer.gatherResults(deferreds2, consumeErrors=True) ).addErrback(unwrapFirstError) @@ -115,7 +121,7 @@ class FederationBase(object): defer.returnValue([p for p in valid_pdus if p]) def _check_sigs_and_hash(self, room_version, pdu): - return logcontext.make_deferred_yieldable( + return make_deferred_yieldable( self._check_sigs_and_hashes(room_version, [pdu])[0] ) @@ -133,14 +139,14 @@ class FederationBase(object): * returns a redacted version of the event (if the signature matched but the hash did not) * throws a SynapseError if the signature check failed. - The deferreds run their callbacks in the sentinel logcontext. + The deferreds run their callbacks in the sentinel """ deferreds = _check_sigs_on_pdus(self.keyring, room_version, pdus) - ctx = logcontext.LoggingContext.current_context() + ctx = LoggingContext.current_context() def callback(_, pdu): - with logcontext.PreserveLoggingContext(ctx): + with PreserveLoggingContext(ctx): if not check_event_content_hash(pdu): # let's try to distinguish between failures because the event was # redacted (which are somewhat expected) vs actual ball-tampering @@ -178,7 +184,7 @@ class FederationBase(object): def errback(failure, pdu): failure.trap(SynapseError) - with logcontext.PreserveLoggingContext(ctx): + with PreserveLoggingContext(ctx): logger.warn( "Signature check failed for %s: %s", pdu.event_id, diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py index 3883eb525e..3cb4b94420 100644 --- a/synapse/federation/federation_client.py +++ b/synapse/federation/federation_client.py @@ -39,10 +39,10 @@ from synapse.api.room_versions import ( ) from synapse.events import builder, room_version_to_event_format from synapse.federation.federation_base import FederationBase, event_from_pdu_json -from synapse.util import logcontext, unwrapFirstError +from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.utils import log_function +from synapse.util import unwrapFirstError from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, run_in_background -from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) @@ -207,7 +207,7 @@ class FederationClient(FederationBase): ] # FIXME: We should handle signature failures more gracefully. - pdus[:] = yield logcontext.make_deferred_yieldable( + pdus[:] = yield make_deferred_yieldable( defer.gatherResults( self._check_sigs_and_hashes(room_version, pdus), consumeErrors=True ).addErrback(unwrapFirstError) diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py index 2e0cebb638..8c0a18b120 100644 --- a/synapse/federation/federation_server.py +++ b/synapse/federation/federation_server.py @@ -42,6 +42,8 @@ from synapse.federation.federation_base import FederationBase, event_from_pdu_js from synapse.federation.persistence import TransactionActions from synapse.federation.units import Edu, Transaction from synapse.http.endpoint import parse_server_name +from synapse.logging.context import nested_logging_context +from synapse.logging.utils import log_function from synapse.replication.http.federation import ( ReplicationFederationSendEduRestServlet, ReplicationGetQueryRestServlet, @@ -50,8 +52,6 @@ from synapse.types import get_domain_from_id from synapse.util import glob_to_regex from synapse.util.async_helpers import Linearizer, concurrently_execute from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import nested_logging_context -from synapse.util.logutils import log_function # when processing incoming transactions, we try to handle multiple rooms in # parallel, up to this limit. diff --git a/synapse/federation/persistence.py b/synapse/federation/persistence.py index 7535f79203..d086e04243 100644 --- a/synapse/federation/persistence.py +++ b/synapse/federation/persistence.py @@ -23,7 +23,7 @@ import logging from twisted.internet import defer -from synapse.util.logutils import log_function +from synapse.logging.utils import log_function logger = logging.getLogger(__name__) diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py index 766c5a37cd..d46f4aaeb1 100644 --- a/synapse/federation/sender/__init__.py +++ b/synapse/federation/sender/__init__.py @@ -26,6 +26,11 @@ from synapse.federation.sender.per_destination_queue import PerDestinationQueue from synapse.federation.sender.transaction_manager import TransactionManager from synapse.federation.units import Edu from synapse.handlers.presence import get_interested_remotes +from synapse.logging.context import ( + make_deferred_yieldable, + preserve_fn, + run_in_background, +) from synapse.metrics import ( LaterGauge, event_processing_loop_counter, @@ -33,7 +38,6 @@ from synapse.metrics import ( events_processed_counter, ) from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util import logcontext from synapse.util.metrics import measure_func logger = logging.getLogger(__name__) @@ -210,10 +214,10 @@ class FederationSender(object): for event in events: events_by_room.setdefault(event.room_id, []).append(event) - yield logcontext.make_deferred_yieldable( + yield make_deferred_yieldable( defer.gatherResults( [ - logcontext.run_in_background(handle_room_events, evs) + run_in_background(handle_room_events, evs) for evs in itervalues(events_by_room) ], consumeErrors=True, @@ -360,7 +364,7 @@ class FederationSender(object): for queue in queues: queue.flush_read_receipts_for_room(room_id) - @logcontext.preserve_fn # the caller should not yield on this + @preserve_fn # the caller should not yield on this @defer.inlineCallbacks def send_presence(self, states): """Send the new presence states to the appropriate destinations. diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py index aecd142309..1aae9ec9e7 100644 --- a/synapse/federation/transport/client.py +++ b/synapse/federation/transport/client.py @@ -22,7 +22,7 @@ from twisted.internet import defer from synapse.api.constants import Membership from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX -from synapse.util.logutils import log_function +from synapse.logging.utils import log_function logger = logging.getLogger(__name__) diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py index 955f0f4308..2efdcff7ef 100644 --- a/synapse/federation/transport/server.py +++ b/synapse/federation/transport/server.py @@ -36,8 +36,8 @@ from synapse.http.servlet import ( parse_json_object_from_request, parse_string_from_args, ) +from synapse.logging.context import run_in_background from synapse.types import ThirdPartyInstanceID, get_domain_from_id -from synapse.util.logcontext import run_in_background from synapse.util.ratelimitutils import FederationRateLimiter from synapse.util.versionstring import get_version_string diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py index e73757570c..f497711133 100644 --- a/synapse/groups/attestations.py +++ b/synapse/groups/attestations.py @@ -43,9 +43,9 @@ from signedjson.sign import sign_json from twisted.internet import defer from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError +from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import get_domain_from_id -from synapse.util.logcontext import run_in_background logger = logging.getLogger(__name__) diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py index edb48054a0..1f1708ba7d 100644 --- a/synapse/handlers/account_validity.py +++ b/synapse/handlers/account_validity.py @@ -22,10 +22,10 @@ from email.mime.text import MIMEText from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.logging.context import make_deferred_yieldable from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import UserID from synapse.util import stringutils -from synapse.util.logcontext import make_deferred_yieldable try: from synapse.push.mailer import load_jinja2_templates diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 5cc89d43f6..8f089f0e33 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -23,13 +23,13 @@ from twisted.internet import defer import synapse from synapse.api.constants import EventTypes +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import ( event_processing_loop_counter, event_processing_loop_room_count, ) from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util import log_failure -from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.metrics import Measure logger = logging.getLogger(__name__) diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py index c8c1ed3246..ef5585aa99 100644 --- a/synapse/handlers/auth.py +++ b/synapse/handlers/auth.py @@ -36,9 +36,9 @@ from synapse.api.errors import ( SynapseError, ) from synapse.api.ratelimiting import Ratelimiter +from synapse.logging.context import defer_to_thread from synapse.module_api import ModuleApi from synapse.types import UserID -from synapse.util import logcontext from synapse.util.caches.expiringcache import ExpiringCache from ._base import BaseHandler @@ -987,7 +987,7 @@ class AuthHandler(BaseHandler): bcrypt.gensalt(self.bcrypt_rounds), ).decode("ascii") - return logcontext.defer_to_thread(self.hs.get_reactor(), _do_hash) + return defer_to_thread(self.hs.get_reactor(), _do_hash) def validate_hash(self, password, stored_hash): """Validates that self.hash(password) == stored_hash. @@ -1013,7 +1013,7 @@ class AuthHandler(BaseHandler): if not isinstance(stored_hash, bytes): stored_hash = stored_hash.encode("ascii") - return logcontext.defer_to_thread(self.hs.get_reactor(), _do_validate_hash) + return defer_to_thread(self.hs.get_reactor(), _do_validate_hash) else: return defer.succeed(False) diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index 807900fe52..55b4ab3a1a 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -23,8 +23,8 @@ from canonicaljson import encode_canonical_json, json from twisted.internet import defer from synapse.api.errors import CodeMessageException, FederationDeniedError, SynapseError +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.types import UserID, get_domain_from_id -from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.retryutils import NotRetryingDestination logger = logging.getLogger(__name__) diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py index 5836d3c639..6a38328af3 100644 --- a/synapse/handlers/events.py +++ b/synapse/handlers/events.py @@ -21,8 +21,8 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, SynapseError from synapse.events import EventBase +from synapse.logging.utils import log_function from synapse.types import UserID -from synapse.util.logutils import log_function from synapse.visibility import filter_events_for_client from ._base import BaseHandler diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 02d397c498..57be968c67 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -45,6 +45,13 @@ from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions from synapse.crypto.event_signing import compute_event_signature from synapse.event_auth import auth_types_for_event from synapse.events.validator import EventValidator +from synapse.logging.context import ( + make_deferred_yieldable, + nested_logging_context, + preserve_fn, + run_in_background, +) +from synapse.logging.utils import log_function from synapse.replication.http.federation import ( ReplicationCleanRoomRestServlet, ReplicationFederationSendEventsRestServlet, @@ -52,10 +59,9 @@ from synapse.replication.http.federation import ( from synapse.replication.http.membership import ReplicationUserJoinedLeftRoomRestServlet from synapse.state import StateResolutionStore, resolve_events_with_store from synapse.types import UserID, get_domain_from_id -from synapse.util import logcontext, unwrapFirstError +from synapse.util import unwrapFirstError from synapse.util.async_helpers import Linearizer from synapse.util.distributor import user_joined_room -from synapse.util.logutils import log_function from synapse.util.retryutils import NotRetryingDestination from synapse.visibility import filter_events_for_server @@ -338,7 +344,7 @@ class FederationHandler(BaseHandler): room_version = yield self.store.get_room_version(room_id) - with logcontext.nested_logging_context(p): + with nested_logging_context(p): # note that if any of the missing prevs share missing state or # auth events, the requests to fetch those events are deduped # by the get_pdu_cache in federation_client. @@ -532,7 +538,7 @@ class FederationHandler(BaseHandler): event_id, ev.event_id, ) - with logcontext.nested_logging_context(ev.event_id): + with nested_logging_context(ev.event_id): try: yield self.on_receive_pdu(origin, ev, sent_to_us_directly=False) except FederationError as e: @@ -725,10 +731,10 @@ class FederationHandler(BaseHandler): missing_auth - failed_to_fetch, ) - results = yield logcontext.make_deferred_yieldable( + results = yield make_deferred_yieldable( defer.gatherResults( [ - logcontext.run_in_background( + run_in_background( self.federation_client.get_pdu, [dest], event_id, @@ -994,10 +1000,8 @@ class FederationHandler(BaseHandler): event_ids = list(extremities.keys()) logger.debug("calling resolve_state_groups in _maybe_backfill") - resolve = logcontext.preserve_fn( - self.state_handler.resolve_state_groups_for_events - ) - states = yield logcontext.make_deferred_yieldable( + resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events) + states = yield make_deferred_yieldable( defer.gatherResults( [resolve(room_id, [e]) for e in event_ids], consumeErrors=True ) @@ -1171,7 +1175,7 @@ class FederationHandler(BaseHandler): # lots of requests for missing prev_events which we do actually # have. Hence we fire off the deferred, but don't wait for it. - logcontext.run_in_background(self._handle_queued_pdus, room_queue) + run_in_background(self._handle_queued_pdus, room_queue) defer.returnValue(True) @@ -1191,7 +1195,7 @@ class FederationHandler(BaseHandler): p.event_id, p.room_id, ) - with logcontext.nested_logging_context(p.event_id): + with nested_logging_context(p.event_id): yield self.on_receive_pdu(origin, p, sent_to_us_directly=True) except Exception as e: logger.warn( @@ -1610,7 +1614,7 @@ class FederationHandler(BaseHandler): success = True finally: if not success: - logcontext.run_in_background( + run_in_background( self.store.remove_push_actions_from_staging, event.event_id ) @@ -1629,7 +1633,7 @@ class FederationHandler(BaseHandler): @defer.inlineCallbacks def prep(ev_info): event = ev_info["event"] - with logcontext.nested_logging_context(suffix=event.event_id): + with nested_logging_context(suffix=event.event_id): res = yield self._prep_event( origin, event, @@ -1639,12 +1643,9 @@ class FederationHandler(BaseHandler): ) defer.returnValue(res) - contexts = yield logcontext.make_deferred_yieldable( + contexts = yield make_deferred_yieldable( defer.gatherResults( - [ - logcontext.run_in_background(prep, ev_info) - for ev_info in event_infos - ], + [run_in_background(prep, ev_info) for ev_info in event_infos], consumeErrors=True, ) ) @@ -2106,10 +2107,10 @@ class FederationHandler(BaseHandler): room_version = yield self.store.get_room_version(event.room_id) - different_events = yield logcontext.make_deferred_yieldable( + different_events = yield make_deferred_yieldable( defer.gatherResults( [ - logcontext.run_in_background( + run_in_background( self.store.get_event, d, allow_none=True, allow_rejected=False ) for d in different_auth diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py index a1fe9d116f..54c966c8a6 100644 --- a/synapse/handlers/initial_sync.py +++ b/synapse/handlers/initial_sync.py @@ -21,12 +21,12 @@ from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError, Codes, SynapseError from synapse.events.validator import EventValidator from synapse.handlers.presence import format_user_presence_state +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.streams.config import PaginationConfig from synapse.types import StreamToken, UserID from synapse.util import unwrapFirstError from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.snapshot_cache import SnapshotCache -from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.visibility import filter_events_for_client from ._base import BaseHandler diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 683da6bf32..eaeda7a5cb 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -34,13 +34,13 @@ from synapse.api.errors import ( from synapse.api.room_versions import RoomVersions from synapse.api.urls import ConsentURIBuilder from synapse.events.validator import EventValidator +from synapse.logging.context import run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.http.send_event import ReplicationSendEventRestServlet from synapse.storage.state import StateFilter from synapse.types import RoomAlias, UserID, create_requester from synapse.util.async_helpers import Linearizer from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import run_in_background from synapse.util.metrics import measure_func from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py index 76ee97ddd3..20bcfed334 100644 --- a/synapse/handlers/pagination.py +++ b/synapse/handlers/pagination.py @@ -20,10 +20,10 @@ from twisted.python.failure import Failure from synapse.api.constants import EventTypes, Membership from synapse.api.errors import SynapseError +from synapse.logging.context import run_in_background from synapse.storage.state import StateFilter from synapse.types import RoomStreamToken from synapse.util.async_helpers import ReadWriteLock -from synapse.util.logcontext import run_in_background from synapse.util.stringutils import random_string from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index c80dc2eba0..6f3537e435 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -34,14 +34,14 @@ from twisted.internet import defer import synapse.metrics from synapse.api.constants import EventTypes, Membership, PresenceState from synapse.api.errors import SynapseError +from synapse.logging.context import run_in_background +from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.presence import UserPresenceState from synapse.types import UserID, get_domain_from_id from synapse.util.async_helpers import Linearizer from synapse.util.caches.descriptors import cachedInlineCallbacks -from synapse.util.logcontext import run_in_background -from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index a3f550554f..cd1ac0a27a 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -25,6 +25,7 @@ from prometheus_client import Counter from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.logging.context import LoggingContext from synapse.push.clientformat import format_push_rules_for_user from synapse.storage.roommember import MemberSummary from synapse.storage.state import StateFilter @@ -33,7 +34,6 @@ from synapse.util.async_helpers import concurrently_execute from synapse.util.caches.expiringcache import ExpiringCache from synapse.util.caches.lrucache import LruCache from synapse.util.caches.response_cache import ResponseCache -from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure, measure_func from synapse.visibility import filter_events_for_client diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py index f8062c8671..c3e0c8fc7e 100644 --- a/synapse/handlers/typing.py +++ b/synapse/handlers/typing.py @@ -19,9 +19,9 @@ from collections import namedtuple from twisted.internet import defer from synapse.api.errors import AuthError, SynapseError +from synapse.logging.context import run_in_background from synapse.types import UserID, get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import run_in_background from synapse.util.metrics import Measure from synapse.util.wheel_timer import WheelTimer diff --git a/synapse/http/client.py b/synapse/http/client.py index 9bc7035c8d..45d5010952 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -45,9 +45,9 @@ from synapse.http import ( cancelled_to_request_timed_out_error, redact_uri, ) +from synapse.logging.context import make_deferred_yieldable from synapse.util.async_helpers import timeout_deferred from synapse.util.caches import CACHE_SIZE_FACTOR -from synapse.util.logcontext import make_deferred_yieldable logger = logging.getLogger(__name__) diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py index 414cde0777..054c321a20 100644 --- a/synapse/http/federation/matrix_federation_agent.py +++ b/synapse/http/federation/matrix_federation_agent.py @@ -30,9 +30,9 @@ from twisted.web.http_headers import Headers from twisted.web.iweb import IAgent from synapse.http.federation.srv_resolver import SrvResolver, pick_server_from_list +from synapse.logging.context import make_deferred_yieldable from synapse.util import Clock from synapse.util.caches.ttlcache import TTLCache -from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure # period to cache .well-known results for by default diff --git a/synapse/http/federation/srv_resolver.py b/synapse/http/federation/srv_resolver.py index 1f22f78a75..ecc88f9b96 100644 --- a/synapse/http/federation/srv_resolver.py +++ b/synapse/http/federation/srv_resolver.py @@ -25,7 +25,7 @@ from twisted.internet.error import ConnectError from twisted.names import client, dns from twisted.names.error import DNSNameError, DomainError -from synapse.util.logcontext import make_deferred_yieldable +from synapse.logging.context import make_deferred_yieldable logger = logging.getLogger(__name__) diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py index 5ef8bb60a3..dee3710f68 100644 --- a/synapse/http/matrixfederationclient.py +++ b/synapse/http/matrixfederationclient.py @@ -48,8 +48,8 @@ from synapse.api.errors import ( from synapse.http import QuieterFileBodyProducer from synapse.http.client import BlacklistingAgentWrapper, IPBlacklistingResolver from synapse.http.federation.matrix_federation_agent import MatrixFederationAgent +from synapse.logging.context import make_deferred_yieldable from synapse.util.async_helpers import timeout_deferred -from synapse.util.logcontext import make_deferred_yieldable from synapse.util.metrics import Measure logger = logging.getLogger(__name__) diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py index 62045a918b..46af27c8f6 100644 --- a/synapse/http/request_metrics.py +++ b/synapse/http/request_metrics.py @@ -19,8 +19,8 @@ import threading from prometheus_client.core import Counter, Histogram +from synapse.logging.context import LoggingContext from synapse.metrics import LaterGauge -from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) diff --git a/synapse/http/server.py b/synapse/http/server.py index d993161a3e..72a3d67eb6 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -39,8 +39,8 @@ from synapse.api.errors import ( SynapseError, UnrecognizedRequestError, ) +from synapse.logging.context import preserve_fn from synapse.util.caches import intern_dict -from synapse.util.logcontext import preserve_fn logger = logging.getLogger(__name__) diff --git a/synapse/http/site.py b/synapse/http/site.py index 93f679ea48..df5274c177 100644 --- a/synapse/http/site.py +++ b/synapse/http/site.py @@ -19,7 +19,7 @@ from twisted.web.server import Request, Site from synapse.http import redact_uri from synapse.http.request_metrics import RequestMetrics, requests_counter -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.logging.context import LoggingContext, PreserveLoggingContext logger = logging.getLogger(__name__) diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/synapse/logging/context.py b/synapse/logging/context.py new file mode 100644 index 0000000000..30dfa1d6b2 --- /dev/null +++ b/synapse/logging/context.py @@ -0,0 +1,693 @@ +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" Thread-local-alike tracking of log contexts within synapse + +This module provides objects and utilities for tracking contexts through +synapse code, so that log lines can include a request identifier, and so that +CPU and database activity can be accounted for against the request that caused +them. + +See doc/log_contexts.rst for details on how this works. +""" + +import logging +import threading +import types + +from twisted.internet import defer, threads + +logger = logging.getLogger(__name__) + +try: + import resource + + # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined + # to be 1 on linux so we hard code it. + RUSAGE_THREAD = 1 + + # If the system doesn't support RUSAGE_THREAD then this should throw an + # exception. + resource.getrusage(RUSAGE_THREAD) + + def get_thread_resource_usage(): + return resource.getrusage(RUSAGE_THREAD) + + +except Exception: + # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we + # won't track resource usage by returning None. + def get_thread_resource_usage(): + return None + + +# get an id for the current thread. +# +# threading.get_ident doesn't actually return an OS-level tid, and annoyingly, +# on Linux it actually returns the same value either side of a fork() call. However +# we only fork in one place, so it's not worth the hoop-jumping to get a real tid. +# +get_thread_id = threading.get_ident + + +class ContextResourceUsage(object): + """Object for tracking the resources used by a log context + + Attributes: + ru_utime (float): user CPU time (in seconds) + ru_stime (float): system CPU time (in seconds) + db_txn_count (int): number of database transactions done + db_sched_duration_sec (float): amount of time spent waiting for a + database connection + db_txn_duration_sec (float): amount of time spent doing database + transactions (excluding scheduling time) + evt_db_fetch_count (int): number of events requested from the database + """ + + __slots__ = [ + "ru_stime", + "ru_utime", + "db_txn_count", + "db_txn_duration_sec", + "db_sched_duration_sec", + "evt_db_fetch_count", + ] + + def __init__(self, copy_from=None): + """Create a new ContextResourceUsage + + Args: + copy_from (ContextResourceUsage|None): if not None, an object to + copy stats from + """ + if copy_from is None: + self.reset() + else: + self.ru_utime = copy_from.ru_utime + self.ru_stime = copy_from.ru_stime + self.db_txn_count = copy_from.db_txn_count + + self.db_txn_duration_sec = copy_from.db_txn_duration_sec + self.db_sched_duration_sec = copy_from.db_sched_duration_sec + self.evt_db_fetch_count = copy_from.evt_db_fetch_count + + def copy(self): + return ContextResourceUsage(copy_from=self) + + def reset(self): + self.ru_stime = 0.0 + self.ru_utime = 0.0 + self.db_txn_count = 0 + + self.db_txn_duration_sec = 0 + self.db_sched_duration_sec = 0 + self.evt_db_fetch_count = 0 + + def __repr__(self): + return ( + "" + ) % ( + self.ru_stime, + self.ru_utime, + self.db_txn_count, + self.db_txn_duration_sec, + self.db_sched_duration_sec, + self.evt_db_fetch_count, + ) + + def __iadd__(self, other): + """Add another ContextResourceUsage's stats to this one's. + + Args: + other (ContextResourceUsage): the other resource usage object + """ + self.ru_utime += other.ru_utime + self.ru_stime += other.ru_stime + self.db_txn_count += other.db_txn_count + self.db_txn_duration_sec += other.db_txn_duration_sec + self.db_sched_duration_sec += other.db_sched_duration_sec + self.evt_db_fetch_count += other.evt_db_fetch_count + return self + + def __isub__(self, other): + self.ru_utime -= other.ru_utime + self.ru_stime -= other.ru_stime + self.db_txn_count -= other.db_txn_count + self.db_txn_duration_sec -= other.db_txn_duration_sec + self.db_sched_duration_sec -= other.db_sched_duration_sec + self.evt_db_fetch_count -= other.evt_db_fetch_count + return self + + def __add__(self, other): + res = ContextResourceUsage(copy_from=self) + res += other + return res + + def __sub__(self, other): + res = ContextResourceUsage(copy_from=self) + res -= other + return res + + +class LoggingContext(object): + """Additional context for log formatting. Contexts are scoped within a + "with" block. + + If a parent is given when creating a new context, then: + - logging fields are copied from the parent to the new context on entry + - when the new context exits, the cpu usage stats are copied from the + child to the parent + + Args: + name (str): Name for the context for debugging. + parent_context (LoggingContext|None): The parent of the new context + """ + + __slots__ = [ + "previous_context", + "name", + "parent_context", + "_resource_usage", + "usage_start", + "main_thread", + "alive", + "request", + "tag", + ] + + thread_local = threading.local() + + class Sentinel(object): + """Sentinel to represent the root context""" + + __slots__ = [] + + def __str__(self): + return "sentinel" + + def copy_to(self, record): + pass + + def start(self): + pass + + def stop(self): + pass + + def add_database_transaction(self, duration_sec): + pass + + def add_database_scheduled(self, sched_sec): + pass + + def record_event_fetch(self, event_count): + pass + + def __nonzero__(self): + return False + + __bool__ = __nonzero__ # python3 + + sentinel = Sentinel() + + def __init__(self, name=None, parent_context=None, request=None): + self.previous_context = LoggingContext.current_context() + self.name = name + + # track the resources used by this context so far + self._resource_usage = ContextResourceUsage() + + # If alive has the thread resource usage when the logcontext last + # became active. + self.usage_start = None + + self.main_thread = get_thread_id() + self.request = None + self.tag = "" + self.alive = True + + self.parent_context = parent_context + + if self.parent_context is not None: + self.parent_context.copy_to(self) + + if request is not None: + # the request param overrides the request from the parent context + self.request = request + + def __str__(self): + if self.request: + return str(self.request) + return "%s@%x" % (self.name, id(self)) + + @classmethod + def current_context(cls): + """Get the current logging context from thread local storage + + Returns: + LoggingContext: the current logging context + """ + return getattr(cls.thread_local, "current_context", cls.sentinel) + + @classmethod + def set_current_context(cls, context): + """Set the current logging context in thread local storage + Args: + context(LoggingContext): The context to activate. + Returns: + The context that was previously active + """ + current = cls.current_context() + + if current is not context: + current.stop() + cls.thread_local.current_context = context + context.start() + return current + + def __enter__(self): + """Enters this logging context into thread local storage""" + old_context = self.set_current_context(self) + if self.previous_context != old_context: + logger.warn( + "Expected previous context %r, found %r", + self.previous_context, + old_context, + ) + self.alive = True + + return self + + def __exit__(self, type, value, traceback): + """Restore the logging context in thread local storage to the state it + was before this context was entered. + Returns: + None to avoid suppressing any exceptions that were thrown. + """ + current = self.set_current_context(self.previous_context) + if current is not self: + if current is self.sentinel: + logger.warning("Expected logging context %s was lost", self) + else: + logger.warning( + "Expected logging context %s but found %s", self, current + ) + self.previous_context = None + self.alive = False + + # if we have a parent, pass our CPU usage stats on + if self.parent_context is not None and hasattr( + self.parent_context, "_resource_usage" + ): + self.parent_context._resource_usage += self._resource_usage + + # reset them in case we get entered again + self._resource_usage.reset() + + def copy_to(self, record): + """Copy logging fields from this context to a log record or + another LoggingContext + """ + + # 'request' is the only field we currently use in the logger, so that's + # all we need to copy + record.request = self.request + + def start(self): + if get_thread_id() != self.main_thread: + logger.warning("Started logcontext %s on different thread", self) + return + + # If we haven't already started record the thread resource usage so + # far + if not self.usage_start: + self.usage_start = get_thread_resource_usage() + + def stop(self): + if get_thread_id() != self.main_thread: + logger.warning("Stopped logcontext %s on different thread", self) + return + + # When we stop, let's record the cpu used since we started + if not self.usage_start: + logger.warning("Called stop on logcontext %s without calling start", self) + return + + utime_delta, stime_delta = self._get_cputime() + self._resource_usage.ru_utime += utime_delta + self._resource_usage.ru_stime += stime_delta + + self.usage_start = None + + def get_resource_usage(self): + """Get resources used by this logcontext so far. + + Returns: + ContextResourceUsage: a *copy* of the object tracking resource + usage so far + """ + # we always return a copy, for consistency + res = self._resource_usage.copy() + + # If we are on the correct thread and we're currently running then we + # can include resource usage so far. + is_main_thread = get_thread_id() == self.main_thread + if self.alive and self.usage_start and is_main_thread: + utime_delta, stime_delta = self._get_cputime() + res.ru_utime += utime_delta + res.ru_stime += stime_delta + + return res + + def _get_cputime(self): + """Get the cpu usage time so far + + Returns: Tuple[float, float]: seconds in user mode, seconds in system mode + """ + current = get_thread_resource_usage() + + utime_delta = current.ru_utime - self.usage_start.ru_utime + stime_delta = current.ru_stime - self.usage_start.ru_stime + + # sanity check + if utime_delta < 0: + logger.error( + "utime went backwards! %f < %f", + current.ru_utime, + self.usage_start.ru_utime, + ) + utime_delta = 0 + + if stime_delta < 0: + logger.error( + "stime went backwards! %f < %f", + current.ru_stime, + self.usage_start.ru_stime, + ) + stime_delta = 0 + + return utime_delta, stime_delta + + def add_database_transaction(self, duration_sec): + if duration_sec < 0: + raise ValueError("DB txn time can only be non-negative") + self._resource_usage.db_txn_count += 1 + self._resource_usage.db_txn_duration_sec += duration_sec + + def add_database_scheduled(self, sched_sec): + """Record a use of the database pool + + Args: + sched_sec (float): number of seconds it took us to get a + connection + """ + if sched_sec < 0: + raise ValueError("DB scheduling time can only be non-negative") + self._resource_usage.db_sched_duration_sec += sched_sec + + def record_event_fetch(self, event_count): + """Record a number of events being fetched from the db + + Args: + event_count (int): number of events being fetched + """ + self._resource_usage.evt_db_fetch_count += event_count + + +class LoggingContextFilter(logging.Filter): + """Logging filter that adds values from the current logging context to each + record. + Args: + **defaults: Default values to avoid formatters complaining about + missing fields + """ + + def __init__(self, **defaults): + self.defaults = defaults + + def filter(self, record): + """Add each fields from the logging contexts to the record. + Returns: + True to include the record in the log output. + """ + context = LoggingContext.current_context() + for key, value in self.defaults.items(): + setattr(record, key, value) + + # context should never be None, but if it somehow ends up being, then + # we end up in a death spiral of infinite loops, so let's check, for + # robustness' sake. + if context is not None: + context.copy_to(record) + + return True + + +class PreserveLoggingContext(object): + """Captures the current logging context and restores it when the scope is + exited. Used to restore the context after a function using + @defer.inlineCallbacks is resumed by a callback from the reactor.""" + + __slots__ = ["current_context", "new_context", "has_parent"] + + def __init__(self, new_context=None): + if new_context is None: + new_context = LoggingContext.sentinel + self.new_context = new_context + + def __enter__(self): + """Captures the current logging context""" + self.current_context = LoggingContext.set_current_context(self.new_context) + + if self.current_context: + self.has_parent = self.current_context.previous_context is not None + if not self.current_context.alive: + logger.debug("Entering dead context: %s", self.current_context) + + def __exit__(self, type, value, traceback): + """Restores the current logging context""" + context = LoggingContext.set_current_context(self.current_context) + + if context != self.new_context: + if context is LoggingContext.sentinel: + logger.warning("Expected logging context %s was lost", self.new_context) + else: + logger.warning( + "Expected logging context %s but found %s", + self.new_context, + context, + ) + + if self.current_context is not LoggingContext.sentinel: + if not self.current_context.alive: + logger.debug("Restoring dead context: %s", self.current_context) + + +def nested_logging_context(suffix, parent_context=None): + """Creates a new logging context as a child of another. + + The nested logging context will have a 'request' made up of the parent context's + request, plus the given suffix. + + CPU/db usage stats will be added to the parent context's on exit. + + Normal usage looks like: + + with nested_logging_context(suffix): + # ... do stuff + + Args: + suffix (str): suffix to add to the parent context's 'request'. + parent_context (LoggingContext|None): parent context. Will use the current context + if None. + + Returns: + LoggingContext: new logging context. + """ + if parent_context is None: + parent_context = LoggingContext.current_context() + return LoggingContext( + parent_context=parent_context, request=parent_context.request + "-" + suffix + ) + + +def preserve_fn(f): + """Function decorator which wraps the function with run_in_background""" + + def g(*args, **kwargs): + return run_in_background(f, *args, **kwargs) + + return g + + +def run_in_background(f, *args, **kwargs): + """Calls a function, ensuring that the current context is restored after + return from the function, and that the sentinel context is set once the + deferred returned by the function completes. + + Useful for wrapping functions that return a deferred or coroutine, which you don't + yield or await on (for instance because you want to pass it to + deferred.gatherResults()). + + Note that if you completely discard the result, you should make sure that + `f` doesn't raise any deferred exceptions, otherwise a scary-looking + CRITICAL error about an unhandled error will be logged without much + indication about where it came from. + """ + current = LoggingContext.current_context() + try: + res = f(*args, **kwargs) + except: # noqa: E722 + # the assumption here is that the caller doesn't want to be disturbed + # by synchronous exceptions, so let's turn them into Failures. + return defer.fail() + + if isinstance(res, types.CoroutineType): + res = defer.ensureDeferred(res) + + if not isinstance(res, defer.Deferred): + return res + + if res.called and not res.paused: + # The function should have maintained the logcontext, so we can + # optimise out the messing about + return res + + # The function may have reset the context before returning, so + # we need to restore it now. + ctx = LoggingContext.set_current_context(current) + + # The original context will be restored when the deferred + # completes, but there is nothing waiting for it, so it will + # get leaked into the reactor or some other function which + # wasn't expecting it. We therefore need to reset the context + # here. + # + # (If this feels asymmetric, consider it this way: we are + # effectively forking a new thread of execution. We are + # probably currently within a ``with LoggingContext()`` block, + # which is supposed to have a single entry and exit point. But + # by spawning off another deferred, we are effectively + # adding a new exit point.) + res.addBoth(_set_context_cb, ctx) + return res + + +def make_deferred_yieldable(deferred): + """Given a deferred, make it follow the Synapse logcontext rules: + + If the deferred has completed (or is not actually a Deferred), essentially + does nothing (just returns another completed deferred with the + result/failure). + + If the deferred has not yet completed, resets the logcontext before + returning a deferred. Then, when the deferred completes, restores the + current logcontext before running callbacks/errbacks. + + (This is more-or-less the opposite operation to run_in_background.) + """ + if not isinstance(deferred, defer.Deferred): + return deferred + + if deferred.called and not deferred.paused: + # it looks like this deferred is ready to run any callbacks we give it + # immediately. We may as well optimise out the logcontext faffery. + return deferred + + # ok, we can't be sure that a yield won't block, so let's reset the + # logcontext, and add a callback to the deferred to restore it. + prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) + deferred.addBoth(_set_context_cb, prev_context) + return deferred + + +def _set_context_cb(result, context): + """A callback function which just sets the logging context""" + LoggingContext.set_current_context(context) + return result + + +def defer_to_thread(reactor, f, *args, **kwargs): + """ + Calls the function `f` using a thread from the reactor's default threadpool and + returns the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked, and whose threadpool we should use for the + function. + + Normally this will be hs.get_reactor(). + + f (callable): The function to call. + + args: positional arguments to pass to f. + + kwargs: keyword arguments to pass to f. + + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. + """ + return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) + + +def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): + """ + A wrapper for twisted.internet.threads.deferToThreadpool, which handles + logcontexts correctly. + + Calls the function `f` using a thread from the given threadpool and returns + the result as a Deferred. + + Creates a new logcontext for `f`, which is created as a child of the current + logcontext (so its CPU usage metrics will get attributed to the current + logcontext). `f` should preserve the logcontext it is given. + + The result deferred follows the Synapse logcontext rules: you should `yield` + on it. + + Args: + reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread + the Deferred will be invoked. Normally this will be hs.get_reactor(). + + threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for + running `f`. Normally this will be hs.get_reactor().getThreadPool(). + + f (callable): The function to call. + + args: positional arguments to pass to f. + + kwargs: keyword arguments to pass to f. + + Returns: + Deferred: A Deferred which fires a callback with the result of `f`, or an + errback if `f` throws an exception. + """ + logcontext = LoggingContext.current_context() + + def g(): + with LoggingContext(parent_context=logcontext): + return f(*args, **kwargs) + + return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g)) diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py new file mode 100644 index 0000000000..fbf570c756 --- /dev/null +++ b/synapse/logging/formatter.py @@ -0,0 +1,53 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import logging +import traceback + +from six import StringIO + + +class LogFormatter(logging.Formatter): + """Log formatter which gives more detail for exceptions + + This is the same as the standard log formatter, except that when logging + exceptions [typically via log.foo("msg", exc_info=1)], it prints the + sequence that led up to the point at which the exception was caught. + (Normally only stack frames between the point the exception was raised and + where it was caught are logged). + """ + + def __init__(self, *args, **kwargs): + super(LogFormatter, self).__init__(*args, **kwargs) + + def formatException(self, ei): + sio = StringIO() + (typ, val, tb) = ei + + # log the stack above the exception capture point if possible, but + # check that we actually have an f_back attribute to work around + # https://twistedmatrix.com/trac/ticket/9305 + + if tb and hasattr(tb.tb_frame, "f_back"): + sio.write("Capture point (most recent call last):\n") + traceback.print_stack(tb.tb_frame.f_back, None, sio) + + traceback.print_exception(typ, val, tb, None, sio) + s = sio.getvalue() + sio.close() + if s[-1:] == "\n": + s = s[:-1] + return s diff --git a/synapse/logging/utils.py b/synapse/logging/utils.py new file mode 100644 index 0000000000..7df0fa6087 --- /dev/null +++ b/synapse/logging/utils.py @@ -0,0 +1,194 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +import inspect +import logging +import time +from functools import wraps +from inspect import getcallargs + +from six import PY3 + +_TIME_FUNC_ID = 0 + + +def _log_debug_as_f(f, msg, msg_args): + name = f.__module__ + logger = logging.getLogger(name) + + if logger.isEnabledFor(logging.DEBUG): + if PY3: + lineno = f.__code__.co_firstlineno + pathname = f.__code__.co_filename + else: + lineno = f.func_code.co_firstlineno + pathname = f.func_code.co_filename + + record = logging.LogRecord( + name=name, + level=logging.DEBUG, + pathname=pathname, + lineno=lineno, + msg=msg, + args=msg_args, + exc_info=None, + ) + + logger.handle(record) + + +def log_function(f): + """ Function decorator that logs every call to that function. + """ + func_name = f.__name__ + + @wraps(f) + def wrapped(*args, **kwargs): + name = f.__module__ + logger = logging.getLogger(name) + level = logging.DEBUG + + if logger.isEnabledFor(level): + bound_args = getcallargs(f, *args, **kwargs) + + def format(value): + r = str(value) + if len(r) > 50: + r = r[:50] + "..." + return r + + func_args = ["%s=%s" % (k, format(v)) for k, v in bound_args.items()] + + msg_args = {"func_name": func_name, "args": ", ".join(func_args)} + + _log_debug_as_f(f, "Invoked '%(func_name)s' with args: %(args)s", msg_args) + + return f(*args, **kwargs) + + wrapped.__name__ = func_name + return wrapped + + +def time_function(f): + func_name = f.__name__ + + @wraps(f) + def wrapped(*args, **kwargs): + global _TIME_FUNC_ID + id = _TIME_FUNC_ID + _TIME_FUNC_ID += 1 + + start = time.clock() + + try: + _log_debug_as_f(f, "[FUNC START] {%s-%d}", (func_name, id)) + + r = f(*args, **kwargs) + finally: + end = time.clock() + _log_debug_as_f( + f, "[FUNC END] {%s-%d} %.3f sec", (func_name, id, end - start) + ) + + return r + + return wrapped + + +def trace_function(f): + func_name = f.__name__ + linenum = f.func_code.co_firstlineno + pathname = f.func_code.co_filename + + @wraps(f) + def wrapped(*args, **kwargs): + name = f.__module__ + logger = logging.getLogger(name) + level = logging.DEBUG + + s = inspect.currentframe().f_back + + to_print = [ + "\t%s:%s %s. Args: args=%s, kwargs=%s" + % (pathname, linenum, func_name, args, kwargs) + ] + while s: + if True or s.f_globals["__name__"].startswith("synapse"): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + to_print.append( + "\t%s:%d %s. Args: %s" % (filename, lineno, function, args_string) + ) + + s = s.f_back + + msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print) + + record = logging.LogRecord( + name=name, + level=level, + pathname=pathname, + lineno=lineno, + msg=msg, + args=None, + exc_info=None, + ) + + logger.handle(record) + + return f(*args, **kwargs) + + wrapped.__name__ = func_name + return wrapped + + +def get_previous_frames(): + s = inspect.currentframe().f_back.f_back + to_return = [] + while s: + if s.f_globals["__name__"].startswith("synapse"): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + to_return.append( + "{{ %s:%d %s - Args: %s }}" % (filename, lineno, function, args_string) + ) + + s = s.f_back + + return ", ".join(to_return) + + +def get_previous_frame(ignore=[]): + s = inspect.currentframe().f_back.f_back + + while s: + if s.f_globals["__name__"].startswith("synapse"): + if not any(s.f_globals["__name__"].startswith(ig) for ig in ignore): + filename, lineno, function, _, _ = inspect.getframeinfo(s) + args_string = inspect.formatargvalues(*inspect.getargvalues(s)) + + return "{{ %s:%d %s - Args: %s }}" % ( + filename, + lineno, + function, + args_string, + ) + + s = s.f_back + + return None diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py index 167e2c068a..edd6b42db3 100644 --- a/synapse/metrics/background_process_metrics.py +++ b/synapse/metrics/background_process_metrics.py @@ -22,7 +22,7 @@ from prometheus_client.core import REGISTRY, Counter, GaugeMetricFamily from twisted.internet import defer -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext +from synapse.logging.context import LoggingContext, PreserveLoggingContext logger = logging.getLogger(__name__) diff --git a/synapse/notifier.py b/synapse/notifier.py index d398078eed..918ef64897 100644 --- a/synapse/notifier.py +++ b/synapse/notifier.py @@ -23,12 +23,12 @@ from twisted.internet import defer from synapse.api.constants import EventTypes, Membership from synapse.api.errors import AuthError from synapse.handlers.presence import format_user_presence_state +from synapse.logging.context import PreserveLoggingContext +from synapse.logging.utils import log_function from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.types import StreamToken from synapse.util.async_helpers import ObservableDeferred, timeout_deferred -from synapse.util.logcontext import PreserveLoggingContext -from synapse.util.logutils import log_function from synapse.util.metrics import Measure from synapse.visibility import filter_events_for_client diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 809199fe88..521c6e2cd7 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -29,6 +29,7 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.api.errors import StoreError +from synapse.logging.context import make_deferred_yieldable from synapse.push.presentable_names import ( calculate_room_name, descriptor_from_member_events, @@ -36,7 +37,6 @@ from synapse.push.presentable_names import ( ) from synapse.types import UserID from synapse.util.async_helpers import concurrently_execute -from synapse.util.logcontext import make_deferred_yieldable from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 97efb835ad..5ffdf2675d 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -62,9 +62,9 @@ from twisted.internet import defer from twisted.protocols.basic import LineOnlyReceiver from twisted.python.failure import Failure +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from .commands import ( diff --git a/synapse/rest/client/transactions.py b/synapse/rest/client/transactions.py index 36404b797d..6da71dc46f 100644 --- a/synapse/rest/client/transactions.py +++ b/synapse/rest/client/transactions.py @@ -17,8 +17,8 @@ to ensure idempotency when performing PUTs using the REST API.""" import logging +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.util.async_helpers import ObservableDeferred -from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py index 3318638d3e..5fefee4dde 100644 --- a/synapse/rest/media/v1/_base.py +++ b/synapse/rest/media/v1/_base.py @@ -25,7 +25,7 @@ from twisted.protocols.basic import FileSender from synapse.api.errors import Codes, SynapseError, cs_error from synapse.http.server import finish_request, respond_with_json -from synapse.util import logcontext +from synapse.logging.context import make_deferred_yieldable from synapse.util.stringutils import is_ascii logger = logging.getLogger(__name__) @@ -75,9 +75,7 @@ def respond_with_file(request, media_type, file_path, file_size=None, upload_nam add_file_headers(request, media_type, file_size, upload_name) with open(file_path, "rb") as f: - yield logcontext.make_deferred_yieldable( - FileSender().beginFileTransfer(f, request) - ) + yield make_deferred_yieldable(FileSender().beginFileTransfer(f, request)) finish_request(request) else: diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index df3d985a38..65afffbb42 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -33,8 +33,8 @@ from synapse.api.errors import ( RequestSendFailed, SynapseError, ) +from synapse.logging.context import defer_to_thread from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util import logcontext from synapse.util.async_helpers import Linearizer from synapse.util.retryutils import NotRetryingDestination from synapse.util.stringutils import random_string @@ -463,7 +463,7 @@ class MediaRepository(object): ) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield logcontext.defer_to_thread( + t_byte_source = yield defer_to_thread( self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, @@ -511,7 +511,7 @@ class MediaRepository(object): ) thumbnailer = Thumbnailer(input_path) - t_byte_source = yield logcontext.defer_to_thread( + t_byte_source = yield defer_to_thread( self.hs.get_reactor(), self._generate_thumbnail, thumbnailer, @@ -596,7 +596,7 @@ class MediaRepository(object): return if thumbnailer.transpose_method is not None: - m_width, m_height = yield logcontext.defer_to_thread( + m_width, m_height = yield defer_to_thread( self.hs.get_reactor(), thumbnailer.transpose ) @@ -616,11 +616,11 @@ class MediaRepository(object): for (t_width, t_height, t_type), t_method in iteritems(thumbnails): # Generate the thumbnail if t_method == "crop": - t_byte_source = yield logcontext.defer_to_thread( + t_byte_source = yield defer_to_thread( self.hs.get_reactor(), thumbnailer.crop, t_width, t_height, t_type ) elif t_method == "scale": - t_byte_source = yield logcontext.defer_to_thread( + t_byte_source = yield defer_to_thread( self.hs.get_reactor(), thumbnailer.scale, t_width, t_height, t_type ) else: diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py index eff86836fb..25e5ac2848 100644 --- a/synapse/rest/media/v1/media_storage.py +++ b/synapse/rest/media/v1/media_storage.py @@ -24,9 +24,8 @@ import six from twisted.internet import defer from twisted.protocols.basic import FileSender -from synapse.util import logcontext +from synapse.logging.context import defer_to_thread, make_deferred_yieldable from synapse.util.file_consumer import BackgroundFileConsumer -from synapse.util.logcontext import make_deferred_yieldable from ._base import Responder @@ -65,7 +64,7 @@ class MediaStorage(object): with self.store_into_file(file_info) as (f, fname, finish_cb): # Write to the main repository - yield logcontext.defer_to_thread( + yield defer_to_thread( self.hs.get_reactor(), _write_file_synchronously, source, f ) yield finish_cb() diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py index 053346fb86..5871737bfd 100644 --- a/synapse/rest/media/v1/preview_url_resource.py +++ b/synapse/rest/media/v1/preview_url_resource.py @@ -42,11 +42,11 @@ from synapse.http.server import ( wrap_json_request_handler, ) from synapse.http.servlet import parse_integer, parse_string +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process from synapse.rest.media.v1._base import get_filename_from_headers from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logcontext import make_deferred_yieldable, run_in_background from synapse.util.stringutils import random_string from ._base import FileInfo diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py index 359b45ebfc..e8f559acc1 100644 --- a/synapse/rest/media/v1/storage_provider.py +++ b/synapse/rest/media/v1/storage_provider.py @@ -20,8 +20,7 @@ import shutil from twisted.internet import defer from synapse.config._base import Config -from synapse.util import logcontext -from synapse.util.logcontext import run_in_background +from synapse.logging.context import defer_to_thread, run_in_background from .media_storage import FileResponder @@ -125,7 +124,7 @@ class FileStorageProviderBackend(StorageProvider): if not os.path.exists(dirname): os.makedirs(dirname) - return logcontext.defer_to_thread( + return defer_to_thread( self.hs.get_reactor(), shutil.copyfile, primary_fname, backup_fname ) diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py index 1b454a56a1..9f708fa205 100644 --- a/synapse/state/__init__.py +++ b/synapse/state/__init__.py @@ -28,11 +28,11 @@ from twisted.internet import defer from synapse.api.constants import EventTypes from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, StateResolutionVersions from synapse.events.snapshot import EventContext +from synapse.logging.utils import log_function from synapse.state import v1, v2 from synapse.util.async_helpers import Linearizer from synapse.util.caches import get_cache_factor_for from synapse.util.caches.expiringcache import ExpiringCache -from synapse.util.logutils import log_function from synapse.util.metrics import Measure logger = logging.getLogger(__name__) diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 29589853c6..2f940dbae6 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -30,12 +30,12 @@ from prometheus_client import Histogram from twisted.internet import defer from synapse.api.errors import StoreError +from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.metrics.background_process_metrics import run_as_background_process from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id from synapse.util import batch_iter from synapse.util.caches.descriptors import Cache -from synapse.util.logcontext import LoggingContext, PreserveLoggingContext from synapse.util.stringutils import exception_to_unicode # import a function which will return a monotonic time, in seconds diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 86f8485704..b486ca50eb 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -33,6 +33,8 @@ from synapse.api.constants import EventTypes from synapse.api.errors import SynapseError from synapse.events import EventBase # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 +from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable +from synapse.logging.utils import log_function from synapse.metrics import BucketCollector from synapse.metrics.background_process_metrics import run_as_background_process from synapse.state import StateResolutionStore @@ -45,8 +47,6 @@ from synapse.util import batch_iter from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.descriptors import cached, cachedInlineCallbacks from synapse.util.frozenutils import frozendict_json_encoder -from synapse.util.logcontext import PreserveLoggingContext, make_deferred_yieldable -from synapse.util.logutils import log_function from synapse.util.metrics import Measure logger = logging.getLogger(__name__) diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py index 6d680d405a..09db872511 100644 --- a/synapse/storage/events_worker.py +++ b/synapse/storage/events_worker.py @@ -29,14 +29,14 @@ from synapse.api.room_versions import EventFormatVersions from synapse.events import FrozenEvent, event_type_from_format_version # noqa: F401 from synapse.events.snapshot import EventContext # noqa: F401 from synapse.events.utils import prune_event -from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import get_domain_from_id -from synapse.util.logcontext import ( +from synapse.logging.context import ( LoggingContext, PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import get_domain_from_id from synapse.util.metrics import Measure from ._base import SQLBaseStore diff --git a/synapse/storage/stream.py b/synapse/storage/stream.py index d9482a3843..386a9dbe14 100644 --- a/synapse/storage/stream.py +++ b/synapse/storage/stream.py @@ -41,12 +41,12 @@ from six.moves import range from twisted.internet import defer +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.storage._base import SQLBaseStore from synapse.storage.engines import PostgresEngine from synapse.storage.events_worker import EventsWorkerStore from synapse.types import RoomStreamToken from synapse.util.caches.stream_change_cache import StreamChangeCache -from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 954e32fb2a..c6d2ce4404 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -21,10 +21,14 @@ import attr from twisted.internet import defer, task -from synapse.util.logcontext import PreserveLoggingContext +from synapse.logging import context, formatter logger = logging.getLogger(__name__) +# Compatibility alias, for existing logconfigs. +logcontext = context +logformatter = formatter + def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. @@ -46,7 +50,7 @@ class Clock(object): @defer.inlineCallbacks def sleep(self, seconds): d = defer.Deferred() - with PreserveLoggingContext(): + with context.PreserveLoggingContext(): self._reactor.callLater(seconds, d.callback, seconds) res = yield d defer.returnValue(res) @@ -91,10 +95,10 @@ class Clock(object): """ def wrapped_callback(*args, **kwargs): - with PreserveLoggingContext(): + with context.PreserveLoggingContext(): callback(*args, **kwargs) - with PreserveLoggingContext(): + with context.PreserveLoggingContext(): return self._reactor.callLater(delay, wrapped_callback, *args, **kwargs) def cancel_call_later(self, timer, ignore_errs=False): diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 7757b8708a..58a6b8764f 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -23,13 +23,12 @@ from twisted.internet import defer from twisted.internet.defer import CancelledError from twisted.python import failure -from synapse.util import Clock, logcontext, unwrapFirstError - -from .logcontext import ( +from synapse.logging.context import ( PreserveLoggingContext, make_deferred_yieldable, run_in_background, ) +from synapse.util import Clock, unwrapFirstError logger = logging.getLogger(__name__) @@ -153,7 +152,7 @@ def concurrently_execute(func, args, limit): except StopIteration: pass - return logcontext.make_deferred_yieldable( + return make_deferred_yieldable( defer.gatherResults( [run_in_background(_concurrently_execute_inner) for _ in range(limit)], consumeErrors=True, @@ -174,7 +173,7 @@ def yieldable_gather_results(func, iter, *args, **kwargs): Deferred[list]: Resolved when all functions have been invoked, or errors if one of the function calls fails. """ - return logcontext.make_deferred_yieldable( + return make_deferred_yieldable( defer.gatherResults( [run_in_background(func, item, *args, **kwargs) for item in iter], consumeErrors=True, diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index d2f25063aa..675db2f448 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -24,7 +24,8 @@ from six import itervalues, string_types from twisted.internet import defer -from synapse.util import logcontext, unwrapFirstError +from synapse.logging.context import make_deferred_yieldable, preserve_fn +from synapse.util import unwrapFirstError from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches import get_cache_factor_for from synapse.util.caches.lrucache import LruCache @@ -388,7 +389,7 @@ class CacheDescriptor(_CacheDescriptorBase): except KeyError: ret = defer.maybeDeferred( - logcontext.preserve_fn(self.function_to_call), obj, *args, **kwargs + preserve_fn(self.function_to_call), obj, *args, **kwargs ) def onErr(f): @@ -408,7 +409,7 @@ class CacheDescriptor(_CacheDescriptorBase): observer = result_d.observe() if isinstance(observer, defer.Deferred): - return logcontext.make_deferred_yieldable(observer) + return make_deferred_yieldable(observer) else: return observer @@ -563,7 +564,7 @@ class CacheListDescriptor(_CacheDescriptorBase): cached_defers.append( defer.maybeDeferred( - logcontext.preserve_fn(self.function_to_call), **args_to_call + preserve_fn(self.function_to_call), **args_to_call ).addCallbacks(complete_all, errback) ) @@ -571,7 +572,7 @@ class CacheListDescriptor(_CacheDescriptorBase): d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks( lambda _: results, unwrapFirstError ) - return logcontext.make_deferred_yieldable(d) + return make_deferred_yieldable(d) else: return results diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index cbe54d45dd..d6908e169d 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -16,9 +16,9 @@ import logging from twisted.internet import defer +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches import register_cache -from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -78,7 +78,7 @@ class ResponseCache(object): *deferred* should run its callbacks in the sentinel logcontext (ie, you should wrap normal synapse deferreds with - logcontext.run_in_background). + synapse.logging.context.run_in_background). Can return either a new Deferred (which also doesn't follow the synapse logcontext rules), or, if *deferred* was already complete, the actual diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py index 5a79db821c..45af8d3eeb 100644 --- a/synapse/util/distributor.py +++ b/synapse/util/distributor.py @@ -17,8 +17,8 @@ import logging from twisted.internet import defer +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py index 629ed44149..8b17d1c8b8 100644 --- a/synapse/util/file_consumer.py +++ b/synapse/util/file_consumer.py @@ -17,7 +17,7 @@ from six.moves import queue from twisted.internet import threads -from synapse.util.logcontext import make_deferred_yieldable, run_in_background +from synapse.logging.context import make_deferred_yieldable, run_in_background class BackgroundFileConsumer(object): diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py deleted file mode 100644 index 30dfa1d6b2..0000000000 --- a/synapse/util/logcontext.py +++ /dev/null @@ -1,693 +0,0 @@ -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" Thread-local-alike tracking of log contexts within synapse - -This module provides objects and utilities for tracking contexts through -synapse code, so that log lines can include a request identifier, and so that -CPU and database activity can be accounted for against the request that caused -them. - -See doc/log_contexts.rst for details on how this works. -""" - -import logging -import threading -import types - -from twisted.internet import defer, threads - -logger = logging.getLogger(__name__) - -try: - import resource - - # Python doesn't ship with a definition of RUSAGE_THREAD but it's defined - # to be 1 on linux so we hard code it. - RUSAGE_THREAD = 1 - - # If the system doesn't support RUSAGE_THREAD then this should throw an - # exception. - resource.getrusage(RUSAGE_THREAD) - - def get_thread_resource_usage(): - return resource.getrusage(RUSAGE_THREAD) - - -except Exception: - # If the system doesn't support resource.getrusage(RUSAGE_THREAD) then we - # won't track resource usage by returning None. - def get_thread_resource_usage(): - return None - - -# get an id for the current thread. -# -# threading.get_ident doesn't actually return an OS-level tid, and annoyingly, -# on Linux it actually returns the same value either side of a fork() call. However -# we only fork in one place, so it's not worth the hoop-jumping to get a real tid. -# -get_thread_id = threading.get_ident - - -class ContextResourceUsage(object): - """Object for tracking the resources used by a log context - - Attributes: - ru_utime (float): user CPU time (in seconds) - ru_stime (float): system CPU time (in seconds) - db_txn_count (int): number of database transactions done - db_sched_duration_sec (float): amount of time spent waiting for a - database connection - db_txn_duration_sec (float): amount of time spent doing database - transactions (excluding scheduling time) - evt_db_fetch_count (int): number of events requested from the database - """ - - __slots__ = [ - "ru_stime", - "ru_utime", - "db_txn_count", - "db_txn_duration_sec", - "db_sched_duration_sec", - "evt_db_fetch_count", - ] - - def __init__(self, copy_from=None): - """Create a new ContextResourceUsage - - Args: - copy_from (ContextResourceUsage|None): if not None, an object to - copy stats from - """ - if copy_from is None: - self.reset() - else: - self.ru_utime = copy_from.ru_utime - self.ru_stime = copy_from.ru_stime - self.db_txn_count = copy_from.db_txn_count - - self.db_txn_duration_sec = copy_from.db_txn_duration_sec - self.db_sched_duration_sec = copy_from.db_sched_duration_sec - self.evt_db_fetch_count = copy_from.evt_db_fetch_count - - def copy(self): - return ContextResourceUsage(copy_from=self) - - def reset(self): - self.ru_stime = 0.0 - self.ru_utime = 0.0 - self.db_txn_count = 0 - - self.db_txn_duration_sec = 0 - self.db_sched_duration_sec = 0 - self.evt_db_fetch_count = 0 - - def __repr__(self): - return ( - "" - ) % ( - self.ru_stime, - self.ru_utime, - self.db_txn_count, - self.db_txn_duration_sec, - self.db_sched_duration_sec, - self.evt_db_fetch_count, - ) - - def __iadd__(self, other): - """Add another ContextResourceUsage's stats to this one's. - - Args: - other (ContextResourceUsage): the other resource usage object - """ - self.ru_utime += other.ru_utime - self.ru_stime += other.ru_stime - self.db_txn_count += other.db_txn_count - self.db_txn_duration_sec += other.db_txn_duration_sec - self.db_sched_duration_sec += other.db_sched_duration_sec - self.evt_db_fetch_count += other.evt_db_fetch_count - return self - - def __isub__(self, other): - self.ru_utime -= other.ru_utime - self.ru_stime -= other.ru_stime - self.db_txn_count -= other.db_txn_count - self.db_txn_duration_sec -= other.db_txn_duration_sec - self.db_sched_duration_sec -= other.db_sched_duration_sec - self.evt_db_fetch_count -= other.evt_db_fetch_count - return self - - def __add__(self, other): - res = ContextResourceUsage(copy_from=self) - res += other - return res - - def __sub__(self, other): - res = ContextResourceUsage(copy_from=self) - res -= other - return res - - -class LoggingContext(object): - """Additional context for log formatting. Contexts are scoped within a - "with" block. - - If a parent is given when creating a new context, then: - - logging fields are copied from the parent to the new context on entry - - when the new context exits, the cpu usage stats are copied from the - child to the parent - - Args: - name (str): Name for the context for debugging. - parent_context (LoggingContext|None): The parent of the new context - """ - - __slots__ = [ - "previous_context", - "name", - "parent_context", - "_resource_usage", - "usage_start", - "main_thread", - "alive", - "request", - "tag", - ] - - thread_local = threading.local() - - class Sentinel(object): - """Sentinel to represent the root context""" - - __slots__ = [] - - def __str__(self): - return "sentinel" - - def copy_to(self, record): - pass - - def start(self): - pass - - def stop(self): - pass - - def add_database_transaction(self, duration_sec): - pass - - def add_database_scheduled(self, sched_sec): - pass - - def record_event_fetch(self, event_count): - pass - - def __nonzero__(self): - return False - - __bool__ = __nonzero__ # python3 - - sentinel = Sentinel() - - def __init__(self, name=None, parent_context=None, request=None): - self.previous_context = LoggingContext.current_context() - self.name = name - - # track the resources used by this context so far - self._resource_usage = ContextResourceUsage() - - # If alive has the thread resource usage when the logcontext last - # became active. - self.usage_start = None - - self.main_thread = get_thread_id() - self.request = None - self.tag = "" - self.alive = True - - self.parent_context = parent_context - - if self.parent_context is not None: - self.parent_context.copy_to(self) - - if request is not None: - # the request param overrides the request from the parent context - self.request = request - - def __str__(self): - if self.request: - return str(self.request) - return "%s@%x" % (self.name, id(self)) - - @classmethod - def current_context(cls): - """Get the current logging context from thread local storage - - Returns: - LoggingContext: the current logging context - """ - return getattr(cls.thread_local, "current_context", cls.sentinel) - - @classmethod - def set_current_context(cls, context): - """Set the current logging context in thread local storage - Args: - context(LoggingContext): The context to activate. - Returns: - The context that was previously active - """ - current = cls.current_context() - - if current is not context: - current.stop() - cls.thread_local.current_context = context - context.start() - return current - - def __enter__(self): - """Enters this logging context into thread local storage""" - old_context = self.set_current_context(self) - if self.previous_context != old_context: - logger.warn( - "Expected previous context %r, found %r", - self.previous_context, - old_context, - ) - self.alive = True - - return self - - def __exit__(self, type, value, traceback): - """Restore the logging context in thread local storage to the state it - was before this context was entered. - Returns: - None to avoid suppressing any exceptions that were thrown. - """ - current = self.set_current_context(self.previous_context) - if current is not self: - if current is self.sentinel: - logger.warning("Expected logging context %s was lost", self) - else: - logger.warning( - "Expected logging context %s but found %s", self, current - ) - self.previous_context = None - self.alive = False - - # if we have a parent, pass our CPU usage stats on - if self.parent_context is not None and hasattr( - self.parent_context, "_resource_usage" - ): - self.parent_context._resource_usage += self._resource_usage - - # reset them in case we get entered again - self._resource_usage.reset() - - def copy_to(self, record): - """Copy logging fields from this context to a log record or - another LoggingContext - """ - - # 'request' is the only field we currently use in the logger, so that's - # all we need to copy - record.request = self.request - - def start(self): - if get_thread_id() != self.main_thread: - logger.warning("Started logcontext %s on different thread", self) - return - - # If we haven't already started record the thread resource usage so - # far - if not self.usage_start: - self.usage_start = get_thread_resource_usage() - - def stop(self): - if get_thread_id() != self.main_thread: - logger.warning("Stopped logcontext %s on different thread", self) - return - - # When we stop, let's record the cpu used since we started - if not self.usage_start: - logger.warning("Called stop on logcontext %s without calling start", self) - return - - utime_delta, stime_delta = self._get_cputime() - self._resource_usage.ru_utime += utime_delta - self._resource_usage.ru_stime += stime_delta - - self.usage_start = None - - def get_resource_usage(self): - """Get resources used by this logcontext so far. - - Returns: - ContextResourceUsage: a *copy* of the object tracking resource - usage so far - """ - # we always return a copy, for consistency - res = self._resource_usage.copy() - - # If we are on the correct thread and we're currently running then we - # can include resource usage so far. - is_main_thread = get_thread_id() == self.main_thread - if self.alive and self.usage_start and is_main_thread: - utime_delta, stime_delta = self._get_cputime() - res.ru_utime += utime_delta - res.ru_stime += stime_delta - - return res - - def _get_cputime(self): - """Get the cpu usage time so far - - Returns: Tuple[float, float]: seconds in user mode, seconds in system mode - """ - current = get_thread_resource_usage() - - utime_delta = current.ru_utime - self.usage_start.ru_utime - stime_delta = current.ru_stime - self.usage_start.ru_stime - - # sanity check - if utime_delta < 0: - logger.error( - "utime went backwards! %f < %f", - current.ru_utime, - self.usage_start.ru_utime, - ) - utime_delta = 0 - - if stime_delta < 0: - logger.error( - "stime went backwards! %f < %f", - current.ru_stime, - self.usage_start.ru_stime, - ) - stime_delta = 0 - - return utime_delta, stime_delta - - def add_database_transaction(self, duration_sec): - if duration_sec < 0: - raise ValueError("DB txn time can only be non-negative") - self._resource_usage.db_txn_count += 1 - self._resource_usage.db_txn_duration_sec += duration_sec - - def add_database_scheduled(self, sched_sec): - """Record a use of the database pool - - Args: - sched_sec (float): number of seconds it took us to get a - connection - """ - if sched_sec < 0: - raise ValueError("DB scheduling time can only be non-negative") - self._resource_usage.db_sched_duration_sec += sched_sec - - def record_event_fetch(self, event_count): - """Record a number of events being fetched from the db - - Args: - event_count (int): number of events being fetched - """ - self._resource_usage.evt_db_fetch_count += event_count - - -class LoggingContextFilter(logging.Filter): - """Logging filter that adds values from the current logging context to each - record. - Args: - **defaults: Default values to avoid formatters complaining about - missing fields - """ - - def __init__(self, **defaults): - self.defaults = defaults - - def filter(self, record): - """Add each fields from the logging contexts to the record. - Returns: - True to include the record in the log output. - """ - context = LoggingContext.current_context() - for key, value in self.defaults.items(): - setattr(record, key, value) - - # context should never be None, but if it somehow ends up being, then - # we end up in a death spiral of infinite loops, so let's check, for - # robustness' sake. - if context is not None: - context.copy_to(record) - - return True - - -class PreserveLoggingContext(object): - """Captures the current logging context and restores it when the scope is - exited. Used to restore the context after a function using - @defer.inlineCallbacks is resumed by a callback from the reactor.""" - - __slots__ = ["current_context", "new_context", "has_parent"] - - def __init__(self, new_context=None): - if new_context is None: - new_context = LoggingContext.sentinel - self.new_context = new_context - - def __enter__(self): - """Captures the current logging context""" - self.current_context = LoggingContext.set_current_context(self.new_context) - - if self.current_context: - self.has_parent = self.current_context.previous_context is not None - if not self.current_context.alive: - logger.debug("Entering dead context: %s", self.current_context) - - def __exit__(self, type, value, traceback): - """Restores the current logging context""" - context = LoggingContext.set_current_context(self.current_context) - - if context != self.new_context: - if context is LoggingContext.sentinel: - logger.warning("Expected logging context %s was lost", self.new_context) - else: - logger.warning( - "Expected logging context %s but found %s", - self.new_context, - context, - ) - - if self.current_context is not LoggingContext.sentinel: - if not self.current_context.alive: - logger.debug("Restoring dead context: %s", self.current_context) - - -def nested_logging_context(suffix, parent_context=None): - """Creates a new logging context as a child of another. - - The nested logging context will have a 'request' made up of the parent context's - request, plus the given suffix. - - CPU/db usage stats will be added to the parent context's on exit. - - Normal usage looks like: - - with nested_logging_context(suffix): - # ... do stuff - - Args: - suffix (str): suffix to add to the parent context's 'request'. - parent_context (LoggingContext|None): parent context. Will use the current context - if None. - - Returns: - LoggingContext: new logging context. - """ - if parent_context is None: - parent_context = LoggingContext.current_context() - return LoggingContext( - parent_context=parent_context, request=parent_context.request + "-" + suffix - ) - - -def preserve_fn(f): - """Function decorator which wraps the function with run_in_background""" - - def g(*args, **kwargs): - return run_in_background(f, *args, **kwargs) - - return g - - -def run_in_background(f, *args, **kwargs): - """Calls a function, ensuring that the current context is restored after - return from the function, and that the sentinel context is set once the - deferred returned by the function completes. - - Useful for wrapping functions that return a deferred or coroutine, which you don't - yield or await on (for instance because you want to pass it to - deferred.gatherResults()). - - Note that if you completely discard the result, you should make sure that - `f` doesn't raise any deferred exceptions, otherwise a scary-looking - CRITICAL error about an unhandled error will be logged without much - indication about where it came from. - """ - current = LoggingContext.current_context() - try: - res = f(*args, **kwargs) - except: # noqa: E722 - # the assumption here is that the caller doesn't want to be disturbed - # by synchronous exceptions, so let's turn them into Failures. - return defer.fail() - - if isinstance(res, types.CoroutineType): - res = defer.ensureDeferred(res) - - if not isinstance(res, defer.Deferred): - return res - - if res.called and not res.paused: - # The function should have maintained the logcontext, so we can - # optimise out the messing about - return res - - # The function may have reset the context before returning, so - # we need to restore it now. - ctx = LoggingContext.set_current_context(current) - - # The original context will be restored when the deferred - # completes, but there is nothing waiting for it, so it will - # get leaked into the reactor or some other function which - # wasn't expecting it. We therefore need to reset the context - # here. - # - # (If this feels asymmetric, consider it this way: we are - # effectively forking a new thread of execution. We are - # probably currently within a ``with LoggingContext()`` block, - # which is supposed to have a single entry and exit point. But - # by spawning off another deferred, we are effectively - # adding a new exit point.) - res.addBoth(_set_context_cb, ctx) - return res - - -def make_deferred_yieldable(deferred): - """Given a deferred, make it follow the Synapse logcontext rules: - - If the deferred has completed (or is not actually a Deferred), essentially - does nothing (just returns another completed deferred with the - result/failure). - - If the deferred has not yet completed, resets the logcontext before - returning a deferred. Then, when the deferred completes, restores the - current logcontext before running callbacks/errbacks. - - (This is more-or-less the opposite operation to run_in_background.) - """ - if not isinstance(deferred, defer.Deferred): - return deferred - - if deferred.called and not deferred.paused: - # it looks like this deferred is ready to run any callbacks we give it - # immediately. We may as well optimise out the logcontext faffery. - return deferred - - # ok, we can't be sure that a yield won't block, so let's reset the - # logcontext, and add a callback to the deferred to restore it. - prev_context = LoggingContext.set_current_context(LoggingContext.sentinel) - deferred.addBoth(_set_context_cb, prev_context) - return deferred - - -def _set_context_cb(result, context): - """A callback function which just sets the logging context""" - LoggingContext.set_current_context(context) - return result - - -def defer_to_thread(reactor, f, *args, **kwargs): - """ - Calls the function `f` using a thread from the reactor's default threadpool and - returns the result as a Deferred. - - Creates a new logcontext for `f`, which is created as a child of the current - logcontext (so its CPU usage metrics will get attributed to the current - logcontext). `f` should preserve the logcontext it is given. - - The result deferred follows the Synapse logcontext rules: you should `yield` - on it. - - Args: - reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread - the Deferred will be invoked, and whose threadpool we should use for the - function. - - Normally this will be hs.get_reactor(). - - f (callable): The function to call. - - args: positional arguments to pass to f. - - kwargs: keyword arguments to pass to f. - - Returns: - Deferred: A Deferred which fires a callback with the result of `f`, or an - errback if `f` throws an exception. - """ - return defer_to_threadpool(reactor, reactor.getThreadPool(), f, *args, **kwargs) - - -def defer_to_threadpool(reactor, threadpool, f, *args, **kwargs): - """ - A wrapper for twisted.internet.threads.deferToThreadpool, which handles - logcontexts correctly. - - Calls the function `f` using a thread from the given threadpool and returns - the result as a Deferred. - - Creates a new logcontext for `f`, which is created as a child of the current - logcontext (so its CPU usage metrics will get attributed to the current - logcontext). `f` should preserve the logcontext it is given. - - The result deferred follows the Synapse logcontext rules: you should `yield` - on it. - - Args: - reactor (twisted.internet.base.ReactorBase): The reactor in whose main thread - the Deferred will be invoked. Normally this will be hs.get_reactor(). - - threadpool (twisted.python.threadpool.ThreadPool): The threadpool to use for - running `f`. Normally this will be hs.get_reactor().getThreadPool(). - - f (callable): The function to call. - - args: positional arguments to pass to f. - - kwargs: keyword arguments to pass to f. - - Returns: - Deferred: A Deferred which fires a callback with the result of `f`, or an - errback if `f` throws an exception. - """ - logcontext = LoggingContext.current_context() - - def g(): - with LoggingContext(parent_context=logcontext): - return f(*args, **kwargs) - - return make_deferred_yieldable(threads.deferToThreadPool(reactor, threadpool, g)) diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py deleted file mode 100644 index fbf570c756..0000000000 --- a/synapse/util/logformatter.py +++ /dev/null @@ -1,53 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2017 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import logging -import traceback - -from six import StringIO - - -class LogFormatter(logging.Formatter): - """Log formatter which gives more detail for exceptions - - This is the same as the standard log formatter, except that when logging - exceptions [typically via log.foo("msg", exc_info=1)], it prints the - sequence that led up to the point at which the exception was caught. - (Normally only stack frames between the point the exception was raised and - where it was caught are logged). - """ - - def __init__(self, *args, **kwargs): - super(LogFormatter, self).__init__(*args, **kwargs) - - def formatException(self, ei): - sio = StringIO() - (typ, val, tb) = ei - - # log the stack above the exception capture point if possible, but - # check that we actually have an f_back attribute to work around - # https://twistedmatrix.com/trac/ticket/9305 - - if tb and hasattr(tb.tb_frame, "f_back"): - sio.write("Capture point (most recent call last):\n") - traceback.print_stack(tb.tb_frame.f_back, None, sio) - - traceback.print_exception(typ, val, tb, None, sio) - s = sio.getvalue() - sio.close() - if s[-1:] == "\n": - s = s[:-1] - return s diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py deleted file mode 100644 index 7df0fa6087..0000000000 --- a/synapse/util/logutils.py +++ /dev/null @@ -1,194 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -import inspect -import logging -import time -from functools import wraps -from inspect import getcallargs - -from six import PY3 - -_TIME_FUNC_ID = 0 - - -def _log_debug_as_f(f, msg, msg_args): - name = f.__module__ - logger = logging.getLogger(name) - - if logger.isEnabledFor(logging.DEBUG): - if PY3: - lineno = f.__code__.co_firstlineno - pathname = f.__code__.co_filename - else: - lineno = f.func_code.co_firstlineno - pathname = f.func_code.co_filename - - record = logging.LogRecord( - name=name, - level=logging.DEBUG, - pathname=pathname, - lineno=lineno, - msg=msg, - args=msg_args, - exc_info=None, - ) - - logger.handle(record) - - -def log_function(f): - """ Function decorator that logs every call to that function. - """ - func_name = f.__name__ - - @wraps(f) - def wrapped(*args, **kwargs): - name = f.__module__ - logger = logging.getLogger(name) - level = logging.DEBUG - - if logger.isEnabledFor(level): - bound_args = getcallargs(f, *args, **kwargs) - - def format(value): - r = str(value) - if len(r) > 50: - r = r[:50] + "..." - return r - - func_args = ["%s=%s" % (k, format(v)) for k, v in bound_args.items()] - - msg_args = {"func_name": func_name, "args": ", ".join(func_args)} - - _log_debug_as_f(f, "Invoked '%(func_name)s' with args: %(args)s", msg_args) - - return f(*args, **kwargs) - - wrapped.__name__ = func_name - return wrapped - - -def time_function(f): - func_name = f.__name__ - - @wraps(f) - def wrapped(*args, **kwargs): - global _TIME_FUNC_ID - id = _TIME_FUNC_ID - _TIME_FUNC_ID += 1 - - start = time.clock() - - try: - _log_debug_as_f(f, "[FUNC START] {%s-%d}", (func_name, id)) - - r = f(*args, **kwargs) - finally: - end = time.clock() - _log_debug_as_f( - f, "[FUNC END] {%s-%d} %.3f sec", (func_name, id, end - start) - ) - - return r - - return wrapped - - -def trace_function(f): - func_name = f.__name__ - linenum = f.func_code.co_firstlineno - pathname = f.func_code.co_filename - - @wraps(f) - def wrapped(*args, **kwargs): - name = f.__module__ - logger = logging.getLogger(name) - level = logging.DEBUG - - s = inspect.currentframe().f_back - - to_print = [ - "\t%s:%s %s. Args: args=%s, kwargs=%s" - % (pathname, linenum, func_name, args, kwargs) - ] - while s: - if True or s.f_globals["__name__"].startswith("synapse"): - filename, lineno, function, _, _ = inspect.getframeinfo(s) - args_string = inspect.formatargvalues(*inspect.getargvalues(s)) - - to_print.append( - "\t%s:%d %s. Args: %s" % (filename, lineno, function, args_string) - ) - - s = s.f_back - - msg = "\nTraceback for %s:\n" % (func_name,) + "\n".join(to_print) - - record = logging.LogRecord( - name=name, - level=level, - pathname=pathname, - lineno=lineno, - msg=msg, - args=None, - exc_info=None, - ) - - logger.handle(record) - - return f(*args, **kwargs) - - wrapped.__name__ = func_name - return wrapped - - -def get_previous_frames(): - s = inspect.currentframe().f_back.f_back - to_return = [] - while s: - if s.f_globals["__name__"].startswith("synapse"): - filename, lineno, function, _, _ = inspect.getframeinfo(s) - args_string = inspect.formatargvalues(*inspect.getargvalues(s)) - - to_return.append( - "{{ %s:%d %s - Args: %s }}" % (filename, lineno, function, args_string) - ) - - s = s.f_back - - return ", ".join(to_return) - - -def get_previous_frame(ignore=[]): - s = inspect.currentframe().f_back.f_back - - while s: - if s.f_globals["__name__"].startswith("synapse"): - if not any(s.f_globals["__name__"].startswith(ig) for ig in ignore): - filename, lineno, function, _, _ = inspect.getframeinfo(s) - args_string = inspect.formatargvalues(*inspect.getargvalues(s)) - - return "{{ %s:%d %s - Args: %s }}" % ( - filename, - lineno, - function, - args_string, - ) - - s = s.f_back - - return None diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 01284d3cf8..c30b6de19c 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -20,8 +20,8 @@ from prometheus_client import Counter from twisted.internet import defer +from synapse.logging.context import LoggingContext from synapse.metrics import InFlightGauge -from synapse.util.logcontext import LoggingContext logger = logging.getLogger(__name__) diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 06defa8199..27bceac00e 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -20,7 +20,7 @@ import logging from twisted.internet import defer from synapse.api.errors import LimitExceededError -from synapse.util.logcontext import ( +from synapse.logging.context import ( PreserveLoggingContext, make_deferred_yieldable, run_in_background, diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index 1a77456498..d8d0ceae51 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -17,7 +17,7 @@ import random from twisted.internet import defer -import synapse.util.logcontext +import synapse.logging.context from synapse.api.errors import CodeMessageException logger = logging.getLogger(__name__) @@ -225,4 +225,4 @@ class RetryDestinationLimiter(object): logger.exception("Failed to store destination_retry_timings") # we deliberately do this in the background. - synapse.util.logcontext.run_in_background(store_retry_timings) + synapse.logging.context.run_in_background(store_retry_timings) diff --git a/tests/appservice/test_scheduler.py b/tests/appservice/test_scheduler.py index db9f86bdac..04b8c2c07c 100644 --- a/tests/appservice/test_scheduler.py +++ b/tests/appservice/test_scheduler.py @@ -22,7 +22,7 @@ from synapse.appservice.scheduler import ( _ServiceQueuer, _TransactionController, ) -from synapse.util.logcontext import make_deferred_yieldable +from synapse.logging.context import make_deferred_yieldable from tests import unittest diff --git a/tests/crypto/test_keyring.py b/tests/crypto/test_keyring.py index 5a355f00cc..795703967d 100644 --- a/tests/crypto/test_keyring.py +++ b/tests/crypto/test_keyring.py @@ -30,9 +30,12 @@ from synapse.crypto.keyring import ( ServerKeyFetcher, StoreKeyFetcher, ) +from synapse.logging.context import ( + LoggingContext, + PreserveLoggingContext, + make_deferred_yieldable, +) from synapse.storage.keys import FetchKeyResult -from synapse.util import logcontext -from synapse.util.logcontext import LoggingContext from tests import unittest @@ -131,7 +134,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): @defer.inlineCallbacks def get_perspectives(**kwargs): self.assertEquals(LoggingContext.current_context().request, "11") - with logcontext.PreserveLoggingContext(): + with PreserveLoggingContext(): yield persp_deferred defer.returnValue(persp_resp) @@ -158,7 +161,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): self.assertFalse(res_deferreds[0].called) res_deferreds[0].addBoth(self.check_context, None) - yield logcontext.make_deferred_yieldable(res_deferreds[0]) + yield make_deferred_yieldable(res_deferreds[0]) # let verify_json_objects_for_server finish its work before we kill the # logcontext @@ -184,7 +187,7 @@ class KeyringTestCase(unittest.HomeserverTestCase): [("server10", json1, 0, "test")] ) res_deferreds_2[0].addBoth(self.check_context, None) - yield logcontext.make_deferred_yieldable(res_deferreds_2[0]) + yield make_deferred_yieldable(res_deferreds_2[0]) # let verify_json_objects_for_server finish its work before we kill the # logcontext diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py index 417fda3ab2..a49f9b3224 100644 --- a/tests/http/federation/test_matrix_federation_agent.py +++ b/tests/http/federation/test_matrix_federation_agent.py @@ -36,8 +36,8 @@ from synapse.http.federation.matrix_federation_agent import ( _cache_period_from_headers, ) from synapse.http.federation.srv_resolver import Server +from synapse.logging.context import LoggingContext from synapse.util.caches.ttlcache import TTLCache -from synapse.util.logcontext import LoggingContext from tests.http import TestServerTLSConnectionFactory, get_test_ca_cert_file from tests.server import FakeTransport, ThreadedMemoryReactorClock diff --git a/tests/http/federation/test_srv_resolver.py b/tests/http/federation/test_srv_resolver.py index cf6c6e95b5..65b51dc981 100644 --- a/tests/http/federation/test_srv_resolver.py +++ b/tests/http/federation/test_srv_resolver.py @@ -22,7 +22,7 @@ from twisted.internet.error import ConnectError from twisted.names import dns, error from synapse.http.federation.srv_resolver import SrvResolver -from synapse.util.logcontext import LoggingContext +from synapse.logging.context import LoggingContext from tests import unittest from tests.utils import MockClock diff --git a/tests/http/test_fedclient.py b/tests/http/test_fedclient.py index c4c0d9b968..b9d6d7ad1c 100644 --- a/tests/http/test_fedclient.py +++ b/tests/http/test_fedclient.py @@ -29,7 +29,7 @@ from synapse.http.matrixfederationclient import ( MatrixFederationHttpClient, MatrixFederationRequest, ) -from synapse.util.logcontext import LoggingContext +from synapse.logging.context import LoggingContext from tests.server import FakeTransport from tests.unittest import HomeserverTestCase diff --git a/tests/patch_inline_callbacks.py b/tests/patch_inline_callbacks.py index ee0add3455..220884311c 100644 --- a/tests/patch_inline_callbacks.py +++ b/tests/patch_inline_callbacks.py @@ -28,7 +28,7 @@ def do_patch(): Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit """ - from synapse.util.logcontext import LoggingContext + from synapse.logging.context import LoggingContext orig_inline_callbacks = defer.inlineCallbacks diff --git a/tests/push/test_http.py b/tests/push/test_http.py index 22c3f73ef3..8ce6bb62da 100644 --- a/tests/push/test_http.py +++ b/tests/push/test_http.py @@ -18,8 +18,8 @@ from mock import Mock from twisted.internet.defer import Deferred import synapse.rest.admin +from synapse.logging.context import make_deferred_yieldable from synapse.rest.client.v1 import login, room -from synapse.util.logcontext import make_deferred_yieldable from tests.unittest import HomeserverTestCase diff --git a/tests/rest/client/test_transactions.py b/tests/rest/client/test_transactions.py index 708dc26e61..a8adc9a61d 100644 --- a/tests/rest/client/test_transactions.py +++ b/tests/rest/client/test_transactions.py @@ -2,9 +2,9 @@ from mock import Mock, call from twisted.internet import defer, reactor +from synapse.logging.context import LoggingContext from synapse.rest.client.transactions import CLEANUP_PERIOD_MS, HttpTransactionCache from synapse.util import Clock -from synapse.util.logcontext import LoggingContext from tests import unittest from tests.utils import MockClock diff --git a/tests/rest/media/v1/test_media_storage.py b/tests/rest/media/v1/test_media_storage.py index 39c9342423..bc662b61db 100644 --- a/tests/rest/media/v1/test_media_storage.py +++ b/tests/rest/media/v1/test_media_storage.py @@ -24,11 +24,11 @@ from six.moves.urllib import parse from twisted.internet.defer import Deferred +from synapse.logging.context import make_deferred_yieldable from synapse.rest.media.v1._base import FileInfo from synapse.rest.media.v1.filepath import MediaFilePaths from synapse.rest.media.v1.media_storage import MediaStorage from synapse.rest.media.v1.storage_provider import FileStorageProviderBackend -from synapse.util.logcontext import make_deferred_yieldable from tests import unittest diff --git a/tests/test_federation.py b/tests/test_federation.py index 6a8339b561..a73f18f88e 100644 --- a/tests/test_federation.py +++ b/tests/test_federation.py @@ -3,9 +3,9 @@ from mock import Mock from twisted.internet.defer import maybeDeferred, succeed from synapse.events import FrozenEvent +from synapse.logging.context import LoggingContext from synapse.types import Requester, UserID from synapse.util import Clock -from synapse.util.logcontext import LoggingContext from tests import unittest from tests.server import ThreadedMemoryReactorClock, setup_test_homeserver diff --git a/tests/test_server.py b/tests/test_server.py index da29ae92ce..ba08483a4b 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -26,8 +26,8 @@ from twisted.web.server import NOT_DONE_YET from synapse.api.errors import Codes, SynapseError from synapse.http.server import JsonResource from synapse.http.site import SynapseSite, logger +from synapse.logging.context import make_deferred_yieldable from synapse.util import Clock -from synapse.util.logcontext import make_deferred_yieldable from tests import unittest from tests.server import ( diff --git a/tests/test_utils/logging_setup.py b/tests/test_utils/logging_setup.py index 813f984199..2d96b0fa8d 100644 --- a/tests/test_utils/logging_setup.py +++ b/tests/test_utils/logging_setup.py @@ -17,7 +17,7 @@ import os import twisted.logger -from synapse.util.logcontext import LoggingContextFilter +from synapse.logging.context import LoggingContextFilter class ToTwistedHandler(logging.Handler): diff --git a/tests/unittest.py b/tests/unittest.py index d26804b5b5..a09e76c7c2 100644 --- a/tests/unittest.py +++ b/tests/unittest.py @@ -33,9 +33,9 @@ from synapse.api.constants import EventTypes from synapse.config.homeserver import HomeServerConfig from synapse.http.server import JsonResource from synapse.http.site import SynapseRequest +from synapse.logging.context import LoggingContext from synapse.server import HomeServer from synapse.types import Requester, UserID, create_requester -from synapse.util.logcontext import LoggingContext from tests.server import get_clock, make_request, render, setup_test_homeserver from tests.test_utils.logging_setup import setup_logging diff --git a/tests/util/caches/test_descriptors.py b/tests/util/caches/test_descriptors.py index 6f8f52537c..7807328e2f 100644 --- a/tests/util/caches/test_descriptors.py +++ b/tests/util/caches/test_descriptors.py @@ -21,7 +21,11 @@ import mock from twisted.internet import defer, reactor from synapse.api.errors import SynapseError -from synapse.util import logcontext +from synapse.logging.context import ( + LoggingContext, + PreserveLoggingContext, + make_deferred_yieldable, +) from synapse.util.caches import descriptors from tests import unittest @@ -32,7 +36,7 @@ logger = logging.getLogger(__name__) def run_on_reactor(): d = defer.Deferred() reactor.callLater(0, d.callback, 0) - return logcontext.make_deferred_yieldable(d) + return make_deferred_yieldable(d) class CacheTestCase(unittest.TestCase): @@ -153,7 +157,7 @@ class DescriptorTestCase(unittest.TestCase): def fn(self, arg1): @defer.inlineCallbacks def inner_fn(): - with logcontext.PreserveLoggingContext(): + with PreserveLoggingContext(): yield complete_lookup defer.returnValue(1) @@ -161,10 +165,10 @@ class DescriptorTestCase(unittest.TestCase): @defer.inlineCallbacks def do_lookup(): - with logcontext.LoggingContext() as c1: + with LoggingContext() as c1: c1.name = "c1" r = yield obj.fn(1) - self.assertEqual(logcontext.LoggingContext.current_context(), c1) + self.assertEqual(LoggingContext.current_context(), c1) defer.returnValue(r) def check_result(r): @@ -174,18 +178,12 @@ class DescriptorTestCase(unittest.TestCase): # set off a deferred which will do a cache lookup d1 = do_lookup() - self.assertEqual( - logcontext.LoggingContext.current_context(), - logcontext.LoggingContext.sentinel, - ) + self.assertEqual(LoggingContext.current_context(), LoggingContext.sentinel) d1.addCallback(check_result) # and another d2 = do_lookup() - self.assertEqual( - logcontext.LoggingContext.current_context(), - logcontext.LoggingContext.sentinel, - ) + self.assertEqual(LoggingContext.current_context(), LoggingContext.sentinel) d2.addCallback(check_result) # let the lookup complete @@ -210,29 +208,25 @@ class DescriptorTestCase(unittest.TestCase): @defer.inlineCallbacks def do_lookup(): - with logcontext.LoggingContext() as c1: + with LoggingContext() as c1: c1.name = "c1" try: d = obj.fn(1) self.assertEqual( - logcontext.LoggingContext.current_context(), - logcontext.LoggingContext.sentinel, + LoggingContext.current_context(), LoggingContext.sentinel ) yield d self.fail("No exception thrown") except SynapseError: pass - self.assertEqual(logcontext.LoggingContext.current_context(), c1) + self.assertEqual(LoggingContext.current_context(), c1) obj = Cls() # set off a deferred which will do a cache lookup d1 = do_lookup() - self.assertEqual( - logcontext.LoggingContext.current_context(), - logcontext.LoggingContext.sentinel, - ) + self.assertEqual(LoggingContext.current_context(), LoggingContext.sentinel) return d1 @@ -288,23 +282,20 @@ class CachedListDescriptorTestCase(unittest.TestCase): @descriptors.cachedList("fn", "args1", inlineCallbacks=True) def list_fn(self, args1, arg2): - assert logcontext.LoggingContext.current_context().request == "c1" + assert LoggingContext.current_context().request == "c1" # we want this to behave like an asynchronous function yield run_on_reactor() - assert logcontext.LoggingContext.current_context().request == "c1" + assert LoggingContext.current_context().request == "c1" defer.returnValue(self.mock(args1, arg2)) - with logcontext.LoggingContext() as c1: + with LoggingContext() as c1: c1.request = "c1" obj = Cls() obj.mock.return_value = {10: "fish", 20: "chips"} d1 = obj.list_fn([10, 20], 2) - self.assertEqual( - logcontext.LoggingContext.current_context(), - logcontext.LoggingContext.sentinel, - ) + self.assertEqual(LoggingContext.current_context(), LoggingContext.sentinel) r = yield d1 - self.assertEqual(logcontext.LoggingContext.current_context(), c1) + self.assertEqual(LoggingContext.current_context(), c1) obj.mock.assert_called_once_with([10, 20], 2) self.assertEqual(r, {10: "fish", 20: "chips"}) obj.mock.reset_mock() diff --git a/tests/util/test_async_utils.py b/tests/util/test_async_utils.py index bf85d3b8ec..f60918069a 100644 --- a/tests/util/test_async_utils.py +++ b/tests/util/test_async_utils.py @@ -16,9 +16,8 @@ from twisted.internet import defer from twisted.internet.defer import CancelledError, Deferred from twisted.internet.task import Clock -from synapse.util import logcontext +from synapse.logging.context import LoggingContext, PreserveLoggingContext from synapse.util.async_helpers import timeout_deferred -from synapse.util.logcontext import LoggingContext from tests.unittest import TestCase @@ -69,14 +68,14 @@ class TimeoutDeferredTest(TestCase): @defer.inlineCallbacks def blocking(): non_completing_d = Deferred() - with logcontext.PreserveLoggingContext(): + with PreserveLoggingContext(): try: yield non_completing_d except CancelledError: blocking_was_cancelled[0] = True raise - with logcontext.LoggingContext("one") as context_one: + with LoggingContext("one") as context_one: # the errbacks should be run in the test logcontext def errback(res, deferred_name): self.assertIs( diff --git a/tests/util/test_linearizer.py b/tests/util/test_linearizer.py index ec7ba9719c..0ec8ef90ce 100644 --- a/tests/util/test_linearizer.py +++ b/tests/util/test_linearizer.py @@ -19,7 +19,8 @@ from six.moves import range from twisted.internet import defer, reactor from twisted.internet.defer import CancelledError -from synapse.util import Clock, logcontext +from synapse.logging.context import LoggingContext +from synapse.util import Clock from synapse.util.async_helpers import Linearizer from tests import unittest @@ -51,13 +52,13 @@ class LinearizerTestCase(unittest.TestCase): @defer.inlineCallbacks def func(i, sleep=False): - with logcontext.LoggingContext("func(%s)" % i) as lc: + with LoggingContext("func(%s)" % i) as lc: with (yield linearizer.queue("")): - self.assertEqual(logcontext.LoggingContext.current_context(), lc) + self.assertEqual(LoggingContext.current_context(), lc) if sleep: yield Clock(reactor).sleep(0) - self.assertEqual(logcontext.LoggingContext.current_context(), lc) + self.assertEqual(LoggingContext.current_context(), lc) func(0, sleep=True) for i in range(1, 100): diff --git a/tests/util/test_logcontext.py b/tests/util/test_logcontext.py index 8d69fbf111..8b8455c8b7 100644 --- a/tests/util/test_logcontext.py +++ b/tests/util/test_logcontext.py @@ -1,8 +1,14 @@ import twisted.python.failure from twisted.internet import defer, reactor -from synapse.util import Clock, logcontext -from synapse.util.logcontext import LoggingContext +from synapse.logging.context import ( + LoggingContext, + PreserveLoggingContext, + make_deferred_yieldable, + nested_logging_context, + run_in_background, +) +from synapse.util import Clock from .. import unittest @@ -43,7 +49,7 @@ class LoggingContextTestCase(unittest.TestCase): context_one.request = "one" # fire off function, but don't wait on it. - d2 = logcontext.run_in_background(function) + d2 = run_in_background(function) def cb(res): callback_completed[0] = True @@ -85,7 +91,7 @@ class LoggingContextTestCase(unittest.TestCase): def test_run_in_background_with_non_blocking_fn(self): @defer.inlineCallbacks def nonblocking_function(): - with logcontext.PreserveLoggingContext(): + with PreserveLoggingContext(): yield defer.succeed(None) return self._test_run_in_background(nonblocking_function) @@ -94,7 +100,7 @@ class LoggingContextTestCase(unittest.TestCase): # a function which returns a deferred which looks like it has been # called, but is actually paused def testfunc(): - return logcontext.make_deferred_yieldable(_chained_deferred_function()) + return make_deferred_yieldable(_chained_deferred_function()) return self._test_run_in_background(testfunc) @@ -128,7 +134,7 @@ class LoggingContextTestCase(unittest.TestCase): with LoggingContext() as context_one: context_one.request = "one" - d1 = logcontext.make_deferred_yieldable(blocking_function()) + d1 = make_deferred_yieldable(blocking_function()) # make sure that the context was reset by make_deferred_yieldable self.assertIs(LoggingContext.current_context(), sentinel_context) @@ -144,7 +150,7 @@ class LoggingContextTestCase(unittest.TestCase): with LoggingContext() as context_one: context_one.request = "one" - d1 = logcontext.make_deferred_yieldable(_chained_deferred_function()) + d1 = make_deferred_yieldable(_chained_deferred_function()) # make sure that the context was reset by make_deferred_yieldable self.assertIs(LoggingContext.current_context(), sentinel_context) @@ -161,7 +167,7 @@ class LoggingContextTestCase(unittest.TestCase): with LoggingContext() as context_one: context_one.request = "one" - d1 = logcontext.make_deferred_yieldable("bum") + d1 = make_deferred_yieldable("bum") self._check_test_key("one") r = yield d1 @@ -170,7 +176,7 @@ class LoggingContextTestCase(unittest.TestCase): def test_nested_logging_context(self): with LoggingContext(request="foo"): - nested_context = logcontext.nested_logging_context(suffix="bar") + nested_context = nested_logging_context(suffix="bar") self.assertEqual(nested_context.request, "foo-bar") diff --git a/tests/util/test_logformatter.py b/tests/util/test_logformatter.py index 297aebbfbe..0fb60caacb 100644 --- a/tests/util/test_logformatter.py +++ b/tests/util/test_logformatter.py @@ -14,7 +14,7 @@ # limitations under the License. import sys -from synapse.util.logformatter import LogFormatter +from synapse.logging.formatter import LogFormatter from tests import unittest diff --git a/tests/utils.py b/tests/utils.py index da43166f3a..d8e55b0801 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -34,6 +34,7 @@ from synapse.config.homeserver import HomeServerConfig from synapse.config.server import DEFAULT_ROOM_VERSION from synapse.federation.transport import server as federation_server from synapse.http.server import HttpServer +from synapse.logging.context import LoggingContext from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import PostgresEngine, create_engine @@ -42,7 +43,6 @@ from synapse.storage.prepare_database import ( _setup_new_database, prepare_database, ) -from synapse.util.logcontext import LoggingContext from synapse.util.ratelimitutils import FederationRateLimiter # set this to True to run the tests against postgres instead of sqlite. -- cgit 1.4.1 From 1ee268d33d3e4eb40270eb169ca5266843481b41 Mon Sep 17 00:00:00 2001 From: Amber Brown Date: Fri, 5 Jul 2019 02:32:02 +1000 Subject: Improve the backwards compatibility re-exports of synapse.logging.context (#5617) * Improve the backwards compatibility re-exports of synapse.logging.context. * reexport logformatter too --- changelog.d/5617.misc | 1 + synapse/util/__init__.py | 6 +----- synapse/util/logcontext.py | 39 +++++++++++++++++++++++++++++++++++++++ synapse/util/logformatter.py | 21 +++++++++++++++++++++ 4 files changed, 62 insertions(+), 5 deletions(-) create mode 100644 changelog.d/5617.misc create mode 100644 synapse/util/logcontext.py create mode 100644 synapse/util/logformatter.py (limited to 'synapse/util') diff --git a/changelog.d/5617.misc b/changelog.d/5617.misc new file mode 100644 index 0000000000..bb3c028167 --- /dev/null +++ b/changelog.d/5617.misc @@ -0,0 +1 @@ +Move logging code out of `synapse.util` and into `synapse.logging`. diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index c6d2ce4404..f506b2a695 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -21,14 +21,10 @@ import attr from twisted.internet import defer, task -from synapse.logging import context, formatter +from synapse.logging import context logger = logging.getLogger(__name__) -# Compatibility alias, for existing logconfigs. -logcontext = context -logformatter = formatter - def unwrapFirstError(failure): # defer.gatherResults and DeferredLists wrap failures. diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py new file mode 100644 index 0000000000..40e5c10a49 --- /dev/null +++ b/synapse/util/logcontext.py @@ -0,0 +1,39 @@ +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Backwards compatibility re-exports of ``synapse.logging.context`` functionality. +""" + +from synapse.logging.context import ( + LoggingContext, + LoggingContextFilter, + PreserveLoggingContext, + defer_to_thread, + make_deferred_yieldable, + nested_logging_context, + preserve_fn, + run_in_background, +) + +__all__ = [ + "defer_to_thread", + "LoggingContext", + "LoggingContextFilter", + "make_deferred_yieldable", + "nested_logging_context", + "preserve_fn", + "PreserveLoggingContext", + "run_in_background", +] diff --git a/synapse/util/logformatter.py b/synapse/util/logformatter.py new file mode 100644 index 0000000000..320e8f8174 --- /dev/null +++ b/synapse/util/logformatter.py @@ -0,0 +1,21 @@ +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Backwards compatibility re-exports of ``synapse.logging.formatter`` functionality. +""" + +from synapse.logging.formatter import LogFormatter + +__all__ = ["LogFormatter"] -- cgit 1.4.1 From 9481707a52a89022bc5d99c181ceb9e7ec9ec9e9 Mon Sep 17 00:00:00 2001 From: Richard van der Hoff <1389908+richvdh@users.noreply.github.com> Date: Fri, 5 Jul 2019 11:10:19 +0100 Subject: Fixes to the federation rate limiter (#5621) - Put the default window_size back to 1000ms (broken by #5181) - Make the `rc_federation` config actually do something - fix an off-by-one error in the 'concurrent' limit - Avoid creating an unused `_PerHostRatelimiter` object for every single incoming request --- changelog.d/5621.bugfix | 1 + synapse/config/ratelimiting.py | 4 +- synapse/util/ratelimitutils.py | 16 +++---- tests/config/test_ratelimiting.py | 40 ++++++++++++++++ tests/util/test_ratelimitutils.py | 97 +++++++++++++++++++++++++++++++++++++++ tests/utils.py | 6 --- 6 files changed, 148 insertions(+), 16 deletions(-) create mode 100644 changelog.d/5621.bugfix create mode 100644 tests/config/test_ratelimiting.py create mode 100644 tests/util/test_ratelimitutils.py (limited to 'synapse/util') diff --git a/changelog.d/5621.bugfix b/changelog.d/5621.bugfix new file mode 100644 index 0000000000..f1a2851f45 --- /dev/null +++ b/changelog.d/5621.bugfix @@ -0,0 +1 @@ +Various minor fixes to the federation request rate limiter. diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py index 8c587f3fd2..33f31cf213 100644 --- a/synapse/config/ratelimiting.py +++ b/synapse/config/ratelimiting.py @@ -23,7 +23,7 @@ class RateLimitConfig(object): class FederationRateLimitConfig(object): _items_and_default = { - "window_size": 10000, + "window_size": 1000, "sleep_limit": 10, "sleep_delay": 500, "reject_limit": 50, @@ -54,7 +54,7 @@ class RatelimitConfig(Config): # Load the new-style federation config, if it exists. Otherwise, fall # back to the old method. - if "federation_rc" in config: + if "rc_federation" in config: self.rc_federation = FederationRateLimitConfig(**config["rc_federation"]) else: self.rc_federation = FederationRateLimitConfig( diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 27bceac00e..5ca4521ce3 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -36,9 +36,11 @@ class FederationRateLimiter(object): clock (Clock) config (FederationRateLimitConfig) """ - self.clock = clock - self._config = config - self.ratelimiters = {} + + def new_limiter(): + return _PerHostRatelimiter(clock=clock, config=config) + + self.ratelimiters = collections.defaultdict(new_limiter) def ratelimit(self, host): """Used to ratelimit an incoming request from given host @@ -53,11 +55,9 @@ class FederationRateLimiter(object): host (str): Origin of incoming request. Returns: - _PerHostRatelimiter + context manager which returns a deferred. """ - return self.ratelimiters.setdefault( - host, _PerHostRatelimiter(clock=self.clock, config=self._config) - ).ratelimit() + return self.ratelimiters[host].ratelimit() class _PerHostRatelimiter(object): @@ -122,7 +122,7 @@ class _PerHostRatelimiter(object): self.request_times.append(time_now) def queue_request(): - if len(self.current_processing) > self.concurrent_requests: + if len(self.current_processing) >= self.concurrent_requests: queue_defer = defer.Deferred() self.ready_request_queue[request_id] = queue_defer logger.info( diff --git a/tests/config/test_ratelimiting.py b/tests/config/test_ratelimiting.py new file mode 100644 index 0000000000..13ab282384 --- /dev/null +++ b/tests/config/test_ratelimiting.py @@ -0,0 +1,40 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.config.homeserver import HomeServerConfig + +from tests.unittest import TestCase +from tests.utils import default_config + + +class RatelimitConfigTestCase(TestCase): + def test_parse_rc_federation(self): + config_dict = default_config("test") + config_dict["rc_federation"] = { + "window_size": 20000, + "sleep_limit": 693, + "sleep_delay": 252, + "reject_limit": 198, + "concurrent": 7, + } + + config = HomeServerConfig() + config.parse_config_dict(config_dict, "", "") + config_obj = config.rc_federation + + self.assertEqual(config_obj.window_size, 20000) + self.assertEqual(config_obj.sleep_limit, 693) + self.assertEqual(config_obj.sleep_delay, 252) + self.assertEqual(config_obj.reject_limit, 198) + self.assertEqual(config_obj.concurrent, 7) diff --git a/tests/util/test_ratelimitutils.py b/tests/util/test_ratelimitutils.py new file mode 100644 index 0000000000..4d1aee91d5 --- /dev/null +++ b/tests/util/test_ratelimitutils.py @@ -0,0 +1,97 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from synapse.config.homeserver import HomeServerConfig +from synapse.util.ratelimitutils import FederationRateLimiter + +from tests.server import get_clock +from tests.unittest import TestCase +from tests.utils import default_config + + +class FederationRateLimiterTestCase(TestCase): + def test_ratelimit(self): + """A simple test with the default values""" + reactor, clock = get_clock() + rc_config = build_rc_config() + ratelimiter = FederationRateLimiter(clock, rc_config) + + with ratelimiter.ratelimit("testhost") as d1: + # shouldn't block + self.successResultOf(d1) + + def test_concurrent_limit(self): + """Test what happens when we hit the concurrent limit""" + reactor, clock = get_clock() + rc_config = build_rc_config({"rc_federation": {"concurrent": 2}}) + ratelimiter = FederationRateLimiter(clock, rc_config) + + with ratelimiter.ratelimit("testhost") as d1: + # shouldn't block + self.successResultOf(d1) + + cm2 = ratelimiter.ratelimit("testhost") + d2 = cm2.__enter__() + # also shouldn't block + self.successResultOf(d2) + + cm3 = ratelimiter.ratelimit("testhost") + d3 = cm3.__enter__() + # this one should block, though ... + self.assertNoResult(d3) + + # ... until we complete an earlier request + cm2.__exit__(None, None, None) + self.successResultOf(d3) + + def test_sleep_limit(self): + """Test what happens when we hit the sleep limit""" + reactor, clock = get_clock() + rc_config = build_rc_config( + {"rc_federation": {"sleep_limit": 2, "sleep_delay": 500}} + ) + ratelimiter = FederationRateLimiter(clock, rc_config) + + with ratelimiter.ratelimit("testhost") as d1: + # shouldn't block + self.successResultOf(d1) + + with ratelimiter.ratelimit("testhost") as d2: + # nor this + self.successResultOf(d2) + + with ratelimiter.ratelimit("testhost") as d3: + # this one should block, though ... + self.assertNoResult(d3) + sleep_time = _await_resolution(reactor, d3) + self.assertAlmostEqual(sleep_time, 500, places=3) + + +def _await_resolution(reactor, d): + """advance the clock until the deferred completes. + + Returns the number of milliseconds it took to complete. + """ + start_time = reactor.seconds() + while not d.called: + reactor.advance(0.01) + return (reactor.seconds() - start_time) * 1000 + + +def build_rc_config(settings={}): + config_dict = default_config("test") + config_dict.update(settings) + config = HomeServerConfig() + config.parse_config_dict(config_dict, "", "") + return config.rc_federation diff --git a/tests/utils.py b/tests/utils.py index d8e55b0801..8a94ce0b47 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -152,12 +152,6 @@ def default_config(name, parse=False): "mau_stats_only": False, "mau_limits_reserved_threepids": [], "admin_contact": None, - "rc_federation": { - "reject_limit": 10, - "sleep_limit": 10, - "sleep_delay": 10, - "concurrent": 10, - }, "rc_message": {"per_second": 10000, "burst_count": 10000}, "rc_registration": {"per_second": 10000, "burst_count": 10000}, "rc_login": { -- cgit 1.4.1