1 files changed, 18 insertions, 0 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 9d17eff714..347467d863 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -238,6 +238,24 @@ class ReplicationStreamer:
except Exception:
logger.exception("Failed to replicate")
+ # The last token we send may not match the current
+ # token, in which case we want to send out a `POSITION`
+ # to tell other workers the actual current position.
+ if updates[-1][0] < current_token:
+ logger.info(
+ "Sending position: %s -> %s",
+ stream.NAME,
+ current_token,
+ )
+ self.command_handler.send_command(
+ PositionCommand(
+ stream.NAME,
+ self._instance_name,
+ updates[-1][0],
+ current_token,
+ )
+ )
+
logger.debug("No more pending updates, breaking poke loop")
finally:
self.pending_updates = False
|