summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-02-10 11:25:32 +0000
committerErik Johnston <erik@matrix.org>2016-02-10 11:25:32 +0000
commitf7ef5c1d57617f4bd0d40251fdd3145a82a47742 (patch)
tree5e93b2013aa788f52ea61fd5271204c411f9abfa
parentMerge pull request #532 from floviolleau/floviolleau/documentation (diff)
parentRename functions (diff)
downloadsynapse-f7ef5c1d57617f4bd0d40251fdd3145a82a47742.tar.xz
Merge pull request #568 from matrix-org/erikj/unread_notif
	Atomically persit push actions when we persist the event
-rw-r--r--synapse/events/snapshot.py1
-rw-r--r--synapse/handlers/_base.py39
-rw-r--r--synapse/handlers/federation.py12
-rw-r--r--synapse/push/action_generator.py22
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py10
-rw-r--r--synapse/storage/event_push_actions.py45
-rw-r--r--synapse/storage/events.py26
7 files changed, 66 insertions, 89 deletions
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index f51200d18e..8a475417a6 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -20,3 +20,4 @@ class EventContext(object):
         self.current_state = current_state
         self.state_group = None
         self.rejected = False
+        self.push_actions = []
diff --git a/synapse/handlers/_base.py b/synapse/handlers/_base.py
index fa83d3e464..064e8723c8 100644
--- a/synapse/handlers/_base.py
+++ b/synapse/handlers/_base.py
@@ -53,25 +53,10 @@ class BaseHandler(object):
         self.event_builder_factory = hs.get_event_builder_factory()
 
     @defer.inlineCallbacks
-    def _filter_events_for_clients(self, user_tuples, events):
+    def _filter_events_for_clients(self, user_tuples, events, event_id_to_state):
         """ Returns dict of user_id -> list of events that user is allowed to
         see.
         """
