summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/7673.feature1
-rw-r--r--synapse/handlers/sync.py3
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py7
-rw-r--r--synapse/push/push_tools.py5
-rw-r--r--synapse/rest/client/v1/push_rule.py4
-rw-r--r--synapse/storage/data_stores/main/event_push_actions.py131
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql23
-rw-r--r--tests/replication/slave/storage/test_events.py19
-rw-r--r--tests/storage/test_event_push_actions.py45
9 files changed, 187 insertions, 51 deletions
diff --git a/changelog.d/7673.feature b/changelog.d/7673.feature
new file mode 100644
index 0000000000..ecc3ffd8d5
--- /dev/null
+++ b/changelog.d/7673.feature
@@ -0,0 +1 @@
+Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625).
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6bdb24baff..15cf647737 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1895,6 +1895,9 @@ class SyncHandler(object):
                 if notifs is not None:
                     unread_notifications["notification_count"] = notifs["notify_count"]
                     unread_notifications["highlight_count"] = notifs["highlight_count"]
+                    unread_notifications["org.matrix.msc2625.unread_count"] = notifs[
+                        "unread_count"
+                    ]
 
                 sync_result_builder.joined.append(room_sync)
 
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index e75d964ac8..3244d39c37 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -191,8 +191,11 @@ class BulkPushRuleEvaluator(object):
                 )
                 if matches:
                     actions = [x for x in rule["actions"] if x != "dont_notify"]
-                    if actions and "notify" in actions:
-                        # Push rules say we should notify the user of this event
+                    if (
+                        "notify" in actions
+                        or "org.matrix.msc2625.mark_unread" in actions
+                    ):
+                        # Push rules say we should act on this event.
                         actions_by_user[uid] = actions
                     break
 
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 5dae4648c0..9f264ca4a4 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -39,7 +39,10 @@ def get_badge_count(store, user_id):
             )
             # return one badge count per conversation, as count per
             # message is so noisy as to be almost useless
-            badge += 1 if notifs["notify_count"] else 0
+            # We're populating this badge using the unread_count (instead of the
+            # notify_count) as this badge is the number of missed messages, not the
+            # number of missed notifications.
+            badge += 1 if notifs["unread_count"] else 0
     return badge
 
 
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index 9fd4908136..f563b3dc35 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2014-2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -267,7 +267,7 @@ def _check_actions(actions):
         raise InvalidRuleException("No actions found")
 
     for a in actions:
-        if a in ["notify", "dont_notify", "coalesce"]:
+        if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]:
             pass
         elif isinstance(a, dict) and "set_tweak" in a:
             pass
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 0321274de2..b1a2804b34 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -1,6 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2015-2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,9 +14,11 @@
 # limitations under the License.
 
 import logging
+from typing import Dict, Tuple
 
 from six import iteritems
 
+import attr
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -38,6 +39,16 @@ DEFAULT_HIGHLIGHT_ACTION = [
 ]
 
 
+@attr.s
+class EventPushSummary:
+    """Summary of pending event push actions for a given user in a given room."""
+
+    unread_count = attr.ib(type=int)
+    stream_ordering = attr.ib(type=int)
+    old_user_id = attr.ib(type=str)
+    notif_count = attr.ib(type=int)
+
+
 def _serialize_action(actions, is_highlight):
     """Custom serializer for actions. This allows us to "compress" common actions.
 
@@ -124,25 +135,42 @@ class EventPushActionsWorkerStore(SQLBaseStore):
 
     def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering):
 
-        # First get number of notifications.
-        # We don't need to put a notif=1 clause as all rows always have
-        # notif=1
+        # First get number of actions, grouped on whether the action notifies.
         sql = (
-            "SELECT count(*)"
+            "SELECT count(*), notif"
             " FROM event_push_actions ea"
             " WHERE"
             " user_id = ?"
             " AND room_id = ?"
             " AND stream_ordering > ?"
+            " GROUP BY notif"
         )
-
         txn.execute(sql, (user_id, room_id, stream_ordering))
-        row = txn.fetchone()
-        notify_count = row[0] if row else 0
+        rows = txn.fetchall()
+
+        # We should get a maximum number of two rows: one for notif = 0, which is the
+        # number of actions that contribute to the unread_count but not to the
+        # notify_count, and one for notif = 1, which is the number of actions that
+        # contribute to both counters. If one or both rows don't appear, then the
+        # value for the matching counter should be 0.
+        unread_count = 0
+        notify_count = 0
+        for row in rows:
+            # We always increment unread_count because actions that notify also
+            # contribute to it.
+            unread_count += row[0]
+            if row[1] == 1:
+                notify_count = row[0]
+            elif row[1] != 0:
+                logger.warning(
+                    "Unexpected value %d for column 'notif' in table"
+                    " 'event_push_actions'",
+                    row[1],
+                )
 
         txn.execute(
             """
