summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/push/__init__.py245
1 files changed, 133 insertions, 112 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index 8059fff1b2..36f450c31d 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -24,6 +24,7 @@ import baserules
 import logging
 import simplejson as json
 import re
+import random
 
 logger = logging.getLogger(__name__)
 
@@ -256,134 +257,154 @@ class Pusher(object):
             logger.info("Pusher %s for user %s starting from token %s",
                         self.pushkey, self.user_name, self.last_token)
 
+        wait = 0
         while self.alive:
-            from_tok = StreamToken.from_string(self.last_token)
-            config = PaginationConfig(from_token=from_tok, limit='1')
-            chunk = yield self.evStreamHandler.get_stream(
-                self.user_name, config,
-                timeout=100*365*24*60*60*1000, affect_presence=False
-            )
+            try:
+                if wait > 0:
+                    yield synapse.util.async.sleep(wait)
+                yield self.get_and_dispatch()
+                wait = 0
+            except:
+                if wait == 0:
+                    wait = 1
+                else:
+                    wait = min(wait * 2, 1800)
+                logger.exception(
+                    "Exception in pusher loop for pushkey %s. Pausing for %ds",
+                    self.pushkey, wait
+                )
 
-            # limiting to 1 may get 1 event plus 1 presence event, so
-            # pick out the actual event
-            single_event = None
-            for c in chunk['chunk']:
-                if 'event_id' in c:  # Hmmm...
-                    single_event = c
-                    break
-            if not single_event:
-                self.last_token = chunk['end']
-                continue
+    @defer.inlineCallbacks
+    def get_and_dispatch(self):
+        from_tok = StreamToken.from_string(self.last_token)
+        config = PaginationConfig(from_token=from_tok, limit='1')
+        timeout = (300 + random.randint(-60, 60)) * 1000
+        chunk = yield self.evStreamHandler.get_stream(
+            self.user_name, config,
+            timeout=timeout, affect_presence=False
+        )
 
-            if not self.alive:
-                continue
+        # limiting to 1 may get 1 event plus 1 presence event, so
+        # pick out the actual event
+        single_event = None
+        for c in chunk['chunk']:
+            if 'event_id' in c:  # Hmmm...
+                single_event = c
+                break
+        if not single_event:
+            self.last_token = chunk['end']
+            logger.debug("Event stream timeout for pushkey %s", self.pushkey)
+            return
 
-            processed = False
-            actions = yield self._actions_for_event(single_event)
-            tweaks = _tweaks_for_actions(actions)
+        if not self.alive:
+            return
 
-            if len(actions) == 0:
-                logger.warn("Empty actions! Using default action.")
-                actions = Pusher.DEFAULT_ACTIONS
+        processed = False
+        actions = yield self._actions_for_event(single_event)
+        tweaks = _tweaks_for_actions(actions)
 
-            if 'notify' not in actions and 'dont_notify' not in actions:
-                logger.warn("Neither notify nor dont_notify in actions: adding default")
-                actions.extend(Pusher.DEFAULT_ACTIONS)
+        if len(actions) == 0:
+            logger.warn("Empty actions! Using default action.")
+            actions = Pusher.DEFAULT_ACTIONS
 
-            if 'dont_notify' in actions:
-                logger.debug(
-                    "%s for %s: dont_notify",
-                    single_event['event_id'], self.user_name
-                )
+        if 'notify' not in actions and 'dont_notify' not in actions:
+            logger.warn("Neither notify nor dont_notify in actions: adding default")
+            actions.extend(Pusher.DEFAULT_ACTIONS)
+
+        if 'dont_notify' in actions:
+            logger.debug(
+                "%s for %s: dont_notify",
+                single_event['event_id'], self.user_name
+            )
+            processed = True
+        else:
+            rejected = yield self.dispatch_push(single_event, tweaks)
+            self.has_unread = True
+            if isinstance(rejected, list) or isinstance(rejected, tuple):
                 processed = True
