diff --git a/tests/replication/tcp/streams/_base.py b/tests/replication/_base.py
index 7b56d2028d..9d4f0bbe44 100644
--- a/tests/replication/tcp/streams/_base.py
+++ b/tests/replication/_base.py
@@ -27,6 +27,7 @@ from synapse.app.generic_worker import (
GenericWorkerServer,
)
from synapse.http.site import SynapseRequest
+from synapse.replication.http import streams
from synapse.replication.tcp.handler import ReplicationCommandHandler
from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -42,6 +43,10 @@ logger = logging.getLogger(__name__)
class BaseStreamTestCase(unittest.HomeserverTestCase):
"""Base class for tests of the replication streams"""
+ servlets = [
+ streams.register_servlets,
+ ]
+
def prepare(self, reactor, clock, hs):
# build a replication server
server_factory = ReplicationStreamProtocolFactory(hs)
@@ -49,17 +54,11 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self.server = server_factory.buildProtocol(None)
# Make a new HomeServer object for the worker
- config = self.default_config()
- config["worker_app"] = "synapse.app.generic_worker"
- config["worker_replication_host"] = "testserv"
- config["worker_replication_http_port"] = "8765"
-
self.reactor.lookups["testserv"] = "1.2.3.4"
-
self.worker_hs = self.setup_test_homeserver(
http_client=None,
homeserverToUse=GenericWorkerServer,
- config=config,
+ config=self._get_worker_hs_config(),
reactor=self.reactor,
)
@@ -78,6 +77,13 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
self._client_transport = None
self._server_transport = None
+ def _get_worker_hs_config(self) -> dict:
+ config = self.default_config()
+ config["worker_app"] = "synapse.app.generic_worker"
+ config["worker_replication_host"] = "testserv"
+ config["worker_replication_http_port"] = "8765"
+ return config
+
def _build_replication_data_handler(self):
return TestReplicationDataHandler(self.worker_hs)
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 1615dfab5e..32cb04645f 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,23 +15,13 @@
from mock import Mock, NonCallableMock
-from synapse.replication.tcp.client import (
- DirectTcpReplicationClientFactory,
- ReplicationDataHandler,
-)
-from synapse.replication.tcp.handler import ReplicationCommandHandler
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.storage.database import make_conn
+from tests.replication._base import BaseStreamTestCase
-from tests import unittest
-from tests.server import FakeTransport
-
-class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
+class BaseSlavedStoreTestCase(BaseStreamTestCase):
def make_homeserver(self, reactor, clock):
hs = self.setup_test_homeserver(
- "blue",
federation_client=Mock(),
ratelimiter=NonCallableMock(spec_set=["can_do_action"]),
)
@@ -41,39 +31,13 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
return hs
def prepare(self, reactor, clock, hs):
+ super().prepare(reactor, clock, hs)
- db_config = hs.config.database.get_single_database()
- self.master_store = self.hs.get_datastore()
- self.storage = hs.get_storage()
- database = hs.get_datastores().databases[0]
- self.slaved_store = self.STORE_TYPE(
- database, make_conn(db_config, database.engine), self.hs
- )
- self.event_id = 0
-
- server_factory = ReplicationStreamProtocolFactory(self.hs)
- self.streamer = hs.get_replication_streamer()
-
- # We now do some gut wrenching so that we have a client that is based
- # off of the slave store rather than the main store.
- self.replication_handler = ReplicationCommandHandler(self.hs)
- self.replication_handler._instance_name = "worker"
- self.replication_handler._replication_data_handler = ReplicationDataHandler(
- self.slaved_store
- )
+ self.reconnect()
- client_factory = DirectTcpReplicationClientFactory(
- self.hs, "client_name", self.replication_handler
- )
- client_factory.handler = self.replication_handler
-
- server = server_factory.buildProtocol(None)
- client = client_factory.buildProtocol(None)
-
- client.makeConnection(FakeTransport(server, reactor))
-
- self.server_to_client_transport = FakeTransport(client, reactor)
- server.makeConnection(self.server_to_client_transport)
+ self.master_store = hs.get_datastore()
+ self.slaved_store = self.worker_hs.get_datastore()
+ self.storage = hs.get_storage()
def replicate(self):
"""Tell the master side of replication that something has happened, and then
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index f0561b30e3..0fee8a71c4 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -24,10 +24,10 @@ from synapse.storage.roommember import RoomsForUser
from ._base import BaseSlavedStoreTestCase
-USER_ID = "@feeling:blue"
-USER_ID_2 = "@bright:blue"
+USER_ID = "@feeling:test"
+USER_ID_2 = "@bright:test"
OUTLIER = {"outlier": True}
-ROOM_ID = "!room:blue"
+ROOM_ID = "!room:test"
logger = logging.getLogger(__name__)
@@ -239,7 +239,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
# limit the replication rate
- repl_transport = self.server_to_client_transport
+ repl_transport = self._server_transport
repl_transport.autoflush = False
# build the join and message events and persist them in the same batch.
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
index 8bd67bb9f1..51bf0ef4e9 100644
--- a/tests/replication/tcp/streams/test_events.py
+++ b/tests/replication/tcp/streams/test_events.py
@@ -26,7 +26,7 @@ from synapse.replication.tcp.streams.events import (
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+from tests.replication._base import BaseStreamTestCase
from tests.test_utils.event_injection import inject_event, inject_member_event
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
new file mode 100644
index 0000000000..2babea4e3e
--- /dev/null
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.federation.send_queue import EduRow
+from synapse.replication.tcp.streams.federation import FederationStream
+
+from tests.replication._base import BaseStreamTestCase
+
+
+class FederationStreamTestCase(BaseStreamTestCase):
+ def _get_worker_hs_config(self) -> dict:
+ # enable federation sending on the worker
+ config = super()._get_worker_hs_config()
+ # TODO: make it so we don't need both of these
+ config["send_federation"] = True
+ config["worker_app"] = "synapse.app.federation_sender"
+ return config
+
+ def test_catchup(self):
+ """Basic test of catchup on reconnect
+
+ Makes sure that updates sent while we are offline are received later.
+ """
+ fed_sender = self.hs.get_federation_sender()
+ received_rows = self.test_handler.received_rdata_rows
+
+ fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
+
+ self.reconnect()
+ self.reactor.advance(0)
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual(received_rows, [])
+
+ # We should now see an attempt to connect to the master
+ request = self.handle_http_replication_attempt()
+ self.assert_request_is_get_repl_stream_updates(request, "federation")
+
+ # we should have received an update row
+ stream_name, token, row = received_rows.pop()
+ self.assertEqual(stream_name, "federation")
+ self.assertIsInstance(row, FederationStream.FederationStreamRow)
+ self.assertEqual(row.type, EduRow.TypeId)
+ edurow = EduRow.from_data(row.data)
+ self.assertEqual(edurow.edu.edu_type, "m.test_edu")
+ self.assertEqual(edurow.edu.origin, self.hs.hostname)
+ self.assertEqual(edurow.edu.destination, "testdest")
+ self.assertEqual(edurow.edu.content, {"a": "b"})
+
+ self.assertEqual(received_rows, [])
+
+ # additional updates should be transferred without an HTTP hit
+ fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
+ self.reactor.advance(0)
+ # there should be no http hit
+ self.assertEqual(len(self.reactor.tcpClients), 0)
+ # ... but we should have a row
+ self.assertEqual(len(received_rows), 1)
+
+ stream_name, token, row = received_rows.pop()
+ self.assertEqual(stream_name, "federation")
+ self.assertIsInstance(row, FederationStream.FederationStreamRow)
+ self.assertEqual(row.type, EduRow.TypeId)
+ edurow = EduRow.from_data(row.data)
+ self.assertEqual(edurow.edu.edu_type, "m.test1")
+ self.assertEqual(edurow.edu.origin, self.hs.hostname)
+ self.assertEqual(edurow.edu.destination, "testdest")
+ self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index 5853314fd4..56b062ecc1 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -19,7 +19,7 @@ from mock import Mock
from synapse.replication.tcp.streams._base import ReceiptsStream
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+from tests.replication._base import BaseStreamTestCase
USER_ID = "@feeling:blue"
diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
index d25a7b194e..fd62b26356 100644
--- a/tests/replication/tcp/streams/test_typing.py
+++ b/tests/replication/tcp/streams/test_typing.py
@@ -15,19 +15,14 @@
from mock import Mock
from synapse.handlers.typing import RoomMember
-from synapse.replication.http import streams
from synapse.replication.tcp.streams import TypingStream
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+from tests.replication._base import BaseStreamTestCase
USER_ID = "@feeling:blue"
class TypingStreamTestCase(BaseStreamTestCase):
- servlets = [
- streams.register_servlets,
- ]
-
def _build_replication_data_handler(self):
return Mock(wraps=super()._build_replication_data_handler())
|