summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/config/test_database.py52
-rw-r--r--tests/config/test_server.py101
-rw-r--r--tests/config/test_tls.py44
-rw-r--r--tests/federation/test_federation_server.py1
-rw-r--r--tests/http/federation/test_matrix_federation_agent.py62
-rw-r--r--tests/http/federation/test_srv_resolver.py8
-rw-r--r--tests/logging/__init__.py0
-rw-r--r--tests/logging/test_structured.py197
-rw-r--r--tests/logging/test_terse_json.py234
-rw-r--r--tests/server.py27
-rw-r--r--tests/test_visibility.py1
11 files changed, 717 insertions, 10 deletions
diff --git a/tests/config/test_database.py b/tests/config/test_database.py
new file mode 100644
index 0000000000..151d3006ac
--- /dev/null
+++ b/tests/config/test_database.py
@@ -0,0 +1,52 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import yaml
+
+from synapse.config.database import DatabaseConfig
+
+from tests import unittest
+
+
+class DatabaseConfigTestCase(unittest.TestCase):
+    def test_database_configured_correctly_no_database_conf_param(self):
+        conf = yaml.safe_load(
+            DatabaseConfig().generate_config_section("/data_dir_path", None)
+        )
+
+        expected_database_conf = {
+            "name": "sqlite3",
+            "args": {"database": "/data_dir_path/homeserver.db"},
+        }
+
+        self.assertEqual(conf["database"], expected_database_conf)
+
+    def test_database_configured_correctly_database_conf_param(self):
+
+        database_conf = {
+            "name": "my super fast datastore",
+            "args": {
+                "user": "matrix",
+                "password": "synapse_database_password",
+                "host": "synapse_database_host",
+                "database": "matrix",
+            },
+        }
+
+        conf = yaml.safe_load(
+            DatabaseConfig().generate_config_section("/data_dir_path", database_conf)
+        )
+
+        self.assertEqual(conf["database"], database_conf)
diff --git a/tests/config/test_server.py b/tests/config/test_server.py
index 1ca5ea54ca..a10d017120 100644
--- a/tests/config/test_server.py
+++ b/tests/config/test_server.py
@@ -13,7 +13,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.config.server import is_threepid_reserved
+import yaml
+
+from synapse.config.server import ServerConfig, is_threepid_reserved
 
 from tests import unittest
 
@@ -29,3 +31,100 @@ class ServerConfigTestCase(unittest.TestCase):
         self.assertTrue(is_threepid_reserved(config, user1))
         self.assertFalse(is_threepid_reserved(config, user3))
         self.assertFalse(is_threepid_reserved(config, user1_msisdn))
