diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 8efe2fd4bb..f787431b7a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -88,33 +88,50 @@ class EventPushActionsStore(SQLBaseStore):
self._rotate_notifs, 30 * 60 * 1000
)
- def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
+ def _set_push_actions_for_event_and_users_txn(self, txn, event):
"""
Args:
event: the event set actions for
tuples: list of tuples of (user_id, actions)
"""
- values = []
- for uid, actions in tuples:
- is_highlight = 1 if _action_has_highlight(actions) else 0
-
- values.append({
- 'room_id': event.room_id,
- 'event_id': event.event_id,
- 'user_id': uid,
- 'actions': _serialize_action(actions, is_highlight),
- 'stream_ordering': event.internal_metadata.stream_ordering,
- 'topological_ordering': event.depth,
- 'notif': 1,
- 'highlight': is_highlight,
- })
-
- for uid, __ in tuples:
+
+ sql = """
+ INSERT INTO event_push_actions (
+ room_id, event_id, user_id, actions, stream_ordering,
+ topological_ordering, notif, highlight
+ )
+ SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
+ FROM event_push_actions_staging
+ WHERE event_id = ?
+ """
+
+ txn.execute(sql, (
+ event.room_id, event.internal_metadata.stream_ordering,
+ event.depth, event.event_id,
+ ))
+
+ user_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ retcol="user_id",
+ )
+
+ self._simple_delete_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ )
+
+ for uid in user_ids:
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid)
+ (event.room_id, uid,)
)
- self._simple_insert_many_txn(txn, "event_push_actions", values)
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
@@ -738,6 +755,50 @@ class EventPushActionsStore(SQLBaseStore):
(rotate_to_stream_ordering,)
)
+ def add_push_actions_to_staging(self, event_id, user_id, actions):
+ """Add the push actions for the user and event to the push
+ action staging area.
+
+ Args:
+ event_id (str)
+ user_id (str)
+ actions (list[dict|str]): An action can either be a string or
+ dict.
+
+ Returns:
+ Deferred
+ """
+
+ is_highlight = 1 if _action_has_highlight(actions) else 0
+
+ return self._simple_insert(
+ table="event_push_actions_staging",
+ values={
+ "event_id": event_id,
+ "user_id": user_id,
+ "actions": _serialize_action(actions, is_highlight),
+ "notif": 1,
+ "highlight": is_highlight,
+ },
+ desc="add_push_actions_to_staging",
+ )
+
+ def remove_push_actions_from_staging(self, event_id):
+ """Called if we failed to persist the event to ensure that stale push
+ actions don't build up in the DB
+
+ Args:
+ event_id (str)
+ """
+
+ return self._simple_delete(
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event_id,
+ },
+ desc="remove_push_actions_from_staging",
+ )
+
def _action_has_highlight(actions):
for action in actions:
|