-            SELECT notif_count FROM event_push_summary
+            SELECT notif_count, unread_count FROM event_push_summary
             WHERE room_id = ? AND user_id = ? AND stream_ordering > ?
         """,
             (room_id, user_id, stream_ordering),
@@ -150,6 +178,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         rows = txn.fetchall()
         if rows:
             notify_count += rows[0][0]
+            unread_count += rows[0][1]
 
         # Now get the number of highlights
         sql = (
@@ -166,7 +195,11 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         row = txn.fetchone()
         highlight_count = row[0] if row else 0
 
-        return {"notify_count": notify_count, "highlight_count": highlight_count}
+        return {
+            "unread_count": unread_count,
+            "notify_count": notify_count,
+            "highlight_count": highlight_count,
+        }
 
     @defer.inlineCallbacks
     def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering):
@@ -224,6 +257,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
+                "   AND ep.notif = 1"
                 " ORDER BY ep.stream_ordering ASC LIMIT ?"
             )
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -252,6 +286,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
+                "   AND ep.notif = 1"
                 " ORDER BY ep.stream_ordering ASC LIMIT ?"
             )
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -324,6 +359,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
+                "   AND ep.notif = 1"
                 " ORDER BY ep.stream_ordering DESC LIMIT ?"
             )
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -352,6 +388,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 "   AND ep.user_id = ?"
                 "   AND ep.stream_ordering > ?"
                 "   AND ep.stream_ordering <= ?"
+                "   AND ep.notif = 1"
                 " ORDER BY ep.stream_ordering DESC LIMIT ?"
             )
             args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit]
@@ -401,7 +438,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         def _get_if_maybe_push_in_range_for_user_txn(txn):
             sql = """
                 SELECT 1 FROM event_push_actions
-                WHERE user_id = ? AND stream_ordering > ?
+                WHERE user_id = ? AND stream_ordering > ? AND notif = 1
                 LIMIT 1
             """
 
@@ -430,14 +467,15 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             return
 
         # This is a helper function for generating the necessary tuple that
-        # can be used to inert into the `event_push_actions_staging` table.
+        # can be used to insert into the `event_push_actions_staging` table.
         def _gen_entry(user_id, actions):
             is_highlight = 1 if _action_has_highlight(actions) else 0
+            notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1
             return (
                 event_id,  # event_id column
                 user_id,  # user_id column
                 _serialize_action(actions, is_highlight),  # actions column
-                1,  # notif column
+                notif,  # notif column
                 is_highlight,  # highlight column
             )
 
@@ -819,24 +857,51 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
         # Calculate the new counts that should be upserted into event_push_summary
         sql = """
             SELECT user_id, room_id,
