summary refs log tree commit diff
path: root/synapse/replication/tcp/handler.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/handler.py')
-rw-r--r--synapse/replication/tcp/handler.py9
1 files changed, 8 insertions, 1 deletions
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 1b05468483..e6a50aa74e 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -151,6 +151,13 @@ class ReplicationCommandHandler:
             hs.get_reactor().connectTCP(host, port, self._factory)
 
     async def on_REPLICATE(self, conn: AbstractConnection, cmd: ReplicateCommand):
+        self.send_positions_to_connection(conn)
+
+    def send_positions_to_connection(self, conn: AbstractConnection):
+        """Send current position of all streams this process is source of to
+        the connection.
+        """
+
         # We only want to announce positions by the writer of the streams.
         # Currently this is just the master process.
         if not self._is_master:
@@ -158,7 +165,7 @@ class ReplicationCommandHandler:
 
         for stream_name, stream in self._streams.items():
             current_token = stream.current_token(self._instance_name)
-            self.send_command(
+            conn.send_command(
                 PositionCommand(stream_name, self._instance_name, current_token)
             )