-            else:
-                rejected = yield self.dispatch_push(single_event, tweaks)
-                self.has_unread = True
-                if isinstance(rejected, list) or isinstance(rejected, tuple):
-                    processed = True
-                    for pk in rejected:
-                        if pk != self.pushkey:
-                            # for sanity, we only remove the pushkey if it
-                            # was the one we actually sent...
-                            logger.warn(
-                                ("Ignoring rejected pushkey %s because we"
-                                 " didn't send it"), pk
-                            )
-                        else:
-                            logger.info(
-                                "Pushkey %s was rejected: removing",
-                                pk
-                            )
-                            yield self.hs.get_pusherpool().remove_pusher(
-                                self.app_id, pk, self.user_name
-                            )
-
-            if not self.alive:
-                continue
+                for pk in rejected:
+                    if pk != self.pushkey:
+                        # for sanity, we only remove the pushkey if it
+                        # was the one we actually sent...
+                        logger.warn(
+                            ("Ignoring rejected pushkey %s because we"
+                             " didn't send it"), pk
+                        )
+                    else:
+                        logger.info(
+                            "Pushkey %s was rejected: removing",
+                            pk
+                        )
+                        yield self.hs.get_pusherpool().remove_pusher(
+                            self.app_id, pk, self.user_name
+                        )
+
+        if not self.alive:
+            return
+
+        if processed:
+            self.backoff_delay = Pusher.INITIAL_BACKOFF
+            self.last_token = chunk['end']
+            self.store.update_pusher_last_token_and_success(
+                self.app_id,
+                self.pushkey,
+                self.user_name,
+                self.last_token,
+                self.clock.time_msec()
+            )
+            if self.failing_since:
+                self.failing_since = None
+                self.store.update_pusher_failing_since(
+                    self.app_id,
+                    self.pushkey,
+                    self.user_name,
+                    self.failing_since)
+        else:
+            if not self.failing_since:
+                self.failing_since = self.clock.time_msec()
+                self.store.update_pusher_failing_since(
+                    self.app_id,
+                    self.pushkey,
+                    self.user_name,
+                    self.failing_since
+                )
 
-            if processed:
+            if (self.failing_since and
+               self.failing_since <
+               self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
+                # we really only give up so that if the URL gets
+                # fixed, we don't suddenly deliver a load
+                # of old notifications.
+                logger.warn("Giving up on a notification to user %s, "
+                            "pushkey %s",
+                            self.user_name, self.pushkey)
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
-                self.store.update_pusher_last_token_and_success(
+                self.store.update_pusher_last_token(
+                    self.app_id,
+                    self.pushkey,
+                    self.user_name,
+                    self.last_token
+                )
+
+                self.failing_since = None
+                self.store.update_pusher_failing_since(
                     self.app_id,
                     self.pushkey,
                     self.user_name,
-                    self.last_token,
-                    self.clock.time_msec()
+                    self.failing_since
                 )
-                if self.failing_since:
-                    self.failing_since = None
-                    self.store.update_pusher_failing_since(
-                        self.app_id,
-                        self.pushkey,
-                        self.user_name,
-                        self.failing_since)
             else:
-                if not self.failing_since:
-                    self.failing_since = self.clock.time_msec()
-                    self.store.update_pusher_failing_since(
-                        self.app_id,
-                        self.pushkey,
-                        self.user_name,
-                        self.failing_since
-                    )
-
-                if (self.failing_since and
-                   self.failing_since <
-                   self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
-                    # we really only give up so that if the URL gets
-                    # fixed, we don't suddenly deliver a load
-                    # of old notifications.
-                    logger.warn("Giving up on a notification to user %s, "
-                                "pushkey %s",
-                                self.user_name, self.pushkey)
-                    self.backoff_delay = Pusher.INITIAL_BACKOFF
-                    self.last_token = chunk['end']
-                    self.store.update_pusher_last_token(
-                        self.app_id,
-                        self.pushkey,
-                        self.user_name,
-                        self.last_token
-                    )
-
-                    self.failing_since = None
-                    self.store.update_pusher_failing_since(
-                        self.app_id,
-                        self.pushkey,
-                        self.user_name,
-                        self.failing_since
-                    )
-                else:
-                    logger.warn("Failed to dispatch push for user %s "
-                                "(failing for %dms)."
-                                "Trying again in %dms",
-                                self.user_name,
-                                self.clock.time_msec() - self.failing_since,
-                                self.backoff_delay)
-                    yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
-                    self.backoff_delay *= 2
-                    if self.backoff_delay > Pusher.MAX_BACKOFF:
-                        self.backoff_delay = Pusher.MAX_BACKOFF
+                logger.warn("Failed to dispatch push for user %s "
+                            "(failing for %dms)."
+                            "Trying again in %dms",
+                            self.user_name,
+                            self.clock.time_msec() - self.failing_since,
+                            self.backoff_delay)
+                yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+                self.backoff_delay *= 2
+                if self.backoff_delay > Pusher.MAX_BACKOFF:
+                    self.backoff_delay = Pusher.MAX_BACKOFF
 
     def stop(self):
         self.alive = False