From 3a1f3f838862cfd2486773079d01e30c2237e8c2 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Mon, 27 Mar 2017 16:32:07 +0100 Subject: Change slave storage to use new replication interface As the TCP replication uses a slightly different API and streams than the HTTP replication. This breaks HTTP replication. --- tests/replication/slave/storage/_base.py | 30 ++++++++++++++++++++++-------- 1 file changed, 22 insertions(+), 8 deletions(-) (limited to 'tests/replication/slave') diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index b82868054d..81063f19a1 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor from tests import unittest from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver -from synapse.replication.resource import ReplicationResource +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory +from synapse.replication.tcp.client import ( + ReplicationClientHandler, ReplicationClientFactory, +) class BaseSlavedStoreTestCase(unittest.TestCase): @@ -33,18 +36,29 @@ class BaseSlavedStoreTestCase(unittest.TestCase): ) self.hs.get_ratelimiter().send_message.return_value = (True, 0) - self.replication = ReplicationResource(self.hs) - self.master_store = self.hs.get_datastore() self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs) self.event_id = 0 + server_factory = ReplicationStreamProtocolFactory(self.hs) + listener = reactor.listenUNIX("\0xxx", server_factory) + self.addCleanup(listener.stopListening) + self.streamer = server_factory.streamer + + self.replication_handler = ReplicationClientHandler(self.slaved_store) + client_factory = ReplicationClientFactory( + self.hs, "client_name", self.replication_handler + ) + client_connector = reactor.connectUNIX("\0xxx", client_factory) + self.addCleanup(client_factory.stopTrying) + self.addCleanup(client_connector.disconnect) + @defer.inlineCallbacks def replicate(self): - streams = self.slaved_store.stream_positions() - writer = yield self.replication.replicate(streams, 100) - result = writer.finish() - yield self.slaved_store.process_replication(result) + yield self.streamer.on_notifier_poke() + d = self.replication_handler.await_sync("replication_test") + self.streamer.send_sync_to_all_connections("replication_test") + yield d @defer.inlineCallbacks def check(self, method, args, expected_result=None): -- cgit 1.4.1