diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 8efe2fd4bb..f787431b7a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -88,33 +88,50 @@ class EventPushActionsStore(SQLBaseStore):
self._rotate_notifs, 30 * 60 * 1000
)
- def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
+ def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""
- values = []
- for uid, actions in tuples:
- is_highlight = 1 if _action_has_highlight(actions) else 0
-
- values.append({
- 'room_id': event.room_id,
- 'event_id': event.event_id,
- 'user_id': uid,
- 'actions': _serialize_action(actions, is_highlight),
- 'stream_ordering': event.internal_metadata.stream_ordering,
- 'topological_ordering': event.depth,
- 'notif': 1,
- 'highlight': is_highlight,
- })
-
- for uid, __ in tuples:
+
+ sql = """
+ INSERT INTO event_push_actions (
+ room_id, event_id, user_id, actions, stream_ordering,
+ topological_ordering, notif, highlight
+ )
+ SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
+ FROM event_push_actions_staging
+ WHERE event_id = ?
+ """
+
+ txn.execute(sql, (
+ event.room_id, event.internal_metadata.stream_ordering,
+ event.depth, event.event_id,
+ ))
+
+ user_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ retcol="user_id",
+ )
+
+ self._simple_delete_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ )
+
+ for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid)
+ (event.room_id, uid,)
)
- self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@@ -738,6 +755,50 @@ class EventPushActionsStore(SQLBaseStore):
(rotate_to_stream_ordering,)
)
+ def add_push_actions_to_staging(self, event_id, user_id, actions):
+ """Add the push actions for the user and event to the push
+ action staging area.
+
+ Args:
+ event_id (str)
+ user_id (str)
+ actions (list[dict|str]): An action can either be a string or
+ dict.
+
+ Returns:
+ Deferred
+ """
+
+ is_highlight = 1 if _action_has_highlight(actions) else 0
+
+ return self._simple_insert(
+ table="event_push_actions_staging",
+ values={
+ "event_id": event_id,
+ "user_id": user_id,
+ "actions": _serialize_action(actions, is_highlight),
+ "notif": 1,
+ "highlight": is_highlight,
+ },
+ desc="add_push_actions_to_staging",
+ )
+
+ def remove_push_actions_from_staging(self, event_id):
+ """Called if we failed to persist the event to ensure that stale push
+ actions don't build up in the DB
+
+ Args:
+ event_id (str)
+ """
+
+ return self._simple_delete(
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event_id,
+ },
+ desc="remove_push_actions_from_staging",
+ )
+
def _action_has_highlight(actions):
for action in actions:
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 28cce2979c..52b7b34749 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -1168,10 +1168,9 @@ class EventsStore(SQLBaseStore):
for event, context in events_and_contexts:
# Insert all the push actions into the event_push_actions table.
- if context.push_actions:
- self._set_push_actions_for_event_and_users_txn(
- txn, event, context.push_actions
- )
+ self._set_push_actions_for_event_and_users_txn(
+ txn, event,
+ )
if event.type == EventTypes.Redaction and event.redacts is not None:
# Remove the entries in the event_push_actions table for the
diff --git a/synapse/storage/schema/delta/47/push_actions_staging.sql b/synapse/storage/schema/delta/47/push_actions_staging.sql
new file mode 100644
index 0000000000..edccf4a96f
--- /dev/null
+++ b/synapse/storage/schema/delta/47/push_actions_staging.sql
@@ -0,0 +1,28 @@
+/* Copyright 2018 New Vector Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Temporary staging area for push actions that have been calculated for an
+-- event, but the event hasn't yet been persisted.
+-- When the event is persisted the rows are moved over to the
+-- event_push_actions table.
+CREATE TABLE event_push_actions_staging (
+ event_id TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ actions TEXT NOT NULL,
+ notif SMALLINT NOT NULL,
+ highlight SMALLINT NOT NULL
+);
+
+CREATE INDEX event_push_actions_staging_id ON event_push_actions_staging(event_id);
|