diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 80f732b455..6864204616 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -365,6 +365,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.streamer.new_connection(self)
def on_NAME(self, cmd):
+ logger.info("[%s] Renamed to %r", self.id(), cmd.data)
self.name = cmd.data
def on_USER_SYNC(self, cmd):
@@ -414,16 +415,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
token, row = update[0], update[1]
self.send_command(RdataCommand(stream_name, token, row))
- # Now we can send any updates that came in while we were subscribing
- pending_rdata = self.pending_rdata.pop(stream_name, [])
- for token, update in pending_rdata:
- self.send_command(RdataCommand(stream_name, token, update))
-
# We send a POSITION command to ensure that they have an up to
# date token (especially useful if we didn't send any updates
# above)
self.send_command(PositionCommand(stream_name, current_token))
+ # Now we can send any updates that came in while we were subscribing
+ pending_rdata = self.pending_rdata.pop(stream_name, [])
+ for token, update in pending_rdata:
+ # Only send updates newer than the current token
+ if token > current_token:
+ self.send_command(RdataCommand(stream_name, token, update))
+
# They're now fully subscribed
self.replication_streams.add(stream_name)
except Exception as e:
@@ -442,7 +445,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
self.send_command(RdataCommand(stream_name, token, data))
elif stream_name in self.connecting_streams:
# The client is being subscribed to the stream
- logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
+ logger.debug("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
self.pending_rdata.setdefault(stream_name, []).append((token, data))
else:
# The client isn't subscribed
@@ -453,7 +456,6 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
def on_connection_closed(self):
BaseReplicationStreamProtocol.on_connection_closed(self)
- logger.info("[%s] Replication connection closed", self.id())
self.streamer.lost_connection(self)
|