summary refs log tree commit diff
diff options
context:
space:
mode:
authorAmber Brown <hawkowl@atleastfornow.net>2019-05-20 17:01:50 -0500
committerGitHub <noreply@github.com>2019-05-20 17:01:50 -0500
commitc99c1051589a8370ae955bc21695343f87d6b8c1 (patch)
treebb9201cc2d9649bb3f2bc497fa503ed1fd6eba9e
parentMerge remote-tracking branch 'origin/develop' into shhs (diff)
downloadsynapse-c99c1051589a8370ae955bc21695343f87d6b8c1.tar.xz
SHHS - Room Join Complexity (#5072)
-rw-r--r--.gitignore2
-rw-r--r--changelog.d/5072.feature1
-rw-r--r--docs/sample_config.yaml11
-rw-r--r--synapse/api/urls.py1
-rw-r--r--synapse/config/server.py17
-rw-r--r--synapse/federation/federation_client.py36
-rw-r--r--synapse/federation/transport/client.py38
-rw-r--r--synapse/federation/transport/server.py27
-rw-r--r--synapse/handlers/federation.py25
-rw-r--r--synapse/handlers/room_member.py89
-rw-r--r--synapse/rest/admin/__init__.py12
-rw-r--r--synapse/storage/events_worker.py44
-rw-r--r--tests/federation/test_complexity.py160
-rw-r--r--tests/rest/client/v1/utils.py17
-rw-r--r--tests/unittest.py6
15 files changed, 462 insertions, 24 deletions
diff --git a/.gitignore b/.gitignore
index a84c41b0c9..702dd33eb1 100644
--- a/.gitignore
+++ b/.gitignore
@@ -19,6 +19,7 @@ _trial_temp*/
 /*.signing.key
 /env/
 /homeserver*.yaml
+/logs
 /media_store/
 /uploads
 
@@ -37,4 +38,3 @@ _trial_temp*/
 /docs/build/
 /htmlcov
 /pip-wheel-metadata/
-
diff --git a/changelog.d/5072.feature b/changelog.d/5072.feature
new file mode 100644
index 0000000000..99fda5616a
--- /dev/null
+++ b/changelog.d/5072.feature
@@ -0,0 +1 @@
+Synapse can now be configured to not join remote rooms of a given "complexity" (currently, state events). This option can be used to prevent adverse performance on resource-constrained homeservers.
\ No newline at end of file
diff --git a/docs/sample_config.yaml b/docs/sample_config.yaml
index d218aefee5..3388a2d513 100644
--- a/docs/sample_config.yaml
+++ b/docs/sample_config.yaml
@@ -271,6 +271,17 @@ listeners:
 # Used by phonehome stats to group together related servers.
 #server_context: context
 
+# Resource-constrained Homeserver Settings
+#
+# If limit_large_remote_room_joins is True, the room complexity will be
+# checked before a user joins a new remote room. If it is above
+# limit_large_remote_room_complexity, it will disallow joining or
+# instantly leave.
+#
+# Uncomment the below lines to enable:
+#limit_large_remote_room_joins: True
+#limit_large_remote_room_complexity: 1.0
+
 # Whether to require a user to be in the room to add an alias to it.
 # Defaults to 'true'.
 #
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index 3c6bddff7a..e16c386a14 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -26,6 +26,7 @@ CLIENT_API_PREFIX = "/_matrix/client"
 FEDERATION_PREFIX = "/_matrix/federation"
 FEDERATION_V1_PREFIX = FEDERATION_PREFIX + "/v1"
 FEDERATION_V2_PREFIX = FEDERATION_PREFIX + "/v2"
+FEDERATION_UNSTABLE_PREFIX = FEDERATION_PREFIX + "/unstable"
 STATIC_PREFIX = "/_matrix/static"
 WEB_CLIENT_PREFIX = "/_matrix/client"
 CONTENT_REPO_PREFIX = "/_matrix/content"
diff --git a/synapse/config/server.py b/synapse/config/server.py
index f34aa42afa..30adb5c5e7 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -221,6 +221,12 @@ class ServerConfig(Config):
 
         self.gc_thresholds = read_gc_thresholds(config.get("gc_thresholds", None))
 
+        # Resource-constrained Homeserver Configuration
+        self.limit_large_room_joins = config.get("limit_large_remote_room_joins", False)
+        self.limit_large_room_complexity = config.get(
+            "limit_large_remote_room_complexity", 1.0
+        )
+
         bind_port = config.get("bind_port")
         if bind_port:
             if config.get("no_tls", False):
@@ -572,6 +578,17 @@ class ServerConfig(Config):
         # Used by phonehome stats to group together related servers.
         #server_context: context
 
+        # Resource-constrained Homeserver Settings
+        #
+        # If limit_large_remote_room_joins is True, the room complexity will be
+        # checked before a user joins a new remote room. If it is above
+        # limit_large_remote_room_complexity, it will disallow joining or
+        # instantly leave.
+        #
+        # Uncomment the below lines to enable:
+        #limit_large_remote_room_joins: True
+        #limit_large_remote_room_complexity: 1.0
+
         # Whether to require a user to be in the room to add an alias to it.
         # Defaults to 'true'.
         #
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f3fc897a0a..d4c62ec0c8 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -992,3 +992,39 @@ class FederationClient(FederationBase):
                 )
 
         raise RuntimeError("Failed to send to any server.")
