summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-01-20 18:02:18 +0000
committerGitHub <noreply@github.com>2023-01-20 18:02:18 +0000
commit65d03866936adb144631d263a8539a2cb060fd43 (patch)
tree894f71640642a5bf444d475bbb7831cc512d9b13 /tests
parentDockerfile: Bump Python version from 3.9 to 3.11 (#14875) (diff)
downloadsynapse-65d03866936adb144631d263a8539a2cb060fd43.tar.xz
Always notify replication when a stream advances (#14877)
This ensures that all other workers are told about stream updates in a timely manner, without having to remember to manually poke replication.
Diffstat (limited to 'tests')
-rw-r--r--tests/module_api/test_api.py3
-rw-r--r--tests/replication/tcp/test_handler.py23
-rw-r--r--tests/storage/test_id_generators.py4
3 files changed, 14 insertions, 16 deletions
diff --git a/tests/module_api/test_api.py b/tests/module_api/test_api.py
index 9919938e80..8f88c0117d 100644
--- a/tests/module_api/test_api.py
+++ b/tests/module_api/test_api.py
@@ -404,6 +404,9 @@ class ModuleApiTestCase(HomeserverTestCase):
             self.module_api.send_local_online_presence_to([remote_user_id])
         )
 
+        # We don't always send out federation immediately, so we advance the clock.
+        self.reactor.advance(1000)
+
         # Check that a presence update was sent as part of a federation transaction
         found_update = False
         calls = (
diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py
index 555922409d..6e4055cc21 100644
--- a/tests/replication/tcp/test_handler.py
+++ b/tests/replication/tcp/test_handler.py
@@ -14,7 +14,7 @@
 
 from twisted.internet import defer
 
-from synapse.replication.tcp.commands import PositionCommand, RdataCommand
+from synapse.replication.tcp.commands import PositionCommand
 
 from tests.replication._base import BaseMultiWorkerStreamTestCase
 
@@ -111,20 +111,14 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
         next_token = self.get_success(ctx.__aenter__())
         self.get_success(ctx.__aexit__(None, None, None))
 
-        cmd_handler.send_command(
-            RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
-        )
-        self.replicate()
-
         self.get_success(
             data_handler.wait_for_stream_position("worker1", "caches", next_token)
         )
 
-        # `wait_for_stream_position` should only return once master receives an
-        # RDATA from the worker
-        ctx = cache_id_gen.get_next()
-        next_token = self.get_success(ctx.__aenter__())
-        self.get_success(ctx.__aexit__(None, None, None))
+        # `wait_for_stream_position` should only return once master receives a
+        # notification that `next_token` has persisted.
+        ctx_worker1 = cache_id_gen.get_next()
+        next_token = self.get_success(ctx_worker1.__aenter__())
 
         d = defer.ensureDeferred(
             data_handler.wait_for_stream_position("worker1", "caches", next_token)
@@ -142,10 +136,7 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
         )
         self.assertFalse(d.called)
 
-        # ... but receiving the RDATA should
-        cmd_handler.send_command(
-            RdataCommand("caches", "worker1", next_token, ("func_name", [], 0))
-        )
-        self.replicate()
+        # ... but worker1 finishing (and so sending an update) should.
+        self.get_success(ctx_worker1.__aexit__(None, None, None))
 
         self.assertTrue(d.called)
diff --git a/tests/storage/test_id_generators.py b/tests/storage/test_id_generators.py
index ff9691c518..9174fb0964 100644
--- a/tests/storage/test_id_generators.py
+++ b/tests/storage/test_id_generators.py
@@ -52,6 +52,7 @@ class StreamIdGeneratorTestCase(HomeserverTestCase):
         def _create(conn: LoggingDatabaseConnection) -> StreamIdGenerator:
             return StreamIdGenerator(
                 db_conn=conn,
+                notifier=self.hs.get_replication_notifier(),
                 table="foobar",
                 column="stream_id",
             )
@@ -196,6 +197,7 @@ class MultiWriterIdGeneratorTestCase(HomeserverTestCase):
             return MultiWriterIdGenerator(
                 conn,
                 self.db_pool,
+                notifier=self.hs.get_replication_notifier(),
                 stream_name="test_stream",
                 instance_name=instance_name,
                 tables=[("foobar", "instance_name", "stream_id")],
@@ -630,6 +632,7 @@ class BackwardsMultiWriterIdGeneratorTestCase(HomeserverTestCase):
             return MultiWriterIdGenerator(
                 conn,
                 self.db_pool,
+                notifier=self.hs.get_replication_notifier(),
                 stream_name="test_stream",
                 instance_name=instance_name,
                 tables=[("foobar", "instance_name", "stream_id")],
@@ -766,6 +769,7 @@ class MultiTableMultiWriterIdGeneratorTestCase(HomeserverTestCase):
             return MultiWriterIdGenerator(
                 conn,
                 self.db_pool,
+                notifier=self.hs.get_replication_notifier(),
                 stream_name="test_stream",
                 instance_name=instance_name,
                 tables=[