diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index afd03137f0..1748182663 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -588,6 +588,21 @@ class ReplicationCommandHandler:
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
+ # Check if we can early discard this position. We can only do so for
+ # connected streams.
+ stream = self._streams[cmd.stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
self._add_command_to_stream_queue(conn, cmd)
async def _process_position(
@@ -599,6 +614,18 @@ class ReplicationCommandHandler:
"""
stream = self._streams[stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
# We're about to go and catch up with the stream, so remove from set
# of connected streams.
for streams in self._streams_by_connection.values():
@@ -657,6 +684,13 @@ class ReplicationCommandHandler:
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
+ def is_stream_connected(
+ self, conn: IReplicationConnection, stream_name: str
+ ) -> bool:
+ """Return if stream has been successfully connected and is ready to
+ receive updates"""
+ return stream_name in self._streams_by_connection.get(conn, ())
+
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
) -> None:
|