summary refs log tree commit diff
path: root/synapse/replication/tcp/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/commands.py')
-rw-r--r--synapse/replication/tcp/commands.py32
1 files changed, 32 insertions, 0 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py

index 8cd47770c1..3bc06d59d5 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py
@@ -171,6 +171,37 @@ class PositionCommand(Command): return " ".join((self.stream_name, self.instance_name, str(self.token))) +class PersistedToCommand(Command): + """Sent by writers to inform others that it has persisted up to the included + token. + + The included `token` will *not* have been persisted by the instance. + + Format:: + + PERSISTED_TO <stream_name> <instance_name> <token> + + On receipt the client should mark that the given instances has persisted + everything up to the given token. Note: this does *not* mean that other + instances have also persisted all their rows up to that point. + """ + + NAME = "PERSISTED_TO" + + def __init__(self, stream_name, instance_name, token): + self.stream_name = stream_name + self.instance_name = instance_name + self.token = token + + @classmethod + def from_line(cls, line): + stream_name, instance_name, token = line.split(" ", 2) + return cls(stream_name, instance_name, int(token)) + + def to_line(self): + return " ".join((self.stream_name, self.instance_name, str(self.token))) + + class ErrorCommand(_SimpleCommand): """Sent by either side if there was an ERROR. The data is a string describing the error. @@ -405,6 +436,7 @@ _COMMANDS = ( UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, + PersistedToCommand, ) # type: Tuple[Type[Command], ...] # Map of command name to command type.