summary refs log tree commit diff
path: root/synapse/replication/tcp/resource.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r--synapse/replication/tcp/resource.py17
1 files changed, 16 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 38abb5df54..d15828f2d3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -123,7 +123,7 @@ class ReplicationStreamer:
 
         # We check up front to see if anything has actually changed, as we get
         # poked because of changes that happened on other instances.
-        if all(
+        if not self.command_handler.should_announce_positions() and all(
             stream.last_token == stream.current_token(self._instance_name)
             for stream in self.streams
         ):
@@ -158,6 +158,21 @@ class ReplicationStreamer:
                         all_streams = list(all_streams)
                         random.shuffle(all_streams)
 
+                    if self.command_handler.should_announce_positions():
+                        # We need to send out POSITIONs for all streams, usually
+                        # because a worker has reconnected.
+                        self.command_handler.will_announce_positions()
+
+                        for stream in all_streams:
+                            self.command_handler.send_command(
+                                PositionCommand(
+                                    stream.NAME,
+                                    self._instance_name,
+                                    stream.last_token,
+                                    stream.last_token,
+                                )
+                            )
+
                     for stream in all_streams:
                         if stream.last_token == stream.current_token(
                             self._instance_name