diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index fe6887414e..6454045c2d 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -380,6 +380,69 @@ class EventPushActionsWorkerStore(SQLBaseStore):
# Now return the first `limit`
defer.returnValue(notifs[:limit])
+ def add_push_actions_to_staging(self, event_id, user_id_actions):
+ """Add the push actions for the event to the push action staging area.
+
+ Args:
+ event_id (str)
+ user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
+ user_id to list of push actions, where an action can either be
+ a string or dict.
+
+ Returns:
+ Deferred
+ """
+
+ if not user_id_actions:
+ return
+
+ # This is a helper function for generating the necessary tuple that
+ # can be used to inert into the `event_push_actions_staging` table.
+ def _gen_entry(user_id, actions):
+ is_highlight = 1 if _action_has_highlight(actions) else 0
+ return (
+ event_id, # event_id column
+ user_id, # user_id column
+ _serialize_action(actions, is_highlight), # actions column
+ 1, # notif column
+ is_highlight, # highlight column
+ )
+
+ def _add_push_actions_to_staging_txn(txn):
+ # We don't use _simple_insert_many here to avoid the overhead
+ # of generating lists of dicts.
+
+ sql = """
+ INSERT INTO event_push_actions_staging
+ (event_id, user_id, actions, notif, highlight)
+ VALUES (?, ?, ?, ?, ?)
+ """
+
+ txn.executemany(sql, (
+ _gen_entry(user_id, actions)
+ for user_id, actions in user_id_actions.iteritems()
+ ))
+
+ return self.runInteraction(
+ "add_push_actions_to_staging", _add_push_actions_to_staging_txn
+ )
+
+ def remove_push_actions_from_staging(self, event_id):
+ """Called if we failed to persist the event to ensure that stale push
+ actions don't build up in the DB
+
+ Args:
+ event_id (str)
+ """
+
+ return self._simple_delete(
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event_id,
+ },
+ desc="remove_push_actions_from_staging",
+ )
+
class EventPushActionsStore(EventPushActionsWorkerStore):
EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
@@ -775,69 +838,6 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
(rotate_to_stream_ordering,)
)
- def add_push_actions_to_staging(self, event_id, user_id_actions):
- """Add the push actions for the event to the push action staging area.
-
- Args:
- event_id (str)
- user_id_actions (dict[str, list[dict|str])]): A dictionary mapping
- user_id to list of push actions, where an action can either be
- a string or dict.
-
- Returns:
- Deferred
- """
-
- if not user_id_actions:
- return
-
- # This is a helper function for generating the necessary tuple that
- # can be used to inert into the `event_push_actions_staging` table.
- def _gen_entry(user_id, actions):
- is_highlight = 1 if _action_has_highlight(actions) else 0
- return (
- event_id, # event_id column
- user_id, # user_id column
- _serialize_action(actions, is_highlight), # actions column
- 1, # notif column
- is_highlight, # highlight column
- )
-
- def _add_push_actions_to_staging_txn(txn):
- # We don't use _simple_insert_many here to avoid the overhead
- # of generating lists of dicts.
-
- sql = """
- INSERT INTO event_push_actions_staging
- (event_id, user_id, actions, notif, highlight)
- VALUES (?, ?, ?, ?, ?)
- """
-
- txn.executemany(sql, (
- _gen_entry(user_id, actions)
- for user_id, actions in user_id_actions.iteritems()
- ))
-
- return self.runInteraction(
- "add_push_actions_to_staging", _add_push_actions_to_staging_txn
- )
-
- def remove_push_actions_from_staging(self, event_id):
- """Called if we failed to persist the event to ensure that stale push
- actions don't build up in the DB
-
- Args:
- event_id (str)
- """
-
- return self._simple_delete(
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event_id,
- },
- desc="remove_push_actions_from_staging",
- )
-
def _action_has_highlight(actions):
for action in actions:
|