diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 649312f022..817b84ad7f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -435,12 +435,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
async def on_REPLICATE(self, cmd):
stream_name = cmd.stream_name
- token = cmd.token
if stream_name == "ALL":
# Subscribe to all streams we're publishing to.
deferreds = [
- run_in_background(self.subscribe_to_stream, stream, token)
+ run_in_background(self.subscribe_to_stream, stream)
for stream in iterkeys(self.streamer.streams_by_name)
]
@@ -448,7 +447,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
defer.gatherResults(deferreds, consumeErrors=True)
)
else:
- await self.subscribe_to_stream(stream_name, token)
+ await self.subscribe_to_stream(stream_name)
async def on_FEDERATION_ACK(self, cmd):
self.streamer.federation_ack(cmd.token)
@@ -472,12 +471,8 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
cmd.last_seen,
)
- async def subscribe_to_stream(self, stream_name, token):
+ async def subscribe_to_stream(self, stream_name):
"""Subscribe the remote to a stream.
-
- This invloves checking if they've missed anything and sending those
- updates down if they have. During that time new updates for the stream
- are queued and sent once we've sent down any missed updates.
"""
self.replication_streams.discard(stream_name)
@@ -620,8 +615,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
BaseReplicationStreamProtocol.connectionMade(self)
# Once we've connected subscribe to the necessary streams
- for stream_name, token in iteritems(self.handler.get_streams_to_replicate()):
- self.replicate(stream_name, token)
+ for stream_name in self.handler.get_streams_to_replicate():
+ self.replicate(stream_name)
# Tell the server if we have any users currently syncing (should only
# happen on synchrotrons)
@@ -711,22 +706,19 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol):
async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
self.handler.on_remote_server_up(cmd.data)
- def replicate(self, stream_name, token):
+ def replicate(self, stream_name):
"""Send the subscription request to the server
"""
if stream_name not in STREAMS_MAP:
raise Exception("Invalid stream name %r" % (stream_name,))
logger.info(
- "[%s] Subscribing to replication stream: %r from %r",
- self.id(),
- stream_name,
- token,
+ "[%s] Subscribing to replication stream: %r", self.id(), stream_name,
)
self.streams_connecting.add(stream_name)
- self.send_command(ReplicateCommand(stream_name, token))
+ self.send_command(ReplicateCommand(stream_name))
def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
|