summary refs log tree commit diff
path: root/synapse/replication/tcp/commands.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/commands.py')
-rw-r--r--synapse/replication/tcp/commands.py36
1 files changed, 26 insertions, 10 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 8cd47770c1..ac532ed588 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -141,15 +141,23 @@ class RdataCommand(Command):
 
 
 class PositionCommand(Command):
-    """Sent by the server to tell the client the stream position without
-    needing to send an RDATA.
+    """Sent by an instance to tell others the stream position without needing to
+    send an RDATA.
+
+    Two tokens are sent, the new position and the last position sent by the
+    instance (in an RDATA or other POSITION). The tokens are chosen so that *no*
+    rows were written by the instance between the `prev_token` and `new_token`.
+    (If an instance hasn't sent a position before then the new position can be
+    used for both.)
 
     Format::
 
-        POSITION <stream_name> <instance_name> <token>
+        POSITION <stream_name> <instance_name> <prev_token> <new_token>
 
-    On receipt of a POSITION command clients should check if they have missed
-    any updates, and if so then fetch them out of band.
+    On receipt of a POSITION command instances should check if they have missed
+    any updates, and if so then fetch them out of band. Instances can check this
+    by comparing their view of the current token for the sending instance with
+    the included `prev_token`.
 
     The `<instance_name>` is the process that sent the command and is the source
     of the stream.
@@ -157,18 +165,26 @@ class PositionCommand(Command):
 
     NAME = "POSITION"
 
-    def __init__(self, stream_name, instance_name, token):
+    def __init__(self, stream_name, instance_name, prev_token, new_token):
         self.stream_name = stream_name
         self.instance_name = instance_name
-        self.token = token
+        self.prev_token = prev_token
+        self.new_token = new_token
 
     @classmethod
     def from_line(cls, line):
-        stream_name, instance_name, token = line.split(" ", 2)
-        return cls(stream_name, instance_name, int(token))
+        stream_name, instance_name, prev_token, new_token = line.split(" ", 3)
+        return cls(stream_name, instance_name, int(prev_token), int(new_token))
 
     def to_line(self):
-        return " ".join((self.stream_name, self.instance_name, str(self.token)))
+        return " ".join(
+            (
+                self.stream_name,
+                self.instance_name,
+                str(self.prev_token),
+                str(self.new_token),
+            )
+        )
 
 
 class ErrorCommand(_SimpleCommand):