diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 8059fff1b2..36f450c31d 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -24,6 +24,7 @@ import baserules
import logging
import simplejson as json
import re
+import random
logger = logging.getLogger(__name__)
@@ -256,134 +257,154 @@ class Pusher(object):
logger.info("Pusher %s for user %s starting from token %s",
self.pushkey, self.user_name, self.last_token)
+ wait = 0
while self.alive:
- from_tok = StreamToken.from_string(self.last_token)
- config = PaginationConfig(from_token=from_tok, limit='1')
- chunk = yield self.evStreamHandler.get_stream(
- self.user_name, config,
- timeout=100*365*24*60*60*1000, affect_presence=False
- )
+ try:
+ if wait > 0:
+ yield synapse.util.async.sleep(wait)
+ yield self.get_and_dispatch()
+ wait = 0
+ except:
+ if wait == 0:
+ wait = 1
+ else:
+ wait = min(wait * 2, 1800)
+ logger.exception(
+ "Exception in pusher loop for pushkey %s. Pausing for %ds",
+ self.pushkey, wait
+ )
- # limiting to 1 may get 1 event plus 1 presence event, so
- # pick out the actual event
- single_event = None
- for c in chunk['chunk']:
- if 'event_id' in c: # Hmmm...
- single_event = c
- break
- if not single_event:
- self.last_token = chunk['end']
- continue
+ @defer.inlineCallbacks
+ def get_and_dispatch(self):
+ from_tok = StreamToken.from_string(self.last_token)
+ config = PaginationConfig(from_token=from_tok, limit='1')
+ timeout = (300 + random.randint(-60, 60)) * 1000
+ chunk = yield self.evStreamHandler.get_stream(
+ self.user_name, config,
+ timeout=timeout, affect_presence=False
+ )
- if not self.alive:
- continue
+ # limiting to 1 may get 1 event plus 1 presence event, so
+ # pick out the actual event
+ single_event = None
+ for c in chunk['chunk']:
+ if 'event_id' in c: # Hmmm...
+ single_event = c
+ break
+ if not single_event:
+ self.last_token = chunk['end']
+ logger.debug("Event stream timeout for pushkey %s", self.pushkey)
+ return
- processed = False
- actions = yield self._actions_for_event(single_event)
- tweaks = _tweaks_for_actions(actions)
+ if not self.alive:
+ return
- if len(actions) == 0:
- logger.warn("Empty actions! Using default action.")
- actions = Pusher.DEFAULT_ACTIONS
+ processed = False
+ actions = yield self._actions_for_event(single_event)
+ tweaks = _tweaks_for_actions(actions)
- if 'notify' not in actions and 'dont_notify' not in actions:
- logger.warn("Neither notify nor dont_notify in actions: adding default")
- actions.extend(Pusher.DEFAULT_ACTIONS)
+ if len(actions) == 0:
+ logger.warn("Empty actions! Using default action.")
+ actions = Pusher.DEFAULT_ACTIONS
- if 'dont_notify' in actions:
- logger.debug(
- "%s for %s: dont_notify",
- single_event['event_id'], self.user_name
- )
+ if 'notify' not in actions and 'dont_notify' not in actions:
+ logger.warn("Neither notify nor dont_notify in actions: adding default")
+ actions.extend(Pusher.DEFAULT_ACTIONS)
+
+ if 'dont_notify' in actions:
+ logger.debug(
+ "%s for %s: dont_notify",
+ single_event['event_id'], self.user_name
+ )
+ processed = True
+ else:
+ rejected = yield self.dispatch_push(single_event, tweaks)
+ self.has_unread = True
+ if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
- else:
- rejected = yield self.dispatch_push(single_event, tweaks)
- self.has_unread = True
- if isinstance(rejected, list) or isinstance(rejected, tuple):
- processed = True
- for pk in rejected:
- if pk != self.pushkey:
- # for sanity, we only remove the pushkey if it
- # was the one we actually sent...
- logger.warn(
- ("Ignoring rejected pushkey %s because we"
- " didn't send it"), pk
- )
- else:
- logger.info(
- "Pushkey %s was rejected: removing",
- pk
- )
- yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk, self.user_name
- )
-
- if not self.alive:
- continue
+ for pk in rejected:
+ if pk != self.pushkey:
+ # for sanity, we only remove the pushkey if it
+ # was the one we actually sent...
+ logger.warn(
+ ("Ignoring rejected pushkey %s because we"
+ " didn't send it"), pk
+ )
+ else:
+ logger.info(
+ "Pushkey %s was rejected: removing",
+ pk
+ )
+ yield self.hs.get_pusherpool().remove_pusher(
+ self.app_id, pk, self.user_name
+ )
+
+ if not self.alive:
+ return
+
+ if processed:
+ self.backoff_delay = Pusher.INITIAL_BACKOFF
+ self.last_token = chunk['end']
+ self.store.update_pusher_last_token_and_success(
+ self.app_id,
+ self.pushkey,
+ self.user_name,
+ self.last_token,
+ self.clock.time_msec()
+ )
+ if self.failing_since:
+ self.failing_since = None
+ self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_name,
+ self.failing_since)
+ else:
+ if not self.failing_since:
+ self.failing_since = self.clock.time_msec()
+ self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_name,
+ self.failing_since
+ )
- if processed:
+ if (self.failing_since and
+ self.failing_since <
+ self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
+ # we really only give up so that if the URL gets
+ # fixed, we don't suddenly deliver a load
+ # of old notifications.
+ logger.warn("Giving up on a notification to user %s, "
+ "pushkey %s",
+ self.user_name, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
- self.store.update_pusher_last_token_and_success(
+ self.store.update_pusher_last_token(
+ self.app_id,
+ self.pushkey,
+ self.user_name,
+ self.last_token
+ )
+
+ self.failing_since = None
+ self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_name,
- self.last_token,
- self.clock.time_msec()
+ self.failing_since
)
- if self.failing_since:
- self.failing_since = None
- self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_name,
- self.failing_since)
else:
- if not self.failing_since:
- self.failing_since = self.clock.time_msec()
- self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_name,
- self.failing_since
- )
-
- if (self.failing_since and
- self.failing_since <
- self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
- # we really only give up so that if the URL gets
- # fixed, we don't suddenly deliver a load
- # of old notifications.
- logger.warn("Giving up on a notification to user %s, "
- "pushkey %s",
- self.user_name, self.pushkey)
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- self.store.update_pusher_last_token(
- self.app_id,
- self.pushkey,
- self.user_name,
- self.last_token
- )
-
- self.failing_since = None
- self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_name,
- self.failing_since
- )
- else:
- logger.warn("Failed to dispatch push for user %s "
- "(failing for %dms)."
- "Trying again in %dms",
- self.user_name,
- self.clock.time_msec() - self.failing_since,
- self.backoff_delay)
- yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
- self.backoff_delay *= 2
- if self.backoff_delay > Pusher.MAX_BACKOFF:
- self.backoff_delay = Pusher.MAX_BACKOFF
+ logger.warn("Failed to dispatch push for user %s "
+ "(failing for %dms)."
+ "Trying again in %dms",
+ self.user_name,
+ self.clock.time_msec() - self.failing_since,
+ self.backoff_delay)
+ yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+ self.backoff_delay *= 2
+ if self.backoff_delay > Pusher.MAX_BACKOFF:
+ self.backoff_delay = Pusher.MAX_BACKOFF
def stop(self):
self.alive = False
|