diff options
author | Erik Johnston <erikj@jki.re> | 2017-04-05 09:36:21 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-04-05 09:36:21 +0100 |
commit | a5c401bd12fd4aab08df5c2fa74ec469760a9c1d (patch) | |
tree | cabfe2496262251db0cef9bbb69143839643b940 /tests | |
parent | Merge pull request #2099 from matrix-org/erikj/deviceinbox_reduce (diff) | |
parent | Add comment (diff) | |
download | synapse-a5c401bd12fd4aab08df5c2fa74ec469760a9c1d.tar.xz |
Merge pull request #2097 from matrix-org/erikj/repl_tcp_client
Move to using TCP replication
Diffstat (limited to 'tests')
-rw-r--r-- | tests/replication/slave/storage/_base.py | 30 |
1 files changed, 22 insertions, 8 deletions
diff --git a/tests/replication/slave/storage/_base.py b/tests/replication/slave/storage/_base.py index b82868054d..81063f19a1 100644 --- a/tests/replication/slave/storage/_base.py +++ b/tests/replication/slave/storage/_base.py @@ -12,12 +12,15 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +from twisted.internet import defer, reactor from tests import unittest from mock import Mock, NonCallableMock from tests.utils import setup_test_homeserver -from synapse.replication.resource import ReplicationResource +from synapse.replication.tcp.resource import ReplicationStreamProtocolFactory +from synapse.replication.tcp.client import ( + ReplicationClientHandler, ReplicationClientFactory, +) class BaseSlavedStoreTestCase(unittest.TestCase): @@ -33,18 +36,29 @@ class BaseSlavedStoreTestCase(unittest.TestCase): ) self.hs.get_ratelimiter().send_message.return_value = (True, 0) - self.replication = ReplicationResource(self.hs) - self.master_store = self.hs.get_datastore() self.slaved_store = self.STORE_TYPE(self.hs.get_db_conn(), self.hs) self.event_id = 0 + server_factory = ReplicationStreamProtocolFactory(self.hs) + listener = reactor.listenUNIX("\0xxx", server_factory) + self.addCleanup(listener.stopListening) + self.streamer = server_factory.streamer + + self.replication_handler = ReplicationClientHandler(self.slaved_store) + client_factory = ReplicationClientFactory( + self.hs, "client_name", self.replication_handler + ) + client_connector = reactor.connectUNIX("\0xxx", client_factory) + self.addCleanup(client_factory.stopTrying) + self.addCleanup(client_connector.disconnect) + @defer.inlineCallbacks def replicate(self): - streams = self.slaved_store.stream_positions() - writer = yield self.replication.replicate(streams, 100) - result = writer.finish() - yield self.slaved_store.process_replication(result) + yield self.streamer.on_notifier_poke() + d = self.replication_handler.await_sync("replication_test") + self.streamer.send_sync_to_all_connections("replication_test") + yield d @defer.inlineCallbacks def check(self, method, args, expected_result=None): |