summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-03 16:51:34 +0000
committerErik Johnston <erik@matrix.org>2020-03-20 15:31:51 +0000
commit8734b75ca8b4b81f5998f5c2ef57dfa0998c66ac (patch)
tree3851721a7f1b1622d3302af2386406f5726e7782 /synapse/replication/tcp
parentMove stream catchup to workers. (diff)
downloadsynapse-8734b75ca8b4b81f5998f5c2ef57dfa0998c66ac.tar.xz
Remove unused token param from REPLICATE cmd
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/commands.py23
-rw-r--r--synapse/replication/tcp/protocol.py24
2 files changed, 13 insertions, 34 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index 451671412d..e506f52935 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -183,35 +183,22 @@ class ReplicateCommand(Command):
 
     Format::
 
-        REPLICATE <stream_name> <token>
+        REPLICATE <stream_name>
 
-    Where <token> may be either:
-        * a numeric stream_id to stream updates from
-        * "NOW" to stream all subsequent updates.
-
-    The <stream_name> can be "ALL" to subscribe to all known streams, in which
-    case the <token> must be set to "NOW", i.e.::
-
-        REPLICATE ALL NOW
+    The <stream_name> can be "ALL" to subscribe to all known streams
     """
 
     NAME = "REPLICATE"
 
-    def __init__(self, stream_name, token):
+    def __init__(self, stream_name):
         self.stream_name = stream_name
-        self.token = token
 
     @classmethod
     def from_line(cls, line):
-        stream_name, token = line.split(" ", 1)
-        if token in ("NOW", "now"):
-            token = "NOW"
-        else:
-            token = int(token)
-        return cls(stream_name, token)
+        return cls(line)
 
     def to_line(self):
-        return " ".join((self.stream_name, str(self.token)))
+        return self.stream_name
 
     def get_logcontext_id(self):
         return "REPLICATE-" + self.stream_name
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 649312f022..817b84ad7f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -435,12 +435,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
     async def on_REPLICATE(self, cmd):
         stream_name = cmd.stream_name
-        token = cmd.token
 
         if stream_name == "ALL":
             # Subscribe to all streams we're publishing to.
             deferreds = [
-                run_in_background(self.subscribe_to_stream, stream, token)
+                run_in_background(self.subscribe_to_stream, stream)
                 for stream in iterkeys(self.streamer.streams_by_name)
             ]
 
@@ -448,7 +447,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
                 defer.gatherResults(deferreds, consumeErrors=True)
             )
         else:
-            await self.subscribe_to_stream(stream_name, token)
+            await self.subscribe_to_stream(stream_name)
 
     async def on_FEDERATION_ACK(self, cmd):
         self.streamer.federation_ack(cmd.token)
@@ -472,12 +471,8 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
             cmd.last_seen,
         )
 
-    async def subscribe_to_stream(self, stream_name, token):
+    async def subscribe_to_stream(self, stream_name):
         """Subscribe the remote to a stream.
-
-        This invloves checking if they've missed anything and sending those
-        updates down if they have. During that time new updates for the stream
-        are queued and sent once we've sent down any missed updates.
         """
         self.replication_streams.discard(stream_name)
 
@@ -620,8 +615,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         BaseReplicationStreamProtocol.connectionMade(self)
 
         # Once we've connected subscribe to the necessary streams
-        for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
-            self.replicate(stream_name, token)
+        for stream_name in self.handler.get_streams_to_replicate():
+            self.replicate(stream_name)
 
         # Tell the server if we have any users currently syncing (should only
         # happen on synchrotrons)
@@ -711,22 +706,19 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
     async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
         self.handler.on_remote_server_up(cmd.data)
 
-    def replicate(self, stream_name, token):
+    def replicate(self, stream_name):
         """Send the subscription request to the server
         """
         if stream_name not in STREAMS_MAP:
             raise Exception("Invalid stream name %r" % (stream_name,))
 
         logger.info(
-            "[%s] Subscribing to replication stream: %r from %r",
-            self.id(),
-            stream_name,
-            token,
+            "[%s] Subscribing to replication stream: %r", self.id(), stream_name,
         )
 
         self.streams_connecting.add(stream_name)
 
-        self.send_command(ReplicateCommand(stream_name, token))
+        self.send_command(ReplicateCommand(stream_name))
 
     def on_connection_closed(self):
         BaseReplicationStreamProtocol.on_connection_closed(self)