summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-03-25 14:55:02 +0000
committerErik Johnston <erik@matrix.org>2020-03-25 14:55:02 +0000
commit83ecaeecbf9997b5ceb532fa965f071084ca61ee (patch)
treef62a181a3a651ff595311bfa166778b4d4bfb000 /synapse/replication/tcp/handler.py
parentPass instance name through to rdata (diff)
downloadsynapse-github/erikj/split_out_fed_stream.tar.xz
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py18
1 files changed, 11 insertions, 7 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py

index cfba255897..ac4d6d1dd1 100644 --- a/synapse/replication/tcp/handler.py +++ b/synapse/replication/tcp/handler.py
@@ -63,6 +63,8 @@ class ReplicationClientHandler: self.presence_handler = hs.get_presence_handler() self.instance_id = hs.get_instance_id() + self.instance_name = hs.config.worker.worker_name or "master" + self.connections = [] # type: List[Any] self.streams = { @@ -134,7 +136,9 @@ class ReplicationClientHandler: for stream_name, stream in self.streams.items(): current_token = stream.current_token() - self.send_command(PositionCommand(stream_name, "master", current_token)) + self.send_command( + PositionCommand(stream_name, self.instance_name, current_token) + ) async def on_USER_SYNC(self, cmd: UserSyncCommand): user_sync_counter.inc() @@ -232,17 +236,17 @@ class ReplicationClientHandler: return # Find where we previously streamed up to. - current_token = ( - self.replication_data_handler.get_streams_to_replicate() - .get(cmd.stream_name, {}) - .get(cmd.instance_name) + current_tokens = self.replication_data_handler.get_streams_to_replicate().get( + cmd.stream_name ) - if current_token is None: + if current_tokens is None: logger.debug( "Got POSITION for stream we're not subscribed to: %s", cmd.stream_name ) return + current_token = current_tokens.get(cmd.instance_name, 0) + # Fetch all updates between then and now. limited = cmd.token != current_token while limited: @@ -335,7 +339,7 @@ class ReplicationClientHandler: We need to check if the client is interested in the stream or not """ - self.send_command(RdataCommand(stream_name, "master", token, data)) + self.send_command(RdataCommand(stream_name, self.instance_name, token, data)) class ReplicationDataHandler: