diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 3210e9cca1..7469cd336c 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -421,7 +421,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn: LoggingTransaction,
room_id: str,
user_id: str,
- receipt_stream_ordering: int,
+ unthreaded_receipt_stream_ordering: int,
) -> RoomNotifCounts:
"""Get the number of unread messages for a user/room that have happened
since the given stream ordering.
@@ -430,9 +430,9 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
txn: The database transaction.
room_id: The room ID to get unread counts for.
user_id: The user ID to get unread counts for.
- receipt_stream_ordering: The stream ordering of the user's latest
- receipt in the room. If there are no receipts, the stream ordering
- of the user's join event.
+ unthreaded_receipt_stream_ordering: The stream ordering of the user's latest
+ unthreaded receipt in the room. If there are no unthreaded receipts,
+ the stream ordering of the user's join event.
Returns:
A RoomNotifCounts object containing the notification count, the
@@ -448,71 +448,181 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
return main_counts
return thread_counts.setdefault(thread_id, NotifCounts())
+ receipt_types_clause, receipts_args = make_in_list_sql_clause(
+ self.database_engine,
+ "receipt_type",
+ (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
+ )
+
# First we pull the counts from the summary table.
#
- # We check that `last_receipt_stream_ordering` matches the stream
- # ordering given. If it doesn't match then a new read receipt has arrived and
- # we haven't yet updated the counts in `event_push_summary` to reflect
- # that; in that case we simply ignore `event_push_summary` counts
- # and do a manual count of all of the rows in the `event_push_actions` table
- # for this user/room.
+ # We check that `last_receipt_stream_ordering` matches the stream ordering of the
+ # latest receipt for the thread (which may be either the unthreaded read receipt
+ # or the threaded read receipt).
#
- # If `last_receipt_stream_ordering` is null then that means it's up to
- # date (as the row was written by an older version of Synapse that
+ # If it doesn't match then a new read receipt has arrived and we haven't yet
+ # updated the counts in `event_push_summary` to reflect that; in that case we
+ # simply ignore `event_push_summary` counts.
+ #
+ # We then do a manual count of all the rows in the `event_push_actions` table
+ # for any user/room/thread which did not have a valid summary found.
+ #
+ # If `last_receipt_stream_ordering` is null then that means it's up-to-date
+ # (as the row was written by an older version of Synapse that
# updated `event_push_summary` synchronously when persisting a new read
# receipt).
txn.execute(
- """
- SELECT stream_ordering, notif_count, COALESCE(unread_count, 0), thread_id
+ f"""
+ SELECT notif_count, COALESCE(unread_count, 0), thread_id
FROM event_push_summary
+ LEFT JOIN (
+ SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+ FROM receipts_linearized
+ LEFT JOIN events USING (room_id, event_id)
+ WHERE
+ user_id = ?
+ AND room_id = ?
+ AND stream_ordering > ?
+ AND {receipt_types_clause}
+ GROUP BY thread_id
+ ) AS receipts USING (thread_id)
WHERE room_id = ? AND user_id = ?
AND (
- (last_receipt_stream_ordering IS NULL AND stream_ordering > ?)
- OR last_receipt_stream_ordering = ?
+ (last_receipt_stream_ordering IS NULL AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?))
+ OR last_receipt_stream_ordering = COALESCE(threaded_receipt_stream_ordering, ?)
) AND (notif_count != 0 OR COALESCE(unread_count, 0) != 0)
""",
- (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
+ (
+ user_id,
+ room_id,
+ unthreaded_receipt_stream_ordering,
+ *receipts_args,
+ room_id,
+ user_id,
+ unthreaded_receipt_stream_ordering,
+ unthreaded_receipt_stream_ordering,
+ ),
)
- max_summary_stream_ordering = 0
- for summary_stream_ordering, notif_count, unread_count, thread_id in txn:
+ summarised_threads = set()
+ for notif_count, unread_count, thread_id in txn:
+ summarised_threads.add(thread_id)
counts = _get_thread(thread_id)
counts.notify_count += notif_count
counts.unread_count += unread_count
- # Summaries will only be used if they have not been invalidated by
- # a recent receipt; track the latest stream ordering or a valid summary.
- #
- # Note that since there's only one read receipt in the room per user,
- # valid summaries are contiguous.
- max_summary_stream_ordering = max(
- summary_stream_ordering, max_summary_stream_ordering
- )
-
# Next we need to count highlights, which aren't summarised
- sql = """
+ sql = f"""
SELECT COUNT(*), thread_id FROM event_push_actions
+ LEFT JOIN (
+ SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+ FROM receipts_linearized
+ LEFT JOIN events USING (room_id, event_id)
+ WHERE
+ user_id = ?
+ AND room_id = ?
+ AND stream_ordering > ?
+ AND {receipt_types_clause}
+ GROUP BY thread_id
+ ) AS receipts USING (thread_id)
WHERE user_id = ?
AND room_id = ?
- AND stream_ordering > ?
+ AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
AND highlight = 1
GROUP BY thread_id
"""
- txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
+ txn.execute(
+ sql,
+ (
+ user_id,
+ room_id,
+ unthreaded_receipt_stream_ordering,
+ *receipts_args,
+ user_id,
+ room_id,
+ unthreaded_receipt_stream_ordering,
+ ),
+ )
for highlight_count, thread_id in txn:
_get_thread(thread_id).highlight_count += highlight_count
+ # For threads which were summarised we need to count actions since the last
+ # rotation.
+ thread_id_clause, thread_id_args = make_in_list_sql_clause(
+ self.database_engine, "thread_id", summarised_threads
+ )
+
+ # The (inclusive) event stream ordering that was previously summarised.
+ rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+ txn,
+ table="event_push_summary_stream_ordering",
+ keyvalues={},
+ retcol="stream_ordering",
+ )
+
+ unread_counts = self._get_notif_unread_count_for_user_room(
+ txn, room_id, user_id, rotated_upto_stream_ordering
+ )
+ for notif_count, unread_count, thread_id in unread_counts:
+ if thread_id not in summarised_threads:
+ continue
+
+ if thread_id == MAIN_TIMELINE:
+ counts.notify_count += notif_count
+ counts.unread_count += unread_count
+ elif thread_id in thread_counts:
+ thread_counts[thread_id].notify_count += notif_count
+ thread_counts[thread_id].unread_count += unread_count
+ else:
+ # Previous thread summaries of 0 are discarded above.
+ #
+ # TODO If empty summaries are deleted this can be removed.
+ thread_counts[thread_id] = NotifCounts(
+ notify_count=notif_count,
+ unread_count=unread_count,
+ highlight_count=0,
+ )
+
# Finally we need to count push actions that aren't included in the
# summary returned above. This might be due to recent events that haven't
# been summarised yet or the summary is out of date due to a recent read
# receipt.
- start_unread_stream_ordering = max(
- receipt_stream_ordering, max_summary_stream_ordering
- )
- unread_counts = self._get_notif_unread_count_for_user_room(
- txn, room_id, user_id, start_unread_stream_ordering
+ sql = f"""
+ SELECT
+ COUNT(CASE WHEN notif = 1 THEN 1 END),
+ COUNT(CASE WHEN unread = 1 THEN 1 END),
+ thread_id
+ FROM event_push_actions
+ LEFT JOIN (
+ SELECT thread_id, MAX(stream_ordering) AS threaded_receipt_stream_ordering
+ FROM receipts_linearized
+ LEFT JOIN events USING (room_id, event_id)
+ WHERE
+ user_id = ?
+ AND room_id = ?
+ AND stream_ordering > ?
+ AND {receipt_types_clause}
+ GROUP BY thread_id
+ ) AS receipts USING (thread_id)
+ WHERE user_id = ?
+ AND room_id = ?
+ AND stream_ordering > COALESCE(threaded_receipt_stream_ordering, ?)
+ AND NOT {thread_id_clause}
+ GROUP BY thread_id
+ """
+ txn.execute(
+ sql,
+ (
+ user_id,
+ room_id,
+ unthreaded_receipt_stream_ordering,
+ *receipts_args,
+ user_id,
+ room_id,
+ unthreaded_receipt_stream_ordering,
+ *thread_id_args,
+ ),
)
-
- for notif_count, unread_count, thread_id in unread_counts:
+ for notif_count, unread_count, thread_id in txn:
counts = _get_thread(thread_id)
counts.notify_count += notif_count
counts.unread_count += unread_count
@@ -526,6 +636,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
user_id: str,
stream_ordering: int,
max_stream_ordering: Optional[int] = None,
+ thread_id: Optional[str] = None,
) -> List[Tuple[int, int, str]]:
"""Returns the notify and unread counts from `event_push_actions` for
the given user/room in the given range.
@@ -540,6 +651,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
stream_ordering: The (exclusive) minimum stream ordering to consider.
max_stream_ordering: The (inclusive) maximum stream ordering to consider.
If this is not given, then no maximum is applied.
+ thread_id: The thread ID to fetch unread counts for. If this is not provided
+ then the results for *all* threads is returned.
+
+ Note that if this is provided the resulting list will only have 0 or
+ 1 tuples in it.
Return:
A tuple of the notif count and unread count in the given range for
@@ -551,10 +667,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
if not self._events_stream_cache.has_entity_changed(room_id, stream_ordering):
return []
- clause = ""
+ stream_ordering_clause = ""
args = [user_id, room_id, stream_ordering]
if max_stream_ordering is not None:
- clause = "AND ea.stream_ordering <= ?"
+ stream_ordering_clause = "AND ea.stream_ordering <= ?"
args.append(max_stream_ordering)
# If the max stream ordering is less than the min stream ordering,
@@ -562,6 +678,12 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
if max_stream_ordering <= stream_ordering:
return []
+ # Either limit the results to a specific thread or fetch all threads.
+ thread_id_clause = ""
+ if thread_id is not None:
+ thread_id_clause = "AND thread_id = ?"
+ args.append(thread_id)
+
sql = f"""
SELECT
COUNT(CASE WHEN notif = 1 THEN 1 END),
@@ -571,7 +693,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
WHERE user_id = ?
AND room_id = ?
AND ea.stream_ordering > ?
- {clause}
+ {stream_ordering_clause}
+ {thread_id_clause}
GROUP BY thread_id
"""
@@ -1086,7 +1209,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
)
sql = """
- SELECT r.stream_id, r.room_id, r.user_id, e.stream_ordering
+ SELECT r.stream_id, r.room_id, r.user_id, r.thread_id, e.stream_ordering
FROM receipts_linearized AS r
INNER JOIN events AS e USING (event_id)
WHERE ? < r.stream_id AND r.stream_id <= ? AND user_id LIKE ?
@@ -1107,45 +1230,69 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
limit,
),
)
- rows = cast(List[Tuple[int, str, str, int]], txn.fetchall())
+ rows = cast(List[Tuple[int, str, str, Optional[str], int]], txn.fetchall())
# For each new read receipt we delete push actions from before it and
# recalculate the summary.
- for _, room_id, user_id, stream_ordering in rows:
+ #
+ # Care must be taken of whether it is a threaded or unthreaded receipt.
+ for _, room_id, user_id, thread_id, stream_ordering in rows:
# Only handle our own read receipts.
if not self.hs.is_mine_id(user_id):
continue
+ thread_clause = ""
+ thread_args: Tuple = ()
+ if thread_id is not None:
+ thread_clause = "AND thread_id = ?"
+ thread_args = (thread_id,)
+
+ # For each new read receipt we delete push actions from before it and
+ # recalculate the summary.
txn.execute(
- """
+ f"""
DELETE FROM event_push_actions
WHERE room_id = ?
AND user_id = ?
AND stream_ordering <= ?
AND highlight = 0
+ {thread_clause}
""",
- (room_id, user_id, stream_ordering),
+ (room_id, user_id, stream_ordering, *thread_args),
)
# Fetch the notification counts between the stream ordering of the
# latest receipt and what was previously summarised.
unread_counts = self._get_notif_unread_count_for_user_room(
- txn, room_id, user_id, stream_ordering, old_rotate_stream_ordering
- )
-
- # First mark the summary for all threads in the room as cleared.
- self.db_pool.simple_update_txn(
txn,
- table="event_push_summary",
- keyvalues={"user_id": user_id, "room_id": room_id},
- updatevalues={
- "notif_count": 0,
- "unread_count": 0,
- "stream_ordering": old_rotate_stream_ordering,
- "last_receipt_stream_ordering": stream_ordering,
- },
+ room_id,
+ user_id,
+ stream_ordering,
+ old_rotate_stream_ordering,
+ thread_id,
)
+ # For an unthreaded receipt, mark the summary for all threads in the room
+ # as cleared.
+ if thread_id is None:
+ self.db_pool.simple_update_txn(
+ txn,
+ table="event_push_summary",
+ keyvalues={"user_id": user_id, "room_id": room_id},
+ updatevalues={
+ "notif_count": 0,
+ "unread_count": 0,
+ "stream_ordering": old_rotate_stream_ordering,
+ "last_receipt_stream_ordering": stream_ordering,
+ },
+ )
+
+ # For a threaded receipt, we *always* want to update that receipt,
+ # event if there are no new notifications in that thread. This ensures
+ # the stream_ordering & last_receipt_stream_ordering are updated.
+ elif not unread_counts:
+ unread_counts = [(0, 0, thread_id)]
+
# Then any updated threads get their notification count and unread
# count updated.
self.db_pool.simple_update_many_txn(
@@ -1153,8 +1300,16 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
table="event_push_summary",
key_names=("room_id", "user_id", "thread_id"),
key_values=[(room_id, user_id, row[2]) for row in unread_counts],
- value_names=("notif_count", "unread_count"),
- value_values=[(row[0], row[1]) for row in unread_counts],
+ value_names=(
+ "notif_count",
+ "unread_count",
+ "stream_ordering",
+ "last_receipt_stream_ordering",
+ ),
+ value_values=[
+ (row[0], row[1], old_rotate_stream_ordering, stream_ordering)
+ for row in unread_counts
+ ],
)
# We always update `event_push_summary_last_receipt_stream_id` to
diff --git a/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres
new file mode 100644
index 0000000000..3e0bc9e5eb
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.postgres
@@ -0,0 +1,23 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop constraint on (room_id, receipt_type, user_id).
+
+-- Rebuild the unique constraint with the thread_id.
+ALTER TABLE receipts_linearized
+ DROP CONSTRAINT receipts_linearized_uniqueness;
+
+ALTER TABLE receipts_graph
+ DROP CONSTRAINT receipts_graph_uniqueness;
diff --git a/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite
new file mode 100644
index 0000000000..e664889fbc
--- /dev/null
+++ b/synapse/storage/schema/main/delta/73/08thread_receipts_non_null.sql.sqlite
@@ -0,0 +1,76 @@
+/* Copyright 2022 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Drop constraint on (room_id, receipt_type, user_id).
+--
+-- SQLite doesn't support modifying constraints to an existing table, so it must
+-- be recreated.
+
+-- Create the new tables.
+CREATE TABLE receipts_linearized_new (
+ stream_id BIGINT NOT NULL,
+ room_id TEXT NOT NULL,
+ receipt_type TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ event_id TEXT NOT NULL,
+ thread_id TEXT,
+ event_stream_ordering BIGINT,
+ data TEXT NOT NULL,
+ CONSTRAINT receipts_linearized_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
+
+CREATE TABLE receipts_graph_new (
+ room_id TEXT NOT NULL,
+ receipt_type TEXT NOT NULL,
+ user_id TEXT NOT NULL,
+ event_ids TEXT NOT NULL,
+ thread_id TEXT,
+ data TEXT NOT NULL,
+ CONSTRAINT receipts_graph_uniqueness_thread UNIQUE (room_id, receipt_type, user_id, thread_id)
+);
+
+-- Drop the old indexes.
+DROP INDEX IF EXISTS receipts_linearized_id;
+DROP INDEX IF EXISTS receipts_linearized_room_stream;
+DROP INDEX IF EXISTS receipts_linearized_user;
+
+-- Copy the data.
+INSERT INTO receipts_linearized_new (stream_id, room_id, receipt_type, user_id, event_id, data)
+ SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+ FROM receipts_linearized;
+INSERT INTO receipts_graph_new (room_id, receipt_type, user_id, event_ids, data)
+ SELECT room_id, receipt_type, user_id, event_ids, data
+ FROM receipts_graph;
+
+-- Drop the old tables.
+DROP TABLE receipts_linearized;
+DROP TABLE receipts_graph;
+
+-- Rename the tables.
+ALTER TABLE receipts_linearized_new RENAME TO receipts_linearized;
+ALTER TABLE receipts_graph_new RENAME TO receipts_graph;
+
+-- Create the indices.
+CREATE INDEX receipts_linearized_id ON receipts_linearized( stream_id );
+CREATE INDEX receipts_linearized_room_stream ON receipts_linearized( room_id, stream_id );
+CREATE INDEX receipts_linearized_user ON receipts_linearized( user_id );
+
+-- Re-run background updates from 72/08thread_receipts.sql.
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (7308, 'receipts_linearized_unique_index', '{}')
+ ON CONFLICT (update_name) DO NOTHING;
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+ (7308, 'receipts_graph_unique_index', '{}')
+ ON CONFLICT (update_name) DO NOTHING;
|