diff options
-rwxr-xr-x | synapse/app/homeserver.py | 12 | ||||
-rw-r--r-- | synapse/config/logger.py | 24 | ||||
-rw-r--r-- | synapse/crypto/keyclient.py | 10 | ||||
-rw-r--r-- | synapse/http/client.py | 22 | ||||
-rw-r--r-- | synapse/http/server.py | 13 | ||||
-rw-r--r-- | synapse/storage/_base.py | 57 | ||||
-rw-r--r-- | synapse/util/async.py | 8 | ||||
-rw-r--r-- | synapse/util/logcontext.py | 108 | ||||
-rw-r--r-- | synapse/util/logutils.py | 1 | ||||
-rw-r--r-- | tests/util/test_log_context.py | 43 |
10 files changed, 247 insertions, 51 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 43164c8d67..85284a4919 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -33,6 +33,7 @@ from synapse.api.urls import ( ) from synapse.config.homeserver import HomeServerConfig from synapse.crypto import context_factory +from synapse.util.logcontext import LoggingContext from daemonize import Daemonize import twisted.manhole.telnet @@ -246,7 +247,7 @@ def setup(): daemon = Daemonize( app="synapse-homeserver", pid=config.pid_file, - action=reactor.run, + action=run, auto_close_fds=False, verbose=True, logger=logger, @@ -256,6 +257,13 @@ def setup(): else: reactor.run() +def run(): + with LoggingContext("run"): + reactor.run() + +def main(): + with LoggingContext("main"): + setup() if __name__ == '__main__': - setup() + main() diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 05611d02f7..8566296433 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -14,7 +14,7 @@ # limitations under the License. from ._base import Config - +from synapse.util.logcontext import LoggingContextFilter from twisted.python.log import PythonLoggingObserver import logging import logging.config @@ -46,7 +46,8 @@ class LoggingConfig(Config): def setup_logging(self): log_format = ( - '%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(message)s' + "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" + " - %(message)s" ) if self.log_config is None: @@ -54,13 +55,20 @@ class LoggingConfig(Config): if self.verbosity: level = logging.DEBUG - # FIXME: we need a logging.WARN for a -q quiet option + # FIXME: we need a logging.WARN for a -q quiet option + logger = logging.getLogger('') + logger.setLevel(level) + formatter = logging.Formatter(log_format) + if self.log_file: + handler = logging.FileHandler(self.log_file) + else: + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + handler.addFilter(LoggingContextFilter(request="")) - logging.basicConfig( - level=level, - filename=self.log_file, - format=log_format - ) + logger.addHandler(handler) + logger.info("Test") else: logging.config.fileConfig(self.log_config) diff --git a/synapse/crypto/keyclient.py b/synapse/crypto/keyclient.py index 5191be4570..bb1f400b54 100644 --- a/synapse/crypto/keyclient.py +++ b/synapse/crypto/keyclient.py @@ -18,6 +18,7 @@ from twisted.web.http import HTTPClient from twisted.internet.protocol import Factory from twisted.internet import defer, reactor from synapse.http.endpoint import matrix_endpoint +from synapse.util.logcontext import PreserveLoggingContext import json import logging @@ -36,10 +37,11 @@ def fetch_server_key(server_name, ssl_context_factory): for i in range(5): try: - protocol = yield endpoint.connect(factory) - server_response, server_certificate = yield protocol.remote_key - defer.returnValue((server_response, server_certificate)) - return + with PreserveLoggingContext(): + protocol = yield endpoint.connect(factory) + server_response, server_certificate = yield protocol.remote_key + defer.returnValue((server_response, server_certificate)) + return except Exception as e: logger.exception(e) raise IOError("Cannot get key for %s" % server_name) diff --git a/synapse/http/client.py b/synapse/http/client.py index c34b086eb9..29e6053bc1 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -23,6 +23,7 @@ from twisted.web.http_headers import Headers from synapse.http.endpoint import matrix_endpoint from synapse.util.async import sleep +from synapse.util.logcontext import PreserveLoggingContext from syutil.jsonutil import encode_canonical_json @@ -108,16 +109,17 @@ class BaseHttpClient(object): producer = body_callback(method, url_bytes, headers_dict) try: - response = yield self.agent.request( - destination, - endpoint, - method, - path_bytes, - param_bytes, - query_bytes, - Headers(headers_dict), - producer - ) + with PreserveLoggingContext(): + response = yield self.agent.request( + destination, + endpoint, + method, + path_bytes, + param_bytes, + query_bytes, + Headers(headers_dict), + producer + ) logger.debug("Got response to %s", method) break diff --git a/synapse/http/server.py b/synapse/http/server.py index 8d419c02dd..ed1f1170cb 100644 --- a/synapse/http/server.py +++ b/synapse/http/server.py @@ -20,6 +20,7 @@ from syutil.jsonutil import ( from synapse.api.errors import ( cs_exception, SynapseError, CodeMessageException ) +from synapse.util.logcontext import LoggingContext from twisted.internet import defer, reactor from twisted.web import server, resource @@ -88,9 +89,19 @@ class JsonResource(HttpServer, resource.Resource): def render(self, request): """ This get's called by twisted every time someone sends us a request. """ - self._async_render(request) + self._async_render_with_logging_context(request) return server.NOT_DONE_YET + _request_id = 0 + + @defer.inlineCallbacks + def _async_render_with_logging_context(self, request): + request_id = "%s-%s" % (request.method, JsonResource._request_id) + JsonResource._request_id += 1 + with LoggingContext(request_id) as request_context: + request_context.request = request_id + yield self._async_render(request) + @defer.inlineCallbacks def _async_render(self, request): """ This get's called by twisted every time someone sends us a request. diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 670387b04a..30e6eac8db 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -17,8 +17,11 @@ import logging from synapse.api.errors import StoreError from synapse.api.events.utils import prune_event from synapse.util.logutils import log_function +from synapse.util.logcontext import PreserveLoggingContext, LoggingContext from syutil.base64util import encode_base64 +from twisted.internet import defer + import collections import copy import json @@ -84,32 +87,40 @@ class SQLBaseStore(object): self.event_factory = hs.get_event_factory() self._clock = hs.get_clock() + @defer.inlineCallbacks def runInteraction(self, desc, func, *args, **kwargs): """Wraps the .runInteraction() method on the underlying db_pool.""" + current_context = LoggingContext.current_context() def inner_func(txn, *args, **kwargs): - start = time.clock() * 1000 - txn_id = SQLBaseStore._TXN_ID - - # We don't really need these to be unique, so lets stop it from - # growing really large. - self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) - - name = "%s-%x" % (desc, txn_id, ) - - transaction_logger.debug("[TXN START] {%s}", name) - try: - return func(LoggingTransaction(txn, name), *args, **kwargs) - except: - logger.exception("[TXN FAIL] {%s}", name) - raise - finally: - end = time.clock() * 1000 - transaction_logger.debug( - "[TXN END] {%s} %f", - name, end - start - ) + with LoggingContext("runInteraction") as context: + current_context.copy_to(context) + start = time.clock() * 1000 + txn_id = SQLBaseStore._TXN_ID + + # We don't really need these to be unique, so lets stop it from + # growing really large. + self._TXN_ID = (self._TXN_ID + 1) % (sys.maxint - 1) + + name = "%s-%x" % (desc, txn_id, ) + + transaction_logger.debug("[TXN START] {%s}", name) + try: + return func(LoggingTransaction(txn, name), *args, **kwargs) + except: + logger.exception("[TXN FAIL] {%s}", name) + raise + finally: + end = time.clock() * 1000 + transaction_logger.debug( + "[TXN END] {%s} %f", + name, end - start + ) - return self._db_pool.runInteraction(inner_func, *args, **kwargs) + with PreserveLoggingContext(): + result = yield self._db_pool.runInteraction( + inner_func, *args, **kwargs + ) + defer.returnValue(result) def cursor_to_dict(self, cursor): """Converts a SQL cursor into an list of dicts. @@ -177,7 +188,7 @@ class SQLBaseStore(object): ) logger.debug( - "[SQL] %s Args=%s Func=%s", + "[SQL] %s Args=%s", sql, values.values(), ) diff --git a/synapse/util/async.py b/synapse/util/async.py index bf578f8bfb..1219d927db 100644 --- a/synapse/util/async.py +++ b/synapse/util/async.py @@ -16,15 +16,17 @@ from twisted.internet import defer, reactor +from .logcontext import PreserveLoggingContext +@defer.inlineCallbacks def sleep(seconds): d = defer.Deferred() reactor.callLater(seconds, d.callback, seconds) - return d - + with PreserveLoggingContext(): + yield d def run_on_reactor(): """ This will cause the rest of the function to be invoked upon the next iteration of the main loop """ - return sleep(0) \ No newline at end of file + return sleep(0) diff --git a/synapse/util/logcontext.py b/synapse/util/logcontext.py new file mode 100644 index 0000000000..13176b05ce --- /dev/null +++ b/synapse/util/logcontext.py @@ -0,0 +1,108 @@ +import threading +import logging + + +class LoggingContext(object): + """Additional context for log formatting. Contexts are scoped within a + "with" block. Contexts inherit the state of their parent contexts. + Args: + name (str): Name for the context for debugging. + """ + + __slots__ = ["parent_context", "name", "__dict__"] + + thread_local = threading.local() + + class Sentinel(object): + """Sentinel to represent the root context""" + + __slots__ = [] + + def copy_to(self, record): + pass + + sentinel = Sentinel() + + def __init__(self, name=None): + self.parent_context = None + self.name = name + + def __str__(self): + return "%s@%x" % (self.name, id(self)) + + @classmethod + def current_context(cls): + """Get the current logging context from thread local storage""" + return getattr(cls.thread_local, "current_context", cls.sentinel) + + def __enter__(self): + """Enters this logging context into thread local storage""" + if self.parent_context is not None: + raise Exception("Attempt to enter logging context multiple times") + self.parent_context = self.current_context() + self.thread_local.current_context = self + return self + + def __exit__(self, type, value, traceback): + """Restore the logging context in thread local storage to the state it + was before this context was entered. + Returns: + None to avoid suppressing any exeptions that were thrown. + """ + if self.thread_local.current_context is not self: + logging.error( + "Current logging context %s is not the expected context %s", + self.thread_local.current_context, + self + ) + self.thread_local.current_context = self.parent_context + self.parent_context = None + + def __getattr__(self, name): + """Delegate member lookup to parent context""" + return getattr(self.parent_context, name) + + def copy_to(self, record): + """Copy fields from this context and its parents to the record""" + if self.parent_context is not None: + self.parent_context.copy_to(record) + for key, value in self.__dict__.items(): + setattr(record, key, value) + + +class LoggingContextFilter(logging.Filter): + """Logging filter that adds values from the current logging context to each + record. + Args: + **defaults: Default values to avoid formatters complaining about + missing fields + """ + def __init__(self, **defaults): + self.defaults = defaults + + def filter(self, record): + """Add each fields from the logging contexts to the record. + Returns: + True to include the record in the log output. + """ + context = LoggingContext.current_context() + for key, value in self.defaults.items(): + setattr(record, key, value) + context.copy_to(record) + return True + + +class PreserveLoggingContext(object): + """Captures the current logging context and restores it when the scope is + exited. Used to restore the context after a function using + @defer.inlineCallbacks is resumed by a callback from the reactor.""" + + __slots__ = ["current_context"] + + def __enter__(self): + """Captures the current logging context""" + self.current_context = LoggingContext.current_context() + + def __exit__(self, type, value, traceback): + """Restores the current logging context""" + LoggingContext.thread_local.current_context = self.current_context diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py index fadf0bd510..903a6cf1b3 100644 --- a/synapse/util/logutils.py +++ b/synapse/util/logutils.py @@ -75,6 +75,7 @@ def trace_function(f): linenum = f.func_code.co_firstlineno pathname = f.func_code.co_filename + @wraps(f) def wrapped(*args, **kwargs): name = f.__module__ logger = logging.getLogger(name) diff --git a/tests/util/test_log_context.py b/tests/util/test_log_context.py new file mode 100644 index 0000000000..efa0f28bad --- /dev/null +++ b/tests/util/test_log_context.py @@ -0,0 +1,43 @@ +from twisted.internet import defer +from twisted.internet import reactor +from .. import unittest + +from synapse.util.async import sleep +from synapse.util.logcontext import LoggingContext + +class LoggingContextTestCase(unittest.TestCase): + + def _check_test_key(self, value): + self.assertEquals( + LoggingContext.current_context().test_key, value + ) + + def test_with_context(self): + with LoggingContext() as context_one: + context_one.test_key = "test" + self._check_test_key("test") + + def test_chaining(self): + with LoggingContext() as context_one: + context_one.test_key = "one" + with LoggingContext() as context_two: + self._check_test_key("one") + context_two.test_key = "two" + self._check_test_key("two") + self._check_test_key("one") + + @defer.inlineCallbacks + def test_sleep(self): + @defer.inlineCallbacks + def competing_callback(): + with LoggingContext() as competing_context: + competing_context.test_key = "competing" + yield sleep(0) + self._check_test_key("competing") + + reactor.callLater(0, competing_callback) + + with LoggingContext() as context_one: + context_one.test_key = "one" + yield sleep(0) + self._check_test_key("one") |