diff --git a/CHANGES.md b/CHANGES.md
index f5c19bcb97..644ef6e036 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,12 @@
+Synapse 1.78.0 (2023-02-28)
+===========================
+
+Bugfixes
+--------
+
+- Fix a bug introduced in Synapse 1.76 where 5s delays would occasionally occur in deployments using workers. ([\#15150](https://github.com/matrix-org/synapse/issues/15150))
+
+
Synapse 1.78.0rc1 (2023-02-21)
==============================
diff --git a/debian/changelog b/debian/changelog
index f9e95ee5e2..0f094308c1 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.78.0) stable; urgency=medium
+
+ * New Synapse release 1.78.0.
+
+ -- Synapse Packaging team <packages@matrix.org> Tue, 28 Feb 2023 08:56:03 -0800
+
matrix-synapse-py3 (1.78.0~rc1) stable; urgency=medium
* Add `matrix-org-archive-keyring` package as recommended.
diff --git a/pyproject.toml b/pyproject.toml
index a48b35fa63..27785b6e13 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -89,7 +89,7 @@ manifest-path = "rust/Cargo.toml"
[tool.poetry]
name = "matrix-synapse"
-version = "1.78.0rc1"
+version = "1.78.0"
description = "Homeserver for the Matrix decentralised comms protocol"
authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
license = "Apache-2.0"
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 9d17eff714..347467d863 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ class ReplicationStreamer:
except Exception:
logger.exception("Failed to replicate")
+ # The last token we send may not match the current
+ # token, in which case we want to send out a `POSITION`
+ # to tell other workers the actual current position.
+ if updates[-1][0] < current_token:
+ logger.info(
+ "Sending position: %s -> %s",
+ stream.NAME,
+ current_token,
+ )
+ self.command_handler.send_command(
+ PositionCommand(
+ stream.NAME,
+ self._instance_name,
+ updates[-1][0],
+ current_token,
+ )
+ )
+
logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
diff --git a/tests/replication/tcp/test_handler.py b/tests/replication/tcp/test_handler.py
index bf927beb6a..bab77b2df7 100644
--- a/tests/replication/tcp/test_handler.py
+++ b/tests/replication/tcp/test_handler.py
@@ -141,3 +141,64 @@ class ChannelsTestCase(BaseMultiWorkerStreamTestCase):
self.get_success(ctx_worker1.__aexit__(None, None, None))
self.assertTrue(d.called)
+
+ def test_wait_for_stream_position_rdata(self) -> None:
+ """Check that wait for stream position correctly waits for an update
+ from the correct instance, when RDATA is sent.
+ """
+ store = self.hs.get_datastores().main
+ cmd_handler = self.hs.get_replication_command_handler()
+ data_handler = self.hs.get_replication_data_handler()
+
+ worker1 = self.make_worker_hs(
+ "synapse.app.generic_worker",
+ extra_config={
+ "worker_name": "worker1",
+ "run_background_tasks_on": "worker1",
+ "redis": {"enabled": True},
+ },
+ )
+
+ cache_id_gen = worker1.get_datastores().main._cache_id_gen
+ assert cache_id_gen is not None
+
+ self.replicate()
+
+ # First, make sure the master knows that `worker1` exists.
+ initial_token = cache_id_gen.get_current_token()
+ cmd_handler.send_command(
+ PositionCommand("caches", "worker1", initial_token, initial_token)
+ )
+ self.replicate()
+
+ # `wait_for_stream_position` should only return once master receives a
+ # notification that `next_token2` has persisted.
+ ctx_worker1 = cache_id_gen.get_next_mult(2)
+ next_token1, next_token2 = self.get_success(ctx_worker1.__aenter__())
+
+ d = defer.ensureDeferred(
+ data_handler.wait_for_stream_position("worker1", "caches", next_token2)
+ )
+ self.assertFalse(d.called)
+
+ # Insert an entry into the cache stream with token `next_token1`, but
+ # not `next_token2`.
+ self.get_success(
+ store.db_pool.simple_insert(
+ table="cache_invalidation_stream_by_instance",
+ values={
+ "stream_id": next_token1,
+ "instance_name": "worker1",
+ "cache_func": "foo",
+ "keys": [],
+ "invalidation_ts": 0,
+ },
+ )
+ )
+
+ # Finish the context manager, triggering the data to be sent to master.
+ self.get_success(ctx_worker1.__aexit__(None, None, None))
+
+ # Master should get told about `next_token2`, so the deferred should
+ # resolve.
+ self.assertTrue(d.called)
|