diff options
author | David Baker <dbkr@matrix.org> | 2014-11-21 12:21:00 +0000 |
---|---|---|
committer | David Baker <dbkr@matrix.org> | 2014-11-21 12:21:00 +0000 |
commit | eb6aedf92c0fe467fd4724623262907ad78573bb (patch) | |
tree | 69b1f04952ffd7dd82b6643a56f1bc4e34c2087b /synapse/push/__init__.py | |
parent | Merge branch 'develop' into pushers (diff) | |
download | synapse-eb6aedf92c0fe467fd4724623262907ad78573bb.tar.xz |
More work on pushers. Attempt to do HTTP pokes. Not sure if the actual HTTP pokes work or not yet but the retry semantics are pretty good.
Diffstat (limited to 'synapse/push/__init__.py')
-rw-r--r-- | synapse/push/__init__.py | 58 |
1 files changed, 48 insertions, 10 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index df0b91a8e9..a96f0f0183 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,12 +26,15 @@ logger = logging.getLogger(__name__) class Pusher(object): INITIAL_BACKOFF = 1000 - MAX_BACKOFF = 10 * 60 * 1000 + MAX_BACKOFF = 60 * 60 * 1000 + GIVE_UP_AFTER = 24 * 60 * 60 * 1000 - def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token): + def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, + last_token, last_success, failing_since): self.hs = _hs self.evStreamHandler = self.hs.get_handlers().event_stream_handler self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() self.user_name = user_name self.app = app self.app_display_name = app_display_name @@ -40,6 +43,7 @@ class Pusher(object): self.data = data self.last_token = last_token self.backoff_delay = Pusher.INITIAL_BACKOFF + self.failing_since = None @defer.inlineCallbacks def start(self): @@ -58,17 +62,51 @@ class Pusher(object): config = PaginationConfig(from_token=from_tok, limit='1') chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000) - if (self.dispatchPush(chunk['chunk'][0])): + # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event + singleEvent = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + singleEvent = c + break + if not singleEvent: + continue + + ret = yield self.dispatchPush(singleEvent) + if (ret): self.backoff_delay = Pusher.INITIAL_BACKOFF self.last_token = chunk['end'] - self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token) + self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey, + self.last_token, self.clock.time_msec()) + if self.failing_since: + self.failing_since = None + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) else: - logger.warn("Failed to dispatch push for user %s. Trying again in %dms", - self.user_name, self.backoff_delay) - yield synapse.util.async.sleep(self.backoff_delay / 1000.0) - self.backoff_delay *=2 - if self.backoff_delay > Pusher.MAX_BACKOFF: - self.backoff_delay = Pusher.MAX_BACKOFF + if not self.failing_since: + self.failing_since = self.clock.time_msec() + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) + + if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER: + # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, pushkey %s", + self.user_name, self.pushkey) + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token) + + self.failing_since = None + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) + else: + logger.warn("Failed to dispatch push for user %s (failing for %dms)." + "Trying again in %dms", + self.user_name, + self.clock.time_msec() - self.failing_since, + self.backoff_delay + ) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *=2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF class PusherConfigException(Exception): |