summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7847.feature1
-rw-r--r--changelog.d/7853.misc1
-rw-r--r--docs/admin_api/user_admin_api.rst6
-rw-r--r--synapse/handlers/deactivate_account.py48
-rw-r--r--synapse/http/client.py24
-rw-r--r--synapse/rest/admin/users.py10
-rw-r--r--synapse/server.pyi5
-rw-r--r--tests/replication/_base.py168
-rw-r--r--tests/replication/test_client_reader_shard.py59
-rw-r--r--tests/replication/test_federation_sender_shard.py191
-rw-r--r--tests/rest/admin/test_user.py47
-rw-r--r--tests/server.py26
12 files changed, 390 insertions, 196 deletions
diff --git a/changelog.d/7847.feature b/changelog.d/7847.feature
new file mode 100644
index 0000000000..4b9a8d8569
--- /dev/null
+++ b/changelog.d/7847.feature
@@ -0,0 +1 @@
+Add the ability to re-activate an account from the admin API.
diff --git a/changelog.d/7853.misc b/changelog.d/7853.misc
new file mode 100644
index 0000000000..b4f614084d
--- /dev/null
+++ b/changelog.d/7853.misc
@@ -0,0 +1 @@
+Add support for handling registration requests across multiple client reader workers.
diff --git a/docs/admin_api/user_admin_api.rst b/docs/admin_api/user_admin_api.rst
index 7b030a6285..be05128b3e 100644
--- a/docs/admin_api/user_admin_api.rst
+++ b/docs/admin_api/user_admin_api.rst
@@ -91,10 +91,14 @@ Body parameters:
 
 - ``admin``, optional, defaults to ``false``.
 
-- ``deactivated``, optional, defaults to ``false``.
+- ``deactivated``, optional. If unspecified, deactivation state will be left
+  unchanged on existing accounts and set to ``false`` for new accounts.
 
 If the user already exists then optional parameters default to the current value.
 
+In order to re-activate an account ``deactivated`` must be set to ``false``. If
+users do not login via single-sign-on, a new ``password`` must be provided.
+
 List Accounts
 =============
 
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 3e3e6bd475..696d85b5f9 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -14,6 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import Optional
 
 from synapse.api.errors import SynapseError
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -45,19 +46,20 @@ class DeactivateAccountHandler(BaseHandler):
 
         self._account_validity_enabled = hs.config.account_validity.enabled
 
-    async def deactivate_account(self, user_id, erase_data, id_server=None):
+    async def deactivate_account(
+        self, user_id: str, erase_data: bool, id_server: Optional[str] = None
+    ) -> bool:
         """Deactivate a user's account
 
         Args:
-            user_id (str): ID of user to be deactivated
-            erase_data (bool): whether to GDPR-erase the user's data
-            id_server (str|None): Use the given identity server when unbinding
+            user_id: ID of user to be deactivated
+            erase_data: whether to GDPR-erase the user's data
+            id_server: Use the given identity server when unbinding
                 any threepids. If None then will attempt to unbind using the
                 identity server specified when binding (if known).
 
         Returns:
-            Deferred[bool]: True if identity server supports removing
-            threepids, otherwise False.
+            True if identity server supports removing threepids, otherwise False.
         """
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
@@ -134,11 +136,11 @@ class DeactivateAccountHandler(BaseHandler):
 
         return identity_server_supports_unbinding
 
-    async def _reject_pending_invites_for_user(self, user_id):
+    async def _reject_pending_invites_for_user(self, user_id: str):
         """Reject pending invites addressed to a given user ID.
 
         Args:
-            user_id (str): The user ID to reject pending invites for.
+            user_id: The user ID to reject pending invites for.
         """
         user = UserID.from_string(user_id)
         pending_invites = await self.store.get_invited_rooms_for_local_user(user_id)
@@ -166,22 +168,16 @@ class DeactivateAccountHandler(BaseHandler):
                     room.room_id,
                 )
 
-    def _start_user_parting(self):
+    def _start_user_parting(self) -> None:
         """
         Start the process that goes through the table of users
         pending deactivation, if it isn't already running.
-
-        Returns:
-            None
         """
         if not self._user_parter_running:
             run_as_background_process("user_parter_loop", self._user_parter_loop)
 
