summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2020-08-03 17:28:09 -0700
committerAndrew Morgan <andrew@amorgan.xyz>2020-08-03 17:28:09 -0700
commitbf6cf4b6512ce2553e34932de0de18ec994cfd39 (patch)
tree45c499f7a24908324029fc1ba0c5ce065b1c127f /synapse/replication/tcp
parentMerge commit 'f1245dc3c' into anoa/dinsic_release_1_18_x (diff)
parentAdd ability to shard the federation sender (#7798) (diff)
downloadsynapse-bf6cf4b6512ce2553e34932de0de18ec994cfd39.tar.xz
Merge commit 'f299441cc' into anoa/dinsic_release_1_18_x
* commit 'f299441cc':
  Add ability to shard the federation sender (#7798)
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/commands.py10
-rw-r--r--synapse/replication/tcp/handler.py4
2 files changed, 8 insertions, 6 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py

index ccc7f1f0d1..f33801f883 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -293,20 +293,22 @@ class FederationAckCommand(Command): Format:: - FEDERATION_ACK <token> + FEDERATION_ACK <instance_name> <token> """ NAME = "FEDERATION_ACK" - def __init__(self, token): + def __init__(self, instance_name, token): + self.instance_name = instance_name self.token = token @classmethod def from_line(cls, line): - return cls(int(line)) + instance_name, token = line.split(" ") + return cls(instance_name, int(token)) def to_line(self): - return str(self.token) + return "%s %s" % (self.instance_name, self.token) class RemovePusherCommand(Command): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 55b3b79008..80f5df60f9 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -238,7 +238,7 @@ class ReplicationCommandHandler: federation_ack_counter.inc() if self._federation_sender: - self._federation_sender.federation_ack(cmd.token) + self._federation_sender.federation_ack(cmd.instance_name, cmd.token) async def on_REMOVE_PUSHER( self, conn: AbstractConnection, cmd: RemovePusherCommand @@ -527,7 +527,7 @@ class ReplicationCommandHandler: """Ack data for the federation stream. This allows the master to drop data stored purely in memory. """ - self.send_command(FederationAckCommand(token)) + self.send_command(FederationAckCommand(self._instance_name, token)) def send_user_sync( self, instance_id: str, user_id: str, is_syncing: bool, last_sync_ms: int