diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index b9522a8050..8da2d8716c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -128,7 +128,8 @@ class Pusher(object):
try:
if wait > 0:
yield synapse.util.async.sleep(wait)
- yield self.get_and_dispatch()
+ with Measure(self.clock, "push"):
+ yield self.get_and_dispatch()
wait = 0
except:
if wait == 0:
@@ -150,27 +151,115 @@ class Pusher(object):
only_keys=("room", "receipt",),
)
- with Measure(self.clock, "push"):
- # limiting to 1 may get 1 event plus 1 presence event, so
- # pick out the actual event
- single_event = None
- read_receipt = None
- for c in chunk['chunk']:
- if 'event_id' in c: # Hmmm...
- single_event = c
- elif c['type'] == 'm.receipt':
- read_receipt = c
-
- have_updated_badge = False
- if read_receipt:
- for receipt_part in read_receipt['content'].values():
- if 'm.read' in receipt_part:
- if self.user_id in receipt_part['m.read'].keys():
- have_updated_badge = True
-
- if not single_event:
- if have_updated_badge:
- yield self.update_badge()
+ # limiting to 1 may get 1 event plus 1 presence event, so
+ # pick out the actual event
+ single_event = None
+ read_receipt = None
+ for c in chunk['chunk']:
+ if 'event_id' in c: # Hmmm...
+ single_event = c
+ elif c['type'] == 'm.receipt':
+ read_receipt = c
+
+ have_updated_badge = False
+ if read_receipt:
+ for receipt_part in read_receipt['content'].values():
+ if 'm.read' in receipt_part:
+ if self.user_id in receipt_part['m.read'].keys():
+ have_updated_badge = True
+
+ if not single_event:
+ if have_updated_badge:
+ yield self.update_badge()
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_token
+ )
+ return
+
+ if not self.alive:
+ return
+
+ processed = False
+
+ rule_evaluator = yield \
+ push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
+ self.user_id, self.profile_tag, single_event['room_id'], self.store
+ )
+
+ actions = yield rule_evaluator.actions_for_event(single_event)
+ tweaks = rule_evaluator.tweaks_for_actions(actions)
+
+ if 'notify' in actions:
+ self.badge = yield self._get_badge_count()
+ rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
+ self.has_unread = True
+ if isinstance(rejected, list) or isinstance(rejected, tuple):
+ processed = True
+ for pk in rejected:
+ if pk != self.pushkey:
+ # for sanity, we only remove the pushkey if it
+ # was the one we actually sent...
+ logger.warn(
+ ("Ignoring rejected pushkey %s because we"
+ " didn't send it"), pk
+ )
+ else:
+ logger.info(
+ "Pushkey %s was rejected: removing",
+ pk
+ )
+ yield self.hs.get_pusherpool().remove_pusher(
+ self.app_id, pk, self.user_id
+ )
+ else:
+ if have_updated_badge:
+ yield self.update_badge()
+ processed = True
+
+ if not self.alive:
+ return
+
+ if processed:
+ self.backoff_delay = Pusher.INITIAL_BACKOFF
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token_and_success(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_token,
+ self.clock.time_msec()
+ )
+ if self.failing_since:
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since)
+ else:
+ if not self.failing_since:
+ self.failing_since = self.clock.time_msec()
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since
+ )
+
+ if (self.failing_since and
+ self.failing_since <
+ self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
+ # we really only give up so that if the URL gets
+ # fixed, we don't suddenly deliver a load
+ # of old notifications.
+ logger.warn("Giving up on a notification to user %s, "
+ "pushkey %s",
+ self.user_id, self.pushkey)
+ self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
yield self.store.update_pusher_last_token(
self.app_id,
@@ -178,114 +267,25 @@ class Pusher(object):
self.user_id,
self.last_token
)
- return
-
- if not self.alive:
- return
-
- processed = False
-
- rule_evaluator = yield \
- push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
- self.user_id, self.profile_tag, single_event['room_id'], self.store
- )
- actions = yield rule_evaluator.actions_for_event(single_event)
- tweaks = rule_evaluator.tweaks_for_actions(actions)
-
- if 'notify' in actions:
- self.badge = yield self._get_badge_count()
- rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
- self.has_unread = True
- if isinstance(rejected, list) or isinstance(rejected, tuple):
- processed = True
- for pk in rejected:
- if pk != self.pushkey:
- # for sanity, we only remove the pushkey if it
- # was the one we actually sent...
- logger.warn(
- ("Ignoring rejected pushkey %s because we"
- " didn't send it"), pk
- )
- else:
- logger.info(
- "Pushkey %s was rejected: removing",
- pk
- )
- yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk, self.user_id
- )
- else:
- if have_updated_badge:
- yield self.update_badge()
- processed = True
-
- if not self.alive:
- return
-
- if processed:
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token_and_success(
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
self.user_id,
- self.last_token,
- self.clock.time_msec()
+ self.failing_since
)
- if self.failing_since:
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since)
else:
- if not self.failing_since:
- self.failing_since = self.clock.time_msec()
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
-
- if (self.failing_since and
- self.failing_since <
- self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
- # we really only give up so that if the URL gets
- # fixed, we don't suddenly deliver a load
- # of old notifications.
- logger.warn("Giving up on a notification to user %s, "
- "pushkey %s",
- self.user_id, self.pushkey)
- self.backoff_delay = Pusher.INITIAL_BACKOFF
- self.last_token = chunk['end']
- yield self.store.update_pusher_last_token(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.last_token
- )
-
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id,
- self.pushkey,
- self.user_id,
- self.failing_since
- )
- else:
- logger.warn("Failed to dispatch push for user %s "
- "(failing for %dms)."
- "Trying again in %dms",
- self.user_id,
- self.clock.time_msec() - self.failing_since,
- self.backoff_delay)
- yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
- self.backoff_delay *= 2
- if self.backoff_delay > Pusher.MAX_BACKOFF:
- self.backoff_delay = Pusher.MAX_BACKOFF
+ logger.warn("Failed to dispatch push for user %s "
+ "(failing for %dms)."
+ "Trying again in %dms",
+ self.user_id,
+ self.clock.time_msec() - self.failing_since,
+ self.backoff_delay)
+ yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+ self.backoff_delay *= 2
+ if self.backoff_delay > Pusher.MAX_BACKOFF:
+ self.backoff_delay = Pusher.MAX_BACKOFF
def stop(self):
self.alive = False
|