summary refs log tree commit diff
path: root/synapse/replication/tcp
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp')
-rw-r--r--synapse/replication/tcp/handler.py38
-rw-r--r--synapse/replication/tcp/redis.py2
-rw-r--r--synapse/replication/tcp/resource.py17
3 files changed, 36 insertions, 21 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1748182663..c14a18ba2e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler:
         if hs.config.redis.redis_enabled:
             self._notifier.add_lock_released_callback(self.on_lock_released)
 
+        # Marks if we should send POSITION commands for all streams ASAP. This
+        # is checked by the `ReplicationStreamer` which manages sending
+        # RDATA/POSITION commands
+        self._should_announce_positions = True
+
     def subscribe_to_channel(self, channel_name: str) -> None:
         """
         Indicates that we wish to subscribe to a Redis channel by name.
@@ -397,29 +402,23 @@ class ReplicationCommandHandler:
         return self._streams_to_replicate
 
     def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
-        self.send_positions_to_connection(conn)
+        self.send_positions_to_connection()
 
-    def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
+    def send_positions_to_connection(self) -> None:
         """Send current position of all streams this process is source of to
         the connection.
         """
 
-        # We respond with current position of all streams this instance
-        # replicates.
-        for stream in self.get_streams_to_replicate():
-            # Note that we use the current token as the prev token here (rather
-            # than stream.last_token), as we can't be sure that there have been
-            # no rows written between last token and the current token (since we
-            # might be racing with the replication sending bg process).
-            current_token = stream.current_token(self._instance_name)
-            self.send_command(
-                PositionCommand(
-                    stream.NAME,
-                    self._instance_name,
-                    current_token,
-                    current_token,
-                )
-            )
+        self._should_announce_positions = True
+        self._notifier.notify_replication()
+
+    def should_announce_positions(self) -> bool:
+        """Check if we should send POSITION commands for all streams ASAP."""
+        return self._should_announce_positions
+
+    def will_announce_positions(self) -> None:
+        """Mark that we're about to send POSITIONs out for all streams."""
+        self._should_announce_positions = False
 
     def on_USER_SYNC(
         self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -653,8 +652,9 @@ class ReplicationCommandHandler:
             # for why this can happen.
 
             logger.info(
-                "Fetching replication rows for '%s' between %i and %i",
+                "Fetching replication rows for '%s' / %s between %i and %i",
                 stream_name,
+                cmd.instance_name,
                 current_token,
                 cmd.new_token,
             )
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index 7e96145b3b..1fa37bb888 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -141,7 +141,7 @@ class RedisSubscriber(SubscriberProtocol):
         # We send out our positions when there is a new connection in case the
         # other side missed updates. We do this for Redis connections as the
         # otherside won't know we've connected and so won't issue a REPLICATE.
-        self.synapse_handler.send_positions_to_connection(self)
+        self.synapse_handler.send_positions_to_connection()
 
     def messageReceived(self, pattern: str, channel: str, message: str) -> None:
         """Received a message from redis."""
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 38abb5df54..d15828f2d3 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -123,7 +123,7 @@ class ReplicationStreamer:
 
         # We check up front to see if anything has actually changed, as we get
         # poked because of changes that happened on other instances.
-        if all(
+        if not self.command_handler.should_announce_positions() and all(
             stream.last_token == stream.current_token(self._instance_name)
             for stream in self.streams
         ):
@@ -158,6 +158,21 @@ class ReplicationStreamer:
                         all_streams = list(all_streams)
                         random.shuffle(all_streams)
 
+                    if self.command_handler.should_announce_positions():
+                        # We need to send out POSITIONs for all streams, usually
+                        # because a worker has reconnected.
+                        self.command_handler.will_announce_positions()
+
+                        for stream in all_streams:
+                            self.command_handler.send_command(
+                                PositionCommand(
+                                    stream.NAME,
+                                    self._instance_name,
+                                    stream.last_token,
+                                    stream.last_token,
+                                )
+                            )
+
                     for stream in all_streams:
                         if stream.last_token == stream.current_token(
                             self._instance_name