-        # If there is only one user, just get the state for that one user,
-        # otherwise just get all the state.
-        if len(user_tuples) == 1:
-            types = (
-                (EventTypes.RoomHistoryVisibility, ""),
-                (EventTypes.Member, user_tuples[0][0]),
-            )
-        else:
-            types = None
-
-        event_id_to_state = yield self.store.get_state_for_events(
-            frozenset(e.event_id for e in events),
-            types=types
-        )
-
         forgotten = yield defer.gatherResults([
             self.store.who_forgot_in_room(
                 room_id,
@@ -135,7 +120,17 @@ class BaseHandler(object):
     @defer.inlineCallbacks
     def _filter_events_for_client(self, user_id, events, is_peeking=False):
         # Assumes that user has at some point joined the room if not is_guest.
-        res = yield self._filter_events_for_clients([(user_id, is_peeking)], events)
+        types = (
+            (EventTypes.RoomHistoryVisibility, ""),
+            (EventTypes.Member, user_id),
+        )
+        event_id_to_state = yield self.store.get_state_for_events(
+            frozenset(e.event_id for e in events),
+            types=types
+        )
+        res = yield self._filter_events_for_clients(
+            [(user_id, is_peeking)], events, event_id_to_state
+        )
         defer.returnValue(res.get(user_id, []))
 
     def ratelimit(self, user_id):
@@ -269,13 +264,13 @@ class BaseHandler(object):
                         "You don't have permission to redact events"
                     )
 
-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
-
         action_generator = ActionGenerator(self.hs)
         yield action_generator.handle_push_actions_for_event(
-            event, self
+            event, context, self
+        )
+
+        (event_stream_id, max_stream_id) = yield self.store.persist_event(
+            event, context=context
         )
 
         destinations = set()
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b78b0502d9..da55d43541 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -236,12 +236,6 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     yield user_joined_room(self.distributor, user, event.room_id)
 
-        if not backfilled and not event.internal_metadata.is_outlier():
-            action_generator = ActionGenerator(self.hs)
-            yield action_generator.handle_push_actions_for_event(
-                event, self
-            )
-
     @defer.inlineCallbacks
     def _filter_events_for_server(self, server_name, room_id, events):
         event_to_state = yield self.store.get_state_for_events(
@@ -1073,6 +1067,12 @@ class FederationHandler(BaseHandler):
             auth_events=auth_events,
         )
 
+        if not backfilled and not event.internal_metadata.is_outlier():
+            action_generator = ActionGenerator(self.hs)
+            yield action_generator.handle_push_actions_for_event(
+                event, context, self
+            )
+
         event_stream_id, max_stream_id = yield self.store.persist_event(
             event,
             context=context,
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 1d2e558f9a..e0da0868ec 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -19,8 +19,6 @@ import bulk_push_rule_evaluator
 
 import logging
 
-from synapse.api.constants import EventTypes
-
 logger = logging.getLogger(__name__)
 
 
@@ -36,21 +34,15 @@ class ActionGenerator:
         # tag (ie. we just need all the users).
 
     @defer.inlineCallbacks
-    def handle_push_actions_for_event(self, event, handler):
-        if event.type == EventTypes.Redaction and event.redacts is not None:
-            yield self.store.remove_push_actions_for_event_id(
-                event.room_id, event.redacts
-            )
-
+    def handle_push_actions_for_event(self, event, context, handler):
         bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
             event.room_id, self.hs, self.store
         )
 
-        actions_by_user = yield bulk_evaluator.action_for_event_by_user(event, handler)
-
-        yield self.store.set_push_actions_for_event_and_users(
-            event,
-            [
-                (uid, None, actions) for uid, actions in actions_by_user.items()
-            ]
+        actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+            event, handler, context.current_state
         )
+
+        context.push_actions = [
+            (uid, None, actions) for uid, actions in actions_by_user.items()
+        ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 20c60422bf..8ac5ceb9ef 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -98,25 +98,21 @@ class BulkPushRuleEvaluator:
         self.store = store
 
     @defer.inlineCallbacks
-    def action_for_event_by_user(self, event, handler):
+    def action_for_event_by_user(self, event, handler, current_state):
         actions_by_user = {}
 
         users_dict = yield self.store.are_guests(self.rules_by_user.keys())
 
         filtered_by_user = yield handler._filter_events_for_clients(
-            users_dict.items(), [event]
+            users_dict.items(), [event], {event.event_id: current_state}
         )
 
         evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
 
         condition_cache = {}
 
-        member_state = yield self.store.get_state_for_event(
-            event.event_id,
-        )
-
         display_names = {}
-        for ev in member_state.values():
+        for ev in current_state.values():
             nm = ev.content.get("displayname", None)
             if nm and ev.type == EventTypes.Member:
                 display_names[ev.state_key] = nm
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index d0a969f50b..d77a817682 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -24,8 +24,7 @@ logger = logging.getLogger(__name__)
 
 
 class EventPushActionsStore(SQLBaseStore):
-    @defer.inlineCallbacks
-    def set_push_actions_for_event_and_users(self, event, tuples):
+    def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         :param event: the event set actions for
         :param tuples: list of tuples of (user_id, profile_tag, actions)
@@ -44,18 +43,12 @@ class EventPushActionsStore(SQLBaseStore):
                 'highlight': 1 if _action_has_highlight(actions) else 0,
             })
 
-        def f(txn):
-            for uid, _, __ in tuples:
-                txn.call_after(
-                    self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                    (event.room_id, uid)
-                )
-            return self._simple_insert_many_txn(txn, "event_push_actions", values)
-
-        yield self.runInteraction(
-            "set_actions_for_event_and_users",
-            f,
-        )
+        for uid, _, __ in tuples:
+            txn.call_after(
+                self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+                (event.room_id, uid)
+            )
+        self._simple_insert_many_txn(txn, "event_push_actions", values)
 
     @cachedInlineCallbacks(num_args=3, lru=True, tree=True)
     def get_unread_event_push_actions_by_room_for_user(
@@ -107,21 +100,15 @@ class EventPushActionsStore(SQLBaseStore):
         )
         defer.returnValue(ret)
 
-    @defer.inlineCallbacks
-    def remove_push_actions_for_event_id(self, room_id, event_id):
-        def f(txn):
-            # Sad that we have to blow away the cache for the whole room here
-            txn.call_after(
-                self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                (room_id,)
-            )
-            txn.execute(
-                "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
-                (room_id, event_id)
-            )
-        yield self.runInteraction(
-            "remove_push_actions_for_event_id",
-            f
+    def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
+        # Sad that we have to blow away the cache for the whole room here
+        txn.call_after(
+            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+            (room_id,)
+        )
+        txn.execute(
+            "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
+            (room_id, event_id)
         )
 
 
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index c6ed54721c..3a5c6ee4b1 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -205,23 +205,29 @@ class EventsStore(SQLBaseStore):
     @log_function
     def _persist_events_txn(self, txn, events_and_contexts, backfilled,
                             is_new_state=True):
-
-        # Remove the any existing cache entries for the event_ids
-        for event, _ in events_and_contexts:
+        depth_updates = {}
+        for event, context in events_and_contexts:
+            # Remove the any existing cache entries for the event_ids
             txn.call_after(self._invalidate_get_event_cache, event.event_id)
-
             if not backfilled:
                 txn.call_after(
                     self._events_stream_cache.entity_has_changed,
                     event.room_id, event.internal_metadata.stream_ordering,
                 )
 
-        depth_updates = {}
-        for event, _ in events_and_contexts:
-            if event.internal_metadata.is_outlier():
-                continue
-            depth_updates[event.room_id] = max(
-                event.depth, depth_updates.get(event.room_id, event.depth)
+            if not event.internal_metadata.is_outlier():
+                depth_updates[event.room_id] = max(
+                    event.depth, depth_updates.get(event.room_id, event.depth)
+                )
+
+            if context.push_actions:
+                self._set_push_actions_for_event_and_users_txn(
+                    txn, event, context.push_actions
+                )
+
+        if event.type == EventTypes.Redaction and event.redacts is not None:
+            self._remove_push_actions_for_event_id_txn(
+                txn, event.room_id, event.redacts
             )
 
         for room_id, depth in depth_updates.items():