summary refs log tree commit diff
path: root/synapse/push/__init__.py
diff options
context:
space:
mode:
authorDavid Baker <dbkr@matrix.org>2014-11-21 12:21:00 +0000
committerDavid Baker <dbkr@matrix.org>2014-11-21 12:21:00 +0000
commiteb6aedf92c0fe467fd4724623262907ad78573bb (patch)
tree69b1f04952ffd7dd82b6643a56f1bc4e34c2087b /synapse/push/__init__.py
parentMerge branch 'develop' into pushers (diff)
downloadsynapse-eb6aedf92c0fe467fd4724623262907ad78573bb.tar.xz
More work on pushers. Attempt to do HTTP pokes. Not sure if the actual HTTP pokes work or not yet but the retry semantics are pretty good.
Diffstat (limited to 'synapse/push/__init__.py')
-rw-r--r--synapse/push/__init__.py58
1 files changed, 48 insertions, 10 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index df0b91a8e9..a96f0f0183 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -26,12 +26,15 @@ logger = logging.getLogger(__name__)
 
 class Pusher(object):
     INITIAL_BACKOFF = 1000
-    MAX_BACKOFF = 10 * 60 * 1000
+    MAX_BACKOFF = 60 * 60 * 1000
+    GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
-    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
+    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+                 last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
+        self.clock = self.hs.get_clock()
         self.user_name = user_name
         self.app = app
         self.app_display_name = app_display_name
@@ -40,6 +43,7 @@ class Pusher(object):
         self.data = data
         self.last_token = last_token
         self.backoff_delay = Pusher.INITIAL_BACKOFF
+        self.failing_since = None
 
     @defer.inlineCallbacks
     def start(self):
@@ -58,17 +62,51 @@ class Pusher(object):
             config = PaginationConfig(from_token=from_tok, limit='1')
             chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
 
-            if (self.dispatchPush(chunk['chunk'][0])):
+            # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event
+            singleEvent = None
+            for c in chunk['chunk']:
+                if 'event_id' in c: # Hmmm...
+                    singleEvent = c
+                    break
+            if not singleEvent:
+                continue
+
+            ret = yield self.dispatchPush(singleEvent)
+            if (ret):
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
-                self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+                self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey,
+                                                                self.last_token, self.clock.time_msec())
+                if self.failing_since:
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
             else:
-                logger.warn("Failed to dispatch push for user %s. Trying again in %dms",
-                            self.user_name, self.backoff_delay)
-                yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
-                self.backoff_delay *=2
-                if self.backoff_delay > Pusher.MAX_BACKOFF:
-                    self.backoff_delay = Pusher.MAX_BACKOFF
+                if not self.failing_since:
+                    self.failing_since = self.clock.time_msec()
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+
+                if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
+                    # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load
+                    # of old notifications.
+                    logger.warn("Giving up on a notification to user %s, pushkey %s",
+                                self.user_name, self.pushkey)
+                    self.backoff_delay = Pusher.INITIAL_BACKOFF
+                    self.last_token = chunk['end']
+                    self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+                else:
+                    logger.warn("Failed to dispatch push for user %s (failing for %dms)."
+                                "Trying again in %dms",
+                            self.user_name,
+                            self.clock.time_msec() - self.failing_since,
+                            self.backoff_delay
+                    )
+                    yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+                    self.backoff_delay *=2
+                    if self.backoff_delay > Pusher.MAX_BACKOFF:
+                        self.backoff_delay = Pusher.MAX_BACKOFF
 
 
 class PusherConfigException(Exception):