summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/13877.feature1
-rw-r--r--synapse/storage/databases/main/event_push_actions.py277
-rw-r--r--synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres23
-rw-r--r--synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite76
-rw-r--r--tests/storage/test_event_push_actions.py189
5 files changed, 504 insertions, 62 deletions
diff --git a/changelog.d/13877.feature b/changelog.d/13877.feature
new file mode 100644
index 0000000000..d0cb902dff
--- /dev/null
+++ b/changelog.d/13877.feature
@@ -0,0 +1 @@
+Experimental support for thread-specific receipts ([MSC3771](https://github.com/matrix-org/matrix-spec-proposals/pull/3771)).
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 3210e9cca1..7469cd336c 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -421,7 +421,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         txn: LoggingTransaction,
         room_id: str,
         user_id: str,
-        receipt_stream_ordering: int,
+        unthreaded_receipt_stream_ordering: int,
     ) -> RoomNotifCounts:
         """Get the number of unread messages for a user/room that have happened
         since the given stream ordering.
@@ -430,9 +430,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             txn: The database transaction.
             room_id: The room ID to get unread counts for.
             user_id: The user ID to get unread counts for.
-            receipt_stream_ordering: The stream ordering of the user's latest
-                receipt in the room. If there are no receipts, the stream ordering
-                of the user's join event.
+            unthreaded_receipt_stream_ordering: The stream ordering of the user's latest
+                unthreaded receipt in the room. If there are no unthreaded receipts,
+                the stream ordering of the user's join event.
 
         Returns:
             A RoomNotifCounts object containing the notification count, the
@@ -448,71 +448,181 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 return main_counts
             return thread_counts.setdefault(thread_id, NotifCounts())
 
+        receipt_types_clause, receipts_args = make_in_list_sql_clause(
+            self.database_engine,
+            "receipt_type",
+            (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+        )
+
         # First we pull the counts from the summary table.
         #
-        # We check that `last_receipt_stream_ordering` matches the stream
-        # ordering given. If it doesn't match then a new read receipt has arrived and
-        # we haven't yet updated the counts in `event_push_summary` to reflect
-        # that; in that case we simply ignore `event_push_summary` counts
-        # and do a manual count of all of the rows in the `event_push_actions` table
-        # for this user/room.
+        # We check that `last_receipt_stream_ordering` matches the stream ordering of the
+        # latest receipt for the thread (which may be either the unthreaded read receipt
+        # or the threaded read receipt).
         #
-        # If `last_receipt_stream_ordering` is null then that means it's up to
-        # date (as the row was written by an older version of Synapse that
+        # If it doesn't match then a new read receipt has arrived and we haven't yet
+        # updated the counts in `event_push_summary` to reflect that; in that case we
+        # simply ignore `event_push_summary` counts.
+        #
+        # We then do a manual count of all the rows in the `event_push_actions` table
+        # for any user/room/thread which did not have a valid summary found.
+        #
+        # If `last_receipt_stream_ordering` is null then that means it's up-to-date
+        # (as the row was written by an older version of Synapse that
         # updated `event_push_summary` synchronously when persisting a new read
         # receipt).
         txn.execute(
-            """
-                SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
+            f"""
+                SELECT notif_count, COALESCE(unread_count, 0), thread_id
                 FROM event_push_summary
+                LEFT JOIN (
+                    SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+                    FROM receipts_linearized
+                    LEFT JOIN events USING (room_id, event_id)
+                    WHERE
+                        user_id = ?
+                        AND room_id = ?
+                        AND stream_ordering > ?
+                        AND {receipt_types_clause}
+                    GROUP BY thread_id
+                ) AS receipts USING (thread_id)
                 WHERE room_id = ? AND user_id = ?
                 AND (
-                    (last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
-                    OR last_receipt_stream_ordering = ?
+                    (last_receipt_stream_ordering IS NULL AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?))
+                    OR last_receipt_stream_ordering = COALESCE(threaded_receipt_stream_ordering, ?)
                 ) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0)
             """,
-            (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
+            (
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *receipts_args,
+                room_id,
+                user_id,
+                unthreaded_receipt_stream_ordering,
+                unthreaded_receipt_stream_ordering,
+            ),
         )
-        max_summary_stream_ordering = 0
-        for summary_stream_ordering, notif_count, unread_count, thread_id in txn:
+        summarised_threads = set()
+        for notif_count, unread_count, thread_id in txn:
+            summarised_threads.add(thread_id)
             counts = _get_thread(thread_id)
             counts.notify_count += notif_count
             counts.unread_count += unread_count
 
-            # Summaries will only be used if they have not been invalidated by
-            # a recent receipt; track the latest stream ordering or a valid summary.
-            #
-            # Note that since there's only one read receipt in the room per user,
-            # valid summaries are contiguous.
-            max_summary_stream_ordering = max(
-                summary_stream_ordering, max_summary_stream_ordering
-            )
-
         # Next we need to count highlights, which aren't summarised
-        sql = """
+        sql = f"""
             SELECT COUNT(*), thread_id FROM event_push_actions
+            LEFT JOIN (
+                SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    user_id = ?
+                    AND room_id = ?
+                    AND stream_ordering > ?
+                    AND {receipt_types_clause}
+                GROUP BY thread_id
+            ) AS receipts USING (thread_id)
             WHERE user_id = ?
                 AND room_id = ?
-                AND stream_ordering > ?
+                AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
                 AND highlight = 1
             GROUP BY thread_id
         """
-        txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
+        txn.execute(
+            sql,
+            (
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *receipts_args,
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+            ),
+        )
         for highlight_count, thread_id in txn:
             _get_thread(thread_id).highlight_count += highlight_count
 
+        # For threads which were summarised we need to count actions since the last
+        # rotation.
+        thread_id_clause, thread_id_args = make_in_list_sql_clause(
+            self.database_engine, "thread_id", summarised_threads
+        )
+
+        # The (inclusive) event stream ordering that was previously summarised.
+        rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
+        )
+
+        unread_counts = self._get_notif_unread_count_for_user_room(
+            txn, room_id, user_id, rotated_upto_stream_ordering
+        )
+        for notif_count, unread_count, thread_id in unread_counts:
+            if thread_id not in summarised_threads:
+                continue
+
+            if thread_id == MAIN_TIMELINE:
+                counts.notify_count += notif_count
+                counts.unread_count += unread_count
+            elif thread_id in thread_counts:
+                thread_counts[thread_id].notify_count += notif_count
+                thread_counts[thread_id].unread_count += unread_count
+            else:
+                # Previous thread summaries of 0 are discarded above.
+                #
+                # TODO If empty summaries are deleted this can be removed.
+                thread_counts[thread_id] = NotifCounts(
+                    notify_count=notif_count,
+                    unread_count=unread_count,
+                    highlight_count=0,
+                )
+
         # Finally we need to count push actions that aren't included in the
         # summary returned above. This might be due to recent events that haven't
         # been summarised yet or the summary is out of date due to a recent read
         # receipt.
-        start_unread_stream_ordering = max(
-            receipt_stream_ordering, max_summary_stream_ordering
-        )
-        unread_counts = self._get_notif_unread_count_for_user_room(
-            txn, room_id, user_id, start_unread_stream_ordering
+        sql = f"""
+            SELECT
+                COUNT(CASE WHEN notif = 1 THEN 1 END),
+                COUNT(CASE WHEN unread = 1 THEN 1 END),
+                thread_id
+            FROM event_push_actions
+            LEFT JOIN (
+                SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    user_id = ?
+                    AND room_id = ?
+                    AND stream_ordering > ?
+                    AND {receipt_types_clause}
+                GROUP BY thread_id
+            ) AS receipts USING (thread_id)
+            WHERE user_id = ?
+                AND room_id = ?
+                AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
+                AND NOT {thread_id_clause}
+            GROUP BY thread_id
+        """
+        txn.execute(
+            sql,
+            (
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *receipts_args,
+                user_id,
+                room_id,
+                unthreaded_receipt_stream_ordering,
+                *thread_id_args,
+            ),
         )
-
-        for notif_count, unread_count, thread_id in unread_counts:
+        for notif_count, unread_count, thread_id in txn:
             counts = _get_thread(thread_id)
             counts.notify_count += notif_count
             counts.unread_count += unread_count
@@ -526,6 +636,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         user_id: str,
         stream_ordering: int,
         max_stream_ordering: Optional[int] = None,
+        thread_id: Optional[str] = None,
     ) -> List[Tuple[int, int, str]]:
         """Returns the notify and unread counts from `event_push_actions` for
         the given user/room in the given range.
@@ -540,6 +651,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             stream_ordering: The (exclusive) minimum stream ordering to consider.
             max_stream_ordering: The (inclusive) maximum stream ordering to consider.
                 If this is not given, then no maximum is applied.
+            thread_id: The thread ID to fetch unread counts for. If this is not provided
+                then the results for *all* threads is returned.
+
+                Note that if this is provided the resulting list will only have 0 or
+                1 tuples in it.
 
         Return:
             A tuple of the notif count and unread count in the given range for
@@ -551,10 +667,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
             return []
 
-        clause = ""
+        stream_ordering_clause = ""
         args = [user_id, room_id, stream_ordering]
         if max_stream_ordering is not None:
-            clause = "AND ea.stream_ordering <= ?"
+            stream_ordering_clause = "AND ea.stream_ordering <= ?"
             args.append(max_stream_ordering)
 
             # If the max stream ordering is less than the min stream ordering,
@@ -562,6 +678,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             if max_stream_ordering <= stream_ordering:
                 return []
 
+        # Either limit the results to a specific thread or fetch all threads.
+        thread_id_clause = ""
+        if thread_id is not None:
+            thread_id_clause = "AND thread_id = ?"
+            args.append(thread_id)
+
         sql = f"""
             SELECT
                COUNT(CASE WHEN notif = 1 THEN 1 END),
@@ -571,7 +693,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             WHERE user_id = ?
                AND room_id = ?
                AND ea.stream_ordering > ?
-               {clause}
+               {stream_ordering_clause}
+               {thread_id_clause}
             GROUP BY thread_id
         """
 
@@ -1086,7 +1209,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         )
 
         sql = """
-            SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
+            SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
             FROM receipts_linearized AS r
             INNER JOIN events AS e USING (event_id)
             WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
@@ -1107,45 +1230,69 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 limit,
             ),
         )
-        rows = cast(List[Tuple[int, str, str, int]], txn.fetchall())
+        rows = cast(List[Tuple[int, str, str, Optional[str], int]], txn.fetchall())
 
         # For each new read receipt we delete push actions from before it and
         # recalculate the summary.
-        for _, room_id, user_id, stream_ordering in rows:
+        #
+        # Care must be taken of whether it is a threaded or unthreaded receipt.
+        for _, room_id, user_id, thread_id, stream_ordering in rows:
             # Only handle our own read receipts.
             if not self.hs.is_mine_id(user_id):
                 continue
 
+            thread_clause = ""
+            thread_args: Tuple = ()
+            if thread_id is not None:
+                thread_clause = "AND thread_id = ?"
+                thread_args = (thread_id,)
+
+            # For each new read receipt we delete push actions from before it and
+            # recalculate the summary.
             txn.execute(
-                """
+                f"""
                 DELETE FROM event_push_actions
                 WHERE room_id = ?
                     AND user_id = ?
                     AND stream_ordering <= ?
                     AND highlight = 0
+                    {thread_clause}
                 """,
