summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py14
1 files changed, 7 insertions, 7 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index aa7fd90e26..fc06a7b053 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -110,7 +110,7 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
         port = hs.config.worker_replication_port
         hs.get_reactor().connectTCP(host, port, self.factory)
 
-    def on_rdata(self, stream_name, token, rows):
+    async def on_rdata(self, stream_name, token, rows):
         """Called to handle a batch of replication data with a given stream token.
 
         By default this just pokes the slave store. Can be overridden in subclasses to
@@ -121,20 +121,17 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
             token (int): stream token for this batch of rows
             rows (list): a list of Stream.ROW_TYPE objects as returned by
                 Stream.parse_row.
-
-        Returns:
-            Deferred|None
         """
         logger.debug("Received rdata %s -> %s", stream_name, token)
-        return self.store.process_replication_rows(stream_name, token, rows)
+        self.store.process_replication_rows(stream_name, token, rows)
 
-    def on_position(self, stream_name, token):
+    async def on_position(self, stream_name, token):
         """Called when we get new position data. By default this just pokes
         the slave store.
 
         Can be overriden in subclasses to handle more.
         """
-        return self.store.process_replication_rows(stream_name, token, [])
+        self.store.process_replication_rows(stream_name, token, [])
 
     def on_sync(self, data):
         """When we received a SYNC we wake up any deferreds that were waiting
@@ -146,6 +143,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
         if d:
             d.callback(data)
 
+    def on_remote_server_up(self, server: str):
+        """Called when get a new REMOTE_SERVER_UP command."""
+
     def get_streams_to_replicate(self) -> Dict[str, int]:
         """Called when a new connection has been established and we need to
         subscribe to streams.