summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/device.py38
-rw-r--r--synapse/replication/slave/storage/events.py6
-rw-r--r--synapse/storage/event_federation.py17
-rw-r--r--synapse/storage/event_push_actions.py328
-rw-r--r--synapse/storage/receipts.py1
-rw-r--r--synapse/storage/schema/delta/40/event_push_summary.sql37
-rw-r--r--synapse/storage/state.py14
-rw-r--r--tests/storage/test_event_push_actions.py86
8 files changed, 448 insertions, 79 deletions
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 8cb47ac417..ca7137f315 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -12,7 +12,6 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from synapse.api import errors
 from synapse.api.constants import EventTypes
 from synapse.util import stringutils
@@ -246,30 +245,51 @@ class DeviceHandler(BaseHandler):
         # Then work out if any users have since joined
         rooms_changed = self.store.get_rooms_that_changed(room_ids, from_token.room_key)
 
+        stream_ordering = RoomStreamToken.parse_stream_token(
+            from_token.room_key).stream
+
         possibly_changed = set(changed)
         for room_id in rooms_changed:
-            # Fetch  the current state at the time.
-            stream_ordering = RoomStreamToken.parse_stream_token(from_token.room_key)
-
+            # Fetch the current state at the time.
             try:
                 event_ids = yield self.store.get_forward_extremeties_for_room(
                     room_id, stream_ordering=stream_ordering
                 )
-                prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
-            except:
-                prev_state_ids = {}
+            except errors.StoreError:
+                # we have purged the stream_ordering index since the stream
+                # ordering: treat it the same as a new room
+                event_ids = []
 
             current_state_ids = yield self.state.get_current_state_ids(room_id)
 
+            # special-case for an empty prev state: include all members
+            # in the changed list
+            if not event_ids:
+                for key, event_id in current_state_ids.iteritems():
+                    etype, state_key = key
+                    if etype != EventTypes.Member:
+                        continue
+                    possibly_changed.add(state_key)
+                continue
+
+            # mapping from event_id -> state_dict
+            prev_state_ids = yield self.store.get_state_ids_for_events(event_ids)
+
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
             for key, event_id in current_state_ids.iteritems():
                 etype, state_key = key
-                if etype == EventTypes.Member:
-                    prev_event_id = prev_state_ids.get(key, None)
+                if etype != EventTypes.Member:
+                    continue
+
+                # check if this member has changed since any of the extremities
+                # at the stream_ordering, and add them to the list if so.
+                for state_dict in prev_state_ids.values():
+                    prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
                         possibly_changed.add(state_key)
+                        break
 
         users_who_share_room = yield self.store.get_users_who_share_room_with_user(
             user_id
diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py
index d72ff6055c..622b2d8540 100644
--- a/synapse/replication/slave/storage/events.py
+++ b/synapse/replication/slave/storage/events.py
@@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore):
     get_unread_event_push_actions_by_room_for_user = (
         EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"]
     )
+    _get_unread_counts_by_receipt_txn = (
+        DataStore._get_unread_counts_by_receipt_txn.__func__
+    )
+    _get_unread_counts_by_pos_txn = (
+        DataStore._get_unread_counts_by_pos_txn.__func__
+    )
     _get_state_group_for_events = (
         StateStore.__dict__["_get_state_group_for_events"]
     )
diff --git a/synapse/storage/event_federation.py b/synapse/storage/event_federation.py
index ee88c61954..256e50dc20 100644
--- a/synapse/storage/event_federation.py
+++ b/synapse/storage/event_federation.py
@@ -281,15 +281,30 @@ class EventFederationStore(SQLBaseStore):
         )
 
     def get_forward_extremeties_for_room(self, room_id, stream_ordering):
+        """For a given room_id and stream_ordering, return the forward
+        extremeties of the room at that point in "time".
+
+        Throws a StoreError if we have since purged the index for
+        stream_orderings from that point.
+
+        Args:
+            room_id (str):
+            stream_ordering (int):
+
+        Returns:
+            deferred, which resolves to a list of event_ids
+        """
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
         last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)
 
         # We don't always have a full stream_to_exterm_id table, e.g. after
         # the upgrade that introduced it, so we make sure we never ask for a
