summary refs log tree commit diff
path: root/synapse/replication/tcp/client.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/replication/tcp/client.py')
-rw-r--r--synapse/replication/tcp/client.py11
1 files changed, 4 insertions, 7 deletions
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index aa7fd90e26..52a0aefe68 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -110,7 +110,7 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
         port = hs.config.worker_replication_port
         hs.get_reactor().connectTCP(host, port, self.factory)
 
-    def on_rdata(self, stream_name, token, rows):
+    async def on_rdata(self, stream_name, token, rows):
         """Called to handle a batch of replication data with a given stream token.
 
         By default this just pokes the slave store. Can be overridden in subclasses to
@@ -121,20 +121,17 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
             token (int): stream token for this batch of rows
             rows (list): a list of Stream.ROW_TYPE objects as returned by
                 Stream.parse_row.
-
-        Returns:
-            Deferred|None
         """
         logger.debug("Received rdata %s -> %s", stream_name, token)
-        return self.store.process_replication_rows(stream_name, token, rows)
+        self.store.process_replication_rows(stream_name, token, rows)
 
-    def on_position(self, stream_name, token):
+    async def on_position(self, stream_name, token):
         """Called when we get new position data. By default this just pokes
         the slave store.
 
         Can be overriden in subclasses to handle more.
         """
-        return self.store.process_replication_rows(stream_name, token, [])
+        self.store.process_replication_rows(stream_name, token, [])
 
     def on_sync(self, data):
         """When we received a SYNC we wake up any deferreds that were waiting