diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 214ace27c9..ca0d71e6b8 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015 OpenMarket Ltd
+# Copyright 2018 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -62,94 +63,7 @@ def _deserialize_action(actions, is_highlight):
return DEFAULT_NOTIF_ACTION
-class EventPushActionsStore(SQLBaseStore):
- EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
-
- def __init__(self, db_conn, hs):
- super(EventPushActionsStore, self).__init__(db_conn, hs)
-
- self.register_background_index_update(
- self.EPA_HIGHLIGHT_INDEX,
- index_name="event_push_actions_u_highlight",
- table="event_push_actions",
- columns=["user_id", "stream_ordering"],
- )
-
- self.register_background_index_update(
- "event_push_actions_highlights_index",
- index_name="event_push_actions_highlights_index",
- table="event_push_actions",
- columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
- where_clause="highlight=1"
- )
-
- self._doing_notif_rotation = False
- self._rotate_notif_loop = self._clock.looping_call(
- self._rotate_notifs, 30 * 60 * 1000
- )
-
- def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
- all_events_and_contexts):
- """Handles moving push actions from staging table to main
- event_push_actions table for all events in `events_and_contexts`.
-
- Also ensures that all events in `all_events_and_contexts` are removed
- from the push action staging area.
-
- Args:
- events_and_contexts (list[(EventBase, EventContext)]): events
- we are persisting
- all_events_and_contexts (list[(EventBase, EventContext)]): all
- events that we were going to persist. This includes events
- we've already persisted, etc, that wouldn't appear in
- events_and_context.
- """
-
- sql = """
- INSERT INTO event_push_actions (
- room_id, event_id, user_id, actions, stream_ordering,
- topological_ordering, notif, highlight
- )
- SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
- FROM event_push_actions_staging
- WHERE event_id = ?
- """
-
- if events_and_contexts:
- txn.executemany(sql, (
- (
- event.room_id, event.internal_metadata.stream_ordering,
- event.depth, event.event_id,
- )
- for event, _ in events_and_contexts
- ))
-
- for event, _ in events_and_contexts:
- user_ids = self._simple_select_onecol_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event.event_id,
- },
- retcol="user_id",
- )
-
- for uid in user_ids:
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid,)
- )
-
- # Now we delete the staging area for *all* events that were being
- # persisted.
- txn.executemany(
- "DELETE FROM event_push_actions_staging WHERE event_id = ?",
- (
- (event.event_id,)
- for event, _ in all_events_and_contexts
- )
- )
-
+class EventPushActionsWorkerStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
@@ -466,6 +380,95 @@ class EventPushActionsStore(SQLBaseStore):
# Now return the first `limit`
defer.returnValue(notifs[:limit])
+
+class EventPushActionsStore(EventPushActionsWorkerStore):
+ EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
+
+ def __init__(self, db_conn, hs):
+ super(EventPushActionsStore, self).__init__(db_conn, hs)
+
+ self.register_background_index_update(
+ self.EPA_HIGHLIGHT_INDEX,
+ index_name="event_push_actions_u_highlight",
+ table="event_push_actions",
+ columns=["user_id", "stream_ordering"],
+ )
+
+ self.register_background_index_update(
+ "event_push_actions_highlights_index",
+ index_name="event_push_actions_highlights_index",
+ table="event_push_actions",
+ columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
+ where_clause="highlight=1"
+ )
+
+ self._doing_notif_rotation = False
+ self._rotate_notif_loop = self._clock.looping_call(
+ self._rotate_notifs, 30 * 60 * 1000
+ )
+
+ def _set_push_actions_for_event_and_users_txn(self, txn, events_and_contexts,
+ all_events_and_contexts):
+ """Handles moving push actions from staging table to main
+ event_push_actions table for all events in `events_and_contexts`.
+
+ Also ensures that all events in `all_events_and_contexts` are removed
+ from the push action staging area.
+
+ Args:
+ events_and_contexts (list[(EventBase, EventContext)]): events
+ we are persisting
+ all_events_and_contexts (list[(EventBase, EventContext)]): all
+ events that we were going to persist. This includes events
+ we've already persisted, etc, that wouldn't appear in
+ events_and_context.
+ """
+
+ sql = """
+ INSERT INTO event_push_actions (
+ room_id, event_id, user_id, actions, stream_ordering,
+ topological_ordering, notif, highlight
+ )
+ SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
+ FROM event_push_actions_staging
+ WHERE event_id = ?
+ """
+
+ if events_and_contexts:
+ txn.executemany(sql, (
+ (
+ event.room_id, event.internal_metadata.stream_ordering,
+ event.depth, event.event_id,
+ )
+ for event, _ in events_and_contexts
+ ))
+
+ for event, _ in events_and_contexts:
+ user_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ retcol="user_id",
+ )
+
+ for uid in user_ids:
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (event.room_id, uid,)
+ )
+
+ # Now we delete the staging area for *all* events that were being
+ # persisted.
+ txn.executemany(
+ "DELETE FROM event_push_actions_staging WHERE event_id = ?",
+ (
+ (event.event_id,)
+ for event, _ in all_events_and_contexts
+ )
+ )
+
@defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50,
only_highlight=False):
|