+
+    @defer.inlineCallbacks
+    def get_room_complexity(self, destination, room_id):
+        """
+        Fetch the complexity of a remote room from another server.
+
+        Args:
+            destination (str): The remote server
+            room_id (str): The room ID to ask about.
+
+        Returns:
+            Deferred[dict] or Deferred[None]: Dict contains the complexity
+            metric versions, while None means we could not fetch the complexity.
+        """
+        try:
+            complexity = yield self.transport_layer.get_room_complexity(
+                destination=destination,
+                room_id=room_id
+            )
+            defer.returnValue(complexity)
+        except CodeMessageException as e:
+            # We didn't manage to get it -- probably a 404. We are okay if other
+            # servers don't give it to us.
+            logger.debug(
+                "Failed to fetch room complexity via %s for %s, got a %d",
+                destination, room_id, e.code
+            )
+        except Exception:
+            logger.exception(
+                "Failed to fetch room complexity via %s for %s",
+                destination, room_id
+            )
+
+        # If we don't manage to find it, return None. It's not an error if a
+        # server doesn't give it to us.
+        defer.returnValue(None)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index e424c40fdf..2797122700 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -21,7 +21,11 @@ from six.moves import urllib
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
-from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
+from synapse.api.urls import (
+    FEDERATION_UNSTABLE_PREFIX,
+    FEDERATION_V1_PREFIX,
+    FEDERATION_V2_PREFIX,
+)
 from synapse.util.logutils import log_function
 
 logger = logging.getLogger(__name__)
@@ -959,6 +963,28 @@ class TransportLayerClient(object):
             ignore_backoff=True,
         )
 
+    def get_room_complexity(self, destination, room_id):
+        """
+        Args:
+            destination (str): The remote server
+            room_id (str): The room ID to ask about.
+        """
+        path = _create_path(
+            FEDERATION_UNSTABLE_PREFIX, "/rooms/%s/complexity", room_id
+        )
+
+        return self.client.get_json(
+            destination=destination,
+            path=path
+        )
+
+
+def _create_path(federation_prefix, path, *args):
+    """
+    Ensures that all args are url encoded.
+    """
+    return federation_prefix + path % tuple(urllib.parse.quote(arg, "") for arg in args)
+
 
 def _create_v1_path(path, *args):
     """Creates a path against V1 federation API from the path template and
@@ -975,10 +1001,7 @@ def _create_v1_path(path, *args):
     Returns:
         str
     """
