diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cfba255897..ac4d6d1dd1 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler:
self.presence_handler = hs.get_presence_handler()
self.instance_id = hs.get_instance_id()
+ self.instance_name = hs.config.worker.worker_name or "master"
+
self.connections = [] # type: List[Any]
self.streams = {
@@ -134,7 +136,9 @@ class ReplicationClientHandler:
for stream_name, stream in self.streams.items():
current_token = stream.current_token()
- self.send_command(PositionCommand(stream_name, "master", current_token))
+ self.send_command(
+ PositionCommand(stream_name, self.instance_name, current_token)
+ )
async def on_USER_SYNC(self, cmd: UserSyncCommand):
user_sync_counter.inc()
@@ -232,17 +236,17 @@ class ReplicationClientHandler:
return
# Find where we previously streamed up to.
- current_token = (
- self.replication_data_handler.get_streams_to_replicate()
- .get(cmd.stream_name, {})
- .get(cmd.instance_name)
+ current_tokens = self.replication_data_handler.get_streams_to_replicate().get(
+ cmd.stream_name
)
- if current_token is None:
+ if current_tokens is None:
logger.debug(
"Got POSITION for stream we're not subscribed to: %s", cmd.stream_name
)
return
+ current_token = current_tokens.get(cmd.instance_name, 0)
+
# Fetch all updates between then and now.
limited = cmd.token != current_token
while limited:
@@ -335,7 +339,7 @@ class ReplicationClientHandler:
We need to check if the client is interested in the stream or not
"""
- self.send_command(RdataCommand(stream_name, "master", token, data))
+ self.send_command(RdataCommand(stream_name, self.instance_name, token, data))
class ReplicationDataHandler:
|