summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py16
1 files changed, 9 insertions, 7 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py

index 80f732b455..6864204616 100644 --- a/synapse/replication/tcp/protocol.py +++ b/synapse/replication/tcp/protocol.py
@@ -365,6 +365,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): self.streamer.new_connection(self) def on_NAME(self, cmd): + logger.info("[%s] Renamed to %r", self.id(), cmd.data) self.name = cmd.data def on_USER_SYNC(self, cmd): @@ -414,16 +415,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): token, row = update[0], update[1] self.send_command(RdataCommand(stream_name, token, row)) - # Now we can send any updates that came in while we were subscribing - pending_rdata = self.pending_rdata.pop(stream_name, []) - for token, update in pending_rdata: - self.send_command(RdataCommand(stream_name, token, update)) - # We send a POSITION command to ensure that they have an up to # date token (especially useful if we didn't send any updates # above) self.send_command(PositionCommand(stream_name, current_token)) + # Now we can send any updates that came in while we were subscribing + pending_rdata = self.pending_rdata.pop(stream_name, []) + for token, update in pending_rdata: + # Only send updates newer than the current token + if token > current_token: + self.send_command(RdataCommand(stream_name, token, update)) + # They're now fully subscribed self.replication_streams.add(stream_name) except Exception as e: @@ -442,7 +445,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): self.send_command(RdataCommand(stream_name, token, data)) elif stream_name in self.connecting_streams: # The client is being subscribed to the stream - logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token) + logger.debug("[%s] Queuing RDATA %r %r", self.id(), stream_name, token) self.pending_rdata.setdefault(stream_name, []).append((token, data)) else: # The client isn't subscribed @@ -453,7 +456,6 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol): def on_connection_closed(self): BaseReplicationStreamProtocol.on_connection_closed(self) - logger.info("[%s] Replication connection closed", self.id()) self.streamer.lost_connection(self)