diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index cb76f058b0..f9114acfcb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string
from synapse import events
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
from twisted.web.resource import Resource
from daemonize import Daemonize
@@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
- self.poke_pushers(stream_name, token, rows)
+ preserve_fn(self.poke_pushers)(stream_name, token, rows)
+ @defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
if stream_name == "pushers":
for row in rows:
if row.deleted:
- self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+ yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
else:
- self.start_pusher(row.user_id, row.app_id, row.pushkey)
+ yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
elif stream_name == "events":
- preserve_fn(self.pusher_pool.on_new_notifications)(
+ yield self.pusher_pool.on_new_notifications(
token, token,
)
elif stream_name == "receipts":
- preserve_fn(self.pusher_pool.on_new_receipts)(
+ yield self.pusher_pool.on_new_receipts(
token, token, set(row.room_id for row in rows)
)
|