summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
authorErik Johnston <erikj@matrix.org>2023-10-27 14:27:20 +0100
committerGitHub <noreply@github.com>2023-10-27 13:27:20 +0000
commit89dbbd68e120a8d33c9f5c14d29fd56ecb7c6a93 (patch)
tree6a2e788a5ffd0c74f8d9469793ecd4522214d975 /synapse/replication/tcp/handler.py
parentFix cross-worker ratelimiting (#16558) (diff)
downloadsynapse-89dbbd68e120a8d33c9f5c14d29fd56ecb7c6a93.tar.xz
Reduce spurious replication catchup (#16555)
Diffstat (limited to '')
-rw-r--r--synapse/replication/tcp/handler.py14
1 files changed, 9 insertions, 5 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1d586fb180..afd03137f0 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -611,10 +611,14 @@ class ReplicationCommandHandler:
         # Find where we previously streamed up to.
         current_token = stream.current_token(cmd.instance_name)
 
-        # If the position token matches our current token then we're up to
-        # date and there's nothing to do. Otherwise, fetch all updates
-        # between then and now.
-        missing_updates = cmd.prev_token != current_token
+        # If the incoming previous position is less than our current position
+        # then we're up to date and there's nothing to do. Otherwise, fetch
+        # all updates between then and now.
+        #
+        # Note: We also have to check that `current_token` is at most the
+        # new position, to handle the case where the stream gets "reset"
+        # (e.g. for `caches` and `typing` after the writer's restart).
+        missing_updates = not (cmd.prev_token <= current_token <= cmd.new_token)
         while missing_updates:
             # Note: There may very well not be any new updates, but we check to
             # make sure. This can particularly happen for the event stream where
@@ -644,7 +648,7 @@ class ReplicationCommandHandler:
                     [stream.parse_row(row) for row in rows],
                 )
 
-        logger.info("Caught up with stream '%s' to %i", stream_name, cmd.new_token)
+            logger.info("Caught up with stream '%s' to %i", stream_name, current_token)
 
         # We've now caught up to position sent to us, notify handler.
         await self._replication_data_handler.on_position(