summary refs log tree commit diff
path: root/synapse/replication/tcp/protocol.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-04 13:19:26 +0100
committerErik Johnston <erik@matrix.org>2017-04-04 13:19:26 +0100
commit023ee197be23c02fa5f2e5bffba23fb0da2bf35b (patch)
treec0abea570b1eb5f83e01d43d61c9b46b7b9957c4 /synapse/replication/tcp/protocol.py
parentMerge pull request #2082 from matrix-org/erikj/repl_tcp_server (diff)
downloadsynapse-023ee197be23c02fa5f2e5bffba23fb0da2bf35b.tar.xz
Advance replication streams even if nothing is listening
Otherwise the streams don't advance and steadily fall behind, so when a
worker does connect either a) they'll be streamed lots of old updates or
b) the connection will fail as the streams are too far behind.
Diffstat (limited to 'synapse/replication/tcp/protocol.py')
-rw-r--r--synapse/replication/tcp/protocol.py12
1 files changed, 7 insertions, 5 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 80f732b455..743f0207fa 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -414,16 +414,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
                 token, row = update[0], update[1]
                 self.send_command(RdataCommand(stream_name, token, row))
 
-            # Now we can send any updates that came in while we were subscribing
-            pending_rdata = self.pending_rdata.pop(stream_name, [])
-            for token, update in pending_rdata:
-                self.send_command(RdataCommand(stream_name, token, update))
-
             # We send a POSITION command to ensure that they have an up to
             # date token (especially useful if we didn't send any updates
             # above)
             self.send_command(PositionCommand(stream_name, current_token))
 
+            # Now we can send any updates that came in while we were subscribing
+            pending_rdata = self.pending_rdata.pop(stream_name, [])
+            for token, update in pending_rdata:
+                # Only send updates newer than the current token
+                if token > current_token:
+                    self.send_command(RdataCommand(stream_name, token, update))
+
             # They're now fully subscribed
             self.replication_streams.add(stream_name)
         except Exception as e: