diff --git a/changelog.d/16639.bugfix b/changelog.d/16639.bugfix
new file mode 100644
index 0000000000..3feff89af6
--- /dev/null
+++ b/changelog.d/16639.bugfix
@@ -0,0 +1 @@
+Fix out-of-order `POSITION` commands being sent over replication, which caused additional database load.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1748182663..c14a18ba2e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler:
if hs.config.redis.redis_enabled:
self._notifier.add_lock_released_callback(self.on_lock_released)
+ # Marks whether we should send POSITION commands for all streams ASAP.
+ # This is checked by the `ReplicationStreamer`, which manages sending
+ # RDATA/POSITION commands.
+ self._should_announce_positions = True
+
def subscribe_to_channel(self, channel_name: str) -> None:
"""
Indicates that we wish to subscribe to a Redis channel by name.
@@ -397,29 +402,23 @@ class ReplicationCommandHandler:
return self._streams_to_replicate
def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
- self.send_positions_to_connection(conn)
+ self.send_positions_to_connection()
- def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
+ def send_positions_to_connection(self) -> None:
"""Send current position of all streams this process is source of to
the connection.
"""
- # We respond with current position of all streams this instance
- # replicates.
- for stream in self.get_streams_to_replicate():
- # Note that we use the current token as the prev token here (rather
- # than stream.last_token), as we can't be sure that there have been
- # no rows written between last token and the current token (since we
- # might be racing with the replication sending bg process).
- current_token = stream.current_token(self._instance_name)
- self.send_command(
- PositionCommand(
- stream.NAME,
- self._instance_name,
- current_token,
- current_token,
- )
- )
+ self._should_announce_positions = True
+ self._notifier.notify_replication()
+
+ def should_announce_positions(self) -> bool:
+ """Check if we should send POSITION commands for all streams ASAP."""
+ return self._should_announce_positions
+
+ def will_announce_positions(self) -> None:
+ """Mark that we're about to send POSITIONs out for all streams."""
+ self._should_announce_positions = False
def on_USER_SYNC(
self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -653,8 +652,9 @@ class ReplicationCommandHandler:
# for why this can happen.
logger.info(
- "Fetching replication rows for '%s' between %i and %i",
+ "Fetching replication rows for '%s' / %s between %i and %i",
stream_name,
+ cmd.instance_name,
current_token,
cmd.new_token,
)
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 7e96145b3b..1fa37bb888 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
# We send out our positions when there is a new connection in case the
# other side missed updates. We do this for Redis connections as the
# otherside won't know we've connected and so won't issue a REPLICATE.
- self.synapse_handler.send_positions_to_connection(self)
+ self.synapse_handler.send_positions_to_connection()
def messageReceived(self, pattern: str, channel: str, message: str) -> None:
"""Received a message from redis."""
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 38abb5df54..d15828f2d3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -123,7 +123,7 @@ class ReplicationStreamer:
# We check up front to see if anything has actually changed, as we get
# poked because of changes that happened on other instances.
- if all(
+ if not self.command_handler.should_announce_positions() and all(
stream.last_token == stream.current_token(self._instance_name)
for stream in self.streams
):
@@ -158,6 +158,21 @@ class ReplicationStreamer:
all_streams = list(all_streams)
random.shuffle(all_streams)
+ if self.command_handler.should_announce_positions():
+ # We need to send out POSITIONs for all streams, usually
+ # because a worker has reconnected.
+ self.command_handler.will_announce_positions()
+
+ for stream in all_streams:
+ self.command_handler.send_command(
+ PositionCommand(
+ stream.NAME,
+ self._instance_name,
+ stream.last_token,
+ stream.last_token,
+ )
+ )
+
for stream in all_streams:
if stream.last_token == stream.current_token(
self._instance_name
diff --git a/tests/replication/tcp/streams/test_typing.py b/tests/replication/tcp/streams/test_typing.py
index 5a38ac831f..20b3d431ba 100644
--- a/tests/replication/tcp/streams/test_typing.py
+++ b/tests/replication/tcp/streams/test_typing.py
@@ -35,6 +35,10 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)
+ # Create a typing update before we reconnect so that there is a missing
+ # update to fetch.
+ typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
+
self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
@@ -91,6 +95,10 @@ class TypingStreamTestCase(BaseStreamTestCase):
typing = self.hs.get_typing_handler()
assert isinstance(typing, TypingWriterHandler)
+ # Create a typing update before we reconnect so that there is a missing
+ # update to fetch.
+ typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
+
self.reconnect()
typing._push_update(member=RoomMember(ROOM_ID, USER_ID), typing=True)
|