path: root/synapse/storage/event_push_actions.py
author     Erik Johnston <erik@matrix.org>  2019-10-21 12:56:42 +0100
committer  Erik Johnston <erik@matrix.org>  2019-10-21 16:05:06 +0100
commit     c66a06ac6b69b0a03f5c6284ded980399e9df94e (patch)
tree       01dfd3b9098a9ace759403744d122c18efbd97ff /synapse/storage/event_push_actions.py
parent     Merge branch 'master' into develop (diff)
download   synapse-c66a06ac6b69b0a03f5c6284ded980399e9df94e.tar.xz
Move storage classes into a main "data store".
This is in preparation for having multiple data stores that offer
different functionality, e.g. splitting out state or event storage.
Diffstat (limited to 'synapse/storage/event_push_actions.py')
-rw-r--r--  synapse/storage/event_push_actions.py | 960
1 file changed, 0 insertions(+), 960 deletions(-)
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
deleted file mode 100644
index 22025effbc..0000000000
--- a/synapse/storage/event_push_actions.py
+++ /dev/null
@@ -1,960 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-
-from six import iteritems
-
-from canonicaljson import json
-
-from twisted.internet import defer
-
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
-from synapse.util.caches.descriptors import cachedInlineCallbacks
-
-logger = logging.getLogger(__name__)
-
-
-DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
-DEFAULT_HIGHLIGHT_ACTION = [
-    "notify",
-    {"set_tweak": "sound", "value": "default"},
-    {"set_tweak": "highlight"},
-]
-
-
-def _serialize_action(actions, is_highlight):
-    """Custom serializer for actions. This allows us to "compress" common actions.
-
-    We use the fact that most users have the same actions for notifs (and for
-    highlights).
-    We store these default actions as the empty string rather than the full JSON.
-    Since the empty string isn't valid JSON, there is no risk of this clashing
-    with any real JSON actions.
-    """
-    if is_highlight:
-        if actions == DEFAULT_HIGHLIGHT_ACTION:
-            return ""  # We use empty string as the column is non-NULL
-    else:
-        if actions == DEFAULT_NOTIF_ACTION:
-            return ""
-    return json.dumps(actions)
-
-
-def _deserialize_action(actions, is_highlight):
-    """Custom deserializer for actions. This allows us to "compress" common actions
-    """
-    if actions:
-        return json.loads(actions)
-
-    if is_highlight:
-        return DEFAULT_HIGHLIGHT_ACTION
-    else:
-        return DEFAULT_NOTIF_ACTION
-
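As an illustration of the compression scheme above, a minimal standalone sketch, using the stdlib json module in place of canonicaljson and only the notify default:

import json

DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]

def serialize(actions, is_highlight):
    # The common default action list is stored as "" instead of its JSON.
    if not is_highlight and actions == DEFAULT_NOTIF_ACTION:
        return ""
    return json.dumps(actions)

def deserialize(actions, is_highlight):
    # "" is not valid JSON, so it unambiguously marks the default.
    return json.loads(actions) if actions else DEFAULT_NOTIF_ACTION

assert serialize(DEFAULT_NOTIF_ACTION, False) == ""
assert deserialize("", False) == DEFAULT_NOTIF_ACTION
assert deserialize(serialize(["dont_notify"], False), False) == ["dont_notify"]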
-
-class EventPushActionsWorkerStore(SQLBaseStore):
-    def __init__(self, db_conn, hs):
-        super(EventPushActionsWorkerStore, self).__init__(db_conn, hs)
-
-        # These get correctly set by _find_stream_orderings_for_times_txn
-        self.stream_ordering_month_ago = None
-        self.stream_ordering_day_ago = None
-
-        cur = LoggingTransaction(
-            db_conn.cursor(),
-            name="_find_stream_orderings_for_times_txn",
-            database_engine=self.database_engine,
-        )
-        self._find_stream_orderings_for_times_txn(cur)
-        cur.close()
-
-        self.find_stream_orderings_looping_call = self._clock.looping_call(
-            self._find_stream_orderings_for_times, 10 * 60 * 1000
-        )
-        self._rotate_delay = 3
-        self._rotate_count = 10000
-
-    @cachedInlineCallbacks(num_args=3, tree=True, max_entries=5000)
-    def get_unread_event_push_actions_by_room_for_user(
-        self, room_id, user_id, last_read_event_id
-    ):
-        ret = yield self.runInteraction(
-            "get_unread_event_push_actions_by_room",
-            self._get_unread_counts_by_receipt_txn,
-            room_id,
-            user_id,
-            last_read_event_id,
-        )
-        return ret
-
-    def _get_unread_counts_by_receipt_txn(
-        self, txn, room_id, user_id, last_read_event_id
-    ):
-        sql = (
-            "SELECT stream_ordering"
-            " FROM events"
-            " WHERE room_id = ? AND event_id = ?"
-        )
-        txn.execute(sql, (room_id, last_read_event_id))
-        results = txn.fetchall()
-        if len(results) == 0:
-            return {"notify_count": 0, "highlight_count": 0}
-
-        stream_ordering = results[0][0]
-
-        return self._get_unread_counts_by_pos_txn(
-            txn, room_id, user_id, stream_ordering
-        )
-
-    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
-
-        # First get number of notifications.
-        # We don't need to put a notif=1 clause as all rows always have
-        # notif=1
-        sql = (
-            "SELECT count(*)"
-            " FROM event_push_actions ea"
-            " WHERE"
-            " user_id = ?"
-            " AND room_id = ?"
-            " AND stream_ordering > ?"
-        )
-
-        txn.execute(sql, (user_id, room_id, stream_ordering))
-        row = txn.fetchone()
-        notify_count = row[0] if row else 0
-
-        txn.execute(
-            """
-            SELECT notif_count FROM event_push_summary
-            WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
-        """,
-            (room_id, user_id, stream_ordering),
-        )
-        rows = txn.fetchall()
-        if rows:
-            notify_count += rows[0][0]
-
-        # Now get the number of highlights
-        sql = (
-            "SELECT count(*)"
-            " FROM event_push_actions ea"
-            " WHERE"
-            " highlight = 1"
-            " AND user_id = ?"
-            " AND room_id = ?"
-            " AND stream_ordering > ?"
-        )
-
-        txn.execute(sql, (user_id, room_id, stream_ordering))
-        row = txn.fetchone()
-        highlight_count = row[0] if row else 0
-
-        return {"notify_count": notify_count, "highlight_count": highlight_count}
-
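To spell out the arithmetic above with invented values: the total notify count is the sum of live rows not yet rotated out of event_push_actions and any notif_count already rotated into event_push_summary.

# Sketch of the counting above, with invented values. Unread notifications
# live in two places: rows not yet rotated out of event_push_actions, and a
# per-(room, user) notif_count already rotated into event_push_summary.
live_rows = 3      # COUNT(*) of event_push_actions rows past the receipt
summary_count = 7  # notif_count from event_push_summary, if newer than the receipt
notify_count = live_rows + summary_count
assert notify_count == 10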
-    @defer.inlineCallbacks
-    def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
-        def f(txn):
-            sql = (
-                "SELECT DISTINCT(user_id) FROM event_push_actions WHERE"
-                " stream_ordering >= ? AND stream_ordering <= ?"
-            )
-            txn.execute(sql, (min_stream_ordering, max_stream_ordering))
-            return [r[0] for r in txn]
-
-        ret = yield self.runInteraction("get_push_action_users_in_range", f)
-        return ret
-
-    @defer.inlineCallbacks
-    def get_unread_push_actions_for_user_in_range_for_http(
-        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
-    ):
-        """Get a list of the most recent unread push actions for a given user,
-        within the given stream ordering range. Called by the httppusher.
-
-        Args:
-            user_id (str): The user to fetch push actions for.
-            min_stream_ordering (int): The exclusive lower bound on the
-                stream ordering of event push actions to fetch.
-            max_stream_ordering (int): The inclusive upper bound on the
-                stream ordering of event push actions to fetch.
-            limit (int): The maximum number of rows to return.
-        Returns:
-            A promise which resolves to a list of dicts with the keys "event_id",
-            "room_id", "stream_ordering", "actions".
-            The list will be ordered by ascending stream_ordering.
-            The list will have between 0 and limit entries.
-        """
-        # find rooms that have a read receipt in them and return the next
-        # push actions
-        def get_after_receipt(txn):
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight "
-                " FROM ("
-                "   SELECT room_id,"
-                "       MAX(stream_ordering) as stream_ordering"
-                "   FROM events"
-                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
-                "   WHERE receipt_type = 'm.read' AND user_id = ?"
-                "   GROUP BY room_id"
-                ") AS rl,"
-                " event_push_actions AS ep"
-                " WHERE"
-                "   ep.room_id = rl.room_id"
-                "   AND ep.stream_ordering > rl.stream_ordering"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                " ORDER BY ep.stream_ordering ASC LIMIT ?"
-            )
-            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
-            txn.execute(sql, args)
-            return txn.fetchall()
-
-        after_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range_http_arr", get_after_receipt
-        )
-
-        # There are rooms with push actions in them for which the user has no
-        # read receipt (e.g. rooms they've been invited to), so also fetch
-        # push actions for rooms without a read receipt from this user.
-        def get_no_receipt(txn):
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight "
-                " FROM event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id NOT IN ("
-                "     SELECT room_id FROM receipts_linearized"
-                "       WHERE receipt_type = 'm.read' AND user_id = ?"
-                "       GROUP BY room_id"
-                "   )"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                " ORDER BY ep.stream_ordering ASC LIMIT ?"
-            )
-            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
-            txn.execute(sql, args)
-            return txn.fetchall()
-
-        no_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range_http_nrr", get_no_receipt
-        )
-
-        notifs = [
-            {
-                "event_id": row[0],
-                "room_id": row[1],
-                "stream_ordering": row[2],
-                "actions": _deserialize_action(row[3], row[4]),
-            }
-            for row in after_read_receipt + no_read_receipt
-        ]
-
-        # Now sort it so it's ordered correctly, since currently it will
-        # contain results from the first query, correctly ordered, followed
-        # by results from the second query, but we want them all ordered
-        # by stream_ordering, oldest first.
-        notifs.sort(key=lambda r: r["stream_ordering"])
-
-        # Take only up to the limit. We have to stop at the limit because
-        # one of the subqueries may have hit the limit.
-        return notifs[:limit]
-
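Each sub-query above is capped at limit on its own, so the concatenation can hold up to 2 * limit rows; hence the re-sort and second truncation. A minimal sketch of that step:

# Minimal sketch of the merge step above: each sub-query is already
# capped at `limit`, so after concatenating we must re-sort by
# stream_ordering and truncate again.
limit = 3
after_receipt = [{"stream_ordering": 1}, {"stream_ordering": 4}]
no_receipt = [{"stream_ordering": 2}, {"stream_ordering": 3}, {"stream_ordering": 5}]
notifs = after_receipt + no_receipt
notifs.sort(key=lambda r: r["stream_ordering"])
assert [n["stream_ordering"] for n in notifs[:limit]] == [1, 2, 3]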
-    @defer.inlineCallbacks
-    def get_unread_push_actions_for_user_in_range_for_email(
-        self, user_id, min_stream_ordering, max_stream_ordering, limit=20
-    ):
-        """Get a list of the most recent unread push actions for a given user,
-        within the given stream ordering range. Called by the emailpusher.
-
-        Args:
-            user_id (str): The user to fetch push actions for.
-            min_stream_ordering (int): The exclusive lower bound on the
-                stream ordering of event push actions to fetch.
-            max_stream_ordering (int): The inclusive upper bound on the
-                stream ordering of event push actions to fetch.
-            limit (int): The maximum number of rows to return.
-        Returns:
-            A promise which resolves to a list of dicts with the keys "event_id",
-            "room_id", "stream_ordering", "actions", "received_ts".
-            The list will be ordered by descending received_ts.
-            The list will have between 0 and limit entries.
-        """
-        # find rooms that have a read receipt in them and return the most recent
-        # push actions
-        def get_after_receipt(txn):
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "  ep.highlight, e.received_ts"
-                " FROM ("
-                "   SELECT room_id,"
-                "       MAX(stream_ordering) as stream_ordering"
-                "   FROM events"
-                "   INNER JOIN receipts_linearized USING (room_id, event_id)"
-                "   WHERE receipt_type = 'm.read' AND user_id = ?"
-                "   GROUP BY room_id"
-                ") AS rl,"
-                " event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id = rl.room_id"
-                "   AND ep.stream_ordering > rl.stream_ordering"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            )
-            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
-            txn.execute(sql, args)
-            return txn.fetchall()
-
-        after_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range_email_arr", get_after_receipt
-        )
-
-        # There are rooms with push actions in them for which the user has no
-        # read receipt (e.g. rooms they've been invited to), so also fetch
-        # push actions for rooms without a read receipt from this user.
-        def get_no_receipt(txn):
-            sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "   ep.highlight, e.received_ts"
-                " FROM event_push_actions AS ep"
-                " INNER JOIN events AS e USING (room_id, event_id)"
-                " WHERE"
-                "   ep.room_id NOT IN ("
-                "     SELECT room_id FROM receipts_linearized"
-                "       WHERE receipt_type = 'm.read' AND user_id = ?"
-                "       GROUP BY room_id"
-                "   )"
-                "   AND ep.user_id = ?"
-                "   AND ep.stream_ordering > ?"
-                "   AND ep.stream_ordering <= ?"
-                " ORDER BY ep.stream_ordering DESC LIMIT ?"
-            )
-            args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
-            txn.execute(sql, args)
-            return txn.fetchall()
-
-        no_read_receipt = yield self.runInteraction(
-            "get_unread_push_actions_for_user_in_range_email_nrr", get_no_receipt
-        )
-
-        # Make a list of dicts from the two sets of results.
-        notifs = [
-            {
-                "event_id": row[0],
-                "room_id": row[1],
-                "stream_ordering": row[2],
-                "actions": _deserialize_action(row[3], row[4]),
-                "received_ts": row[5],
-            }
-            for row in after_read_receipt + no_read_receipt
-        ]
-
-        # Now sort it so it's ordered correctly, since currently it will
-        # contain results from the first query, correctly ordered, followed
-        # by results from the second query, but we want them all ordered
-        # by received_ts (most recent first)
-        notifs.sort(key=lambda r: -(r["received_ts"] or 0))
-
-        # Now return the first `limit`
-        return notifs[:limit]
-
-    def get_if_maybe_push_in_range_for_user(self, user_id, min_stream_ordering):
-        """A fast check to see if there might be something to push for the
-        user since the given stream ordering. May return false positives.
-
-        Useful to know whether to bother starting a pusher on start up or not.
-
-        Args:
-            user_id (str)
-            min_stream_ordering (int)
-
-        Returns:
-            Deferred[bool]: True if there may be push to process, False if
-            there definitely isn't.
-        """
-
-        def _get_if_maybe_push_in_range_for_user_txn(txn):
-            sql = """
-                SELECT 1 FROM event_push_actions
-                WHERE user_id = ? AND stream_ordering > ?
-                LIMIT 1
-            """
-
-            txn.execute(sql, (user_id, min_stream_ordering))
-            return bool(txn.fetchone())
-
-        return self.runInteraction(
-            "get_if_maybe_push_in_range_for_user",
-            _get_if_maybe_push_in_range_for_user_txn,
-        )
-
-    def add_push_actions_to_staging(self, event_id, user_id_actions):
-        """Add the push actions for the event to the push action staging area.
-
-        Args:
-            event_id (str)
-            user_id_actions (dict[str, list[dict|str]]): A dictionary mapping
-                user_id to list of push actions, where an action can either be
-                a string or dict.
-
-        Returns:
-            Deferred
-        """
-
-        if not user_id_actions:
-            return
-
-        # This is a helper function for generating the necessary tuple that
-        # can be used to insert into the `event_push_actions_staging` table.
-        def _gen_entry(user_id, actions):
-            is_highlight = 1 if _action_has_highlight(actions) else 0
-            return (
-                event_id,  # event_id column
-                user_id,  # user_id column
-                _serialize_action(actions, is_highlight),  # actions column
-                1,  # notif column
-                is_highlight,  # highlight column
-            )
-
-        def _add_push_actions_to_staging_txn(txn):
-            # We don't use _simple_insert_many here to avoid the overhead
-            # of generating lists of dicts.
-
-            sql = """
-                INSERT INTO event_push_actions_staging
-                    (event_id, user_id, actions, notif, highlight)
-                VALUES (?, ?, ?, ?, ?)
-            """
-
-            txn.executemany(
-                sql,
-                (
-                    _gen_entry(user_id, actions)
-                    for user_id, actions in iteritems(user_id_actions)
-                ),
-            )
-
-        return self.runInteraction(
-            "add_push_actions_to_staging", _add_push_actions_to_staging_txn
-        )
-
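The comment in _add_push_actions_to_staging_txn motivates feeding executemany a generator of tuples rather than building lists of dicts. A self-contained sketch of the same pattern, assuming stdlib sqlite3 and a reduced schema (table name reused for illustration only):

import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE event_push_actions_staging"
    " (event_id TEXT, user_id TEXT, actions TEXT, notif INT, highlight INT)"
)
user_id_actions = {"@alice:example.org": ["notify"], "@bob:example.org": ["notify"]}
conn.executemany(
    "INSERT INTO event_push_actions_staging VALUES (?, ?, ?, ?, ?)",
    # A generator avoids materialising an intermediate list of dicts.
    (("$ev1", uid, '["notify"]', 1, 0) for uid in user_id_actions),
)
assert conn.execute("SELECT count(*) FROM event_push_actions_staging").fetchone()[0] == 2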
-    @defer.inlineCallbacks
-    def remove_push_actions_from_staging(self, event_id):
-        """Called if we failed to persist the event to ensure that stale push
-        actions don't build up in the DB.
-
-        Args:
-            event_id (str)
-        """
-
-        try:
-            res = yield self._simple_delete(
-                table="event_push_actions_staging",
-                keyvalues={"event_id": event_id},
-                desc="remove_push_actions_from_staging",
-            )
-            return res
-        except Exception:
-            # this method is called from an exception handler, so propagating
-            # another exception here really isn't helpful - there's nothing
-            # the caller can do about it. Just log the exception and move on.
-            logger.exception(
-                "Error removing push actions after event persistence failure"
-            )
-
-    def _find_stream_orderings_for_times(self):
-        return run_as_background_process(
-            "event_push_action_stream_orderings",
-            self.runInteraction,
-            "_find_stream_orderings_for_times",
-            self._find_stream_orderings_for_times_txn,
-        )
-
-    def _find_stream_orderings_for_times_txn(self, txn):
-        logger.info("Searching for stream ordering 1 month ago")
-        self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 month ago: it's %d", self.stream_ordering_month_ago
-        )
-        logger.info("Searching for stream ordering 1 day ago")
-        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
-            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
-        )
-        logger.info(
-            "Found stream ordering 1 day ago: it's %d", self.stream_ordering_day_ago
-        )
-
-    def find_first_stream_ordering_after_ts(self, ts):
-        """Gets the stream ordering corresponding to a given timestamp.
-
-        Specifically, finds the stream_ordering of the first event that was
-        received on or after the timestamp. This is done by a binary search
-        on the events table; since there is no index on received_ts, it is
-        relatively slow.
-
-        Args:
-            ts (int): timestamp in millis
-
-        Returns:
-            Deferred[int]: stream ordering of the first event received on/after
-                the timestamp
-        """
-        return self.runInteraction(
-            "_find_first_stream_ordering_after_ts_txn",
-            self._find_first_stream_ordering_after_ts_txn,
-            ts,
-        )
-
-    @staticmethod
-    def _find_first_stream_ordering_after_ts_txn(txn, ts):
-        """
-        Find the stream_ordering of the first event that was received on or
-        after a given timestamp. This is relatively slow as there is no index
-        on received_ts, but we can then use the result to delete push actions
-        received before a given point.
-
-        received_ts must necessarily be in the same order as stream_ordering,
-        and stream_ordering is indexed, so we manually binary search using
-        stream_ordering.
-
-        Args:
-            txn (twisted.enterprise.adbapi.Transaction):
-            ts (int): timestamp to search for
-
-        Returns:
-            int: stream ordering
-        """
-        txn.execute("SELECT MAX(stream_ordering) FROM events")
-        max_stream_ordering = txn.fetchone()[0]
-
-        if max_stream_ordering is None:
-            return 0
-
-        # We want the first stream_ordering in which received_ts is greater
-        # than or equal to ts. Call this point X.
-        #
-        # We maintain the invariants:
-        #
-        #   range_start <= X <= range_end
-        #
-        range_start = 0
-        range_end = max_stream_ordering + 1
-
-        # Given a stream_ordering, look up the timestamp at that
-        # stream_ordering.
-        #
-        # The array may be sparse (we may be missing some stream_orderings).
-        # We treat the gaps as the same as having the same value as the
-        # preceding entry, because we will pick the lowest stream_ordering
-        # which satisfies our requirement of received_ts >= ts.
-        #
-        # For example, if our array of events indexed by stream_ordering is
-        # [10, <none>, 20], we should treat this as being equivalent to
-        # [10, 10, 20].
-        #
-        sql = (
-            "SELECT received_ts FROM events"
-            " WHERE stream_ordering <= ?"
-            " ORDER BY stream_ordering DESC"
-            " LIMIT 1"
-        )
-
-        while range_end - range_start > 0:
-            middle = (range_end + range_start) // 2
-            txn.execute(sql, (middle,))
-            row = txn.fetchone()
-            if row is None:
-                # no rows with stream_ordering<=middle
-                range_start = middle + 1
-                continue
-
-            middle_ts = row[0]
-            if ts > middle_ts:
-                # We got a timestamp lower than the one we were looking for,
-                # so we definitely need to look higher: X > middle.
-                range_start = middle + 1
-            else:
-                # We got a timestamp higher than (or the same as) the one we
-                # were looking for. We aren't yet sure about the point we
-                # looked up, but we can be sure that X <= middle.
-                range_end = middle
-
-        return range_end
-
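The binary search above can be modelled without SQL. A sketch, assuming an in-memory dict from stream_ordering to received_ts stands in for the events table (function names invented for illustration):

def first_ordering_after_ts(events, ts):
    """Return the first stream_ordering whose received_ts >= ts."""
    if not events:
        return 0
    lo, hi = 0, max(events) + 1  # invariant: lo <= X <= hi

    def lookup(mid):
        # Mimics: SELECT received_ts FROM events WHERE stream_ordering <= ?
        #         ORDER BY stream_ordering DESC LIMIT 1
        keys = [k for k in events if k <= mid]
        return events[max(keys)] if keys else None

    while hi - lo > 0:
        mid = (lo + hi) // 2
        mid_ts = lookup(mid)
        if mid_ts is None or ts > mid_ts:
            lo = mid + 1  # X must lie above mid
        else:
            hi = mid      # X <= mid
    return hi

# Gaps behave like the preceding entry: [10, <none>, 20] ~ [10, 10, 20].
assert first_ordering_after_ts({0: 10, 2: 20}, 15) == 2
assert first_ordering_after_ts({0: 10, 2: 20}, 5) == 0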
-
-class EventPushActionsStore(EventPushActionsWorkerStore):
-    EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
-
-    def __init__(self, db_conn, hs):
-        super(EventPushActionsStore, self).__init__(db_conn, hs)
-
-        self.register_background_index_update(
-            self.EPA_HIGHLIGHT_INDEX,
-            index_name="event_push_actions_u_highlight",
-            table="event_push_actions",
-            columns=["user_id", "stream_ordering"],
-        )
-
-        self.register_background_index_update(
-            "event_push_actions_highlights_index",
-            index_name="event_push_actions_highlights_index",
-            table="event_push_actions",
-            columns=["user_id", "room_id", "topological_ordering", "stream_ordering"],
-            where_clause="highlight=1",
-        )
-
-        self._doing_notif_rotation = False
-        self._rotate_notif_loop = self._clock.looping_call(
-            self._start_rotate_notifs, 30 * 60 * 1000
-        )
-
-    def _set_push_actions_for_event_and_users_txn(
-        self, txn, events_and_contexts, all_events_and_contexts
-    ):
-        """Handles moving push actions from staging table to main
-        event_push_actions table for all events in `events_and_contexts`.
-
-        Also ensures that all events in `all_events_and_contexts` are removed
-        from the push action staging area.
-
-        Args:
-            events_and_contexts (list[(EventBase, EventContext)]): events
-                we are persisting
-            all_events_and_contexts (list[(EventBase, EventContext)]): all
-                events that we were going to persist. This includes events
-                we've already persisted, etc., that wouldn't appear in
-                events_and_contexts.
-        """
-
-        sql = """
-            INSERT INTO event_push_actions (
-                room_id, event_id, user_id, actions, stream_ordering,
-                topological_ordering, notif, highlight
-            )
-            SELECT ?, event_id, user_id, actions, ?, ?, notif, highlight
-            FROM event_push_actions_staging
-            WHERE event_id = ?
-        """
-
-        if events_and_contexts:
-            txn.executemany(
-                sql,
-                (
-                    (
-                        event.room_id,
-                        event.internal_metadata.stream_ordering,
-                        event.depth,
-                        event.event_id,
-                    )
-                    for event, _ in events_and_contexts
-                ),
-            )
-
-        for event, _ in events_and_contexts:
-            user_ids = self._simple_select_onecol_txn(
-                txn,
-                table="event_push_actions_staging",
-                keyvalues={"event_id": event.event_id},
-                retcol="user_id",
-            )
-
-            for uid in user_ids:
-                txn.call_after(
-                    self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-                    (event.room_id, uid),
-                )
-
-        # Now we delete the staging area for *all* events that were being
-        # persisted.
-        txn.executemany(
-            "DELETE FROM event_push_actions_staging WHERE event_id = ?",
-            ((event.event_id,) for event, _ in all_events_and_contexts),
-        )
-
-    @defer.inlineCallbacks
-    def get_push_actions_for_user(
-        self, user_id, before=None, limit=50, only_highlight=False
-    ):
-        def f(txn):
-            before_clause = ""
-            if before:
-                before_clause = "AND epa.stream_ordering < ?"
-                args = [user_id, before, limit]
-            else:
-                args = [user_id, limit]
-
-            if only_highlight:
-                if len(before_clause) > 0:
-                    before_clause += " "
-                before_clause += "AND epa.highlight = 1"
-
-            # NB. This assumes event_ids are globally unique since
-            # it makes the query easier to index
-            sql = (
-                "SELECT epa.event_id, epa.room_id,"
-                " epa.stream_ordering, epa.topological_ordering,"
-                " epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
-                " FROM event_push_actions epa, events e"
-                " WHERE epa.event_id = e.event_id"
-                " AND epa.user_id = ? %s"
-                " ORDER BY epa.stream_ordering DESC"
-                " LIMIT ?" % (before_clause,)
-            )
-            txn.execute(sql, args)
-            return self.cursor_to_dict(txn)
-
-        push_actions = yield self.runInteraction("get_push_actions_for_user", f)
-        for pa in push_actions:
-            pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
-        return push_actions
-
-    @defer.inlineCallbacks
-    def get_time_of_last_push_action_before(self, stream_ordering):
-        def f(txn):
-            sql = (
-                "SELECT e.received_ts"
-                " FROM event_push_actions AS ep"
-                " JOIN events e ON ep.room_id = e.room_id AND ep.event_id = e.event_id"
-                " WHERE ep.stream_ordering > ?"
-                " ORDER BY ep.stream_ordering ASC"
-                " LIMIT 1"
-            )
-            txn.execute(sql, (stream_ordering,))
-            return txn.fetchone()
-
-        result = yield self.runInteraction("get_time_of_last_push_action_before", f)
-        return result[0] if result else None
-
-    @defer.inlineCallbacks
-    def get_latest_push_action_stream_ordering(self):
-        def f(txn):
-            txn.execute("SELECT MAX(stream_ordering) FROM event_push_actions")
-            return txn.fetchone()
-
-        result = yield self.runInteraction("get_latest_push_action_stream_ordering", f)
-        return result[0] or 0
-
-    def _remove_push_actions_for_event_id_txn(self, txn, room_id, event_id):
-        # Sad that we have to blow away the cache for the whole room here
-        txn.call_after(
-            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-            (room_id,),
-        )
-        txn.execute(
-            "DELETE FROM event_push_actions WHERE room_id = ? AND event_id = ?",
-            (room_id, event_id),
-        )
-
-    def _remove_old_push_actions_before_txn(
-        self, txn, room_id, user_id, stream_ordering
-    ):
-        """
-        Purges old push actions for a user and room before a given
-        stream_ordering.
-
-        However, we keep a month's worth of highlighted notifications, so that
-        users can still get a list of recent highlights.
-
-        Args:
-            txn: The transaction
-            room_id: Room ID to delete from
-            user_id: user ID to delete for
-            stream_ordering: The lowest stream ordering which will
-                not be deleted.
-        """
-        txn.call_after(
-            self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
-            (room_id, user_id),
-        )
-
-        # We need to join on the events table to get the received_ts for
-        # event_push_actions, and sqlite won't let us use a join in a delete,
-        # so we can't just delete where received_ts < x. Furthermore, we can
-        # only identify event_push_actions by a tuple of room_id and event_id,
-        # so we can't use a subquery.
-        # Instead, we look up the stream ordering for the last event in that
-        # room received before the threshold time and delete event_push_actions
-        # in the room with a stream_ordering before that.
-        txn.execute(
-            "DELETE FROM event_push_actions "
-            " WHERE user_id = ? AND room_id = ? AND "
-            " stream_ordering <= ?"
-            " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
-            (user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
-        )
-
-        txn.execute(
-            """
-            DELETE FROM event_push_summary
-            WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
-        """,
-            (room_id, user_id, stream_ordering),
-        )
-
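The compound WHERE clause in the first DELETE above encodes two retention rules: non-highlights go as soon as they fall at or before the receipt's stream ordering, while highlights are kept for a month. A sketch of the same predicate in plain Python (helper name invented):

# Sketch of the DELETE predicate above: everything at or before the
# receipt's stream ordering goes, except highlights newer than a month.
def should_delete(row_ordering, highlight, receipt_ordering, month_ago):
    if row_ordering > receipt_ordering:
        return False  # newer than the read receipt: always kept
    return highlight == 0 or row_ordering < month_ago

assert should_delete(5, 0, 10, 3)      # old non-highlight: deleted
assert not should_delete(5, 1, 10, 3)  # highlight within a month: kept
assert should_delete(2, 1, 10, 3)      # highlight older than a month: deleted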
-    def _start_rotate_notifs(self):
-        return run_as_background_process("rotate_notifs", self._rotate_notifs)
-
-    @defer.inlineCallbacks
-    def _rotate_notifs(self):
-        if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
-            return
-        self._doing_notif_rotation = True
-
-        try:
-            while True:
-                logger.info("Rotating notifications")
-
-                caught_up = yield self.runInteraction(
-                    "_rotate_notifs", self._rotate_notifs_txn
-                )
-                if caught_up:
-                    break
-                yield self.hs.get_clock().sleep(self._rotate_delay)
-        finally:
-            self._doing_notif_rotation = False
-
-    def _rotate_notifs_txn(self, txn):
-        """Archives older notifications into event_push_summary. Returns whether
-        the archiving process has caught up or not.
-        """
-
-        old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary_stream_ordering",
-            keyvalues={},
-            retcol="stream_ordering",
-        )
-
-        # We don't want to try to rotate millions of rows at once, so we cap
-        # the maximum stream ordering we'll rotate before.
-        txn.execute(
-            """
-            SELECT stream_ordering FROM event_push_actions
-            WHERE stream_ordering > ?
-            ORDER BY stream_ordering ASC LIMIT 1 OFFSET ?
-        """,
-            (old_rotate_stream_ordering, self._rotate_count),
-        )
-        stream_row = txn.fetchone()
-        if stream_row:
-            offset_stream_ordering, = stream_row
-            rotate_to_stream_ordering = min(
-                self.stream_ordering_day_ago, offset_stream_ordering
-            )
-            caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
-        else:
-            rotate_to_stream_ordering = self.stream_ordering_day_ago
-            caught_up = True
-
-        logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering)
-
-        self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
-
-        # We have caught up iff we were limited by `stream_ordering_day_ago`
-        return caught_up
-
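The batching decision above can be modelled in isolation. A hedged sketch, with plan_rotation standing in for the logic in _rotate_notifs_txn (not a real helper in the codebase):

# Sketch of the batching decision in _rotate_notifs_txn. stream_row is
# the ordering found `rotate_count` rows past the last rotation point.
def plan_rotation(stream_row, day_ago):
    if stream_row is not None:
        # More than a full batch remains: rotate one batch, but never
        # past the day-ago horizon.
        rotate_to = min(day_ago, stream_row)
        caught_up = stream_row >= day_ago
    else:
        # Fewer than a full batch remains: finish in one pass.
        rotate_to, caught_up = day_ago, True
    return rotate_to, caught_up

assert plan_rotation(stream_row=500, day_ago=1000) == (500, False)
assert plan_rotation(stream_row=1500, day_ago=1000) == (1000, True)
assert plan_rotation(stream_row=None, day_ago=1000) == (1000, True)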
-    def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
-        old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary_stream_ordering",
-            keyvalues={},
-            retcol="stream_ordering",
-        )
-
-        # Calculate the new counts that should be upserted into event_push_summary
-        sql = """
-            SELECT user_id, room_id,
-                coalesce(old.notif_count, 0) + upd.notif_count,
-                upd.stream_ordering,
-                old.user_id
-            FROM (
-                SELECT user_id, room_id, count(*) as notif_count,
-                    max(stream_ordering) as stream_ordering
-                FROM event_push_actions
-                WHERE ? <= stream_ordering AND stream_ordering < ?
-                    AND highlight = 0
-                GROUP BY user_id, room_id
-            ) AS upd
-            LEFT JOIN event_push_summary AS old USING (user_id, room_id)
-        """
-
-        txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering))
-        rows = txn.fetchall()
-
-        logger.info("Rotating notifications, handling %d rows", len(rows))
-
-        # If the `old.user_id` above is NULL then we know there isn't already an
-        # entry in the table, so we simply insert it. Otherwise we update the
-        # existing row.
-        self._simple_insert_many_txn(
-            txn,
-            table="event_push_summary",
-            values=[
-                {
-                    "user_id": row[0],
-                    "room_id": row[1],
-                    "notif_count": row[2],
-                    "stream_ordering": row[3],
-                }
-                for row in rows
-                if row[4] is None
-            ],
-        )
-
-        txn.executemany(
-            """
-                UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
-                WHERE user_id = ? AND room_id = ?
-            """,
-            ((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None),
-        )
-
-        txn.execute(
-            "DELETE FROM event_push_actions"
-            " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
-            (old_rotate_stream_ordering, rotate_to_stream_ordering),
-        )
-
-        logger.info("Rotating notifications, deleted %s push actions", txn.rowcount)
-
-        txn.execute(
-            "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
-            (rotate_to_stream_ordering,),
-        )
-
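The NULL check described in the comment above splits the joined rows into fresh inserts and updates. A sketch with invented row tuples of (user_id, room_id, notif_count, stream_ordering, old.user_id):

# Sketch of the upsert split above. The fifth column is old.user_id from
# the LEFT JOIN: None means no summary row existed yet for that pair.
rows = [
    ("@a:example.org", "!r:example.org", 4, 123, None),
    ("@b:example.org", "!r:example.org", 9, 123, "@b:example.org"),
]
to_insert = [r for r in rows if r[4] is None]      # -> INSERT new summary row
to_update = [r for r in rows if r[4] is not None]  # -> UPDATE existing row
assert [r[0] for r in to_insert] == ["@a:example.org"]
assert [r[0] for r in to_update] == ["@b:example.org"]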
-
-def _action_has_highlight(actions):
-    for action in actions:
-        try:
-            if action.get("set_tweak", None) == "highlight":
-                return action.get("value", True)
-        except AttributeError:
-            pass
-
-    return False
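For reference, a few constructed inputs (not from the source) and what _action_has_highlight returns:

# Constructed examples for _action_has_highlight. Plain-string actions
# hit the AttributeError branch and are skipped; a "highlight" tweak
# with no explicit "value" defaults to True.
assert _action_has_highlight(["notify", {"set_tweak": "highlight"}]) is True
assert _action_has_highlight([{"set_tweak": "highlight", "value": False}]) is False
assert _action_has_highlight(["notify"]) is False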