+
+    def test_unsecure_listener_no_listeners_open_private_ports_false(self):
+        conf = yaml.safe_load(
+            ServerConfig().generate_config_section(
+                "che.org", "/data_dir_path", False, None
+            )
+        )
+
+        expected_listeners = [
+            {
+                "port": 8008,
+                "tls": False,
+                "type": "http",
+                "x_forwarded": True,
+                "bind_addresses": ["::1", "127.0.0.1"],
+                "resources": [{"names": ["client", "federation"], "compress": False}],
+            }
+        ]
+
+        self.assertEqual(conf["listeners"], expected_listeners)
+
+    def test_unsecure_listener_no_listeners_open_private_ports_true(self):
+        conf = yaml.safe_load(
+            ServerConfig().generate_config_section(
+                "che.org", "/data_dir_path", True, None
+            )
+        )
+
+        expected_listeners = [
+            {
+                "port": 8008,
+                "tls": False,
+                "type": "http",
+                "x_forwarded": True,
+                "resources": [{"names": ["client", "federation"], "compress": False}],
+            }
+        ]
+
+        self.assertEqual(conf["listeners"], expected_listeners)
+
+    def test_listeners_set_correctly_open_private_ports_false(self):
+        listeners = [
+            {
+                "port": 8448,
+                "resources": [{"names": ["federation"]}],
+                "tls": True,
+                "type": "http",
+            },
+            {
+                "port": 443,
+                "resources": [{"names": ["client"]}],
+                "tls": False,
+                "type": "http",
+            },
+        ]
+
+        conf = yaml.safe_load(
+            ServerConfig().generate_config_section(
+                "this.one.listens", "/data_dir_path", True, listeners
+            )
+        )
+
+        self.assertEqual(conf["listeners"], listeners)
+
+    def test_listeners_set_correctly_open_private_ports_true(self):
+        listeners = [
+            {
+                "port": 8448,
+                "resources": [{"names": ["federation"]}],
+                "tls": True,
+                "type": "http",
+            },
+            {
+                "port": 443,
+                "resources": [{"names": ["client"]}],
+                "tls": False,
+                "type": "http",
+            },
+            {
+                "port": 1243,
+                "resources": [{"names": ["client"]}],
+                "tls": False,
+                "type": "http",
+                "bind_addresses": ["this_one_is_bound"],
+            },
+        ]
+
+        expected_listeners = listeners.copy()
+        expected_listeners[1]["bind_addresses"] = ["::1", "127.0.0.1"]
+
+        conf = yaml.safe_load(
+            ServerConfig().generate_config_section(
+                "this.one.listens", "/data_dir_path", True, listeners
+            )
+        )
+
+        self.assertEqual(conf["listeners"], expected_listeners)
diff --git a/tests/config/test_tls.py b/tests/config/test_tls.py
index 4f8a87a3df..8e0c4b9533 100644
--- a/tests/config/test_tls.py
+++ b/tests/config/test_tls.py
@@ -16,6 +16,8 @@
 
 import os
 
+import yaml
+
 from OpenSSL import SSL
 
 from synapse.config.tls import ConfigError, TlsConfig
@@ -191,3 +193,45 @@ s4niecZKPBizL6aucT59CsunNmmb5Glq8rlAcU+1ZTZZzGYqVYhF6axB9Qg=
         self.assertEqual(cf._verify_ssl._options & SSL.OP_NO_TLSv1, 0)
         self.assertEqual(cf._verify_ssl._options & SSL.OP_NO_TLSv1_1, 0)
         self.assertEqual(cf._verify_ssl._options & SSL.OP_NO_TLSv1_2, 0)
+
+    def test_acme_disabled_in_generated_config_no_acme_domain_provied(self):
+        """
+        Checks acme is disabled by default.
+        """
+        conf = TestConfig()
+        conf.read_config(
+            yaml.safe_load(
+                TestConfig().generate_config_section(
+                    "/config_dir_path",
+                    "my_super_secure_server",
+                    "/data_dir_path",
+                    "/tls_cert_path",
+                    "tls_private_key",
+                    None,  # This is the acme_domain
+                )
+            ),
+            "/config_dir_path",
+        )
+
+        self.assertFalse(conf.acme_enabled)
+
+    def test_acme_enabled_in_generated_config_domain_provided(self):
+        """
+        Checks acme is enabled if the acme_domain arg is set to some string.
+        """
+        conf = TestConfig()
+        conf.read_config(
+            yaml.safe_load(
+                TestConfig().generate_config_section(
+                    "/config_dir_path",
+                    "my_super_secure_server",
+                    "/data_dir_path",
+                    "/tls_cert_path",
+                    "tls_private_key",
+                    "my_supe_secure_server",  # This is the acme_domain
+                )
+            ),
+            "/config_dir_path",
+        )
+
+        self.assertTrue(conf.acme_enabled)
diff --git a/tests/federation/test_federation_server.py b/tests/federation/test_federation_server.py
index af15f4cc5a..b08be451aa 100644
--- a/tests/federation/test_federation_server.py
+++ b/tests/federation/test_federation_server.py
@@ -20,7 +20,6 @@ from synapse.federation.federation_server import server_matches_acl_event
 from tests import unittest
 
 
