summary refs log tree commit diff
path: root/tests/replication/test_federation_sender_shard.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/replication/test_federation_sender_shard.py')
-rw-r--r--tests/replication/test_federation_sender_shard.py191
1 files changed, 70 insertions, 121 deletions
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 519a2dc510..8d4dbf232e 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -19,132 +19,40 @@ from mock import Mock
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.app.generic_worker import GenericWorkerServer
 from synapse.events.builder import EventBuilderFactory
-from synapse.replication.http import streams
-from synapse.replication.tcp.handler import ReplicationCommandHandler
-from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
 from synapse.rest.admin import register_servlets_for_client_rest_resource
 from synapse.rest.client.v1 import login, room
 from synapse.types import UserID
 
-from tests import unittest
-from tests.server import FakeTransport
+from tests.replication._base import BaseMultiWorkerStreamTestCase
 
 logger = logging.getLogger(__name__)
 
 
-class BaseStreamTestCase(unittest.HomeserverTestCase):
-    """Base class for tests of the replication streams"""
-
+class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
     servlets = [
-        streams.register_servlets,
+        login.register_servlets,
+        register_servlets_for_client_rest_resource,
+        room.register_servlets,
     ]
 
-    def prepare(self, reactor, clock, hs):
-        # build a replication server
-        self.server_factory = ReplicationStreamProtocolFactory(hs)
-        self.streamer = hs.get_replication_streamer()
-
-        store = hs.get_datastore()
-        self.database = store.db
-
-        self.reactor.lookups["testserv"] = "1.2.3.4"
-
     def default_config(self):
         conf = super().default_config()
         conf["send_federation"] = False
         return conf
 
-    def make_worker_hs(self, extra_config={}):
-        config = self._get_worker_hs_config()
-        config.update(extra_config)
-
-        mock_federation_client = Mock(spec=["put_json"])
-        mock_federation_client.put_json.side_effect = lambda *_, **__: defer.succeed({})
-
-        worker_hs = self.setup_test_homeserver(
-            http_client=mock_federation_client,
-            homeserverToUse=GenericWorkerServer,
-            config=config,
-            reactor=self.reactor,
-        )
-
-        store = worker_hs.get_datastore()
-        store.db._db_pool = self.database._db_pool
-
-        repl_handler = ReplicationCommandHandler(worker_hs)
-        client = ClientReplicationStreamProtocol(
-            worker_hs, "client", "test", self.clock, repl_handler,
-        )
-        server = self.server_factory.buildProtocol(None)
-
-        client_transport = FakeTransport(server, self.reactor)
-        client.makeConnection(client_transport)
-
-        server_transport = FakeTransport(client, self.reactor)
-        server.makeConnection(server_transport)
-
-        return worker_hs
-
-    def _get_worker_hs_config(self) -> dict:
-        config = self.default_config()
-        config["worker_app"] = "synapse.app.federation_sender"
-        config["worker_replication_host"] = "testserv"
-        config["worker_replication_http_port"] = "8765"
-        return config
-
-    def replicate(self):
-        """Tell the master side of replication that something has happened, and then
-        wait for the replication to occur.
-        """
-        self.streamer.on_notifier_poke()
-        self.pump()
-
-    def create_room_with_remote_server(self, user, token, remote_server="other_server"):
-        room = self.helper.create_room_as(user, tok=token)
-        store = self.hs.get_datastore()
-        federation = self.hs.get_handlers().federation_handler
-
-        prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
-        room_version = self.get_success(store.get_room_version(room))
-
-        factory = EventBuilderFactory(self.hs)
-        factory.hostname = remote_server
-
-        user_id = UserID("user", remote_server).to_string()
-
-        event_dict = {
-            "type": EventTypes.Member,
-            "state_key": user_id,
-            "content": {"membership": Membership.JOIN},
-            "sender": user_id,
-            "room_id": room,
-        }
-
-        builder = factory.for_room_version(room_version, event_dict)
-        join_event = self.get_success(builder.build(prev_event_ids))
-
-        self.get_success(federation.on_send_join_request(remote_server, join_event))
-        self.replicate()
-
-        return room
-
-
-class FederationSenderTestCase(BaseStreamTestCase):
-    servlets = [
-        login.register_servlets,
-        register_servlets_for_client_rest_resource,
-        room.register_servlets,
-    ]
-
     def test_send_event_single_sender(self):
         """Test that using a single federation sender worker correctly sends a
         new event.
         """