-    async def _user_parter_loop(self):
+    async def _user_parter_loop(self) -> None:
         """Loop that parts deactivated users from rooms
-
-        Returns:
-            None
         """
         self._user_parter_running = True
         logger.info("Starting user parter")
@@ -198,11 +194,8 @@ class DeactivateAccountHandler(BaseHandler):
         finally:
             self._user_parter_running = False
 
-    async def _part_user(self, user_id):
+    async def _part_user(self, user_id: str) -> None:
         """Causes the given user_id to leave all the rooms they're joined to
-
-        Returns:
-            None
         """
         user = UserID.from_string(user_id)
 
@@ -224,3 +217,18 @@ class DeactivateAccountHandler(BaseHandler):
                     user_id,
                     room_id,
                 )
+
+    async def activate_account(self, user_id: str) -> None:
+        """
+        Activate an account that was previously deactivated.
+
+        This simply marks the user as activate in the database and does not
+        attempt to rejoin rooms, re-add threepids, etc.
+
+        The user will also need a password hash set to actually login.
+
+        Args:
+            user_id: ID of user to be deactivated
+        """
+        # Mark the user as activate.
+        await self.store.set_user_deactivated_status(user_id, False)
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 505872ee90..b80681135e 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -31,6 +31,7 @@ from twisted.internet.interfaces import (
     IReactorPluggableNameResolver,
     IResolutionReceiver,
 )
+from twisted.internet.task import Cooperator
 from twisted.python.failure import Failure
 from twisted.web._newclient import ResponseDone
 from twisted.web.client import Agent, HTTPConnectionPool, readBody
@@ -69,6 +70,21 @@ def check_against_blacklist(ip_address, ip_whitelist, ip_blacklist):
     return False
 
 
