diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py
index 555922409d..6e4055cc21 100644
--- a/tests/replication/tcp/test_handler.py
+++ b/tests/replication/tcp/test_handler.py
@@ -14,7 +14,7 @@
from twisted.internet import defer
-from synapse.replication.tcp.commands import PositionCommand, RdataCommand
+from synapse.replication.tcp.commands import PositionCommand
from tests.replication._base import BaseMultiWorkerStreamTestCase
@@ -111,20 +111,14 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
next_token = self.get_success(ctx.__aenter__())
self.get_success(ctx.__aexit__(None, None, None))
- cmd_handler.send_command(
- RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
- )
- self.replicate()
-
self.get_success(
data_handler.wait_for_stream_position("worker1", "caches", next_token)
)
- # `wait_for_stream_position` should only return once master receives an
- # RDATA from the worker
- ctx = cache_id_gen.get_next()
- next_token = self.get_success(ctx.__aenter__())
- self.get_success(ctx.__aexit__(None, None, None))
+ # `wait_for_stream_position` should only return once master receives a
+ # notification that `next_token` has persisted.
+ ctx_worker1 = cache_id_gen.get_next()
+ next_token = self.get_success(ctx_worker1.__aenter__())
d = defer.ensureDeferred(
data_handler.wait_for_stream_position("worker1", "caches", next_token)
@@ -142,10 +136,7 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
)
self.assertFalse(d.called)
- # ... but receiving the RDATA should
- cmd_handler.send_command(
- RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
- )
- self.replicate()
+ # ... but worker1 finishing (and so sending an update) should.
+ self.get_success(ctx_worker1.__aexit__(None, None, None))
self.assertTrue(d.called)
|