summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py38
1 files changed, 19 insertions, 19 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1748182663..c14a18ba2e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -257,6 +257,11 @@ class ReplicationCommandHandler:
         if hs.config.redis.redis_enabled:
             self._notifier.add_lock_released_callback(self.on_lock_released)
 
+        # Marks if we should send POSITION commands for all streams ASAP. This
+        # is checked by the `ReplicationStreamer` which manages sending
+        # RDATA/POSITION commands
+        self._should_announce_positions = True
+
     def subscribe_to_channel(self, channel_name: str) -> None:
         """
         Indicates that we wish to subscribe to a Redis channel by name.
@@ -397,29 +402,23 @@ class ReplicationCommandHandler:
         return self._streams_to_replicate
 
     def on_REPLICATE(self, conn: IReplicationConnection, cmd: ReplicateCommand) -> None:
-        self.send_positions_to_connection(conn)
+        self.send_positions_to_connection()
 
-    def send_positions_to_connection(self, conn: IReplicationConnection) -> None:
+    def send_positions_to_connection(self) -> None:
         """Send current position of all streams this process is source of to
         the connection.
         """
 
-        # We respond with current position of all streams this instance
-        # replicates.
-        for stream in self.get_streams_to_replicate():
-            # Note that we use the current token as the prev token here (rather
-            # than stream.last_token), as we can't be sure that there have been
-            # no rows written between last token and the current token (since we
-            # might be racing with the replication sending bg process).
-            current_token = stream.current_token(self._instance_name)
-            self.send_command(
-                PositionCommand(
-                    stream.NAME,
-                    self._instance_name,
-                    current_token,
-                    current_token,
-                )
-            )
+        self._should_announce_positions = True
+        self._notifier.notify_replication()
+
+    def should_announce_positions(self) -> bool:
+        """Check if we should send POSITION commands for all streams ASAP."""
+        return self._should_announce_positions
+
+    def will_announce_positions(self) -> None:
+        """Mark that we're about to send POSITIONs out for all streams."""
+        self._should_announce_positions = False
 
     def on_USER_SYNC(
         self, conn: IReplicationConnection, cmd: UserSyncCommand
@@ -653,8 +652,9 @@ class ReplicationCommandHandler:
             # for why this can happen.
 
             logger.info(
-                "Fetching replication rows for '%s' between %i and %i",
+                "Fetching replication rows for '%s' / %s between %i and %i",
                 stream_name,
+                cmd.instance_name,
                 current_token,
                 cmd.new_token,
             )