summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2023-02-24 14:39:50 +0000
committerGitHub <noreply@github.com>2023-02-24 14:39:50 +0000
commitb2357a898cdd1f4a2222609abfe471801ea88dcd (patch)
treeb02eb9a2d99e5bbf66743fe644c4629f731e25e8
parentTweak changelog (diff)
downloadsynapse-b2357a898cdd1f4a2222609abfe471801ea88dcd.tar.xz
Fix bug where 5s delays would occasionally happen. (#15150)
This only affects deployments using workers.
-rw-r--r--changelog.d/15150.bugfix1
-rw-r--r--synapse/replication/tcp/resource.py18
-rw-r--r--tests/replication/tcp/test_handler.py61
3 files changed, 80 insertions, 0 deletions
diff --git a/changelog.d/15150.bugfix b/changelog.d/15150.bugfix
new file mode 100644
index 0000000000..8668bc587f
--- /dev/null
+++ b/changelog.d/15150.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in Synapse 1.76 where 5s delays would occasionally occur in deployments using workers.
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 9d17eff714..347467d863 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ class ReplicationStreamer:
                             except Exception:
                                 logger.exception("Failed to replicate")
 
+                        # The last token we send may not match the current
+                        # token, in which case we want to send out a `POSITION`
+                        # to tell other workers the actual current position.
+                        if updates[-1][0] < current_token:
+                            logger.info(
+                                "Sending position: %s -> %s",
+                                stream.NAME,
+                                current_token,
+                            )
+                            self.command_handler.send_command(
+                                PositionCommand(
+                                    stream.NAME,
+                                    self._instance_name,
+                                    updates[-1][0],
+                                    current_token,
+                                )
+                            )
+
             logger.debug("No more pending updates, breaking poke loop")
         finally:
             self.pending_updates = False
diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py
index bf927beb6a..bab77b2df7 100644
--- a/tests/replication/tcp/test_handler.py
+++ b/tests/replication/tcp/test_handler.py
@@ -141,3 +141,64 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
         self.get_success(ctx_worker1.__aexit__(None, None, None))
 
         self.assertTrue(d.called)
+
+    def test_wait_for_stream_position_rdata(self) -> None:
+        """Check that wait for stream position correctly waits for an update
+        from the correct instance, when RDATA is sent.
+        """
+        store = self.hs.get_datastores().main
+        cmd_handler = self.hs.get_replication_command_handler()
+        data_handler = self.hs.get_replication_data_handler()
+
+        worker1 = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            extra_config={
+                "worker_name": "worker1",
+                "run_background_tasks_on": "worker1",
+                "redis": {"enabled": True},
+            },
+        )
+
+        cache_id_gen = worker1.get_datastores().main._cache_id_gen
+        assert cache_id_gen is not None
+
+        self.replicate()
+
+        # First, make sure the master knows that `worker1` exists.
+        initial_token = cache_id_gen.get_current_token()
+        cmd_handler.send_command(
+            PositionCommand("caches", "worker1", initial_token, initial_token)
+        )
+        self.replicate()
+
+        # `wait_for_stream_position` should only return once master receives a
+        # notification that `next_token2` has persisted.
+        ctx_worker1 = cache_id_gen.get_next_mult(2)
+        next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__())
+
+        d = defer.ensureDeferred(
+            data_handler.wait_for_stream_position("worker1", "caches", next_token2)
+        )
+        self.assertFalse(d.called)
+
+        # Insert an entry into the cache stream with token `next_token1`, but
+        # not `next_token2`.
+        self.get_success(
+            store.db_pool.simple_insert(
+                table="cache_invalidation_stream_by_instance",
+                values={
+                    "stream_id": next_token1,
+                    "instance_name": "worker1",
+                    "cache_func": "foo",
+                    "keys": [],
+                    "invalidation_ts": 0,
+                },
+            )
+        )
+
+        # Finish the context manager, triggering the data to be sent to master.
+        self.get_success(ctx_worker1.__aexit__(None, None, None))
+
+        # Master should get told about `next_token2`, so the deferred should
+        # resolve.
+        self.assertTrue(d.called)