diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/push/emailpusher.py | 11 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 12 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 28 | ||||
-rw-r--r-- | synapse/storage/event_push_actions.py | 30 |
4 files changed, 73 insertions, 8 deletions
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 50e1007d84..e8ee67401f 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -72,8 +72,15 @@ class EmailPusher(object): self._is_processing = False - def on_started(self): - if self.mailer is not None: + def on_started(self, should_check_for_notifs): + """Called when this pusher has been started. + + Args: + should_check_for_notifs (bool): Whether we should immediately + check for push to send. Set to False only if it's known there + is nothing to send + """ + if should_check_for_notifs and self.mailer is not None: self._start_processing() def on_stop(self): diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index e65f8c63d3..fac05aa44c 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -112,8 +112,16 @@ class HttpPusher(object): self.data_minus_url.update(self.data) del self.data_minus_url['url'] - def on_started(self): - self._start_processing() + def on_started(self, should_check_for_notifs): + """Called when this pusher has been started. + + Args: + should_check_for_notifs (bool): Whether we should immediately + check for push to send. Set to False only if it's known there + is nothing to send + """ + if should_check_for_notifs: + self._start_processing() def on_new_notifications(self, min_stream_ordering, max_stream_ordering): self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering or 0) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index abf1a1a9c1..40a7709c09 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -21,6 +21,7 @@ from twisted.internet import defer from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException from synapse.push.pusher import PusherFactory +from synapse.util.async_helpers import concurrently_execute logger = logging.getLogger(__name__) @@ -197,7 +198,7 @@ class PusherPool: p = r if p: - self._start_pusher(p) + yield self._start_pusher(p) @defer.inlineCallbacks def _start_pushers(self): @@ -208,10 +209,14 @@ class PusherPool: """ pushers = yield self.store.get_all_pushers() logger.info("Starting %d pushers", len(pushers)) - for pusherdict in pushers: - self._start_pusher(pusherdict) + + # Stagger starting up the pushers so we don't completely drown the + # process on start up. + yield concurrently_execute(self._start_pusher, pushers, 10) + logger.info("Started pushers") + @defer.inlineCallbacks def _start_pusher(self, pusherdict): """Start the given pusher @@ -248,7 +253,22 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - p.on_started() + + # Check if there *may* be push to process. We do this as this check is a + # lot cheaper to do than actually fetching the exact rows we need to + # push. + user_id = pusherdict["user_name"] + last_stream_ordering = pusherdict["last_stream_ordering"] + if last_stream_ordering: + have_notifs = yield self.store.get_if_maybe_push_in_range_for_user( + user_id, last_stream_ordering, + ) + else: + # We always want to default to starting up the pusher rather than + # risk missing push. + have_notifs = True + + p.on_started(have_notifs) @defer.inlineCallbacks def remove_pusher(self, app_id, pushkey, user_id): diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 6840320641..13ce296a9b 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -386,6 +386,36 @@ class EventPushActionsWorkerStore(SQLBaseStore): # Now return the first `limit` defer.returnValue(notifs[:limit]) + def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering): + """A fast check to see if there might be something to push for the + user since the given stream ordering. May return false positives. + + Useful to know whether to bother starting a pusher on start up or not. + + Args: + user_id (str) + min_stream_ordering (int) + + Returns: + Deferred[bool]: True if there may be push to process, False if + there definitely isn't. + """ + + def _get_if_maybe_push_in_range_for_user_txn(txn): + sql = """ + SELECT 1 FROM event_push_actions + WHERE user_id = ? AND stream_ordering > ? + LIMIT 1 + """ + + txn.execute(sql, (user_id, min_stream_ordering,)) + return bool(txn.fetchone()) + + return self.runInteraction( + "get_if_maybe_push_in_range_for_user", + _get_if_maybe_push_in_range_for_user_txn, + ) + def add_push_actions_to_staging(self, event_id, user_id_actions): """Add the push actions for the event to the push action staging area. |