diff --git a/changelog.d/16640.misc b/changelog.d/16640.misc
new file mode 100644
index 0000000000..3b1cc2185d
--- /dev/null
+++ b/changelog.d/16640.misc
@@ -0,0 +1 @@
+More efficiently handle no-op `POSITION` over replication.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index afd03137f0..1748182663 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -588,6 +588,21 @@ class ReplicationCommandHandler:
logger.debug("Handling '%s %s'", cmd.NAME, cmd.to_line())
+ # Check if we can early discard this position. We can only do so for
+ # connected streams.
+ stream = self._streams[cmd.stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
self._add_command_to_stream_queue(conn, cmd)
async def _process_position(
@@ -599,6 +614,18 @@ class ReplicationCommandHandler:
"""
stream = self._streams[stream_name]
+ if stream.can_discard_position(
+ cmd.instance_name, cmd.prev_token, cmd.new_token
+ ) and self.is_stream_connected(conn, cmd.stream_name):
+ logger.debug(
+ "Discarding redundant POSITION %s/%s %s %s",
+ cmd.instance_name,
+ cmd.stream_name,
+ cmd.prev_token,
+ cmd.new_token,
+ )
+ return
+
# We're about to go and catch up with the stream, so remove from set
# of connected streams.
for streams in self._streams_by_connection.values():
@@ -657,6 +684,13 @@ class ReplicationCommandHandler:
self._streams_by_connection.setdefault(conn, set()).add(stream_name)
+ def is_stream_connected(
+ self, conn: IReplicationConnection, stream_name: str
+ ) -> bool:
+ """Return if stream has been successfully connected and is ready to
+ receive updates"""
+ return stream_name in self._streams_by_connection.get(conn, ())
+
def on_REMOTE_SERVER_UP(
self, conn: IReplicationConnection, cmd: RemoteServerUpCommand
) -> None:
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 58a44029aa..cc34dfb322 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -144,6 +144,16 @@ class Stream:
"""
raise NotImplementedError()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ """Whether or not a position command for this stream can be discarded.
+
+ Useful for streams that can never go backwards and where we already know
+ the stream ID for the instance has advanced.
+ """
+ return False
+
def discard_updates_and_advance(self) -> None:
"""Called when the stream should advance but the updates would be discarded,
e.g. when there are no currently connected workers.
@@ -221,6 +231,14 @@ class _StreamFromIdGen(Stream):
def minimal_local_current_token(self) -> Token:
return self._stream_id_gen.get_minimal_local_current_token()
+ def can_discard_position(
+ self, instance_name: str, prev_token: int, new_token: int
+ ) -> bool:
+ # These streams can't go backwards, so we know we can ignore any
+ # positions where the tokens are from before the current token.
+
+ return new_token <= self.current_token(instance_name)
+
def current_token_without_instance(
current_token: Callable[[], int]
|