-        # try and pin to a stream_ordering from before a restart
+        # stream_ordering from before a restart
         last_change = max(self._stream_order_on_start, last_change)
 
+        # provided the last_change is recent enough, we now clamp the requested
+        # stream_ordering to it.
         if last_change > self.stream_ordering_month_ago:
             stream_ordering = min(last_change, stream_ordering)
 
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 522d0114cb..808c9d22fc 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -15,6 +15,7 @@
 
 from ._base import SQLBaseStore
 from twisted.internet import defer
+from synapse.util.async import sleep
 from synapse.util.caches.descriptors import cachedInlineCallbacks
 from synapse.types import RoomStreamToken
 from .stream import lower_bound
@@ -25,11 +26,46 @@ import ujson as json
 logger = logging.getLogger(__name__)
 
 
+DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}]
+DEFAULT_HIGHLIGHT_ACTION = [
+    "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
+]
+
+
+def _serialize_action(actions, is_highlight):
+    """Custom serializer for actions. This allows us to "compress" common actions.
+
+    We use the fact that most users have the same actions for notifs (and for
+    highlights).
+    We store these default actions as the empty string rather than the full JSON.
+    Since the empty string isn't valid JSON there is no risk of this clashing with
+    any real JSON actions
+    """
+    if is_highlight:
+        if actions == DEFAULT_HIGHLIGHT_ACTION:
+            return ""  # We use empty string as the column is non-NULL
+    else:
+        if actions == DEFAULT_NOTIF_ACTION:
+            return ""
+    return json.dumps(actions)
+
+
+def _deserialize_action(actions, is_highlight):
+    """Custom deserializer for actions. This allows us to "compress" common actions
+    """
+    if actions:
+        return json.loads(actions)
+
+    if is_highlight:
+        return DEFAULT_HIGHLIGHT_ACTION
+    else:
+        return DEFAULT_NOTIF_ACTION
+
+
 class EventPushActionsStore(SQLBaseStore):
     EPA_HIGHLIGHT_INDEX = "epa_highlight_index"
 
     def __init__(self, hs):
-        self.stream_ordering_month_ago = None
         super(EventPushActionsStore, self).__init__(hs)
 
         self.register_background_index_update(
@@ -47,6 +83,9 @@ class EventPushActionsStore(SQLBaseStore):
             where_clause="highlight=1"
         )
 
+        self._doing_notif_rotation = False
+        self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000)
+
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
         """
         Args:
@@ -55,15 +94,17 @@ class EventPushActionsStore(SQLBaseStore):
         """
         values = []
         for uid, actions in tuples:
+            is_highlight = 1 if _action_has_highlight(actions) else 0
+
             values.append({
                 'room_id': event.room_id,
                 'event_id': event.event_id,
                 'user_id': uid,
-                'actions': json.dumps(actions),
+                'actions': _serialize_action(actions, is_highlight),
                 'stream_ordering': event.internal_metadata.stream_ordering,
                 'topological_ordering': event.depth,
                 'notif': 1,
-                'highlight': 1 if _action_has_highlight(actions) else 0,
+                'highlight': is_highlight,
             })
 
         for uid, __ in tuples:
@@ -77,66 +118,89 @@ class EventPushActionsStore(SQLBaseStore):
     def get_unread_event_push_actions_by_room_for_user(
             self, room_id, user_id, last_read_event_id
     ):