-                (room_id, user_id, stream_ordering),
+                (room_id, user_id, stream_ordering, *thread_args),
             )
 
             # Fetch the notification counts between the stream ordering of the
             # latest receipt and what was previously summarised.
             unread_counts = self._get_notif_unread_count_for_user_room(
-                txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
-            )
-
-            # First mark the summary for all threads in the room as cleared.
-            self.db_pool.simple_update_txn(
                 txn,
-                table="event_push_summary",
-                keyvalues={"user_id": user_id, "room_id": room_id},
-                updatevalues={
-                    "notif_count": 0,
-                    "unread_count": 0,
-                    "stream_ordering": old_rotate_stream_ordering,
-                    "last_receipt_stream_ordering": stream_ordering,
-                },
+                room_id,
+                user_id,
+                stream_ordering,
+                old_rotate_stream_ordering,
+                thread_id,
             )
 
+            # For an unthreaded receipt, mark the summary for all threads in the room
+            # as cleared.
+            if thread_id is None:
+                self.db_pool.simple_update_txn(
+                    txn,
+                    table="event_push_summary",
+                    keyvalues={"user_id": user_id, "room_id": room_id},
+                    updatevalues={
+                        "notif_count": 0,
+                        "unread_count": 0,
+                        "stream_ordering": old_rotate_stream_ordering,
+                        "last_receipt_stream_ordering": stream_ordering,
+                    },
+                )
+
+            # For a threaded receipt, we *always* want to update that receipt,
+            # event if there are no new notifications in that thread. This ensures
+            # the stream_ordering & last_receipt_stream_ordering are updated.
+            elif not unread_counts:
+                unread_counts = [(0, 0, thread_id)]
+
             # Then any updated threads get their notification count and unread
             # count updated.
             self.db_pool.simple_update_many_txn(
@@ -1153,8 +1300,16 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 table="event_push_summary",
                 key_names=("room_id", "user_id", "thread_id"),
                 key_values=[(room_id, user_id, row[2]) for row in unread_counts],
-                value_names=("notif_count", "unread_count"),
-                value_values=[(row[0], row[1]) for row in unread_counts],
+                value_names=(
+                    "notif_count",
+                    "unread_count",
+                    "stream_ordering",
+                    "last_receipt_stream_ordering",
+                ),
+                value_values=[
+                    (row[0], row[1], old_rotate_stream_ordering, stream_ordering)
+                    for row in unread_counts
+                ],
             )
 
         # We always update `event_push_summary_last_receipt_stream_id` to