+_EPSILON = 0.00000001
+
+
+def _make_scheduler(reactor):
+    """Makes a schedular suitable for a Cooperator using the given reactor.
+
+    (This is effectively just a copy from `twisted.internet.task`)
+    """
+
+    def _scheduler(x):
+        return reactor.callLater(_EPSILON, x)
+
+    return _scheduler
+
+
 class IPBlacklistingResolver(object):
     """
     A proxy for reactor.nameResolver which only produces non-blacklisted IP
@@ -212,6 +228,10 @@ class SimpleHttpClient(object):
         if hs.config.user_agent_suffix:
             self.user_agent = "%s %s" % (self.user_agent, hs.config.user_agent_suffix)
 
+        # We use this for our body producers to ensure that they use the correct
+        # reactor.
+        self._cooperator = Cooperator(scheduler=_make_scheduler(hs.get_reactor()))
+
         self.user_agent = self.user_agent.encode("ascii")
 
         if self._ip_blacklist:
@@ -292,7 +312,9 @@ class SimpleHttpClient(object):
             try:
                 body_producer = None
                 if data is not None:
-                    body_producer = QuieterFileBodyProducer(BytesIO(data))
+                    body_producer = QuieterFileBodyProducer(
+                        BytesIO(data), cooperator=self._cooperator,
+                    )
 
                 request_deferred = treq.request(
                     method,
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index e4330c39d6..cc0bdfa5c9 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -239,6 +239,15 @@ class UserRestServletV2(RestServlet):
                     await self.deactivate_account_handler.deactivate_account(
                         target_user.to_string(), False
                     )
+                elif not deactivate and user["deactivated"]:
+                    if "password" not in body:
+                        raise SynapseError(
+                            400, "Must provide a password to re-activate an account."
+                        )
+
+                    await self.deactivate_account_handler.activate_account(
+                        target_user.to_string()
+                    )
 
             user = await self.admin_handler.get_user(target_user)
             return 200, user
@@ -254,7 +263,6 @@ class UserRestServletV2(RestServlet):
             admin = body.get("admin", None)
             user_type = body.get("user_type", None)
             displayname = body.get("displayname", None)
-            threepids = body.get("threepids", None)
 
             if user_type is not None and user_type not in UserTypes.ALL_USER_TYPES:
                 raise SynapseError(400, "Invalid user type")
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 58cd099e6d..cd50c721b8 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -20,6 +20,7 @@ import synapse.handlers.room
 import synapse.handlers.room_member
 import synapse.handlers.set_password
 import synapse.http.client
+import synapse.http.matrixfederationclient
 import synapse.notifier
 import synapse.push.pusherpool
 import synapse.replication.tcp.client
@@ -143,3 +144,7 @@ class HomeServer(object):
         pass
     def get_replication_streams(self) -> Dict[str, Stream]:
         pass
+    def get_http_client(
+        self,
+    ) -> synapse.http.matrixfederationclient.MatrixFederationHttpClient:
+        pass
diff --git a/tests/replication/_base.py b/tests/replication/_base.py
index 9d4f0bbe44..06575ba0a6 100644
--- a/tests/replication/_base.py
+++ b/tests/replication/_base.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Any, List, Optional, Tuple
+from typing import Any, Callable, List, Optional, Tuple
 
 import attr
 
@@ -26,8 +26,9 @@ from synapse.app.generic_worker import (
     GenericWorkerReplicationHandler,
     GenericWorkerServer,
 )
+from synapse.http.server import JsonResource
 from synapse.http.site import SynapseRequest
-from synapse.replication.http import streams
+from synapse.replication.http import ReplicationRestResource, streams
 from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -35,7 +36,7 @@ from synapse.server import HomeServer
 from synapse.util import Clock
 
 from tests import unittest
-from tests.server import FakeTransport
+from tests.server import FakeTransport, render
 
 logger = logging.getLogger(__name__)
 
@@ -180,6 +181,159 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self.assertEqual(request.method, b"GET")
 
 
+class BaseMultiWorkerStreamTestCase(unittest.HomeserverTestCase):
+    """Base class for tests running multiple workers.
+
+    Automatically handle HTTP replication requests from workers to master,
+    unlike `BaseStreamTestCase`.
+    """
+
+    servlets = []  # type: List[Callable[[HomeServer, JsonResource], None]]
+
+    def setUp(self):
+        super().setUp()
+
+        # build a replication server
+        self.server_factory = ReplicationStreamProtocolFactory(self.hs)
+        self.streamer = self.hs.get_replication_streamer()
+
+        store = self.hs.get_datastore()
+        self.database = store.db
+
+        self.reactor.lookups["testserv"] = "1.2.3.4"
+
+        self._worker_hs_to_resource = {}
+
+        # When we see a connection attempt to the master replication listener we
+        # automatically set up the connection. This is so that tests don't
+        # manually have to go and explicitly set it up each time (plus sometimes
+        # it is impossible to write the handling explicitly in the tests).
+        self.reactor.add_tcp_client_callback(
+            "1.2.3.4", 8765, self._handle_http_replication_attempt
+        )
+
+    def create_test_json_resource(self):
+        """Overrides `HomeserverTestCase.create_test_json_resource`.
+        """
+        # We override this so that it automatically registers all the HTTP
+        # replication servlets, without having to explicitly do that in all
+        # subclassses.
+
+        resource = ReplicationRestResource(self.hs)
+
+        for servlet in self.servlets:
+            servlet(self.hs, resource)
+
+        return resource
+
+    def make_worker_hs(
+        self, worker_app: str, extra_config: dict = {}, **kwargs
+    ) -> HomeServer:
+        """Make a new worker HS instance, correctly connecting replcation
+        stream to the master HS.
+
+        Args:
+            worker_app: Type of worker, e.g. `synapse.app.federation_sender`.
+            extra_config: Any extra config to use for this instances.
+            **kwargs: Options that get passed to `self.setup_test_homeserver`,
+                useful to e.g. pass some mocks for things like `http_client`
+
+        Returns:
+            The new worker HomeServer instance.
+        """
+
+        config = self._get_worker_hs_config()
+        config["worker_app"] = worker_app
+        config.update(extra_config)
+
+        worker_hs = self.setup_test_homeserver(
+            homeserverToUse=GenericWorkerServer,
+            config=config,
+            reactor=self.reactor,
+            **kwargs
+        )
+
+        store = worker_hs.get_datastore()
+        store.db._db_pool = self.database._db_pool
+
+        repl_handler = ReplicationCommandHandler(worker_hs)
+        client = ClientReplicationStreamProtocol(
+            worker_hs, "client", "test", self.clock, repl_handler,
+        )
+        server = self.server_factory.buildProtocol(None)
+
+        client_transport = FakeTransport(server, self.reactor)
+        client.makeConnection(client_transport)
+
+        server_transport = FakeTransport(client, self.reactor)
+        server.makeConnection(server_transport)
+
+        # Set up a resource for the worker
+        resource = ReplicationRestResource(self.hs)
+
+        for servlet in self.servlets:
+            servlet(worker_hs, resource)
+
+        self._worker_hs_to_resource[worker_hs] = resource
+
+        return worker_hs
+
+    def _get_worker_hs_config(self) -> dict:
+        config = self.default_config()
+        config["worker_replication_host"] = "testserv"
+        config["worker_replication_http_port"] = "8765"
+        return config
+
+    def render_on_worker(self, worker_hs: HomeServer, request: SynapseRequest):
+        render(request, self._worker_hs_to_resource[worker_hs], self.reactor)
+
+    def replicate(self):
+        """Tell the master side of replication that something has happened, and then
+        wait for the replication to occur.
+        """
+        self.streamer.on_notifier_poke()
+        self.pump()
+
+    def _handle_http_replication_attempt(self):
+        """Handles a connection attempt to the master replication HTTP
+        listener.
+        """
+
+        # We should have at least one outbound connection attempt, where the
+        # last is one to the HTTP repication IP/port.
+        clients = self.reactor.tcpClients
+        self.assertGreaterEqual(len(clients), 1)
+        (host, port, client_factory, _timeout, _bindAddress) = clients.pop()
+        self.assertEqual(host, "1.2.3.4")
+        self.assertEqual(port, 8765)
+
+        # Set up client side protocol
+        client_protocol = client_factory.buildProtocol(None)
+
+        request_factory = OneShotRequestFactory()
+
+        # Set up the server side protocol
+        channel = _PushHTTPChannel(self.reactor)
+        channel.requestFactory = request_factory
+        channel.site = self.site
+
+        # Connect client to server and vice versa.
+        client_to_server_transport = FakeTransport(
+            channel, self.reactor, client_protocol
+        )
+        client_protocol.makeConnection(client_to_server_transport)
+
+        server_to_client_transport = FakeTransport(
+            client_protocol, self.reactor, channel
+        )
+        channel.makeConnection(server_to_client_transport)
+
+        # Note: at this point we've wired everything up, but we need to return
+        # before the data starts flowing over the connections as this is called
+        # inside `connecTCP` before the connection has been passed back to the
+        # code that requested the TCP connection.
+
+
 class TestReplicationDataHandler(GenericWorkerReplicationHandler):
     """Drop-in for ReplicationDataHandler which just collects RDATA rows"""
 
@@ -241,6 +395,14 @@ class _PushHTTPChannel(HTTPChannel):
             # We need to manually stop the _PullToPushProducer.
             self._pull_to_push_producer.stop()
 
+    def checkPersistence(self, request, version):
+        """Check whether the connection can be re-used
+        """
+        # We hijack this to always say no for ease of wiring stuff up in
+        # `handle_http_replication_attempt`.
+        request.responseHeaders.setRawHeaders(b"connection", [b"close"])
+        return False
+
 
 class _PullToPushProducer:
     """A push producer that wraps a pull producer.
