diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index f8c164b48b..c5d6c6bd86 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -17,7 +17,7 @@ import logging
from synapse.api.constants import EventTypes
from synapse.storage import DataStore
from synapse.storage.event_federation import EventFederationStore
-from synapse.storage.event_push_actions import EventPushActionsStore
+from synapse.storage.event_push_actions import EventPushActionsWorkerStore
from synapse.storage.roommember import RoomMemberStore
from synapse.storage.state import StateGroupWorkerStore
from synapse.storage.stream import StreamStore
@@ -38,7 +38,8 @@ logger = logging.getLogger(__name__)
# the method descriptor on the DataStore and chuck them into our class.
-class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
+class SlavedEventStore(EventPushActionsWorkerStore, StateGroupWorkerStore,
+ BaseSlavedStore):
def __init__(self, db_conn, hs):
super(SlavedEventStore, self).__init__(db_conn, hs)
@@ -80,30 +81,12 @@ class SlavedEventStore(StateGroupWorkerStore, BaseSlavedStore):
get_invited_rooms_for_user = RoomMemberStore.__dict__[
"get_invited_rooms_for_user"
]
- get_unread_event_push_actions_by_room_for_user = (
- EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
- )
- _get_unread_counts_by_receipt_txn = (
- DataStore._get_unread_counts_by_receipt_txn.__func__
- )
- _get_unread_counts_by_pos_txn = (
- DataStore._get_unread_counts_by_pos_txn.__func__
- )
get_recent_event_ids_for_room = (
StreamStore.__dict__["get_recent_event_ids_for_room"]
)
_get_joined_hosts_cache = RoomMemberStore.__dict__["_get_joined_hosts_cache"]
has_room_changed_since = DataStore.has_room_changed_since.__func__
- get_unread_push_actions_for_user_in_range_for_http = (
- DataStore.get_unread_push_actions_for_user_in_range_for_http.__func__
- )
- get_unread_push_actions_for_user_in_range_for_email = (
- DataStore.get_unread_push_actions_for_user_in_range_for_email.__func__
- )
- get_push_action_users_in_range = (
- DataStore.get_push_action_users_in_range.__func__
- )
get_event = DataStore.get_event.__func__
get_events = DataStore.get_events.__func__
get_rooms_for_user_where_membership_is = (
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index f787431b7a..124583835d 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -62,77 +62,7 @@ def _deserialize_action(actions, is_highlight):
return DEFAULT_NOTIF_ACTION
-class EventPushActionsStore(SQLBaseStore):
- EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
-
- def __init__(self, db_conn, hs):
- super(EventPushActionsStore, self).__init__(db_conn, hs)
-
- self.register_background_index_update(
- self.EPA_HIGHLIGHT_INDEX,
- index_name="event_push_actions_u_highlight",
- table="event_push_actions",
- columns=["user_id", "stream_ordering"],
- )
-
- self.register_background_index_update(
- "event_push_actions_highlights_index",
- index_name="event_push_actions_highlights_index",
- table="event_push_actions",
- columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
- where_clause="highlight=1"
- )
-
- self._doing_notif_rotation = False
- self._rotate_notif_loop = self._clock.looping_call(
- self._rotate_notifs, 30 * 60 * 1000
- )
-
- def _set_push_actions_for_event_and_users_txn(self, txn, event):
- """
- Args:
- event: the event set actions for
- tuples: list of tuples of (user_id, actions)
- """
-
- sql = """
- INSERT INTO event_push_actions (
- room_id, event_id, user_id, actions, stream_ordering,
- topological_ordering, notif, highlight
- )
- SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
- FROM event_push_actions_staging
- WHERE event_id = ?
- """
-
- txn.execute(sql, (
- event.room_id, event.internal_metadata.stream_ordering,
- event.depth, event.event_id,
- ))
-
- user_ids = self._simple_select_onecol_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event.event_id,
- },
- retcol="user_id",
- )
-
- self._simple_delete_txn(
- txn,
- table="event_push_actions_staging",
- keyvalues={
- "event_id": event.event_id,
- },
- )
-
- for uid in user_ids:
- txn.call_after(
- self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
- (event.room_id, uid,)
- )
-
+class EventPushActionsWorkerStore(SQLBaseStore):
@cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
def get_unread_event_push_actions_by_room_for_user(
self, room_id, user_id, last_read_event_id
@@ -449,6 +379,78 @@ class EventPushActionsStore(SQLBaseStore):
# Now return the first `limit`
defer.returnValue(notifs[:limit])
+
+class EventPushActionsStore(EventPushActionsWorkerStore):
+ EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
+
+ def __init__(self, db_conn, hs):
+ super(EventPushActionsStore, self).__init__(db_conn, hs)
+
+ self.register_background_index_update(
+ self.EPA_HIGHLIGHT_INDEX,
+ index_name="event_push_actions_u_highlight",
+ table="event_push_actions",
+ columns=["user_id", "stream_ordering"],
+ )
+
+ self.register_background_index_update(
+ "event_push_actions_highlights_index",
+ index_name="event_push_actions_highlights_index",
+ table="event_push_actions",
+ columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
+ where_clause="highlight=1"
+ )
+
+ self._doing_notif_rotation = False
+ self._rotate_notif_loop = self._clock.looping_call(
+ self._rotate_notifs, 30 * 60 * 1000
+ )
+
+ def _set_push_actions_for_event_and_users_txn(self, txn, event):
+ """
+ Args:
+ event: the event set actions for
+ tuples: list of tuples of (user_id, actions)
+ """
+
+ sql = """
+ INSERT INTO event_push_actions (
+ room_id, event_id, user_id, actions, stream_ordering,
+ topological_ordering, notif, highlight
+ )
+ SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
+ FROM event_push_actions_staging
+ WHERE event_id = ?
+ """
+
+ txn.execute(sql, (
+ event.room_id, event.internal_metadata.stream_ordering,
+ event.depth, event.event_id,
+ ))
+
+ user_ids = self._simple_select_onecol_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ retcol="user_id",
+ )
+
+ self._simple_delete_txn(
+ txn,
+ table="event_push_actions_staging",
+ keyvalues={
+ "event_id": event.event_id,
+ },
+ )
+
+ for uid in user_ids:
+ txn.call_after(
+ self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
+ (event.room_id, uid,)
+ )
+
@defer.inlineCallbacks
def get_push_actions_for_user(self, user_id, before=None, limit=50,
only_highlight=False):
|