diff --git a/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres
new file mode 100644
index 0000000000..3e0bc9e5eb
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop constraint on (room_id, receipt_type, user_id).
+
+-- Rebuild the unique constraint with the thread_id.
+ALTER TABLE receipts_linearized
+    DROP CONSTRAINT receipts_linearized_uniqueness;
+
+ALTER TABLE receipts_graph
+    DROP CONSTRAINT receipts_graph_uniqueness;
diff --git a/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite
new file mode 100644
index 0000000000..e664889fbc
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite
@@ -0,0 +1,76 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop constraint on (room_id, receipt_type, user_id).
+--
+-- SQLite doesn't support modifying constraints to an existing table, so it must
+-- be recreated.
+
+-- Create the new tables.
+CREATE TABLE receipts_linearized_new (
+    stream_id BIGINT NOT NULL,
+    room_id TEXT NOT NULL,
+    receipt_type TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    event_id TEXT NOT NULL,
+    thread_id TEXT,
+    event_stream_ordering BIGINT,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
+
+CREATE TABLE receipts_graph_new (
+    room_id TEXT NOT NULL,
+    receipt_type TEXT NOT NULL,
+    user_id TEXT NOT NULL,
+    event_ids TEXT NOT NULL,
+    thread_id TEXT,
+    data TEXT NOT NULL,
+    CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
+
+-- Drop the old indexes.
+DROP INDEX IF EXISTS receipts_linearized_id;
+DROP INDEX IF EXISTS receipts_linearized_room_stream;
+DROP INDEX IF EXISTS receipts_linearized_user;
+
+-- Copy the data.
+INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data)
+    SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+    FROM receipts_linearized;
+INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
+    SELECT room_id, receipt_type, user_id, event_ids, data
+    FROM receipts_graph;
+
+-- Drop the old tables.
+DROP TABLE receipts_linearized;
+DROP TABLE receipts_graph;
+
+-- Rename the tables.
+ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
+ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
+
+-- Create the indices.
+CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
+CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
+CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
+
+-- Re-run background updates from 72/08thread_receipts.sql.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7308, 'receipts_linearized_unique_index', '{}')
+  ON CONFLICT (update_name) DO NOTHING;
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+  (7308, 'receipts_graph_unique_index', '{}')
+  ON CONFLICT (update_name) DO NOTHING;
diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py
index 89f986ac34..6fa0cafb75 100644
--- a/tests/storage/test_event_push_actions.py
+++ b/tests/storage/test_event_push_actions.py
@@ -16,6 +16,7 @@ from typing import Optional, Tuple
 
 from twisted.test.proto_helpers import MemoryReactor
 