diff --git a/tests/replication/test_client_reader_shard.py b/tests/replication/test_client_reader_shard.py
index b7d753e0a3..86c03fd89c 100644
--- a/tests/replication/test_client_reader_shard.py
+++ b/tests/replication/test_client_reader_shard.py
@@ -15,63 +15,26 @@
 import logging
 
 from synapse.api.constants import LoginType
-from synapse.app.generic_worker import GenericWorkerServer
-from synapse.http.server import JsonResource
 from synapse.http.site import SynapseRequest
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest.client.v2_alpha import register
 
-from tests import unittest
+from tests.replication._base import BaseMultiWorkerStreamTestCase
 from tests.rest.client.v2_alpha.test_auth import DummyRecaptchaChecker
-from tests.server import FakeChannel, render
+from tests.server import FakeChannel
 
 logger = logging.getLogger(__name__)
 
 
-class ClientReaderTestCase(unittest.HomeserverTestCase):
+class ClientReaderTestCase(BaseMultiWorkerStreamTestCase):
     """Base class for tests of the replication streams"""
 
-    servlets = [
-        register.register_servlets,
-    ]
+    servlets = [register.register_servlets]
 
     def prepare(self, reactor, clock, hs):
-        # build a replication server
-        self.server_factory = ReplicationStreamProtocolFactory(hs)
-        self.streamer = hs.get_replication_streamer()
-
-        store = hs.get_datastore()
-        self.database = store.db
-
         self.recaptcha_checker = DummyRecaptchaChecker(hs)
         auth_handler = hs.get_auth_handler()
         auth_handler.checkers[LoginType.RECAPTCHA] = self.recaptcha_checker
 
