diff options
author | Erik Johnston <erikj@jki.re> | 2019-04-02 18:23:32 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2019-04-02 18:23:32 +0100 |
commit | 3039d61baf7b4ab914a2d659737337cd09127a51 (patch) | |
tree | 672c4e457a5596a408423ff37c1ead65475f73b3 /synapse/push/pusherpool.py | |
parent | Transfer related groups on room upgrade (#4990) (diff) | |
parent | s/misc/feature/ (diff) | |
download | synapse-3039d61baf7b4ab914a2d659737337cd09127a51.tar.xz |
Merge pull request #4991 from matrix-org/erikj/stagger_push_startup
Make starting pushers faster during start up
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r-- | synapse/push/pusherpool.py | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index abf1a1a9c1..40a7709c09 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException from synapse.push.pusher import PusherFactory +from synapse.util.async_helpers import concurrently_execute logger = logging.getLogger(__name__) @@ -197,7 +198,7 @@ class PusherPool: p = r if p: - self._start_pusher(p) + yield self._start_pusher(p) @defer.inlineCallbacks def _start_pushers(self): @@ -208,10 +209,14 @@ class PusherPool: """ pushers = yield self.store.get_all_pushers() logger.info("Starting %d pushers", len(pushers)) - for pusherdict in pushers: - self._start_pusher(pusherdict) + + # Stagger starting up the pushers so we don't completely drown the + # process on start up. + yield concurrently_execute(self._start_pusher, pushers, 10) + logger.info("Started pushers") + @defer.inlineCallbacks def _start_pusher(self, pusherdict): """Start the given pusher @@ -248,7 +253,22 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - p.on_started() + + # Check if there *may* be push to process. We do this as this check is a + # lot cheaper to do than actually fetching the exact rows we need to + # push. + user_id = pusherdict["user_name"] + last_stream_ordering = pusherdict["last_stream_ordering"] + if last_stream_ordering: + have_notifs = yield self.store.get_if_maybe_push_in_range_for_user( + user_id, last_stream_ordering, + ) + else: + # We always want to default to starting up the pusher rather than + # risk missing push. + have_notifs = True + + p.on_started(have_notifs) @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): |