-        worker_hs = self.make_worker_hs({"send_federation": True})
-        mock_client = worker_hs.get_http_client()
+        mock_client = Mock(spec=["put_json"])
+        mock_client.put_json.side_effect = lambda *_, **__: defer.succeed({})
+
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
+            {"send_federation": True},
+            http_client=mock_client,
+        )
 
         user = self.register_user("user", "pass")
         token = self.login("user", "pass")
@@ -165,23 +73,29 @@ class FederationSenderTestCase(BaseStreamTestCase):
         """Test that using two federation sender workers correctly sends
         new events.
         """
-        worker1 = self.make_worker_hs(
+        mock_client1 = Mock(spec=["put_json"])
+        mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender1",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client1,
         )
-        mock_client1 = worker1.get_http_client()
 
-        worker2 = self.make_worker_hs(
+        mock_client2 = Mock(spec=["put_json"])
+        mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender2",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client2,
         )
-        mock_client2 = worker2.get_http_client()
 
         user = self.register_user("user2", "pass")
         token = self.login("user2", "pass")
@@ -191,8 +105,8 @@ class FederationSenderTestCase(BaseStreamTestCase):
         for i in range(20):
             server_name = "other_server_%d" % (i,)
             room = self.create_room_with_remote_server(user, token, server_name)
-            mock_client1.reset_mock()
-            mock_client2.reset_mock()
+            mock_client1.reset_mock()  # type: ignore[attr-defined]
+            mock_client2.reset_mock()  # type: ignore[attr-defined]
 
             self.create_and_send_event(room, UserID.from_string(user))
             self.replicate()
@@ -222,23 +136,29 @@ class FederationSenderTestCase(BaseStreamTestCase):
         """Test that using two federation sender workers correctly sends
         new typing EDUs.
         """
-        worker1 = self.make_worker_hs(
+        mock_client1 = Mock(spec=["put_json"])
+        mock_client1.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender1",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client1,
         )
-        mock_client1 = worker1.get_http_client()
 
-        worker2 = self.make_worker_hs(
+        mock_client2 = Mock(spec=["put_json"])
+        mock_client2.put_json.side_effect = lambda *_, **__: defer.succeed({})
+        self.make_worker_hs(
+            "synapse.app.federation_sender",
             {
                 "send_federation": True,
                 "worker_name": "sender2",
                 "federation_sender_instances": ["sender1", "sender2"],
-            }
+            },
+            http_client=mock_client2,
         )
-        mock_client2 = worker2.get_http_client()
 
         user = self.register_user("user3", "pass")
         token = self.login("user3", "pass")
@@ -250,8 +170,8 @@ class FederationSenderTestCase(BaseStreamTestCase):
         for i in range(20):
             server_name = "other_server_%d" % (i,)
             room = self.create_room_with_remote_server(user, token, server_name)
-            mock_client1.reset_mock()
-            mock_client2.reset_mock()
+            mock_client1.reset_mock()  # type: ignore[attr-defined]
+            mock_client2.reset_mock()  # type: ignore[attr-defined]
 
             self.get_success(
                 typing_handler.started_typing(
@@ -284,3 +204,32 @@ class FederationSenderTestCase(BaseStreamTestCase):
 
         self.assertTrue(sent_on_1)
         self.assertTrue(sent_on_2)
+
+    def create_room_with_remote_server(self, user, token, remote_server="other_server"):
+        room = self.helper.create_room_as(user, tok=token)
+        store = self.hs.get_datastore()
+        federation = self.hs.get_handlers().federation_handler
+
+        prev_event_ids = self.get_success(store.get_latest_event_ids_in_room(room))
+        room_version = self.get_success(store.get_room_version(room))
+
+        factory = EventBuilderFactory(self.hs)
+        factory.hostname = remote_server
+
+        user_id = UserID("user", remote_server).to_string()
+
+        event_dict = {
+            "type": EventTypes.Member,
+            "state_key": user_id,
+            "content": {"membership": Membership.JOIN},
+            "sender": user_id,
+            "room_id": room,
+        }
+
+        builder = factory.for_room_version(room_version, event_dict)
+        join_event = self.get_success(builder.build(prev_event_ids))
+
+        self.get_success(federation.on_send_join_request(remote_server, join_event))
+        self.replicate()
+
+        return room