-        def _get_unread_event_push_actions_by_room(txn):
-            sql = (
-                "SELECT stream_ordering, topological_ordering"
-                " FROM events"
-                " WHERE room_id = ? AND event_id = ?"
-            )
-            txn.execute(
-                sql, (room_id, last_read_event_id)
-            )
-            results = txn.fetchall()
-            if len(results) == 0:
-                return {"notify_count": 0, "highlight_count": 0}
-
-            stream_ordering = results[0][0]
-            topological_ordering = results[0][1]
-            token = RoomStreamToken(
-                topological_ordering, stream_ordering
-            )
-
-            # First get number of notifications.
-            # We don't need to put a notif=1 clause as all rows always have
-            # notif=1
-            sql = (
-                "SELECT count(*)"
-                " FROM event_push_actions ea"
-                " WHERE"
-                " user_id = ?"
-                " AND room_id = ?"
-                " AND %s"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
+        ret = yield self.runInteraction(
+            "get_unread_event_push_actions_by_room",
+            self._get_unread_counts_by_receipt_txn,
+            room_id, user_id, last_read_event_id
+        )
+        defer.returnValue(ret)
 
-            txn.execute(sql, (user_id, room_id))
-            row = txn.fetchone()
-            notify_count = row[0] if row else 0
+    def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id,
+                                          last_read_event_id):
+        sql = (
+            "SELECT stream_ordering, topological_ordering"
+            " FROM events"
+            " WHERE room_id = ? AND event_id = ?"
+        )
+        txn.execute(
+            sql, (room_id, last_read_event_id)
+        )
+        results = txn.fetchall()
+        if len(results) == 0:
+            return {"notify_count": 0, "highlight_count": 0}
 
-            # Now get the number of highlights
-            sql = (
-                "SELECT count(*)"
-                " FROM event_push_actions ea"
-                " WHERE"
-                " highlight = 1"
-                " AND user_id = ?"
-                " AND room_id = ?"
-                " AND %s"
-            ) % (lower_bound(token, self.database_engine, inclusive=False),)
+        stream_ordering = results[0][0]
+        topological_ordering = results[0][1]
 
-            txn.execute(sql, (user_id, room_id))
-            row = txn.fetchone()
-            highlight_count = row[0] if row else 0
+        return self._get_unread_counts_by_pos_txn(
+            txn, room_id, user_id, topological_ordering, stream_ordering
+        )
 
-            return {
-                "notify_count": notify_count,
-                "highlight_count": highlight_count,
-            }
+    def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering,
+                                      stream_ordering):
+        token = RoomStreamToken(
+            topological_ordering, stream_ordering
+        )
 
