From 1e05b033af47c9858b5db49982b5b43b3c415729 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 16 Sep 2020 11:48:42 +0100 Subject: Persited up to command --- synapse/replication/tcp/commands.py | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) (limited to 'synapse/replication/tcp/commands.py') diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 8cd47770c1..3bc06d59d5 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -171,6 +171,37 @@ class PositionCommand(Command): return " ".join((self.stream_name, self.instance_name, str(self.token))) +class PersistedToCommand(Command): + """Sent by writers to inform others that it has persisted up to the included + token. + + The included `token` will *not* have been persisted by the instance. + + Format:: + + PERSISTED_TO + + On receipt the client should mark that the given instances has persisted + everything up to the given token. Note: this does *not* mean that other + instances have also persisted all their rows up to that point. + """ + + NAME = "PERSISTED_TO" + + def __init__(self, stream_name, instance_name, token): + self.stream_name = stream_name + self.instance_name = instance_name + self.token = token + + @classmethod + def from_line(cls, line): + stream_name, instance_name, token = line.split(" ", 2) + return cls(stream_name, instance_name, int(token)) + + def to_line(self): + return " ".join((self.stream_name, self.instance_name, str(self.token))) + + class ErrorCommand(_SimpleCommand): """Sent by either side if there was an ERROR. The data is a string describing the error. @@ -405,6 +436,7 @@ _COMMANDS = ( UserIpCommand, RemoteServerUpCommand, ClearUserSyncsCommand, + PersistedToCommand, ) # type: Tuple[Type[Command], ...] # Map of command name to command type. -- cgit 1.5.1