diff --git a/synapse/app/pusher.py b/synapse/app/pusher.py
index 98a4a7c62c..26930d1b3b 100644
--- a/synapse/app/pusher.py
+++ b/synapse/app/pusher.py
@@ -33,11 +33,11 @@ from synapse.server import HomeServer
from synapse.storage import DataStore
from synapse.storage.engines import create_engine
from synapse.util.httpresourcetree import create_resource_tree
-from synapse.util.logcontext import LoggingContext, preserve_fn
+from synapse.util.logcontext import LoggingContext, run_in_background
from synapse.util.manhole import manhole
from synapse.util.versionstring import get_version_string
from twisted.internet import defer, reactor
-from twisted.web.resource import Resource
+from twisted.web.resource import NoResource
logger = logging.getLogger("synapse.app.pusher")
@@ -94,7 +94,7 @@ class PusherServer(HomeServer):
if name == "metrics":
resources[METRICS_PREFIX] = MetricsResource(self)
- root_resource = create_resource_tree(resources, Resource())
+ root_resource = create_resource_tree(resources, NoResource())
_base.listen_tcp(
bind_addresses,
@@ -140,24 +140,27 @@ class PusherReplicationHandler(ReplicationClientHandler):
def on_rdata(self, stream_name, token, rows):
super(PusherReplicationHandler, self).on_rdata(stream_name, token, rows)
- preserve_fn(self.poke_pushers)(stream_name, token, rows)
+ run_in_background(self.poke_pushers, stream_name, token, rows)
@defer.inlineCallbacks
def poke_pushers(self, stream_name, token, rows):
- if stream_name == "pushers":
- for row in rows:
- if row.deleted:
- yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
- else:
- yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
- elif stream_name == "events":
- yield self.pusher_pool.on_new_notifications(
- token, token,
- )
- elif stream_name == "receipts":
- yield self.pusher_pool.on_new_receipts(
- token, token, set(row.room_id for row in rows)
- )
+ try:
+ if stream_name == "pushers":
+ for row in rows:
+ if row.deleted:
+ yield self.stop_pusher(row.user_id, row.app_id, row.pushkey)
+ else:
+ yield self.start_pusher(row.user_id, row.app_id, row.pushkey)
+ elif stream_name == "events":
+ yield self.pusher_pool.on_new_notifications(
+ token, token,
+ )
+ elif stream_name == "receipts":
+ yield self.pusher_pool.on_new_receipts(
+ token, token, set(row.room_id for row in rows)
+ )
+ except Exception:
+ logger.exception("Error poking pushers")
def stop_pusher(self, user_id, app_id, pushkey):
key = "%s:%s" % (app_id, pushkey)
|