diff options
Diffstat (limited to 'synapse/replication/tcp/resource.py')
-rw-r--r-- | synapse/replication/tcp/resource.py | 17 |
1 files changed, 16 insertions, 1 deletions
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 38abb5df54..d15828f2d3 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -123,7 +123,7 @@ class ReplicationStreamer: # We check up front to see if anything has actually changed, as we get # poked because of changes that happened on other instances. - if all( + if not self.command_handler.should_announce_positions() and all( stream.last_token == stream.current_token(self._instance_name) for stream in self.streams ): @@ -158,6 +158,21 @@ class ReplicationStreamer: all_streams = list(all_streams) random.shuffle(all_streams) + if self.command_handler.should_announce_positions(): + # We need to send out POSITIONs for all streams, usually + # because a worker has reconnected. + self.command_handler.will_announce_positions() + + for stream in all_streams: + self.command_handler.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + stream.last_token, + stream.last_token, + ) + ) + for stream in all_streams: if stream.last_token == stream.current_token( self._instance_name |