diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index afd03137f0..c14a18ba2e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)
+ # Marks if we should send POSITION commands for all streams ASAP. This
+ # is checked by the `ReplicationStreamer` which manages sending
+ # RDATA/POSITION commands
+ self._should_announce_positions = True
+
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -397,29 +402,23 @@ class ReplicationCommandHandler:
return self._streams_to_replicate
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
- self.send_positions_to_connection(conn)
+ self.send_positions_to_connection()
- def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
+ def send_positions_to_connection(self) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
- # We respond with current position of all streams this instance
- # replicates.
- for stream in self.get_streams_to_replicate():
- # Note that we use the current token as the prev token here (rather
- # than stream.last_token), as we can't be sure that there have been
- # no rows written between last token and the current token (since we
- # might be racing with the replication sending bg process).
- current_token = stream.current_token(self._instance_name)
- self.send_command(
- PositionCommand(
- stream.NAME,
- self._instance_name,
- current_token,
- current_token,
- )
- )
+ self._should_announce_positions = True
+ self._notifier.notify_replication()
+
+ def should_announce_positions(self) -> bool:
+ """Check if we should send POSITION commands for all streams ASAP."""
+ return self._should_announce_positions
+
+ def will_announce_positions(self) -> None:
+ """Mark that we're about to send POSITIONs out for all streams."""
+ self._should_announce_positions = False
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -588,6 +587,21 @@ class ReplicationCommandHandler:
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
+ # Check if we can early discard this position. We can only do so for
+ # connected streams.
+ stream = self._streams[cmd.stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
self._add_command_to_stream_queue(conn, cmd)
async def _process_position(
@@ -599,6 +613,18 @@ class ReplicationCommandHandler:
"""
stream = self._streams[stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
# We're about to go and catch up with the stream, so remove from set
# of connected streams.
for streams in self._streams_by_connection.values():
@@ -626,8 +652,9 @@ class ReplicationCommandHandler:
# for why this can happen.
logger.info(
- "Fetching replication rows for '%s' between %i and %i",
+ "Fetching replication rows for '%s' / %s between %i and %i",
stream_name,
+ cmd.instance_name,
current_token,
cmd.new_token,
)
@@ -657,6 +684,13 @@ class ReplicationCommandHandler:
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
+ def is_stream_connected(
+ self, conn: IReplicationConnection, stream_name: str
+ ) -> bool:
+ """Return if stream has been successfully connected and is ready to
+ receive updates"""
+ return stream_name in self._streams_by_connection.get(conn, ())
+
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
) -> None:
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 7e96145b3b..1fa37bb888 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# otherside won't know we've connected and so won't issue a REPLICATE.
- self.synapse_handler.send_positions_to_connection(self)
+ self.synapse_handler.send_positions_to_connection()
def messageReceived(self, pattern: str, channel: str, message: str) -> None:
"""Received a message from redis."""
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 38abb5df54..d15828f2d3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -123,7 +123,7 @@ class ReplicationStreamer:
# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
- if all(
+ if not self.command_handler.should_announce_positions() and all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
@@ -158,6 +158,21 @@ class ReplicationStreamer:
all_streams = list(all_streams)
random.shuffle(all_streams)
+ if self.command_handler.should_announce_positions():
+ # We need to send out POSITIONs for all streams, usually
+ # because a worker has reconnected.
+ self.command_handler.will_announce_positions()
+
+ for stream in all_streams:
+ self.command_handler.send_command(
+ PositionCommand(
+ stream.NAME,
+ self._instance_name,
+ stream.last_token,
+ stream.last_token,
+ )
+ )
+
for stream in all_streams:
if stream.last_token == stream.current_token(
self._instance_name
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 58a44029aa..cc34dfb322 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -144,6 +144,16 @@ class Stream:
"""
raise NotImplementedError()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ """Whether or not a position command for this stream can be discarded.
+
+ Useful for streams that can never go backwards and where we already know
+ the stream ID for the instance has advanced.
+ """
+ return False
+
def discard_updates_and_advance(self) -> None:
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
@@ -221,6 +231,14 @@ class _StreamFromIdGen(Stream):
def minimal_local_current_token(self) -> Token:
return self._stream_id_gen.get_minimal_local_current_token()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ # These streams can't go backwards, so we know we can ignore any
+ # positions where the tokens are from before the current token.
+
+ return new_token <= self.current_token(instance_name)
+
def current_token_without_instance(
current_token: Callable[[], int]
|