diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index afd03137f0..c14a18ba2e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)
+ # Marks if we should send POSITION commands for all streams ASAP. This
+ # is checked by the `ReplicationStreamer` which manages sending
+ # RDATA/POSITION commands
+ self._should_announce_positions = True
+
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -397,29 +402,23 @@ class ReplicationCommandHandler:
return self._streams_to_replicate
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
- self.send_positions_to_connection(conn)
+ self.send_positions_to_connection()
- def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
+ def send_positions_to_connection(self) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
- # We respond with current position of all streams this instance
- # replicates.
- for stream in self.get_streams_to_replicate():
- # Note that we use the current token as the prev token here (rather
- # than stream.last_token), as we can't be sure that there have been
- # no rows written between last token and the current token (since we
- # might be racing with the replication sending bg process).
- current_token = stream.current_token(self._instance_name)
- self.send_command(
- PositionCommand(
- stream.NAME,
- self._instance_name,
- current_token,
- current_token,
- )
- )
+ self._should_announce_positions = True
+ self._notifier.notify_replication()
+
+ def should_announce_positions(self) -> bool:
+ """Check if we should send POSITION commands for all streams ASAP."""
+ return self._should_announce_positions
+
+ def will_announce_positions(self) -> None:
+ """Mark that we're about to send POSITIONs out for all streams."""
+ self._should_announce_positions = False
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -588,6 +587,21 @@ class ReplicationCommandHandler:
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
+ # Check if we can early discard this position. We can only do so for
+ # connected streams.
+ stream = self._streams[cmd.stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
self._add_command_to_stream_queue(conn, cmd)
async def _process_position(
@@ -599,6 +613,18 @@ class ReplicationCommandHandler:
"""
stream = self._streams[stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
# We're about to go and catch up with the stream, so remove from set
# of connected streams.
for streams in self._streams_by_connection.values():
@@ -626,8 +652,9 @@ class ReplicationCommandHandler:
# for why this can happen.
logger.info(
- "Fetching replication rows for '%s' between %i and %i",
+ "Fetching replication rows for '%s' / %s between %i and %i",
stream_name,
+ cmd.instance_name,
current_token,
cmd.new_token,
)
@@ -657,6 +684,13 @@ class ReplicationCommandHandler:
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
+ def is_stream_connected(
+ self, conn: IReplicationConnection, stream_name: str
+ ) -> bool:
+ """Return if stream has been successfully connected and is ready to
+ receive updates"""
+ return stream_name in self._streams_by_connection.get(conn, ())
+
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
) -> None:
|