-    return (
-        FEDERATION_V1_PREFIX
-        + path % tuple(urllib.parse.quote(arg, "") for arg in args)
-    )
+    return _create_path(FEDERATION_V1_PREFIX, path, *args)
 
 
 def _create_v2_path(path, *args):
@@ -996,7 +1019,4 @@ def _create_v2_path(path, *args):
     Returns:
         str
     """
-    return (
-        FEDERATION_V2_PREFIX
-        + path % tuple(urllib.parse.quote(arg, "") for arg in args)
-    )
+    return _create_path(FEDERATION_V2_PREFIX, path, *args)
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 385eda2dca..db73385ccd 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -23,7 +23,11 @@ from twisted.internet import defer
 import synapse
 from synapse.api.errors import Codes, FederationDeniedError, SynapseError
 from synapse.api.room_versions import RoomVersions
-from synapse.api.urls import FEDERATION_V1_PREFIX, FEDERATION_V2_PREFIX
+from synapse.api.urls import (
+    FEDERATION_UNSTABLE_PREFIX,
+    FEDERATION_V1_PREFIX,
+    FEDERATION_V2_PREFIX,
+)
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.http.server import JsonResource
 from synapse.http.servlet import (
@@ -1304,6 +1308,26 @@ class FederationGroupsSettingJoinPolicyServlet(BaseFederationServlet):
         defer.returnValue((200, new_content))
 
 
+class RoomComplexityServlet(BaseFederationServlet):
+    PATH = "/rooms/(?P<room_id>[^/]*)/complexity"
+    PREFIX = FEDERATION_UNSTABLE_PREFIX
+
+    @defer.inlineCallbacks
+    def on_GET(self, origin, content, query, room_id):
+
+        store = self.handler.hs.get_datastore()
+
+        is_public = yield store.is_room_world_readable_or_publicly_joinable(
+            room_id
+        )
+
+        if not is_public:
+            raise SynapseError(404, "Room not found", errcode=Codes.INVALID_PARAM)
+
+        complexity = yield store.get_room_complexity(room_id)
+        defer.returnValue((200, complexity))
+
+
 FEDERATION_SERVLET_CLASSES = (
     FederationSendServlet,
     FederationEventServlet,
@@ -1327,6 +1351,7 @@ FEDERATION_SERVLET_CLASSES = (
     FederationThirdPartyInviteExchangeServlet,
     On3pidBindServlet,
     FederationVersionServlet,
+    RoomComplexityServlet,
 )
 
 OPENID_SERVLET_CLASSES = (
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 0684778882..71d4bfb690 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2724,3 +2724,28 @@ class FederationHandler(BaseHandler):
             )
         else:
             return user_joined_room(self.distributor, user, room_id)
+
+    @defer.inlineCallbacks
+    def get_room_complexity(self, remote_room_hosts, room_id):
+        """
+        Fetch the complexity of a remote room over federation.
+
+        Args:
+            remote_room_hosts (list[str]): The remote servers to ask.
+            room_id (str): The room ID to ask about.
+
+        Returns:
+            Deferred[dict] or Deferred[None]: Dict contains the complexity
+            metric versions, while None means we could not fetch the complexity.
+        """
+
+        for host in remote_room_hosts:
+            res = yield self.federation_client.get_room_complexity(host, room_id)
+
+            # We got a result, return it.
+            if res:
+                defer.returnValue(res)
+
+        # We fell off the bottom, couldn't get the complexity from anyone. Oh
+        # well.
+        defer.returnValue(None)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 93ac986c86..7aefbc426a 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -26,8 +26,7 @@ from unpaddedbase64 import decode_base64
 
 from twisted.internet import defer
 
-import synapse.server
-import synapse.types
+from synapse import types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
 from synapse.types import RoomID, UserID
@@ -590,7 +589,7 @@ class RoomMemberHandler(object):
             )
             assert self.hs.is_mine(sender), "Sender must be our own: %s" % (sender,)
         else:
-            requester = synapse.types.create_requester(target_user)
+            requester = types.create_requester(target_user)
 
         prev_event = yield self.event_creation_handler.deduplicate_state_event(
             event, context,
@@ -1018,13 +1017,53 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         self.distributor.declare("user_left_room")
 
     @defer.inlineCallbacks
+    def _is_remote_room_too_complex(self, room_id, remote_room_hosts):
+        """
+        Check if complexity of a remote room is too great.
+
+        Args:
+            room_id (str)
+            remote_room_hosts (list[str])
+
+        Returns: bool of whether the complexity is too great, or None
+            if unable to be fetched
+        """
+        max_complexity = self.hs.config.limit_large_room_complexity
+        complexity = yield self.federation_handler.get_room_complexity(
+            remote_room_hosts, room_id
+        )
+
+        if complexity:
+            if complexity["v1"] > max_complexity:
+                return True
+            return False
+        return None
+
+    @defer.inlineCallbacks
+    def _is_local_room_too_complex(self, room_id):
+        """
+        Check if the complexity of a local room is too great.
+
+        Args:
+            room_id (str)
+
+        Returns: bool
+        """
+        max_complexity = self.hs.config.limit_large_room_complexity
+        complexity = yield self.store.get_room_complexity(room_id)
+
+        if complexity["v1"] > max_complexity:
+            return True
+
+        return False
+
+    @defer.inlineCallbacks
     def _remote_join(self, requester, remote_room_hosts, room_id, user, content):
         """Implements RoomMemberHandler._remote_join
         """
         # filter ourselves out of remote_room_hosts: do_invite_join ignores it
         # and if it is the only entry we'd like to return a 404 rather than a
         # 500.
-
         remote_room_hosts = [
             host for host in remote_room_hosts if host != self.hs.hostname
         ]
@@ -1032,6 +1071,18 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         if len(remote_room_hosts) == 0:
             raise SynapseError(404, "No known servers")
 
+        if self.hs.config.limit_large_room_joins:
+            # Fetch the room complexity
+            too_complex = yield self._is_remote_room_too_complex(
+                room_id, remote_room_hosts
+            )
+            if too_complex is True:
+                msg = "Room too large (preflight)"
+                raise SynapseError(
+                    code=400, msg=msg,
+                    errcode=Codes.RESOURCE_LIMIT_EXCEEDED
+                )
+
         # We don't do an auth check if we are doing an invite
         # join dance for now, since we're kinda implicitly checking
         # that we are allowed to join when we decide whether or not we
@@ -1044,6 +1095,36 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         )
         yield self._user_joined_room(user, room_id)
 
+        # Check the room we just joined wasn't too large, if we didn't fetch the
+        # complexity of it before.
+        if self.hs.config.limit_large_room_joins:
+            if too_complex is False:
+                # We checked, and we're under the limit.
+                return
+
+            # Check again, but with the local state events
+            too_complex = yield self._is_local_room_too_complex(room_id)
+
+            if too_complex is False:
+                # We're under the limit.
+                return
+
+            # The room is too large. Leave.
+            requester = types.create_requester(
+                user, None, False, None
+            )
+            yield self.update_membership(
+                requester=requester,
+                target=user,
+                room_id=room_id,
+                action="leave"
+            )
+            msg = "Room too large (postflight)"
+            raise SynapseError(
+                code=400, msg=msg,
+                errcode=Codes.RESOURCE_LIMIT_EXCEEDED
+            )
+
     @defer.inlineCallbacks
     def _remote_reject_invite(self, requester, remote_room_hosts, room_id, target):
         """Implements RoomMemberHandler._remote_reject_invite
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 744d85594f..d6c4dcdb18 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -822,10 +822,16 @@ class AdminRestResource(JsonResource):
 
     def __init__(self, hs):
         JsonResource.__init__(self, hs, canonical_json=False)
