summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2018-10-22 16:12:11 +0100
committerRichard van der Hoff <richard@matrix.org>2018-10-22 16:12:11 +0100
commitf749607c911a82e64685f1cfaeee892c49a1b606 (patch)
treeebb3e70cce4660fc614cb4ddb819b6f83572219b /synapse/push
parentRemove redundant run_as_background_process() from pusherpool (diff)
downloadsynapse-f749607c911a82e64685f1cfaeee892c49a1b606.tar.xz
Make on_started synchronous too
This brings it into line with on_new_notifications and on_new_receipts. It
requires a little bit of hoop-jumping in EmailPusher to load the throttle
params before the first loop.
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/emailpusher.py15
-rw-r--r--synapse/push/httppusher.py1
-rw-r--r--synapse/push/pusherpool.py16
3 files changed, 21 insertions, 11 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index d5a99b838c..0bc548ae7a 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -72,16 +72,9 @@ class EmailPusher(object):
 
         self.processing = False
 
-    @defer.inlineCallbacks
     def on_started(self):
         if self.mailer is not None:
-            try:
-                self.throttle_params = yield self.store.get_throttle_params_by_room(
-                    self.pusher_id
-                )
-                self._start_processing()
-            except Exception:
-                logger.exception("Error starting email pusher")
+            self._start_processing()
 
     def on_stop(self):
         if self.timed_call:
@@ -116,6 +109,12 @@ class EmailPusher(object):
         try:
             self.processing = True
 
+            if self.throttle_params is None:
+                # this is our first loop: load up the throttle params
+                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                    self.pusher_id
+                )
+
             # if the max ordering changes while we're running _unsafe_process,
             # call it again, and so on until we've caught up.
             while True:
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 770f55feae..33034d44da 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -93,7 +93,6 @@ class HttpPusher(object):
 
     def on_started(self):
         self._start_processing()
-        return defer.succeed(None)
 
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
         self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index a4d1ce3aad..695e582dce 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,12 +19,24 @@ import logging
 from twisted.internet import defer
 
 from synapse.push.pusher import PusherFactory
-from synapse.util.logcontext import run_in_background
 
 logger = logging.getLogger(__name__)
 
 
 class PusherPool:
+    """
+    The pusher pool. This is responsible for dispatching notifications of new events to
+    the http and email pushers.
+
+    It provides three methods which are designed to be called by the rest of the
+    application: `start`, `on_new_notifications`, and `on_new_receipts`: each of these
+    delegates to each of the relevant pushers.
+
+    Note that it is expected that each pusher will have its own 'processing' loop which
+    will send out the notifications in the background, rather than blocking until the
+    notifications are sent; accordingly Pusher.on_started, Pusher.on_new_notifications and
+    Pusher.on_new_receipts are not expected to return deferreds.
+    """
     def __init__(self, _hs):
         self.hs = _hs
         self.pusher_factory = PusherFactory(_hs)
@@ -216,7 +228,7 @@ class PusherPool:
         if appid_pushkey in byuser:
             byuser[appid_pushkey].on_stop()
         byuser[appid_pushkey] = p
-        run_in_background(p.on_started)
+        p.on_started()
 
     @defer.inlineCallbacks
     def remove_pusher(self, app_id, pushkey, user_id):