-        ret = yield self.runInteraction(
-            "get_unread_event_push_actions_by_room",
-            _get_unread_event_push_actions_by_room
+        # First get number of notifications.
+        # We don't need to put a notif=1 clause as all rows always have
+        # notif=1
+        sql = (
+            "SELECT count(*)"
+            " FROM event_push_actions ea"
+            " WHERE"
+            " user_id = ?"
+            " AND room_id = ?"
+            " AND %s"
+        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+        txn.execute(sql, (user_id, room_id))
+        row = txn.fetchone()
+        notify_count = row[0] if row else 0
+
+        summary_notif_count = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary",
+            keyvalues={
+                "user_id": user_id,
+                "room_id": room_id,
+            },
+            retcol="notif_count",
+            allow_none=True,
         )
-        defer.returnValue(ret)
+
+        if summary_notif_count:
+            notify_count += summary_notif_count
+
+        # Now get the number of highlights
+        sql = (
+            "SELECT count(*)"
+            " FROM event_push_actions ea"
+            " WHERE"
+            " highlight = 1"
+            " AND user_id = ?"
+            " AND room_id = ?"
+            " AND %s"
+        ) % (lower_bound(token, self.database_engine, inclusive=False),)
+
+        txn.execute(sql, (user_id, room_id))
+        row = txn.fetchone()
+        highlight_count = row[0] if row else 0
+
+        return {
+            "notify_count": notify_count,
+            "highlight_count": highlight_count,
+        }
 
     @defer.inlineCallbacks
     def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
@@ -176,7 +240,8 @@ class EventPushActionsStore(SQLBaseStore):
             # find rooms that have a read receipt in them and return the next
             # push actions
             sql = (
-                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions"
+                "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
+                "   ep.highlight "
                 " FROM ("
                 "   SELECT room_id,"
                 "       MAX(topological_ordering) as topological_ordering,"
@@ -217,7 +282,7 @@ class EventPushActionsStore(SQLBaseStore):
         def get_no_receipt(txn):
             sql = (
                 "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                " e.received_ts"
+                "   ep.highlight "
                 " FROM event_push_actions AS ep"
                 " INNER JOIN events AS e USING (room_id, event_id)"
                 " WHERE"
@@ -246,7 +311,7 @@ class EventPushActionsStore(SQLBaseStore):
                 "event_id": row[0],
                 "room_id": row[1],
                 "stream_ordering": row[2],
-                "actions": json.loads(row[3]),
+                "actions": _deserialize_action(row[3], row[4]),
             } for row in after_read_receipt + no_read_receipt
         ]
 
@@ -285,7 +350,7 @@ class EventPushActionsStore(SQLBaseStore):
         def get_after_receipt(txn):
             sql = (
                 "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                "  e.received_ts"
+                "  ep.highlight, e.received_ts"
                 " FROM ("
                 "   SELECT room_id,"
                 "       MAX(topological_ordering) as topological_ordering,"
@@ -327,7 +392,7 @@ class EventPushActionsStore(SQLBaseStore):
         def get_no_receipt(txn):
             sql = (
                 "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions,"
-                " e.received_ts"
+                "   ep.highlight, e.received_ts"
                 " FROM event_push_actions AS ep"
                 " INNER JOIN events AS e USING (room_id, event_id)"
                 " WHERE"
@@ -357,8 +422,8 @@ class EventPushActionsStore(SQLBaseStore):
                 "event_id": row[0],
                 "room_id": row[1],
                 "stream_ordering": row[2],
-                "actions": json.loads(row[3]),
-                "received_ts": row[4],
+                "actions": _deserialize_action(row[3], row[4]),
+                "received_ts": row[5],
             } for row in after_read_receipt + no_read_receipt
         ]
 
@@ -392,7 +457,7 @@ class EventPushActionsStore(SQLBaseStore):
             sql = (
                 "SELECT epa.event_id, epa.room_id,"
                 " epa.stream_ordering, epa.topological_ordering,"
-                " epa.actions, epa.profile_tag, e.received_ts"
+                " epa.actions, epa.highlight, epa.profile_tag, e.received_ts"
                 " FROM event_push_actions epa, events e"
                 " WHERE epa.event_id = e.event_id"
                 " AND epa.user_id = ? %s"
@@ -407,7 +472,7 @@ class EventPushActionsStore(SQLBaseStore):
             "get_push_actions_for_user", f
         )
         for pa in push_actions:
-            pa["actions"] = json.loads(pa["actions"])
+            pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"])
         defer.returnValue(push_actions)
 
     @defer.inlineCallbacks
@@ -448,7 +513,7 @@ class EventPushActionsStore(SQLBaseStore):
         )
 
     def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
-                                            topological_ordering):
+                                            topological_ordering, stream_ordering):
         """
         Purges old push actions for a user and room before a given
         topological_ordering.
@@ -479,11 +544,16 @@ class EventPushActionsStore(SQLBaseStore):
         txn.execute(
             "DELETE FROM event_push_actions "
             " WHERE user_id = ? AND room_id = ? AND "
-            " topological_ordering < ?"
+            " topological_ordering <= ?"
             " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
             (user_id, room_id, topological_ordering, self.stream_ordering_month_ago)
         )
 
+        txn.execute("""
+            DELETE FROM event_push_summary
+            WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
+        """, (room_id, user_id, stream_ordering))
+
     @defer.inlineCallbacks
     def _find_stream_orderings_for_times(self):
         yield self.runInteraction(
@@ -500,6 +570,14 @@ class EventPushActionsStore(SQLBaseStore):
             "Found stream ordering 1 month ago: it's %d",
             self.stream_ordering_month_ago
         )
+        logger.info("Searching for stream ordering 1 day ago")
+        self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn(
+            txn, self._clock.time_msec() - 24 * 60 * 60 * 1000
+        )
+        logger.info(
+            "Found stream ordering 1 day ago: it's %d",
+            self.stream_ordering_day_ago
+        )
 
     def _find_first_stream_ordering_after_ts_txn(self, txn, ts):
         """
@@ -539,6 +617,120 @@ class EventPushActionsStore(SQLBaseStore):
 
         return range_end
 
