diff options
author | Neil Johnson <neil@fragile.org.uk> | 2018-05-14 09:31:42 +0100 |
---|---|---|
committer | Neil Johnson <neil@fragile.org.uk> | 2018-05-14 09:31:42 +0100 |
commit | 977765bde2987770f63065d839f9686a7a144140 (patch) | |
tree | 41d3a247f546cfe50500f465e50a798a597ef464 /synapse/app/pusher.py | |
parent | remove user agent from data model, will just join on user_ips (diff) | |
parent | Merge pull request #2846 from kaiyou/feat-dockerfile (diff) | |
download | synapse-977765bde2987770f63065d839f9686a7a144140.tar.xz |
Merge branch 'develop' of https://github.com/matrix-org/synapse into cohort_analytics
Diffstat (limited to 'synapse/app/pusher.py')
-rw-r--r-- | synapse/app/pusher.py | 36 |
1 files changed, 20 insertions, 16 deletions
diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py index d5c3a85195..3912eae48c 100644 --- a/synapse/app/pusher.py +++ b/synapse/app/pusher.py @@ -33,7 +33,7 @@ from synapse.server import HomeServer from synapse.storage import DataStore from synapse.storage.engines import create_engine from synapse.util.httpresourcetree import create_resource_tree -from synapse.util.logcontext import LoggingContext, preserve_fn +from synapse.util.logcontext import LoggingContext, run_in_background from synapse.util.manhole import manhole from synapse.util.versionstring import get_version_string from twisted.internet import defer, reactor @@ -104,6 +104,7 @@ class PusherServer(HomeServer): site_tag, listener_config, root_resource, + self.version_string, ) ) @@ -140,24 +141,27 @@ class PusherReplicationHandler(ReplicationClientHandler): def on_rdata(self, stream_name, token, rows): super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows) - preserve_fn(self.poke_pushers)(stream_name, token, rows) + run_in_background(self.poke_pushers, stream_name, token, rows) @defer.inlineCallbacks def poke_pushers(self, stream_name, token, rows): - if stream_name == "pushers": - for row in rows: - if row.deleted: - yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) - else: - yield self.start_pusher(row.user_id, row.app_id, row.pushkey) - elif stream_name == "events": - yield self.pusher_pool.on_new_notifications( - token, token, - ) - elif stream_name == "receipts": - yield self.pusher_pool.on_new_receipts( - token, token, set(row.room_id for row in rows) - ) + try: + if stream_name == "pushers": + for row in rows: + if row.deleted: + yield self.stop_pusher(row.user_id, row.app_id, row.pushkey) + else: + yield self.start_pusher(row.user_id, row.app_id, row.pushkey) + elif stream_name == "events": + yield self.pusher_pool.on_new_notifications( + token, token, + ) + elif stream_name == "receipts": + yield self.pusher_pool.on_new_receipts( + token, token, set(row.room_id for row in rows) + ) + except Exception: + logger.exception("Error poking pushers") def stop_pusher(self, user_id, app_id, pushkey): key = "%s:%s" % (app_id, pushkey) |