From 12623c99b6489e1ceefdc89341fa524b1215665d Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 13 Jan 2016 18:55:57 +0000 Subject: Use the unread notification count to send accurate badge counts in push notifications. --- synapse/push/__init__.py | 102 +++++++++++++++++++++++++++++++++++---------- synapse/push/httppusher.py | 14 +++---- synapse/push/pusherpool.py | 15 ------- 3 files changed, 86 insertions(+), 45 deletions(-) (limited to 'synapse') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index c5ddfb564c..abfb324fb4 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -16,7 +16,9 @@ from twisted.internet import defer from synapse.streams.config import PaginationConfig -from synapse.types import StreamToken +from synapse.types import StreamToken, UserID +from synapse.api.constants import Membership +from synapse.api.filtering import FilterCollection import synapse.util.async import push_rule_evaluator as push_rule_evaluator @@ -55,6 +57,7 @@ class Pusher(object): self.backoff_delay = Pusher.INITIAL_BACKOFF self.failing_since = failing_since self.alive = True + self.badge = None # The last value of last_active_time that we saw self.last_last_active_time = 0 @@ -92,8 +95,7 @@ class Pusher(object): # we fail to dispatch the push) config = PaginationConfig(from_token=None, limit='1') chunk = yield self.evStreamHandler.get_stream( - self.user_name, config, timeout=0, affect_presence=False, - only_room_events=True + self.user_name, config, timeout=0, affect_presence=False ) self.last_token = chunk['end'] self.store.update_pusher_last_token( @@ -125,20 +127,30 @@ class Pusher(object): config = PaginationConfig(from_token=from_tok, limit='1') timeout = (300 + random.randint(-60, 60)) * 1000 chunk = yield self.evStreamHandler.get_stream( - self.user_name, config, timeout=timeout, affect_presence=False, - only_room_events=True + self.user_name, config, timeout=timeout, affect_presence=False ) # limiting to 1 may get 1 event plus 1 presence event, so # pick out the actual event single_event = None + read_receipt = None for c in chunk['chunk']: if 'event_id' in c: # Hmmm... single_event = c - break + elif c['type'] == 'm.receipt': + read_receipt = c + + have_updated_badge = False + if read_receipt: + for receipt_part in read_receipt['content'].values(): + if 'm.read' in receipt_part: + if self.user_name in receipt_part['m.read'].keys(): + have_updated_badge = True + if not single_event: + if have_updated_badge: + yield self.update_badge() self.last_token = chunk['end'] - logger.debug("Event stream timeout for pushkey %s", self.pushkey) yield self.store.update_pusher_last_token( self.app_id, self.pushkey, @@ -161,7 +173,8 @@ class Pusher(object): tweaks = rule_evaluator.tweaks_for_actions(actions) if 'notify' in actions: - rejected = yield self.dispatch_push(single_event, tweaks) + self.badge = yield self._get_badge_count() + rejected = yield self.dispatch_push(single_event, tweaks, self.badge) self.has_unread = True if isinstance(rejected, list) or isinstance(rejected, tuple): processed = True @@ -182,6 +195,8 @@ class Pusher(object): self.app_id, pk, self.user_name ) else: + if have_updated_badge: + yield self.update_badge() processed = True if not self.alive: @@ -254,7 +269,7 @@ class Pusher(object): def stop(self): self.alive = False - def dispatch_push(self, p, tweaks): + def dispatch_push(self, p, tweaks, badge): """ Overridden by implementing classes to actually deliver the notification Args: @@ -266,23 +281,64 @@ class Pusher(object): """ pass - def reset_badge_count(self): - pass + @defer.inlineCallbacks + def update_badge(self): + new_badge = yield self._get_badge_count() + if self.badge != new_badge: + self.badge = new_badge + yield self.send_badge(self.badge) - def presence_changed(self, state): + def send_badge(self, badge): """ - We clear badge counts whenever a user's last_active time is bumped - This is by no means perfect but I think it's the best we can do - without read receipts. + Overridden by implementing classes to send an updated badge count """ - if 'last_active' in state.state: - last_active = state.state['last_active'] - if last_active > self.last_last_active_time: - self.last_last_active_time = last_active - if self.has_unread: - logger.info("Resetting badge count for %s", self.user_name) - self.reset_badge_count() - self.has_unread = False + pass + + @defer.inlineCallbacks + def _get_badge_count(self): + membership_list = (Membership.INVITE, Membership.JOIN) + + room_list = yield self.store.get_rooms_for_user_where_membership_is( + user_id=self.user_name, + membership_list=membership_list + ) + + user_is_guest = yield self.store.is_guest(UserID.from_string(self.user_name)) + + # XXX: importing inside method to break circular dependency. + # should sort out the mess by moving all this logic out of + # push/__init__.py and probably moving the logic we use from the sync + # handler to somewhere more amenable to re-use. + from synapse.handlers.sync import SyncConfig + sync_config = SyncConfig( + user=UserID.from_string(self.user_name), + filter=FilterCollection({}), + is_guest=user_is_guest, + ) + now_token = yield self.hs.get_event_sources().get_current_token() + sync_handler = self.hs.get_handlers().sync_handler + _, ephemeral_by_room = yield sync_handler.ephemeral_by_room( + sync_config, now_token + ) + + badge = 0 + + for r in room_list: + if r.membership == Membership.INVITE: + badge += 1 + else: + last_unread_event_id = sync_handler.last_read_event_id_for_room_and_user( + r.room_id, self.user_name, ephemeral_by_room + ) + + if last_unread_event_id: + notifs = yield ( + self.store.get_unread_event_push_actions_by_room_for_user( + r.room_id, self.user_name, last_unread_event_id + ) + ) + badge += len(notifs) + defer.returnValue(badge) class PusherConfigException(Exception): diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 7866db6a24..acb687d114 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -51,7 +51,7 @@ class HttpPusher(Pusher): del self.data_minus_url['url'] @defer.inlineCallbacks - def _build_notification_dict(self, event, tweaks): + def _build_notification_dict(self, event, tweaks, badge): # we probably do not want to push for every presence update # (we may want to be able to set up notifications when specific # people sign in, but we'd want to only deliver the pertinent ones) @@ -71,7 +71,7 @@ class HttpPusher(Pusher): 'counts': { # -- we don't mark messages as read yet so # we have no way of knowing # Just set the badge to 1 until we have read receipts - 'unread': 1, + 'unread': badge, # 'missed_calls': 2 }, 'devices': [ @@ -101,8 +101,8 @@ class HttpPusher(Pusher): defer.returnValue(d) @defer.inlineCallbacks - def dispatch_push(self, event, tweaks): - notification_dict = yield self._build_notification_dict(event, tweaks) + def dispatch_push(self, event, tweaks, badge): + notification_dict = yield self._build_notification_dict(event, tweaks, badge) if not notification_dict: defer.returnValue([]) try: @@ -116,15 +116,15 @@ class HttpPusher(Pusher): defer.returnValue(rejected) @defer.inlineCallbacks - def reset_badge_count(self): + def send_badge(self, badge): + logger.info("Sending updated badge count %d to %r", badge, self.user_name) d = { 'notification': { 'id': '', 'type': None, 'sender': '', 'counts': { - 'unread': 0, - 'missed_calls': 0 + 'unread': badge }, 'devices': [ { diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 4208e5c76c..5d1179abf6 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -31,21 +31,6 @@ class PusherPool: self.pushers = {} self.last_pusher_started = -1 - distributor = self.hs.get_distributor() - distributor.observe( - "user_presence_changed", self.user_presence_changed - ) - - @defer.inlineCallbacks - def user_presence_changed(self, user, state): - user_name = user.to_string() - - # until we have read receipts, pushers use this to reset a user's - # badge counters to zero - for p in self.pushers.values(): - if p.user_name == user_name: - yield p.presence_changed(state) - @defer.inlineCallbacks def start(self): pushers = yield self.store.get_all_pushers() -- cgit 1.4.1 From d7265977376eb391007cde55c4b2d9b8f54d452b Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 20 Jan 2016 13:49:00 +0000 Subject: Simplify badge updating code by just updating it every time we get woken up and it's not an event --- synapse/push/__init__.py | 19 ++++--------------- 1 file changed, 4 insertions(+), 15 deletions(-) (limited to 'synapse') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 46c453b6c0..438341b518 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -126,6 +126,9 @@ class Pusher(object): from_tok = StreamToken.from_string(self.last_token) config = PaginationConfig(from_token=from_tok, limit='1') timeout = (300 + random.randint(-60, 60)) * 1000 + # note that we need to get read receipts down the stream as we need to + # wake up when one arrives. we don't need to explicitly look for + # them though. chunk = yield self.evStreamHandler.get_stream( self.user_id, config, timeout=timeout, affect_presence=False ) @@ -133,23 +136,12 @@ class Pusher(object): # limiting to 1 may get 1 event plus 1 presence event, so # pick out the actual event single_event = None - read_receipt = None for c in chunk['chunk']: if 'event_id' in c: # Hmmm... single_event = c - elif c['type'] == 'm.receipt': - read_receipt = c - - have_updated_badge = False - if read_receipt: - for receipt_part in read_receipt['content'].values(): - if 'm.read' in receipt_part: - if self.user_id in receipt_part['m.read'].keys(): - have_updated_badge = True if not single_event: - if have_updated_badge: - yield self.update_badge() + yield self.update_badge() self.last_token = chunk['end'] yield self.store.update_pusher_last_token( self.app_id, @@ -194,9 +186,6 @@ class Pusher(object): yield self.hs.get_pusherpool().remove_pusher( self.app_id, pk, self.user_id ) - else: - if have_updated_badge: - yield self.update_badge() processed = True if not self.alive: -- cgit 1.4.1 From 7cc047455e3f0893569853edbafa19bfe2b5d4ed Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 20 Jan 2016 13:50:28 +0000 Subject: Inline membership specifier --- synapse/push/__init__.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index 438341b518..ffae02a285 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -285,11 +285,9 @@ class Pusher(object): @defer.inlineCallbacks def _get_badge_count(self): - membership_list = (Membership.INVITE, Membership.JOIN) - room_list = yield self.store.get_rooms_for_user_where_membership_is( user_id=self.user_id, - membership_list=membership_list + membership_list=(Membership.INVITE, Membership.JOIN) ) user_is_guest = yield self.store.is_guest(self.user_id) -- cgit 1.4.1 From 3fa344c0376045d1be396701978ba46c552065b6 Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 20 Jan 2016 15:30:31 +0000 Subject: Add storage function to get all receipts for a user. Also add some cache invalidation to the receipts storage because there wasn't any, and remove a method that was unused. --- synapse/push/__init__.py | 28 +++++------------------ synapse/storage/receipts.py | 55 ++++++++++++++++++++++++++++----------------- 2 files changed, 40 insertions(+), 43 deletions(-) (limited to 'synapse') diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index ffae02a285..9a4af2b3ca 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -16,9 +16,8 @@ from twisted.internet import defer from synapse.streams.config import PaginationConfig -from synapse.types import StreamToken, UserID +from synapse.types import StreamToken from synapse.api.constants import Membership -from synapse.api.filtering import FilterCollection import synapse.util.async import push_rule_evaluator as push_rule_evaluator @@ -290,22 +289,9 @@ class Pusher(object): membership_list=(Membership.INVITE, Membership.JOIN) ) - user_is_guest = yield self.store.is_guest(self.user_id) - - # XXX: importing inside method to break circular dependency. - # should sort out the mess by moving all this logic out of - # push/__init__.py and probably moving the logic we use from the sync - # handler to somewhere more amenable to re-use. - from synapse.handlers.sync import SyncConfig - sync_config = SyncConfig( - user=UserID.from_string(self.user_id), - filter=FilterCollection({}), - is_guest=user_is_guest, - ) - now_token = yield self.hs.get_event_sources().get_current_token() - sync_handler = self.hs.get_handlers().sync_handler - _, ephemeral_by_room = yield sync_handler.ephemeral_by_room( - sync_config, now_token + my_receipts_by_room = yield self.store.get_receipts_for_user( + self.user_id, + "m.read", ) badge = 0 @@ -314,11 +300,9 @@ class Pusher(object): if r.membership == Membership.INVITE: badge += 1 else: - last_unread_event_id = sync_handler.last_read_event_id_for_room_and_user( - r.room_id, self.user_id, ephemeral_by_room - ) + if r.room_id in my_receipts_by_room: + last_unread_event_id = my_receipts_by_room[r.room_id] - if last_unread_event_id: notifs = yield ( self.store.get_unread_event_push_actions_by_room_for_user( r.room_id, self.user_id, last_unread_event_id diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index c80e576620..018140f47a 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -45,6 +45,21 @@ class ReceiptsStore(SQLBaseStore): desc="get_receipts_for_room", ) + @cachedInlineCallbacks(num_args=2) + def get_receipts_for_user(self, user_id, receipt_type): + def f(txn): + sql = ( + "SELECT room_id,event_id " + "FROM receipts_linearized " + "WHERE user_id = ? AND receipt_type = ? " + ) + txn.execute(sql, (user_id, receipt_type)) + return txn.fetchall() + + defer.returnValue(dict( + (yield self.runInteraction("get_receipts_for_user", f)) + )) + @defer.inlineCallbacks def get_linearized_receipts_for_rooms(self, room_ids, to_key, from_key=None): """Get receipts for multiple rooms for sending to clients. @@ -194,29 +209,16 @@ class ReceiptsStore(SQLBaseStore): def get_max_receipt_stream_id(self): return self._receipts_id_gen.get_max_token(self) - @cachedInlineCallbacks() - def get_graph_receipts_for_room(self, room_id): - """Get receipts for sending to remote servers. - """ - rows = yield self._simple_select_list( - table="receipts_graph", - keyvalues={"room_id": room_id}, - retcols=["receipt_type", "user_id", "event_id"], - desc="get_linearized_receipts_for_room", - ) - - result = {} - for row in rows: - result.setdefault( - row["user_id"], {} - ).setdefault( - row["receipt_type"], [] - ).append(row["event_id"]) - - defer.returnValue(result) - def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): + txn.call_after( + self.get_receipts_for_room.invalidate, (room_id, receipt_type) + ) + txn.call_after( + self.get_receipts_for_user.invalidate, (user_id, receipt_type) + ) + # FIXME: This shouldn't invalidate the whole cache + txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) # We don't want to clobber receipts for more recent events, so we # have to compare orderings of existing receipts @@ -324,6 +326,7 @@ class ReceiptsStore(SQLBaseStore): ) max_persisted_id = yield self._stream_id_gen.get_max_token(self) + defer.returnValue((stream_id, max_persisted_id)) def insert_graph_receipt(self, room_id, receipt_type, user_id, event_ids, @@ -336,6 +339,16 @@ class ReceiptsStore(SQLBaseStore): def insert_graph_receipt_txn(self, txn, room_id, receipt_type, user_id, event_ids, data): + txn.call_after( + self.get_receipts_for_room.invalidate, (room_id, receipt_type) + ) + txn.call_after( + self.get_receipts_for_user.invalidate, (user_id, receipt_type) + ) + # FIXME: This shouldn't invalidate the whole cache + txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) + + self._simple_delete_txn( txn, table="receipts_graph", -- cgit 1.4.1 From d4315bbf6bf2f7ab979761df10ad1d32d07a89fa Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 20 Jan 2016 15:33:27 +0000 Subject: Add index by user id on receipts_linearized --- .../storage/schema/delta/28/receipts_user_id_index.sql | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 synapse/storage/schema/delta/28/receipts_user_id_index.sql (limited to 'synapse') diff --git a/synapse/storage/schema/delta/28/receipts_user_id_index.sql b/synapse/storage/schema/delta/28/receipts_user_id_index.sql new file mode 100644 index 0000000000..452a1b3c6c --- /dev/null +++ b/synapse/storage/schema/delta/28/receipts_user_id_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2015, 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +CREATE INDEX receipts_linearized_user ON receipts_linearized( + user_id +); -- cgit 1.4.1 From 367cfab4e633c892e0d662e3abcd8e1a9c7f2daf Mon Sep 17 00:00:00 2001 From: David Baker Date: Wed, 20 Jan 2016 16:05:09 +0000 Subject: peppate --- synapse/storage/receipts.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index 018140f47a..c4232bdc65 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -57,7 +57,7 @@ class ReceiptsStore(SQLBaseStore): return txn.fetchall() defer.returnValue(dict( - (yield self.runInteraction("get_receipts_for_user", f)) + (yield self.runInteraction("get_receipts_for_user", f)) )) @defer.inlineCallbacks @@ -212,7 +212,7 @@ class ReceiptsStore(SQLBaseStore): def insert_linearized_receipt_txn(self, txn, room_id, receipt_type, user_id, event_id, data, stream_id): txn.call_after( - self.get_receipts_for_room.invalidate, (room_id, receipt_type) + self.get_receipts_for_room.invalidate, (room_id, receipt_type) ) txn.call_after( self.get_receipts_for_user.invalidate, (user_id, receipt_type) @@ -348,7 +348,6 @@ class ReceiptsStore(SQLBaseStore): # FIXME: This shouldn't invalidate the whole cache txn.call_after(self.get_linearized_receipts_for_room.invalidate_all) - self._simple_delete_txn( txn, table="receipts_graph", -- cgit 1.4.1