diff options
author | Erik Johnston <erik@matrix.org> | 2020-03-25 10:17:29 +0000 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2020-03-25 10:21:22 +0000 |
commit | f6e7daaac3e96a5859e0bee9fceec5122fe3b099 (patch) | |
tree | 05701b25236362577fdc866da618ac35b7dbfd2d /synapse | |
parent | DFSDJFDSLKF (diff) | |
download | synapse-f6e7daaac3e96a5859e0bee9fceec5122fe3b099.tar.xz |
Add instance name to command
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/replication/tcp/commands.py | 22 | ||||
-rw-r--r-- | synapse/replication/tcp/handler.py | 4 | ||||
-rw-r--r-- | synapse/replication/tcp/redis.py | 2 |
3 files changed, 17 insertions, 11 deletions
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py index e4eec643f7..98ea607412 100644 --- a/synapse/replication/tcp/commands.py +++ b/synapse/replication/tcp/commands.py @@ -86,7 +86,7 @@ class RdataCommand(Command): Format:: - RDATA <stream_name> <token> <row_json> + RDATA <stream_name> <instance_name> <token> <row_json> The `<token>` may either be a numeric stream id OR "batch". The latter case is used to support sending multiple updates with the same stream ID. This @@ -107,22 +107,27 @@ class RdataCommand(Command): NAME = "RDATA" - def __init__(self, stream_name, token, row): + def __init__(self, stream_name, instance_name, token, row): self.stream_name = stream_name + self.instance_name = instance_name self.token = token self.row = row @classmethod def from_line(cls, line): - stream_name, token, row_json = line.split(" ", 2) + stream_name, instance_name, token, row_json = line.split(" ", 3) return cls( - stream_name, None if token == "batch" else int(token), json.loads(row_json) + stream_name, + instance_name, + None if token == "batch" else int(token), + json.loads(row_json), ) def to_line(self): return " ".join( ( self.stream_name, + self.instance_name, str(self.token) if self.token is not None else "batch", _json_encoder.encode(self.row), ) @@ -142,17 +147,18 @@ class PositionCommand(Command): NAME = "POSITION" - def __init__(self, stream_name, token): + def __init__(self, stream_name, instance_name, token): self.stream_name = stream_name + self.instance_name = instance_name self.token = token @classmethod def from_line(cls, line): - stream_name, token = line.split(" ", 1) - return cls(stream_name, int(token)) + stream_name, instance_name, token = line.split(" ", 2) + return cls(stream_name, instance_name, int(token)) def to_line(self): - return " ".join((self.stream_name, str(self.token))) + return " ".join((self.stream_name, self.instance_name, str(self.token))) class ErrorCommand(Command): diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py index 62471a887b..ffc6e7d398 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py @@ -134,7 +134,7 @@ class ReplicationClientHandler: for stream_name, stream in self.streams.items(): current_token = stream.current_token() - self.send_command(PositionCommand(stream_name, current_token)) + self.send_command(PositionCommand(stream_name, "master", current_token)) async def on_USER_SYNC(self, cmd: UserSyncCommand): user_sync_counter.inc() @@ -326,7 +326,7 @@ class ReplicationClientHandler: We need to check if the client is interested in the stream or not """ - self.send_command(RdataCommand(stream_name, token, data)) + self.send_command(RdataCommand(stream_name, "master", token, data)) class ReplicationDataHandler: diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py index 6173ca4385..4a83b526bf 100644 --- a/synapse/replication/tcp/redis.py +++ b/synapse/replication/tcp/redis.py @@ -122,7 +122,7 @@ class RedisSubscriber(txredisapi.SubscriberProtocol): We need to check if the client is interested in the stream or not """ - self.send_command(RdataCommand(stream_name, token, data)) + self.send_command(RdataCommand(stream_name, "master", token, data)) class RedisFactory(txredisapi.SubscriberFactory): |