diff options
author | Erik Johnston <erik@matrix.org> | 2019-04-17 19:44:40 +0100 |
---|---|---|
committer | Erik Johnston <erik@matrix.org> | 2019-04-17 19:44:40 +0100 |
commit | ca90336a6935b36b5761244005b0f68b496d5d79 (patch) | |
tree | 6bbce5eafc0db3b24ccc3b59b051da850382ae09 /synapse/push/pusherpool.py | |
parent | Add management endpoints for account validity (diff) | |
parent | Merge pull request #5047 from matrix-org/babolivier/account_expiration (diff) | |
download | synapse-ca90336a6935b36b5761244005b0f68b496d5d79.tar.xz |
Merge branch 'develop' of github.com:matrix-org/synapse into babolivier/account_expiration
Diffstat (limited to 'synapse/push/pusherpool.py')
-rw-r--r-- | synapse/push/pusherpool.py | 28 |
1 files changed, 24 insertions, 4 deletions
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index abf1a1a9c1..40a7709c09 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException from synapse.push.pusher import PusherFactory +from synapse.util.async_helpers import concurrently_execute logger = logging.getLogger(__name__) @@ -197,7 +198,7 @@ class PusherPool: p = r if p: - self._start_pusher(p) + yield self._start_pusher(p) @defer.inlineCallbacks def _start_pushers(self): @@ -208,10 +209,14 @@ class PusherPool: """ pushers = yield self.store.get_all_pushers() logger.info("Starting %d pushers", len(pushers)) - for pusherdict in pushers: - self._start_pusher(pusherdict) + + # Stagger starting up the pushers so we don't completely drown the + # process on start up. + yield concurrently_execute(self._start_pusher, pushers, 10) + logger.info("Started pushers") + @defer.inlineCallbacks def _start_pusher(self, pusherdict): """Start the given pusher @@ -248,7 +253,22 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - p.on_started() + + # Check if there *may* be push to process. We do this as this check is a + # lot cheaper to do than actually fetching the exact rows we need to + # push. + user_id = pusherdict["user_name"] + last_stream_ordering = pusherdict["last_stream_ordering"] + if last_stream_ordering: + have_notifs = yield self.store.get_if_maybe_push_in_range_for_user( + user_id, last_stream_ordering, + ) + else: + # We always want to default to starting up the pusher rather than + # risk missing push. + have_notifs = True + + p.on_started(have_notifs) @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): |