summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py24
1 files changed, 8 insertions, 16 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 649312f022..817b84ad7f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -435,12 +435,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
     async def on_REPLICATE(self, cmd):
         stream_name = cmd.stream_name
-        token = cmd.token
 
         if stream_name == "ALL":
             # Subscribe to all streams we're publishing to.
             deferreds = [
-                run_in_background(self.subscribe_to_stream, stream, token)
+                run_in_background(self.subscribe_to_stream, stream)
                 for stream in iterkeys(self.streamer.streams_by_name)
             ]
 
@@ -448,7 +447,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
                 defer.gatherResults(deferreds, consumeErrors=True)
             )
         else:
-            await self.subscribe_to_stream(stream_name, token)
+            await self.subscribe_to_stream(stream_name)
 
     async def on_FEDERATION_ACK(self, cmd):
         self.streamer.federation_ack(cmd.token)
@@ -472,12 +471,8 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
             cmd.last_seen,
         )
 
-    async def subscribe_to_stream(self, stream_name, token):
+    async def subscribe_to_stream(self, stream_name):
         """Subscribe the remote to a stream.
-
-        This invloves checking if they've missed anything and sending those
-        updates down if they have. During that time new updates for the stream
-        are queued and sent once we've sent down any missed updates.
         """
         self.replication_streams.discard(stream_name)
 
@@ -620,8 +615,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
         BaseReplicationStreamProtocol.connectionMade(self)
 
         # Once we've connected subscribe to the necessary streams
-        for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
-            self.replicate(stream_name, token)
+        for stream_name in self.handler.get_streams_to_replicate():
+            self.replicate(stream_name)
 
         # Tell the server if we have any users currently syncing (should only
         # happen on synchrotrons)
@@ -711,22 +706,19 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
     async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
         self.handler.on_remote_server_up(cmd.data)
 
-    def replicate(self, stream_name, token):
+    def replicate(self, stream_name):
         """Send the subscription request to the server
         """
         if stream_name not in STREAMS_MAP:
             raise Exception("Invalid stream name %r" % (stream_name,))
 
         logger.info(
-            "[%s] Subscribing to replication stream: %r from %r",
-            self.id(),
-            stream_name,
-            token,
+            "[%s] Subscribing to replication stream: %r", self.id(), stream_name,
         )
 
         self.streams_connecting.add(stream_name)
 
-        self.send_command(ReplicateCommand(stream_name, token))
+        self.send_command(ReplicateCommand(stream_name))
 
     def on_connection_closed(self):
         BaseReplicationStreamProtocol.on_connection_closed(self)