-@unittest.DEBUG
 class ServerACLsTestCase(unittest.TestCase):
     def test_blacklisted_server(self):
         e = _create_acl_event({"allow": ["*"], "deny": ["evil.com"]})
diff --git a/tests/http/federation/test_matrix_federation_agent.py b/tests/http/federation/test_matrix_federation_agent.py
index c55aad8e11..71d7025264 100644
--- a/tests/http/federation/test_matrix_federation_agent.py
+++ b/tests/http/federation/test_matrix_federation_agent.py
@@ -41,9 +41,9 @@ from synapse.http.federation.well_known_resolver import (
 from synapse.logging.context import LoggingContext
 from synapse.util.caches.ttlcache import TTLCache
 
+from tests import unittest
 from tests.http import TestServerTLSConnectionFactory, get_test_ca_cert_file
 from tests.server import FakeTransport, ThreadedMemoryReactorClock
-from tests.unittest import TestCase
 from tests.utils import default_config
 
 logger = logging.getLogger(__name__)
@@ -67,7 +67,7 @@ def get_connection_factory():
     return test_server_connection_factory
 
 
-class MatrixFederationAgentTests(TestCase):
+class MatrixFederationAgentTests(unittest.TestCase):
     def setUp(self):
         self.reactor = ThreadedMemoryReactorClock()
 
@@ -1069,8 +1069,64 @@ class MatrixFederationAgentTests(TestCase):
         r = self.successResultOf(fetch_d)
         self.assertEqual(r.delegated_server, None)
 
+    def test_srv_fallbacks(self):
+        """Test that other SRV results are tried if the first one fails.
+        """
+
+        self.mock_resolver.resolve_service.side_effect = lambda _: [
+            Server(host=b"target.com", port=8443),
+            Server(host=b"target.com", port=8444),
+        ]
+        self.reactor.lookups["target.com"] = "1.2.3.4"
+
+        test_d = self._make_get_request(b"matrix://testserv/foo/bar")
+
+        # Nothing happened yet
+        self.assertNoResult(test_d)
+
+        self.mock_resolver.resolve_service.assert_called_once_with(
+            b"_matrix._tcp.testserv"
+        )
+
+        # We should see an attempt to connect to the first server
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8443)
+
+        # Fonx the connection
+        client_factory.clientConnectionFailed(None, Exception("nope"))
+
+        # There's a 300ms delay in HostnameEndpoint
+        self.reactor.pump((0.4,))
+
+        # Hasn't failed yet
+        self.assertNoResult(test_d)
+
+        # We shouldnow see an attempt to connect to the second server
+        clients = self.reactor.tcpClients
+        self.assertEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8444)
+
+        # make a test server, and wire up the client
+        http_server = self._make_connection(client_factory, expected_sni=b"testserv")
+
+        self.assertEqual(len(http_server.requests), 1)
+        request = http_server.requests[0]
+        self.assertEqual(request.method, b"GET")
+        self.assertEqual(request.path, b"/foo/bar")
+        self.assertEqual(request.requestHeaders.getRawHeaders(b"host"), [b"testserv"])
+
+        # finish the request
+        request.finish()
+        self.reactor.pump((0.1,))
+        self.successResultOf(test_d)
+
 
