diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 6ae3c86a8d..6864204616 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -415,16 +415,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
token, row = update[0], update[1]
self.send_command(RdataCommand(stream_name, token, row))
- # Now we can send any updates that came in while we were subscribing
- pending_rdata = self.pending_rdata.pop(stream_name, [])
- for token, update in pending_rdata:
- self.send_command(RdataCommand(stream_name, token, update))
-
# We send a POSITION command to ensure that they have an up to
# date token (especially useful if we didn't send any updates
# above)
self.send_command(PositionCommand(stream_name, current_token))
+ # Now we can send any updates that came in while we were subscribing
+ pending_rdata = self.pending_rdata.pop(stream_name, [])
+ for token, update in pending_rdata:
+ # Only send updates newer than the current token
+ if token > current_token:
+ self.send_command(RdataCommand(stream_name, token, update))
+
# They're now fully subscribed
self.replication_streams.add(stream_name)
except Exception as e:
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 0d7ea57318..8b2c4c3043 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -124,7 +124,7 @@ class ReplicationStreamer(object):
# Don't bother if nothing is listening. We still need to advance
# the stream tokens otherwise they'll fall beihind forever
for stream in self.streams:
- stream.advance_current_token()
+ stream.discard_updates_and_advance()
return
# If we're in the process of checking for new updates, mark that fact
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index fada40c6ef..4de4ebe84d 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -89,6 +89,13 @@ class Stream(object):
"""
self.upto_token = self.current_token()
+ def discard_updates_and_advance(self):
+ """Called when the stream should advance but the updates would be discarded,
+ e.g. when there are no currently connected workers.
+ """
+ self.upto_token = self.current_token()
+ self.last_token = self.upto_token
+
@defer.inlineCallbacks
def get_updates(self):
"""Gets all updates since the last time this function was called (or
|