Diffstat (limited to 'tests/replication')
-rw-r--r--  tests/replication/_base.py (renamed from tests/replication/tcp/streams/_base.py)  20
-rw-r--r--  tests/replication/slave/storage/_base.py  50
-rw-r--r--  tests/replication/slave/storage/test_events.py  24
-rw-r--r--  tests/replication/tcp/streams/test_account_data.py  117
-rw-r--r--  tests/replication/tcp/streams/test_events.py  2
-rw-r--r--  tests/replication/tcp/streams/test_federation.py  81
-rw-r--r--  tests/replication/tcp/streams/test_receipts.py  2
-rw-r--r--  tests/replication/tcp/streams/test_typing.py  7
-rw-r--r--  tests/replication/tcp/test_commands.py  4
9 files changed, 232 insertions, 75 deletions
diff --git a/tests/replication/tcp/streams/_base.py b/tests/replication/_base.py

index 7b56d2028d..9d4f0bbe44 100644
--- a/tests/replication/tcp/streams/_base.py
+++ b/tests/replication/_base.py
@@ -27,6 +27,7 @@ from synapse.app.generic_worker import (
     GenericWorkerServer,
 )
 from synapse.http.site import SynapseRequest
+from synapse.replication.http import streams
 from synapse.replication.tcp.handler import ReplicationCommandHandler
 from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
 from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
@@ -42,6 +43,10 @@ logger = logging.getLogger(__name__)
 class BaseStreamTestCase(unittest.HomeserverTestCase):
     """Base class for tests of the replication streams"""
 
+    servlets = [
+        streams.register_servlets,
+    ]
+
     def prepare(self, reactor, clock, hs):
         # build a replication server
         server_factory = ReplicationStreamProtocolFactory(hs)
@@ -49,17 +54,11 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self.server = server_factory.buildProtocol(None)
 
         # Make a new HomeServer object for the worker
-        config = self.default_config()
-        config["worker_app"] = "synapse.app.generic_worker"
-        config["worker_replication_host"] = "testserv"
-        config["worker_replication_http_port"] = "8765"
-
         self.reactor.lookups["testserv"] = "1.2.3.4"
-
         self.worker_hs = self.setup_test_homeserver(
             http_client=None,
             homeserverToUse=GenericWorkerServer,
-            config=config,
+            config=self._get_worker_hs_config(),
             reactor=self.reactor,
         )
 
@@ -78,6 +77,13 @@ class BaseStreamTestCase(unittest.HomeserverTestCase):
         self._client_transport = None
         self._server_transport = None
 
+    def _get_worker_hs_config(self) -> dict:
+        config = self.default_config()
+        config["worker_app"] = "synapse.app.generic_worker"
+        config["worker_replication_host"] = "testserv"
+        config["worker_replication_http_port"] = "8765"
+        return config
+
     def _build_replication_data_handler(self):
         return TestReplicationDataHandler(self.worker_hs)
 
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 1615dfab5e..32cb04645f 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -15,23 +15,13 @@
 from mock import Mock, NonCallableMock
 
-from synapse.replication.tcp.client import (
-    DirectTcpReplicationClientFactory,
-    ReplicationDataHandler,
-)
-from synapse.replication.tcp.handler import ReplicationCommandHandler
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.storage.database import make_conn
+from tests.replication._base import BaseStreamTestCase
 
-from tests import unittest
-from tests.server import FakeTransport
-
 
-class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
+class BaseSlavedStoreTestCase(BaseStreamTestCase):
     def make_homeserver(self, reactor, clock):
 
         hs = self.setup_test_homeserver(
-            "blue",
             federation_client=Mock(),
             ratelimiter=NonCallableMock(spec_set=["can_do_action"]),
         )
 
@@ -41,39 +31,13 @@ class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
         return hs
 
     def prepare(self, reactor, clock, hs):
+        super().prepare(reactor, clock, hs)
 
