diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-03 16:51:34 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-20 15:31:51 +0000 |
commit | 8734b75ca8b4b81f5998f5c2ef57dfa0998c66ac (patch) | |
tree | 3851721a7f1b1622d3302af2386406f5726e7782 /synapse/replication/tcp | |
parent | Move stream catchup to workers. (diff) | |
download | synapse-8734b75ca8b4b81f5998f5c2ef57dfa0998c66ac.tar.xz |
Remove unused token param from REPLICATE cmd
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r-- | synapse/replication/tcp/commands.py | 23 | ||||
-rw-r--r-- | synapse/replication/tcp/protocol.py | 24 |
2 files changed, 13 insertions, 34 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index 451671412d..e506f52935 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -183,35 +183,22 @@ class ReplicateCommand(Command): Format:: - REPLICATE <stream_name> <token> + REPLICATE <stream_name> - Where <token> may be either: - * a numeric stream_id to stream updates from - * "NOW" to stream all subsequent updates. - - The <stream_name> can be "ALL" to subscribe to all known streams, in which - case the <token> must be set to "NOW", i.e.:: - - REPLICATE ALL NOW + The <stream_name> can be "ALL" to subscribe to all known streams """ NAME = "REPLICATE" - def __init__(self, stream_name, token): + def __init__(self, stream_name): self.stream_name = stream_name - self.token = token @classmethod def from_line(cls, line): - stream_name, token = line.split(" ", 1) - if token in ("NOW", "now"): - token = "NOW" - else: - token = int(token) - return cls(stream_name, token) + return cls(line) def to_line(self): - return " ".join((self.stream_name, str(self.token))) + return self.stream_name def get_logcontext_id(self): return "REPLICATE-" + self.stream_name diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py index 649312f022..817b84ad7f 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py @@ -435,12 +435,11 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): async def on_REPLICATE(self, cmd): stream_name = cmd.stream_name - token = cmd.token if stream_name == "ALL": # Subscribe to all streams we're publishing to. deferreds = [ - run_in_background(self.subscribe_to_stream, stream, token) + run_in_background(self.subscribe_to_stream, stream) for stream in iterkeys(self.streamer.streams_by_name) ] @@ -448,7 +447,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): defer.gatherResults(deferreds, consumeErrors=True) ) else: - await self.subscribe_to_stream(stream_name, token) + await self.subscribe_to_stream(stream_name) async def on_FEDERATION_ACK(self, cmd): self.streamer.federation_ack(cmd.token) @@ -472,12 +471,8 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): cmd.last_seen, ) - async def subscribe_to_stream(self, stream_name, token): + async def subscribe_to_stream(self, stream_name): """Subscribe the remote to a stream. - - This invloves checking if they've missed anything and sending those - updates down if they have. During that time new updates for the stream - are queued and sent once we've sent down any missed updates. """ self.replication_streams.discard(stream_name) @@ -620,8 +615,8 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): BaseReplicationStreamProtocol.connectionMade(self) # Once we've connected subscribe to the necessary streams - for stream_name, token in iteritems(self.handler.get_streams_to_replicate()): - self.replicate(stream_name, token) + for stream_name in self.handler.get_streams_to_replicate(): + self.replicate(stream_name) # Tell the server if we have any users currently syncing (should only # happen on synchrotrons) @@ -711,22 +706,19 @@ class ClientReplicationStreamProtocol(BaseReplicationStreamProtocol): async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand): self.handler.on_remote_server_up(cmd.data) - def replicate(self, stream_name, token): + def replicate(self, stream_name): """Send the subscription request to the server """ if stream_name not in STREAMS_MAP: raise Exception("Invalid stream name %r" % (stream_name,)) logger.info( - "[%s] Subscribing to replication stream: %r from %r", - self.id(), - stream_name, - token, + "[%s] Subscribing to replication stream: %r", self.id(), stream_name, ) self.streams_connecting.add(stream_name) - self.send_command(ReplicateCommand(stream_name, token)) + self.send_command(ReplicateCommand(stream_name)) def on_connection_closed(self): BaseReplicationStreamProtocol.on_connection_closed(self) |