-                coalesce(old.notif_count, 0) + upd.notif_count,
+                coalesce(old.%s, 0) + upd.cnt,
                 upd.stream_ordering,
                 old.user_id
             FROM (
-                SELECT user_id, room_id, count(*) as notif_count,
+                SELECT user_id, room_id, count(*) as cnt,
                     max(stream_ordering) as stream_ordering
                 FROM event_push_actions
                 WHERE ? <= stream_ordering AND stream_ordering < ?
                     AND highlight = 0
+                    %s
                 GROUP BY user_id, room_id
             ) AS upd
             LEFT JOIN event_push_summary AS old USING (user_id, room_id)
         """
 
-        txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering))
-        rows = txn.fetchall()
+        # First get the count of unread messages.
+        txn.execute(
+            sql % ("unread_count", ""),
+            (old_rotate_stream_ordering, rotate_to_stream_ordering),
+        )
 
-        logger.info("Rotating notifications, handling %d rows", len(rows))
+        # We need to merge both lists into a single object because we might not have the
+        # same amount of rows in each of them. In this case we use a dict indexed on the
+        # user ID and room ID to make it easier to populate.
+        summaries = {}  # type: Dict[Tuple[str, str], EventPushSummary]
+        for row in txn:
+            summaries[(row[0], row[1])] = EventPushSummary(
+                unread_count=row[2],
+                stream_ordering=row[3],
+                old_user_id=row[4],
+                notif_count=0,
+            )
+
+        # Then get the count of notifications.
+        txn.execute(
+            sql % ("notif_count", "AND notif = 1"),
+            (old_rotate_stream_ordering, rotate_to_stream_ordering),
+        )
+
+        # notif_rows is populated based on a subset of the query used to populate
+        # unread_rows, so we can be sure that there will be no KeyError here.
+        for row in txn:
+            summaries[(row[0], row[1])].notif_count = row[2]
+
+        logger.info("Rotating notifications, handling %d rows", len(summaries))
 
         # If the `old.user_id` above is NULL then we know there isn't already an
         # entry in the table, so we simply insert it. Otherwise we update the
@@ -846,22 +911,34 @@ class EventPushActionsStore(EventPushActionsWorkerStore):
             table="event_push_summary",
             values=[
                 {
-                    "user_id": row[0],
-                    "room_id": row[1],
-                    "notif_count": row[2],
-                    "stream_ordering": row[3],
+                    "user_id": user_id,
+                    "room_id": room_id,
+                    "notif_count": summary.notif_count,
+                    "unread_count": summary.unread_count,
+                    "stream_ordering": summary.stream_ordering,
                 }
-                for row in rows
-                if row[4] is None
+                for ((user_id, room_id), summary) in summaries.items()
+                if summary.old_user_id is None
             ],
         )
 
         txn.executemany(
             """
