diff options
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/handler.py | 38 | ||||
-rw-r--r-- | synapse/replication/tcp/redis.py | 2 | ||||
-rw-r--r-- | synapse/replication/tcp/resource.py | 17 |
3 files changed, 36 insertions, 21 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 1748182663..c14a18ba2e 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -257,6 +257,11 @@ class ReplicationCommandHandler: if hs.config.redis.redis_enabled: self._notifier.add_lock_released_callback(self.on_lock_released) + # Marks if we should send POSITION commands for all streams ASAP. This + # is checked by the `ReplicationStreamer` which manages sending + # RDATA/POSITION commands + self._should_announce_positions = True + def subscribe_to_channel(self, channel_name: str) -> None: """ Indicates that we wish to subscribe to a Redis channel by name. @@ -397,29 +402,23 @@ class ReplicationCommandHandler: return self._streams_to_replicate def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None: - self.send_positions_to_connection(conn) + self.send_positions_to_connection() - def send_positions_to_connection(self, conn: IReplicationConnection) -> None: + def send_positions_to_connection(self) -> None: """Send current position of all streams this process is source of to the connection. """ - # We respond with current position of all streams this instance - # replicates. - for stream in self.get_streams_to_replicate(): - # Note that we use the current token as the prev token here (rather - # than stream.last_token), as we can't be sure that there have been - # no rows written between last token and the current token (since we - # might be racing with the replication sending bg process). - current_token = stream.current_token(self._instance_name) - self.send_command( - PositionCommand( - stream.NAME, - self._instance_name, - current_token, - current_token, - ) - ) + self._should_announce_positions = True + self._notifier.notify_replication() + + def should_announce_positions(self) -> bool: + """Check if we should send POSITION commands for all streams ASAP.""" + return self._should_announce_positions + + def will_announce_positions(self) -> None: + """Mark that we're about to send POSITIONs out for all streams.""" + self._should_announce_positions = False def on_USER_SYNC( self, conn: IReplicationConnection, cmd: UserSyncCommand @@ -653,8 +652,9 @@ class ReplicationCommandHandler: # for why this can happen. logger.info( - "Fetching replication rows for '%s' between %i and %i", + "Fetching replication rows for '%s' / %s between %i and %i", stream_name, + cmd.instance_name, current_token, cmd.new_token, ) diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 7e96145b3b..1fa37bb888 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol): # We send out our positions when there is a new connection in case the # other side missed updates. We do this for Redis connections as the # otherside won't know we've connected and so won't issue a REPLICATE. - self.synapse_handler.send_positions_to_connection(self) + self.synapse_handler.send_positions_to_connection() def messageReceived(self, pattern: str, channel: str, message: str) -> None: """Received a message from redis.""" diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py index 38abb5df54..d15828f2d3 100644 --- a/synapse/replication/tcp/resource.py +++ b/synapse/replication/tcp/resource.py @@ -123,7 +123,7 @@ class ReplicationStreamer: # We check up front to see if anything has actually changed, as we get # poked because of changes that happened on other instances. - if all( + if not self.command_handler.should_announce_positions() and all( stream.last_token == stream.current_token(self._instance_name) for stream in self.streams ): @@ -158,6 +158,21 @@ class ReplicationStreamer: all_streams = list(all_streams) random.shuffle(all_streams) + if self.command_handler.should_announce_positions(): + # We need to send out POSITIONs for all streams, usually + # because a worker has reconnected. + self.command_handler.will_announce_positions() + + for stream in all_streams: + self.command_handler.send_command( + PositionCommand( + stream.NAME, + self._instance_name, + stream.last_token, + stream.last_token, + ) + ) + for stream in all_streams: if stream.last_token == stream.current_token( self._instance_name |