summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--UPGRADE.rst16
-rw-r--r--changelog.d/8607.misc1
-rw-r--r--docs/sample_log_config.yaml4
-rw-r--r--docs/structured_logging.md164
-rwxr-xr-xscripts-dev/lint.sh2
-rw-r--r--synapse/config/logger.py96
-rw-r--r--synapse/logging/__init__.py20
-rw-r--r--synapse/logging/_remote.py97
-rw-r--r--synapse/logging/_structured.py329
-rw-r--r--synapse/logging/_terse_json.py192
-rw-r--r--synapse/logging/filter.py33
-rw-r--r--synmark/__init__.py39
-rw-r--r--synmark/__main__.py6
-rw-r--r--synmark/suites/logging.py60
-rw-r--r--tests/logging/__init__.py34
-rw-r--r--tests/logging/test_remote_handler.py153
-rw-r--r--tests/logging/test_structured.py214
-rw-r--r--tests/logging/test_terse_json.py253
-rw-r--r--tests/server.py4
19 files changed, 706 insertions, 1011 deletions
diff --git a/UPGRADE.rst b/UPGRADE.rst
index 5a68312217..960c2aeb2b 100644
--- a/UPGRADE.rst
+++ b/UPGRADE.rst
@@ -75,6 +75,22 @@ for example:
      wget https://packages.matrix.org/debian/pool/main/m/matrix-synapse-py3/matrix-synapse-py3_1.3.0+stretch1_amd64.deb
      dpkg -i matrix-synapse-py3_1.3.0+stretch1_amd64.deb
 
+Upgrading to v1.23.0
+====================
+
+Structured logging configuration breaking changes
+-------------------------------------------------
+
+This release deprecates use of the ``structured: true`` logging configuration for
+structured logging. If your logging configuration contains ``structured: true``
+then it should be modified based on the `structured logging documentation
+<https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md>`_.
+
+The ``structured`` and ``drains`` logging options are now deprecated and should
+be replaced by standard logging configuration of ``handlers`` and ``formatters`.
+
+A future will release of Synapse will make using ``structured: true`` an error.
+
 Upgrading to v1.22.0
 ====================
 
diff --git a/changelog.d/8607.misc b/changelog.d/8607.misc
new file mode 100644
index 0000000000..9e56551a34
--- /dev/null
+++ b/changelog.d/8607.misc
@@ -0,0 +1 @@
+Re-organize the structured logging code to separate the TCP transport handling from the JSON formatting.
diff --git a/docs/sample_log_config.yaml b/docs/sample_log_config.yaml
index e26657f9fe..ff3c747180 100644
--- a/docs/sample_log_config.yaml
+++ b/docs/sample_log_config.yaml
@@ -3,7 +3,11 @@
 # This is a YAML file containing a standard Python logging configuration
 # dictionary. See [1] for details on the valid settings.
 #
+# Synapse also supports structured logging for machine readable logs which can
+# be ingested by ELK stacks. See [2] for details.
+#
 # [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
+# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md
 
 version: 1
 
diff --git a/docs/structured_logging.md b/docs/structured_logging.md
index decec9b8fa..b1281667e0 100644
--- a/docs/structured_logging.md
+++ b/docs/structured_logging.md
@@ -1,83 +1,161 @@
 # Structured Logging
 
-A structured logging system can be useful when your logs are destined for a machine to parse and process. By maintaining its machine-readable characteristics, it enables more efficient searching and aggregations when consumed by software such as the "ELK stack".
+A structured logging system can be useful when your logs are destined for a
+machine to parse and process. By maintaining its machine-readable characteristics,
+it enables more efficient searching and aggregations when consumed by software
+such as the "ELK stack".
 
-Synapse's structured logging system is configured via the file that Synapse's `log_config` config option points to. The file must be YAML and contain `structured: true`. It must contain a list of "drains" (places where logs go to).
+Synapse's structured logging system is configured via the file that Synapse's
+`log_config` config option points to. The file should include a formatter which
+uses the `synapse.logging.TerseJsonFormatter` class included with Synapse and a
+handler which uses the above formatter.
+
+There is also a `synapse.logging.JsonFormatter` option which does not include
+a timestamp in the resulting JSON. This is useful if the log ingester adds its
+own timestamp.
 
 A structured logging configuration looks similar to the following:
 
 ```yaml
-structured: true
+version: 1
+
+formatters:
+    structured:
+        class: synapse.logging.TerseJsonFormatter
+
+handlers:
+    file:
+        class: logging.handlers.TimedRotatingFileHandler
+        formatter: structured
+        filename: /path/to/my/logs/homeserver.log
+        when: midnight
+        backupCount: 3  # Does not include the current log file.
+        encoding: utf8
 
 loggers:
     synapse:
         level: INFO
+        handlers: [remote]
     synapse.storage.SQL:
         level: WARNING
-
-drains:
-    console:
-        type: console
-        location: stdout
-    file:
-        type: file_json
-        location: homeserver.log
 ```
 
-The above logging config will set Synapse as 'INFO' logging level by default, with the SQL layer at 'WARNING', and will have two logging drains (to the console and to a file, stored as JSON).
-
-## Drain Types
+The above logging config will set Synapse as 'INFO' logging level by default,
+with the SQL layer at 'WARNING', and will log to a file, stored as JSON.
 
-Drain types can be specified by the `type` key.
+It is also possible to figure Synapse to log to a remote endpoint by using the
+`synapse.logging.RemoteHandler` class included with Synapse. It takes the
+following arguments:
 
-### `console`
+- `host`: Hostname or IP address of the log aggregator.
+- `port`: Numerical port to contact on the host.
+- `maximum_buffer`: (Optional, defaults to 1000) The maximum buffer size to allow.
 
-Outputs human-readable logs to the console.
+A remote structured logging configuration looks similar to the following:
 
-Arguments:
+```yaml
+version: 1
 
-- `location`: Either `stdout` or `stderr`.
+formatters:
+    structured:
+        class: synapse.logging.TerseJsonFormatter
 
-### `console_json`
+handlers:
+    remote:
+        class: synapse.logging.RemoteHandler
+        formatter: structured
+        host: 10.1.2.3
+        port: 9999
 
-Outputs machine-readable JSON logs to the console.
+loggers:
+    synapse:
+        level: INFO
+        handlers: [remote]
+    synapse.storage.SQL:
+        level: WARNING
+```
 
-Arguments:
+The above logging config will set Synapse as 'INFO' logging level by default,
+with the SQL layer at 'WARNING', and will log JSON formatted messages to a
+remote endpoint at 10.1.2.3:9999.
 
-- `location`: Either `stdout` or `stderr`.
+## Upgrading from legacy structured logging configuration
 
-### `console_json_terse`
+Versions of Synapse prior to v1.23.0 included a custom structured logging
+configuration which is deprecated. It used a `structured: true` flag and
+configured `drains` instead of ``handlers`` and `formatters`.
 
-Outputs machine-readable JSON logs to the console, separated by newlines. This
-format is not designed to be read and re-formatted into human-readable text, but
-is optimal for a logging aggregation system.
+Synapse currently automatically converts the old configuration to the new
+configuration, but this will be removed in a future version of Synapse. The
+following reference can be used to update your configuration. Based on the drain
+`type`, we can pick a new handler:
 
-Arguments:
+1. For a type of `console`, `console_json`, or `console_json_terse`: a handler
+   with a class of `logging.StreamHandler` and a `stream` of `ext://sys.stdout`
+   or `ext://sys.stderr` should be used.
+2. For a type of `file` or `file_json`: a handler of `logging.FileHandler` with
+   a location of the file path should be used.
+3. For a type of `network_json_terse`: a handler of `synapse.logging.RemoteHandler`
+   with the host and port should be used.
 
-- `location`: Either `stdout` or `stderr`.
+Then based on the drain `type` we can pick a new formatter:
 
-### `file`
+1. For a type of `console` or `file` no formatter is necessary.
+2. For a type of `console_json` or `file_json`: a formatter of
+   `synapse.logging.JsonFormatter` should be used.
+3. For a type of `console_json_terse` or `network_json_terse`: a formatter of
+   `synapse.logging.TerseJsonFormatter` should be used.
 
-Outputs human-readable logs to a file.
+For each new handler and formatter they should be added to the logging configuration
+and then assigned to either a logger or the root logger.
 
-Arguments:
+An example legacy configuration:
 