-                UPDATE event_push_summary SET notif_count = ?, stream_ordering = ?
+                UPDATE event_push_summary
+                SET notif_count = ?, unread_count = ?, stream_ordering = ?
                 WHERE user_id = ? AND room_id = ?
             """,
-            ((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None),
+            (
+                (
+                    summary.notif_count,
+                    summary.unread_count,
+                    summary.stream_ordering,
+                    user_id,
+                    room_id,
+                )
+                for ((user_id, room_id), summary) in summaries.items()
+                if summary.old_user_id is not None
+            ),
         )
 
         txn.execute(
diff --git a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql
new file mode 100644
index 0000000000..f1459ef7f0
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql
@@ -0,0 +1,23 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Store the number of unread messages, i.e. messages that triggered either a notify
+-- action or a mark_unread one.
+ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0;
+
+-- Pre-populate the new column with the count of pending notifications.
+-- We expect event_push_summary to be relatively small, so we can do this update
+-- synchronously without impacting Synapse's startup time too much.
+UPDATE event_push_summary SET unread_count = notif_count;
\ No newline at end of file
diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py
index 1a88c7fb80..cd8680e812 100644
--- a/tests/replication/slave/storage/test_events.py
+++ b/tests/replication/slave/storage/test_events.py
@@ -160,7 +160,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2, event1.event_id],
-            {"highlight_count": 0, "notify_count": 0},
+            {"highlight_count": 0, "notify_count": 0, "unread_count": 0},
         )
 
         self.persist(
@@ -173,7 +173,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2, event1.event_id],
-            {"highlight_count": 0, "notify_count": 1},
+            {"highlight_count": 0, "notify_count": 1, "unread_count": 1},
         )
 
         self.persist(
@@ -188,7 +188,20 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase):
         self.check(
             "get_unread_event_push_actions_by_room_for_user",
             [ROOM_ID, USER_ID_2, event1.event_id],
-            {"highlight_count": 1, "notify_count": 2},
+            {"highlight_count": 1, "notify_count": 2, "unread_count": 2},
+        )
+
+        self.persist(
+            type="m.room.message",
+            msgtype="m.text",
+            body="world",
+            push_actions=[(USER_ID_2, ["org.matrix.msc2625.mark_unread"])],
+        )
+        self.replicate()
+        self.check(
+            "get_unread_event_push_actions_by_room_for_user",
+            [ROOM_ID, USER_ID_2, event1.event_id],
+            {"highlight_count": 1, "notify_count": 2, "unread_count": 3},
         )
 
     def test_get_rooms_for_user_with_stream_ordering(self):
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index b45bc9c115..303dc8571c 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -22,6 +22,10 @@ import tests.utils
 
 USER_ID = "@user:example.com"
 
+MARK_UNREAD = [
+    "org.matrix.msc2625.mark_unread",
+    {"set_tweak": "highlight", "value": False},
+]
 PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}]
 HIGHLIGHT = [
     "notify",
@@ -55,13 +59,17 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
         user_id = "@user1235:example.com"
 
         @defer.inlineCallbacks
-        def _assert_counts(noitf_count, highlight_count):
+        def _assert_counts(unread_count, notif_count, highlight_count):
             counts = yield self.store.db.runInteraction(
                 "", self.store._get_unread_counts_by_pos_txn, room_id, user_id, 0
             )
             self.assertEquals(
                 counts,
-                {"notify_count": noitf_count, "highlight_count": highlight_count},
+                {
+                    "unread_count": unread_count,
+                    "notify_count": notif_count,
+                    "highlight_count": highlight_count,
+                },
             )
 
         @defer.inlineCallbacks
@@ -96,23 +104,23 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
                 stream,
             )
 
-        yield _assert_counts(0, 0)
+        yield _assert_counts(0, 0, 0)
         yield _inject_actions(1, PlAIN_NOTIF)
-        yield _assert_counts(1, 0)
+        yield _assert_counts(1, 1, 0)
         yield _rotate(2)
-        yield _assert_counts(1, 0)
+        yield _assert_counts(1, 1, 0)
 
         yield _inject_actions(3, PlAIN_NOTIF)
-        yield _assert_counts(2, 0)
+        yield _assert_counts(2, 2, 0)
         yield _rotate(4)
-        yield _assert_counts(2, 0)
+        yield _assert_counts(2, 2, 0)
 
         yield _inject_actions(5, PlAIN_NOTIF)
         yield _mark_read(3, 3)
-        yield _assert_counts(1, 0)
+        yield _assert_counts(1, 1, 0)
 
         yield _mark_read(5, 5)
-        yield _assert_counts(0, 0)
+        yield _assert_counts(0, 0, 0)
 
         yield _inject_actions(6, PlAIN_NOTIF)
         yield _rotate(7)
@@ -121,17 +129,22 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase):
             table="event_push_actions", keyvalues={"1": 1}, desc=""
         )
 
-        yield _assert_counts(1, 0)
+        yield _assert_counts(1, 1, 0)
 
         yield _mark_read(7, 7)
-        yield _assert_counts(0, 0)
+        yield _assert_counts(0, 0, 0)
 
-        yield _inject_actions(8, HIGHLIGHT)
-        yield _assert_counts(1, 1)
+        yield _inject_actions(8, MARK_UNREAD)
+        yield _assert_counts(1, 0, 0)
         yield _rotate(9)
-        yield _assert_counts(1, 1)
-        yield _rotate(10)
-        yield _assert_counts(1, 1)
+        yield _assert_counts(1, 0, 0)
+
+        yield _inject_actions(10, HIGHLIGHT)
+        yield _assert_counts(2, 1, 1)
+        yield _rotate(11)
+        yield _assert_counts(2, 1, 1)
+        yield _rotate(12)
+        yield _assert_counts(2, 1, 1)
 
     @defer.inlineCallbacks
     def test_find_first_stream_ordering_after_ts(self):