1 files changed, 16 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 38abb5df54..d15828f2d3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -123,7 +123,7 @@ class ReplicationStreamer:
# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
- if all(
+ if not self.command_handler.should_announce_positions() and all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
@@ -158,6 +158,21 @@ class ReplicationStreamer:
all_streams = list(all_streams)
random.shuffle(all_streams)
+ if self.command_handler.should_announce_positions():
+ # We need to send out POSITIONs for all streams, usually
+ # because a worker has reconnected.
+ self.command_handler.will_announce_positions()
+
+ for stream in all_streams:
+ self.command_handler.send_command(
+ PositionCommand(
+ stream.NAME,
+ self._instance_name,
+ stream.last_token,
+ stream.last_token,
+ )
+ )
+
for stream in all_streams:
if stream.last_token == stream.current_token(
self._instance_name
|