summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-07-10 18:26:36 +0100
committerGitHub <noreply@github.com>2020-07-10 18:26:36 +0100
commitf299441cc67f31dcd47b8fdfda4a218bee9df9ba (patch)
treebba78ca419a547249491c81f3c9968cf526c13b1 /synapse/replication/tcp
parentFix resync remote devices on receive PDU in worker mode. (#7815) (diff)
downloadsynapse-f299441cc67f31dcd47b8fdfda4a218bee9df9ba.tar.xz
Add ability to shard the federation sender (#7798)
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/commands.py10
-rw-r--r--synapse/replication/tcp/handler.py4
2 files changed, 8 insertions, 6 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index ccc7f1f0d1..f33801f883 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -293,20 +293,22 @@ class FederationAckCommand(Command):
 
     Format::
 
-        FEDERATION_ACK <token>
+        FEDERATION_ACK <instance_name> <token>
     """
 
     NAME = "FEDERATION_ACK"
 
-    def __init__(self, token):
+    def __init__(self, instance_name, token):
+        self.instance_name = instance_name
         self.token = token
 
     @classmethod
     def from_line(cls, line):
-        return cls(int(line))
+        instance_name, token = line.split(" ")
+        return cls(instance_name, int(token))
 
     def to_line(self):
-        return str(self.token)
+        return "%s %s" % (self.instance_name, self.token)
 
 
 class RemovePusherCommand(Command):
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 55b3b79008..80f5df60f9 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -238,7 +238,7 @@ class ReplicationCommandHandler:
         federation_ack_counter.inc()
 
         if self._federation_sender:
-            self._federation_sender.federation_ack(cmd.token)
+            self._federation_sender.federation_ack(cmd.instance_name, cmd.token)
 
     async def on_REMOVE_PUSHER(
         self, conn: AbstractConnection, cmd: RemovePusherCommand
@@ -527,7 +527,7 @@ class ReplicationCommandHandler:
         """Ack data for the federation stream. This allows the master to drop
         data stored purely in memory.
         """
-        self.send_command(FederationAckCommand(token))
+        self.send_command(FederationAckCommand(self._instance_name, token))
 
     def send_user_sync(
         self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int