-        self.reactor.lookups["testserv"] = "1.2.3.4"
-
-    def make_worker_hs(self, extra_config={}):
-        config = self._get_worker_hs_config()
-        config.update(extra_config)
-
-        worker_hs = self.setup_test_homeserver(
-            homeserverToUse=GenericWorkerServer, config=config, reactor=self.reactor,
-        )
-
-        store = worker_hs.get_datastore()
-        store.db._db_pool = self.database._db_pool
-
-        # Register the expected servlets, essentially this is HomeserverTestCase.create_test_json_resource.
-        resource = JsonResource(self.hs)
-
-        for servlet in self.servlets:
-            servlet(worker_hs, resource)
-
-        # Essentially HomeserverTestCase.render.
-        def _render(request):
-            render(request, self.resource, self.reactor)
-
-        return worker_hs, _render
-
     def _get_worker_hs_config(self) -> dict:
         config = self.default_config()
         config["worker_app"] = "synapse.app.client_reader"
@@ -82,14 +45,14 @@ class ClientReaderTestCase(unittest.HomeserverTestCase):
     def test_register_single_worker(self):
         """Test that registration works when using a single client reader worker.
         """
-        _, worker_render = self.make_worker_hs()
+        worker_hs = self.make_worker_hs("synapse.app.client_reader")
 
         request_1, channel_1 = self.make_request(
             "POST",
             "register",
             {"username": "user", "type": "m.login.password", "password": "bar"},
         )  # type: SynapseRequest, FakeChannel
-        worker_render(request_1)
+        self.render_on_worker(worker_hs, request_1)
         self.assertEqual(request_1.code, 401)
 
         # Grab the session
@@ -99,7 +62,7 @@ class ClientReaderTestCase(unittest.HomeserverTestCase):
         request_2, channel_2 = self.make_request(
             "POST", "register", {"auth": {"session": session, "type": "m.login.dummy"}}
         )  # type: SynapseRequest, FakeChannel
-        worker_render(request_2)
+        self.render_on_worker(worker_hs, request_2)
         self.assertEqual(request_2.code, 200)
 
         # We're given a registered user.
@@ -108,15 +71,15 @@ class ClientReaderTestCase(unittest.HomeserverTestCase):
     def test_register_multi_worker(self):
         """Test that registration works when using multiple client reader workers.
         """
-        _, worker_render_1 = self.make_worker_hs()
-        _, worker_render_2 = self.make_worker_hs()
+        worker_hs_1 = self.make_worker_hs("synapse.app.client_reader")
+        worker_hs_2 = self.make_worker_hs("synapse.app.client_reader")
 
         request_1, channel_1 = self.make_request(
             "POST",
             "register",
             {"username": "user", "type": "m.login.password", "password": "bar"},
         )  # type: SynapseRequest, FakeChannel
