diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index ccc7f1f0d1..f33801f883 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -293,20 +293,22 @@ class FederationAckCommand(Command):
Format::
- FEDERATION_ACK <token>
+ FEDERATION_ACK <instance_name> <token>
"""
NAME = "FEDERATION_ACK"
- def __init__(self, token):
+ def __init__(self, instance_name, token):
+ self.instance_name = instance_name
self.token = token
@classmethod
def from_line(cls, line):
- return cls(int(line))
+ instance_name, token = line.split(" ")
+ return cls(instance_name, int(token))
def to_line(self):
- return str(self.token)
+ return "%s %s" % (self.instance_name, self.token)
class RemovePusherCommand(Command):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 55b3b79008..80f5df60f9 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -238,7 +238,7 @@ class ReplicationCommandHandler:
federation_ack_counter.inc()
if self._federation_sender:
- self._federation_sender.federation_ack(cmd.token)
+ self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
async def on_REMOVE_PUSHER(
self, conn: AbstractConnection, cmd: RemovePusherCommand
@@ -527,7 +527,7 @@ class ReplicationCommandHandler:
"""Ack data for the federation stream. This allows the master to drop
data stored purely in memory.
"""
- self.send_command(FederationAckCommand(token))
+ self.send_command(FederationAckCommand(self._instance_name, token))
def send_user_sync(
self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int
|