-class TestCachePeriodFromHeaders(TestCase):
+class TestCachePeriodFromHeaders(unittest.TestCase):
     def test_cache_control(self):
         # uppercase
         self.assertEqual(
diff --git a/tests/http/federation/test_srv_resolver.py b/tests/http/federation/test_srv_resolver.py
index 3b885ef64b..df034ab237 100644
--- a/tests/http/federation/test_srv_resolver.py
+++ b/tests/http/federation/test_srv_resolver.py
@@ -83,8 +83,10 @@ class SrvResolverTestCase(unittest.TestCase):
 
         service_name = b"test_service.example.com"
 
-        entry = Mock(spec_set=["expires"])
+        entry = Mock(spec_set=["expires", "priority", "weight"])
         entry.expires = 0
+        entry.priority = 0
+        entry.weight = 0
 
         cache = {service_name: [entry]}
         resolver = SrvResolver(dns_client=dns_client_mock, cache=cache)
@@ -105,8 +107,10 @@ class SrvResolverTestCase(unittest.TestCase):
 
         service_name = b"test_service.example.com"
 
-        entry = Mock(spec_set=["expires"])
+        entry = Mock(spec_set=["expires", "priority", "weight"])
         entry.expires = 999999999
+        entry.priority = 0
+        entry.weight = 0
 
         cache = {service_name: [entry]}
         resolver = SrvResolver(
diff --git a/tests/logging/__init__.py b/tests/logging/__init__.py
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/tests/logging/__init__.py
diff --git a/tests/logging/test_structured.py b/tests/logging/test_structured.py
new file mode 100644
index 0000000000..a786de0233
--- /dev/null
+++ b/tests/logging/test_structured.py
@@ -0,0 +1,197 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import os
+import os.path
+import shutil
+import sys
+import textwrap
+
+from twisted.logger import Logger, eventAsText, eventsFromJSONLogFile
+
+from synapse.config.logger import setup_logging
+from synapse.logging._structured import setup_structured_logging
+from synapse.logging.context import LoggingContext
+
+from tests.unittest import DEBUG, HomeserverTestCase
+
+
+class FakeBeginner(object):
+    def beginLoggingTo(self, observers, **kwargs):
+        self.observers = observers
+
+
+class StructuredLoggingTestCase(HomeserverTestCase):
+    """
+    Tests for Synapse's structured logging support.
+    """
+
+    def test_output_to_json_round_trip(self):
+        """
+        Synapse logs can be outputted to JSON and then read back again.
+        """
+        temp_dir = self.mktemp()
+        os.mkdir(temp_dir)
+        self.addCleanup(shutil.rmtree, temp_dir)
+
+        json_log_file = os.path.abspath(os.path.join(temp_dir, "out.json"))
+
+        log_config = {
+            "drains": {"jsonfile": {"type": "file_json", "location": json_log_file}}
+        }
+
+        # Begin the logger with our config
+        beginner = FakeBeginner()
+        setup_structured_logging(
+            self.hs, self.hs.config, log_config, logBeginner=beginner
+        )
+
+        # Make a logger and send an event
+        logger = Logger(
+            namespace="tests.logging.test_structured", observer=beginner.observers[0]
+        )
+        logger.info("Hello there, {name}!", name="wally")
+
+        # Read the log file and check it has the event we sent
+        with open(json_log_file, "r") as f:
+            logged_events = list(eventsFromJSONLogFile(f))
+        self.assertEqual(len(logged_events), 1)
+
+        # The event pulled from the file should render fine
+        self.assertEqual(
+            eventAsText(logged_events[0], includeTimestamp=False),
+            "[tests.logging.test_structured#info] Hello there, wally!",
+        )
+
+    def test_output_to_text(self):
+        """
+        Synapse logs can be outputted to text.
+        """
+        temp_dir = self.mktemp()
+        os.mkdir(temp_dir)
+        self.addCleanup(shutil.rmtree, temp_dir)
+
+        log_file = os.path.abspath(os.path.join(temp_dir, "out.log"))
+
+        log_config = {"drains": {"file": {"type": "file", "location": log_file}}}
+
+        # Begin the logger with our config
+        beginner = FakeBeginner()
+        setup_structured_logging(
+            self.hs, self.hs.config, log_config, logBeginner=beginner
+        )
+
+        # Make a logger and send an event
+        logger = Logger(
+            namespace="tests.logging.test_structured", observer=beginner.observers[0]
+        )
+        logger.info("Hello there, {name}!", name="wally")
+
+        # Read the log file and check it has the event we sent
+        with open(log_file, "r") as f:
+            logged_events = f.read().strip().split("\n")
+        self.assertEqual(len(logged_events), 1)
+
+        # The event pulled from the file should render fine
+        self.assertTrue(
+            logged_events[0].endswith(
+                " - tests.logging.test_structured - INFO - None - Hello there, wally!"
+            )
+        )
+
+    def test_collects_logcontext(self):
+        """
+        Test that log outputs have the attached logging context.
+        """
+        log_config = {"drains": {}}
+
+        # Begin the logger with our config
+        beginner = FakeBeginner()
+        publisher = setup_structured_logging(
+            self.hs, self.hs.config, log_config, logBeginner=beginner
+        )
+
+        logs = []
+
+        publisher.addObserver(logs.append)
+
+        # Make a logger and send an event
+        logger = Logger(
+            namespace="tests.logging.test_structured", observer=beginner.observers[0]
+        )
+
+        with LoggingContext("testcontext", request="somereq"):
+            logger.info("Hello there, {name}!", name="steve")
+
+        self.assertEqual(len(logs), 1)
+        self.assertEqual(logs[0]["request"], "somereq")
+
+
+class StructuredLoggingConfigurationFileTestCase(HomeserverTestCase):
+    def make_homeserver(self, reactor, clock):
+
+        tempdir = self.mktemp()
+        os.mkdir(tempdir)
+        log_config_file = os.path.abspath(os.path.join(tempdir, "log.config.yaml"))
+        self.homeserver_log = os.path.abspath(os.path.join(tempdir, "homeserver.log"))
+
+        config = self.default_config()
+        config["log_config"] = log_config_file
+
+        with open(log_config_file, "w") as f:
+            f.write(
+                textwrap.dedent(
+                    """\
+                    structured: true
+
+                    drains:
+                        file:
+                            type: file_json
+                            location: %s
+                    """
+                    % (self.homeserver_log,)
+                )
+            )
+
+        self.addCleanup(self._sys_cleanup)
+
+        return self.setup_test_homeserver(config=config)
+
+    def _sys_cleanup(self):
+        sys.stdout = sys.__stdout__
+        sys.stderr = sys.__stderr__
+
+    # Do not remove! We need the logging system to be set other than WARNING.
+    @DEBUG
+    def test_log_output(self):
+        """
+        When a structured logging config is given, Synapse will use it.
+        """
+        setup_logging(self.hs, self.hs.config)
+
+        # Make a logger and send an event
+        logger = Logger(namespace="tests.logging.test_structured")
+
+        with LoggingContext("testcontext", request="somereq"):
+            logger.info("Hello there, {name}!", name="steve")
+
+        with open(self.homeserver_log, "r") as f:
+            logged_events = [
+                eventAsText(x, includeTimestamp=False) for x in eventsFromJSONLogFile(f)
+            ]
+
+        logs = "\n".join(logged_events)
+        self.assertTrue("***** STARTING SERVER *****" in logs)
+        self.assertTrue("Hello there, steve!" in logs)
diff --git a/tests/logging/test_terse_json.py b/tests/logging/test_terse_json.py
new file mode 100644
index 0000000000..514282591d
--- /dev/null
+++ b/tests/logging/test_terse_json.py
@@ -0,0 +1,234 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import json
+from collections import Counter
+
+from twisted.logger import Logger
+
+from synapse.logging._structured import setup_structured_logging
+
+from tests.server import connect_client
+from tests.unittest import HomeserverTestCase
+
+from .test_structured import FakeBeginner
+
+
+class TerseJSONTCPTestCase(HomeserverTestCase):
+    def test_log_output(self):
+        """
+        The Terse JSON outputter delivers simplified structured logs over TCP.
+        """
+        log_config = {
+            "drains": {
+                "tersejson": {
+                    "type": "network_json_terse",
+                    "host": "127.0.0.1",
+                    "port": 8000,
+                }
+            }
+        }
+
+        # Begin the logger with our config
+        beginner = FakeBeginner()
+        setup_structured_logging(
+            self.hs, self.hs.config, log_config, logBeginner=beginner
+        )
+
+        logger = Logger(
+            namespace="tests.logging.test_terse_json", observer=beginner.observers[0]
+        )
+        logger.info("Hello there, {name}!", name="wally")
+
+        # Trigger the connection
+        self.pump()
+
+        _, server = connect_client(self.reactor, 0)
+
+        # Trigger data being sent
+        self.pump()
+
+        # One log message, with a single trailing newline
+        logs = server.data.decode("utf8").splitlines()
+        self.assertEqual(len(logs), 1)
+        self.assertEqual(server.data.count(b"\n"), 1)
+
+        log = json.loads(logs[0])
+
+        # The terse logger should give us these keys.
+        expected_log_keys = [
+            "log",
+            "time",
+            "level",
+            "log_namespace",
+            "request",
+            "scope",
+            "server_name",
+            "name",
+        ]
+        self.assertEqual(set(log.keys()), set(expected_log_keys))
+
+        # It contains the data we expect.
+        self.assertEqual(log["name"], "wally")
+
+    def test_log_backpressure_debug(self):
+        """
+        When backpressure is hit, DEBUG logs will be shed.
+        """
+        log_config = {
+            "loggers": {"synapse": {"level": "DEBUG"}},
+            "drains": {
+                "tersejson": {
+                    "type": "network_json_terse",
+                    "host": "127.0.0.1",
+                    "port": 8000,
+                    "maximum_buffer": 10,
+                }
+            },
+        }
+
+        # Begin the logger with our config
+        beginner = FakeBeginner()
+        setup_structured_logging(
+            self.hs,
+            self.hs.config,
+            log_config,
+            logBeginner=beginner,
+            redirect_stdlib_logging=False,
+        )
+
+        logger = Logger(
+            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+        )
+
+        # Send some debug messages
+        for i in range(0, 3):
+            logger.debug("debug %s" % (i,))
+
+        # Send a bunch of useful messages
+        for i in range(0, 7):
+            logger.info("test message %s" % (i,))
+
+        # The last debug message pushes it past the maximum buffer
+        logger.debug("too much debug")
+
+        # Allow the reconnection
+        _, server = connect_client(self.reactor, 0)
+        self.pump()
+
+        # Only the 7 infos made it through, the debugs were elided
+        logs = server.data.splitlines()
+        self.assertEqual(len(logs), 7)
+
+    def test_log_backpressure_info(self):
+        """
+        When backpressure is hit, DEBUG and INFO logs will be shed.
+        """
+        log_config = {
+            "loggers": {"synapse": {"level": "DEBUG"}},
+            "drains": {
+                "tersejson": {
+                    "type": "network_json_terse",
+                    "host": "127.0.0.1",
+                    "port": 8000,
+                    "maximum_buffer": 10,
+                }
+            },
+        }
+
+        # Begin the logger with our config
+        beginner = FakeBeginner()
+        setup_structured_logging(
+            self.hs,
+            self.hs.config,
+            log_config,
+            logBeginner=beginner,
+            redirect_stdlib_logging=False,
+        )
+
+        logger = Logger(
+            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+        )
+
+        # Send some debug messages
+        for i in range(0, 3):
+            logger.debug("debug %s" % (i,))
+
+        # Send a bunch of useful messages
+        for i in range(0, 10):
+            logger.warn("test warn %s" % (i,))
+
+        # Send a bunch of info messages
+        for i in range(0, 3):
+            logger.info("test message %s" % (i,))
+
+        # The last debug message pushes it past the maximum buffer
+        logger.debug("too much debug")
+
+        # Allow the reconnection
+        client, server = connect_client(self.reactor, 0)
+        self.pump()
+
+        # The 10 warnings made it through, the debugs and infos were elided
+        logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
+        self.assertEqual(len(logs), 10)
+
+        self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
+
+    def test_log_backpressure_cut_middle(self):
+        """
+        When backpressure is hit, and no more DEBUG and INFOs cannot be culled,
+        it will cut the middle messages out.
+        """
+        log_config = {
+            "loggers": {"synapse": {"level": "DEBUG"}},
+            "drains": {
+                "tersejson": {
+                    "type": "network_json_terse",
+                    "host": "127.0.0.1",
+                    "port": 8000,
+                    "maximum_buffer": 10,
+                }
+            },
+        }
+
+        # Begin the logger with our config
+        beginner = FakeBeginner()
+        setup_structured_logging(
+            self.hs,
+            self.hs.config,
+            log_config,
+            logBeginner=beginner,
+            redirect_stdlib_logging=False,
+        )
+
+        logger = Logger(
+            namespace="synapse.logging.test_terse_json", observer=beginner.observers[0]
+        )
+
+        # Send a bunch of useful messages
+        for i in range(0, 20):
+            logger.warn("test warn", num=i)
+
+        # Allow the reconnection
+        client, server = connect_client(self.reactor, 0)
+        self.pump()
+
+        # The first five and last five warnings made it through, the debugs and
+        # infos were elided
+        logs = list(map(json.loads, server.data.decode("utf8").splitlines()))
+        self.assertEqual(len(logs), 10)
+        self.assertEqual(Counter([x["level"] for x in logs]), {"WARN": 10})
+        self.assertEqual([0, 1, 2, 3, 4, 15, 16, 17, 18, 19], [x["num"] for x in logs])
diff --git a/tests/server.py b/tests/server.py
index e573c4e4c5..c8269619b1 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -11,9 +11,13 @@ from twisted.internet import address, threads, udp
 from twisted.internet._resolver import SimpleResolverComplexifier
 from twisted.internet.defer import Deferred, fail, succeed
 from twisted.internet.error import DNSLookupError
-from twisted.internet.interfaces import IReactorPluggableNameResolver, IResolverSimple
+from twisted.internet.interfaces import (
+    IReactorPluggableNameResolver,
+    IReactorTCP,
+    IResolverSimple,
+)
 from twisted.python.failure import Failure
-from twisted.test.proto_helpers import MemoryReactorClock
+from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactorClock
 from twisted.web.http import unquote
 from twisted.web.http_headers import Headers
 
@@ -465,3 +469,22 @@ class FakeTransport(object):
         self.buffer = self.buffer[len(to_write) :]
         if self.buffer and self.autoflush:
             self._reactor.callLater(0.0, self.flush)
+
+
+def connect_client(reactor: IReactorTCP, client_id: int) -> AccumulatingProtocol:
+    """
+    Connect a client to a fake TCP transport.
+
+    Args:
+        reactor
+        factory: The connecting factory to build.
+    """
+    factory = reactor.tcpClients[client_id][2]
+    client = factory.buildProtocol(None)
+    server = AccumulatingProtocol()
+    server.makeConnection(FakeTransport(client, reactor))
+    client.makeConnection(FakeTransport(server, reactor))
+
+    reactor.tcpClients.pop(client_id)
+
+    return client, server
diff --git a/tests/test_visibility.py b/tests/test_visibility.py
index e0605dac2f..18f1a0035d 100644
--- a/tests/test_visibility.py
+++ b/tests/test_visibility.py
@@ -74,7 +74,6 @@ class FilterEventsForServerTestCase(tests.unittest.TestCase):
             self.assertEqual(events_to_filter[i].event_id, filtered[i].event_id)
             self.assertEqual(filtered[i].content["a"], "b")
 
-    @tests.unittest.DEBUG
     @defer.inlineCallbacks
     def test_erased_user(self):
         # 4 message events, from erased and unerased users, with a membership