summary refs log tree commit diff
path: root/tests/replication/slave/storage
diff options
context:
space:
mode:
Diffstat (limited to 'tests/replication/slave/storage')
-rw-r--r--tests/replication/slave/storage/_base.py30
1 files changed, 22 insertions, 8 deletions
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index b82868054d..81063f19a1 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -12,12 +12,15 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+from twisted.internet import defer, reactor
 from tests import unittest
 
 from mock import Mock, NonCallableMock
 from tests.utils import setup_test_homeserver
-from synapse.replication.resource import ReplicationResource
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+from synapse.replication.tcp.client import (
+    ReplicationClientHandler, ReplicationClientFactory,
+)
 
 
 class BaseSlavedStoreTestCase(unittest.TestCase):
@@ -33,18 +36,29 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
         )
         self.hs.get_ratelimiter().send_message.return_value = (True, 0)
 
-        self.replication = ReplicationResource(self.hs)
-
         self.master_store = self.hs.get_datastore()
         self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs)
         self.event_id = 0
 
+        server_factory = ReplicationStreamProtocolFactory(self.hs)
+        listener = reactor.listenUNIX("\0xxx", server_factory)
+        self.addCleanup(listener.stopListening)
+        self.streamer = server_factory.streamer
+
+        self.replication_handler = ReplicationClientHandler(self.slaved_store)
+        client_factory = ReplicationClientFactory(
+            self.hs, "client_name", self.replication_handler
+        )
+        client_connector = reactor.connectUNIX("\0xxx", client_factory)
+        self.addCleanup(client_factory.stopTrying)
+        self.addCleanup(client_connector.disconnect)
+
     @defer.inlineCallbacks
     def replicate(self):
-        streams = self.slaved_store.stream_positions()
-        writer = yield self.replication.replicate(streams, 100)
-        result = writer.finish()
-        yield self.slaved_store.process_replication(result)
+        yield self.streamer.on_notifier_poke()
+        d = self.replication_handler.await_sync("replication_test")
+        self.streamer.send_sync_to_all_connections("replication_test")
+        yield d
 
     @defer.inlineCallbacks
     def check(self, method, args, expected_result=None):