summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py19
1 files changed, 18 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 1d9a29d22e..38abb5df54 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -27,7 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.tcp.commands import PositionCommand
 from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol
 from synapse.replication.tcp.streams import EventsStream
-from synapse.replication.tcp.streams._base import StreamRow, Token
+from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -204,6 +204,23 @@ class ReplicationStreamer:
                             # The token has advanced but there is no data to
                             # send, so we send a `POSITION` to inform other
                             # workers of the updated position.
+                            #
+                            # There are two reasons for this: 1) this instance
+                            # requested a stream ID but didn't use it, or 2)
+                            # this instance advanced its own stream position due
+                            # to receiving notifications about other instances
+                            # advancing their stream position.
+
+                            # We skip sending `POSITION` for the `caches` stream
+                            # for the second case as a) it generates a lot of
+                            # traffic as every worker would echo each write, and
+                            # b) nothing cares if a given worker's caches stream
+                            # position lags.
+                            if stream.NAME == CachesStream.NAME:
+                                # If there haven't been any writes since the
+                                # `last_token` then we're in the second case.
+                                if stream.minimal_local_current_token() <= last_token:
+                                    continue
 
                             # Note: `last_token` may not *actually* be the
                             # last token we sent out in a RDATA or POSITION.