Reduce spurious replication catchup (#16555)
2 files changed, 10 insertions, 5 deletions
diff --git a/changelog.d/16555.misc b/changelog.d/16555.misc
new file mode 100644
index 0000000000..d02efb2114
--- /dev/null
+++ b/changelog.d/16555.misc
@@ -0,0 +1 @@
+Reduce some spurious logging in worker mode.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1d586fb180..afd03137f0 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -611,10 +611,14 @@ class ReplicationCommandHandler:
# Find where we previously streamed up to.
current_token = stream.current_token(cmd.instance_name)
- # If the position token matches our current token then we're up to
- # date and there's nothing to do. Otherwise, fetch all updates
- # between then and now.
- missing_updates = cmd.prev_token != current_token
+ # If the incoming previous position is less than our current position
+ # then we're up to date and there's nothing to do. Otherwise, fetch
+ # all updates between then and now.
+ #
+ # Note: We also have to check that `current_token` is at most the
+ # new position, to handle the case where the stream gets "reset"
+ # (e.g. for `caches` and `typing` after the writer's restart).
+ missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token)
while missing_updates:
# Note: There may very well not be any new updates, but we check to
# make sure. This can particularly happen for the event stream where
@@ -644,7 +648,7 @@ class ReplicationCommandHandler:
[stream.parse_row(row) for row in rows],
)
- logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
+ logger.info("Caught up with stream '%s' to %i", stream_name, current_token)
# We've now caught up to position sent to us, notify handler.
await self._replication_data_handler.on_position(
|