diff --git a/tests/replication/tcp/streams/_base.py b/tests/replication/tcp/streams/_base.py
deleted file mode 100644
index e96ad4ca4e..0000000000
--- a/tests/replication/tcp/streams/_base.py
+++ /dev/null
@@ -1,78 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-from mock import Mock
-
-from synapse.replication.tcp.commands import ReplicateCommand
-from synapse.replication.tcp.protocol import ClientReplicationStreamProtocol
-from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
-
-from tests import unittest
-from tests.server import FakeTransport
-
-
-class BaseStreamTestCase(unittest.HomeserverTestCase):
- """Base class for tests of the replication streams"""
-
- def prepare(self, reactor, clock, hs):
- # build a replication server
- server_factory = ReplicationStreamProtocolFactory(self.hs)
- self.streamer = server_factory.streamer
- server = server_factory.buildProtocol(None)
-
- # build a replication client, with a dummy handler
- handler_factory = Mock()
- self.test_handler = TestReplicationClientHandler()
- self.test_handler.factory = handler_factory
- self.client = ClientReplicationStreamProtocol(
- "client", "test", clock, self.test_handler
- )
-
- # wire them together
- self.client.makeConnection(FakeTransport(server, reactor))
- server.makeConnection(FakeTransport(self.client, reactor))
-
- def replicate(self):
- """Tell the master side of replication that something has happened, and then
- wait for the replication to occur.
- """
- self.streamer.on_notifier_poke()
- self.pump(0.1)
-
- def replicate_stream(self, stream, token="NOW"):
- """Make the client end a REPLICATE command to set up a subscription to a stream"""
- self.client.send_command(ReplicateCommand(stream, token))
-
-
-class TestReplicationClientHandler(object):
- """Drop-in for ReplicationClientHandler which just collects RDATA rows"""
-
- def __init__(self):
- self.received_rdata_rows = []
-
- def get_streams_to_replicate(self):
- return {}
-
- def get_currently_syncing_users(self):
- return []
-
- def update_connection(self, connection):
- pass
-
- def finished_connecting(self):
- pass
-
- async def on_rdata(self, stream_name, token, rows):
- for r in rows:
- self.received_rdata_rows.append((stream_name, token, r))
diff --git a/tests/replication/tcp/streams/test_account_data.py b/tests/replication/tcp/streams/test_account_data.py
new file mode 100644
index 0000000000..6a5116dd2a
--- /dev/null
+++ b/tests/replication/tcp/streams/test_account_data.py
@@ -0,0 +1,117 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.replication.tcp.streams._base import (
+ _STREAM_UPDATE_TARGET_ROW_COUNT,
+ AccountDataStream,
+)
+
+from tests.replication._base import BaseStreamTestCase
+
+
+class AccountDataStreamTestCase(BaseStreamTestCase):
+ def test_update_function_room_account_data_limit(self):
+ """Test replication with many room account data updates
+ """
+ store = self.hs.get_datastore()
+
+ # generate lots of account data updates
+ updates = []
+ for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
+ update = "m.test_type.%i" % (i,)
+ self.get_success(
+ store.add_account_data_to_room("test_user", "test_room", update, {})
+ )
+ updates.append(update)
+
+ # also one global update
+ self.get_success(store.add_account_data_for_user("test_user", "m.global", {}))
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order
+ received_rows = self.test_handler.received_rdata_rows
+
+ for t in updates:
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertEqual(stream_name, AccountDataStream.NAME)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, t)
+ self.assertEqual(row.room_id, "test_room")
+
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, "m.global")
+ self.assertIsNone(row.room_id)
+
+ self.assertEqual([], received_rows)
+
+ def test_update_function_global_account_data_limit(self):
+ """Test replication with many global account data updates
+ """
+ store = self.hs.get_datastore()
+
+ # generate lots of account data updates
+ updates = []
+ for i in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 5):
+ update = "m.test_type.%i" % (i,)
+ self.get_success(store.add_account_data_for_user("test_user", update, {}))
+ updates.append(update)
+
+ # also one per-room update
+ self.get_success(
+ store.add_account_data_to_room("test_user", "test_room", "m.per_room", {})
+ )
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order
+ received_rows = self.test_handler.received_rdata_rows
+
+ for t in updates:
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertEqual(stream_name, AccountDataStream.NAME)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, t)
+ self.assertIsNone(row.room_id)
+
+ (stream_name, token, row) = received_rows.pop(0)
+ self.assertIsInstance(row, AccountDataStream.AccountDataStreamRow)
+ self.assertEqual(row.data_type, "m.per_room")
+ self.assertEqual(row.room_id, "test_room")
+
+ self.assertEqual([], received_rows)
diff --git a/tests/replication/tcp/streams/test_events.py b/tests/replication/tcp/streams/test_events.py
new file mode 100644
index 0000000000..51bf0ef4e9
--- /dev/null
+++ b/tests/replication/tcp/streams/test_events.py
@@ -0,0 +1,425 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from typing import List, Optional
+
+from synapse.api.constants import EventTypes, Membership
+from synapse.events import EventBase
+from synapse.replication.tcp.streams._base import _STREAM_UPDATE_TARGET_ROW_COUNT
+from synapse.replication.tcp.streams.events import (
+ EventsStreamCurrentStateRow,
+ EventsStreamEventRow,
+ EventsStreamRow,
+)
+from synapse.rest import admin
+from synapse.rest.client.v1 import login, room
+
+from tests.replication._base import BaseStreamTestCase
+from tests.test_utils.event_injection import inject_event, inject_member_event
+
+
+class EventsStreamTestCase(BaseStreamTestCase):
+ servlets = [
+ admin.register_servlets,
+ login.register_servlets,
+ room.register_servlets,
+ ]
+
+ def prepare(self, reactor, clock, hs):
+ super().prepare(reactor, clock, hs)
+ self.user_id = self.register_user("u1", "pass")
+ self.user_tok = self.login("u1", "pass")
+
+ self.reconnect()
+
+ self.room_id = self.helper.create_room_as(tok=self.user_tok)
+ self.test_handler.received_rdata_rows.clear()
+
+ def test_update_function_event_row_limit(self):
+ """Test replication with many non-state events
+
+ Checks that all events are correctly replicated when there are lots of
+ event rows to be replicated.
+ """
+ # disconnect, so that we can stack up some changes
+ self.disconnect()
+
+ # generate lots of non-state events. We inject them using inject_event
+ # so that they are not send out over replication until we call self.replicate().
+ events = [
+ self._inject_test_event()
+ for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT + 1)
+ ]
+
+ # also one state event
+ state_event = self._inject_state_event()
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order (as
+ # well as various cache invalidation updates which we ignore)
+ received_rows = [
+ row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+ ]
+
+ for event in events:
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, event.event_id)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, state_event.event_id)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ self.assertEqual(row.data.event_id, state_event.event_id)
+
+ self.assertEqual([], received_rows)
+
+ def test_update_function_huge_state_change(self):
+ """Test replication with many state events
+
+ Ensures that all events are correctly replicated when there are lots of
+ state change rows to be replicated.
+ """
+
+ # we want to generate lots of state changes at a single stream ID.
+ #
+ # We do this by having two branches in the DAG. On one, we have a moderator
+ # which that generates lots of state; on the other, we de-op the moderator,
+ # thus invalidating all the state.
+
+ OTHER_USER = "@other_user:localhost"
+
+ # have the user join
+ inject_member_event(self.hs, self.room_id, OTHER_USER, Membership.JOIN)
+
+ # Update existing power levels with mod at PL50
+ pls = self.helper.get_state(
+ self.room_id, EventTypes.PowerLevels, tok=self.user_tok
+ )
+ pls["users"][OTHER_USER] = 50
+ self.helper.send_state(
+ self.room_id, EventTypes.PowerLevels, pls, tok=self.user_tok,
+ )
+
+ # this is the point in the DAG where we make a fork
+ fork_point = self.get_success(
+ self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
+ ) # type: List[str]
+
+ events = [
+ self._inject_state_event(sender=OTHER_USER)
+ for _ in range(_STREAM_UPDATE_TARGET_ROW_COUNT)
+ ]
+
+ self.replicate()
+ # all those events and state changes should have landed
+ self.assertGreaterEqual(
+ len(self.test_handler.received_rdata_rows), 2 * len(events)
+ )
+
+ # disconnect, so that we can stack up the changes
+ self.disconnect()
+ self.test_handler.received_rdata_rows.clear()
+
+ # a state event which doesn't get rolled back, to check that the state
+ # before the huge update comes through ok
+ state1 = self._inject_state_event()
+
+ # roll back all the state by de-modding the user
+ prev_events = fork_point
+ pls["users"][OTHER_USER] = 0
+ pl_event = inject_event(
+ self.hs,
+ prev_event_ids=prev_events,
+ type=EventTypes.PowerLevels,
+ state_key="",
+ sender=self.user_id,
+ room_id=self.room_id,
+ content=pls,
+ )
+
+ # one more bit of state that doesn't get rolled back
+ state2 = self._inject_state_event()
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order (as
+ # well as various cache invalidation updates which we ignore)
+ #
+ # we expect:
+ #
+ # - two rows for state1
+ # - the PL event row, plus state rows for the PL event and each
+ # of the states that got reverted.
+ # - two rows for state2
+
+ received_rows = [
+ row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+ ]
+
+ # first check the first two rows, which should be state1
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, state1.event_id)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ self.assertEqual(row.data.event_id, state1.event_id)
+
+ # now the last two rows, which should be state2
+ stream_name, token, row = received_rows.pop(-2)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, state2.event_id)
+
+ stream_name, token, row = received_rows.pop(-1)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ self.assertEqual(row.data.event_id, state2.event_id)
+
+ # that should leave us with the rows for the PL event
+ self.assertEqual(len(received_rows), len(events) + 2)
+
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, pl_event.event_id)
+
+ # the state rows are unsorted
+ state_rows = [] # type: List[EventsStreamCurrentStateRow]
+ for stream_name, token, row in received_rows:
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ state_rows.append(row.data)
+
+ state_rows.sort(key=lambda r: r.state_key)
+
+ sr = state_rows.pop(0)
+ self.assertEqual(sr.type, EventTypes.PowerLevels)
+ self.assertEqual(sr.event_id, pl_event.event_id)
+ for sr in state_rows:
+ self.assertEqual(sr.type, "test_state_event")
+ # "None" indicates the state has been deleted
+ self.assertIsNone(sr.event_id)
+
+ def test_update_function_state_row_limit(self):
+ """Test replication with many state events over several stream ids.
+ """
+
+ # we want to generate lots of state changes, but for this test, we want to
+ # spread out the state changes over a few stream IDs.
+ #
+ # We do this by having two branches in the DAG. On one, we have four moderators,
+ # each of which that generates lots of state; on the other, we de-op the users,
+ # thus invalidating all the state.
+
+ NUM_USERS = 4
+ STATES_PER_USER = _STREAM_UPDATE_TARGET_ROW_COUNT // 4 + 1
+
+ user_ids = ["@user%i:localhost" % (i,) for i in range(NUM_USERS)]
+
+ # have the users join
+ for u in user_ids:
+ inject_member_event(self.hs, self.room_id, u, Membership.JOIN)
+
+ # Update existing power levels with mod at PL50
+ pls = self.helper.get_state(
+ self.room_id, EventTypes.PowerLevels, tok=self.user_tok
+ )
+ pls["users"].update({u: 50 for u in user_ids})
+ self.helper.send_state(
+ self.room_id, EventTypes.PowerLevels, pls, tok=self.user_tok,
+ )
+
+ # this is the point in the DAG where we make a fork
+ fork_point = self.get_success(
+ self.hs.get_datastore().get_latest_event_ids_in_room(self.room_id)
+ ) # type: List[str]
+
+ events = [] # type: List[EventBase]
+ for user in user_ids:
+ events.extend(
+ self._inject_state_event(sender=user) for _ in range(STATES_PER_USER)
+ )
+
+ self.replicate()
+
+ # all those events and state changes should have landed
+ self.assertGreaterEqual(
+ len(self.test_handler.received_rdata_rows), 2 * len(events)
+ )
+
+ # disconnect, so that we can stack up the changes
+ self.disconnect()
+ self.test_handler.received_rdata_rows.clear()
+
+ # now roll back all that state by de-modding the users
+ prev_events = fork_point
+ pl_events = []
+ for u in user_ids:
+ pls["users"][u] = 0
+ e = inject_event(
+ self.hs,
+ prev_event_ids=prev_events,
+ type=EventTypes.PowerLevels,
+ state_key="",
+ sender=self.user_id,
+ room_id=self.room_id,
+ content=pls,
+ )
+ prev_events = [e.event_id]
+ pl_events.append(e)
+
+ # tell the notifier to catch up to avoid duplicate rows.
+ # workaround for https://github.com/matrix-org/synapse/issues/7360
+ # FIXME remove this when the above is fixed
+ self.replicate()
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual([], self.test_handler.received_rdata_rows)
+
+ # now reconnect to pull the updates
+ self.reconnect()
+ self.replicate()
+
+ # we should have received all the expected rows in the right order (as
+ # well as various cache invalidation updates which we ignore)
+ received_rows = [
+ row for row in self.test_handler.received_rdata_rows if row[0] == "events"
+ ]
+ self.assertGreaterEqual(len(received_rows), len(events))
+ for i in range(NUM_USERS):
+ # for each user, we expect the PL event row, followed by state rows for
+ # the PL event and each of the states that got reverted.
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "ev")
+ self.assertIsInstance(row.data, EventsStreamEventRow)
+ self.assertEqual(row.data.event_id, pl_events[i].event_id)
+
+ # the state rows are unsorted
+ state_rows = [] # type: List[EventsStreamCurrentStateRow]
+ for j in range(STATES_PER_USER + 1):
+ stream_name, token, row = received_rows.pop(0)
+ self.assertEqual("events", stream_name)
+ self.assertIsInstance(row, EventsStreamRow)
+ self.assertEqual(row.type, "state")
+ self.assertIsInstance(row.data, EventsStreamCurrentStateRow)
+ state_rows.append(row.data)
+
+ state_rows.sort(key=lambda r: r.state_key)
+
+ sr = state_rows.pop(0)
+ self.assertEqual(sr.type, EventTypes.PowerLevels)
+ self.assertEqual(sr.event_id, pl_events[i].event_id)
+ for sr in state_rows:
+ self.assertEqual(sr.type, "test_state_event")
+ # "None" indicates the state has been deleted
+ self.assertIsNone(sr.event_id)
+
+ self.assertEqual([], received_rows)
+
+ event_count = 0
+
+ def _inject_test_event(
+ self, body: Optional[str] = None, sender: Optional[str] = None, **kwargs
+ ) -> EventBase:
+ if sender is None:
+ sender = self.user_id
+
+ if body is None:
+ body = "event %i" % (self.event_count,)
+ self.event_count += 1
+
+ return inject_event(
+ self.hs,
+ room_id=self.room_id,
+ sender=sender,
+ type="test_event",
+ content={"body": body},
+ **kwargs
+ )
+
+ def _inject_state_event(
+ self,
+ body: Optional[str] = None,
+ state_key: Optional[str] = None,
+ sender: Optional[str] = None,
+ ) -> EventBase:
+ if sender is None:
+ sender = self.user_id
+
+ if state_key is None:
+ state_key = "state_%i" % (self.event_count,)
+ self.event_count += 1
+
+ if body is None:
+ body = "state event %s" % (state_key,)
+
+ return inject_event(
+ self.hs,
+ room_id=self.room_id,
+ sender=sender,
+ type="test_state_event",
+ state_key=state_key,
+ content={"body": body},
+ )
diff --git a/tests/replication/tcp/streams/test_federation.py b/tests/replication/tcp/streams/test_federation.py
new file mode 100644
index 0000000000..2babea4e3e
--- /dev/null
+++ b/tests/replication/tcp/streams/test_federation.py
@@ -0,0 +1,81 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.federation.send_queue import EduRow
+from synapse.replication.tcp.streams.federation import FederationStream
+
+from tests.replication._base import BaseStreamTestCase
+
+
+class FederationStreamTestCase(BaseStreamTestCase):
+ def _get_worker_hs_config(self) -> dict:
+ # enable federation sending on the worker
+ config = super()._get_worker_hs_config()
+ # TODO: make it so we don't need both of these
+ config["send_federation"] = True
+ config["worker_app"] = "synapse.app.federation_sender"
+ return config
+
+ def test_catchup(self):
+ """Basic test of catchup on reconnect
+
+ Makes sure that updates sent while we are offline are received later.
+ """
+ fed_sender = self.hs.get_federation_sender()
+ received_rows = self.test_handler.received_rdata_rows
+
+ fed_sender.build_and_send_edu("testdest", "m.test_edu", {"a": "b"})
+
+ self.reconnect()
+ self.reactor.advance(0)
+
+ # check we're testing what we think we are: no rows should yet have been
+ # received
+ self.assertEqual(received_rows, [])
+
+ # We should now see an attempt to connect to the master
+ request = self.handle_http_replication_attempt()
+ self.assert_request_is_get_repl_stream_updates(request, "federation")
+
+ # we should have received an update row
+ stream_name, token, row = received_rows.pop()
+ self.assertEqual(stream_name, "federation")
+ self.assertIsInstance(row, FederationStream.FederationStreamRow)
+ self.assertEqual(row.type, EduRow.TypeId)
+ edurow = EduRow.from_data(row.data)
+ self.assertEqual(edurow.edu.edu_type, "m.test_edu")
+ self.assertEqual(edurow.edu.origin, self.hs.hostname)
+ self.assertEqual(edurow.edu.destination, "testdest")
+ self.assertEqual(edurow.edu.content, {"a": "b"})
+
+ self.assertEqual(received_rows, [])
+
+ # additional updates should be transferred without an HTTP hit
+ fed_sender.build_and_send_edu("testdest", "m.test1", {"c": "d"})
+ self.reactor.advance(0)
+ # there should be no http hit
+ self.assertEqual(len(self.reactor.tcpClients), 0)
+ # ... but we should have a row
+ self.assertEqual(len(received_rows), 1)
+
+ stream_name, token, row = received_rows.pop()
+ self.assertEqual(stream_name, "federation")
+ self.assertIsInstance(row, FederationStream.FederationStreamRow)
+ self.assertEqual(row.type, EduRow.TypeId)
+ edurow = EduRow.from_data(row.data)
+ self.assertEqual(edurow.edu.edu_type, "m.test1")
+ self.assertEqual(edurow.edu.origin, self.hs.hostname)
+ self.assertEqual(edurow.edu.destination, "testdest")
+ self.assertEqual(edurow.edu.content, {"c": "d"})
diff --git a/tests/replication/tcp/streams/test_receipts.py b/tests/replication/tcp/streams/test_receipts.py
index d5a99f6caa..56b062ecc1 100644
--- a/tests/replication/tcp/streams/test_receipts.py
+++ b/tests/replication/tcp/streams/test_receipts.py
@@ -12,35 +12,73 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.replication.tcp.streams._base import ReceiptsStreamRow
-from tests.replication.tcp.streams._base import BaseStreamTestCase
+# type: ignore
+
+from mock import Mock
+
+from synapse.replication.tcp.streams._base import ReceiptsStream
+
+from tests.replication._base import BaseStreamTestCase
USER_ID = "@feeling:blue"
-ROOM_ID = "!room:blue"
-EVENT_ID = "$event:blue"
class ReceiptsStreamTestCase(BaseStreamTestCase):
+ def _build_replication_data_handler(self):
+ return Mock(wraps=super()._build_replication_data_handler())
+
def test_receipt(self):
- # make the client subscribe to the receipts stream
- self.replicate_stream("receipts", "NOW")
+ self.reconnect()
# tell the master to send a new receipt
self.get_success(
self.hs.get_datastore().insert_receipt(
- ROOM_ID, "m.read", USER_ID, [EVENT_ID], {"a": 1}
+ "!room:blue", "m.read", USER_ID, ["$event:blue"], {"a": 1}
)
)
self.replicate()
# there should be one RDATA command
- rdata_rows = self.test_handler.received_rdata_rows
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "receipts")
self.assertEqual(1, len(rdata_rows))
- self.assertEqual(rdata_rows[0][0], "receipts")
- row = rdata_rows[0][2] # type: ReceiptsStreamRow
- self.assertEqual(ROOM_ID, row.room_id)
+ row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow
+ self.assertEqual("!room:blue", row.room_id)
self.assertEqual("m.read", row.receipt_type)
self.assertEqual(USER_ID, row.user_id)
- self.assertEqual(EVENT_ID, row.event_id)
+ self.assertEqual("$event:blue", row.event_id)
self.assertEqual({"a": 1}, row.data)
+
+ # Now let's disconnect and insert some data.
+ self.disconnect()
+
+ self.test_handler.on_rdata.reset_mock()
+
+ self.get_success(
+ self.hs.get_datastore().insert_receipt(
+ "!room2:blue", "m.read", USER_ID, ["$event2:foo"], {"a": 2}
+ )
+ )
+ self.replicate()
+
+ # Nothing should have happened as we are disconnected
+ self.test_handler.on_rdata.assert_not_called()
+
+ self.reconnect()
+ self.pump(0.1)
+
+ # We should now have caught up and get the missing data
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "receipts")
+ self.assertEqual(token, 3)
+ self.assertEqual(1, len(rdata_rows))
+
+ row = rdata_rows[0] # type: ReceiptsStream.ReceiptsStreamRow
+ self.assertEqual("!room2:blue", row.room_id)
+ self.assertEqual("m.read", row.receipt_type)
+ self.assertEqual(USER_ID, row.user_id)
+ self.assertEqual("$event2:foo", row.event_id)
+ self.assertEqual({"a": 2}, row.data)
diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
new file mode 100644
index 0000000000..fd62b26356
--- /dev/null
+++ b/tests/replication/tcp/streams/test_typing.py
@@ -0,0 +1,77 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from mock import Mock
+
+from synapse.handlers.typing import RoomMember
+from synapse.replication.tcp.streams import TypingStream
+
+from tests.replication._base import BaseStreamTestCase
+
+USER_ID = "@feeling:blue"
+
+
+class TypingStreamTestCase(BaseStreamTestCase):
+ def _build_replication_data_handler(self):
+ return Mock(wraps=super()._build_replication_data_handler())
+
+ def test_typing(self):
+ typing = self.hs.get_typing_handler()
+
+ room_id = "!bar:blue"
+
+ self.reconnect()
+
+ typing._push_update(member=RoomMember(room_id, USER_ID), typing=True)
+
+ self.reactor.advance(0)
+
+ # We should now see an attempt to connect to the master
+ request = self.handle_http_replication_attempt()
+ self.assert_request_is_get_repl_stream_updates(request, "typing")
+
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "typing")
+ self.assertEqual(1, len(rdata_rows))
+ row = rdata_rows[0] # type: TypingStream.TypingStreamRow
+ self.assertEqual(room_id, row.room_id)
+ self.assertEqual([USER_ID], row.user_ids)
+
+ # Now let's disconnect and insert some data.
+ self.disconnect()
+
+ self.test_handler.on_rdata.reset_mock()
+
+ typing._push_update(member=RoomMember(room_id, USER_ID), typing=False)
+
+ self.test_handler.on_rdata.assert_not_called()
+
+ self.reconnect()
+ self.pump(0.1)
+
+ # We should now see an attempt to connect to the master
+ request = self.handle_http_replication_attempt()
+ self.assert_request_is_get_repl_stream_updates(request, "typing")
+
+ # The from token should be the token from the last RDATA we got.
+ self.assertEqual(int(request.args[b"from_token"][0]), token)
+
+ self.test_handler.on_rdata.assert_called_once()
+ stream_name, _, token, rdata_rows = self.test_handler.on_rdata.call_args[0]
+ self.assertEqual(stream_name, "typing")
+ self.assertEqual(1, len(rdata_rows))
+ row = rdata_rows[0]
+ self.assertEqual(room_id, row.room_id)
+ self.assertEqual([], row.user_ids)
|