diff options
author | Erik Johnston <erikj@matrix.org> | 2023-10-27 12:51:08 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-10-27 12:51:08 +0100 |
commit | 0680d76659c03cb190e0ae37af7d9db3014e3627 (patch) | |
tree | 8234d8d88ae882b77f8c4ccc06c622920ae2a9d6 /synapse | |
parent | Add new module API for adding custom fields to events `unsigned` section (#16... (diff) | |
download | synapse-0680d76659c03cb190e0ae37af7d9db3014e3627.tar.xz |
Reduce replication traffic due to reflected cache stream POSITION (#16557)
Diffstat (limited to '')
-rw-r--r-- | synapse/replication/tcp/resource.py | 19 |
1 files changed, 18 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 1d9a29d22e..38abb5df54 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -27,7 +27,7 @@ from synapse.metrics.background_process_metrics import run_as_background_process from synapse.replication.tcp.commands import PositionCommand from synapse.replication.tcp.protocol import ServerReplicationStreamProtocol from synapse.replication.tcp.streams import EventsStream -from synapse.replication.tcp.streams._base import StreamRow, Token +from synapse.replication.tcp.streams._base import CachesStream, StreamRow, Token from synapse.util.metrics import Measure if TYPE_CHECKING: @@ -204,6 +204,23 @@ class ReplicationStreamer: # The token has advanced but there is no data to # send, so we send a `POSITION` to inform other # workers of the updated position. + # + # There are two reasons for this: 1) this instance + # requested a stream ID but didn't use it, or 2) + # this instance advanced its own stream position due + # to receiving notifications about other instances + # advancing their stream position. + + # We skip sending `POSITION` for the `caches` stream + # for the second case as a) it generates a lot of + # traffic as every worker would echo each write, and + # b) nothing cares if a given worker's caches stream + # position lags. + if stream.NAME == CachesStream.NAME: + # If there haven't been any writes since the + # `last_token` then we're in the second case. + if stream.minimal_local_current_token() <= last_token: + continue # Note: `last_token` may not *actually* be the # last token we sent out in a RDATA or POSITION. |