-        db_config = hs.config.database.get_single_database()
-        self.master_store = self.hs.get_datastore()
-        self.storage = hs.get_storage()
-        database = hs.get_datastores().databases[0]
-        self.slaved_store = self.STORE_TYPE(
-            database, make_conn(db_config, database.engine), self.hs
-        )
-        self.event_id = 0
-
-        server_factory = ReplicationStreamProtocolFactory(self.hs)
-        self.streamer = hs.get_replication_streamer()
-
-        # We now do some gut wrenching so that we have a client that is based
-        # off of the slave store rather than the main store.
-        self.replication_handler = ReplicationCommandHandler(self.hs)
-        self.replication_handler._instance_name = "worker"
-        self.replication_handler._replication_data_handler = ReplicationDataHandler(
-            self.slaved_store
-        )
+        self.reconnect()
 
-        client_factory = DirectTcpReplicationClientFactory(
-            self.hs, "client_name", self.replication_handler
-        )
-        client_factory.handler = self.replication_handler
-
-        server = server_factory.buildProtocol(None)
-        client = client_factory.buildProtocol(None)
-
-        client.makeConnection(FakeTransport(server, reactor))
-
-        self.server_to_client_transport = FakeTransport(client, reactor)
-        server.makeConnection(self.server_to_client_transport)
+        self.master_store = hs.get_datastore()
+        self.slaved_store = self.worker_hs.get_datastore()
+        self.storage = hs.get_storage()
 
     def replicate(self):
         """Tell the master side of replication that something has happened, and then
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index f0561b30e3..1a88c7fb80 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -17,17 +17,18 @@
 from canonicaljson import encode_canonical_json
 
 from synapse.api.room_versions import RoomVersions
 from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
-from synapse.events.snapshot import EventContext
 from synapse.handlers.room import RoomEventSource
 from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.storage.roommember import RoomsForUser
 
+from tests.server import FakeTransport
+
 from ._base import BaseSlavedStoreTestCase
 
-USER_ID = "@feeling:blue"
-USER_ID_2 = "@bright:blue"
+USER_ID = "@feeling:test"
+USER_ID_2 = "@bright:test"
 OUTLIER = {"outlier": True}
-ROOM_ID = "!room:blue"
+ROOM_ID = "!room:test"
 
 logger = logging.getLogger(__name__)
@@ -239,7 +240,8 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
 
         # limit the replication rate
-        repl_transport = self.server_to_client_transport
+        repl_transport = self._server_transport
+        assert isinstance(repl_transport, FakeTransport)
         repl_transport.autoflush = False
 
         # build the join and message events and persist them in the same batch.
@@ -322,7 +324,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         type="m.room.message",
         key=None,
         internal={},
-        state=None,
         depth=None,
         prev_events=[],
         auth_events=[],
@@ -362,15 +363,8 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
 
         event = make_event_from_dict(event_dict, internal_metadata_dict=internal)
         self.event_id += 1
-
-        if state is not None:
-            state_ids = {key: e.event_id for key, e in state.items()}
-            context = EventContext.with_state(
-                state_group=None, current_state_ids=state_ids, prev_state_ids=state_ids
-            )
-        else:
-            state_handler = self.hs.get_state_handler()
-            context = self.get_success(state_handler.compute_event_context(event))
+        state_handler = self.hs.get_state_handler()
+        context = self.get_success(state_handler.compute_event_context(event))
 
         self.master_store.add_push_actions_to_staging(
             event.event_id, {user_id: actions for user_id, actions in push_actions}
diff --git a/tests/replication/tcp/streams/test_account_data.py b/tests/replication/tcp/streams/test_account_data.py
new file mode 100644
index 0000000000..6a5116dd2a
--- /dev/null
+++ b/tests/replication/tcp/streams/test_account_data.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.tcp.streams._base import (
+    _STREAM_UPDATE_TARGET_ROW_COUNT,
+    AccountDataStream,
+)
+
+from tests.replication._base import BaseStreamTestCase
+
+
+class AccountDataStreamTestCase(BaseStreamTestCase):
+    def test_update_function_room_account_data_limit(self):
+        """Test replication with many room account data updates
+        """
+        store = self.hs.get_datastore()
+
+        # generate lots of account data updates
+        updates = []
+        for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
+            update = "m.test_type.%i" % (i,)
+            self.get_success(
+                store.add_account_data_to_room("test_user", "test_room", update, {})
+            )
+            updates.append(update)
+
+        # also one global update
+        self.get_success(store.add_account_data_for_user("test_user", "m.global", {}))
+
+        # tell the notifier to catch up to avoid duplicate rows.
+        # workaround for https://github.com/matrix-org/synapse/issues/7360
+        # FIXME remove this when the above is fixed
+        self.replicate()
+
+        # check we're testing what we think we are: no rows should yet have been
+        # received
+        self.assertEqual([], self.test_handler.received_rdata_rows)
+
+        # now reconnect to pull the updates
+        self.reconnect()
+        self.replicate()
+
+        # we should have received all the expected rows in the right order
+        received_rows = self.test_handler.received_rdata_rows
+
+        for t in updates:
+            (stream_name, token, row) = received_rows.pop(0)
+            self.assertEqual(stream_name, AccountDataStream.NAME)
+            self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+            self.assertEqual(row.data_type, t)
+            self.assertEqual(row.room_id, "test_room")
+
+        (stream_name, token, row) = received_rows.pop(0)
+        self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+        self.assertEqual(row.data_type, "m.global")
+        self.assertIsNone(row.room_id)
+
+        self.assertEqual([], received_rows)
+
+    def test_update_function_global_account_data_limit(self):
+        """Test replication with many global account data updates
+        """
+        store = self.hs.get_datastore()
+
+        # generate lots of account data updates
+        updates = []
+        for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
+            update = "m.test_type.%i" % (i,)
+            self.get_success(store.add_account_data_for_user("test_user", update, {}))
+            updates.append(update)
+
+        # also one per-room update
+        self.get_success(
+            store.add_account_data_to_room("test_user", "test_room", "m.per_room", {})
+        )
+
+        # tell the notifier to catch up to avoid duplicate rows.
+        # workaround for https://github.com/matrix-org/synapse/issues/7360
+        # FIXME remove this when the above is fixed
+        self.replicate()
+
+        # check we're testing what we think we are: no rows should yet have been
+        # received
+        self.assertEqual([], self.test_handler.received_rdata_rows)
+
+        # now reconnect to pull the updates
+        self.reconnect()
+        self.replicate()
+
+        # we should have received all the expected rows in the right order
+        received_rows = self.test_handler.received_rdata_rows
+
+        for t in updates:
+            (stream_name, token, row) = received_rows.pop(0)
+            self.assertEqual(stream_name, AccountDataStream.NAME)
+            self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+            self.assertEqual(row.data_type, t)
+            self.assertIsNone(row.room_id)
+
+        (stream_name, token, row) = received_rows.pop(0)
+        self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+        self.assertEqual(row.data_type, "m.per_room")
+        self.assertEqual(row.room_id, "test_room")
+
+        self.assertEqual([], received_rows)
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
index 8bd67bb9f1..51bf0ef4e9 100644
--- a/tests/replication/tcp/streams/test_events.py
+++ b/tests/replication/tcp/streams/test_events.py
@@ -26,7 +26,7 @@ from synapse.replication.tcp.streams.events import (
 from synapse.rest import admin
 from synapse.rest.client.v1 import login, room
 
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+from tests.replication._base import BaseStreamTestCase
 from tests.test_utils.event_injection import inject_event, inject_member_event
 
 
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
new file mode 100644
index 0000000000..2babea4e3e
--- /dev/null
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.federation.send_queue import EduRow
+from synapse.replication.tcp.streams.federation import FederationStream
+
+from tests.replication._base import BaseStreamTestCase
+
+
+class FederationStreamTestCase(BaseStreamTestCase):
+    def _get_worker_hs_config(self) -> dict:
+        # enable federation sending on the worker
+        config = super()._get_worker_hs_config()
+        # TODO: make it so we don't need both of these
+        config["send_federation"] = True
+        config["worker_app"] = "synapse.app.federation_sender"
+        return config
+
+    def test_catchup(self):
+        """Basic test of catchup on reconnect
+
+        Makes sure that updates sent while we are offline are received later.
+        """
+        fed_sender = self.hs.get_federation_sender()
+        received_rows = self.test_handler.received_rdata_rows
+
+        fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
+
+        self.reconnect()
+        self.reactor.advance(0)
+
+        # check we're testing what we think we are: no rows should yet have been
+        # received
+        self.assertEqual(received_rows, [])
+
+        # We should now see an attempt to connect to the master
+        request = self.handle_http_replication_attempt()
+        self.assert_request_is_get_repl_stream_updates(request, "federation")
+
+        # we should have received an update row
+        stream_name, token, row = received_rows.pop()
+        self.assertEqual(stream_name, "federation")
+        self.assertIsInstance(row, FederationStream.FederationStreamRow)
+        self.assertEqual(row.type, EduRow.TypeId)
+        edurow = EduRow.from_data(row.data)
+        self.assertEqual(edurow.edu.edu_type, "m.test_edu")
+        self.assertEqual(edurow.edu.origin, self.hs.hostname)
+        self.assertEqual(edurow.edu.destination, "testdest")
+        self.assertEqual(edurow.edu.content, {"a": "b"})
+
+        self.assertEqual(received_rows, [])
+
+        # additional updates should be transferred without an HTTP hit
+        fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
+        self.reactor.advance(0)
+        # there should be no http hit
+        self.assertEqual(len(self.reactor.tcpClients), 0)
+        # ... but we should have a row
+        self.assertEqual(len(received_rows), 1)
+
+        stream_name, token, row = received_rows.pop()
+        self.assertEqual(stream_name, "federation")
+        self.assertIsInstance(row, FederationStream.FederationStreamRow)
+        self.assertEqual(row.type, EduRow.TypeId)
+        edurow = EduRow.from_data(row.data)
+        self.assertEqual(edurow.edu.edu_type, "m.test1")
+        self.assertEqual(edurow.edu.origin, self.hs.hostname)
+        self.assertEqual(edurow.edu.destination, "testdest")
+        self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index 5853314fd4..56b062ecc1 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -19,7 +19,7 @@
 from mock import Mock
 
 from synapse.replication.tcp.streams._base import ReceiptsStream
 
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+from tests.replication._base import BaseStreamTestCase
 
 USER_ID = "@feeling:blue"
diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
index d25a7b194e..fd62b26356 100644
--- a/tests/replication/tcp/streams/test_typing.py
+++ b/tests/replication/tcp/streams/test_typing.py
@@ -15,19 +15,14 @@
 from mock import Mock
 
 from synapse.handlers.typing import RoomMember
-from synapse.replication.http import streams
 from synapse.replication.tcp.streams import TypingStream
 
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+from tests.replication._base import BaseStreamTestCase
 
 USER_ID = "@feeling:blue"
 
 
 class TypingStreamTestCase(BaseStreamTestCase):
-    servlets = [
-        streams.register_servlets,
-    ]
-
     def _build_replication_data_handler(self):
         return Mock(wraps=super()._build_replication_data_handler())
 
diff --git a/tests/replication/tcp/test_commands.py b/tests/replication/tcp/test_commands.py
index 7ddfd0a733..60c10a441a 100644
--- a/tests/replication/tcp/test_commands.py
+++ b/tests/replication/tcp/test_commands.py
@@ -30,7 +30,7 @@ class ParseCommandTestCase(TestCase):
     def test_parse_rdata(self):
         line = 'RDATA events master 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
         cmd = parse_command_from_line(line)
-        self.assertIsInstance(cmd, RdataCommand)
+        assert isinstance(cmd, RdataCommand)
         self.assertEqual(cmd.stream_name, "events")
         self.assertEqual(cmd.instance_name, "master")
         self.assertEqual(cmd.token, 6287863)
@@ -38,7 +38,7 @@ class ParseCommandTestCase(TestCase):
     def test_parse_rdata_batch(self):
         line = 'RDATA presence master batch ["@foo:example.com", "online"]'
         cmd = parse_command_from_line(line)
-        self.assertIsInstance(cmd, RdataCommand)
+        assert isinstance(cmd, RdataCommand)
         self.assertEqual(cmd.stream_name, "presence")
         self.assertEqual(cmd.instance_name, "master")
         self.assertIsNone(cmd.token)