diff --git a/.buildkite/docker-compose.py35.pg95.yaml b/.buildkite/docker-compose.py35.pg95.yaml
index 2f14387fbc..aaea33006b 100644
--- a/.buildkite/docker-compose.py35.pg95.yaml
+++ b/.buildkite/docker-compose.py35.pg95.yaml
@@ -6,6 +6,7 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
+ command: -c fsync=off
testenv:
image: python:3.5
diff --git a/.buildkite/docker-compose.py37.pg11.yaml b/.buildkite/docker-compose.py37.pg11.yaml
index f3eec05ceb..1b32675e78 100644
--- a/.buildkite/docker-compose.py37.pg11.yaml
+++ b/.buildkite/docker-compose.py37.pg11.yaml
@@ -6,6 +6,7 @@ services:
image: postgres:11
environment:
POSTGRES_PASSWORD: postgres
+ command: -c fsync=off
testenv:
image: python:3.7
diff --git a/.buildkite/docker-compose.py37.pg95.yaml b/.buildkite/docker-compose.py37.pg95.yaml
index 2a41db8eba..7679f6508d 100644
--- a/.buildkite/docker-compose.py37.pg95.yaml
+++ b/.buildkite/docker-compose.py37.pg95.yaml
@@ -6,6 +6,7 @@ services:
image: postgres:9.5
environment:
POSTGRES_PASSWORD: postgres
+ command: -c fsync=off
testenv:
image: python:3.7
diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml
index b75269a155..d9327227ed 100644
--- a/.buildkite/pipeline.yml
+++ b/.buildkite/pipeline.yml
@@ -45,8 +45,15 @@ steps:
- docker#v3.0.1:
image: "python:3.6"
- - wait
+ - command:
+ - "python -m pip install tox"
+ - "tox -e mypy"
+ label: ":mypy: mypy"
+ plugins:
+ - docker#v3.0.1:
+ image: "python:3.5"
+ - wait
- command:
- "apt-get update && apt-get install -y python3.5 python3.5-dev python3-pip libxml2-dev libxslt-dev zlib1g-dev"
@@ -55,6 +62,7 @@ steps:
label: ":python: 3.5 / SQLite / Old Deps"
env:
TRIAL_FLAGS: "-j 2"
+ LANG: "C.UTF-8"
plugins:
- docker#v3.0.1:
image: "ubuntu:xenial" # We use xenail to get an old sqlite and python
diff --git a/.gitignore b/.gitignore
index f6168a8819..e53d4908d5 100644
--- a/.gitignore
+++ b/.gitignore
@@ -20,6 +20,7 @@ _trial_temp*/
/*.signing.key
/env/
/homeserver*.yaml
+/logs
/media_store/
/uploads
@@ -29,8 +30,9 @@ _trial_temp*/
/.vscode/
# build products
-/.coverage*
!/.coveragerc
+/.coverage*
+/.mypy_cache/
/.tox
/build/
/coverage.*
@@ -38,4 +40,3 @@ _trial_temp*/
/docs/build/
/htmlcov
/pip-wheel-metadata/
-
diff --git a/changelog.d/5680.misc b/changelog.d/5680.misc
new file mode 100644
index 0000000000..46a403a188
--- /dev/null
+++ b/changelog.d/5680.misc
@@ -0,0 +1 @@
+Lay the groundwork for structured logging output.
diff --git a/docs/structured_logging.md b/docs/structured_logging.md
new file mode 100644
index 0000000000..decec9b8fa
--- /dev/null
+++ b/docs/structured_logging.md
@@ -0,0 +1,83 @@
+# Structured Logging
+
+A structured logging system can be useful when your logs are destined to be parsed and processed by a machine. By preserving the machine-readable characteristics of each log entry, it enables more efficient searching and aggregation when the logs are consumed by software such as the "ELK stack".
+
+Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must also contain a map of named "drains" (places where logs are sent).
+
+A structured logging configuration looks similar to the following:
+
+```yaml
+structured: true
+
+loggers:
+ synapse:
+ level: INFO
+ synapse.storage.SQL:
+ level: WARNING
+
+drains:
+ console:
+ type: console
+ location: stdout
+ file:
+ type: file_json
+ location: homeserver.log
+```
+
+The above logging config will set Synapse to the 'INFO' log level by default, with the SQL layer at 'WARNING', and will send logs to two drains (one to the console, and one to a file, stored as JSON).
+
+## Drain Types
+
+Drain types can be specified by the `type` key.
+
+### `console`
+
+Outputs human-readable logs to the console.
+
+Arguments:
+
+- `location`: Either `stdout` or `stderr`.
+
+### `console_json`
+
+Outputs machine-readable JSON logs to the console.
+
+Arguments:
+
+- `location`: Either `stdout` or `stderr`.
+
+### `console_json_terse`
+
+Outputs machine-readable JSON logs to the console, one event per line. This
+format is not designed to be read back and re-formatted into human-readable
+text, but is optimised for a log aggregation system.
+
+Arguments:
+
+- `location`: Either `stdout` or `stderr`.
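+
+A line of output might look something like the following (illustrative; key
+order is not guaranteed):
+
+```
+{"log":"Hello there, {name}!","name":"wally","level":"INFO","log_namespace":"synapse.app.homeserver","request":null,"scope":null}
+```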
+
+### `file`
+
+Outputs human-readable logs to a file.
+
+Arguments:
+
+- `location`: An absolute path to the file to log to.
+
+### `file_json`
+
+Outputs machine-readable logs to a file.
+
+Arguments:
+
+- `location`: An absolute path to the file to log to.
+
+### `network_json_terse`
+
+Delivers machine-readable JSON logs to a log aggregator over TCP. This is
+compatible with LogStash's TCP input with the codec set to `json_lines`.
+
+Arguments:
+
+- `host`: Hostname or IP address of the log aggregator.
+- `port`: Numerical port to contact on the host.
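+- `maximum_buffer`: The maximum number of log events to buffer while the
+  aggregator is unreachable, before events start being shed (optional; the
+  implementation defaults to 1000).
+
+A drain of this type might be configured as follows (the hostname and port
+here are illustrative):
+
+```yaml
+drains:
+  remote:
+    type: network_json_terse
+    host: logs.example.com
+    port: 5000
+```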
\ No newline at end of file
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 69dcf3523f..c30fdeee9a 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -36,18 +36,20 @@ from synapse.util.versionstring import get_version_string
logger = logging.getLogger(__name__)
+# list of tuples of function, args list, kwargs dict
_sighup_callbacks = []
-def register_sighup(func):
+def register_sighup(func, *args, **kwargs):
"""
Register a function to be called when a SIGHUP occurs.
Args:
func (function): Function to be called when sent a SIGHUP signal.
- Will be called with a single argument, the homeserver.
+ Will be called with the homeserver as its first argument.
+ *args, **kwargs: args and kwargs to be passed to the target function.
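+
+ Example (illustrative; ``reload_cache`` is a hypothetical callback):
+
+ register_sighup(reload_cache, "rooms", invalidate=True)
+
+ On SIGHUP, the callback is then invoked as
+ ``reload_cache(hs, "rooms", invalidate=True)``.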
"""
- _sighup_callbacks.append(func)
+ _sighup_callbacks.append((func, args, kwargs))
def start_worker_reactor(appname, config, run_command=reactor.run):
@@ -248,8 +250,8 @@ def start(hs, listeners=None):
# we're not using systemd.
sdnotify(b"RELOADING=1")
- for i in _sighup_callbacks:
- i(hs)
+ for i, args, kwargs in _sighup_callbacks:
+ i(hs, *args, **kwargs)
sdnotify(b"READY=1")
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index 1fd52a5526..04751a6a5e 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -227,8 +227,6 @@ def start(config_options):
config.start_pushers = False
config.send_federation = False
- setup_logging(config, use_worker_options=True)
-
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -241,6 +239,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
# We use task.react as the basic run command as it correctly handles tearing
diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 54bb114dec..767b87d2db 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -141,8 +141,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.appservice"
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -167,6 +165,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ps, config, use_worker_options=True)
+
ps.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ps, config.worker_listeners
diff --git a/synapse/app/client_reader.py b/synapse/app/client_reader.py
index 721bb5b119..86193d35a8 100644
--- a/synapse/app/client_reader.py
+++ b/synapse/app/client_reader.py
@@ -179,8 +179,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.client_reader"
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -193,6 +191,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/event_creator.py b/synapse/app/event_creator.py
index 473c8895d0..c67fe69a50 100644
--- a/synapse/app/event_creator.py
+++ b/synapse/app/event_creator.py
@@ -175,8 +175,6 @@ def start(config_options):
assert config.worker_replication_http_port is not None
- setup_logging(config, use_worker_options=True)
-
# This should only be done on the user directory worker or the master
config.update_user_directory = False
@@ -192,6 +190,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/federation_reader.py b/synapse/app/federation_reader.py
index 5255d9e8cc..1ef027a88c 100644
--- a/synapse/app/federation_reader.py
+++ b/synapse/app/federation_reader.py
@@ -160,8 +160,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.federation_reader"
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -174,6 +172,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index c5a2880e69..04fbb407af 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -171,8 +171,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.federation_sender"
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -197,6 +195,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/frontend_proxy.py b/synapse/app/frontend_proxy.py
index e2822ca848..611d285421 100644
--- a/synapse/app/frontend_proxy.py
+++ b/synapse/app/frontend_proxy.py
@@ -232,8 +232,6 @@ def start(config_options):
assert config.worker_main_http_uri is not None
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -246,6 +244,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 8233905844..04f1ed14f3 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -341,8 +341,6 @@ def setup(config_options):
# generating config files and shouldn't try to continue.
sys.exit(0)
- synapse.config.logger.setup_logging(config, use_worker_options=False)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -356,6 +354,8 @@ def setup(config_options):
database_engine=database_engine,
)
+ synapse.config.logger.setup_logging(hs, config, use_worker_options=False)
+
logger.info("Preparing database: %s...", config.database_config["name"])
try:
diff --git a/synapse/app/media_repository.py b/synapse/app/media_repository.py
index 3a168577c7..2ac783ffa3 100644
--- a/synapse/app/media_repository.py
+++ b/synapse/app/media_repository.py
@@ -155,8 +155,6 @@ def start(config_options):
"Please add ``enable_media_repo: false`` to the main config\n"
)
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -169,6 +167,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 692ffa2f04..d84732ee3c 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -184,8 +184,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.pusher"
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
if config.start_pushers:
@@ -210,6 +208,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ps, config, use_worker_options=True)
+
ps.setup()
def start():
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index a1c3b162f7..473026fce5 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -435,8 +435,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.synchrotron"
- setup_logging(config, use_worker_options=True)
-
synapse.events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -450,6 +448,8 @@ def start(config_options):
application_service_handler=SynchrotronApplicationService(),
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index cb29a1afab..e01afb39f2 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -197,8 +197,6 @@ def start(config_options):
assert config.worker_app == "synapse.app.user_dir"
- setup_logging(config, use_worker_options=True)
-
events.USE_FROZEN_DICTS = config.use_frozen_dicts
database_engine = create_engine(config.database_config)
@@ -223,6 +221,8 @@ def start(config_options):
database_engine=database_engine,
)
+ setup_logging(ss, config, use_worker_options=True)
+
ss.setup()
reactor.addSystemEventTrigger(
"before", "startup", _base.start, ss, config.worker_listeners
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index d321d00b80..981df5a10c 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -25,6 +25,10 @@ from twisted.logger import STDLibLogObserver, globalLogBeginner
import synapse
from synapse.app import _base as appbase
+from synapse.logging._structured import (
+ reload_structured_logging,
+ setup_structured_logging,
+)
from synapse.logging.context import LoggingContextFilter
from synapse.util.versionstring import get_version_string
@@ -119,21 +123,10 @@ class LoggingConfig(Config):
log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
-def setup_logging(config, use_worker_options=False):
- """ Set up python logging
-
- Args:
- config (LoggingConfig | synapse.config.workers.WorkerConfig):
- configuration data
-
- use_worker_options (bool): True to use the 'worker_log_config' option
- instead of 'log_config'.
-
- register_sighup (func | None): Function to call to register a
- sighup handler.
+def _setup_stdlib_logging(config, log_config):
+ """
+ Set up Python stdlib logging.
"""
- log_config = config.worker_log_config if use_worker_options else config.log_config
-
if log_config is None:
log_format = (
"%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
@@ -151,35 +144,10 @@ def setup_logging(config, use_worker_options=False):
handler.addFilter(LoggingContextFilter(request=""))
logger.addHandler(handler)
else:
+ logging.config.dictConfig(log_config)
- def load_log_config():
- with open(log_config, "r") as f:
- logging.config.dictConfig(yaml.safe_load(f))
-
- def sighup(*args):
- # it might be better to use a file watcher or something for this.
- load_log_config()
- logging.info("Reloaded log config from %s due to SIGHUP", log_config)
-
- load_log_config()
- appbase.register_sighup(sighup)
-
- # make sure that the first thing we log is a thing we can grep backwards
- # for
- logging.warn("***** STARTING SERVER *****")
- logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
- logging.info("Server hostname: %s", config.server_name)
-
- # It's critical to point twisted's internal logging somewhere, otherwise it
- # stacks up and leaks kup to 64K object;
- # see: https://twistedmatrix.com/trac/ticket/8164
- #
- # Routing to the python logging framework could be a performance problem if
- # the handlers blocked for a long time as python.logging is a blocking API
- # see https://twistedmatrix.com/documents/current/core/howto/logger.html
- # filed as https://github.com/matrix-org/synapse/issues/1727
- #
- # However this may not be too much of a problem if we are just writing to a file.
+ # Route Twisted's native logging through to the standard library logging
+ # system.
observer = STDLibLogObserver()
def _log(event):
@@ -201,3 +169,54 @@ def setup_logging(config, use_worker_options=False):
)
if not config.no_redirect_stdio:
print("Redirected stdout/stderr to logs")
+
+
+def _reload_stdlib_logging(*args, log_config=None):
+ logger = logging.getLogger("")
+
+    if not log_config:
+        logger.warn("Reloaded a blank config?")
+        return
+
+    logging.config.dictConfig(log_config)
+
+
+def setup_logging(hs, config, use_worker_options=False):
+ """
+ Set up the logging subsystem.
+
+ Args:
+ config (LoggingConfig | synapse.config.workers.WorkerConfig):
+ configuration data
+
+ use_worker_options (bool): True to use the 'worker_log_config' option
+ instead of 'log_config'.
+ """
+ log_config = config.worker_log_config if use_worker_options else config.log_config
+
+ def read_config(*args, callback=None):
+ if log_config is None:
+ return None
+
+ with open(log_config, "rb") as f:
+ log_config_body = yaml.safe_load(f.read())
+
+ if callback:
+ callback(log_config=log_config_body)
+ logging.info("Reloaded log config from %s due to SIGHUP", log_config)
+
+ return log_config_body
+
+ log_config_body = read_config()
+
+ if log_config_body and log_config_body.get("structured") is True:
+ setup_structured_logging(hs, config, log_config_body)
+ appbase.register_sighup(read_config, callback=reload_structured_logging)
+ else:
+ _setup_stdlib_logging(config, log_config_body)
+ appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
+
+ # make sure that the first thing we log is a thing we can grep backwards
+ # for
+ logging.warn("***** STARTING SERVER *****")
+ logging.warn("Server %s version %s", sys.argv[0], get_version_string(synapse))
+ logging.info("Server hostname: %s", config.server_name)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c86903b98b..94306c94a9 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -326,8 +326,9 @@ class FederationHandler(BaseHandler):
ours = yield self.store.get_state_groups_ids(room_id, seen)
# state_maps is a list of mappings from (type, state_key) to event_id
- # type: list[dict[tuple[str, str], str]]
- state_maps = list(ours.values())
+ state_maps = list(
+ ours.values()
+ ) # type: list[dict[tuple[str, str], str]]
# we don't need this any more, let's delete it.
del ours
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
new file mode 100644
index 0000000000..0367d6dfc4
--- /dev/null
+++ b/synapse/logging/_structured.py
@@ -0,0 +1,374 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import os.path
+import sys
+import typing
+import warnings
+
+import attr
+from constantly import NamedConstant, Names, ValueConstant, Values
+from zope.interface import implementer
+
+from twisted.logger import (
+ FileLogObserver,
+ FilteringLogObserver,
+ ILogObserver,
+ LogBeginner,
+ Logger,
+ LogLevel,
+ LogLevelFilterPredicate,
+ LogPublisher,
+ eventAsText,
+ globalLogBeginner,
+ jsonFileLogObserver,
+)
+
+from synapse.config._base import ConfigError
+from synapse.logging._terse_json import (
+ TerseJSONToConsoleLogObserver,
+ TerseJSONToTCPLogObserver,
+)
+from synapse.logging.context import LoggingContext
+
+
+def stdlib_log_level_to_twisted(level: str) -> LogLevel:
+ """
+ Convert a stdlib log level to Twisted's log level.
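+
+    For example, "WARNING" maps to LogLevel.warn, and "INFO" to
+    LogLevel.info.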
+ """
+ lvl = level.lower().replace("warning", "warn")
+ return LogLevel.levelWithName(lvl)
+
+
+@attr.s
+@implementer(ILogObserver)
+class LogContextObserver(object):
+ """
+ An ILogObserver which adds Synapse-specific log context information.
+
+ Attributes:
+ observer (ILogObserver): The target parent observer.
+ """
+
+ observer = attr.ib()
+
+ def __call__(self, event: dict) -> None:
+ """
+ Consume a log event and emit it to the parent observer after filtering
+ and adding log context information.
+
+ Args:
+ event (dict)
+ """
+ # Filter out some useless events that Twisted outputs
+ if "log_text" in event:
+ if event["log_text"].startswith("DNSDatagramProtocol starting on "):
+ return
+
+ if event["log_text"].startswith("(UDP Port "):
+ return
+
+ if event["log_text"].startswith("Timing out client") or event[
+ "log_format"
+ ].startswith("Timing out client"):
+ return
+
+ context = LoggingContext.current_context()
+
+ # Copy the context information to the log event.
+ if context is not None:
+ context.copy_to_twisted_log_entry(event)
+ else:
+ # If there's no logging context, not even the root one, we might be
+ # starting up or it might be from non-Synapse code. Log it as if it
+ # came from the root logger.
+ event["request"] = None
+ event["scope"] = None
+
+ self.observer(event)
+
+
+class PythonStdlibToTwistedLogger(logging.Handler):
+ """
+ Transform a Python stdlib log message into a Twisted one.
+ """
+
+ def __init__(self, observer, *args, **kwargs):
+ """
+ Args:
+ observer (ILogObserver): A Twisted logging observer.
+ *args, **kwargs: Args/kwargs to be passed to logging.Handler.
+ """
+ self.observer = observer
+ super().__init__(*args, **kwargs)
+
+ def emit(self, record: logging.LogRecord) -> None:
+ """
+ Emit a record to Twisted's observer.
+
+ Args:
+ record (logging.LogRecord)
+ """
+
+ self.observer(
+ {
+ "log_time": record.created,
+ "log_text": record.getMessage(),
+ "log_format": "{log_text}",
+ "log_namespace": record.name,
+ "log_level": stdlib_log_level_to_twisted(record.levelname),
+ }
+ )
+
+
+def SynapseFileLogObserver(outFile: typing.io.TextIO) -> FileLogObserver:
+ """
+ A log observer that formats events like the traditional log formatter and
+ sends them to `outFile`.
+
+ Args:
+ outFile (file object): The file object to write to.
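+
+    Example rendered line (timestamp prefix elided; namespace illustrative):
+        " - synapse.app.homeserver - INFO - None - Hello there, wally!"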
+ """
+
+ def formatEvent(_event: dict) -> str:
+ event = dict(_event)
+ event["log_level"] = event["log_level"].name.upper()
+ event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
+ event.get("log_format", "{log_text}") or "{log_text}"
+ )
+ return eventAsText(event, includeSystem=False) + "\n"
+
+ return FileLogObserver(outFile, formatEvent)
+
+
+class DrainType(Names):
+ CONSOLE = NamedConstant()
+ CONSOLE_JSON = NamedConstant()
+ CONSOLE_JSON_TERSE = NamedConstant()
+ FILE = NamedConstant()
+ FILE_JSON = NamedConstant()
+ NETWORK_JSON_TERSE = NamedConstant()
+
+
+class OutputPipeType(Values):
+ stdout = ValueConstant(sys.__stdout__)
+ stderr = ValueConstant(sys.__stderr__)
+
+
+@attr.s
+class DrainConfiguration(object):
+ name = attr.ib()
+ type = attr.ib()
+ location = attr.ib()
+ options = attr.ib(default=None)
+
+
+@attr.s
+class NetworkJSONTerseOptions(object):
+ maximum_buffer = attr.ib(type=int)
+
+
+DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
+
+
+def parse_drain_configs(
+ drains: dict
+) -> typing.Generator[DrainConfiguration, None, None]:
+ """
+ Parse the drain configurations.
+
+ Args:
+        drains (dict): A mapping of drain names to drain configurations.
+
+ Yields:
+ DrainConfiguration instances.
+
+ Raises:
+ ConfigError: If any of the drain configuration items are invalid.
+ """
+ for name, config in drains.items():
+ if "type" not in config:
+ raise ConfigError("Logging drains require a 'type' key.")
+
+ try:
+ logging_type = DrainType.lookupByName(config["type"].upper())
+ except ValueError:
+ raise ConfigError(
+ "%s is not a known logging drain type." % (config["type"],)
+ )
+
+ if logging_type in [
+ DrainType.CONSOLE,
+ DrainType.CONSOLE_JSON,
+ DrainType.CONSOLE_JSON_TERSE,
+ ]:
+ location = config.get("location")
+ if location is None or location not in ["stdout", "stderr"]:
+ raise ConfigError(
+ (
+ "The %s drain needs the 'location' key set to "
+ "either 'stdout' or 'stderr'."
+ )
+ % (logging_type,)
+ )
+
+ pipe = OutputPipeType.lookupByName(location).value
+
+ yield DrainConfiguration(name=name, type=logging_type, location=pipe)
+
+ elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
+ if "location" not in config:
+ raise ConfigError(
+ "The %s drain needs the 'location' key set." % (logging_type,)
+ )
+
+ location = config.get("location")
+ if os.path.abspath(location) != location:
+ raise ConfigError(
+ "File paths need to be absolute, '%s' is a relative path"
+ % (location,)
+ )
+ yield DrainConfiguration(name=name, type=logging_type, location=location)
+
+ elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
+ host = config.get("host")
+ port = config.get("port")
+ maximum_buffer = config.get("maximum_buffer", 1000)
+ yield DrainConfiguration(
+ name=name,
+ type=logging_type,
+ location=(host, port),
+ options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
+ )
+
+ else:
+ raise ConfigError(
+ "The %s drain type is currently not implemented."
+ % (config["type"].upper(),)
+ )
+
+
+def setup_structured_logging(
+ hs,
+ config,
+ log_config: dict,
+ logBeginner: LogBeginner = globalLogBeginner,
+ redirect_stdlib_logging: bool = True,
+) -> LogPublisher:
+ """
+ Set up Twisted's structured logging system.
+
+ Args:
+ hs: The homeserver to use.
+ config (HomeserverConfig): The configuration of the Synapse homeserver.
+ log_config (dict): The log configuration to use.
+ """
+ if config.no_redirect_stdio:
+ raise ConfigError(
+ "no_redirect_stdio cannot be defined using structured logging."
+ )
+
+ logger = Logger()
+
+ if "drains" not in log_config:
+ raise ConfigError("The logging configuration requires a list of drains.")
+
+ observers = []
+
+ for observer in parse_drain_configs(log_config["drains"]):
+ # Pipe drains
+ if observer.type == DrainType.CONSOLE:
+ logger.debug(
+ "Starting up the {name} console logger drain", name=observer.name
+ )
+ observers.append(SynapseFileLogObserver(observer.location))
+ elif observer.type == DrainType.CONSOLE_JSON:
+ logger.debug(
+ "Starting up the {name} JSON console logger drain", name=observer.name
+ )
+ observers.append(jsonFileLogObserver(observer.location))
+ elif observer.type == DrainType.CONSOLE_JSON_TERSE:
+ logger.debug(
+ "Starting up the {name} terse JSON console logger drain",
+ name=observer.name,
+ )
+ observers.append(
+ TerseJSONToConsoleLogObserver(observer.location, metadata={})
+ )
+
+ # File drains
+ elif observer.type == DrainType.FILE:
+ logger.debug("Starting up the {name} file logger drain", name=observer.name)
+ log_file = open(observer.location, "at", buffering=1, encoding="utf8")
+ observers.append(SynapseFileLogObserver(log_file))
+ elif observer.type == DrainType.FILE_JSON:
+ logger.debug(
+ "Starting up the {name} JSON file logger drain", name=observer.name
+ )
+ log_file = open(observer.location, "at", buffering=1, encoding="utf8")
+ observers.append(jsonFileLogObserver(log_file))
+
+ elif observer.type == DrainType.NETWORK_JSON_TERSE:
+ metadata = {"server_name": hs.config.server_name}
+ log_observer = TerseJSONToTCPLogObserver(
+ hs=hs,
+ host=observer.location[0],
+ port=observer.location[1],
+ metadata=metadata,
+ maximum_buffer=observer.options.maximum_buffer,
+ )
+ log_observer.start()
+ observers.append(log_observer)
+ else:
+ # We should never get here, but, just in case, throw an error.
+ raise ConfigError("%s drain type cannot be configured" % (observer.type,))
+
+ publisher = LogPublisher(*observers)
+ log_filter = LogLevelFilterPredicate()
+
+ for namespace, namespace_config in log_config.get(
+ "loggers", DEFAULT_LOGGERS
+ ).items():
+ # Set the log level for twisted.logger.Logger namespaces
+ log_filter.setLogLevelForNamespace(
+ namespace,
+ stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
+ )
+
+ # Also set the log levels for the stdlib logger namespaces, to prevent
+ # them getting to PythonStdlibToTwistedLogger and having to be formatted
+ if "level" in namespace_config:
+ logging.getLogger(namespace).setLevel(namespace_config.get("level"))
+
+ f = FilteringLogObserver(publisher, [log_filter])
+ lco = LogContextObserver(f)
+
+ if redirect_stdlib_logging:
+ stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
+ stdliblogger = logging.getLogger()
+ stdliblogger.addHandler(stuff_into_twisted)
+
+ # Always redirect standard I/O, otherwise other logging outputs might miss
+ # it.
+ logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
+
+ return publisher
+
+
+def reload_structured_logging(*args, log_config=None) -> None:
+ warnings.warn(
+ "Currently the structured logging system can not be reloaded, doing nothing"
+ )
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
new file mode 100644
index 0000000000..7f1e8f23fe
--- /dev/null
+++ b/synapse/logging/_terse_json.py
@@ -0,0 +1,278 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Log formatters that output terse JSON.
+"""
+
+import sys
+from collections import deque
+from ipaddress import IPv4Address, IPv6Address, ip_address
+from math import floor
+from typing.io import TextIO
+
+import attr
+from simplejson import dumps
+
+from twisted.application.internet import ClientService
+from twisted.internet.endpoints import (
+ HostnameEndpoint,
+ TCP4ClientEndpoint,
+ TCP6ClientEndpoint,
+)
+from twisted.internet.protocol import Factory, Protocol
+from twisted.logger import FileLogObserver, Logger
+from twisted.python.failure import Failure
+
+
+def flatten_event(event: dict, metadata: dict, include_time: bool = False):
+ """
+    Flatten a Twisted logging event to a dictionary capable of being sent
+ as a log event to a logging aggregation system.
+
+ The format is vastly simplified and is not designed to be a "human readable
+ string" in the sense that traditional logs are. Instead, the structure is
+ optimised for searchability and filtering, with human-understandable log
+ keys.
+
+ Args:
+ event (dict): The Twisted logging event we are flattening.
+ metadata (dict): Additional data to include with each log message. This
+ can be information like the server name. Since the target log
+ consumer does not know who we are other than by host IP, this
+ allows us to forward through static information.
+ include_time (bool): Should we include the `time` key? If False, the
+ event time is stripped from the event.
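+
+    For example (illustrative): an event produced by
+    ``Logger().info("Hello, {name}!", name="wally")`` flattens to keys such
+    as ``log`` (the raw format string "Hello, {name}!"), ``name`` ("wally")
+    and ``level`` ("INFO"), plus any metadata.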
+ """
+ new_event = {}
+
+ # If it's a failure, make the new event's log_failure be the traceback text.
+ if "log_failure" in event:
+ new_event["log_failure"] = event["log_failure"].getTraceback()
+
+ # If it's a warning, copy over a string representation of the warning.
+ if "warning" in event:
+ new_event["warning"] = str(event["warning"])
+
+ # Stdlib logging events have "log_text" as their human-readable portion,
+ # Twisted ones have "log_format". For now, include the log_format, so that
+ # context only given in the log format (e.g. what is being logged) is
+ # available.
+ if "log_text" in event:
+ new_event["log"] = event["log_text"]
+ else:
+ new_event["log"] = event["log_format"]
+
+ # We want to include the timestamp when forwarding over the network, but
+ # exclude it when we are writing to stdout. This is because the log ingester
+ # (e.g. logstash, fluentd) can add its own timestamp.
+ if include_time:
+ new_event["time"] = round(event["log_time"], 2)
+
+ # Convert the log level to a textual representation.
+ new_event["level"] = event["log_level"].name.upper()
+
+ # Ignore these keys, and do not transfer them over to the new log object.
+ # They are either useless (isError), transferred manually above (log_time,
+ # log_level, etc), or contain Python objects which are not useful for output
+ # (log_logger, log_source).
+ keys_to_delete = [
+ "isError",
+ "log_failure",
+ "log_format",
+ "log_level",
+ "log_logger",
+ "log_source",
+ "log_system",
+ "log_time",
+ "log_text",
+ "observer",
+ "warning",
+ ]
+
+ # If it's from the Twisted legacy logger (twisted.python.log), it adds some
+ # more keys we want to purge.
+ if event.get("log_namespace") == "log_legacy":
+ keys_to_delete.extend(["message", "system", "time"])
+
+ # Rather than modify the dictionary in place, construct a new one with only
+ # the content we want. The original event should be considered 'frozen'.
+ for key in event.keys():
+
+ if key in keys_to_delete:
+ continue
+
+ if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
+ # If it's a plain type, include it as is.
+ new_event[key] = event[key]
+ else:
+ # If it's not one of those basic types, write out a string
+ # representation. This should probably be a warning in development,
+ # so that we are sure we are only outputting useful data.
+ new_event[key] = str(event[key])
+
+ # Add the metadata information to the event (e.g. the server_name).
+ new_event.update(metadata)
+
+ return new_event
+
+
+def TerseJSONToConsoleLogObserver(outFile: TextIO, metadata: dict) -> FileLogObserver:
+ """
+ A log observer that formats events to a flattened JSON representation.
+
+ Args:
+ outFile: The file object to write to.
+ metadata: Metadata to be added to each log object.
+ """
+
+ def formatEvent(_event: dict) -> str:
+ flattened = flatten_event(_event, metadata)
+ return dumps(flattened, ensure_ascii=False, separators=(",", ":")) + "\n"
+
+ return FileLogObserver(outFile, formatEvent)
+
+
+@attr.s
+class TerseJSONToTCPLogObserver(object):
+ """
+    An ILogObserver that writes JSON logs to a TCP target.
+
+ Args:
+ hs (HomeServer): The Homeserver that is being logged for.
+ host: The host of the logging target.
+ port: The logging target's port.
+        metadata: Metadata to be added to each log entry.
+        maximum_buffer: The maximum number of log entries to buffer before
+            shedding events under backpressure.
+ """
+
+ hs = attr.ib()
+ host = attr.ib(type=str)
+ port = attr.ib(type=int)
+ metadata = attr.ib(type=dict)
+ maximum_buffer = attr.ib(type=int)
+ _buffer = attr.ib(default=attr.Factory(deque), type=deque)
+ _writer = attr.ib(default=None)
+ _logger = attr.ib(default=attr.Factory(Logger))
+
+ def start(self) -> None:
+
+ # Connect without DNS lookups if it's a direct IP.
+ try:
+ ip = ip_address(self.host)
+ if isinstance(ip, IPv4Address):
+ endpoint = TCP4ClientEndpoint(
+ self.hs.get_reactor(), self.host, self.port
+ )
+ elif isinstance(ip, IPv6Address):
+ endpoint = TCP6ClientEndpoint(
+ self.hs.get_reactor(), self.host, self.port
+ )
+ except ValueError:
+ endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+
+ factory = Factory.forProtocol(Protocol)
+ self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+ self._service.startService()
+
+ def _write_loop(self) -> None:
+ """
+ Implement the write loop.
+ """
+ if self._writer:
+ return
+
+ self._writer = self._service.whenConnected()
+
+ @self._writer.addBoth
+ def writer(r):
+ if isinstance(r, Failure):
+ r.printTraceback(file=sys.__stderr__)
+ self._writer = None
+ self.hs.get_reactor().callLater(1, self._write_loop)
+ return
+
+ try:
+ for event in self._buffer:
+ r.transport.write(
+ dumps(event, ensure_ascii=False, separators=(",", ":")).encode(
+ "utf8"
+ )
+ )
+ r.transport.write(b"\n")
+ self._buffer.clear()
+ except Exception as e:
+ sys.__stderr__.write("Failed writing out logs with %s\n" % (str(e),))
+
+ self._writer = False
+ self.hs.get_reactor().callLater(1, self._write_loop)
+
+ def _handle_pressure(self) -> None:
+ """
+ Handle backpressure by shedding events.
+
+        Events will be shed, in this order, until the buffer is below the maximum:
+ - Shed DEBUG events
+ - Shed INFO events
+ - Shed the middle 50% of the events.
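+
+        For example, with a ``maximum_buffer`` of 10 and a buffer of 20 WARN
+        events, the first five and the last five events survive.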
+ """
+ if len(self._buffer) <= self.maximum_buffer:
+ return
+
+ # Strip out DEBUGs
+ self._buffer = deque(
+ filter(lambda event: event["level"] != "DEBUG", self._buffer)
+ )
+
+ if len(self._buffer) <= self.maximum_buffer:
+ return
+
+ # Strip out INFOs
+ self._buffer = deque(
+ filter(lambda event: event["level"] != "INFO", self._buffer)
+ )
+
+ if len(self._buffer) <= self.maximum_buffer:
+ return
+
+ # Cut the middle entries out
+ buffer_split = floor(self.maximum_buffer / 2)
+
+ old_buffer = self._buffer
+ self._buffer = deque()
+
+ for i in range(buffer_split):
+ self._buffer.append(old_buffer.popleft())
+
+ end_buffer = []
+ for i in range(buffer_split):
+ end_buffer.append(old_buffer.pop())
+
+ self._buffer.extend(reversed(end_buffer))
+
+ def __call__(self, event: dict) -> None:
+ flattened = flatten_event(event, self.metadata, include_time=True)
+ self._buffer.append(flattened)
+
+ # Handle backpressure, if it exists.
+ try:
+ self._handle_pressure()
+ except Exception:
+            # If handling backpressure fails, clear the buffer and log the
+ # exception.
+ self._buffer.clear()
+ self._logger.failure("Failed clearing backpressure")
+
+ # Try and write immediately.
+ self._write_loop()
diff --git a/synapse/logging/context.py b/synapse/logging/context.py
index b456c31f70..63379bfb93 100644
--- a/synapse/logging/context.py
+++ b/synapse/logging/context.py
@@ -25,6 +25,7 @@ See doc/log_contexts.rst for details on how this works.
import logging
import threading
import types
+from typing import Any, List
from twisted.internet import defer, threads
@@ -194,7 +195,7 @@ class LoggingContext(object):
class Sentinel(object):
"""Sentinel to represent the root context"""
- __slots__ = []
+ __slots__ = [] # type: List[Any]
def __str__(self):
return "sentinel"
@@ -202,6 +203,10 @@ class LoggingContext(object):
def copy_to(self, record):
pass
+ def copy_to_twisted_log_entry(self, record):
+ record["request"] = None
+ record["scope"] = None
+
def start(self):
pass
@@ -330,6 +335,13 @@ class LoggingContext(object):
# we also track the current scope:
record.scope = self.scope
+ def copy_to_twisted_log_entry(self, record):
+ """
+ Copy logging fields from this context to a Twisted log record.
+ """
+ record["request"] = self.request
+ record["scope"] = self.scope
+
def start(self):
if get_thread_id() != self.main_thread:
logger.warning("Started logcontext %s on different thread", self)
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index c6465c0386..ec0ac547c1 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -47,9 +47,9 @@ REQUIREMENTS = [
"idna>=2.5",
# validating SSL certs for IP addresses requires service_identity 18.1.
"service_identity>=18.1.0",
- # our logcontext handling relies on the ability to cancel inlineCallbacks
- # (https://twistedmatrix.com/trac/ticket/4632) which landed in Twisted 18.7.
- "Twisted>=18.7.0",
+ # Twisted 18.9 introduces some logger improvements that the structured
+ # logger utilises
+ "Twisted>=18.9.0",
"treq>=15.1",
# Twisted has required pyopenssl 16.0 since about Twisted 16.6.
"pyopenssl>=16.0.0",
diff --git a/tests/logging/__init__.py b/tests/logging/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/logging/__init__.py
diff --git a/tests/logging/test_structured.py b/tests/logging/test_structured.py
new file mode 100644
index 0000000000..a786de0233
--- /dev/null
+++ b/tests/logging/test_structured.py
@@ -0,0 +1,197 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import os.path
+import shutil
+import sys
+import textwrap
+
+from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile
+
+from synapse.config.logger import setup_logging
+from synapse.logging._structured import setup_structured_logging
+from synapse.logging.context import LoggingContext
+
+from tests.unittest import DEBUG, HomeserverTestCase
+
+
+class FakeBeginner(object):
+ def beginLoggingTo(self, observers, **kwargs):
+ self.observers = observers
+
+
+class StructuredLoggingTestCase(HomeserverTestCase):
+ """
+ Tests for Synapse's structured logging support.
+ """
+
+ def test_output_to_json_round_trip(self):
+ """
+        Synapse logs can be output as JSON and then read back again.
+ """
+ temp_dir = self.mktemp()
+ os.mkdir(temp_dir)
+ self.addCleanup(shutil.rmtree, temp_dir)
+
+ json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json"))
+
+ log_config = {
+ "drains": {"jsonfile": {"type": "file_json", "location": json_log_file}}
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ # Make a logger and send an event
+ logger = Logger(
+ namespace="tests.logging.test_structured", observer=beginner.observers[0]
+ )
+ logger.info("Hello there, {name}!", name="wally")
+
+ # Read the log file and check it has the event we sent
+ with open(json_log_file, "r") as f:
+ logged_events = list(eventsFromJSONLogFile(f))
+ self.assertEqual(len(logged_events), 1)
+
+ # The event pulled from the file should render fine
+ self.assertEqual(
+ eventAsText(logged_events[0], includeTimestamp=False),
+ "[tests.logging.test_structured#info] Hello there, wally!",
+ )
+
+ def test_output_to_text(self):
+ """
+        Synapse logs can be output as text.
+ """
+ temp_dir = self.mktemp()
+ os.mkdir(temp_dir)
+ self.addCleanup(shutil.rmtree, temp_dir)
+
+ log_file = os.path.abspath(os.path.join(temp_dir, "out.log"))
+
+ log_config = {"drains": {"file": {"type": "file", "location": log_file}}}
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ # Make a logger and send an event
+ logger = Logger(
+ namespace="tests.logging.test_structured", observer=beginner.observers[0]
+ )
+ logger.info("Hello there, {name}!", name="wally")
+
+ # Read the log file and check it has the event we sent
+ with open(log_file, "r") as f:
+ logged_events = f.read().strip().split("\n")
+ self.assertEqual(len(logged_events), 1)
+
+ # The event pulled from the file should render fine
+ self.assertTrue(
+ logged_events[0].endswith(
+ " - tests.logging.test_structured - INFO - None - Hello there, wally!"
+ )
+ )
+
+ def test_collects_logcontext(self):
+ """
+ Test that log outputs have the attached logging context.
+ """
+ log_config = {"drains": {}}
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ publisher = setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ logs = []
+
+ publisher.addObserver(logs.append)
+
+ # Make a logger and send an event
+ logger = Logger(
+ namespace="tests.logging.test_structured", observer=beginner.observers[0]
+ )
+
+ with LoggingContext("testcontext", request="somereq"):
+ logger.info("Hello there, {name}!", name="steve")
+
+ self.assertEqual(len(logs), 1)
+ self.assertEqual(logs[0]["request"], "somereq")
+
+
+class StructuredLoggingConfigurationFileTestCase(HomeserverTestCase):
+ def make_homeserver(self, reactor, clock):
+
+ tempdir = self.mktemp()
+ os.mkdir(tempdir)
+ log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml"))
+ self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log"))
+
+ config = self.default_config()
+ config["log_config"] = log_config_file
+
+ with open(log_config_file, "w") as f:
+ f.write(
+ textwrap.dedent(
+ """\
+ structured: true
+
+ drains:
+ file:
+ type: file_json
+ location: %s
+ """
+ % (self.homeserver_log,)
+ )
+ )
+
+ self.addCleanup(self._sys_cleanup)
+
+ return self.setup_test_homeserver(config=config)
+
+ def _sys_cleanup(self):
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+
+ # Do not remove! We need the logging system to be set other than WARNING.
+ @DEBUG
+ def test_log_output(self):
+ """
+ When a structured logging config is given, Synapse will use it.
+ """
+ setup_logging(self.hs, self.hs.config)
+
+ # Make a logger and send an event
+ logger = Logger(namespace="tests.logging.test_structured")
+
+ with LoggingContext("testcontext", request="somereq"):
+ logger.info("Hello there, {name}!", name="steve")
+
+ with open(self.homeserver_log, "r") as f:
+ logged_events = [
+ eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f)
+ ]
+
+ logs = "\n".join(logged_events)
+ self.assertTrue("***** STARTING SERVER *****" in logs)
+ self.assertTrue("Hello there, steve!" in logs)
diff --git a/tests/logging/test_terse_json.py b/tests/logging/test_terse_json.py
new file mode 100644
index 0000000000..514282591d
--- /dev/null
+++ b/tests/logging/test_terse_json.py
@@ -0,0 +1,234 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from collections import Counter
+
+from twisted.logger import Logger
+
+from synapse.logging._structured import setup_structured_logging
+
+from tests.server import connect_client
+from tests.unittest import HomeserverTestCase
+
+from .test_structured import FakeBeginner
+
+
+class TerseJSONTCPTestCase(HomeserverTestCase):
+ def test_log_output(self):
+ """
+ The Terse JSON outputter delivers simplified structured logs over TCP.
+ """
+ log_config = {
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ }
+ }
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ logger = Logger(
+ namespace="tests.logging.test_terse_json", observer=beginner.observers[0]
+ )
+ logger.info("Hello there, {name}!", name="wally")
+
+ # Trigger the connection
+ self.pump()
+
+ _, server = connect_client(self.reactor, 0)
+
+ # Trigger data being sent
+ self.pump()
+
+ # One log message, with a single trailing newline
+ logs = server.data.decode("utf8").splitlines()
+ self.assertEqual(len(logs), 1)
+ self.assertEqual(server.data.count(b"\n"), 1)
+
+ log = json.loads(logs[0])
+
+ # The terse logger should give us these keys.
+ expected_log_keys = [
+ "log",
+ "time",
+ "level",
+ "log_namespace",
+ "request",
+ "scope",
+ "server_name",
+ "name",
+ ]
+ self.assertEqual(set(log.keys()), set(expected_log_keys))
+
+ # It contains the data we expect.
+ self.assertEqual(log["name"], "wally")
+
+ def test_log_backpressure_debug(self):
+ """
+ When backpressure is hit, DEBUG logs will be shed.
+ """
+ log_config = {
+ "loggers": {"synapse": {"level": "DEBUG"}},
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ "maximum_buffer": 10,
+ }
+ },
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs,
+ self.hs.config,
+ log_config,
+ logBeginner=beginner,
+ redirect_stdlib_logging=False,
+ )
+
+ logger = Logger(
+ namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+ )
+
+ # Send some debug messages
+ for i in range(0, 3):
+ logger.debug("debug %s" % (i,))
+
+ # Send a bunch of useful messages
+ for i in range(0, 7):
+ logger.info("test message %s" % (i,))
+
+ # The last debug message pushes it past the maximum buffer
+ logger.debug("too much debug")
+
+ # Allow the reconnection
+ _, server = connect_client(self.reactor, 0)
+ self.pump()
+
+ # Only the 7 infos made it through, the debugs were elided
+ logs = server.data.splitlines()
+ self.assertEqual(len(logs), 7)
+
+ def test_log_backpressure_info(self):
+ """
+ When backpressure is hit, DEBUG and INFO logs will be shed.
+ """
+ log_config = {
+ "loggers": {"synapse": {"level": "DEBUG"}},
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ "maximum_buffer": 10,
+ }
+ },
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs,
+ self.hs.config,
+ log_config,
+ logBeginner=beginner,
+ redirect_stdlib_logging=False,
+ )
+
+ logger = Logger(
+ namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+ )
+
+ # Send some debug messages
+ for i in range(0, 3):
+ logger.debug("debug %s" % (i,))
+
+ # Send a bunch of useful messages
+ for i in range(0, 10):
+ logger.warn("test warn %s" % (i,))
+
+ # Send a bunch of info messages
+ for i in range(0, 3):
+ logger.info("test message %s" % (i,))
+
+ # The last debug message pushes it past the maximum buffer
+ logger.debug("too much debug")
+
+ # Allow the reconnection
+ client, server = connect_client(self.reactor, 0)
+ self.pump()
+
+ # The 10 warnings made it through, the debugs and infos were elided
+ logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
+ self.assertEqual(len(logs), 10)
+
+ self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
+
+ def test_log_backpressure_cut_middle(self):
+ """
+        When backpressure is hit and no more DEBUGs or INFOs can be culled,
+        the middle messages will be cut out.
+ """
+ log_config = {
+ "loggers": {"synapse": {"level": "DEBUG"}},
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ "maximum_buffer": 10,
+ }
+ },
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs,
+ self.hs.config,
+ log_config,
+ logBeginner=beginner,
+ redirect_stdlib_logging=False,
+ )
+
+ logger = Logger(
+ namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+ )
+
+ # Send a bunch of useful messages
+ for i in range(0, 20):
+ logger.warn("test warn", num=i)
+
+ # Allow the reconnection
+ client, server = connect_client(self.reactor, 0)
+ self.pump()
+
+        # The first five and last five warnings made it through; the middle
+        # ten were cut out
+ logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
+ self.assertEqual(len(logs), 10)
+ self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
+ self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs])
diff --git a/tests/server.py b/tests/server.py
index e573c4e4c5..c8269619b1 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -11,9 +11,13 @@ from twisted.internet import address, threads, udp
from twisted.internet._resolver import SimpleResolverComplexifier
from twisted.internet.defer import Deferred, fail, succeed
from twisted.internet.error import DNSLookupError
-from twisted.internet.interfaces import IReactorPluggableNameResolver, IResolverSimple
+from twisted.internet.interfaces import (
+ IReactorPluggableNameResolver,
+ IReactorTCP,
+ IResolverSimple,
+)
from twisted.python.failure import Failure
-from twisted.test.proto_helpers import MemoryReactorClock
+from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
from twisted.web.http import unquote
from twisted.web.http_headers import Headers
@@ -465,3 +469,22 @@ class FakeTransport(object):
self.buffer = self.buffer[len(to_write) :]
if self.buffer and self.autoflush:
self._reactor.callLater(0.0, self.flush)
+
+
+def connect_client(reactor: IReactorTCP, client_id: int):
+    """
+    Connect a client to a fake TCP transport.
+
+    Args:
+        reactor
+        client_id: The index, in the reactor's list of queued TCP clients,
+            of the client connection to make.
+
+    Returns:
+        A tuple of (client protocol, server AccumulatingProtocol).
+    """
+ factory = reactor.tcpClients[client_id][2]
+ client = factory.buildProtocol(None)
+ server = AccumulatingProtocol()
+ server.makeConnection(FakeTransport(client, reactor))
+ client.makeConnection(FakeTransport(server, reactor))
+
+ reactor.tcpClients.pop(client_id)
+
+ return client, server
diff --git a/tox.ini b/tox.ini
index 09b4b8fc3c..f9a3b7e49a 100644
--- a/tox.ini
+++ b/tox.ini
@@ -146,3 +146,13 @@ commands =
coverage combine
coverage xml
codecov -X gcov
+
+[testenv:mypy]
+basepython = python3.5
+deps =
+ {[base]deps}
+ mypy
+extras = all
+commands = mypy --ignore-missing-imports \
+ synapse/logging/_structured.py \
+ synapse/logging/_terse_json.py
\ No newline at end of file
|