+    @defer.inlineCallbacks
+    def _rotate_notifs(self):
+        if self._doing_notif_rotation or self.stream_ordering_day_ago is None:
+            return
+        self._doing_notif_rotation = True
+
+        try:
+            while True:
+                logger.info("Rotating notifications")
+
+                caught_up = yield self.runInteraction(
+                    "_rotate_notifs",
+                    self._rotate_notifs_txn
+                )
+                if caught_up:
+                    break
+                yield sleep(5)
+        finally:
+            self._doing_notif_rotation = False
+
+    def _rotate_notifs_txn(self, txn):
+        """Archives older notifications into event_push_summary. Returns whether
+        the archiving process has caught up or not.
+        """
+
+        # We want to make sure that we only ever do this one at a time
+        self.database_engine.lock_table(txn, "event_push_summary")
+
+        # We don't to try and rotate millions of rows at once, so we cap the
+        # maximum stream ordering we'll rotate before.
+        txn.execute("""
+            SELECT stream_ordering FROM event_push_actions
+            ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000
+        """)
+        stream_row = txn.fetchone()
+        if stream_row:
+            offset_stream_ordering, = stream_row
+            rotate_to_stream_ordering = min(
+                self.stream_ordering_day_ago, offset_stream_ordering
+            )
+            caught_up = offset_stream_ordering >= self.stream_ordering_day_ago
+        else:
+            rotate_to_stream_ordering = self.stream_ordering_day_ago
+            caught_up = True
+
+        self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering)
+
+        # We have caught up iff we were limited by `stream_ordering_day_ago`
+        return caught_up
+
+    def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering):
+        old_rotate_stream_ordering = self._simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
+
+        # Calculate the new counts that should be upserted into event_push_summary
+        sql = """
+            SELECT user_id, room_id,
+                coalesce(old.notif_count, 0) + upd.notif_count,
+                upd.stream_ordering,
+                old.user_id
+            FROM (
+                SELECT user_id, room_id, count(*) as notif_count,
+                    max(stream_ordering) as stream_ordering
+                FROM event_push_actions
+                WHERE ? <= stream_ordering AND stream_ordering < ?
+                    AND highlight = 0
+                GROUP BY user_id, room_id
+            ) AS upd
+            LEFT JOIN event_push_summary AS old USING (user_id, room_id)
+        """
+
+        txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,))
+        rows = txn.fetchall()
+
+        # If the `old.user_id` above is NULL then we know there isn't already an
+        # entry in the table, so we simply insert it. Otherwise we update the
+        # existing table.
+        self._simple_insert_many_txn(
+            txn,
+            table="event_push_summary",
+            values=[
+                {
+                    "user_id": row[0],
+                    "room_id": row[1],
+                    "notif_count": row[2],
+                    "stream_ordering": row[3],
+                }
+                for row in rows if row[4] is None
+            ]
+        )
+
+        txn.executemany(
+            """
+                UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
+                WHERE user_id = ? AND room_id = ?
+            """,
+            ((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None)
+        )
+
+        txn.execute(
+            "DELETE FROM event_push_actions"
+            " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0",
+            (old_rotate_stream_ordering, rotate_to_stream_ordering,)
+        )
+
+        txn.execute(
+            "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?",
+            (rotate_to_stream_ordering,)
+        )
+
 
 def _action_has_highlight(actions):
     for action in actions:
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index f72d15f5ed..5cf41501ea 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore):
                 room_id=room_id,
                 user_id=user_id,
                 topological_ordering=topological_ordering,
+                stream_ordering=stream_ordering,
             )
 
         return True
