Fix sending out of order `POSITION` over replication (#16639)
If a worker reconnects to Redis we send out the current positions of all our streams. However, if we're also trying to send out a backlog of RDATA at the same time then we can end up sending a `POSITION` with the current token *before* we've sent all the RDATA before the current token.
This doesn't cause actual bugs as the receiving servers see the POSITION, fetch the relevant rows from the DB, and then ignore the old RDATA as they come in. However, this is inefficient so it'd be better if we didn't send out-of-order positions
1 files changed, 19 insertions, 19 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1748182663..c14a18ba2e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)
+ # Marks if we should send POSITION commands for all streams ASAP. This
+ # is checked by the `ReplicationStreamer` which manages sending
+ # RDATA/POSITION commands
+ self._should_announce_positions = True
+
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -397,29 +402,23 @@ class ReplicationCommandHandler:
return self._streams_to_replicate
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
- self.send_positions_to_connection(conn)
+ self.send_positions_to_connection()
- def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
+ def send_positions_to_connection(self) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
- # We respond with current position of all streams this instance
- # replicates.
- for stream in self.get_streams_to_replicate():
- # Note that we use the current token as the prev token here (rather
- # than stream.last_token), as we can't be sure that there have been
- # no rows written between last token and the current token (since we
- # might be racing with the replication sending bg process).
- current_token = stream.current_token(self._instance_name)
- self.send_command(
- PositionCommand(
- stream.NAME,
- self._instance_name,
- current_token,
- current_token,
- )
- )
+ self._should_announce_positions = True
+ self._notifier.notify_replication()
+
+ def should_announce_positions(self) -> bool:
+ """Check if we should send POSITION commands for all streams ASAP."""
+ return self._should_announce_positions
+
+ def will_announce_positions(self) -> None:
+ """Mark that we're about to send POSITIONs out for all streams."""
+ self._should_announce_positions = False
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -653,8 +652,9 @@ class ReplicationCommandHandler:
# for why this can happen.
logger.info(
- "Fetching replication rows for '%s' between %i and %i",
+ "Fetching replication rows for '%s' / %s between %i and %i",
stream_name,
+ cmd.instance_name,
current_token,
cmd.new_token,
)
|