+        register_servlets(hs, self)
 
-        register_servlets_for_client_rest_resource(hs, self)
-        SendServerNoticeServlet(hs).register(self)
-        VersionServlet(hs).register(self)
+
+def register_servlets(hs, http_server):
+    """
+    Register all the admin servlets.
+    """
+    register_servlets_for_client_rest_resource(hs, http_server)
+    SendServerNoticeServlet(hs).register(http_server)
+    VersionServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/storage/events_worker.py b/synapse/storage/events_worker.py
index adc6cf26b5..e96f801631 100644
--- a/synapse/storage/events_worker.py
+++ b/synapse/storage/events_worker.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from __future__ import division
+
 import itertools
 import logging
 from collections import namedtuple
@@ -611,3 +613,45 @@ class EventsWorkerStore(SQLBaseStore):
             return res
 
         return self.runInteraction("get_rejection_reasons", f)
+
+    def _get_state_event_counts_txn(self, txn, room_id):
+        """
+        See get_state_event_counts.
+        """
+        sql = "SELECT COUNT(*) FROM current_state_events WHERE room_id=?"
+        txn.execute(sql, (room_id,))
+        row = txn.fetchone()
+        return row[0] if row else 0
+
+    def get_state_event_counts(self, room_id):
+        """
+        Gets the total number of state events in a room.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[int]
+        """
+        return self.runInteraction(
+            "get_state_event_counts", self._get_state_event_counts_txn, room_id
+        )
+
+    @defer.inlineCallbacks
+    def get_room_complexity(self, room_id):
+        """
+        Get the complexity of a room.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[dict[str:int]] of complexity version to complexity.
+        """
+        state_events = yield self.get_state_event_counts(room_id)
+
+        # Call this one "v1", so we can introduce new ones as we want to develop
+        # it.
+        complexity_v1 = round(state_events / 500, 2)
+
+        defer.returnValue({"v1": complexity_v1})
diff --git a/tests/federation/test_complexity.py b/tests/federation/test_complexity.py
new file mode 100644
index 0000000000..18af1f5c8d
--- /dev/null
+++ b/tests/federation/test_complexity.py
@@ -0,0 +1,160 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 Matrix.org Foundation
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from mock import Mock
+
+from twisted.internet import defer
+
+from synapse.api.errors import Codes, SynapseError
+from synapse.config.ratelimiting import FederationRateLimitConfig
+from synapse.federation.transport import server
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+from synapse.types import UserID
+from synapse.util.ratelimitutils import FederationRateLimiter
+
+from tests import unittest
+
+
+class RoomComplexityTests(unittest.HomeserverTestCase):
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def default_config(self, name='test'):
+        config = super().default_config(name=name)
+        config["limit_large_remote_room_joins"] = True
+        config["limit_large_remote_room_complexity"] = 0.05
+        return config
+
+    def prepare(self, reactor, clock, homeserver):
+        class Authenticator(object):
+            def authenticate_request(self, request, content):
+                return defer.succeed("otherserver.nottld")
+
+        ratelimiter = FederationRateLimiter(
+            clock,
+            FederationRateLimitConfig(
+                window_size=1,
+                sleep_limit=1,
+                sleep_msec=1,
+                reject_limit=1000,
+                concurrent_requests=1000,
+            ),
+        )
+        server.register_servlets(
+            homeserver, self.resource, Authenticator(), ratelimiter
+        )
+
+    def test_complexity_simple(self):
+
+        u1 = self.register_user("u1", "pass")
+        u1_token = self.login("u1", "pass")
+
+        room_1 = self.helper.create_room_as(u1, tok=u1_token)
+        self.helper.send_state(
+            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
+        )
+
+        # Get the room complexity
+        request, channel = self.make_request(
+            "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code)
+        complexity = channel.json_body["v1"]
+        self.assertTrue(complexity > 0, complexity)
+
+        # Artificially raise the complexity
+        store = self.hs.get_datastore()
+        store.get_state_event_counts = lambda x: defer.succeed(500 * 1.23)
+
+        # Get the room complexity again -- make sure it's our artificial value
+        request, channel = self.make_request(
+            "GET", "/_matrix/federation/unstable/rooms/%s/complexity" % (room_1,)
+        )
+        self.render(request)
+        self.assertEquals(200, channel.code)
+        complexity = channel.json_body["v1"]
+        self.assertEqual(complexity, 1.23)
+
+    def test_join_too_large(self):
+
+        u1 = self.register_user("u1", "pass")
+
+        handler = self.hs.get_room_member_handler()
+        fed_transport = self.hs.get_federation_transport_client()
+
+        # Mock out some things, because we don't want to test the whole join
+        fed_transport.client.get_json = Mock(return_value=defer.succeed({"v1": 9999}))
+        handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1))
+
+        d = handler._remote_join(
+            None,
+            ["otherserver.example"],
+            "roomid",
+            UserID.from_string(u1),
+            {"membership": "join"},
+        )
+
+        self.pump()
+
+        # The request failed with a SynapseError saying the resource limit was
+        # exceeded.
+        f = self.get_failure(d, SynapseError)
+        self.assertEqual(f.value.code, 400, f.value)
+        self.assertEqual(f.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
+
+    def test_join_too_large_once_joined(self):
+
+        u1 = self.register_user("u1", "pass")
+        u1_token = self.login("u1", "pass")
+
+        # Ok, this might seem a bit weird -- I want to test that we actually
+        # leave the room, but I don't want to simulate two servers. So, we make
+        # a local room, which we say we're joining remotely, even if there's no
+        # remote, because we mock that out. Then, we'll leave the (actually
+        # local) room, which will be propagated over federation in a real
+        # scenario.
+        room_1 = self.helper.create_room_as(u1, tok=u1_token)
+
+        handler = self.hs.get_room_member_handler()
+        fed_transport = self.hs.get_federation_transport_client()
+
+        # Mock out some things, because we don't want to test the whole join
+        fed_transport.client.get_json = Mock(return_value=defer.succeed(None))
+        handler.federation_handler.do_invite_join = Mock(return_value=defer.succeed(1))
+
+        # Artificially raise the complexity
+        self.hs.get_datastore().get_state_event_counts = lambda x: defer.succeed(600)
+
+        d = handler._remote_join(
+            None,
+            ["otherserver.example"],
+            room_1,
+            UserID.from_string(u1),
+            {"membership": "join"},
+        )
+
+        self.pump()
+
+        # The request failed with a SynapseError saying the resource limit was
+        # exceeded.
+        f = self.get_failure(d, SynapseError)
+        self.assertEqual(f.value.code, 400)
+        self.assertEqual(f.value.errcode, Codes.RESOURCE_LIMIT_EXCEEDED)
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 05b0143c42..f7133fc12e 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -127,3 +127,20 @@ class RestHelper(object):
         )
 
         return channel.json_body
