From 771528ab1323715271b9e968d2d337b88910fb2f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Feb 2016 10:50:49 +0000 Subject: Change event_push_actions_rm_tokens schema --- synapse/storage/event_push_actions.py | 47 ++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 15 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index a05c4f84cf..aca3219206 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -37,7 +37,11 @@ class EventPushActionsStore(SQLBaseStore): 'event_id': event.event_id, 'user_id': uid, 'profile_tag': profile_tag, - 'actions': json.dumps(actions) + 'actions': json.dumps(actions), + 'stream_ordering': event.internal_metadata.stream_ordering, + 'topological_ordering': event.depth, + 'notif': 1, + 'highlight': 1 if _action_has_highlight(actions) else 0, }) def f(txn): @@ -74,26 +78,28 @@ class EventPushActionsStore(SQLBaseStore): topological_ordering = results[0][1] sql = ( - "SELECT ea.event_id, ea.actions" - " FROM event_push_actions ea, events e" - " WHERE ea.room_id = e.room_id" - " AND ea.event_id = e.event_id" - " AND ea.user_id = ?" - " AND ea.room_id = ?" + "SELECT sum(notif), sum(highlight)" + " FROM event_push_actions ea" + " WHERE" + " user_id = ?" + " AND room_id = ?" " AND (" - " e.topological_ordering > ?" - " OR (e.topological_ordering = ? AND e.stream_ordering > ?)" + " topological_ordering > ?" + " OR (topological_ordering = ? AND stream_ordering > ?)" ")" ) txn.execute(sql, ( user_id, room_id, topological_ordering, topological_ordering, stream_ordering - ) - ) - return [ - {"event_id": row[0], "actions": json.loads(row[1])} - for row in txn.fetchall() - ] + )) + row = txn.fetchone() + if row: + return { + "notify_count": row[0] or 0, + "highlight_count": row[1] or 0, + } + else: + return {"notify_count": 0, "highlight_count": 0} ret = yield self.runInteraction( "get_unread_event_push_actions_by_room", @@ -117,3 +123,14 @@ class EventPushActionsStore(SQLBaseStore): "remove_push_actions_for_event_id", f ) + + +def _action_has_highlight(actions): + for action in actions: + try: + if action.get("set_tweak", None) == "highlight": + return action.get("value", True) + except AttributeError: + pass + + return False -- cgit 1.5.1 From aa4af94c69b8b1c263dacfce0358aaef97d3e323 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Feb 2016 16:29:32 +0000 Subject: We return dicts now. --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index aca3219206..2742e0c008 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -72,7 +72,7 @@ class EventPushActionsStore(SQLBaseStore): ) results = txn.fetchall() if len(results) == 0: - return [] + return {} stream_ordering = results[0][0] topological_ordering = results[0][1] -- cgit 1.5.1 From 4d36e732307ad35eb070af384058f227d7d85dd0 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 3 Feb 2016 16:35:00 +0000 Subject: Actually return something sensible --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 2742e0c008..d0a969f50b 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -72,7 +72,7 @@ class EventPushActionsStore(SQLBaseStore): ) results = txn.fetchall() if len(results) == 0: - return {} + return {"notify_count": 0, "highlight_count": 0} stream_ordering = results[0][0] topological_ordering = results[0][1] -- cgit 1.5.1 From 7b0d846407a593ccd204f82aaa1090b8af8df84c Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 9 Feb 2016 16:19:15 +0000 Subject: Atomically persit push actions when we persist the event --- synapse/events/snapshot.py | 1 + synapse/handlers/_base.py | 10 ++++---- synapse/handlers/federation.py | 12 +++++----- synapse/push/action_generator.py | 20 ++++------------ synapse/storage/event_push_actions.py | 45 +++++++++++++---------------------- synapse/storage/events.py | 26 ++++++++++++-------- 6 files changed, 49 insertions(+), 65 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py index f51200d18e..8a475417a6 100644 --- a/synapse/events/snapshot.py +++ b/synapse/events/snapshot.py @@ -20,3 +20,4 @@ class EventContext(object): self.current_state = current_state self.state_group = None self.rejected = False + self.push_actions = [] diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py index d3f722b22e..064e8723c8 100644 --- a/synapse/handlers/_base.py +++ b/synapse/handlers/_base.py @@ -264,13 +264,13 @@ class BaseHandler(object): "You don't have permission to redact events" ) - (event_stream_id, max_stream_id) = yield self.store.persist_event( - event, context=context - ) - action_generator = ActionGenerator(self.hs) yield action_generator.handle_push_actions_for_event( - event, self, context.current_state + event, context, self + ) + + (event_stream_id, max_stream_id) = yield self.store.persist_event( + event, context=context ) destinations = set() diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b78b0502d9..da55d43541 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -236,12 +236,6 @@ class FederationHandler(BaseHandler): user = UserID.from_string(event.state_key) yield user_joined_room(self.distributor, user, event.room_id) - if not backfilled and not event.internal_metadata.is_outlier(): - action_generator = ActionGenerator(self.hs) - yield action_generator.handle_push_actions_for_event( - event, self - ) - @defer.inlineCallbacks def _filter_events_for_server(self, server_name, room_id, events): event_to_state = yield self.store.get_state_for_events( @@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler): auth_events=auth_events, ) + if not backfilled and not event.internal_metadata.is_outlier(): + action_generator = ActionGenerator(self.hs) + yield action_generator.handle_push_actions_for_event( + event, context, self + ) + event_stream_id, max_stream_id = yield self.store.persist_event( event, context=context, diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index d8f8256a1f..e0da0868ec 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -19,8 +19,6 @@ import bulk_push_rule_evaluator import logging -from synapse.api.constants import EventTypes - logger = logging.getLogger(__name__) @@ -36,23 +34,15 @@ class ActionGenerator: # tag (ie. we just need all the users). @defer.inlineCallbacks - def handle_push_actions_for_event(self, event, handler, current_state): - if event.type == EventTypes.Redaction and event.redacts is not None: - yield self.store.remove_push_actions_for_event_id( - event.room_id, event.redacts - ) - + def handle_push_actions_for_event(self, event, context, handler): bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id( event.room_id, self.hs, self.store ) actions_by_user = yield bulk_evaluator.action_for_event_by_user( - event, handler, current_state + event, handler, context.current_state ) - yield self.store.set_push_actions_for_event_and_users( - event, - [ - (uid, None, actions) for uid, actions in actions_by_user.items() - ] - ) + context.push_actions = [ + (uid, None, actions) for uid, actions in actions_by_user.items() + ] diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d0a969f50b..466f07a1c4 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,8 +24,7 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): - @defer.inlineCallbacks - def set_push_actions_for_event_and_users(self, event, tuples): + def _set_push_actions_for_event_and_users(self, txn, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore): 'highlight': 1 if _action_has_highlight(actions) else 0, }) - def f(txn): - for uid, _, __ in tuples: - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (event.room_id, uid) - ) - return self._simple_insert_many_txn(txn, "event_push_actions", values) - - yield self.runInteraction( - "set_actions_for_event_and_users", - f, - ) + for uid, _, __ in tuples: + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (event.room_id, uid) + ) + self._simple_insert_many_txn(txn, "event_push_actions", values) @cachedInlineCallbacks(num_args=3, lru=True, tree=True) def get_unread_event_push_actions_by_room_for_user( @@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) - @defer.inlineCallbacks - def remove_push_actions_for_event_id(self, room_id, event_id): - def f(txn): - # Sad that we have to blow away the cache for the whole room here - txn.call_after( - self.get_unread_event_push_actions_by_room_for_user.invalidate_many, - (room_id,) - ) - txn.execute( - "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", - (room_id, event_id) - ) - yield self.runInteraction( - "remove_push_actions_for_event_id", - f + def _remove_push_actions_for_event_id(self, txn, room_id, event_id): + # Sad that we have to blow away the cache for the whole room here + txn.call_after( + self.get_unread_event_push_actions_by_room_for_user.invalidate_many, + (room_id,) + ) + txn.execute( + "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?", + (room_id, event_id) ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index c6ed54721c..7d4012c414 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore): @log_function def _persist_events_txn(self, txn, events_and_contexts, backfilled, is_new_state=True): - - # Remove the any existing cache entries for the event_ids - for event, _ in events_and_contexts: + depth_updates = {} + for event, context in events_and_contexts: + # Remove the any existing cache entries for the event_ids txn.call_after(self._invalidate_get_event_cache, event.event_id) - if not backfilled: txn.call_after( self._events_stream_cache.entity_has_changed, event.room_id, event.internal_metadata.stream_ordering, ) - depth_updates = {} - for event, _ in events_and_contexts: - if event.internal_metadata.is_outlier(): - continue - depth_updates[event.room_id] = max( - event.depth, depth_updates.get(event.room_id, event.depth) + if not event.internal_metadata.is_outlier(): + depth_updates[event.room_id] = max( + event.depth, depth_updates.get(event.room_id, event.depth) + ) + + if context.push_actions: + self._set_push_actions_for_event_and_users( + txn, event, context.push_actions + ) + + if event.type == EventTypes.Redaction and event.redacts is not None: + self._remove_push_actions_for_event_id( + txn, event.room_id, event.redacts ) for room_id, depth in depth_updates.items(): -- cgit 1.5.1 From 02147452396c67e7874b201460f8b1cc8996a90a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 10 Feb 2016 11:09:56 +0000 Subject: Rename functions --- synapse/storage/event_push_actions.py | 4 ++-- synapse/storage/events.py | 4 ++-- 2 files changed, 4 insertions(+), 4 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 466f07a1c4..d77a817682 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -24,7 +24,7 @@ logger = logging.getLogger(__name__) class EventPushActionsStore(SQLBaseStore): - def _set_push_actions_for_event_and_users(self, txn, event, tuples): + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ :param event: the event set actions for :param tuples: list of tuples of (user_id, profile_tag, actions) @@ -100,7 +100,7 @@ class EventPushActionsStore(SQLBaseStore): ) defer.returnValue(ret) - def _remove_push_actions_for_event_id(self, txn, room_id, event_id): + def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id): # Sad that we have to blow away the cache for the whole room here txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 7d4012c414..3a5c6ee4b1 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -221,12 +221,12 @@ class EventsStore(SQLBaseStore): ) if context.push_actions: - self._set_push_actions_for_event_and_users( + self._set_push_actions_for_event_and_users_txn( txn, event, context.push_actions ) if event.type == EventTypes.Redaction and event.redacts is not None: - self._remove_push_actions_for_event_id( + self._remove_push_actions_for_event_id_txn( txn, event.room_id, event.redacts ) -- cgit 1.5.1