-- `location`: An absolute path to the file to log to.
+```yaml
+structured: true
 
-### `file_json`
+loggers:
+    synapse:
+        level: INFO
+    synapse.storage.SQL:
+        level: WARNING
 
-Outputs machine-readable logs to a file.
+drains:
+    console:
+        type: console
+        location: stdout
+    file:
+        type: file_json
+        location: homeserver.log
+```
 
-Arguments:
+Would be converted into a new configuration:
 
-- `location`: An absolute path to the file to log to.
+```yaml
+version: 1
 
-### `network_json_terse`
+formatters:
+    json:
+        class: synapse.logging.JsonFormatter
 
-Delivers machine-readable JSON logs to a log aggregator over TCP. This is
-compatible with LogStash's TCP input with the codec set to `json_lines`.
+handlers:
+    console:
+        class: logging.StreamHandler
+        location: ext://sys.stdout
+    file:
+        class: logging.FileHandler
+        formatter: json
+        filename: homeserver.log
 
-Arguments:
+loggers:
+    synapse:
+        level: INFO
+        handlers: [console, file]
+    synapse.storage.SQL:
+        level: WARNING
+```
 
-- `host`: Hostname or IP address of the log aggregator.
-- `port`: Numerical port to contact on the host.
\ No newline at end of file
+The new logging configuration is a bit more verbose, but significantly more
+flexible. It allows for configuration that were not previously possible, such as
+sending plain logs over the network, or using different handlers for different
+modules.
diff --git a/scripts-dev/lint.sh b/scripts-dev/lint.sh
index f141805519..f328ab57d5 100755
--- a/scripts-dev/lint.sh
+++ b/scripts-dev/lint.sh
@@ -80,7 +80,7 @@ else
   # then lint everything!
   if [[ -z ${files+x} ]]; then
     # Lint all source code files and directories