diff --git a/synapse/storage/schema/delta/40/event_push_summary.sql b/synapse/storage/schema/delta/40/event_push_summary.sql
new file mode 100644
index 0000000000..3918f0b794
--- /dev/null
+++ b/synapse/storage/schema/delta/40/event_push_summary.sql
@@ -0,0 +1,37 @@
+/* Copyright 2017 OpenMarket Ltd
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Aggregate of old notification counts that have been deleted out of the
+-- main event_push_actions table. This count does not include those that were
+-- highlights, as they remain in the event_push_actions table.
+CREATE TABLE event_push_summary (
+    user_id TEXT NOT NULL,
+    room_id TEXT NOT NULL,
+    notif_count BIGINT NOT NULL,
+    stream_ordering BIGINT NOT NULL
+);
+
+CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id);
+
+
+-- The stream ordering up to which we have aggregated the event_push_actions
+-- table into event_push_summary
+CREATE TABLE event_push_summary_stream_ordering (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_ordering BIGINT NOT NULL,
+    CHECK (Lock='X')
+);
+
+INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 1b3800eb6a..84482d8285 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -413,7 +413,19 @@ class StateStore(SQLBaseStore):
         defer.returnValue({event: event_to_state[event] for event in event_ids})
 
     @defer.inlineCallbacks
-    def get_state_ids_for_events(self, event_ids, types):
+    def get_state_ids_for_events(self, event_ids, types=None):
+        """
+        Get the state dicts corresponding to a list of events
+
+        Args:
+            event_ids(list(str)): events whose state should be returned
+            types(list[(str, str)]|None): List of (type, state_key) tuples
+                which are used to filter the state fetched. May be None, which
+                matches any key
+
+        Returns:
+            A deferred dict from event_id -> (type, state_key) -> state_event
+        """
         event_to_groups = yield self._get_state_group_for_events(
             event_ids,
         )
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index e9044afa2e..3135488353 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -17,9 +17,15 @@ from twisted.internet import defer
 
 import tests.unittest
 import tests.utils
+from mock import Mock
 
 USER_ID = "@user:example.com"
 
+PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}]
+HIGHLIGHT = [
+    "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"}
+]
+
 
 class EventPushActionsStoreTestCase(tests.unittest.TestCase):
 
@@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
         yield self.store.get_unread_push_actions_for_user_in_range_for_email(
             USER_ID, 0, 1000, 20
         )
+
+    @defer.inlineCallbacks
+    def test_count_aggregation(self):
+        room_id = "!foo:example.com"
+        user_id = "@user1235:example.com"
+
+        @defer.inlineCallbacks
+        def _assert_counts(noitf_count, highlight_count):
+            counts = yield self.store.runInteraction(
+                "", self.store._get_unread_counts_by_pos_txn,
+                room_id, user_id, 0, 0
+            )
+            self.assertEquals(
+                counts,
+                {"notify_count": noitf_count, "highlight_count": highlight_count}
+            )
+
+        def _inject_actions(stream, action):
+            event = Mock()
+            event.room_id = room_id
+            event.event_id = "$test:example.com"
+            event.internal_metadata.stream_ordering = stream
+            event.depth = stream
+
+            tuples = [(user_id, action)]
+
+            return self.store.runInteraction(
+                "", self.store._set_push_actions_for_event_and_users_txn,
+                event, tuples
+            )
+
+        def _rotate(stream):
+            return self.store.runInteraction(
+                "", self.store._rotate_notifs_before_txn, stream
+            )
+
+        def _mark_read(stream, depth):
+            return self.store.runInteraction(
+                "", self.store._remove_old_push_actions_before_txn,
+                room_id, user_id, depth, stream
+            )
+
+        yield _assert_counts(0, 0)
+        yield _inject_actions(1, PlAIN_NOTIF)
+        yield _assert_counts(1, 0)
+        yield _rotate(2)
+        yield _assert_counts(1, 0)
+
+        yield _inject_actions(3, PlAIN_NOTIF)
+        yield _assert_counts(2, 0)
+        yield _rotate(4)
+        yield _assert_counts(2, 0)
+
+        yield _inject_actions(5, PlAIN_NOTIF)
+        yield _mark_read(3, 3)
+        yield _assert_counts(1, 0)
+
+        yield _mark_read(5, 5)
+        yield _assert_counts(0, 0)
+
+        yield _inject_actions(6, PlAIN_NOTIF)
+        yield _rotate(7)
+
+        yield self.store._simple_delete(
+            table="event_push_actions",
+            keyvalues={"1": 1},
+            desc="",
+        )
+
+        yield _assert_counts(1, 0)
+
+        yield _mark_read(7, 7)
+        yield _assert_counts(0, 0)
+
+        yield _inject_actions(8, HIGHLIGHT)
+        yield _assert_counts(1, 1)
+        yield _rotate(9)
+        yield _assert_counts(1, 1)
+        yield _rotate(10)
+        yield _assert_counts(1, 1)