diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/config/logger.py | 96 | ||||
-rw-r--r-- | synapse/logging/__init__.py | 20 | ||||
-rw-r--r-- | synapse/logging/_remote.py | 97 | ||||
-rw-r--r-- | synapse/logging/_structured.py | 329 | ||||
-rw-r--r-- | synapse/logging/_terse_json.py | 192 | ||||
-rw-r--r-- | synapse/logging/filter.py | 33 |
6 files changed, 266 insertions, 501 deletions
diff --git a/synapse/config/logger.py b/synapse/config/logger.py index 6b7be28aee..d4e887a3e0 100644 --- a/synapse/config/logger.py +++ b/synapse/config/logger.py @@ -23,7 +23,6 @@ from string import Template import yaml from twisted.logger import ( - ILogObserver, LogBeginner, STDLibLogObserver, eventAsText, @@ -32,11 +31,9 @@ from twisted.logger import ( import synapse from synapse.app import _base as appbase -from synapse.logging._structured import ( - reload_structured_logging, - setup_structured_logging, -) +from synapse.logging._structured import setup_structured_logging from synapse.logging.context import LoggingContextFilter +from synapse.logging.filter import MetadataFilter from synapse.util.versionstring import get_version_string from ._base import Config, ConfigError @@ -48,7 +45,11 @@ DEFAULT_LOG_CONFIG = Template( # This is a YAML file containing a standard Python logging configuration # dictionary. See [1] for details on the valid settings. # +# Synapse also supports structured logging for machine readable logs which can +# be ingested by ELK stacks. See [2] for details. +# # [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema +# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md version: 1 @@ -176,11 +177,11 @@ class LoggingConfig(Config): log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file)) -def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): +def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) -> None: """ - Set up Python stdlib logging. + Set up Python standard library logging. """ - if log_config is None: + if log_config_path is None: log_format = ( "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s" " - %(message)s" @@ -196,7 +197,8 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): handler.setFormatter(formatter) logger.addHandler(handler) else: - logging.config.dictConfig(log_config) + # Load the logging configuration. + _load_logging_config(log_config_path) # We add a log record factory that runs all messages through the # LoggingContextFilter so that we get the context *at the time we log* @@ -204,12 +206,14 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): # filter options, but care must when using e.g. MemoryHandler to buffer # writes. - log_filter = LoggingContextFilter(request="") + log_context_filter = LoggingContextFilter(request="") + log_metadata_filter = MetadataFilter({"server_name": config.server_name}) old_factory = logging.getLogRecordFactory() def factory(*args, **kwargs): record = old_factory(*args, **kwargs) - log_filter.filter(record) + log_context_filter.filter(record) + log_metadata_filter.filter(record) return record logging.setLogRecordFactory(factory) @@ -255,21 +259,40 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner): if not config.no_redirect_stdio: print("Redirected stdout/stderr to logs") - return observer - -def _reload_stdlib_logging(*args, log_config=None): - logger = logging.getLogger("") +def _load_logging_config(log_config_path: str) -> None: + """ + Configure logging from a log config path. + """ + with open(log_config_path, "rb") as f: + log_config = yaml.safe_load(f.read()) if not log_config: - logger.warning("Reloaded a blank config?") + logging.warning("Loaded a blank logging config?") + + # If the old structured logging configuration is being used, convert it to + # the new style configuration. + if "structured" in log_config and log_config.get("structured"): + log_config = setup_structured_logging(log_config) logging.config.dictConfig(log_config) +def _reload_logging_config(log_config_path): + """ + Reload the log configuration from the file and apply it. + """ + # If no log config path was given, it cannot be reloaded. + if log_config_path is None: + return + + _load_logging_config(log_config_path) + logging.info("Reloaded log config from %s due to SIGHUP", log_config_path) + + def setup_logging( hs, config, use_worker_options=False, logBeginner: LogBeginner = globalLogBeginner -) -> ILogObserver: +) -> None: """ Set up the logging subsystem. @@ -282,41 +305,18 @@ def setup_logging( logBeginner: The Twisted logBeginner to use. - Returns: - The "root" Twisted Logger observer, suitable for sending logs to from a - Logger instance. """ - log_config = config.worker_log_config if use_worker_options else config.log_config - - def read_config(*args, callback=None): - if log_config is None: - return None - - with open(log_config, "rb") as f: - log_config_body = yaml.safe_load(f.read()) - - if callback: - callback(log_config=log_config_body) - logging.info("Reloaded log config from %s due to SIGHUP", log_config) - - return log_config_body + log_config_path = ( + config.worker_log_config if use_worker_options else config.log_config + ) - log_config_body = read_config() + # Perform one-time logging configuration. + _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner) + # Add a SIGHUP handler to reload the logging configuration, if one is available. + appbase.register_sighup(_reload_logging_config, log_config_path) - if log_config_body and log_config_body.get("structured") is True: - logger = setup_structured_logging( - hs, config, log_config_body, logBeginner=logBeginner - ) - appbase.register_sighup(read_config, callback=reload_structured_logging) - else: - logger = _setup_stdlib_logging(config, log_config_body, logBeginner=logBeginner) - appbase.register_sighup(read_config, callback=_reload_stdlib_logging) - - # make sure that the first thing we log is a thing we can grep backwards - # for + # Log immediately so we can grep backwards. logging.warning("***** STARTING SERVER *****") logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse)) logging.info("Server hostname: %s", config.server_name) logging.info("Instance name: %s", hs.get_instance_name()) - - return logger diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py index e69de29bb2..b28b7b2ef7 100644 --- a/synapse/logging/__init__.py +++ b/synapse/logging/__init__.py @@ -0,0 +1,20 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# These are imported to allow for nicer logging configuration files. +from synapse.logging._remote import RemoteHandler +from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter + +__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"] diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py index 0caf325916..ba45424f02 100644 --- a/synapse/logging/_remote.py +++ b/synapse/logging/_remote.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging import sys import traceback from collections import deque @@ -21,6 +22,7 @@ from math import floor from typing import Callable, Optional import attr +from typing_extensions import Deque from zope.interface import implementer from twisted.application.internet import ClientService @@ -32,7 +34,8 @@ from twisted.internet.endpoints import ( ) from twisted.internet.interfaces import IPushProducer, ITransport from twisted.internet.protocol import Factory, Protocol -from twisted.logger import ILogObserver, Logger, LogLevel + +logger = logging.getLogger(__name__) @attr.s @@ -45,11 +48,11 @@ class LogProducer: Args: buffer: Log buffer to read logs from. transport: Transport to write to. - format_event: A callable to format the log entry to a string. + format: A callable to format the log record to a string. """ transport = attr.ib(type=ITransport) - format_event = attr.ib(type=Callable[[dict], str]) + _format = attr.ib(type=Callable[[logging.LogRecord], str]) _buffer = attr.ib(type=deque) _paused = attr.ib(default=False, type=bool, init=False) @@ -61,16 +64,19 @@ class LogProducer: self._buffer = deque() def resumeProducing(self): + # If we're already producing, nothing to do. self._paused = False + # Loop until paused. while self._paused is False and (self._buffer and self.transport.connected): try: - # Request the next event and format it. - event = self._buffer.popleft() - msg = self.format_event(event) + # Request the next record and format it. + record = self._buffer.popleft() + msg = self._format(record) # Send it as a new line over the transport. self.transport.write(msg.encode("utf8")) + self.transport.write(b"\n") except Exception: # Something has gone wrong writing to the transport -- log it # and break out of the while. @@ -78,60 +84,63 @@ class LogProducer: break -@attr.s -@implementer(ILogObserver) -class TCPLogObserver: +class RemoteHandler(logging.Handler): """ - An IObserver that writes JSON logs to a TCP target. + An logging handler that writes logs to a TCP target. Args: - hs (HomeServer): The homeserver that is being logged for. host: The host of the logging target. port: The logging target's port. - format_event: A callable to format the log entry to a string. maximum_buffer: The maximum buffer size. """ - hs = attr.ib() - host = attr.ib(type=str) - port = attr.ib(type=int) - format_event = attr.ib(type=Callable[[dict], str]) - maximum_buffer = attr.ib(type=int) - _buffer = attr.ib(default=attr.Factory(deque), type=deque) - _connection_waiter = attr.ib(default=None, type=Optional[Deferred]) - _logger = attr.ib(default=attr.Factory(Logger)) - _producer = attr.ib(default=None, type=Optional[LogProducer]) - - def start(self) -> None: + def __init__( + self, + host: str, + port: int, + maximum_buffer: int = 1000, + level=logging.NOTSET, + _reactor=None, + ): + super().__init__(level=level) + self.host = host + self.port = port + self.maximum_buffer = maximum_buffer + + self._buffer = deque() # type: Deque[logging.LogRecord] + self._connection_waiter = None # type: Optional[Deferred] + self._producer = None # type: Optional[LogProducer] # Connect without DNS lookups if it's a direct IP. + if _reactor is None: + from twisted.internet import reactor + + _reactor = reactor + try: ip = ip_address(self.host) if isinstance(ip, IPv4Address): - endpoint = TCP4ClientEndpoint( - self.hs.get_reactor(), self.host, self.port - ) + endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port) elif isinstance(ip, IPv6Address): - endpoint = TCP6ClientEndpoint( - self.hs.get_reactor(), self.host, self.port - ) + endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port) else: raise ValueError("Unknown IP address provided: %s" % (self.host,)) except ValueError: - endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port) + endpoint = HostnameEndpoint(_reactor, self.host, self.port) factory = Factory.forProtocol(Protocol) - self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor()) + self._service = ClientService(endpoint, factory, clock=_reactor) self._service.startService() self._connect() - def stop(self): + def close(self): self._service.stopService() def _connect(self) -> None: """ Triggers an attempt to connect then write to the remote if not already writing. """ + # Do not attempt to open multiple connections. if self._connection_waiter: return @@ -158,9 +167,7 @@ class TCPLogObserver: # Make a new producer and start it. self._producer = LogProducer( - buffer=self._buffer, - transport=r.transport, - format_event=self.format_event, + buffer=self._buffer, transport=r.transport, format=self.format, ) r.transport.registerProducer(self._producer, True) self._producer.resumeProducing() @@ -168,19 +175,19 @@ class TCPLogObserver: def _handle_pressure(self) -> None: """ - Handle backpressure by shedding events. + Handle backpressure by shedding records. The buffer will, in this order, until the buffer is below the maximum: - - Shed DEBUG events - - Shed INFO events - - Shed the middle 50% of the events. + - Shed DEBUG records. + - Shed INFO records. + - Shed the middle 50% of the records. """ if len(self._buffer) <= self.maximum_buffer: return # Strip out DEBUGs self._buffer = deque( - filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer) + filter(lambda record: record.levelno > logging.DEBUG, self._buffer) ) if len(self._buffer) <= self.maximum_buffer: @@ -188,7 +195,7 @@ class TCPLogObserver: # Strip out INFOs self._buffer = deque( - filter(lambda event: event["log_level"] != LogLevel.info, self._buffer) + filter(lambda record: record.levelno > logging.INFO, self._buffer) ) if len(self._buffer) <= self.maximum_buffer: @@ -209,17 +216,17 @@ class TCPLogObserver: self._buffer.extend(reversed(end_buffer)) - def __call__(self, event: dict) -> None: - self._buffer.append(event) + def emit(self, record: logging.LogRecord) -> None: + self._buffer.append(record) # Handle backpressure, if it exists. try: self._handle_pressure() except Exception: - # If handling backpressure fails,clear the buffer and log the + # If handling backpressure fails, clear the buffer and log the # exception. self._buffer.clear() - self._logger.failure("Failed clearing backpressure") + logger.warning("Failed clearing backpressure") # Try and write immediately. self._connect() diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py index 0fc2ea609e..14d9c104c2 100644 --- a/synapse/logging/_structured.py +++ b/synapse/logging/_structured.py @@ -12,138 +12,12 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -import logging import os.path -import sys -import typing -import warnings -from typing import List +from typing import Any, Dict, Generator, Optional, Tuple -import attr -from constantly import NamedConstant, Names, ValueConstant, Values -from zope.interface import implementer - -from twisted.logger import ( - FileLogObserver, - FilteringLogObserver, - ILogObserver, - LogBeginner, - Logger, - LogLevel, - LogLevelFilterPredicate, - LogPublisher, - eventAsText, - jsonFileLogObserver, -) +from constantly import NamedConstant, Names from synapse.config._base import ConfigError -from synapse.logging._terse_json import ( - TerseJSONToConsoleLogObserver, - TerseJSONToTCPLogObserver, -) -from synapse.logging.context import current_context - - -def stdlib_log_level_to_twisted(level: str) -> LogLevel: - """ - Convert a stdlib log level to Twisted's log level. - """ - lvl = level.lower().replace("warning", "warn") - return LogLevel.levelWithName(lvl) - - -@attr.s -@implementer(ILogObserver) -class LogContextObserver: - """ - An ILogObserver which adds Synapse-specific log context information. - - Attributes: - observer (ILogObserver): The target parent observer. - """ - - observer = attr.ib() - - def __call__(self, event: dict) -> None: - """ - Consume a log event and emit it to the parent observer after filtering - and adding log context information. - - Args: - event (dict) - """ - # Filter out some useless events that Twisted outputs - if "log_text" in event: - if event["log_text"].startswith("DNSDatagramProtocol starting on "): - return - - if event["log_text"].startswith("(UDP Port "): - return - - if event["log_text"].startswith("Timing out client") or event[ - "log_format" - ].startswith("Timing out client"): - return - - context = current_context() - - # Copy the context information to the log event. - context.copy_to_twisted_log_entry(event) - - self.observer(event) - - -class PythonStdlibToTwistedLogger(logging.Handler): - """ - Transform a Python stdlib log message into a Twisted one. - """ - - def __init__(self, observer, *args, **kwargs): - """ - Args: - observer (ILogObserver): A Twisted logging observer. - *args, **kwargs: Args/kwargs to be passed to logging.Handler. - """ - self.observer = observer - super().__init__(*args, **kwargs) - - def emit(self, record: logging.LogRecord) -> None: - """ - Emit a record to Twisted's observer. - - Args: - record (logging.LogRecord) - """ - - self.observer( - { - "log_time": record.created, - "log_text": record.getMessage(), - "log_format": "{log_text}", - "log_namespace": record.name, - "log_level": stdlib_log_level_to_twisted(record.levelname), - } - ) - - -def SynapseFileLogObserver(outFile: typing.IO[str]) -> FileLogObserver: - """ - A log observer that formats events like the traditional log formatter and - sends them to `outFile`. - - Args: - outFile (file object): The file object to write to. - """ - - def formatEvent(_event: dict) -> str: - event = dict(_event) - event["log_level"] = event["log_level"].name.upper() - event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + ( - event.get("log_format", "{log_text}") or "{log_text}" - ) - return eventAsText(event, includeSystem=False) + "\n" - - return FileLogObserver(outFile, formatEvent) class DrainType(Names): @@ -155,30 +29,12 @@ class DrainType(Names): NETWORK_JSON_TERSE = NamedConstant() -class OutputPipeType(Values): - stdout = ValueConstant(sys.__stdout__) - stderr = ValueConstant(sys.__stderr__) - - -@attr.s -class DrainConfiguration: - name = attr.ib() - type = attr.ib() - location = attr.ib() - options = attr.ib(default=None) - - -@attr.s -class NetworkJSONTerseOptions: - maximum_buffer = attr.ib(type=int) - - -DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}} +DEFAULT_LOGGERS = {"synapse": {"level": "info"}} def parse_drain_configs( drains: dict, -) -> typing.Generator[DrainConfiguration, None, None]: +) -> Generator[Tuple[str, Dict[str, Any]], None, None]: """ Parse the drain configurations. @@ -186,11 +42,12 @@ def parse_drain_configs( drains (dict): A list of drain configurations. Yields: - DrainConfiguration instances. + dict instances representing a logging handler. Raises: ConfigError: If any of the drain configuration items are invalid. """ + for name, config in drains.items(): if "type" not in config: raise ConfigError("Logging drains require a 'type' key.") @@ -202,6 +59,18 @@ def parse_drain_configs( "%s is not a known logging drain type." % (config["type"],) ) + # Either use the default formatter or the tersejson one. + if logging_type in (DrainType.CONSOLE_JSON, DrainType.FILE_JSON,): + formatter = "json" # type: Optional[str] + elif logging_type in ( + DrainType.CONSOLE_JSON_TERSE, + DrainType.NETWORK_JSON_TERSE, + ): + formatter = "tersejson" + else: + # A formatter of None implies using the default formatter. + formatter = None + if logging_type in [ DrainType.CONSOLE, DrainType.CONSOLE_JSON, @@ -217,9 +86,11 @@ def parse_drain_configs( % (logging_type,) ) - pipe = OutputPipeType.lookupByName(location).value - - yield DrainConfiguration(name=name, type=logging_type, location=pipe) + yield name, { + "class": "logging.StreamHandler", + "formatter": formatter, + "stream": "ext://sys." + location, + } elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]: if "location" not in config: @@ -233,18 +104,25 @@ def parse_drain_configs( "File paths need to be absolute, '%s' is a relative path" % (location,) ) - yield DrainConfiguration(name=name, type=logging_type, location=location) + + yield name, { + "class": "logging.FileHandler", + "formatter": formatter, + "filename": location, + } elif logging_type in [DrainType.NETWORK_JSON_TERSE]: host = config.get("host") port = config.get("port") maximum_buffer = config.get("maximum_buffer", 1000) - yield DrainConfiguration( - name=name, - type=logging_type, - location=(host, port), - options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer), - ) + + yield name, { + "class": "synapse.logging.RemoteHandler", + "formatter": formatter, + "host": host, + "port": port, + "maximum_buffer": maximum_buffer, + } else: raise ConfigError( @@ -253,126 +131,29 @@ def parse_drain_configs( ) -class StoppableLogPublisher(LogPublisher): +def setup_structured_logging(log_config: dict,) -> dict: """ - A log publisher that can tell its observers to shut down any external - communications. - """ - - def stop(self): - for obs in self._observers: - if hasattr(obs, "stop"): - obs.stop() - - -def setup_structured_logging( - hs, - config, - log_config: dict, - logBeginner: LogBeginner, - redirect_stdlib_logging: bool = True, -) -> LogPublisher: - """ - Set up Twisted's structured logging system. - - Args: - hs: The homeserver to use. - config (HomeserverConfig): The configuration of the Synapse homeserver. - log_config (dict): The log configuration to use. + Convert a legacy structured logging configuration (from Synapse < v1.23.0) + to one compatible with the new standard library handlers. """ - if config.no_redirect_stdio: - raise ConfigError( - "no_redirect_stdio cannot be defined using structured logging." - ) - - logger = Logger() - if "drains" not in log_config: raise ConfigError("The logging configuration requires a list of drains.") - observers = [] # type: List[ILogObserver] - - for observer in parse_drain_configs(log_config["drains"]): - # Pipe drains - if observer.type == DrainType.CONSOLE: - logger.debug( - "Starting up the {name} console logger drain", name=observer.name - ) - observers.append(SynapseFileLogObserver(observer.location)) - elif observer.type == DrainType.CONSOLE_JSON: - logger.debug( - "Starting up the {name} JSON console logger drain", name=observer.name - ) - observers.append(jsonFileLogObserver(observer.location)) - elif observer.type == DrainType.CONSOLE_JSON_TERSE: - logger.debug( - "Starting up the {name} terse JSON console logger drain", - name=observer.name, - ) - observers.append( - TerseJSONToConsoleLogObserver(observer.location, metadata={}) - ) - - # File drains - elif observer.type == DrainType.FILE: - logger.debug("Starting up the {name} file logger drain", name=observer.name) - log_file = open(observer.location, "at", buffering=1, encoding="utf8") - observers.append(SynapseFileLogObserver(log_file)) - elif observer.type == DrainType.FILE_JSON: - logger.debug( - "Starting up the {name} JSON file logger drain", name=observer.name - ) - log_file = open(observer.location, "at", buffering=1, encoding="utf8") - observers.append(jsonFileLogObserver(log_file)) - - elif observer.type == DrainType.NETWORK_JSON_TERSE: - metadata = {"server_name": hs.config.server_name} - log_observer = TerseJSONToTCPLogObserver( - hs=hs, - host=observer.location[0], - port=observer.location[1], - metadata=metadata, - maximum_buffer=observer.options.maximum_buffer, - ) - log_observer.start() - observers.append(log_observer) - else: - # We should never get here, but, just in case, throw an error. - raise ConfigError("%s drain type cannot be configured" % (observer.type,)) - - publisher = StoppableLogPublisher(*observers) - log_filter = LogLevelFilterPredicate() - - for namespace, namespace_config in log_config.get( - "loggers", DEFAULT_LOGGERS - ).items(): - # Set the log level for twisted.logger.Logger namespaces - log_filter.setLogLevelForNamespace( - namespace, - stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")), - ) - - # Also set the log levels for the stdlib logger namespaces, to prevent - # them getting to PythonStdlibToTwistedLogger and having to be formatted - if "level" in namespace_config: - logging.getLogger(namespace).setLevel(namespace_config.get("level")) - - f = FilteringLogObserver(publisher, [log_filter]) - lco = LogContextObserver(f) - - if redirect_stdlib_logging: - stuff_into_twisted = PythonStdlibToTwistedLogger(lco) - stdliblogger = logging.getLogger() - stdliblogger.addHandler(stuff_into_twisted) - - # Always redirect standard I/O, otherwise other logging outputs might miss - # it. - logBeginner.beginLoggingTo([lco], redirectStandardIO=True) + new_config = { + "version": 1, + "formatters": { + "json": {"class": "synapse.logging.JsonFormatter"}, + "tersejson": {"class": "synapse.logging.TerseJsonFormatter"}, + }, + "handlers": {}, + "loggers": log_config.get("loggers", DEFAULT_LOGGERS), + "root": {"handlers": []}, + } - return publisher + for handler_name, handler in parse_drain_configs(log_config["drains"]): + new_config["handlers"][handler_name] = handler + # Add each handler to the root logger. + new_config["root"]["handlers"].append(handler_name) -def reload_structured_logging(*args, log_config=None) -> None: - warnings.warn( - "Currently the structured logging system can not be reloaded, doing nothing" - ) + return new_config diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py index 9b46956ca9..2fbf5549a1 100644 --- a/synapse/logging/_terse_json.py +++ b/synapse/logging/_terse_json.py @@ -16,141 +16,65 @@ """ Log formatters that output terse JSON. """ - import json -from typing import IO - -from twisted.logger import FileLogObserver - -from synapse.logging._remote import TCPLogObserver +import logging _encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":")) - -def flatten_event(event: dict, metadata: dict, include_time: bool = False): - """ - Flatten a Twisted logging event to an dictionary capable of being sent - as a log event to a logging aggregation system. - - The format is vastly simplified and is not designed to be a "human readable - string" in the sense that traditional logs are. Instead, the structure is - optimised for searchability and filtering, with human-understandable log - keys. - - Args: - event (dict): The Twisted logging event we are flattening. - metadata (dict): Additional data to include with each log message. This - can be information like the server name. Since the target log - consumer does not know who we are other than by host IP, this - allows us to forward through static information. - include_time (bool): Should we include the `time` key? If False, the - event time is stripped from the event. - """ - new_event = {} - - # If it's a failure, make the new event's log_failure be the traceback text. - if "log_failure" in event: - new_event["log_failure"] = event["log_failure"].getTraceback() - - # If it's a warning, copy over a string representation of the warning. - if "warning" in event: - new_event["warning"] = str(event["warning"]) - - # Stdlib logging events have "log_text" as their human-readable portion, - # Twisted ones have "log_format". For now, include the log_format, so that - # context only given in the log format (e.g. what is being logged) is - # available. - if "log_text" in event: - new_event["log"] = event["log_text"] - else: - new_event["log"] = event["log_format"] - - # We want to include the timestamp when forwarding over the network, but - # exclude it when we are writing to stdout. This is because the log ingester - # (e.g. logstash, fluentd) can add its own timestamp. - if include_time: - new_event["time"] = round(event["log_time"], 2) - - # Convert the log level to a textual representation. - new_event["level"] = event["log_level"].name.upper() - - # Ignore these keys, and do not transfer them over to the new log object. - # They are either useless (isError), transferred manually above (log_time, - # log_level, etc), or contain Python objects which are not useful for output - # (log_logger, log_source). - keys_to_delete = [ - "isError", - "log_failure", - "log_format", - "log_level", - "log_logger", - "log_source", - "log_system", - "log_time", - "log_text", - "observer", - "warning", - ] - - # If it's from the Twisted legacy logger (twisted.python.log), it adds some - # more keys we want to purge. - if event.get("log_namespace") == "log_legacy": - keys_to_delete.extend(["message", "system", "time"]) - - # Rather than modify the dictionary in place, construct a new one with only - # the content we want. The original event should be considered 'frozen'. - for key in event.keys(): - - if key in keys_to_delete: - continue - - if isinstance(event[key], (str, int, bool, float)) or event[key] is None: - # If it's a plain type, include it as is. - new_event[key] = event[key] - else: - # If it's not one of those basic types, write out a string - # representation. This should probably be a warning in development, - # so that we are sure we are only outputting useful data. - new_event[key] = str(event[key]) - - # Add the metadata information to the event (e.g. the server_name). - new_event.update(metadata) - - return new_event - - -def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogObserver: - """ - A log observer that formats events to a flattened JSON representation. - - Args: - outFile: The file object to write to. - metadata: Metadata to be added to each log object. - """ - - def formatEvent(_event: dict) -> str: - flattened = flatten_event(_event, metadata) - return _encoder.encode(flattened) + "\n" - - return FileLogObserver(outFile, formatEvent) - - -def TerseJSONToTCPLogObserver( - hs, host: str, port: int, metadata: dict, maximum_buffer: int -) -> FileLogObserver: - """ - A log observer that formats events to a flattened JSON representation. - - Args: - hs (HomeServer): The homeserver that is being logged for. - host: The host of the logging target. - port: The logging target's port. - metadata: Metadata to be added to each log object. - maximum_buffer: The maximum buffer size. - """ - - def formatEvent(_event: dict) -> str: - flattened = flatten_event(_event, metadata, include_time=True) - return _encoder.encode(flattened) + "\n" - - return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer) +# The properties of a standard LogRecord. +_LOG_RECORD_ATTRIBUTES = { + "args", + "asctime", + "created", + "exc_info", + # exc_text isn't a public attribute, but is used to cache the result of formatException. + "exc_text", + "filename", + "funcName", + "levelname", + "levelno", + "lineno", + "message", + "module", + "msecs", + "msg", + "name", + "pathname", + "process", + "processName", + "relativeCreated", + "stack_info", + "thread", + "threadName", +} + + +class JsonFormatter(logging.Formatter): + def format(self, record: logging.LogRecord) -> str: + event = { + "log": record.getMessage(), + "namespace": record.name, + "level": record.levelname, + } + + return self._format(record, event) + + def _format(self, record: logging.LogRecord, event: dict) -> str: + # Add any extra attributes to the event. + for key, value in record.__dict__.items(): + if key not in _LOG_RECORD_ATTRIBUTES: + event[key] = value + + return _encoder.encode(event) + + +class TerseJsonFormatter(JsonFormatter): + def format(self, record: logging.LogRecord) -> str: + event = { + "log": record.getMessage(), + "namespace": record.name, + "level": record.levelname, + "time": round(record.created, 2), + } + + return self._format(record, event) diff --git a/synapse/logging/filter.py b/synapse/logging/filter.py new file mode 100644 index 0000000000..1baf8dd679 --- /dev/null +++ b/synapse/logging/filter.py @@ -0,0 +1,33 @@ +# -*- coding: utf-8 -*- +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import logging + +from typing_extensions import Literal + + +class MetadataFilter(logging.Filter): + """Logging filter that adds constant values to each record. + + Args: + metadata: Key-value pairs to add to each record. + """ + + def __init__(self, metadata: dict): + self._metadata = metadata + + def filter(self, record: logging.LogRecord) -> Literal[True]: + for key, value in self._metadata.items(): + setattr(record, key, value) + return True |