1 files changed, 24 insertions, 4 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index abf1a1a9c1..40a7709c09 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -21,6 +21,7 @@ from twisted.internet import defer
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
from synapse.push.pusher import PusherFactory
+from synapse.util.async_helpers import concurrently_execute
logger = logging.getLogger(__name__)
@@ -197,7 +198,7 @@ class PusherPool:
p = r
if p:
- self._start_pusher(p)
+ yield self._start_pusher(p)
@defer.inlineCallbacks
def _start_pushers(self):
@@ -208,10 +209,14 @@ class PusherPool:
"""
pushers = yield self.store.get_all_pushers()
logger.info("Starting %d pushers", len(pushers))
- for pusherdict in pushers:
- self._start_pusher(pusherdict)
+
+ # Stagger starting up the pushers so we don't completely drown the
+ # process on start up.
+ yield concurrently_execute(self._start_pusher, pushers, 10)
+
logger.info("Started pushers")
+ @defer.inlineCallbacks
def _start_pusher(self, pusherdict):
"""Start the given pusher
@@ -248,7 +253,22 @@ class PusherPool:
if appid_pushkey in byuser:
byuser[appid_pushkey].on_stop()
byuser[appid_pushkey] = p
- p.on_started()
+
+ # Check if there *may* be push to process. We do this as this check is a
+ # lot cheaper to do than actually fetching the exact rows we need to
+ # push.
+ user_id = pusherdict["user_name"]
+ last_stream_ordering = pusherdict["last_stream_ordering"]
+ if last_stream_ordering:
+ have_notifs = yield self.store.get_if_maybe_push_in_range_for_user(
+ user_id, last_stream_ordering,
+ )
+ else:
+ # We always want to default to starting up the pusher rather than
+ # risk missing push.
+ have_notifs = True
+
+ p.on_started(have_notifs)
@defer.inlineCallbacks
def remove_pusher(self, app_id, pushkey, user_id):
|