diff options
author | Erik Johnston <erik@matrix.org> | 2017-04-04 15:22:54 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2017-04-04 15:22:54 +0100 |
commit | ac66e11f2b1235f801195ad0065008cdca2f1b0b (patch) | |
tree | be58241935d8c7361788e247abc19dae6d8648d3 /synapse/app/pusher.py | |
parent | Remove unused worker config option (diff) | |
download | synapse-ac66e11f2b1235f801195ad0065008cdca2f1b0b.tar.xz |
Add the appropriate amount of preserve_fn
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r-- | synapse/app/pusher.py | 13 |
1 files changed, 7 insertions, 6 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index cb76f058b0..f9114acfcb 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -39,7 +39,7 @@ from synapse.util.versionstring import get_version_string from synapse import events -from twisted.internet import reactor +from twisted.internet import reactor, defer from twisted.web.resource import Resource from daemonize import Daemonize @@ -170,21 +170,22 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - self.poke_pushers(stream_name, token, rows) + preserve_fn(self.poke_pushers)(stream_name, token, rows) + @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): if stream_name == "pushers": for row in rows: if row.deleted: - self.stop_pusher(row.user_id, row.app_id, row.pushkey) + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) else: - self.start_pusher(row.user_id, row.app_id, row.pushkey) + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) elif stream_name == "events": - preserve_fn(self.pusher_pool.on_new_notifications)( + yield self.pusher_pool.on_new_notifications( token, token, ) elif stream_name == "receipts": - preserve_fn(self.pusher_pool.on_new_receipts)( + yield self.pusher_pool.on_new_receipts( token, token, set(row.room_id for row in rows) ) |