+
+    def send_state(self, room_id, event_type, body, tok, expect_code=200):
+        path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
+        if tok:
+            path = path + "?access_token=%s" % tok
+
+        request, channel = make_request(
+            self.hs.get_reactor(), "PUT", path, json.dumps(body).encode('utf8')
+        )
+        render(request, self.resource, self.hs.get_reactor())
+
+        assert int(channel.result["code"]) == expect_code, (
+            "Expected: %d, got: %d, resp: %r"
+            % (expect_code, int(channel.result["code"]), channel.result["body"])
+        )
+
+        return channel.json_body
diff --git a/tests/unittest.py b/tests/unittest.py
index 26204470b1..ea2b7f272e 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -22,8 +22,6 @@ from mock import Mock
 
 from canonicaljson import json
 
-import twisted
-import twisted.logger
 from twisted.internet.defer import Deferred
 from twisted.trial import unittest
 
@@ -77,10 +75,6 @@ class TestCase(unittest.TestCase):
 
         @around(self)
         def setUp(orig):
-            # enable debugging of delayed calls - this means that we get a
-            # traceback when a unit test exits leaving things on the reactor.
-            twisted.internet.base.DelayedCall.debug = True
-
             # if we're not starting in the sentinel logcontext, then to be honest
             # all future bets are off.
             if LoggingContext.current_context() is not LoggingContext.sentinel: