diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 8cd47770c1..3bc06d59d5 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -171,6 +171,37 @@ class PositionCommand(Command):
return " ".join((self.stream_name, self.instance_name, str(self.token)))
+class PersistedToCommand(Command):
+ """Sent by writers to inform others that it has persisted up to the included
+ token.
+
+ The included `token` will *not* have been persisted by the instance.
+
+ Format::
+
+ PERSISTED_TO <stream_name> <instance_name> <token>
+
+ On receipt the client should mark that the given instances has persisted
+ everything up to the given token. Note: this does *not* mean that other
+ instances have also persisted all their rows up to that point.
+ """
+
+ NAME = "PERSISTED_TO"
+
+ def __init__(self, stream_name, instance_name, token):
+ self.stream_name = stream_name
+ self.instance_name = instance_name
+ self.token = token
+
+ @classmethod
+ def from_line(cls, line):
+ stream_name, instance_name, token = line.split(" ", 2)
+ return cls(stream_name, instance_name, int(token))
+
+ def to_line(self):
+ return " ".join((self.stream_name, self.instance_name, str(self.token)))
+
+
class ErrorCommand(_SimpleCommand):
"""Sent by either side if there was an ERROR. The data is a string describing
the error.
@@ -405,6 +436,7 @@ _COMMANDS = (
UserIpCommand,
RemoteServerUpCommand,
ClearUserSyncsCommand,
+ PersistedToCommand,
) # type: Tuple[Type[Command], ...]
# Map of command name to command type.
|