diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index c5ddfb564c..abfb324fb4 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -16,7 +16,9 @@
from twisted.internet import defer
from synapse.streams.config import PaginationConfig
-from synapse.types import StreamToken
+from synapse.types import StreamToken, UserID
+from synapse.api.constants import Membership
+from synapse.api.filtering import FilterCollection
import synapse.util.async
import push_rule_evaluator as push_rule_evaluator
@@ -55,6 +57,7 @@ class Pusher(object):
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.failing_since = failing_since
self.alive = True
+ self.badge = None
# The last value of last_active_time that we saw
self.last_last_active_time = 0
@@ -92,8 +95,7 @@ class Pusher(object):
# we fail to dispatch the push)
config = PaginationConfig(from_token=None, limit='1')
chunk = yield self.evStreamHandler.get_stream(
- self.user_name, config, timeout=0, affect_presence=False,
- only_room_events=True
+ self.user_name, config, timeout=0, affect_presence=False
)
self.last_token = chunk['end']
self.store.update_pusher_last_token(
@@ -125,20 +127,30 @@ class Pusher(object):
config = PaginationConfig(from_token=from_tok, limit='1')
timeout = (300 + random.randint(-60, 60)) * 1000
chunk = yield self.evStreamHandler.get_stream(
- self.user_name, config, timeout=timeout, affect_presence=False,
- only_room_events=True
+ self.user_name, config, timeout=timeout, affect_presence=False
)
# limiting to 1 may get 1 event plus 1 presence event, so
# pick out the actual event
single_event = None
+ read_receipt = None
for c in chunk['chunk']:
if 'event_id' in c: # Hmmm...
single_event = c
- break
+ elif c['type'] == 'm.receipt':
+ read_receipt = c
+
+ have_updated_badge = False
+ if read_receipt:
+ for receipt_part in read_receipt['content'].values():
+ if 'm.read' in receipt_part:
+ if self.user_name in receipt_part['m.read'].keys():
+ have_updated_badge = True
+
if not single_event:
+ if have_updated_badge:
+ yield self.update_badge()
self.last_token = chunk['end']
- logger.debug("Event stream timeout for pushkey %s", self.pushkey)
yield self.store.update_pusher_last_token(
self.app_id,
self.pushkey,
@@ -161,7 +173,8 @@ class Pusher(object):
tweaks = rule_evaluator.tweaks_for_actions(actions)
if 'notify' in actions:
- rejected = yield self.dispatch_push(single_event, tweaks)
+ self.badge = yield self._get_badge_count()
+ rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
self.has_unread = True
if isinstance(rejected, list) or isinstance(rejected, tuple):
processed = True
@@ -182,6 +195,8 @@ class Pusher(object):
self.app_id, pk, self.user_name
)
else:
+ if have_updated_badge:
+ yield self.update_badge()
processed = True
if not self.alive:
@@ -254,7 +269,7 @@ class Pusher(object):
def stop(self):
self.alive = False
- def dispatch_push(self, p, tweaks):
+ def dispatch_push(self, p, tweaks, badge):
"""
Overridden by implementing classes to actually deliver the notification
Args:
@@ -266,23 +281,64 @@ class Pusher(object):
"""
pass
- def reset_badge_count(self):
- pass
+ @defer.inlineCallbacks
+ def update_badge(self):
+ new_badge = yield self._get_badge_count()
+ if self.badge != new_badge:
+ self.badge = new_badge
+ yield self.send_badge(self.badge)
- def presence_changed(self, state):
+ def send_badge(self, badge):
"""
- We clear badge counts whenever a user's last_active time is bumped
- This is by no means perfect but I think it's the best we can do
- without read receipts.
+ Overridden by implementing classes to send an updated badge count
"""
- if 'last_active' in state.state:
- last_active = state.state['last_active']
- if last_active > self.last_last_active_time:
- self.last_last_active_time = last_active
- if self.has_unread:
- logger.info("Resetting badge count for %s", self.user_name)
- self.reset_badge_count()
- self.has_unread = False
+ pass
+
+ @defer.inlineCallbacks
+ def _get_badge_count(self):
+ membership_list = (Membership.INVITE, Membership.JOIN)
+
+ room_list = yield self.store.get_rooms_for_user_where_membership_is(
+ user_id=self.user_name,
+ membership_list=membership_list
+ )
+
+ user_is_guest = yield self.store.is_guest(UserID.from_string(self.user_name))
+
+ # XXX: importing inside method to break circular dependency.
+ # should sort out the mess by moving all this logic out of
+ # push/__init__.py and probably moving the logic we use from the sync
+ # handler to somewhere more amenable to re-use.
+ from synapse.handlers.sync import SyncConfig
+ sync_config = SyncConfig(
+ user=UserID.from_string(self.user_name),
+ filter=FilterCollection({}),
+ is_guest=user_is_guest,
+ )
+ now_token = yield self.hs.get_event_sources().get_current_token()
+ sync_handler = self.hs.get_handlers().sync_handler
+ _, ephemeral_by_room = yield sync_handler.ephemeral_by_room(
+ sync_config, now_token
+ )
+
+ badge = 0
+
+ for r in room_list:
+ if r.membership == Membership.INVITE:
+ badge += 1
+ else:
+ last_unread_event_id = sync_handler.last_read_event_id_for_room_and_user(
+ r.room_id, self.user_name, ephemeral_by_room
+ )
+
+ if last_unread_event_id:
+ notifs = yield (
+ self.store.get_unread_event_push_actions_by_room_for_user(
+ r.room_id, self.user_name, last_unread_event_id
+ )
+ )
+ badge += len(notifs)
+ defer.returnValue(badge)
class PusherConfigException(Exception):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 7866db6a24..acb687d114 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -51,7 +51,7 @@ class HttpPusher(Pusher):
del self.data_minus_url['url']
@defer.inlineCallbacks
- def _build_notification_dict(self, event, tweaks):
+ def _build_notification_dict(self, event, tweaks, badge):
# we probably do not want to push for every presence update
# (we may want to be able to set up notifications when specific
# people sign in, but we'd want to only deliver the pertinent ones)
@@ -71,7 +71,7 @@ class HttpPusher(Pusher):
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
- 'unread': 1,
+ 'unread': badge,
# 'missed_calls': 2
},
'devices': [
@@ -101,8 +101,8 @@ class HttpPusher(Pusher):
defer.returnValue(d)
@defer.inlineCallbacks
- def dispatch_push(self, event, tweaks):
- notification_dict = yield self._build_notification_dict(event, tweaks)
+ def dispatch_push(self, event, tweaks, badge):
+ notification_dict = yield self._build_notification_dict(event, tweaks, badge)
if not notification_dict:
defer.returnValue([])
try:
@@ -116,15 +116,15 @@ class HttpPusher(Pusher):
defer.returnValue(rejected)
@defer.inlineCallbacks
- def reset_badge_count(self):
+ def send_badge(self, badge):
+ logger.info("Sending updated badge count %d to %r", badge, self.user_name)
d = {
'notification': {
'id': '',
'type': None,
'sender': '',
'counts': {
- 'unread': 0,
- 'missed_calls': 0
+ 'unread': badge
},
'devices': [
{
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 4208e5c76c..5d1179abf6 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -31,21 +31,6 @@ class PusherPool:
self.pushers = {}
self.last_pusher_started = -1
- distributor = self.hs.get_distributor()
- distributor.observe(
- "user_presence_changed", self.user_presence_changed
- )
-
- @defer.inlineCallbacks
- def user_presence_changed(self, user, state):
- user_name = user.to_string()
-
- # until we have read receipts, pushers use this to reset a user's
- # badge counters to zero
- for p in self.pushers.values():
- if p.user_name == user_name:
- yield p.presence_changed(state)
-
@defer.inlineCallbacks
def start(self):
pushers = yield self.store.get_all_pushers()
|