Change slave storage to use new replication interface
As the TCP replication uses a slightly different API and streams than
the HTTP replication.
This breaks HTTP replication.
1 files changed, 22 insertions, 8 deletions
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py
index b82868054d..81063f19a1 100644
--- a/tests/replication/slave/storage/_base.py
+++ b/tests/replication/slave/storage/_base.py
@@ -12,12 +12,15 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from twisted.internet import defer
+from twisted.internet import defer, reactor
from tests import unittest
from mock import Mock, NonCallableMock
from tests.utils import setup_test_homeserver
-from synapse.replication.resource import ReplicationResource
+from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory
+from synapse.replication.tcp.client import (
+ ReplicationClientHandler, ReplicationClientFactory,
+)
class BaseSlavedStoreTestCase(unittest.TestCase):
@@ -33,18 +36,29 @@ class BaseSlavedStoreTestCase(unittest.TestCase):
)
self.hs.get_ratelimiter().send_message.return_value = (True, 0)
- self.replication = ReplicationResource(self.hs)
-
self.master_store = self.hs.get_datastore()
self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs)
self.event_id = 0
+ server_factory = ReplicationStreamProtocolFactory(self.hs)
+ listener = reactor.listenUNIX("\0xxx", server_factory)
+ self.addCleanup(listener.stopListening)
+ self.streamer = server_factory.streamer
+
+ self.replication_handler = ReplicationClientHandler(self.slaved_store)
+ client_factory = ReplicationClientFactory(
+ self.hs, "client_name", self.replication_handler
+ )
+ client_connector = reactor.connectUNIX("\0xxx", client_factory)
+ self.addCleanup(client_factory.stopTrying)
+ self.addCleanup(client_connector.disconnect)
+
@defer.inlineCallbacks
def replicate(self):
- streams = self.slaved_store.stream_positions()
- writer = yield self.replication.replicate(streams, 100)
- result = writer.finish()
- yield self.slaved_store.process_replication(result)
+ yield self.streamer.on_notifier_poke()
+ d = self.replication_handler.await_sync("replication_test")
+ self.streamer.send_sync_to_all_connections("replication_test")
+ yield d
@defer.inlineCallbacks
def check(self, method, args, expected_result=None):
|