diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index e7c964bcd2..8da2d8716c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -1,5 +1,5 @@
# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@ from twisted.internet import defer
from synapse.streams.config import PaginationConfig
from synapse.types import StreamToken
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
import synapse.util.async
import push_rule_evaluator as push_rule_evaluator
@@ -27,12 +29,25 @@ import random
logger = logging.getLogger(__name__)
+_NEXT_ID = 1
+
+
+def _get_next_id():
+ global _NEXT_ID
+ _id = _NEXT_ID
+ _NEXT_ID += 1
+ return _id
+
+
+# Pushers could now be moved to pull out of the event_push_actions table instead
+# of listening on the event stream: this would avoid them having to run the
+# rules again.
class Pusher(object):
INITIAL_BACKOFF = 1000
MAX_BACKOFF = 60 * 60 * 1000
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
- def __init__(self, _hs, profile_tag, user_name, app_id,
+ def __init__(self, _hs, profile_tag, user_id, app_id,
app_display_name, device_display_name, pushkey, pushkey_ts,
data, last_token, last_success, failing_since):
self.hs = _hs
@@ -40,7 +55,7 @@ class Pusher(object):
self.store = self.hs.get_datastore()
self.clock = self.hs.get_clock()
self.profile_tag = profile_tag
- self.user_name = user_name
+ self.user_id = user_id
self.app_id = app_id
self.app_display_name = app_display_name
self.device_display_name = device_display_name
@@ -52,6 +67,9 @@ class Pusher(object):
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.failing_since = failing_since
self.alive = True
+ self.badge = None
+
+ self.name = "Pusher-%d" % (_get_next_id(),)
# The last value of last_active_time that we saw
self.last_last_active_time = 0
@@ -82,64 +100,82 @@ class Pusher(object):
@defer.inlineCallbacks
def start(self):
- if not self.last_token:
- # First-time setup: get a token to start from (we can't
- # just start from no token, ie. 'now'
- # because we need the result to be reproduceable in case
- # we fail to dispatch the push)
- config = PaginationConfig(from_token=None, limit='1')
- chunk = yield self.evStreamHandler.get_stream(
- self.user_name, config, timeout=0, affect_presence=False,
- only_room_events=True
- )
- self.last_token = chunk['end']
- self.store.update_pusher_last_token(
- self.app_id, self.pushkey, self.user_name, self.last_token
- )
- logger.info("Pusher %s for user %s starting from token %s",
- self.pushkey, self.user_name, self.last_token)
-
- wait = 0
- while self.alive:
- try:
- if wait > 0:
- yield synapse.util.async.sleep(wait)
- yield self.get_and_dispatch()
- wait = 0
- except:
- if wait == 0:
- wait = 1
- else:
- wait = min(wait * 2, 1800)
- logger.exception(
- "Exception in pusher loop for pushkey %s. Pausing for %ds",
- self.pushkey, wait
+ with LoggingContext(self.name):
+ if not self.last_token:
+ # First-time setup: get a token to start from (we can't
+ # just start from no token, ie. 'now'
+ # because we need the result to be reproduceable in case
+ # we fail to dispatch the push)
+ config = PaginationConfig(from_token=None, limit='1')
+ chunk = yield self.evStreamHandler.get_stream(
+ self.user_id, config, timeout=0, affect_presence=False
+ )
+ self.last_token = chunk['end']
+ yield self.store.update_pusher_last_token(
+ self.app_id, self.pushkey, self.user_id, self.last_token
+ )
+ logger.info("New pusher %s for user %s starting from token %s",
+ self.pushkey, self.user_id, self.last_token)
+
+ else:
+ logger.info(
+ "Old pusher %s for user %s starting",
+ self.pushkey, self.user_id,
)
+ wait = 0
+ while self.alive:
+ try:
+ if wait > 0:
+ yield synapse.util.async.sleep(wait)
+ with Measure(self.clock, "push"):
+ yield self.get_and_dispatch()
+ wait = 0
+ except:
+ if wait == 0:
+ wait = 1
+ else:
+ wait = min(wait * 2, 1800)
+ logger.exception(
+ "Exception in pusher loop for pushkey %s. Pausing for %ds",
+ self.pushkey, wait
+ )
+
@defer.inlineCallbacks
def get_and_dispatch(self):
from_tok = StreamToken.from_string(self.last_token)
config = PaginationConfig(from_token=from_tok, limit='1')
timeout = (300 + random.randint(-60, 60)) * 1000
chunk = yield self.evStreamHandler.get_stream(
- self.user_name, config, timeout=timeout, affect_presence=False,
- only_room_events=True
+ self.user_id, config, timeout=timeout, affect_presence=False,
+ only_keys=("room", "receipt",),
)
# limiting to 1 may get 1 event plus 1 presence event, so
# pick out the actual event
single_event = None
+ read_receipt = None
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
single_event = c
- break
+ elif c['type'] == 'm.receipt':
+ read_receipt = c
+
+ have_updated_badge = False
+ if read_receipt:
+ for receipt_part in read_receipt['content'].values():
+ if 'm.read' in receipt_part:
+ if self.user_id in receipt_part['m.read'].keys():
+ have_updated_badge = True
+
if not single_event:
+ if have_updated_badge:
+ yield self.update_badge()
self.last_token = chunk['end']
- logger.debug("Event stream timeout for pushkey %s", self.pushkey)
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.last_token
)
return
@@ -150,29 +186,16 @@ class Pusher(object):
processed = False
rule_evaluator = yield \
- push_rule_evaluator.evaluator_for_user_name_and_profile_tag(
- self.user_name, self.profile_tag, single_event['room_id'], self.store
+ push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
+ self.user_id, self.profile_tag, single_event['room_id'], self.store
)
actions = yield rule_evaluator.actions_for_event(single_event)
tweaks = rule_evaluator.tweaks_for_actions(actions)
- if len(actions) == 0:
- logger.warn("Empty actions! Using default action.")
- actions = Pusher.DEFAULT_ACTIONS
-
- if 'notify' not in actions and 'dont_notify' not in actions:
- logger.warn("Neither notify nor dont_notify in actions: adding default")
- actions.extend(Pusher.DEFAULT_ACTIONS)
-
- if 'dont_notify' in actions:
- logger.debug(
- "%s for %s: dont_notify",
- single_event['event_id'], self.user_name
- )
- processed = True
- else:
- rejected = yield self.dispatch_push(single_event, tweaks)
+ if 'notify' in actions:
+ self.badge = yield self._get_badge_count()
+ rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
@@ -190,8 +213,12 @@ class Pusher(object):
pk
)
yield self.hs.get_pusherpool().remove_pusher(
- self.app_id, pk, self.user_name
+ self.app_id, pk, self.user_id
)
+ else:
+ if have_updated_badge:
+ yield self.update_badge()
+ processed = True
if not self.alive:
return
@@ -202,7 +229,7 @@ class Pusher(object):
yield self.store.update_pusher_last_token_and_success(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.last_token,
self.clock.time_msec()
)
@@ -211,7 +238,7 @@ class Pusher(object):
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.failing_since)
else:
if not self.failing_since:
@@ -219,7 +246,7 @@ class Pusher(object):
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.failing_since
)
@@ -231,13 +258,13 @@ class Pusher(object):
# of old notifications.
logger.warn("Giving up on a notification to user %s, "
"pushkey %s",
- self.user_name, self.pushkey)
+ self.user_id, self.pushkey)
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.last_token
)
@@ -245,14 +272,14 @@ class Pusher(object):
yield self.store.update_pusher_failing_since(
self.app_id,
self.pushkey,
- self.user_name,
+ self.user_id,
self.failing_since
)
else:
logger.warn("Failed to dispatch push for user %s "
"(failing for %dms)."
"Trying again in %dms",
- self.user_name,
+ self.user_id,
self.clock.time_msec() - self.failing_since,
self.backoff_delay)
yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
@@ -263,7 +290,7 @@ class Pusher(object):
def stop(self):
self.alive = False
- def dispatch_push(self, p, tweaks):
+ def dispatch_push(self, p, tweaks, badge):
"""
Overridden by implementing classes to actually deliver the notification
Args:
@@ -275,23 +302,44 @@ class Pusher(object):
"""
pass
- def reset_badge_count(self):
- pass
+ @defer.inlineCallbacks
+ def update_badge(self):
+ new_badge = yield self._get_badge_count()
+ if self.badge != new_badge:
+ self.badge = new_badge
+ yield self.send_badge(self.badge)
- def presence_changed(self, state):
+ def send_badge(self, badge):
"""
- We clear badge counts whenever a user's last_active time is bumped
- This is by no means perfect but I think it's the best we can do
- without read receipts.
+ Overridden by implementing classes to send an updated badge count
"""
- if 'last_active' in state.state:
- last_active = state.state['last_active']
- if last_active > self.last_last_active_time:
- self.last_last_active_time = last_active
- if self.has_unread:
- logger.info("Resetting badge count for %s", self.user_name)
- self.reset_badge_count()
- self.has_unread = False
+ pass
+
+ @defer.inlineCallbacks
+ def _get_badge_count(self):
+ invites, joins = yield defer.gatherResults([
+ self.store.get_invites_for_user(self.user_id),
+ self.store.get_rooms_for_user(self.user_id),
+ ], consumeErrors=True)
+
+ my_receipts_by_room = yield self.store.get_receipts_for_user(
+ self.user_id,
+ "m.read",
+ )
+
+ badge = len(invites)
+
+ for r in joins:
+ if r.room_id in my_receipts_by_room:
+ last_unread_event_id = my_receipts_by_room[r.room_id]
+
+ notifs = yield (
+ self.store.get_unread_event_push_actions_by_room_for_user(
+ r.room_id, self.user_id, last_unread_event_id
+ )
+ )
+ badge += notifs["notify_count"]
+ defer.returnValue(badge)
class PusherConfigException(Exception):
|