Reduce replication traffic due to reflected cache stream POSITION (#16557)
2 files changed, 19 insertions, 1 deletions
diff --git a/changelog.d/16557.bugfix b/changelog.d/16557.bugfix
new file mode 100644
index 0000000000..4f4a0380cd
--- /dev/null
+++ b/changelog.d/16557.bugfix
@@ -0,0 +1 @@
+Fix a long-standing, exceedingly rare edge case where the first event persisted by a new event persister worker might not be sent down `/sync`.
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d9a29d22e..38abb5df54 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -27,7 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.replication.tcp.commands import PositionCommand
from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
from synapse.replication.tcp.streams import EventsStream
-from synapse.replication.tcp.streams._base import StreamRow, Token
+from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token
from synapse.util.metrics import Measure
if TYPE_CHECKING:
@@ -204,6 +204,23 @@ class ReplicationStreamer:
# The token has advanced but there is no data to
# send, so we send a `POSITION` to inform other
# workers of the updated position.
+ #
+ # There are two reasons for this: 1) this instance
+ # requested a stream ID but didn't use it, or 2)
+ # this instance advanced its own stream position due
+ # to receiving notifications about other instances
+ # advancing their stream position.
+
+ # We skip sending `POSITION` for the `caches` stream
+ # for the second case as a) it generates a lot of
+ # traffic as every worker would echo each write, and
+ # b) nothing cares if a given worker's caches stream
+ # position lags.
+ if stream.NAME == CachesStream.NAME:
+ # If there haven't been any writes since the
+ # `last_token` then we're in the second case.
+ if stream.minimal_local_current_token() <= last_token:
+ continue
# Note: `last_token` may not *actually* be the
# last token we sent out in a RDATA or POSITION.
|