-    files=("synapse" "tests" "scripts-dev" "scripts" "contrib" "synctl" "setup.py")
+    files=("synapse" "tests" "scripts-dev" "scripts" "contrib" "synctl" "setup.py" "synmark")
   fi
 fi
 
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 6b7be28aee..d4e887a3e0 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -23,7 +23,6 @@ from string import Template
 import yaml
 
 from twisted.logger import (
-    ILogObserver,
     LogBeginner,
     STDLibLogObserver,
     eventAsText,
@@ -32,11 +31,9 @@ from twisted.logger import (
 
 import synapse
 from synapse.app import _base as appbase
-from synapse.logging._structured import (
-    reload_structured_logging,
-    setup_structured_logging,
-)
+from synapse.logging._structured import setup_structured_logging
 from synapse.logging.context import LoggingContextFilter
+from synapse.logging.filter import MetadataFilter
 from synapse.util.versionstring import get_version_string
 
 from ._base import Config, ConfigError
@@ -48,7 +45,11 @@ DEFAULT_LOG_CONFIG = Template(
 # This is a YAML file containing a standard Python logging configuration
 # dictionary. See [1] for details on the valid settings.
 #
+# Synapse also supports structured logging for machine readable logs which can
+# be ingested by ELK stacks. See [2] for details.
+#
 # [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
+# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md
 
 version: 1
 
@@ -176,11 +177,11 @@ class LoggingConfig(Config):
                 log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
 
 
-def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
+def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) -> None:
     """
-    Set up Python stdlib logging.
+    Set up Python standard library logging.
     """
-    if log_config is None:
+    if log_config_path is None:
         log_format = (
             "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
             " - %(message)s"
@@ -196,7 +197,8 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
         handler.setFormatter(formatter)
         logger.addHandler(handler)
     else:
-        logging.config.dictConfig(log_config)
+        # Load the logging configuration.
+        _load_logging_config(log_config_path)
 
     # We add a log record factory that runs all messages through the
     # LoggingContextFilter so that we get the context *at the time we log*
@@ -204,12 +206,14 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
     # filter options, but care must when using e.g. MemoryHandler to buffer
     # writes.
 
-    log_filter = LoggingContextFilter(request="")
+    log_context_filter = LoggingContextFilter(request="")
+    log_metadata_filter = MetadataFilter({"server_name": config.server_name})
     old_factory = logging.getLogRecordFactory()
 
     def factory(*args, **kwargs):
         record = old_factory(*args, **kwargs)
-        log_filter.filter(record)
+        log_context_filter.filter(record)
+        log_metadata_filter.filter(record)
         return record
 
     logging.setLogRecordFactory(factory)
@@ -255,21 +259,40 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
     if not config.no_redirect_stdio:
         print("Redirected stdout/stderr to logs")
 
-    return observer
-
 
-def _reload_stdlib_logging(*args, log_config=None):
-    logger = logging.getLogger("")
+def _load_logging_config(log_config_path: str) -> None:
+    """
+    Configure logging from a log config path.
+    """
+    with open(log_config_path, "rb") as f:
+        log_config = yaml.safe_load(f.read())
 
     if not log_config:
-        logger.warning("Reloaded a blank config?")
+        logging.warning("Loaded a blank logging config?")
+
+    # If the old structured logging configuration is being used, convert it to
+    # the new style configuration.
+    if "structured" in log_config and log_config.get("structured"):
+        log_config = setup_structured_logging(log_config)
 
     logging.config.dictConfig(log_config)
 
 
+def _reload_logging_config(log_config_path):
+    """
+    Reload the log configuration from the file and apply it.
+    """
+    # If no log config path was given, it cannot be reloaded.
+    if log_config_path is None:
+        return
+
+    _load_logging_config(log_config_path)
+    logging.info("Reloaded log config from %s due to SIGHUP", log_config_path)
+
+
 def setup_logging(
     hs, config, use_worker_options=False, logBeginner: LogBeginner = globalLogBeginner
-) -> ILogObserver:
+) -> None:
     """
     Set up the logging subsystem.
 
@@ -282,41 +305,18 @@ def setup_logging(
 
         logBeginner: The Twisted logBeginner to use.
 
-    Returns:
-        The "root" Twisted Logger observer, suitable for sending logs to from a
-        Logger instance.
     """
-    log_config = config.worker_log_config if use_worker_options else config.log_config
-
-    def read_config(*args, callback=None):
-        if log_config is None:
-            return None
-
-        with open(log_config, "rb") as f:
-            log_config_body = yaml.safe_load(f.read())
-
-        if callback:
-            callback(log_config=log_config_body)
-            logging.info("Reloaded log config from %s due to SIGHUP", log_config)
-
-        return log_config_body
+    log_config_path = (
+        config.worker_log_config if use_worker_options else config.log_config
+    )
 
-    log_config_body = read_config()
+    # Perform one-time logging configuration.
+    _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
+    # Add a SIGHUP handler to reload the logging configuration, if one is available.
+    appbase.register_sighup(_reload_logging_config, log_config_path)
 
-    if log_config_body and log_config_body.get("structured") is True:
-        logger = setup_structured_logging(
-            hs, config, log_config_body, logBeginner=logBeginner
-        )
-        appbase.register_sighup(read_config, callback=reload_structured_logging)
-    else:
-        logger = _setup_stdlib_logging(config, log_config_body, logBeginner=logBeginner)
-        appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
-
-    # make sure that the first thing we log is a thing we can grep backwards
-    # for
+    # Log immediately so we can grep backwards.
     logging.warning("***** STARTING SERVER *****")
     logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
     logging.info("Server hostname: %s", config.server_name)
     logging.info("Instance name: %s", hs.get_instance_name())
-
-    return logger
diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py
index e69de29bb2..b28b7b2ef7 100644
--- a/synapse/logging/__init__.py
+++ b/synapse/logging/__init__.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These are imported to allow for nicer logging configuration files.
+from synapse.logging._remote import RemoteHandler
+from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
+
+__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 0caf325916..ba45424f02 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import sys
 import traceback
 from collections import deque
@@ -21,6 +22,7 @@ from math import floor
 from typing import Callable, Optional
 
 import attr
+from typing_extensions import Deque
 from zope.interface import implementer
 
 from twisted.application.internet import ClientService
@@ -32,7 +34,8 @@ from twisted.internet.endpoints import (
 )
 from twisted.internet.interfaces import IPushProducer, ITransport
 from twisted.internet.protocol import Factory, Protocol
-from twisted.logger import ILogObserver, Logger, LogLevel
+
+logger = logging.getLogger(__name__)
 
 
 @attr.s
@@ -45,11 +48,11 @@ class LogProducer:
     Args:
         buffer: Log buffer to read logs from.
         transport: Transport to write to.
-        format_event: A callable to format the log entry to a string.
+        format: A callable to format the log record to a string.
     """
 
     transport = attr.ib(type=ITransport)
-    format_event = attr.ib(type=Callable[[dict], str])
+    _format = attr.ib(type=Callable[[logging.LogRecord], str])
     _buffer = attr.ib(type=deque)
     _paused = attr.ib(default=False, type=bool, init=False)
 
@@ -61,16 +64,19 @@ class LogProducer:
         self._buffer = deque()
 
     def resumeProducing(self):
+        # If we're already producing, nothing to do.
         self._paused = False
 
+        # Loop until paused.
         while self._paused is False and (self._buffer and self.transport.connected):
             try:
-                # Request the next event and format it.
-                event = self._buffer.popleft()
-                msg = self.format_event(event)
+                # Request the next record and format it.
+                record = self._buffer.popleft()
+                msg = self._format(record)
 
                 # Send it as a new line over the transport.
                 self.transport.write(msg.encode("utf8"))
+                self.transport.write(b"\n")
             except Exception:
                 # Something has gone wrong writing to the transport -- log it
                 # and break out of the while.
@@ -78,60 +84,63 @@ class LogProducer:
                 break
 
 
-@attr.s
-@implementer(ILogObserver)
-class TCPLogObserver:
+class RemoteHandler(logging.Handler):
     """
-    An IObserver that writes JSON logs to a TCP target.
+    An logging handler that writes logs to a TCP target.
 
     Args:
-        hs (HomeServer): The homeserver that is being logged for.
         host: The host of the logging target.
         port: The logging target's port.
-        format_event: A callable to format the log entry to a string.
         maximum_buffer: The maximum buffer size.
     """
 
-    hs = attr.ib()
-    host = attr.ib(type=str)
-    port = attr.ib(type=int)
-    format_event = attr.ib(type=Callable[[dict], str])
-    maximum_buffer = attr.ib(type=int)
-    _buffer = attr.ib(default=attr.Factory(deque), type=deque)
-    _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
-    _logger = attr.ib(default=attr.Factory(Logger))
-    _producer = attr.ib(default=None, type=Optional[LogProducer])
-
-    def start(self) -> None:
+    def __init__(
+        self,
+        host: str,
+        port: int,
+        maximum_buffer: int = 1000,
+        level=logging.NOTSET,
+        _reactor=None,
+    ):
+        super().__init__(level=level)
+        self.host = host
+        self.port = port
+        self.maximum_buffer = maximum_buffer
+
+        self._buffer = deque()  # type: Deque[logging.LogRecord]
+        self._connection_waiter = None  # type: Optional[Deferred]
+        self._producer = None  # type: Optional[LogProducer]
 
         # Connect without DNS lookups if it's a direct IP.
+        if _reactor is None:
+            from twisted.internet import reactor
+
+            _reactor = reactor
+
         try:
             ip = ip_address(self.host)
             if isinstance(ip, IPv4Address):
-                endpoint = TCP4ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
             elif isinstance(ip, IPv6Address):
-                endpoint = TCP6ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
             else:
                 raise ValueError("Unknown IP address provided: %s" % (self.host,))
         except ValueError:
-            endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+            endpoint = HostnameEndpoint(_reactor, self.host, self.port)
 
         factory = Factory.forProtocol(Protocol)
-        self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+        self._service = ClientService(endpoint, factory, clock=_reactor)
         self._service.startService()
         self._connect()
 
-    def stop(self):
+    def close(self):
         self._service.stopService()
 
     def _connect(self) -> None:
         """
         Triggers an attempt to connect then write to the remote if not already writing.
         """
+        # Do not attempt to open multiple connections.
         if self._connection_waiter:
             return
 
@@ -158,9 +167,7 @@ class TCPLogObserver:
 
             # Make a new producer and start it.
             self._producer = LogProducer(
-                buffer=self._buffer,
-                transport=r.transport,
-                format_event=self.format_event,
+                buffer=self._buffer, transport=r.transport, format=self.format,
             )
             r.transport.registerProducer(self._producer, True)
             self._producer.resumeProducing()
@@ -168,19 +175,19 @@ class TCPLogObserver:
 
     def _handle_pressure(self) -> None:
         """
-        Handle backpressure by shedding events.
+        Handle backpressure by shedding records.
 
         The buffer will, in this order, until the buffer is below the maximum:
-            - Shed DEBUG events
-            - Shed INFO events
-            - Shed the middle 50% of the events.
+            - Shed DEBUG records.
+            - Shed INFO records.
+            - Shed the middle 50% of the records.
         """
         if len(self._buffer) <= self.maximum_buffer:
             return
 
         # Strip out DEBUGs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer)
+            filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -188,7 +195,7 @@ class TCPLogObserver:
 
         # Strip out INFOs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.info, self._buffer)
+            filter(lambda record: record.levelno > logging.INFO, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -209,17 +216,17 @@ class TCPLogObserver:
 
         self._buffer.extend(reversed(end_buffer))
 
-    def __call__(self, event: dict) -> None:
-        self._buffer.append(event)
+    def emit(self, record: logging.LogRecord) -> None:
+        self._buffer.append(record)
 
         # Handle backpressure, if it exists.
         try:
             self._handle_pressure()
         except Exception:
-            # If handling backpressure fails,clear the buffer and log the
+            # If handling backpressure fails, clear the buffer and log the
             # exception.
             self._buffer.clear()
-            self._logger.failure("Failed clearing backpressure")
+            logger.warning("Failed clearing backpressure")
 
         # Try and write immediately.
         self._connect()
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 0fc2ea609e..14d9c104c2 100644
--- a/synapse/logging/_structured.py
+++ b/synapse/logging/_structured.py
@@ -12,138 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
 import os.path
-import sys
-import typing
-import warnings
-from typing import List
+from typing import Any, Dict, Generator, Optional, Tuple
 
-import attr
-from constantly import NamedConstant, Names, ValueConstant, Values
-from zope.interface import implementer
-
-from twisted.logger import (
-    FileLogObserver,
-    FilteringLogObserver,
-    ILogObserver,
-    LogBeginner,
-    Logger,
-    LogLevel,
-    LogLevelFilterPredicate,
-    LogPublisher,
-    eventAsText,
-    jsonFileLogObserver,
-)
+from constantly import NamedConstant, Names
 
 from synapse.config._base import ConfigError
-from synapse.logging._terse_json import (
-    TerseJSONToConsoleLogObserver,
-    TerseJSONToTCPLogObserver,
-)
-from synapse.logging.context import current_context
-
-
-def stdlib_log_level_to_twisted(level: str) -> LogLevel:
-    """
-    Convert a stdlib log level to Twisted's log level.
-    """
-    lvl = level.lower().replace("warning", "warn")
-    return LogLevel.levelWithName(lvl)
-
-
-@attr.s
-@implementer(ILogObserver)
-class LogContextObserver:
-    """
-    An ILogObserver which adds Synapse-specific log context information.
-
-    Attributes:
-        observer (ILogObserver): The target parent observer.
-    """
-
-    observer = attr.ib()
-
-    def __call__(self, event: dict) -> None:
-        """
-        Consume a log event and emit it to the parent observer after filtering
-        and adding log context information.
-
-        Args:
-            event (dict)
-        """
-        # Filter out some useless events that Twisted outputs
-        if "log_text" in event:
-            if event["log_text"].startswith("DNSDatagramProtocol starting on "):
-                return
-
-            if event["log_text"].startswith("(UDP Port "):
-                return
-
-            if event["log_text"].startswith("Timing out client") or event[
-                "log_format"
-            ].startswith("Timing out client"):
-                return
-
-        context = current_context()
-
-        # Copy the context information to the log event.
-        context.copy_to_twisted_log_entry(event)
-
-        self.observer(event)
-
-
-class PythonStdlibToTwistedLogger(logging.Handler):
-    """
-    Transform a Python stdlib log message into a Twisted one.
-    """
-
-    def __init__(self, observer, *args, **kwargs):
-        """
-        Args:
-            observer (ILogObserver): A Twisted logging observer.
-            *args, **kwargs: Args/kwargs to be passed to logging.Handler.
-        """
-        self.observer = observer
-        super().__init__(*args, **kwargs)
-
-    def emit(self, record: logging.LogRecord) -> None:
-        """
-        Emit a record to Twisted's observer.
-
-        Args:
-            record (logging.LogRecord)
-        """
-
-        self.observer(
-            {
-                "log_time": record.created,
-                "log_text": record.getMessage(),
-                "log_format": "{log_text}",
-                "log_namespace": record.name,
-                "log_level": stdlib_log_level_to_twisted(record.levelname),
-            }
-        )
-
-
-def SynapseFileLogObserver(outFile: typing.IO[str]) -> FileLogObserver:
-    """
-    A log observer that formats events like the traditional log formatter and
-    sends them to `outFile`.
-
-    Args:
-        outFile (file object): The file object to write to.
-    """
-
-    def formatEvent(_event: dict) -> str:
-        event = dict(_event)
-        event["log_level"] = event["log_level"].name.upper()
-        event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
-            event.get("log_format", "{log_text}") or "{log_text}"
-        )
-        return eventAsText(event, includeSystem=False) + "\n"
-
-    return FileLogObserver(outFile, formatEvent)
 
 
 class DrainType(Names):
@@ -155,30 +29,12 @@ class DrainType(Names):
     NETWORK_JSON_TERSE = NamedConstant()
 
 
-class OutputPipeType(Values):
-    stdout = ValueConstant(sys.__stdout__)
-    stderr = ValueConstant(sys.__stderr__)
-
-
-@attr.s
-class DrainConfiguration:
-    name = attr.ib()
-    type = attr.ib()
-    location = attr.ib()
-    options = attr.ib(default=None)
-
-
-@attr.s
-class NetworkJSONTerseOptions:
-    maximum_buffer = attr.ib(type=int)
-
-
-DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
+DEFAULT_LOGGERS = {"synapse": {"level": "info"}}
 
 
 def parse_drain_configs(
     drains: dict,
-) -> typing.Generator[DrainConfiguration, None, None]:
+) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
     """
     Parse the drain configurations.
 
@@ -186,11 +42,12 @@ def parse_drain_configs(
         drains (dict): A list of drain configurations.
 
     Yields:
-        DrainConfiguration instances.
+        dict instances representing a logging handler.
 
     Raises:
         ConfigError: If any of the drain configuration items are invalid.
     """
+
     for name, config in drains.items():
         if "type" not in config:
             raise ConfigError("Logging drains require a 'type' key.")
@@ -202,6 +59,18 @@ def parse_drain_configs(
                 "%s is not a known logging drain type." % (config["type"],)
             )
 
+        # Either use the default formatter or the tersejson one.
+        if logging_type in (DrainType.CONSOLE_JSON, DrainType.FILE_JSON,):
+            formatter = "json"  # type: Optional[str]
+        elif logging_type in (
+            DrainType.CONSOLE_JSON_TERSE,
+            DrainType.NETWORK_JSON_TERSE,
+        ):
+            formatter = "tersejson"
+        else:
+            # A formatter of None implies using the default formatter.
+            formatter = None
+
         if logging_type in [
             DrainType.CONSOLE,
             DrainType.CONSOLE_JSON,
@@ -217,9 +86,11 @@ def parse_drain_configs(
                     % (logging_type,)
                 )
 
-            pipe = OutputPipeType.lookupByName(location).value
-
-            yield DrainConfiguration(name=name, type=logging_type, location=pipe)
+            yield name, {
+                "class": "logging.StreamHandler",
+                "formatter": formatter,
+                "stream": "ext://sys." + location,
+            }
 
         elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
             if "location" not in config:
@@ -233,18 +104,25 @@ def parse_drain_configs(
                     "File paths need to be absolute, '%s' is a relative path"
                     % (location,)
                 )
-            yield DrainConfiguration(name=name, type=logging_type, location=location)
+
+            yield name, {
+                "class": "logging.FileHandler",
+                "formatter": formatter,
+                "filename": location,
+            }
 
         elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
             host = config.get("host")
             port = config.get("port")
             maximum_buffer = config.get("maximum_buffer", 1000)
-            yield DrainConfiguration(
-                name=name,
-                type=logging_type,
-                location=(host, port),
-                options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
-            )
+
+            yield name, {
+                "class": "synapse.logging.RemoteHandler",
+                "formatter": formatter,
+                "host": host,
+                "port": port,
+                "maximum_buffer": maximum_buffer,
+            }
 
         else:
             raise ConfigError(
@@ -253,126 +131,29 @@ def parse_drain_configs(
             )
 
 
-class StoppableLogPublisher(LogPublisher):
+def setup_structured_logging(log_config: dict,) -> dict:
     """
-    A log publisher that can tell its observers to shut down any external
-    communications.
-    """
-
-    def stop(self):
-        for obs in self._observers:
-            if hasattr(obs, "stop"):
-                obs.stop()
-
-
-def setup_structured_logging(
-    hs,
-    config,
-    log_config: dict,
-    logBeginner: LogBeginner,
-    redirect_stdlib_logging: bool = True,
-) -> LogPublisher:
-    """
-    Set up Twisted's structured logging system.
-
-    Args:
-        hs: The homeserver to use.
-        config (HomeserverConfig): The configuration of the Synapse homeserver.
-        log_config (dict): The log configuration to use.
+    Convert a legacy structured logging configuration (from Synapse < v1.23.0)
+    to one compatible with the new standard library handlers.
     """
-    if config.no_redirect_stdio:
-        raise ConfigError(
-            "no_redirect_stdio cannot be defined using structured logging."
-        )
-
-    logger = Logger()
-
     if "drains" not in log_config:
         raise ConfigError("The logging configuration requires a list of drains.")
 
-    observers = []  # type: List[ILogObserver]
-
-    for observer in parse_drain_configs(log_config["drains"]):
-        # Pipe drains
-        if observer.type == DrainType.CONSOLE:
-            logger.debug(
-                "Starting up the {name} console logger drain", name=observer.name
-            )
-            observers.append(SynapseFileLogObserver(observer.location))
-        elif observer.type == DrainType.CONSOLE_JSON:
-            logger.debug(
-                "Starting up the {name} JSON console logger drain", name=observer.name
-            )
-            observers.append(jsonFileLogObserver(observer.location))
-        elif observer.type == DrainType.CONSOLE_JSON_TERSE:
-            logger.debug(
-                "Starting up the {name} terse JSON console logger drain",
-                name=observer.name,
-            )
-            observers.append(
-                TerseJSONToConsoleLogObserver(observer.location, metadata={})
-            )
-
-        # File drains
-        elif observer.type == DrainType.FILE:
-            logger.debug("Starting up the {name} file logger drain", name=observer.name)
-            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
-            observers.append(SynapseFileLogObserver(log_file))
-        elif observer.type == DrainType.FILE_JSON:
-            logger.debug(
-                "Starting up the {name} JSON file logger drain", name=observer.name
-            )
-            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
-            observers.append(jsonFileLogObserver(log_file))
-
-        elif observer.type == DrainType.NETWORK_JSON_TERSE:
-            metadata = {"server_name": hs.config.server_name}
-            log_observer = TerseJSONToTCPLogObserver(
-                hs=hs,
-                host=observer.location[0],
-                port=observer.location[1],
-                metadata=metadata,
-                maximum_buffer=observer.options.maximum_buffer,
-            )
-            log_observer.start()
-            observers.append(log_observer)
-        else:
-            # We should never get here, but, just in case, throw an error.
-            raise ConfigError("%s drain type cannot be configured" % (observer.type,))
-
-    publisher = StoppableLogPublisher(*observers)
-    log_filter = LogLevelFilterPredicate()
-
-    for namespace, namespace_config in log_config.get(
-        "loggers", DEFAULT_LOGGERS
-    ).items():
-        # Set the log level for twisted.logger.Logger namespaces
-        log_filter.setLogLevelForNamespace(
-            namespace,
-            stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
-        )
-
-        # Also set the log levels for the stdlib logger namespaces, to prevent
-        # them getting to PythonStdlibToTwistedLogger and having to be formatted
-        if "level" in namespace_config:
-            logging.getLogger(namespace).setLevel(namespace_config.get("level"))
-
-    f = FilteringLogObserver(publisher, [log_filter])
-    lco = LogContextObserver(f)
-
-    if redirect_stdlib_logging:
-        stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
-        stdliblogger = logging.getLogger()
-        stdliblogger.addHandler(stuff_into_twisted)
-
-    # Always redirect standard I/O, otherwise other logging outputs might miss
-    # it.
-    logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
+    new_config = {
+        "version": 1,
+        "formatters": {
+            "json": {"class": "synapse.logging.JsonFormatter"},
+            "tersejson": {"class": "synapse.logging.TerseJsonFormatter"},
+        },
+        "handlers": {},
+        "loggers": log_config.get("loggers", DEFAULT_LOGGERS),
+        "root": {"handlers": []},
+    }
 
-    return publisher
+    for handler_name, handler in parse_drain_configs(log_config["drains"]):
+        new_config["handlers"][handler_name] = handler
 
+        # Add each handler to the root logger.
+        new_config["root"]["handlers"].append(handler_name)
 
-def reload_structured_logging(*args, log_config=None) -> None:
-    warnings.warn(
-        "Currently the structured logging system can not be reloaded, doing nothing"
-    )
+    return new_config
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 9b46956ca9..2fbf5549a1 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -16,141 +16,65 @@
 """
 Log formatters that output terse JSON.
 """
-
 import json
-from typing import IO
-
-from twisted.logger import FileLogObserver
-
-from synapse.logging._remote import TCPLogObserver
+import logging
 
 _encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
 
-
-def flatten_event(event: dict, metadata: dict, include_time: bool = False):
-    """
-    Flatten a Twisted logging event to an dictionary capable of being sent
-    as a log event to a logging aggregation system.
-
-    The format is vastly simplified and is not designed to be a "human readable
-    string" in the sense that traditional logs are. Instead, the structure is
-    optimised for searchability and filtering, with human-understandable log
-    keys.
-
-    Args:
-        event (dict): The Twisted logging event we are flattening.
-        metadata (dict): Additional data to include with each log message. This
-            can be information like the server name. Since the target log
-            consumer does not know who we are other than by host IP, this
-            allows us to forward through static information.
-        include_time (bool): Should we include the `time` key? If False, the
-            event time is stripped from the event.
-    """
-    new_event = {}
-
-    # If it's a failure, make the new event's log_failure be the traceback text.
-    if "log_failure" in event:
-        new_event["log_failure"] = event["log_failure"].getTraceback()
-
-    # If it's a warning, copy over a string representation of the warning.
-    if "warning" in event:
-        new_event["warning"] = str(event["warning"])
-
-    # Stdlib logging events have "log_text" as their human-readable portion,
-    # Twisted ones have "log_format". For now, include the log_format, so that
-    # context only given in the log format (e.g. what is being logged) is
-    # available.
-    if "log_text" in event:
-        new_event["log"] = event["log_text"]
-    else:
-        new_event["log"] = event["log_format"]
-
-    # We want to include the timestamp when forwarding over the network, but
-    # exclude it when we are writing to stdout. This is because the log ingester
-    # (e.g. logstash, fluentd) can add its own timestamp.
-    if include_time:
-        new_event["time"] = round(event["log_time"], 2)
-
-    # Convert the log level to a textual representation.
-    new_event["level"] = event["log_level"].name.upper()
-
-    # Ignore these keys, and do not transfer them over to the new log object.
-    # They are either useless (isError), transferred manually above (log_time,
-    # log_level, etc), or contain Python objects which are not useful for output
-    # (log_logger, log_source).
-    keys_to_delete = [
-        "isError",
-        "log_failure",
-        "log_format",
-        "log_level",
-        "log_logger",
-        "log_source",
-        "log_system",
-        "log_time",
-        "log_text",
-        "observer",
-        "warning",
-    ]
-
-    # If it's from the Twisted legacy logger (twisted.python.log), it adds some
-    # more keys we want to purge.
-    if event.get("log_namespace") == "log_legacy":
-        keys_to_delete.extend(["message", "system", "time"])
-
-    # Rather than modify the dictionary in place, construct a new one with only
-    # the content we want. The original event should be considered 'frozen'.
-    for key in event.keys():
-
-        if key in keys_to_delete:
-            continue
-
-        if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
-            # If it's a plain type, include it as is.
-            new_event[key] = event[key]
-        else:
-            # If it's not one of those basic types, write out a string
-            # representation. This should probably be a warning in development,
-            # so that we are sure we are only outputting useful data.
-            new_event[key] = str(event[key])
-
-    # Add the metadata information to the event (e.g. the server_name).
-    new_event.update(metadata)
-
-    return new_event
-
-
-def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogObserver:
-    """
-    A log observer that formats events to a flattened JSON representation.
-
-    Args:
-        outFile: The file object to write to.
-        metadata: Metadata to be added to each log object.
-    """
-
-    def formatEvent(_event: dict) -> str:
-        flattened = flatten_event(_event, metadata)
-        return _encoder.encode(flattened) + "\n"
-
-    return FileLogObserver(outFile, formatEvent)
-
-
-def TerseJSONToTCPLogObserver(
-    hs, host: str, port: int, metadata: dict, maximum_buffer: int
-) -> FileLogObserver:
-    """
-    A log observer that formats events to a flattened JSON representation.
-
-    Args:
-        hs (HomeServer): The homeserver that is being logged for.
-        host: The host of the logging target.
-        port: The logging target's port.
-        metadata: Metadata to be added to each log object.
-        maximum_buffer: The maximum buffer size.
-    """
-
-    def formatEvent(_event: dict) -> str:
-        flattened = flatten_event(_event, metadata, include_time=True)
-        return _encoder.encode(flattened) + "\n"
-
-    return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer)
+# The properties of a standard LogRecord.
+_LOG_RECORD_ATTRIBUTES = {
+    "args",
+    "asctime",
+    "created",
+    "exc_info",
+    # exc_text isn't a public attribute, but is used to cache the result of formatException.
+    "exc_text",
+    "filename",
+    "funcName",
+    "levelname",
+    "levelno",
+    "lineno",
+    "message",
+    "module",
+    "msecs",
+    "msg",
+    "name",
+    "pathname",
+    "process",
+    "processName",
+    "relativeCreated",
+    "stack_info",
+    "thread",
+    "threadName",
+}
+
+
+class JsonFormatter(logging.Formatter):
+    def format(self, record: logging.LogRecord) -> str:
+        event = {
+            "log": record.getMessage(),
+            "namespace": record.name,
+            "level": record.levelname,
+        }
+
+        return self._format(record, event)
+
+    def _format(self, record: logging.LogRecord, event: dict) -> str:
+        # Add any extra attributes to the event.
+        for key, value in record.__dict__.items():
+            if key not in _LOG_RECORD_ATTRIBUTES:
+                event[key] = value
+
+        return _encoder.encode(event)
+
+
+class TerseJsonFormatter(JsonFormatter):
+    def format(self, record: logging.LogRecord) -> str:
+        event = {
+            "log": record.getMessage(),
+            "namespace": record.name,
+            "level": record.levelname,
+            "time": round(record.created, 2),
+        }
+
+        return self._format(record, event)
diff --git a/synapse/logging/filter.py b/synapse/logging/filter.py
new file mode 100644
index 0000000000..1baf8dd679
--- /dev/null
+++ b/synapse/logging/filter.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from typing_extensions import Literal
+
+
+class MetadataFilter(logging.Filter):
+    """Logging filter that adds constant values to each record.
+
+    Args:
+        metadata: Key-value pairs to add to each record.
+    """
+
+    def __init__(self, metadata: dict):
+        self._metadata = metadata
+
+    def filter(self, record: logging.LogRecord) -> Literal[True]:
+        for key, value in self._metadata.items():
+            setattr(record, key, value)
+        return True
diff --git a/synmark/__init__.py b/synmark/__init__.py
index 09bc7e7927..3d4ec3e184 100644
--- a/synmark/__init__.py
+++ b/synmark/__init__.py
@@ -21,45 +21,6 @@ except ImportError:
     from twisted.internet.pollreactor import PollReactor as Reactor
 from twisted.internet.main import installReactor
 
-from synapse.config.homeserver import HomeServerConfig
-from synapse.util import Clock
-
-from tests.utils import default_config, setup_test_homeserver
-
-
-async def make_homeserver(reactor, config=None):
-    """
-    Make a Homeserver suitable for running benchmarks against.
-
-    Args:
-        reactor: A Twisted reactor to run under.
-        config: A HomeServerConfig to use, or None.
-    """
-    cleanup_tasks = []
-    clock = Clock(reactor)
-
-    if not config:
-        config = default_config("test")
-
-    config_obj = HomeServerConfig()
-    config_obj.parse_config_dict(config, "", "")
-
-    hs = setup_test_homeserver(
-        cleanup_tasks.append, config=config_obj, reactor=reactor, clock=clock
-    )
-    stor = hs.get_datastore()
-
-    # Run the database background updates.
-    if hasattr(stor.db_pool.updates, "do_next_background_update"):
-        while not await stor.db_pool.updates.has_completed_background_updates():
-            await stor.db_pool.updates.do_next_background_update(1)
-
-    def cleanup():
-        for i in cleanup_tasks:
-            i()
-
-    return hs, clock.sleep, cleanup
-
 
 def make_reactor():
     """
diff --git a/synmark/__main__.py b/synmark/__main__.py
index 17df9ddeb7..de13c1a909 100644
--- a/synmark/__main__.py
+++ b/synmark/__main__.py
@@ -12,20 +12,20 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import sys
 from argparse import REMAINDER
 from contextlib import redirect_stderr
 from io import StringIO
 
 import pyperf
-from synmark import make_reactor
-from synmark.suites import SUITES
 
 from twisted.internet.defer import Deferred, ensureDeferred
 from twisted.logger import globalLogBeginner, textFileLogObserver
 from twisted.python.failure import Failure
 
+from synmark import make_reactor
+from synmark.suites import SUITES
+
 from tests.utils import setupdb
 
 
diff --git a/synmark/suites/logging.py b/synmark/suites/logging.py
index d8e4c7d58f..c9d9cf761e 100644
--- a/synmark/suites/logging.py
+++ b/synmark/suites/logging.py
@@ -13,20 +13,22 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import warnings
 from io import StringIO
 
 from mock import Mock
 
 from pyperf import perf_counter
-from synmark import make_homeserver
 
 from twisted.internet.defer import Deferred
 from twisted.internet.protocol import ServerFactory
-from twisted.logger import LogBeginner, Logger, LogPublisher
+from twisted.logger import LogBeginner, LogPublisher
 from twisted.protocols.basic import LineOnlyReceiver
 
-from synapse.logging._structured import setup_structured_logging
+from synapse.config.logger import _setup_stdlib_logging
+from synapse.logging import RemoteHandler
+from synapse.util import Clock
 
 
 class LineCounter(LineOnlyReceiver):
@@ -62,7 +64,15 @@ async def main(reactor, loops):
     logger_factory.on_done = Deferred()
     port = reactor.listenTCP(0, logger_factory, interface="127.0.0.1")
 
-    hs, wait, cleanup = await make_homeserver(reactor)
+    # A fake homeserver config.
+    class Config:
+        server_name = "synmark-" + str(loops)
+        no_redirect_stdio = True
+
+    hs_config = Config()
+
+    # To be able to sleep.
+    clock = Clock(reactor)
 
     errors = StringIO()
     publisher = LogPublisher()
@@ -72,47 +82,49 @@ async def main(reactor, loops):
     )
 
     log_config = {
-        "loggers": {"synapse": {"level": "DEBUG"}},
-        "drains": {
+        "version": 1,
+        "loggers": {"synapse": {"level": "DEBUG", "handlers": ["tersejson"]}},
+        "formatters": {"tersejson": {"class": "synapse.logging.TerseJsonFormatter"}},
+        "handlers": {
             "tersejson": {
-                "type": "network_json_terse",
+                "class": "synapse.logging.RemoteHandler",
                 "host": "127.0.0.1",
                 "port": port.getHost().port,
                 "maximum_buffer": 100,
+                "_reactor": reactor,
             }
         },
     }
 
-    logger = Logger(namespace="synapse.logging.test_terse_json", observer=publisher)
-    logging_system = setup_structured_logging(
-        hs, hs.config, log_config, logBeginner=beginner, redirect_stdlib_logging=False
+    logger = logging.getLogger("synapse.logging.test_terse_json")
+    _setup_stdlib_logging(
+        hs_config, log_config, logBeginner=beginner,
     )
 
     # Wait for it to connect...
-    await logging_system._observers[0]._service.whenConnected()
+    for handler in logging.getLogger("synapse").handlers:
+        if isinstance(handler, RemoteHandler):
+            break
+    else:
+        raise RuntimeError("Improperly configured: no RemoteHandler found.")
+
+    await handler._service.whenConnected()
 
     start = perf_counter()
 
     # Send a bunch of useful messages
     for i in range(0, loops):
-        logger.info("test message %s" % (i,))
-
-        if (
-            len(logging_system._observers[0]._buffer)
-            == logging_system._observers[0].maximum_buffer
-        ):
-            while (
-                len(logging_system._observers[0]._buffer)
-                > logging_system._observers[0].maximum_buffer / 2
-            ):
-                await wait(0.01)
+        logger.info("test message %s", i)
+
+        if len(handler._buffer) == handler.maximum_buffer:
+            while len(handler._buffer) > handler.maximum_buffer / 2:
+                await clock.sleep(0.01)
 
     await logger_factory.on_done
 
     end = perf_counter() - start
 
-    logging_system.stop()
+    handler.close()
     port.stopListening()
-    cleanup()
 
     return end
diff --git a/tests/logging/__init__.py b/tests/logging/__init__.py
index e69de29bb2..a58d51441c 100644
--- a/tests/logging/__init__.py
+++ b/tests/logging/__init__.py
@@ -0,0 +1,34 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+
+class LoggerCleanupMixin:
+    def get_logger(self, handler):
+        """
+        Attach a handler to a logger and add clean-ups to remove revert this.
+        """
+        # Create a logger and add the handler to it.
+        logger = logging.getLogger(__name__)
+        logger.addHandler(handler)
+
+        # Ensure the logger actually logs something.
+        logger.setLevel(logging.INFO)
+
+        # Ensure the logger gets cleaned-up appropriately.
+        self.addCleanup(logger.removeHandler, handler)
+        self.addCleanup(logger.setLevel, logging.NOTSET)
+
+        return logger
diff --git a/tests/logging/test_remote_handler.py b/tests/logging/test_remote_handler.py
new file mode 100644
index 0000000000..58ee1f2f3c
--- /dev/null
+++ b/tests/logging/test_remote_handler.py
@@ -0,0 +1,153 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from twisted.test.proto_helpers import AccumulatingProtocol
+
+from synapse.logging import RemoteHandler
+
+from tests.logging import LoggerCleanupMixin
+from tests.server import FakeTransport, get_clock
+from tests.unittest import TestCase
+
+
+def connect_logging_client(reactor, client_id):
+    # This is essentially tests.server.connect_client, but disabling autoflush on
+    # the client transport. This is necessary to avoid an infinite loop due to
+    # sending of data via the logging transport causing additional logs to be
+    # written.
+    factory = reactor.tcpClients.pop(client_id)[2]
+    client = factory.buildProtocol(None)
+    server = AccumulatingProtocol()
+    server.makeConnection(FakeTransport(client, reactor))
+    client.makeConnection(FakeTransport(server, reactor, autoflush=False))
+
+    return client, server
+
+
+class RemoteHandlerTestCase(LoggerCleanupMixin, TestCase):
+    def setUp(self):
+        self.reactor, _ = get_clock()
+
+    def test_log_output(self):
+        """
+        The remote handler delivers logs over TCP.
+        """
+        handler = RemoteHandler("127.0.0.1", 9000, _reactor=self.reactor)
+        logger = self.get_logger(handler)
+
+        logger.info("Hello there, %s!", "wally")
+
+        # Trigger the connection
+        client, server = connect_logging_client(self.reactor, 0)
+
+        # Trigger data being sent
+        client.transport.flush()
+
+        # One log message, with a single trailing newline
+        logs = server.data.decode("utf8").splitlines()
+        self.assertEqual(len(logs), 1)
+        self.assertEqual(server.data.count(b"\n"), 1)
+
+        # Ensure the data passed through properly.
+        self.assertEqual(logs[0], "Hello there, wally!")
+
+    def test_log_backpressure_debug(self):
+        """
+        When backpressure is hit, DEBUG logs will be shed.
+        """
+        handler = RemoteHandler(
+            "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
+        )
+        logger = self.get_logger(handler)
+
+        # Send some debug messages
+        for i in range(0, 3):
+            logger.debug("debug %s" % (i,))
+
+        # Send a bunch of useful messages
+        for i in range(0, 7):
+            logger.info("info %s" % (i,))
+
+        # The last debug message pushes it past the maximum buffer
+        logger.debug("too much debug")
+
+        # Allow the reconnection
+        client, server = connect_logging_client(self.reactor, 0)
+        client.transport.flush()
+
+        # Only the 7 infos made it through, the debugs were elided
+        logs = server.data.splitlines()
+        self.assertEqual(len(logs), 7)
+        self.assertNotIn(b"debug", server.data)
+
+    def test_log_backpressure_info(self):
+        """
+        When backpressure is hit, DEBUG and INFO logs will be shed.
+        """
+        handler = RemoteHandler(
+            "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
+        )
+        logger = self.get_logger(handler)
+
+        # Send some debug messages
+        for i in range(0, 3):
+            logger.debug("debug %s" % (i,))
+
+        # Send a bunch of useful messages
+        for i in range(0, 10):
+            logger.warning("warn %s" % (i,))
+
+        # Send a bunch of info messages
+        for i in range(0, 3):
+            logger.info("info %s" % (i,))
+
+        # The last debug message pushes it past the maximum buffer
+        logger.debug("too much debug")
+
+        # Allow the reconnection
+        client, server = connect_logging_client(self.reactor, 0)
+        client.transport.flush()
+
+        # The 10 warnings made it through, the debugs and infos were elided
+        logs = server.data.splitlines()
+        self.assertEqual(len(logs), 10)
+        self.assertNotIn(b"debug", server.data)
+        self.assertNotIn(b"info", server.data)
+
+    def test_log_backpressure_cut_middle(self):
+        """
+        When backpressure is hit, and no more DEBUG and INFOs cannot be culled,
+        it will cut the middle messages out.
+        """
+        handler = RemoteHandler(
+            "127.0.0.1", 9000, maximum_buffer=10, _reactor=self.reactor
+        )
+        logger = self.get_logger(handler)
+
+        # Send a bunch of useful messages
+        for i in range(0, 20):
+            logger.warning("warn %s" % (i,))
+
+        # Allow the reconnection
+        client, server = connect_logging_client(self.reactor, 0)
+        client.transport.flush()
+
+        # The first five and last five warnings made it through, the debugs and
+        # infos were elided
+        logs = server.data.decode("utf8").splitlines()
+        self.assertEqual(
+            ["warn %s" % (i,) for i in range(5)]
+            + ["warn %s" % (i,) for i in range(15, 20)],
+            logs,
+        )
diff --git a/tests/logging/test_structured.py b/tests/logging/test_structured.py
deleted file mode 100644
index d36f5f426c..0000000000
--- a/tests/logging/test_structured.py
+++ /dev/null
@@ -1,214 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2019 The Matrix.org Foundation C.I.C.
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import os
-import os.path
-import shutil
-import sys
-import textwrap
-
-from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile
-
-from synapse.config.logger import setup_logging
-from synapse.logging._structured import setup_structured_logging
-from synapse.logging.context import LoggingContext
-
-from tests.unittest import DEBUG, HomeserverTestCase
-
-
-class FakeBeginner:
-    def beginLoggingTo(self, observers, **kwargs):
-        self.observers = observers
-
-
-class StructuredLoggingTestBase:
-    """
-    Test base that registers a cleanup handler to reset the stdlib log handler
-    to 'unset'.
-    """
-
-    def prepare(self, reactor, clock, hs):
-        def _cleanup():
-            logging.getLogger("synapse").setLevel(logging.NOTSET)
-
-        self.addCleanup(_cleanup)
-
-
-class StructuredLoggingTestCase(StructuredLoggingTestBase, HomeserverTestCase):
-    """
-    Tests for Synapse's structured logging support.
-    """
-
-    def test_output_to_json_round_trip(self):
-        """
-        Synapse logs can be outputted to JSON and then read back again.
-        """
-        temp_dir = self.mktemp()
-        os.mkdir(temp_dir)
-        self.addCleanup(shutil.rmtree, temp_dir)
-
-        json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json"))
-
-        log_config = {
-            "drains": {"jsonfile": {"type": "file_json", "location": json_log_file}}
-        }
-
-        # Begin the logger with our config
-        beginner = FakeBeginner()
-        setup_structured_logging(
-            self.hs, self.hs.config, log_config, logBeginner=beginner
-        )
-
-        # Make a logger and send an event
-        logger = Logger(
-            namespace="tests.logging.test_structured", observer=beginner.observers[0]
-        )
-        logger.info("Hello there, {name}!", name="wally")
-
-        # Read the log file and check it has the event we sent
-        with open(json_log_file, "r") as f:
-            logged_events = list(eventsFromJSONLogFile(f))
-        self.assertEqual(len(logged_events), 1)
-
-        # The event pulled from the file should render fine
-        self.assertEqual(
-            eventAsText(logged_events[0], includeTimestamp=False),
-            "[tests.logging.test_structured#info] Hello there, wally!",
-        )
-
-    def test_output_to_text(self):
-        """
-        Synapse logs can be outputted to text.
-        """
-        temp_dir = self.mktemp()
-        os.mkdir(temp_dir)
-        self.addCleanup(shutil.rmtree, temp_dir)
-
-        log_file = os.path.abspath(os.path.join(temp_dir, "out.log"))
-
-        log_config = {"drains": {"file": {"type": "file", "location": log_file}}}
-
-        # Begin the logger with our config
-        beginner = FakeBeginner()
-        setup_structured_logging(
-            self.hs, self.hs.config, log_config, logBeginner=beginner
-        )
-
-        # Make a logger and send an event
-        logger = Logger(
-            namespace="tests.logging.test_structured", observer=beginner.observers[0]
-        )
-        logger.info("Hello there, {name}!", name="wally")
-
-        # Read the log file and check it has the event we sent
-        with open(log_file, "r") as f:
-            logged_events = f.read().strip().split("\n")
-        self.assertEqual(len(logged_events), 1)
-
-        # The event pulled from the file should render fine
-        self.assertTrue(
-            logged_events[0].endswith(
-                " - tests.logging.test_structured - INFO - None - Hello there, wally!"
-            )
-        )
-
-    def test_collects_logcontext(self):
-        """
-        Test that log outputs have the attached logging context.
-        """
-        log_config = {"drains": {}}
-
-        # Begin the logger with our config
-        beginner = FakeBeginner()
-        publisher = setup_structured_logging(
-            self.hs, self.hs.config, log_config, logBeginner=beginner
-        )
-
-        logs = []
-
-        publisher.addObserver(logs.append)
-
-        # Make a logger and send an event
-        logger = Logger(
-            namespace="tests.logging.test_structured", observer=beginner.observers[0]
-        )
-
-        with LoggingContext("testcontext", request="somereq"):
-            logger.info("Hello there, {name}!", name="steve")
-
-        self.assertEqual(len(logs), 1)
-        self.assertEqual(logs[0]["request"], "somereq")
-
-
-class StructuredLoggingConfigurationFileTestCase(
-    StructuredLoggingTestBase, HomeserverTestCase
-):
-    def make_homeserver(self, reactor, clock):
-
-        tempdir = self.mktemp()
-        os.mkdir(tempdir)
-        log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml"))
-        self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log"))
-
-        config = self.default_config()
-        config["log_config"] = log_config_file
-
-        with open(log_config_file, "w") as f:
-            f.write(
-                textwrap.dedent(
-                    """\
-                    structured: true
-
-                    drains:
-                        file:
-                            type: file_json
-                            location: %s
-                    """
-                    % (self.homeserver_log,)
-                )
-            )
-
-        self.addCleanup(self._sys_cleanup)
-
-        return self.setup_test_homeserver(config=config)
-
-    def _sys_cleanup(self):
-        sys.stdout = sys.__stdout__
-        sys.stderr = sys.__stderr__
-
-    # Do not remove! We need the logging system to be set other than WARNING.
-    @DEBUG
-    def test_log_output(self):
-        """
-        When a structured logging config is given, Synapse will use it.
-        """
-        beginner = FakeBeginner()
-        publisher = setup_logging(self.hs, self.hs.config, logBeginner=beginner)
-
-        # Make a logger and send an event
-        logger = Logger(namespace="tests.logging.test_structured", observer=publisher)
-
-        with LoggingContext("testcontext", request="somereq"):
-            logger.info("Hello there, {name}!", name="steve")
-
-        with open(self.homeserver_log, "r") as f:
-            logged_events = [
-                eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f)
-            ]
-
-        logs = "\n".join(logged_events)
-        self.assertTrue("***** STARTING SERVER *****" in logs)
-        self.assertTrue("Hello there, steve!" in logs)
diff --git a/tests/logging/test_terse_json.py b/tests/logging/test_terse_json.py
index fd128b88e0..73f469b802 100644
--- a/tests/logging/test_terse_json.py
+++ b/tests/logging/test_terse_json.py
@@ -14,57 +14,33 @@
 # limitations under the License.
 
 import json
-from collections import Counter
+import logging
+from io import StringIO
 
-from twisted.logger import Logger
+from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
 
-from synapse.logging._structured import setup_structured_logging
+from tests.logging import LoggerCleanupMixin
+from tests.unittest import TestCase
 
-from tests.server import connect_client
-from tests.unittest import HomeserverTestCase
 
-from .test_structured import FakeBeginner, StructuredLoggingTestBase
-
-
-class TerseJSONTCPTestCase(StructuredLoggingTestBase, HomeserverTestCase):
-    def test_log_output(self):
+class TerseJsonTestCase(LoggerCleanupMixin, TestCase):
+    def test_terse_json_output(self):
         """
-        The Terse JSON outputter delivers simplified structured logs over TCP.
+        The Terse JSON formatter converts log messages to JSON.
         """
-        log_config = {
-            "drains": {
-                "tersejson": {
-                    "type": "network_json_terse",
-                    "host": "127.0.0.1",
-                    "port": 8000,
-                }
-            }
-        }
-
-        # Begin the logger with our config
-        beginner = FakeBeginner()
-        setup_structured_logging(
-            self.hs, self.hs.config, log_config, logBeginner=beginner
-        )
-
-        logger = Logger(
-            namespace="tests.logging.test_terse_json", observer=beginner.observers[0]
-        )
-        logger.info("Hello there, {name}!", name="wally")
-
-        # Trigger the connection
-        self.pump()
+        output = StringIO()
 
-        _, server = connect_client(self.reactor, 0)
+        handler = logging.StreamHandler(output)
+        handler.setFormatter(TerseJsonFormatter())
+        logger = self.get_logger(handler)
 
-        # Trigger data being sent
-        self.pump()
+        logger.info("Hello there, %s!", "wally")
 
-        # One log message, with a single trailing newline
-        logs = server.data.decode("utf8").splitlines()
+        # One log message, with a single trailing newline.
+        data = output.getvalue()
+        logs = data.splitlines()
         self.assertEqual(len(logs), 1)
-        self.assertEqual(server.data.count(b"\n"), 1)
-
+        self.assertEqual(data.count("\n"), 1)
         log = json.loads(logs[0])
 
         # The terse logger should give us these keys.
@@ -72,163 +48,74 @@ class TerseJSONTCPTestCase(StructuredLoggingTestBase, HomeserverTestCase):
             "log",
             "time",
             "level",
-            "log_namespace",
-            "request",
-            "scope",
-            "server_name",
-            "name",
+            "namespace",
         ]
         self.assertCountEqual(log.keys(), expected_log_keys)
+        self.assertEqual(log["log"], "Hello there, wally!")
 
-        # It contains the data we expect.
-        self.assertEqual(log["name"], "wally")
-
-    def test_log_backpressure_debug(self):
+    def test_extra_data(self):
         """
-        When backpressure is hit, DEBUG logs will be shed.
+        Additional information can be included in the structured logging.
         """
-        log_config = {
-            "loggers": {"synapse": {"level": "DEBUG"}},
-            "drains": {
-                "tersejson": {
-                    "type": "network_json_terse",
-                    "host": "127.0.0.1",
-                    "port": 8000,
-                    "maximum_buffer": 10,
-                }
-            },
-        }
-
-        # Begin the logger with our config
-        beginner = FakeBeginner()
-        setup_structured_logging(
-            self.hs,
-            self.hs.config,
-            log_config,
-            logBeginner=beginner,
-            redirect_stdlib_logging=False,
-        )
-
-        logger = Logger(
-            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
-        )
+        output = StringIO()
 
-        # Send some debug messages
-        for i in range(0, 3):
-            logger.debug("debug %s" % (i,))
+        handler = logging.StreamHandler(output)
+        handler.setFormatter(TerseJsonFormatter())
+        logger = self.get_logger(handler)
 
-        # Send a bunch of useful messages
-        for i in range(0, 7):
-            logger.info("test message %s" % (i,))
-
-        # The last debug message pushes it past the maximum buffer
-        logger.debug("too much debug")
-
-        # Allow the reconnection
-        _, server = connect_client(self.reactor, 0)
-        self.pump()
-
-        # Only the 7 infos made it through, the debugs were elided
-        logs = server.data.splitlines()
-        self.assertEqual(len(logs), 7)
-
-    def test_log_backpressure_info(self):
-        """
-        When backpressure is hit, DEBUG and INFO logs will be shed.
-        """
-        log_config = {
-            "loggers": {"synapse": {"level": "DEBUG"}},
-            "drains": {
-                "tersejson": {
-                    "type": "network_json_terse",
-                    "host": "127.0.0.1",
-                    "port": 8000,
-                    "maximum_buffer": 10,
-                }
-            },
-        }
-
-        # Begin the logger with our config
-        beginner = FakeBeginner()
-        setup_structured_logging(
-            self.hs,
-            self.hs.config,
-            log_config,
-            logBeginner=beginner,
-            redirect_stdlib_logging=False,
-        )
-
-        logger = Logger(
-            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+        logger.info(
+            "Hello there, %s!", "wally", extra={"foo": "bar", "int": 3, "bool": True}
         )
 
-        # Send some debug messages
-        for i in range(0, 3):
-            logger.debug("debug %s" % (i,))
-
-        # Send a bunch of useful messages
-        for i in range(0, 10):
-            logger.warn("test warn %s" % (i,))
-
-        # Send a bunch of info messages
-        for i in range(0, 3):
-            logger.info("test message %s" % (i,))
-
-        # The last debug message pushes it past the maximum buffer
-        logger.debug("too much debug")
-
-        # Allow the reconnection
-        client, server = connect_client(self.reactor, 0)
-        self.pump()
+        # One log message, with a single trailing newline.
+        data = output.getvalue()
+        logs = data.splitlines()
+        self.assertEqual(len(logs), 1)
+        self.assertEqual(data.count("\n"), 1)
+        log = json.loads(logs[0])
 
-        # The 10 warnings made it through, the debugs and infos were elided
-        logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
-        self.assertEqual(len(logs), 10)
+        # The terse logger should give us these keys.
+        expected_log_keys = [
+            "log",
+            "time",
+            "level",
+            "namespace",
+            # The additional keys given via extra.
+            "foo",
+            "int",
+            "bool",
+        ]
+        self.assertCountEqual(log.keys(), expected_log_keys)
 
-        self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
+        # Check the values of the extra fields.
+        self.assertEqual(log["foo"], "bar")
+        self.assertEqual(log["int"], 3)
+        self.assertIs(log["bool"], True)
 
-    def test_log_backpressure_cut_middle(self):
+    def test_json_output(self):
         """
-        When backpressure is hit, and no more DEBUG and INFOs cannot be culled,
-        it will cut the middle messages out.
+        The Terse JSON formatter converts log messages to JSON.
         """
-        log_config = {
-            "loggers": {"synapse": {"level": "DEBUG"}},
-            "drains": {
-                "tersejson": {
-                    "type": "network_json_terse",
-                    "host": "127.0.0.1",
-                    "port": 8000,
-                    "maximum_buffer": 10,
-                }
-            },
-        }
-
-        # Begin the logger with our config
-        beginner = FakeBeginner()
-        setup_structured_logging(
-            self.hs,
-            self.hs.config,
-            log_config,
-            logBeginner=beginner,
-            redirect_stdlib_logging=False,
-        )
+        output = StringIO()
 
-        logger = Logger(
-            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
-        )
+        handler = logging.StreamHandler(output)
+        handler.setFormatter(JsonFormatter())
+        logger = self.get_logger(handler)
 
-        # Send a bunch of useful messages
-        for i in range(0, 20):
-            logger.warn("test warn", num=i)
+        logger.info("Hello there, %s!", "wally")
 
-        # Allow the reconnection
-        client, server = connect_client(self.reactor, 0)
-        self.pump()
+        # One log message, with a single trailing newline.
+        data = output.getvalue()
+        logs = data.splitlines()
+        self.assertEqual(len(logs), 1)
+        self.assertEqual(data.count("\n"), 1)
+        log = json.loads(logs[0])
 
-        # The first five and last five warnings made it through, the debugs and
-        # infos were elided
-        logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
-        self.assertEqual(len(logs), 10)
-        self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
-        self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs])
+        # The terse logger should give us these keys.
+        expected_log_keys = [
+            "log",
+            "level",
+            "namespace",
+        ]
+        self.assertCountEqual(log.keys(), expected_log_keys)
+        self.assertEqual(log["log"], "Hello there, wally!")
diff --git a/tests/server.py b/tests/server.py
index ea9c22bc51..b97003fa5a 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -571,12 +571,10 @@ def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol
         reactor
         factory: The connecting factory to build.
     """
-    factory = reactor.tcpClients[client_id][2]
+    factory = reactor.tcpClients.pop(client_id)[2]
     client = factory.buildProtocol(None)
     server = AccumulatingProtocol()
     server.makeConnection(FakeTransport(client, reactor))
     client.makeConnection(FakeTransport(server, reactor))
 
-    reactor.tcpClients.pop(client_id)
-
     return client, server