diff --git a/tests/logging/__init__.py b/tests/logging/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/logging/__init__.py
diff --git a/tests/logging/test_structured.py b/tests/logging/test_structured.py
new file mode 100644
index 0000000000..a786de0233
--- /dev/null
+++ b/tests/logging/test_structured.py
@@ -0,0 +1,197 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import os.path
+import shutil
+import sys
+import textwrap
+
+from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile
+
+from synapse.config.logger import setup_logging
+from synapse.logging._structured import setup_structured_logging
+from synapse.logging.context import LoggingContext
+
+from tests.unittest import DEBUG, HomeserverTestCase
+
+
+class FakeBeginner(object):
+ def beginLoggingTo(self, observers, **kwargs):
+ self.observers = observers
+
+
+class StructuredLoggingTestCase(HomeserverTestCase):
+ """
+ Tests for Synapse's structured logging support.
+ """
+
+ def test_output_to_json_round_trip(self):
+ """
+ Synapse logs can be outputted to JSON and then read back again.
+ """
+ temp_dir = self.mktemp()
+ os.mkdir(temp_dir)
+ self.addCleanup(shutil.rmtree, temp_dir)
+
+ json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json"))
+
+ log_config = {
+ "drains": {"jsonfile": {"type": "file_json", "location": json_log_file}}
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ # Make a logger and send an event
+ logger = Logger(
+ namespace="tests.logging.test_structured", observer=beginner.observers[0]
+ )
+ logger.info("Hello there, {name}!", name="wally")
+
+ # Read the log file and check it has the event we sent
+ with open(json_log_file, "r") as f:
+ logged_events = list(eventsFromJSONLogFile(f))
+ self.assertEqual(len(logged_events), 1)
+
+ # The event pulled from the file should render fine
+ self.assertEqual(
+ eventAsText(logged_events[0], includeTimestamp=False),
+ "[tests.logging.test_structured#info] Hello there, wally!",
+ )
+
+ def test_output_to_text(self):
+ """
+ Synapse logs can be outputted to text.
+ """
+ temp_dir = self.mktemp()
+ os.mkdir(temp_dir)
+ self.addCleanup(shutil.rmtree, temp_dir)
+
+ log_file = os.path.abspath(os.path.join(temp_dir, "out.log"))
+
+ log_config = {"drains": {"file": {"type": "file", "location": log_file}}}
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ # Make a logger and send an event
+ logger = Logger(
+ namespace="tests.logging.test_structured", observer=beginner.observers[0]
+ )
+ logger.info("Hello there, {name}!", name="wally")
+
+ # Read the log file and check it has the event we sent
+ with open(log_file, "r") as f:
+ logged_events = f.read().strip().split("\n")
+ self.assertEqual(len(logged_events), 1)
+
+ # The event pulled from the file should render fine
+ self.assertTrue(
+ logged_events[0].endswith(
+ " - tests.logging.test_structured - INFO - None - Hello there, wally!"
+ )
+ )
+
+ def test_collects_logcontext(self):
+ """
+ Test that log outputs have the attached logging context.
+ """
+ log_config = {"drains": {}}
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ publisher = setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ logs = []
+
+ publisher.addObserver(logs.append)
+
+ # Make a logger and send an event
+ logger = Logger(
+ namespace="tests.logging.test_structured", observer=beginner.observers[0]
+ )
+
+ with LoggingContext("testcontext", request="somereq"):
+ logger.info("Hello there, {name}!", name="steve")
+
+ self.assertEqual(len(logs), 1)
+ self.assertEqual(logs[0]["request"], "somereq")
+
+
+class StructuredLoggingConfigurationFileTestCase(HomeserverTestCase):
+ def make_homeserver(self, reactor, clock):
+
+ tempdir = self.mktemp()
+ os.mkdir(tempdir)
+ log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml"))
+ self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log"))
+
+ config = self.default_config()
+ config["log_config"] = log_config_file
+
+ with open(log_config_file, "w") as f:
+ f.write(
+ textwrap.dedent(
+ """\
+ structured: true
+
+ drains:
+ file:
+ type: file_json
+ location: %s
+ """
+ % (self.homeserver_log,)
+ )
+ )
+
+ self.addCleanup(self._sys_cleanup)
+
+ return self.setup_test_homeserver(config=config)
+
+ def _sys_cleanup(self):
+ sys.stdout = sys.__stdout__
+ sys.stderr = sys.__stderr__
+
+ # Do not remove! We need the logging system to be set other than WARNING.
+ @DEBUG
+ def test_log_output(self):
+ """
+ When a structured logging config is given, Synapse will use it.
+ """
+ setup_logging(self.hs, self.hs.config)
+
+ # Make a logger and send an event
+ logger = Logger(namespace="tests.logging.test_structured")
+
+ with LoggingContext("testcontext", request="somereq"):
+ logger.info("Hello there, {name}!", name="steve")
+
+ with open(self.homeserver_log, "r") as f:
+ logged_events = [
+ eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f)
+ ]
+
+ logs = "\n".join(logged_events)
+ self.assertTrue("***** STARTING SERVER *****" in logs)
+ self.assertTrue("Hello there, steve!" in logs)
diff --git a/tests/logging/test_terse_json.py b/tests/logging/test_terse_json.py
new file mode 100644
index 0000000000..514282591d
--- /dev/null
+++ b/tests/logging/test_terse_json.py
@@ -0,0 +1,234 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from collections import Counter
+
+from twisted.logger import Logger
+
+from synapse.logging._structured import setup_structured_logging
+
+from tests.server import connect_client
+from tests.unittest import HomeserverTestCase
+
+from .test_structured import FakeBeginner
+
+
+class TerseJSONTCPTestCase(HomeserverTestCase):
+ def test_log_output(self):
+ """
+ The Terse JSON outputter delivers simplified structured logs over TCP.
+ """
+ log_config = {
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ }
+ }
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs, self.hs.config, log_config, logBeginner=beginner
+ )
+
+ logger = Logger(
+ namespace="tests.logging.test_terse_json", observer=beginner.observers[0]
+ )
+ logger.info("Hello there, {name}!", name="wally")
+
+ # Trigger the connection
+ self.pump()
+
+ _, server = connect_client(self.reactor, 0)
+
+ # Trigger data being sent
+ self.pump()
+
+ # One log message, with a single trailing newline
+ logs = server.data.decode("utf8").splitlines()
+ self.assertEqual(len(logs), 1)
+ self.assertEqual(server.data.count(b"\n"), 1)
+
+ log = json.loads(logs[0])
+
+ # The terse logger should give us these keys.
+ expected_log_keys = [
+ "log",
+ "time",
+ "level",
+ "log_namespace",
+ "request",
+ "scope",
+ "server_name",
+ "name",
+ ]
+ self.assertEqual(set(log.keys()), set(expected_log_keys))
+
+ # It contains the data we expect.
+ self.assertEqual(log["name"], "wally")
+
+ def test_log_backpressure_debug(self):
+ """
+ When backpressure is hit, DEBUG logs will be shed.
+ """
+ log_config = {
+ "loggers": {"synapse": {"level": "DEBUG"}},
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ "maximum_buffer": 10,
+ }
+ },
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs,
+ self.hs.config,
+ log_config,
+ logBeginner=beginner,
+ redirect_stdlib_logging=False,
+ )
+
+ logger = Logger(
+ namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+ )
+
+ # Send some debug messages
+ for i in range(0, 3):
+ logger.debug("debug %s" % (i,))
+
+ # Send a bunch of useful messages
+ for i in range(0, 7):
+ logger.info("test message %s" % (i,))
+
+ # The last debug message pushes it past the maximum buffer
+ logger.debug("too much debug")
+
+ # Allow the reconnection
+ _, server = connect_client(self.reactor, 0)
+ self.pump()
+
+ # Only the 7 infos made it through, the debugs were elided
+ logs = server.data.splitlines()
+ self.assertEqual(len(logs), 7)
+
+ def test_log_backpressure_info(self):
+ """
+ When backpressure is hit, DEBUG and INFO logs will be shed.
+ """
+ log_config = {
+ "loggers": {"synapse": {"level": "DEBUG"}},
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ "maximum_buffer": 10,
+ }
+ },
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs,
+ self.hs.config,
+ log_config,
+ logBeginner=beginner,
+ redirect_stdlib_logging=False,
+ )
+
+ logger = Logger(
+ namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+ )
+
+ # Send some debug messages
+ for i in range(0, 3):
+ logger.debug("debug %s" % (i,))
+
+ # Send a bunch of useful messages
+ for i in range(0, 10):
+ logger.warn("test warn %s" % (i,))
+
+ # Send a bunch of info messages
+ for i in range(0, 3):
+ logger.info("test message %s" % (i,))
+
+ # The last debug message pushes it past the maximum buffer
+ logger.debug("too much debug")
+
+ # Allow the reconnection
+ client, server = connect_client(self.reactor, 0)
+ self.pump()
+
+ # The 10 warnings made it through, the debugs and infos were elided
+ logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
+ self.assertEqual(len(logs), 10)
+
+ self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
+
+ def test_log_backpressure_cut_middle(self):
+ """
+ When backpressure is hit, and no more DEBUG and INFOs cannot be culled,
+ it will cut the middle messages out.
+ """
+ log_config = {
+ "loggers": {"synapse": {"level": "DEBUG"}},
+ "drains": {
+ "tersejson": {
+ "type": "network_json_terse",
+ "host": "127.0.0.1",
+ "port": 8000,
+ "maximum_buffer": 10,
+ }
+ },
+ }
+
+ # Begin the logger with our config
+ beginner = FakeBeginner()
+ setup_structured_logging(
+ self.hs,
+ self.hs.config,
+ log_config,
+ logBeginner=beginner,
+ redirect_stdlib_logging=False,
+ )
+
+ logger = Logger(
+ namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+ )
+
+ # Send a bunch of useful messages
+ for i in range(0, 20):
+ logger.warn("test warn", num=i)
+
+ # Allow the reconnection
+ client, server = connect_client(self.reactor, 0)
+ self.pump()
+
+ # The first five and last five warnings made it through, the debugs and
+ # infos were elided
+ logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
+ self.assertEqual(len(logs), 10)
+ self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
+ self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs])
diff --git a/tests/server.py b/tests/server.py
index e573c4e4c5..c8269619b1 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -11,9 +11,13 @@ from twisted.internet import address, threads, udp
from twisted.internet._resolver import SimpleResolverComplexifier
from twisted.internet.defer import Deferred, fail, succeed
from twisted.internet.error import DNSLookupError
-from twisted.internet.interfaces import IReactorPluggableNameResolver, IResolverSimple
+from twisted.internet.interfaces import (
+ IReactorPluggableNameResolver,
+ IReactorTCP,
+ IResolverSimple,
+)
from twisted.python.failure import Failure
-from twisted.test.proto_helpers import MemoryReactorClock
+from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
from twisted.web.http import unquote
from twisted.web.http_headers import Headers
@@ -465,3 +469,22 @@ class FakeTransport(object):
self.buffer = self.buffer[len(to_write) :]
if self.buffer and self.autoflush:
self._reactor.callLater(0.0, self.flush)
+
+
+def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol:
+ """
+ Connect a client to a fake TCP transport.
+
+ Args:
+ reactor
+ factory: The connecting factory to build.
+ """
+ factory = reactor.tcpClients[client_id][2]
+ client = factory.buildProtocol(None)
+ server = AccumulatingProtocol()
+ server.makeConnection(FakeTransport(client, reactor))
+ client.makeConnection(FakeTransport(server, reactor))
+
+ reactor.tcpClients.pop(client_id)
+
+ return client, server
|