diff --git a/synapse/app/appservice.py b/synapse/app/appservice.py
index 9a37384fb7..3348a8ec6d 100644
--- a/synapse/app/appservice.py
+++ b/synapse/app/appservice.py
@@ -117,8 +117,9 @@ class ASReplicationHandler(ReplicationClientHandler):
super(ASReplicationHandler, self).__init__(hs.get_datastore())
self.appservice_handler = hs.get_application_service_handler()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(ASReplicationHandler, self).on_rdata(stream_name, token, rows)
if stream_name == "events":
max_stream_id = self.store.get_room_max_stream_ordering()
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 7a4310ca18..d59007099b 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -144,8 +144,9 @@ class FederationSenderReplicationHandler(ReplicationClientHandler):
super(FederationSenderReplicationHandler, self).__init__(hs.get_datastore())
self.send_handler = FederationSenderHandler(hs, self)
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(FederationSenderReplicationHandler, self).on_rdata(
+ yield super(FederationSenderReplicationHandler, self).on_rdata(
stream_name, token, rows
)
self.send_handler.process_replication_rows(stream_name, token, rows)
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 0089a7c64f..a4fc7e91fa 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -148,8 +148,9 @@ class PusherReplicationHandler(ReplicationClientHandler):
self.pusher_pool = hs.get_pusherpool()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
diff --git a/synapse/app/synchrotron.py b/synapse/app/synchrotron.py
index cade09d60e..27e1998660 100644
--- a/synapse/app/synchrotron.py
+++ b/synapse/app/synchrotron.py
@@ -338,8 +338,9 @@ class SyncReplicationHandler(ReplicationClientHandler):
self.presence_handler = hs.get_presence_handler()
self.notifier = hs.get_notifier()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
+ yield super(SyncReplicationHandler, self).on_rdata(stream_name, token, rows)
run_in_background(self.process_and_notify, stream_name, token, rows)
def get_streams_to_replicate(self):
diff --git a/synapse/app/user_dir.py b/synapse/app/user_dir.py
index cb78de8834..1388a42b59 100644
--- a/synapse/app/user_dir.py
+++ b/synapse/app/user_dir.py
@@ -169,8 +169,9 @@ class UserDirectoryReplicationHandler(ReplicationClientHandler):
super(UserDirectoryReplicationHandler, self).__init__(hs.get_datastore())
self.user_directory = hs.get_user_directory_handler()
+ @defer.inlineCallbacks
def on_rdata(self, stream_name, token, rows):
- super(UserDirectoryReplicationHandler, self).on_rdata(
+ yield super(UserDirectoryReplicationHandler, self).on_rdata(
stream_name, token, rows
)
if stream_name == "current_state_deltas":
|