summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/protocol.py16
-rw-r--r--synapse/replication/tcp/resource.py2
-rw-r--r--synapse/replication/tcp/streams.py7
3 files changed, 17 insertions, 8 deletions
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 80f732b455..6864204616 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -365,6 +365,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         self.streamer.new_connection(self)
 
     def on_NAME(self, cmd):
+        logger.info("[%s] Renamed to %r", self.id(), cmd.data)
         self.name = cmd.data
 
     def on_USER_SYNC(self, cmd):
@@ -414,16 +415,18 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
                 token, row = update[0], update[1]
                 self.send_command(RdataCommand(stream_name, token, row))
 
-            # Now we can send any updates that came in while we were subscribing
-            pending_rdata = self.pending_rdata.pop(stream_name, [])
-            for token, update in pending_rdata:
-                self.send_command(RdataCommand(stream_name, token, update))
-
             # We send a POSITION command to ensure that they have an up to
             # date token (especially useful if we didn't send any updates
             # above)
             self.send_command(PositionCommand(stream_name, current_token))
 
+            # Now we can send any updates that came in while we were subscribing
+            pending_rdata = self.pending_rdata.pop(stream_name, [])
+            for token, update in pending_rdata:
+                # Only send updates newer than the current token
+                if token > current_token:
+                    self.send_command(RdataCommand(stream_name, token, update))
+
             # They're now fully subscribed
             self.replication_streams.add(stream_name)
         except Exception as e:
@@ -442,7 +445,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
             self.send_command(RdataCommand(stream_name, token, data))
         elif stream_name in self.connecting_streams:
             # The client is being subscribed to the stream
-            logger.info("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
+            logger.debug("[%s] Queuing RDATA %r %r", self.id(), stream_name, token)
             self.pending_rdata.setdefault(stream_name, []).append((token, data))
         else:
             # The client isn't subscribed
@@ -453,7 +456,6 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
 
     def on_connection_closed(self):
         BaseReplicationStreamProtocol.on_connection_closed(self)
-        logger.info("[%s] Replication connection closed", self.id())
         self.streamer.lost_connection(self)
 
 
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 0d7ea57318..8b2c4c3043 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -124,7 +124,7 @@ class ReplicationStreamer(object):
             # Don't bother if nothing is listening. We still need to advance
             # the stream tokens otherwise they'll fall beihind forever
             for stream in self.streams:
-                stream.advance_current_token()
+                stream.discard_updates_and_advance()
             return
 
         # If we're in the process of checking for new updates, mark that fact
diff --git a/synapse/replication/tcp/streams.py b/synapse/replication/tcp/streams.py
index fada40c6ef..4de4ebe84d 100644
--- a/synapse/replication/tcp/streams.py
+++ b/synapse/replication/tcp/streams.py
@@ -89,6 +89,13 @@ class Stream(object):
         """
         self.upto_token = self.current_token()
 
+    def discard_updates_and_advance(self):
+        """Called when the stream should advance but the updates would be discarded,
+        e.g. when there are no currently connected workers.
+        """
+        self.upto_token = self.current_token()
+        self.last_token = self.upto_token
+
     @defer.inlineCallbacks
     def get_updates(self):
         """Gets all updates since the last time this function was called (or