summary refs log tree commit diff
path: root/synapse/app/pusher.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r--synapse/app/pusher.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index cb76f058b0..f9114acfcb 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string
 
 from synapse import events
 
-from twisted.internet import reactor
+from twisted.internet import reactor, defer
 from twisted.web.resource import Resource
 
 from daemonize import Daemonize
@@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler):
 
     def on_rdata(self, stream_name, token, rows):
         super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
-        self.poke_pushers(stream_name, token, rows)
+        preserve_fn(self.poke_pushers)(stream_name, token, rows)
 
+    @defer.inlineCallbacks
     def poke_pushers(self, stream_name, token, rows):
         if stream_name == "pushers":
             for row in rows:
                 if row.deleted:
-                    self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+                    yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
                 else:
-                    self.start_pusher(row.user_id, row.app_id, row.pushkey)
+                    yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
         elif stream_name == "events":
-            preserve_fn(self.pusher_pool.on_new_notifications)(
+            yield self.pusher_pool.on_new_notifications(
                 token, token,
             )
         elif stream_name == "receipts":
-            preserve_fn(self.pusher_pool.on_new_receipts)(
+            yield self.pusher_pool.on_new_receipts(
                 token, token, set(row.room_id for row in rows)
             )