+from synapse.api.constants import MAIN_TIMELINE
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.server import HomeServer
@@ -312,7 +313,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         def _rotate() -> None:
             self.get_success(self.store._rotate_notifs())
 
-        def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None:
+        def _mark_read(event_id: str, thread_id: str = MAIN_TIMELINE) -> None:
             self.get_success(
                 self.store.insert_receipt(
                     room_id,
@@ -348,9 +349,12 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         _create_event()
         _create_event(thread_id=thread_id)
         _mark_read(event_id)
+        _assert_counts(1, 0, 3, 0)
+        _mark_read(event_id, thread_id)
         _assert_counts(1, 0, 1, 0)
 
         _mark_read(last_event_id)
+        _mark_read(last_event_id, thread_id)
         _assert_counts(0, 0, 0, 0)
 
         _create_event()
@@ -364,6 +368,7 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         _assert_counts(1, 0, 1, 0)
 
         _mark_read(last_event_id)
+        _mark_read(last_event_id, thread_id)
         _assert_counts(0, 0, 0, 0)
 
         _create_event(True)
@@ -389,8 +394,190 @@ class EventPushActionsStoreTestCase(HomeserverTestCase):
         # Check that sending read receipts at different points results in the
         # right counts.
         _mark_read(event_id)
+        _assert_counts(1, 0, 2, 1)
+        _mark_read(event_id, thread_id)
         _assert_counts(1, 0, 1, 0)
         _mark_read(last_event_id)
+        _assert_counts(0, 0, 1, 0)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event(True)
+        _create_event(True, thread_id)
+        _assert_counts(1, 1, 1, 1)
+        _mark_read(last_event_id)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+        _rotate()
+        _assert_counts(0, 0, 0, 0)
+
+    def test_count_aggregation_mixed(self) -> None:
+        """
+        This is essentially the same test as test_count_aggregation_threads, but
+        sends both unthreaded and threaded receipts.
+        """
+
+        # Create a user to receive notifications and send receipts.
+        user_id = self.register_user("user1235", "pass")
+        token = self.login("user1235", "pass")
+
+        # And another users to send events.
+        other_id = self.register_user("other", "pass")
+        other_token = self.login("other", "pass")
+
+        # Create a room and put both users in it.
+        room_id = self.helper.create_room_as(user_id, tok=token)
+        self.helper.join(room_id, other_id, tok=other_token)
+        thread_id: str
+
+        last_event_id: str
+
+        def _assert_counts(
+            noitf_count: int,
+            highlight_count: int,
+            thread_notif_count: int,
+            thread_highlight_count: int,
+        ) -> None:
+            counts = self.get_success(
+                self.store.db_pool.runInteraction(
+                    "get-unread-counts",
+                    self.store._get_unread_counts_by_receipt_txn,
+                    room_id,
+                    user_id,
+                )
+            )
+            self.assertEqual(
+                counts.main_timeline,
+                NotifCounts(
+                    notify_count=noitf_count,
+                    unread_count=0,
+                    highlight_count=highlight_count,
+                ),
+            )
+            if thread_notif_count or thread_highlight_count:
+                self.assertEqual(
+                    counts.threads,
+                    {
+                        thread_id: NotifCounts(
+                            notify_count=thread_notif_count,
+                            unread_count=0,
+                            highlight_count=thread_highlight_count,
+                        ),
+                    },
+                )
+            else:
+                self.assertEqual(counts.threads, {})
+
+        def _create_event(
+            highlight: bool = False, thread_id: Optional[str] = None
+        ) -> str:
+            content: JsonDict = {
+                "msgtype": "m.text",
+                "body": user_id if highlight else "msg",
+            }
+            if thread_id:
+                content["m.relates_to"] = {
+                    "rel_type": "m.thread",
+                    "event_id": thread_id,
+                }
+
+            result = self.helper.send_event(
+                room_id,
+                type="m.room.message",
+                content=content,
+                tok=other_token,
+            )
+            nonlocal last_event_id
+            last_event_id = result["event_id"]
+            return last_event_id
+
+        def _rotate() -> None:
+            self.get_success(self.store._rotate_notifs())
+
+        def _mark_read(event_id: str, thread_id: Optional[str] = None) -> None:
+            self.get_success(
+                self.store.insert_receipt(
+                    room_id,
+                    "m.read",
+                    user_id=user_id,
+                    event_ids=[event_id],
+                    thread_id=thread_id,
+                    data={},
+                )
+            )
+
+        _assert_counts(0, 0, 0, 0)
+        thread_id = _create_event()
+        _assert_counts(1, 0, 0, 0)
+        _rotate()
+        _assert_counts(1, 0, 0, 0)
+
+        _create_event(thread_id=thread_id)
+        _assert_counts(1, 0, 1, 0)
+        _rotate()
+        _assert_counts(1, 0, 1, 0)
+
+        _create_event()
+        _assert_counts(2, 0, 1, 0)
+        _rotate()
+        _assert_counts(2, 0, 1, 0)
+
+        event_id = _create_event(thread_id=thread_id)
+        _assert_counts(2, 0, 2, 0)
+        _rotate()
+        _assert_counts(2, 0, 2, 0)
+
+        _create_event()
+        _create_event(thread_id=thread_id)
+        _mark_read(event_id)
+        _assert_counts(1, 0, 1, 0)
+
+        _mark_read(last_event_id, MAIN_TIMELINE)
+        _mark_read(last_event_id, thread_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event()
+        _create_event(thread_id=thread_id)
+        _assert_counts(1, 0, 1, 0)
+        _rotate()
+        _assert_counts(1, 0, 1, 0)
+
+        # Delete old event push actions, this should not affect the (summarised) count.
+        self.get_success(self.store._remove_old_push_actions_that_have_rotated())
+        _assert_counts(1, 0, 1, 0)
+
+        _mark_read(last_event_id)
+        _assert_counts(0, 0, 0, 0)
+
+        _create_event(True)
+        _assert_counts(1, 1, 0, 0)
+        _rotate()
+        _assert_counts(1, 1, 0, 0)
+
+        event_id = _create_event(True, thread_id)
+        _assert_counts(1, 1, 1, 1)
+        _rotate()
+        _assert_counts(1, 1, 1, 1)
+
+        # Check that adding another notification and rotating after highlight
+        # works.
+        _create_event()
+        _rotate()
+        _assert_counts(2, 1, 1, 1)
+
+        _create_event(thread_id=thread_id)
+        _rotate()
+        _assert_counts(2, 1, 2, 1)
+
+        # Check that sending read receipts at different points results in the
+        # right counts.
+        _mark_read(event_id)
+        _assert_counts(1, 0, 1, 0)
+        _mark_read(event_id, MAIN_TIMELINE)
+        _assert_counts(1, 0, 1, 0)
+        _mark_read(last_event_id, MAIN_TIMELINE)
+        _assert_counts(0, 0, 1, 0)
+        _mark_read(last_event_id, thread_id)
         _assert_counts(0, 0, 0, 0)
 
         _create_event(True)