diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index f9445bef13..8e684d91b5 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -52,7 +52,6 @@ class EventContext(object):
"prev_state_ids",
"state_group",
"rejected",
- "push_actions",
"prev_group",
"delta_ids",
"prev_state_events",
@@ -67,7 +66,6 @@ class EventContext(object):
self.state_group = None
self.rejected = False
- self.push_actions = []
# A previously persisted state group and a delta between that
# and this state.
@@ -104,7 +102,6 @@ class EventContext(object):
"event_state_key": event.state_key if event.is_state() else None,
"state_group": self.state_group,
"rejected": self.rejected,
- "push_actions": self.push_actions,
"prev_group": self.prev_group,
"delta_ids": _encode_state_dict(self.delta_ids),
"prev_state_events": self.prev_state_events,
@@ -127,7 +124,6 @@ class EventContext(object):
context = EventContext()
context.state_group = input["state_group"]
context.rejected = input["rejected"]
- context.push_actions = input["push_actions"]
context.prev_group = input["prev_group"]
context.delta_ids = _decode_state_dict(input["delta_ids"])
context.prev_state_events = input["prev_state_events"]
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 1c3ac03f20..d99d8049b3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -683,9 +683,15 @@ class EventCreationHandler(object):
event, context
)
- (event_stream_id, max_stream_id) = yield self.store.persist_event(
- event, context=context
- )
+ try:
+ (event_stream_id, max_stream_id) = yield self.store.persist_event(
+ event, context=context
+ )
+ except: # noqa: E722, as we reraise the exception this is fine.
+ # Ensure that we actually remove the entries in the push actions
+ # staging area
+ preserve_fn(self.store.remove_push_actions_from_staging)(event.event_id)
+ raise
# this intentionally does not yield: we don't care about the result
# and don't need to wait for it.
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index fe09d50d55..8f619a7a1b 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -40,10 +40,6 @@ class ActionGenerator(object):
@defer.inlineCallbacks
def handle_push_actions_for_event(self, event, context):
with Measure(self.clock, "action_for_event_by_user"):
- actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
+ yield self.bulk_evaluator.action_for_event_by_user(
event, context
)
-
- context.push_actions = [
- (uid, actions) for uid, actions in actions_by_user.iteritems()
- ]
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 425a017bdf..bf4f1c5836 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -137,14 +137,13 @@ class BulkPushRuleEvaluator(object):
@defer.inlineCallbacks
def action_for_event_by_user(self, event, context):
- """Given an event and context, evaluate the push rules and return
- the results
+ """Given an event and context, evaluate the push rules and insert the
+ results into the event_push_actions_staging table.
Returns:
- dict of user_id -> action
+ Deferred
"""
rules_by_user = yield self._get_rules_for_event(event, context)
- actions_by_user = {}
room_members = yield self.store.get_joined_users_from_context(
event, context
@@ -190,9 +189,13 @@ class BulkPushRuleEvaluator(object):
if matches:
actions = [x for x in rule['actions'] if x != 'dont_notify']
if actions and 'notify' in actions:
- actions_by_user[uid] = actions
+ # Push rules say we should notify the user of this event,
+ # so we mark it in the DB in the staging area. (This
+ # will then get handled when we persist the event)
+ yield self.store.add_push_actions_to_staging(
+ event.event_id, uid, actions,
+ )
break
- defer.returnValue(actions_by_user)
def _condition_checker(evaluator, conditions, uid, display_name, cache):
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 8efe2fd4bb..f787431b7a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -88,33 +88,50 @@ class EventPushActionsStore(SQLBaseStore):
self._rotate_notifs, 30 * 60 * 1000
)
- def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
+ def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""
- values = []
- for uid, actions in tuples:
- is_highlight = 1 if _action_has_highlight(actions) else 0
-
- values.append({
- 'room_id': event.room_id,
- 'event_id': event.event_id,
- 'user_id': uid,
- 'actions': _serialize_action(actions, is_highlight),
- 'stream_ordering': event.internal_metadata.stream_ordering,
- 'topological_ordering': event.depth,
- 'notif': 1,
- 'highlight': is_highlight,
- })
-
- for uid, __ in tuples:
+
+ sql = """
+ INSERT INTO event_push_actions (
+ room_id, event_id, user_id, actions, stream_ordering,
+ topological_ordering, notif, highlight
+ )
+ SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
+ FROM event_push_actions_staging
+ WHERE event_id = ?
+ """
+
+ txn.execute(sql, (
+ event.room_id, event.internal_metadata.stream_ordering,
+ event.depth, event.event_id,
+ ))
+
+ user_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ retcol="user_id",
+ )
+
+ self._simple_delete_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ )
+
+ for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid)
+ (event.room_id, uid,)
)
- self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@@ -738,6 +755,50 @@ class EventPushActionsStore(SQLBaseStore):
(rotate_to_stream_ordering,)
)
+ def add_push_actions_to_staging(self, event_id, user_id, actions):
+ """Add the push actions for the user and event to the push
+ action staging area.
+
+ Args:
+ event_id (str)
+ user_id (str)
+ actions (list[dict|str]): An action can either be a string or
+ dict.
+
+ Returns:
+ Deferred
+ """
+
+ is_highlight = 1 if _action_has_highlight(actions) else 0
+
+ return self._simple_insert(
+ table="event_push_actions_staging",
+ values={
+ "event_id": event_id,
+ "user_id": user_id,
+ "actions": _serialize_action(actions, is_highlight),
+ "notif": 1,
+ "highlight": is_highlight,
+ },
+ desc="add_push_actions_to_staging",
+ )
+
+ def remove_push_actions_from_staging(self, event_id):
+ """Called if we failed to persist the event to ensure that stale push
+ actions don't build up in the DB
+
+ Args:
+ event_id (str)
+ """
+
+ return self._simple_delete(
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event_id,
+ },
+ desc="remove_push_actions_from_staging",
+ )
+
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 28cce2979c..52b7b34749 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1168,10 +1168,9 @@ class EventsStore(SQLBaseStore):
for event, context in events_and_contexts:
# Insert all the push actions into the event_push_actions table.
- if context.push_actions:
- self._set_push_actions_for_event_and_users_txn(
- txn, event, context.push_actions
- )
+ self._set_push_actions_for_event_and_users_txn(
+ txn, event,
+ )
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
diff --git a/synapse/storage/schema/delta/47/push_actions_staging.sql b/synapse/storage/schema/delta/47/push_actions_staging.sql
new file mode 100644
index 0000000000..edccf4a96f
--- /dev/null
+++ b/synapse/storage/schema/delta/47/push_actions_staging.sql
@@ -0,0 +1,28 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Temporary staging area for push actions that have been calculated for an
+-- event, but the event hasn't yet been persisted.
+-- When the event is persisted the rows are moved over to the
+-- event_push_actions table.
+CREATE TABLE event_push_actions_staging (
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ actions TEXT NOT NULL,
+ notif SMALLINT NOT NULL,
+ highlight SMALLINT NOT NULL
+);
+
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index f430cce931..4780f2ab72 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -230,7 +230,10 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
state_handler = self.hs.get_state_handler()
context = yield state_handler.compute_event_context(event)
- context.push_actions = push_actions
+ for user_id, actions in push_actions:
+ yield self.master_store.add_push_actions_to_staging(
+ event.event_id, user_id, actions,
+ )
ordering = None
if backfill:
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index 3135488353..d483e7cf9e 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -62,6 +62,7 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
{"notify_count": noitf_count, "highlight_count": highlight_count}
)
+ @defer.inlineCallbacks
def _inject_actions(stream, action):
event = Mock()
event.room_id = room_id
@@ -69,11 +70,12 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
event.internal_metadata.stream_ordering = stream
event.depth = stream
- tuples = [(user_id, action)]
-
- return self.store.runInteraction(
+ yield self.store.add_push_actions_to_staging(
+ event.event_id, user_id, action,
+ )
+ yield self.store.runInteraction(
"", self.store._set_push_actions_for_event_and_users_txn,
- event, tuples
+ event,
)
def _rotate(stream):
|