diff options
author | David Baker <dave@matrix.org> | 2015-06-05 11:40:22 +0100 |
---|---|---|
committer | David Baker <dave@matrix.org> | 2015-06-05 11:40:22 +0100 |
commit | b8690dd840ea293717cd44bffe91e2cfe36bd9c8 (patch) | |
tree | 230a95c14c445aafbe2ffc6df6162c973707439f | |
parent | pep8 (diff) | |
download | synapse-b8690dd840ea293717cd44bffe91e2cfe36bd9c8.tar.xz |
Catch any exceptions in the pusher loop. Use a lower timeout for pushers so we can see if they're actually still running.
-rw-r--r-- | synapse/push/__init__.py | 245 |
1 files changed, 133 insertions, 112 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 8059fff1b2..36f450c31d 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -24,6 +24,7 @@ import baserules import logging import simplejson as json import re +import random logger = logging.getLogger(__name__) @@ -256,134 +257,154 @@ class Pusher(object): logger.info("Pusher %s for user %s starting from token %s", self.pushkey, self.user_name, self.last_token) + wait = 0 while self.alive: - from_tok = StreamToken.from_string(self.last_token) - config = PaginationConfig(from_token=from_tok, limit='1') - chunk = yield self.evStreamHandler.get_stream( - self.user_name, config, - timeout=100*365*24*60*60*1000, affect_presence=False - ) + try: + if wait > 0: + yield synapse.util.async.sleep(wait) + yield self.get_and_dispatch() + wait = 0 + except: + if wait == 0: + wait = 1 + else: + wait = min(wait * 2, 1800) + logger.exception( + "Exception in pusher loop for pushkey %s. Pausing for %ds", + self.pushkey, wait + ) - # limiting to 1 may get 1 event plus 1 presence event, so - # pick out the actual event - single_event = None - for c in chunk['chunk']: - if 'event_id' in c: # Hmmm... - single_event = c - break - if not single_event: - self.last_token = chunk['end'] - continue + @defer.inlineCallbacks + def get_and_dispatch(self): + from_tok = StreamToken.from_string(self.last_token) + config = PaginationConfig(from_token=from_tok, limit='1') + timeout = (300 + random.randint(-60, 60)) * 1000 + chunk = yield self.evStreamHandler.get_stream( + self.user_name, config, + timeout=timeout, affect_presence=False + ) - if not self.alive: - continue + # limiting to 1 may get 1 event plus 1 presence event, so + # pick out the actual event + single_event = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + single_event = c + break + if not single_event: + self.last_token = chunk['end'] + logger.debug("Event stream timeout for pushkey %s", self.pushkey) + return - processed = False - actions = yield self._actions_for_event(single_event) - tweaks = _tweaks_for_actions(actions) + if not self.alive: + return - if len(actions) == 0: - logger.warn("Empty actions! Using default action.") - actions = Pusher.DEFAULT_ACTIONS + processed = False + actions = yield self._actions_for_event(single_event) + tweaks = _tweaks_for_actions(actions) - if 'notify' not in actions and 'dont_notify' not in actions: - logger.warn("Neither notify nor dont_notify in actions: adding default") - actions.extend(Pusher.DEFAULT_ACTIONS) + if len(actions) == 0: + logger.warn("Empty actions! Using default action.") + actions = Pusher.DEFAULT_ACTIONS - if 'dont_notify' in actions: - logger.debug( - "%s for %s: dont_notify", - single_event['event_id'], self.user_name - ) + if 'notify' not in actions and 'dont_notify' not in actions: + logger.warn("Neither notify nor dont_notify in actions: adding default") + actions.extend(Pusher.DEFAULT_ACTIONS) + + if 'dont_notify' in actions: + logger.debug( + "%s for %s: dont_notify", + single_event['event_id'], self.user_name + ) + processed = True + else: + rejected = yield self.dispatch_push(single_event, tweaks) + self.has_unread = True + if isinstance(rejected, list) or isinstance(rejected, tuple): processed = True - else: - rejected = yield self.dispatch_push(single_event, tweaks) - self.has_unread = True - if isinstance(rejected, list) or isinstance(rejected, tuple): - processed = True - for pk in rejected: - if pk != self.pushkey: - # for sanity, we only remove the pushkey if it - # was the one we actually sent... - logger.warn( - ("Ignoring rejected pushkey %s because we" - " didn't send it"), pk - ) - else: - logger.info( - "Pushkey %s was rejected: removing", - pk - ) - yield self.hs.get_pusherpool().remove_pusher( - self.app_id, pk, self.user_name - ) - - if not self.alive: - continue + for pk in rejected: + if pk != self.pushkey: + # for sanity, we only remove the pushkey if it + # was the one we actually sent... + logger.warn( + ("Ignoring rejected pushkey %s because we" + " didn't send it"), pk + ) + else: + logger.info( + "Pushkey %s was rejected: removing", + pk + ) + yield self.hs.get_pusherpool().remove_pusher( + self.app_id, pk, self.user_name + ) + + if not self.alive: + return + + if processed: + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + self.store.update_pusher_last_token_and_success( + self.app_id, + self.pushkey, + self.user_name, + self.last_token, + self.clock.time_msec() + ) + if self.failing_since: + self.failing_since = None + self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_name, + self.failing_since) + else: + if not self.failing_since: + self.failing_since = self.clock.time_msec() + self.store.update_pusher_failing_since( + self.app_id, + self.pushkey, + self.user_name, + self.failing_since + ) - if processed: + if (self.failing_since and + self.failing_since < + self.clock.time_msec() - Pusher.GIVE_UP_AFTER): + # we really only give up so that if the URL gets + # fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, " + "pushkey %s", + self.user_name, self.pushkey) self.backoff_delay = Pusher.INITIAL_BACKOFF self.last_token = chunk['end'] - self.store.update_pusher_last_token_and_success( + self.store.update_pusher_last_token( + self.app_id, + self.pushkey, + self.user_name, + self.last_token + ) + + self.failing_since = None + self.store.update_pusher_failing_since( self.app_id, self.pushkey, self.user_name, - self.last_token, - self.clock.time_msec() + self.failing_since ) - if self.failing_since: - self.failing_since = None - self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_name, - self.failing_since) else: - if not self.failing_since: - self.failing_since = self.clock.time_msec() - self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_name, - self.failing_since - ) - - if (self.failing_since and - self.failing_since < - self.clock.time_msec() - Pusher.GIVE_UP_AFTER): - # we really only give up so that if the URL gets - # fixed, we don't suddenly deliver a load - # of old notifications. - logger.warn("Giving up on a notification to user %s, " - "pushkey %s", - self.user_name, self.pushkey) - self.backoff_delay = Pusher.INITIAL_BACKOFF - self.last_token = chunk['end'] - self.store.update_pusher_last_token( - self.app_id, - self.pushkey, - self.user_name, - self.last_token - ) - - self.failing_since = None - self.store.update_pusher_failing_since( - self.app_id, - self.pushkey, - self.user_name, - self.failing_since - ) - else: - logger.warn("Failed to dispatch push for user %s " - "(failing for %dms)." - "Trying again in %dms", - self.user_name, - self.clock.time_msec() - self.failing_since, - self.backoff_delay) - yield synapse.util.async.sleep(self.backoff_delay / 1000.0) - self.backoff_delay *= 2 - if self.backoff_delay > Pusher.MAX_BACKOFF: - self.backoff_delay = Pusher.MAX_BACKOFF + logger.warn("Failed to dispatch push for user %s " + "(failing for %dms)." + "Trying again in %dms", + self.user_name, + self.clock.time_msec() - self.failing_since, + self.backoff_delay) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *= 2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF def stop(self): self.alive = False |