summary refs log tree commit diff
path: root/synapse/logging/_structured.py
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2019-08-28 21:18:53 +1000
committerGitHub <noreply@github.com>2019-08-28 21:18:53 +1000
commit7dc398586c2156a456d9526ac0e42c1fec9f8143 (patch)
treee5a21464decddbf9798495a351ddb3dbe6e250e4 /synapse/logging/_structured.py
parentFix a cache-invalidation bug for worker-based deployments (#5920) (diff)
downloadsynapse-7dc398586c2156a456d9526ac0e42c1fec9f8143.tar.xz
Implement a structured logging output system. (#5680)
Diffstat (limited to 'synapse/logging/_structured.py')
-rw-r--r--synapse/logging/_structured.py374
1 files changed, 374 insertions, 0 deletions
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
new file mode 100644
index 0000000000..0367d6dfc4
--- /dev/null
+++ b/synapse/logging/_structured.py
@@ -0,0 +1,374 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os.path
+import sys
+import typing
+import warnings
+
+import attr
+from constantly import NamedConstant, Names, ValueConstant, Values
+from zope.interface import implementer
+
+from twisted.logger import (
+    FileLogObserver,
+    FilteringLogObserver,
+    ILogObserver,
+    LogBeginner,
+    Logger,
+    LogLevel,
+    LogLevelFilterPredicate,
+    LogPublisher,
+    eventAsText,
+    globalLogBeginner,
+    jsonFileLogObserver,
+)
+
+from synapse.config._base import ConfigError
+from synapse.logging._terse_json import (
+    TerseJSONToConsoleLogObserver,
+    TerseJSONToTCPLogObserver,
+)
+from synapse.logging.context import LoggingContext
+
+
+def stdlib_log_level_to_twisted(level: str) -> LogLevel:
+    """
+    Convert a stdlib log level to Twisted's log level.
+    """
+    lvl = level.lower().replace("warning", "warn")
+    return LogLevel.levelWithName(lvl)
+
+
+@attr.s
+@implementer(ILogObserver)
+class LogContextObserver(object):
+    """
+    An ILogObserver which adds Synapse-specific log context information.
+
+    Attributes:
+        observer (ILogObserver): The target parent observer.
+    """
+
+    observer = attr.ib()
+
+    def __call__(self, event: dict) -> None:
+        """
+        Consume a log event and emit it to the parent observer after filtering
+        and adding log context information.
+
+        Args:
+            event (dict)
+        """
+        # Filter out some useless events that Twisted outputs
+        if "log_text" in event:
+            if event["log_text"].startswith("DNSDatagramProtocol starting on "):
+                return
+
+            if event["log_text"].startswith("(UDP Port "):
+                return
+
+            if event["log_text"].startswith("Timing out client") or event[
+                "log_format"
+            ].startswith("Timing out client"):
+                return
+
+        context = LoggingContext.current_context()
+
+        # Copy the context information to the log event.
+        if context is not None:
+            context.copy_to_twisted_log_entry(event)
+        else:
+            # If there's no logging context, not even the root one, we might be
+            # starting up or it might be from non-Synapse code. Log it as if it
+            # came from the root logger.
+            event["request"] = None
+            event["scope"] = None
+
+        self.observer(event)
+
+
+class PythonStdlibToTwistedLogger(logging.Handler):
+    """
+    Transform a Python stdlib log message into a Twisted one.
+    """
+
+    def __init__(self, observer, *args, **kwargs):
+        """
+        Args:
+            observer (ILogObserver): A Twisted logging observer.
+            *args, **kwargs: Args/kwargs to be passed to logging.Handler.
+        """
+        self.observer = observer
+        super().__init__(*args, **kwargs)
+
+    def emit(self, record: logging.LogRecord) -> None:
+        """
+        Emit a record to Twisted's observer.
+
+        Args:
+            record (logging.LogRecord)
+        """
+
+        self.observer(
+            {
+                "log_time": record.created,
+                "log_text": record.getMessage(),
+                "log_format": "{log_text}",
+                "log_namespace": record.name,
+                "log_level": stdlib_log_level_to_twisted(record.levelname),
+            }
+        )
+
+
+def SynapseFileLogObserver(outFile: typing.io.TextIO) -> FileLogObserver:
+    """
+    A log observer that formats events like the traditional log formatter and
+    sends them to `outFile`.
+
+    Args:
+        outFile (file object): The file object to write to.
+    """
+
+    def formatEvent(_event: dict) -> str:
+        event = dict(_event)
+        event["log_level"] = event["log_level"].name.upper()
+        event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
+            event.get("log_format", "{log_text}") or "{log_text}"
+        )
+        return eventAsText(event, includeSystem=False) + "\n"
+
+    return FileLogObserver(outFile, formatEvent)
+
+
+class DrainType(Names):
+    CONSOLE = NamedConstant()
+    CONSOLE_JSON = NamedConstant()
+    CONSOLE_JSON_TERSE = NamedConstant()
+    FILE = NamedConstant()
+    FILE_JSON = NamedConstant()
+    NETWORK_JSON_TERSE = NamedConstant()
+
+
+class OutputPipeType(Values):
+    stdout = ValueConstant(sys.__stdout__)
+    stderr = ValueConstant(sys.__stderr__)
+
+
+@attr.s
+class DrainConfiguration(object):
+    name = attr.ib()
+    type = attr.ib()
+    location = attr.ib()
+    options = attr.ib(default=None)
+
+
+@attr.s
+class NetworkJSONTerseOptions(object):
+    maximum_buffer = attr.ib(type=int)
+
+
+DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
+
+
+def parse_drain_configs(
+    drains: dict
+) -> typing.Generator[DrainConfiguration, None, None]:
+    """
+    Parse the drain configurations.
+
+    Args:
+        drains (dict): A list of drain configurations.
+
+    Yields:
+        DrainConfiguration instances.
+
+    Raises:
+        ConfigError: If any of the drain configuration items are invalid.
+    """
+    for name, config in drains.items():
+        if "type" not in config:
+            raise ConfigError("Logging drains require a 'type' key.")
+
+        try:
+            logging_type = DrainType.lookupByName(config["type"].upper())
+        except ValueError:
+            raise ConfigError(
+                "%s is not a known logging drain type." % (config["type"],)
+            )
+
+        if logging_type in [
+            DrainType.CONSOLE,
+            DrainType.CONSOLE_JSON,
+            DrainType.CONSOLE_JSON_TERSE,
+        ]:
+            location = config.get("location")
+            if location is None or location not in ["stdout", "stderr"]:
+                raise ConfigError(
+                    (
+                        "The %s drain needs the 'location' key set to "
+                        "either 'stdout' or 'stderr'."
+                    )
+                    % (logging_type,)
+                )
+
+            pipe = OutputPipeType.lookupByName(location).value
+
+            yield DrainConfiguration(name=name, type=logging_type, location=pipe)
+
+        elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
+            if "location" not in config:
+                raise ConfigError(
+                    "The %s drain needs the 'location' key set." % (logging_type,)
+                )
+
+            location = config.get("location")
+            if os.path.abspath(location) != location:
+                raise ConfigError(
+                    "File paths need to be absolute, '%s' is a relative path"
+                    % (location,)
+                )
+            yield DrainConfiguration(name=name, type=logging_type, location=location)
+
+        elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
+            host = config.get("host")
+            port = config.get("port")
+            maximum_buffer = config.get("maximum_buffer", 1000)
+            yield DrainConfiguration(
+                name=name,
+                type=logging_type,
+                location=(host, port),
+                options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
+            )
+
+        else:
+            raise ConfigError(
+                "The %s drain type is currently not implemented."
+                % (config["type"].upper(),)
+            )
+
+
+def setup_structured_logging(
+    hs,
+    config,
+    log_config: dict,
+    logBeginner: LogBeginner = globalLogBeginner,
+    redirect_stdlib_logging: bool = True,
+) -> LogPublisher:
+    """
+    Set up Twisted's structured logging system.
+
+    Args:
+        hs: The homeserver to use.
+        config (HomeserverConfig): The configuration of the Synapse homeserver.
+        log_config (dict): The log configuration to use.
+    """
+    if config.no_redirect_stdio:
+        raise ConfigError(
+            "no_redirect_stdio cannot be defined using structured logging."
+        )
+
+    logger = Logger()
+
+    if "drains" not in log_config:
+        raise ConfigError("The logging configuration requires a list of drains.")
+
+    observers = []
+
+    for observer in parse_drain_configs(log_config["drains"]):
+        # Pipe drains
+        if observer.type == DrainType.CONSOLE:
+            logger.debug(
+                "Starting up the {name} console logger drain", name=observer.name
+            )
+            observers.append(SynapseFileLogObserver(observer.location))
+        elif observer.type == DrainType.CONSOLE_JSON:
+            logger.debug(
+                "Starting up the {name} JSON console logger drain", name=observer.name
+            )
+            observers.append(jsonFileLogObserver(observer.location))
+        elif observer.type == DrainType.CONSOLE_JSON_TERSE:
+            logger.debug(
+                "Starting up the {name} terse JSON console logger drain",
+                name=observer.name,
+            )
+            observers.append(
+                TerseJSONToConsoleLogObserver(observer.location, metadata={})
+            )
+
+        # File drains
+        elif observer.type == DrainType.FILE:
+            logger.debug("Starting up the {name} file logger drain", name=observer.name)
+            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
+            observers.append(SynapseFileLogObserver(log_file))
+        elif observer.type == DrainType.FILE_JSON:
+            logger.debug(
+                "Starting up the {name} JSON file logger drain", name=observer.name
+            )
+            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
+            observers.append(jsonFileLogObserver(log_file))
+
+        elif observer.type == DrainType.NETWORK_JSON_TERSE:
+            metadata = {"server_name": hs.config.server_name}
+            log_observer = TerseJSONToTCPLogObserver(
+                hs=hs,
+                host=observer.location[0],
+                port=observer.location[1],
+                metadata=metadata,
+                maximum_buffer=observer.options.maximum_buffer,
+            )
+            log_observer.start()
+            observers.append(log_observer)
+        else:
+            # We should never get here, but, just in case, throw an error.
+            raise ConfigError("%s drain type cannot be configured" % (observer.type,))
+
+    publisher = LogPublisher(*observers)
+    log_filter = LogLevelFilterPredicate()
+
+    for namespace, namespace_config in log_config.get(
+        "loggers", DEFAULT_LOGGERS
+    ).items():
+        # Set the log level for twisted.logger.Logger namespaces
+        log_filter.setLogLevelForNamespace(
+            namespace,
+            stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
+        )
+
+        # Also set the log levels for the stdlib logger namespaces, to prevent
+        # them getting to PythonStdlibToTwistedLogger and having to be formatted
+        if "level" in namespace_config:
+            logging.getLogger(namespace).setLevel(namespace_config.get("level"))
+
+    f = FilteringLogObserver(publisher, [log_filter])
+    lco = LogContextObserver(f)
+
+    if redirect_stdlib_logging:
+        stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
+        stdliblogger = logging.getLogger()
+        stdliblogger.addHandler(stuff_into_twisted)
+
+    # Always redirect standard I/O, otherwise other logging outputs might miss
+    # it.
+    logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
+
+    return publisher
+
+
+def reload_structured_logging(*args, log_config=None) -> None:
+    warnings.warn(
+        "Currently the structured logging system can not be reloaded, doing nothing"
+    )