summary refs log tree commit diff
path: root/synapse/app/pusher.py
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2017-04-04 15:22:54 +0100
committerErik Johnston <erik@matrix.org>2017-04-04 15:22:54 +0100
commitac66e11f2b1235f801195ad0065008cdca2f1b0b (patch)
treebe58241935d8c7361788e247abc19dae6d8648d3 /synapse/app/pusher.py
parentRemove unused worker config option (diff)
downloadsynapse-ac66e11f2b1235f801195ad0065008cdca2f1b0b.tar.xz
Add the appropriate amount of preserve_fn
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r--synapse/app/pusher.py13
1 files changed, 7 insertions, 6 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py

index cb76f058b0..f9114acfcb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py
@@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string from synapse import events -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import Resource from daemonize import Daemonize @@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - self.poke_pushers(stream_name, token, rows) + preserve_fn(self.poke_pushers)(stream_name, token, rows) + @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): if stream_name == "pushers": for row in rows: if row.deleted: - self.stop_pusher(row.user_id, row.app_id, row.pushkey) + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) else: - self.start_pusher(row.user_id, row.app_id, row.pushkey) + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - preserve_fn(self.pusher_pool.on_new_notifications)( + yield self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - preserve_fn(self.pusher_pool.on_new_receipts)( + yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) )