-        worker_render_1(request_1)
+        self.render_on_worker(worker_hs_1, request_1)
         self.assertEqual(request_1.code, 401)
 
         # Grab the session
@@ -126,7 +89,7 @@ class ClientReaderTestCase(unittest.HomeserverTestCase):
         request_2, channel_2 = self.make_request(
             "POST", "register", {"auth": {"session": session, "type": "m.login.dummy"}}
         )  # type: SynapseRequest, FakeChannel
-        worker_render_2(request_2)
+        self.render_on_worker(worker_hs_2, request_2)
         self.assertEqual(request_2.code, 200)
 
         # We're given a registered user.
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 519a2dc510..8d4dbf232e 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -19,132 +19,40 @@ from mock import Mock
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.app.generic_worker import GenericWorkerServer
 from synapse.events.builder import EventBuilderFactory
-from synapse.replication.http import streams
-from synapse.replication.tcp.handler import ReplicationCommandHandler
-from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest.admin import register_servlets_for_client_rest_resource
 from synapse.rest.client.v1 import login, room
 from synapse.types import UserID
 
-from tests import unittest
-from tests.server import FakeTransport
+from tests.replication._base import BaseMultiWorkerStreamTestCase
 
 logger = logging.getLogger(__name__)
 
 
-class BaseStreamTestCase(unittest.HomeserverTestCase):
-    """Base class for tests of the replication streams"""
-
+class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
     servlets = [
-        streams.register_servlets,
+        login.register_servlets,
+        register_servlets_for_client_rest_resource,
+        room.register_servlets,
     ]
 
-    def prepare(self, reactor, clock, hs):
-        # build a replication server
-        self.server_factory = ReplicationStreamProtocolFactory(hs)
-        self.streamer = hs.get_replication_streamer()
-
-        store = hs.get_datastore()
-        self.database = store.db
-
-        self.reactor.lookups["testserv"] = "1.2.3.4"
-
     def default_config(self):
         conf = super().default_config()
         conf["send_federation"] = False
         return conf
 
-    def make_worker_hs(self, extra_config={}):
-        config = self._get_worker_hs_config()
-        config.update(extra_config)
-
-        mock_federation_client = Mock(spec=["put_json"])
-        mock_federation_client.put_json.side_effect = lambda *_, **__: defer.succeed({})
-
-        worker_hs = self.setup_test_homeserver(
-            http_client=mock_federation_client,
-            homeserverToUse=GenericWorkerServer,
-            config=config,
-            reactor=self.reactor,
-        )
-
-        store = worker_hs.get_datastore()
-        store.db._db_pool = self.database._db_pool
-
-        repl_handler = ReplicationCommandHandler(worker_hs)
-        client = ClientReplicationStreamProtocol(
-            worker_hs, "client", "test", self.clock, repl_handler,
-        )
-        server = self.server_factory.buildProtocol(None)
-
-        client_transport = FakeTransport(server, self.reactor)
-        client.makeConnection(client_transport)
-
-        server_transport = FakeTransport(client, self.reactor)
-        server.makeConnection(server_transport)
-
-        return worker_hs
-
-    def _get_worker_hs_config(self) -> dict:
-        config = self.default_config()
-        config["worker_app"] = "synapse.app.federation_sender"
-        config["worker_replication_host"] = "testserv"
-        config["worker_replication_http_port"] = "8765"
-        return config
-
-    def replicate(self):
-        """Tell the master side of replication that something has happened, and then
-        wait for the replication to occur.
-        """
-        self.streamer.on_notifier_poke()
-        self.pump()
-
-    def create_room_with_remote_server(self, user, token, remote_server="other_server"):
-        room = self.helper.create_room_as(user, tok=token)
-        store = self.hs.get_datastore()
-        federation = self.hs.get_handlers().federation_handler
-
-        prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
-        room_version = self.get_success(store.get_room_version(room))
-
-        factory = EventBuilderFactory(self.hs)
-        factory.hostname = remote_server
-
-        user_id = UserID("user", remote_server).to_string()
-
-        event_dict = {
-            "type": EventTypes.Member,
-            "state_key": user_id,
-            "content": {"membership": Membership.JOIN},
-            "sender": user_id,
-            "room_id": room,
-        }
-
-        builder = factory.for_room_version(room_version, event_dict)
-        join_event = self.get_success(builder.build(prev_event_ids))
-
-        self.get_success(federation.on_send_join_request(remote_server, join_event))
-        self.replicate()
-
-        return room
-
-
-class FederationSenderTestCase(BaseStreamTestCase):
-    servlets = [
-        login.register_servlets,
-        register_servlets_for_client_rest_resource,
-        room.register_servlets,
-    ]
-
     def test_send_event_single_sender(self):
         """Test that using a single federation sender worker correctly sends a
         new event.
         """
