diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d5a99b838c..0bc548ae7a 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -72,16 +72,9 @@ class EmailPusher(object):
self.processing = False
- @defer.inlineCallbacks
def on_started(self):
if self.mailer is not None:
- try:
- self.throttle_params = yield self.store.get_throttle_params_by_room(
- self.pusher_id
- )
- self._start_processing()
- except Exception:
- logger.exception("Error starting email pusher")
+ self._start_processing()
def on_stop(self):
if self.timed_call:
@@ -116,6 +109,12 @@ class EmailPusher(object):
try:
self.processing = True
+ if self.throttle_params is None:
+ # this is our first loop: load up the throttle params
+ self.throttle_params = yield self.store.get_throttle_params_by_room(
+ self.pusher_id
+ )
+
# if the max ordering changes while we're running _unsafe_process,
# call it again, and so on until we've caught up.
while True:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 770f55feae..33034d44da 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -93,7 +93,6 @@ class HttpPusher(object):
def on_started(self):
self._start_processing()
- return defer.succeed(None)
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index a4d1ce3aad..695e582dce 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,12 +19,24 @@ import logging
from twisted.internet import defer
from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import run_in_background
logger = logging.getLogger(__name__)
class PusherPool:
+ """
+ The pusher pool. This is responsible for dispatching notifications of new events to
+ the http and email pushers.
+
+ It provides three methods which are designed to be called by the rest of the
+ application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
+ delegates to each of the relevant pushers.
+
+ Note that it is expected that each pusher will have its own 'processing' loop which
+ will send out the notifications in the background, rather than blocking until the
+ notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
+ Pusher.on_new_receipts are not expected to return deferreds.
+ """
def __init__(self, _hs):
self.hs = _hs
self.pusher_factory = PusherFactory(_hs)
@@ -216,7 +228,7 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
- run_in_background(p.on_started)
+ p.on_started()
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
|