diff --git a/tests/replication/_base.py b/tests/replication/_base.py
new file mode 100644
index 0000000000..9d4f0bbe44
--- /dev/null
+++ b/tests/replication/_base.py
@@ -0,0 +1,307 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import Any, List, Optional, Tuple
+
+import attr
+
+from twisted.internet.interfaces import IConsumer, IPullProducer, IReactorTime
+from twisted.internet.task import LoopingCall
+from twisted.web.http import HTTPChannel
+
+from synapse.app.generic_worker import (
+ GenericWorkerReplicationHandler,
+ GenericWorkerServer,
+)
+from synapse.http.site import SynapseRequest
+from synapse.replication.http import streams
+from synapse.replication.tcp.handler import ReplicationCommandHandler
+from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+from synapse.server import HomeServer
+from synapse.util import Clock
+
+from tests import unittest
+from tests.server import FakeTransport
+
+logger = logging.getLogger(__name__)
+
+
+class BaseStreamTestCase(unittest.HomeserverTestCase):
+ """Base class for tests of the replication streams"""
+
+ servlets = [
+ streams.register_servlets,
+ ]
+
+ def prepare(self, reactor, clock, hs):
+ # build a replication server
+ server_factory = ReplicationStreamProtocolFactory(hs)
+ self.streamer = hs.get_replication_streamer()
+ self.server = server_factory.buildProtocol(None)
+
+ # Make a new HomeServer object for the worker
+ self.reactor.lookups["testserv"] = "1.2.3.4"
+ self.worker_hs = self.setup_test_homeserver(
+ http_client=None,
+ homeserverToUse=GenericWorkerServer,
+ config=self._get_worker_hs_config(),
+ reactor=self.reactor,
+ )
+
+ # Since we use sqlite in memory databases we need to make sure the
+ # databases objects are the same.
+ self.worker_hs.get_datastore().db = hs.get_datastore().db
+
+ self.test_handler = self._build_replication_data_handler()
+ self.worker_hs.replication_data_handler = self.test_handler
+
+ repl_handler = ReplicationCommandHandler(self.worker_hs)
+ self.client = ClientReplicationStreamProtocol(
+ self.worker_hs, "client", "test", clock, repl_handler,
+ )
+
+ self._client_transport = None
+ self._server_transport = None
+
+ def _get_worker_hs_config(self) -> dict:
+ config = self.default_config()
+ config["worker_app"] = "synapse.app.generic_worker"
+ config["worker_replication_host"] = "testserv"
+ config["worker_replication_http_port"] = "8765"
+ return config
+
+ def _build_replication_data_handler(self):
+ return TestReplicationDataHandler(self.worker_hs)
+
+ def reconnect(self):
+ if self._client_transport:
+ self.client.close()
+
+ if self._server_transport:
+ self.server.close()
+
+ self._client_transport = FakeTransport(self.server, self.reactor)
+ self.client.makeConnection(self._client_transport)
+
+ self._server_transport = FakeTransport(self.client, self.reactor)
+ self.server.makeConnection(self._server_transport)
+
+ def disconnect(self):
+ if self._client_transport:
+ self._client_transport = None
+ self.client.close()
+
+ if self._server_transport:
+ self._server_transport = None
+ self.server.close()
+
+ def replicate(self):
+ """Tell the master side of replication that something has happened, and then
+ wait for the replication to occur.
+ """
+ self.streamer.on_notifier_poke()
+ self.pump(0.1)
+
+ def handle_http_replication_attempt(self) -> SynapseRequest:
+ """Asserts that a connection attempt was made to the master HS on the
+ HTTP replication port, then proxies it to the master HS object to be
+ handled.
+
+ Returns:
+ The request object received by master HS.
+ """
+
+ # We should have an outbound connection attempt.
+ clients = self.reactor.tcpClients
+ self.assertEqual(len(clients), 1)
+ (host, port, client_factory, _timeout, _bindAddress) = clients.pop(0)
+ self.assertEqual(host, "1.2.3.4")
+ self.assertEqual(port, 8765)
+
+ # Set up client side protocol
+ client_protocol = client_factory.buildProtocol(None)
+
+ request_factory = OneShotRequestFactory()
+
+ # Set up the server side protocol
+ channel = _PushHTTPChannel(self.reactor)
+ channel.requestFactory = request_factory
+ channel.site = self.site
+
+ # Connect client to server and vice versa.
+ client_to_server_transport = FakeTransport(
+ channel, self.reactor, client_protocol
+ )
+ client_protocol.makeConnection(client_to_server_transport)
+
+ server_to_client_transport = FakeTransport(
+ client_protocol, self.reactor, channel
+ )
+ channel.makeConnection(server_to_client_transport)
+
+ # The request will now be processed by `self.site` and the response
+ # streamed back.
+ self.reactor.advance(0)
+
+ # We tear down the connection so it doesn't get reused without our
+ # knowledge.
+ server_to_client_transport.loseConnection()
+ client_to_server_transport.loseConnection()
+
+ return request_factory.request
+
+ def assert_request_is_get_repl_stream_updates(
+ self, request: SynapseRequest, stream_name: str
+ ):
+ """Asserts that the given request is a HTTP replication request for
+ fetching updates for given stream.
+ """
+
+ self.assertRegex(
+ request.path,
+ br"^/_synapse/replication/get_repl_stream_updates/%s/[^/]+$"
+ % (stream_name.encode("ascii"),),
+ )
+
+ self.assertEqual(request.method, b"GET")
+
+
+class TestReplicationDataHandler(GenericWorkerReplicationHandler):
+ """Drop-in for ReplicationDataHandler which just collects RDATA rows"""
+
+ def __init__(self, hs: HomeServer):
+ super().__init__(hs)
+
+ # list of received (stream_name, token, row) tuples
+ self.received_rdata_rows = [] # type: List[Tuple[str, int, Any]]
+
+ async def on_rdata(self, stream_name, instance_name, token, rows):
+ await super().on_rdata(stream_name, instance_name, token, rows)
+ for r in rows:
+ self.received_rdata_rows.append((stream_name, token, r))
+
+
+@attr.s()
+class OneShotRequestFactory:
+ """A simple request factory that generates a single `SynapseRequest` and
+ stores it for future use. Can only be used once.
+ """
+
+ request = attr.ib(default=None)
+
+ def __call__(self, *args, **kwargs):
+ assert self.request is None
+
+ self.request = SynapseRequest(*args, **kwargs)
+ return self.request
+
+
+class _PushHTTPChannel(HTTPChannel):
+ """A HTTPChannel that wraps pull producers to push producers.
+
+ This is a hack to get around the fact that HTTPChannel transparently wraps a
+ pull producer (which is what Synapse uses to reply to requests) with
+ `_PullToPush` to convert it to a push producer. Unfortunately `_PullToPush`
+ uses the standard reactor rather than letting us use our test reactor, which
+ makes it very hard to test.
+ """
+
+ def __init__(self, reactor: IReactorTime):
+ super().__init__()
+ self.reactor = reactor
+
+ self._pull_to_push_producer = None # type: Optional[_PullToPushProducer]
+
+ def registerProducer(self, producer, streaming):
+ # Convert pull producers to push producer.
+ if not streaming:
+ self._pull_to_push_producer = _PullToPushProducer(
+ self.reactor, producer, self
+ )
+ producer = self._pull_to_push_producer
+
+ super().registerProducer(producer, True)
+
+ def unregisterProducer(self):
+ if self._pull_to_push_producer:
+ # We need to manually stop the _PullToPushProducer.
+ self._pull_to_push_producer.stop()
+
+
+class _PullToPushProducer:
+ """A push producer that wraps a pull producer.
+ """
+
+ def __init__(
+ self, reactor: IReactorTime, producer: IPullProducer, consumer: IConsumer
+ ):
+ self._clock = Clock(reactor)
+ self._producer = producer
+ self._consumer = consumer
+
+ # While running we use a looping call with a zero delay to call
+ # resumeProducing on given producer.
+ self._looping_call = None # type: Optional[LoopingCall]
+
+ # We start writing next reactor tick.
+ self._start_loop()
+
+ def _start_loop(self):
+ """Start the looping call to
+ """
+
+ if not self._looping_call:
+ # Start a looping call which runs every tick.
+ self._looping_call = self._clock.looping_call(self._run_once, 0)
+
+ def stop(self):
+ """Stops calling resumeProducing.
+ """
+ if self._looping_call:
+ self._looping_call.stop()
+ self._looping_call = None
+
+ def pauseProducing(self):
+ """Implements IPushProducer
+ """
+ self.stop()
+
+ def resumeProducing(self):
+ """Implements IPushProducer
+ """
+ self._start_loop()
+
+ def stopProducing(self):
+ """Implements IPushProducer
+ """
+ self.stop()
+ self._producer.stopProducing()
+
+ def _run_once(self):
+ """Calls resumeProducing on producer once.
+ """
+
+ try:
+ self._producer.resumeProducing()
+ except Exception:
+ logger.exception("Failed to call resumeProducing")
+ try:
+ self._consumer.unregisterProducer()
+ except Exception:
+ pass
+
+ self.stopProducing()
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index 2a1e7c7166..56497b8476 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -13,61 +13,26 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from mock import Mock, NonCallableMock
+from mock import Mock
-from synapse.replication.tcp.client import (
- ReplicationClientFactory,
- ReplicationClientHandler,
-)
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-from synapse.storage.database import make_conn
+from tests.replication._base import BaseStreamTestCase
-from tests import unittest
-from tests.server import FakeTransport
-
-class BaseSlavedStoreTestCase(unittest.HomeserverTestCase):
+class BaseSlavedStoreTestCase(BaseStreamTestCase):
def make_homeserver(self, reactor, clock):
- hs = self.setup_test_homeserver(
- "blue",
- federation_client=Mock(),
- ratelimiter=NonCallableMock(spec_set=["can_do_action"]),
- )
-
- hs.get_ratelimiter().can_do_action.return_value = (True, 0)
+ hs = self.setup_test_homeserver(federation_client=Mock())
return hs
def prepare(self, reactor, clock, hs):
+ super().prepare(reactor, clock, hs)
- db_config = hs.config.database.get_single_database()
- self.master_store = self.hs.get_datastore()
- self.storage = hs.get_storage()
- database = hs.get_datastores().databases[0]
- self.slaved_store = self.STORE_TYPE(
- database, make_conn(db_config, database.engine), self.hs
- )
- self.event_id = 0
-
- server_factory = ReplicationStreamProtocolFactory(self.hs)
- self.streamer = server_factory.streamer
+ self.reconnect()
- handler_factory = Mock()
- self.replication_handler = ReplicationClientHandler(self.slaved_store)
- self.replication_handler.factory = handler_factory
-
- client_factory = ReplicationClientFactory(
- self.hs, "client_name", self.replication_handler
- )
-
- server = server_factory.buildProtocol(None)
- client = client_factory.buildProtocol(None)
-
- client.makeConnection(FakeTransport(server, reactor))
-
- self.server_to_client_transport = FakeTransport(client, reactor)
- server.makeConnection(self.server_to_client_transport)
+ self.master_store = hs.get_datastore()
+ self.slaved_store = self.worker_hs.get_datastore()
+ self.storage = hs.get_storage()
def replicate(self):
"""Tell the master side of replication that something has happened, and then
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index f0561b30e3..1a88c7fb80 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -17,17 +17,18 @@ from canonicaljson import encode_canonical_json
from synapse.api.room_versions import RoomVersions
from synapse.events import FrozenEvent, _EventInternalMetadata, make_event_from_dict
-from synapse.events.snapshot import EventContext
from synapse.handlers.room import RoomEventSource
from synapse.replication.slave.storage.events import SlavedEventStore
from synapse.storage.roommember import RoomsForUser
+from tests.server import FakeTransport
+
from ._base import BaseSlavedStoreTestCase
-USER_ID = "@feeling:blue"
-USER_ID_2 = "@bright:blue"
+USER_ID = "@feeling:test"
+USER_ID_2 = "@bright:test"
OUTLIER = {"outlier": True}
-ROOM_ID = "!room:blue"
+ROOM_ID = "!room:test"
logger = logging.getLogger(__name__)
@@ -239,7 +240,8 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
self.check("get_rooms_for_user_with_stream_ordering", (USER_ID_2,), set())
# limit the replication rate
- repl_transport = self.server_to_client_transport
+ repl_transport = self._server_transport
+ assert isinstance(repl_transport, FakeTransport)
repl_transport.autoflush = False
# build the join and message events and persist them in the same batch.
@@ -322,7 +324,6 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
type="m.room.message",
key=None,
internal={},
- state=None,
depth=None,
prev_events=[],
auth_events=[],
@@ -362,15 +363,8 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
event = make_event_from_dict(event_dict, internal_metadata_dict=internal)
self.event_id += 1
-
- if state is not None:
- state_ids = {key: e.event_id for key, e in state.items()}
- context = EventContext.with_state(
- state_group=None, current_state_ids=state_ids, prev_state_ids=state_ids
- )
- else:
- state_handler = self.hs.get_state_handler()
- context = self.get_success(state_handler.compute_event_context(event))
+ state_handler = self.hs.get_state_handler()
+ context = self.get_success(state_handler.compute_event_context(event))
self.master_store.add_push_actions_to_staging(
event.event_id, {user_id: actions for user_id, actions in push_actions}
diff --git a/tests/replication/tcp/streams/_base.py b/tests/replication/tcp/streams/_base.py
deleted file mode 100644
index e96ad4ca4e..0000000000
--- a/tests/replication/tcp/streams/_base.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from mock import Mock
-
-from synapse.replication.tcp.commands import ReplicateCommand
-from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-
-from tests import unittest
-from tests.server import FakeTransport
-
-
-class BaseStreamTestCase(unittest.HomeserverTestCase):
- """Base class for tests of the replication streams"""
-
- def prepare(self, reactor, clock, hs):
- # build a replication server
- server_factory = ReplicationStreamProtocolFactory(self.hs)
- self.streamer = server_factory.streamer
- server = server_factory.buildProtocol(None)
-
- # build a replication client, with a dummy handler
- handler_factory = Mock()
- self.test_handler = TestReplicationClientHandler()
- self.test_handler.factory = handler_factory
- self.client = ClientReplicationStreamProtocol(
- "client", "test", clock, self.test_handler
- )
-
- # wire them together
- self.client.makeConnection(FakeTransport(server, reactor))
- server.makeConnection(FakeTransport(self.client, reactor))
-
- def replicate(self):
- """Tell the master side of replication that something has happened, and then
- wait for the replication to occur.
- """
- self.streamer.on_notifier_poke()
- self.pump(0.1)
-
- def replicate_stream(self, stream, token="NOW"):
- """Make the client end a REPLICATE command to set up a subscription to a stream"""
- self.client.send_command(ReplicateCommand(stream, token))
-
-
-class TestReplicationClientHandler(object):
- """Drop-in for ReplicationClientHandler which just collects RDATA rows"""
-
- def __init__(self):
- self.received_rdata_rows = []
-
- def get_streams_to_replicate(self):
- return {}
-
- def get_currently_syncing_users(self):
- return []
-
- def update_connection(self, connection):
- pass
-
- def finished_connecting(self):
- pass
-
- async def on_rdata(self, stream_name, token, rows):
- for r in rows:
- self.received_rdata_rows.append((stream_name, token, r))
diff --git a/tests/replication/tcp/streams/test_account_data.py b/tests/replication/tcp/streams/test_account_data.py
new file mode 100644
index 0000000000..6a5116dd2a
--- /dev/null
+++ b/tests/replication/tcp/streams/test_account_data.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.tcp.streams._base import (
+ _STREAM_UPDATE_TARGET_ROW_COUNT,
+ AccountDataStream,
+)
+
+from tests.replication._base import BaseStreamTestCase
+
+
+class AccountDataStreamTestCase(BaseStreamTestCase):
+ def test_update_function_room_account_data_limit(self):
+ """Test replication with many room account data updates
+ """
+ store = self.hs.get_datastore()
+
+ # generate lots of account data updates
+ updates = []
+ for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
+ update = "m.test_type.%i" % (i,)
+ self.get_success(
+ store.add_account_data_to_room("test_user", "test_room", update, {})
+ )
+ updates.append(update)
+
+ # also one global update
+ self.get_success(store.add_account_data_for_user("test_user", "m.global", {}))
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order
+ received_rows = self.test_handler.received_rdata_rows
+
+ for t in updates:
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertEqual(stream_name, AccountDataStream.NAME)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, t)
+ self.assertEqual(row.room_id, "test_room")
+
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, "m.global")
+ self.assertIsNone(row.room_id)
+
+ self.assertEqual([], received_rows)
+
+ def test_update_function_global_account_data_limit(self):
+ """Test replication with many global account data updates
+ """
+ store = self.hs.get_datastore()
+
+ # generate lots of account data updates
+ updates = []
+ for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
+ update = "m.test_type.%i" % (i,)
+ self.get_success(store.add_account_data_for_user("test_user", update, {}))
+ updates.append(update)
+
+ # also one per-room update
+ self.get_success(
+ store.add_account_data_to_room("test_user", "test_room", "m.per_room", {})
+ )
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order
+ received_rows = self.test_handler.received_rdata_rows
+
+ for t in updates:
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertEqual(stream_name, AccountDataStream.NAME)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, t)
+ self.assertIsNone(row.room_id)
+
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, "m.per_room")
+ self.assertEqual(row.room_id, "test_room")
+
+ self.assertEqual([], received_rows)
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
new file mode 100644
index 0000000000..51bf0ef4e9
--- /dev/null
+++ b/tests/replication/tcp/streams/test_events.py
@@ -0,0 +1,425 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.events import EventBase
+from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
+from synapse.replication.tcp.streams.events import (
+ EventsStreamCurrentStateRow,
+ EventsStreamEventRow,
+ EventsStreamRow,
+)
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests.replication._base import BaseStreamTestCase
+from tests.test_utils.event_injection import inject_event, inject_member_event
+
+
+class EventsStreamTestCase(BaseStreamTestCase):
+ servlets = [
+ admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ ]
+
+ def prepare(self, reactor, clock, hs):
+ super().prepare(reactor, clock, hs)
+ self.user_id = self.register_user("u1", "pass")
+ self.user_tok = self.login("u1", "pass")
+
+ self.reconnect()
+
+ self.room_id = self.helper.create_room_as(tok=self.user_tok)
+ self.test_handler.received_rdata_rows.clear()
+
+ def test_update_function_event_row_limit(self):
+ """Test replication with many non-state events
+
+ Checks that all events are correctly replicated when there are lots of
+ event rows to be replicated.
+ """
+ # disconnect, so that we can stack up some changes
+ self.disconnect()
+
+ # generate lots of non-state events. We inject them using inject_event
+ # so that they are not send out over replication until we call self.replicate().
+ events = [
+ self._inject_test_event()
+ for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 1)
+ ]
+
+ # also one state event
+ state_event = self._inject_state_event()
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order (as
+ # well as various cache invalidation updates which we ignore)
+ received_rows = [
+ row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+ ]
+
+ for event in events:
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, event.event_id)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, state_event.event_id)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ self.assertEqual(row.data.event_id, state_event.event_id)
+
+ self.assertEqual([], received_rows)
+
+ def test_update_function_huge_state_change(self):
+ """Test replication with many state events
+
+ Ensures that all events are correctly replicated when there are lots of
+ state change rows to be replicated.
+ """
+
+ # we want to generate lots of state changes at a single stream ID.
+ #
+ # We do this by having two branches in the DAG. On one, we have a moderator
+ # which that generates lots of state; on the other, we de-op the moderator,
+ # thus invalidating all the state.
+
+ OTHER_USER = "@other_user:localhost"
+
+ # have the user join
+ inject_member_event(self.hs, self.room_id, OTHER_USER, Membership.JOIN)
+
+ # Update existing power levels with mod at PL50
+ pls = self.helper.get_state(
+ self.room_id, EventTypes.PowerLevels, tok=self.user_tok
+ )
+ pls["users"][OTHER_USER] = 50
+ self.helper.send_state(
+ self.room_id, EventTypes.PowerLevels, pls, tok=self.user_tok,
+ )
+
+ # this is the point in the DAG where we make a fork
+ fork_point = self.get_success(
+ self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
+ ) # type: List[str]
+
+ events = [
+ self._inject_state_event(sender=OTHER_USER)
+ for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
+ ]
+
+ self.replicate()
+ # all those events and state changes should have landed
+ self.assertGreaterEqual(
+ len(self.test_handler.received_rdata_rows), 2 * len(events)
+ )
+
+ # disconnect, so that we can stack up the changes
+ self.disconnect()
+ self.test_handler.received_rdata_rows.clear()
+
+ # a state event which doesn't get rolled back, to check that the state
+ # before the huge update comes through ok
+ state1 = self._inject_state_event()
+
+ # roll back all the state by de-modding the user
+ prev_events = fork_point
+ pls["users"][OTHER_USER] = 0
+ pl_event = inject_event(
+ self.hs,
+ prev_event_ids=prev_events,
+ type=EventTypes.PowerLevels,
+ state_key="",
+ sender=self.user_id,
+ room_id=self.room_id,
+ content=pls,
+ )
+
+ # one more bit of state that doesn't get rolled back
+ state2 = self._inject_state_event()
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order (as
+ # well as various cache invalidation updates which we ignore)
+ #
+ # we expect:
+ #
+ # - two rows for state1
+ # - the PL event row, plus state rows for the PL event and each
+ # of the states that got reverted.
+ # - two rows for state2
+
+ received_rows = [
+ row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+ ]
+
+ # first check the first two rows, which should be state1
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, state1.event_id)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ self.assertEqual(row.data.event_id, state1.event_id)
+
+ # now the last two rows, which should be state2
+ stream_name, token, row = received_rows.pop(-2)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, state2.event_id)
+
+ stream_name, token, row = received_rows.pop(-1)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ self.assertEqual(row.data.event_id, state2.event_id)
+
+ # that should leave us with the rows for the PL event
+ self.assertEqual(len(received_rows), len(events) + 2)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, pl_event.event_id)
+
+ # the state rows are unsorted
+ state_rows = [] # type: List[EventsStreamCurrentStateRow]
+ for stream_name, token, row in received_rows:
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ state_rows.append(row.data)
+
+ state_rows.sort(key=lambda r: r.state_key)
+
+ sr = state_rows.pop(0)
+ self.assertEqual(sr.type, EventTypes.PowerLevels)
+ self.assertEqual(sr.event_id, pl_event.event_id)
+ for sr in state_rows:
+ self.assertEqual(sr.type, "test_state_event")
+ # "None" indicates the state has been deleted
+ self.assertIsNone(sr.event_id)
+
+ def test_update_function_state_row_limit(self):
+ """Test replication with many state events over several stream ids.
+ """
+
+ # we want to generate lots of state changes, but for this test, we want to
+ # spread out the state changes over a few stream IDs.
+ #
+ # We do this by having two branches in the DAG. On one, we have four moderators,
+ # each of which that generates lots of state; on the other, we de-op the users,
+ # thus invalidating all the state.
+
+ NUM_USERS = 4
+ STATES_PER_USER = _STREAM_UPDATE_TARGET_ROW_COUNT // 4 + 1
+
+ user_ids = ["@user%i:localhost" % (i,) for i in range(NUM_USERS)]
+
+ # have the users join
+ for u in user_ids:
+ inject_member_event(self.hs, self.room_id, u, Membership.JOIN)
+
+ # Update existing power levels with mod at PL50
+ pls = self.helper.get_state(
+ self.room_id, EventTypes.PowerLevels, tok=self.user_tok
+ )
+ pls["users"].update({u: 50 for u in user_ids})
+ self.helper.send_state(
+ self.room_id, EventTypes.PowerLevels, pls, tok=self.user_tok,
+ )
+
+ # this is the point in the DAG where we make a fork
+ fork_point = self.get_success(
+ self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
+ ) # type: List[str]
+
+ events = [] # type: List[EventBase]
+ for user in user_ids:
+ events.extend(
+ self._inject_state_event(sender=user) for _ in range(STATES_PER_USER)
+ )
+
+ self.replicate()
+
+ # all those events and state changes should have landed
+ self.assertGreaterEqual(
+ len(self.test_handler.received_rdata_rows), 2 * len(events)
+ )
+
+ # disconnect, so that we can stack up the changes
+ self.disconnect()
+ self.test_handler.received_rdata_rows.clear()
+
+ # now roll back all that state by de-modding the users
+ prev_events = fork_point
+ pl_events = []
+ for u in user_ids:
+ pls["users"][u] = 0
+ e = inject_event(
+ self.hs,
+ prev_event_ids=prev_events,
+ type=EventTypes.PowerLevels,
+ state_key="",
+ sender=self.user_id,
+ room_id=self.room_id,
+ content=pls,
+ )
+ prev_events = [e.event_id]
+ pl_events.append(e)
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order (as
+ # well as various cache invalidation updates which we ignore)
+ received_rows = [
+ row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+ ]
+ self.assertGreaterEqual(len(received_rows), len(events))
+ for i in range(NUM_USERS):
+ # for each user, we expect the PL event row, followed by state rows for
+ # the PL event and each of the states that got reverted.
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, pl_events[i].event_id)
+
+ # the state rows are unsorted
+ state_rows = [] # type: List[EventsStreamCurrentStateRow]
+ for j in range(STATES_PER_USER + 1):
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ state_rows.append(row.data)
+
+ state_rows.sort(key=lambda r: r.state_key)
+
+ sr = state_rows.pop(0)
+ self.assertEqual(sr.type, EventTypes.PowerLevels)
+ self.assertEqual(sr.event_id, pl_events[i].event_id)
+ for sr in state_rows:
+ self.assertEqual(sr.type, "test_state_event")
+ # "None" indicates the state has been deleted
+ self.assertIsNone(sr.event_id)
+
+ self.assertEqual([], received_rows)
+
+ event_count = 0
+
+ def _inject_test_event(
+ self, body: Optional[str] = None, sender: Optional[str] = None, **kwargs
+ ) -> EventBase:
+ if sender is None:
+ sender = self.user_id
+
+ if body is None:
+ body = "event %i" % (self.event_count,)
+ self.event_count += 1
+
+ return inject_event(
+ self.hs,
+ room_id=self.room_id,
+ sender=sender,
+ type="test_event",
+ content={"body": body},
+ **kwargs
+ )
+
+ def _inject_state_event(
+ self,
+ body: Optional[str] = None,
+ state_key: Optional[str] = None,
+ sender: Optional[str] = None,
+ ) -> EventBase:
+ if sender is None:
+ sender = self.user_id
+
+ if state_key is None:
+ state_key = "state_%i" % (self.event_count,)
+ self.event_count += 1
+
+ if body is None:
+ body = "state event %s" % (state_key,)
+
+ return inject_event(
+ self.hs,
+ room_id=self.room_id,
+ sender=sender,
+ type="test_state_event",
+ state_key=state_key,
+ content={"body": body},
+ )
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
new file mode 100644
index 0000000000..2babea4e3e
--- /dev/null
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.federation.send_queue import EduRow
+from synapse.replication.tcp.streams.federation import FederationStream
+
+from tests.replication._base import BaseStreamTestCase
+
+
+class FederationStreamTestCase(BaseStreamTestCase):
+ def _get_worker_hs_config(self) -> dict:
+ # enable federation sending on the worker
+ config = super()._get_worker_hs_config()
+ # TODO: make it so we don't need both of these
+ config["send_federation"] = True
+ config["worker_app"] = "synapse.app.federation_sender"
+ return config
+
+ def test_catchup(self):
+ """Basic test of catchup on reconnect
+
+ Makes sure that updates sent while we are offline are received later.
+ """
+ fed_sender = self.hs.get_federation_sender()
+ received_rows = self.test_handler.received_rdata_rows
+
+ fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
+
+ self.reconnect()
+ self.reactor.advance(0)
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual(received_rows, [])
+
+ # We should now see an attempt to connect to the master
+ request = self.handle_http_replication_attempt()
+ self.assert_request_is_get_repl_stream_updates(request, "federation")
+
+ # we should have received an update row
+ stream_name, token, row = received_rows.pop()
+ self.assertEqual(stream_name, "federation")
+ self.assertIsInstance(row, FederationStream.FederationStreamRow)
+ self.assertEqual(row.type, EduRow.TypeId)
+ edurow = EduRow.from_data(row.data)
+ self.assertEqual(edurow.edu.edu_type, "m.test_edu")
+ self.assertEqual(edurow.edu.origin, self.hs.hostname)
+ self.assertEqual(edurow.edu.destination, "testdest")
+ self.assertEqual(edurow.edu.content, {"a": "b"})
+
+ self.assertEqual(received_rows, [])
+
+ # additional updates should be transferred without an HTTP hit
+ fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
+ self.reactor.advance(0)
+ # there should be no http hit
+ self.assertEqual(len(self.reactor.tcpClients), 0)
+ # ... but we should have a row
+ self.assertEqual(len(received_rows), 1)
+
+ stream_name, token, row = received_rows.pop()
+ self.assertEqual(stream_name, "federation")
+ self.assertIsInstance(row, FederationStream.FederationStreamRow)
+ self.assertEqual(row.type, EduRow.TypeId)
+ edurow = EduRow.from_data(row.data)
+ self.assertEqual(edurow.edu.edu_type, "m.test1")
+ self.assertEqual(edurow.edu.origin, self.hs.hostname)
+ self.assertEqual(edurow.edu.destination, "testdest")
+ self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index d5a99f6caa..56b062ecc1 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,35 +12,73 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.replication.tcp.streams._base import ReceiptsStreamRow
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+# type: ignore
+
+from mock import Mock
+
+from synapse.replication.tcp.streams._base import ReceiptsStream
+
+from tests.replication._base import BaseStreamTestCase
USER_ID = "@feeling:blue"
-ROOM_ID = "!room:blue"
-EVENT_ID = "$event:blue"
class ReceiptsStreamTestCase(BaseStreamTestCase):
+ def _build_replication_data_handler(self):
+ return Mock(wraps=super()._build_replication_data_handler())
+
def test_receipt(self):
- # make the client subscribe to the receipts stream
- self.replicate_stream("receipts", "NOW")
+ self.reconnect()
# tell the master to send a new receipt
self.get_success(
self.hs.get_datastore().insert_receipt(
- ROOM_ID, "m.read", USER_ID, [EVENT_ID], {"a": 1}
+ "!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1}
)
)
self.replicate()
# there should be one RDATA command
- rdata_rows = self.test_handler.received_rdata_rows
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "receipts")
self.assertEqual(1, len(rdata_rows))
- self.assertEqual(rdata_rows[0][0], "receipts")
- row = rdata_rows[0][2] # type: ReceiptsStreamRow
- self.assertEqual(ROOM_ID, row.room_id)
+ row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow
+ self.assertEqual("!room:blue", row.room_id)
self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id)
- self.assertEqual(EVENT_ID, row.event_id)
+ self.assertEqual("$event:blue", row.event_id)
self.assertEqual({"a": 1}, row.data)
+
+ # Now let's disconnect and insert some data.
+ self.disconnect()
+
+ self.test_handler.on_rdata.reset_mock()
+
+ self.get_success(
+ self.hs.get_datastore().insert_receipt(
+ "!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2}
+ )
+ )
+ self.replicate()
+
+ # Nothing should have happened as we are disconnected
+ self.test_handler.on_rdata.assert_not_called()
+
+ self.reconnect()
+ self.pump(0.1)
+
+ # We should now have caught up and get the missing data
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "receipts")
+ self.assertEqual(token, 3)
+ self.assertEqual(1, len(rdata_rows))
+
+ row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow
+ self.assertEqual("!room2:blue", row.room_id)
+ self.assertEqual("m.read", row.receipt_type)
+ self.assertEqual(USER_ID, row.user_id)
+ self.assertEqual("$event2:foo", row.event_id)
+ self.assertEqual({"a": 2}, row.data)
diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
new file mode 100644
index 0000000000..fd62b26356
--- /dev/null
+++ b/tests/replication/tcp/streams/test_typing.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from mock import Mock
+
+from synapse.handlers.typing import RoomMember
+from synapse.replication.tcp.streams import TypingStream
+
+from tests.replication._base import BaseStreamTestCase
+
+USER_ID = "@feeling:blue"
+
+
+class TypingStreamTestCase(BaseStreamTestCase):
+ def _build_replication_data_handler(self):
+ return Mock(wraps=super()._build_replication_data_handler())
+
+ def test_typing(self):
+ typing = self.hs.get_typing_handler()
+
+ room_id = "!bar:blue"
+
+ self.reconnect()
+
+ typing._push_update(member=RoomMember(room_id, USER_ID), typing=True)
+
+ self.reactor.advance(0)
+
+ # We should now see an attempt to connect to the master
+ request = self.handle_http_replication_attempt()
+ self.assert_request_is_get_repl_stream_updates(request, "typing")
+
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "typing")
+ self.assertEqual(1, len(rdata_rows))
+ row = rdata_rows[0] # type: TypingStream.TypingStreamRow
+ self.assertEqual(room_id, row.room_id)
+ self.assertEqual([USER_ID], row.user_ids)
+
+ # Now let's disconnect and insert some data.
+ self.disconnect()
+
+ self.test_handler.on_rdata.reset_mock()
+
+ typing._push_update(member=RoomMember(room_id, USER_ID), typing=False)
+
+ self.test_handler.on_rdata.assert_not_called()
+
+ self.reconnect()
+ self.pump(0.1)
+
+ # We should now see an attempt to connect to the master
+ request = self.handle_http_replication_attempt()
+ self.assert_request_is_get_repl_stream_updates(request, "typing")
+
+ # The from token should be the token from the last RDATA we got.
+ self.assertEqual(int(request.args[b"from_token"][0]), token)
+
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "typing")
+ self.assertEqual(1, len(rdata_rows))
+ row = rdata_rows[0]
+ self.assertEqual(room_id, row.room_id)
+ self.assertEqual([], row.user_ids)
diff --git a/tests/replication/tcp/test_commands.py b/tests/replication/tcp/test_commands.py
new file mode 100644
index 0000000000..60c10a441a
--- /dev/null
+++ b/tests/replication/tcp/test_commands.py
@@ -0,0 +1,44 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.replication.tcp.commands import (
+ RdataCommand,
+ ReplicateCommand,
+ parse_command_from_line,
+)
+
+from tests.unittest import TestCase
+
+
+class ParseCommandTestCase(TestCase):
+ def test_parse_one_word_command(self):
+ line = "REPLICATE"
+ cmd = parse_command_from_line(line)
+ self.assertIsInstance(cmd, ReplicateCommand)
+
+ def test_parse_rdata(self):
+ line = 'RDATA events master 6287863 ["ev", ["$eventid", "!roomid", "type", null, null, null]]'
+ cmd = parse_command_from_line(line)
+ assert isinstance(cmd, RdataCommand)
+ self.assertEqual(cmd.stream_name, "events")
+ self.assertEqual(cmd.instance_name, "master")
+ self.assertEqual(cmd.token, 6287863)
+
+ def test_parse_rdata_batch(self):
+ line = 'RDATA presence master batch ["@foo:example.com", "online"]'
+ cmd = parse_command_from_line(line)
+ assert isinstance(cmd, RdataCommand)
+ self.assertEqual(cmd.stream_name, "presence")
+ self.assertEqual(cmd.instance_name, "master")
+ self.assertIsNone(cmd.token)
diff --git a/tests/replication/tcp/test_remote_server_up.py b/tests/replication/tcp/test_remote_server_up.py
new file mode 100644
index 0000000000..d1c15caeb0
--- /dev/null
+++ b/tests/replication/tcp/test_remote_server_up.py
@@ -0,0 +1,62 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import Tuple
+
+from twisted.internet.interfaces import IProtocol
+from twisted.test.proto_helpers import StringTransport
+
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+
+from tests.unittest import HomeserverTestCase
+
+
+class RemoteServerUpTestCase(HomeserverTestCase):
+ def prepare(self, reactor, clock, hs):
+ self.factory = ReplicationStreamProtocolFactory(hs)
+
+ def _make_client(self) -> Tuple[IProtocol, StringTransport]:
+ """Create a new direct TCP replication connection
+ """
+
+ proto = self.factory.buildProtocol(("127.0.0.1", 0))
+ transport = StringTransport()
+ proto.makeConnection(transport)
+
+ # We can safely ignore the commands received during connection.
+ self.pump()
+ transport.clear()
+
+ return proto, transport
+
+ def test_relay(self):
+ """Test that Synapse will relay REMOTE_SERVER_UP commands to all
+ other connections, but not the one that sent it.
+ """
+
+ proto1, transport1 = self._make_client()
+
+ # We shouldn't receive an echo.
+ proto1.dataReceived(b"REMOTE_SERVER_UP example.com\n")
+ self.pump()
+ self.assertEqual(transport1.value(), b"")
+
+ # But we should see an echo if we connect another client
+ proto2, transport2 = self._make_client()
+ proto1.dataReceived(b"REMOTE_SERVER_UP example.com\n")
+
+ self.pump()
+ self.assertEqual(transport1.value(), b"")
+ self.assertEqual(transport2.value(), b"REMOTE_SERVER_UP example.com\n")
diff --git a/tests/replication/test_federation_ack.py b/tests/replication/test_federation_ack.py
new file mode 100644
index 0000000000..5448d9f0dc
--- /dev/null
+++ b/tests/replication/test_federation_ack.py
@@ -0,0 +1,71 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import mock
+
+from synapse.app.generic_worker import GenericWorkerServer
+from synapse.replication.tcp.commands import FederationAckCommand
+from synapse.replication.tcp.protocol import AbstractConnection
+from synapse.replication.tcp.streams.federation import FederationStream
+
+from tests.unittest import HomeserverTestCase
+
+
+class FederationAckTestCase(HomeserverTestCase):
+ def default_config(self) -> dict:
+ config = super().default_config()
+ config["worker_app"] = "synapse.app.federation_sender"
+ config["send_federation"] = True
+ return config
+
+ def make_homeserver(self, reactor, clock):
+ hs = self.setup_test_homeserver(homeserverToUse=GenericWorkerServer)
+ return hs
+
+ def test_federation_ack_sent(self):
+ """A FEDERATION_ACK should be sent back after each RDATA federation
+
+ This test checks that the federation sender is correctly sending back
+ FEDERATION_ACK messages. The test works by spinning up a federation_sender
+ worker server, and then fishing out its ReplicationCommandHandler. We wire
+ the RCH up to a mock connection (so that we can observe the command being sent)
+ and then poke in an RDATA row.
+
+ XXX: it might be nice to do this by pretending to be a synapse master worker
+ (or a redis server), and having the worker connect to us via a mocked-up TCP
+ transport, rather than assuming that the implementation has a
+ ReplicationCommandHandler.
+ """
+ rch = self.hs.get_tcp_replication()
+
+ # wire up the ReplicationCommandHandler to a mock connection
+ mock_connection = mock.Mock(spec=AbstractConnection)
+ rch.new_connection(mock_connection)
+
+ # tell it it received an RDATA row
+ self.get_success(
+ rch.on_rdata(
+ "federation",
+ "master",
+ token=10,
+ rows=[FederationStream.FederationStreamRow(type="x", data=[1, 2, 3])],
+ )
+ )
+
+ # now check that the FEDERATION_ACK was sent
+ mock_connection.send_command.assert_called_once()
+ cmd = mock_connection.send_command.call_args[0][0]
+ assert isinstance(cmd, FederationAckCommand)
+ self.assertEqual(cmd.token, 10)
|