-        worker_hs = self.make_worker_hs({"send_federation": True})
-        mock_client = worker_hs.get_http_client()
+        mock_client = Mock(spec=["put_json"])
+        mock_client.put_json.side_effect = lambda *_, **__: defer.succeed({})
+
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
+            {"send_federation": True},
+            http_client=mock_client,
+        )
 
         user = self.register_user("user", "pass")
         token = self.login("user", "pass")
@@ -165,23 +73,29 @@ class FederationSenderTestCase(BaseStreamTestCase):
         """Test that using two federation sender workers correctly sends
         new events.
         """
-        worker1 = self.make_worker_hs(
+        mock_client1 = Mock(spec=["put_json"])
+        mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender1",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client1,
         )
-        mock_client1 = worker1.get_http_client()
 
-        worker2 = self.make_worker_hs(
+        mock_client2 = Mock(spec=["put_json"])
+        mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender2",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client2,
         )
-        mock_client2 = worker2.get_http_client()
 
         user = self.register_user("user2", "pass")
         token = self.login("user2", "pass")
@@ -191,8 +105,8 @@ class FederationSenderTestCase(BaseStreamTestCase):
         for i in range(20):
             server_name = "other_server_%d" % (i,)
             room = self.create_room_with_remote_server(user, token, server_name)
-            mock_client1.reset_mock()
-            mock_client2.reset_mock()
+            mock_client1.reset_mock()  # type: ignore[attr-defined]
+            mock_client2.reset_mock()  # type: ignore[attr-defined]
 
             self.create_and_send_event(room, UserID.from_string(user))
             self.replicate()
@@ -222,23 +136,29 @@ class FederationSenderTestCase(BaseStreamTestCase):
         """Test that using two federation sender workers correctly sends
         new typing EDUs.
         """
-        worker1 = self.make_worker_hs(
+        mock_client1 = Mock(spec=["put_json"])
+        mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender1",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client1,
         )
-        mock_client1 = worker1.get_http_client()
 
-        worker2 = self.make_worker_hs(
+        mock_client2 = Mock(spec=["put_json"])
+        mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender2",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client2,
         )
-        mock_client2 = worker2.get_http_client()
 
         user = self.register_user("user3", "pass")
         token = self.login("user3", "pass")
@@ -250,8 +170,8 @@ class FederationSenderTestCase(BaseStreamTestCase):
         for i in range(20):
             server_name = "other_server_%d" % (i,)
             room = self.create_room_with_remote_server(user, token, server_name)
-            mock_client1.reset_mock()
-            mock_client2.reset_mock()
+            mock_client1.reset_mock()  # type: ignore[attr-defined]
+            mock_client2.reset_mock()  # type: ignore[attr-defined]
 
             self.get_success(
                 typing_handler.started_typing(
@@ -284,3 +204,32 @@ class FederationSenderTestCase(BaseStreamTestCase):
 
         self.assertTrue(sent_on_1)
         self.assertTrue(sent_on_2)
+
+    def create_room_with_remote_server(self, user, token, remote_server="other_server"):
+        room = self.helper.create_room_as(user, tok=token)
+        store = self.hs.get_datastore()
+        federation = self.hs.get_handlers().federation_handler
+
+        prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
+        room_version = self.get_success(store.get_room_version(room))
+
+        factory = EventBuilderFactory(self.hs)
+        factory.hostname = remote_server
+
+        user_id = UserID("user", remote_server).to_string()
+
+        event_dict = {
+            "type": EventTypes.Member,
+            "state_key": user_id,
+            "content": {"membership": Membership.JOIN},
+            "sender": user_id,
+            "room_id": room,
+        }
+
+        builder = factory.for_room_version(room_version, event_dict)
+        join_event = self.get_success(builder.build(prev_event_ids))
+
+        self.get_success(federation.on_send_join_request(remote_server, join_event))
+        self.replicate()
+
+        return room
diff --git a/tests/rest/admin/test_user.py b/tests/rest/admin/test_user.py
index cca5f548e6..f16eef15f7 100644
--- a/tests/rest/admin/test_user.py
+++ b/tests/rest/admin/test_user.py
@@ -857,6 +857,53 @@ class UserRestTestCase(unittest.HomeserverTestCase):
         self.assertEqual("@user:test", channel.json_body["name"])
         self.assertEqual(True, channel.json_body["deactivated"])
 
+    def test_reactivate_user(self):
+        """
+        Test reactivating another user.
+        """
+
+        # Deactivate the user.
+        request, channel = self.make_request(
+            "PUT",
+            self.url_other_user,
+            access_token=self.admin_user_tok,
+            content=json.dumps({"deactivated": True}).encode(encoding="utf_8"),
+        )
+        self.render(request)
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Attempt to reactivate the user (without a password).
+        request, channel = self.make_request(
+            "PUT",
+            self.url_other_user,
+            access_token=self.admin_user_tok,
+            content=json.dumps({"deactivated": False}).encode(encoding="utf_8"),
+        )
+        self.render(request)
+        self.assertEqual(400, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Reactivate the user.
+        request, channel = self.make_request(
+            "PUT",
+            self.url_other_user,
+            access_token=self.admin_user_tok,
+            content=json.dumps({"deactivated": False, "password": "foo"}).encode(
+                encoding="utf_8"
+            ),
+        )
+        self.render(request)
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+
+        # Get user
+        request, channel = self.make_request(
+            "GET", self.url_other_user, access_token=self.admin_user_tok,
+        )
+        self.render(request)
+
+        self.assertEqual(200, int(channel.result["code"]), msg=channel.result["body"])
+        self.assertEqual("@user:test", channel.json_body["name"])
+        self.assertEqual(False, channel.json_body["deactivated"])
+
     def test_set_user_as_admin(self):
         """
         Test setting the admin flag on a user.
diff --git a/tests/server.py b/tests/server.py
index a5e57c52fa..b6e0b14e78 100644
--- a/tests/server.py
+++ b/tests/server.py
@@ -237,6 +237,7 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
     def __init__(self):
         self.threadpool = ThreadPool(self)
 
+        self._tcp_callbacks = {}
         self._udp = []
         lookups = self.lookups = {}
 
@@ -268,6 +269,29 @@ class ThreadedMemoryReactorClock(MemoryReactorClock):
     def getThreadPool(self):
         return self.threadpool
 
+    def add_tcp_client_callback(self, host, port, callback):
+        """Add a callback that will be invoked when we receive a connection
+        attempt to the given IP/port using `connectTCP`.
+
+        Note that the callback gets run before we return the connection to the
+        client, which means callbacks cannot block while waiting for writes.
+        """
+        self._tcp_callbacks[(host, port)] = callback
+
+    def connectTCP(self, host, port, factory, timeout=30, bindAddress=None):
+        """Fake L{IReactorTCP.connectTCP}.
+        """
+
+        conn = super().connectTCP(
+            host, port, factory, timeout=timeout, bindAddress=None
+        )
+
+        callback = self._tcp_callbacks.get((host, port))
+        if callback:
+            callback()
+
+        return conn
+
 
 class ThreadPool:
     """
@@ -486,7 +510,7 @@ class FakeTransport(object):
         try:
             self.other.dataReceived(to_write)
         except Exception as e:
-            logger.warning("Exception writing to protocol: %s", e)
+            logger.exception("Exception writing to protocol: %s", e)
             return
 
         self.buffer = self.buffer[len(to_write) :]