diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/event_push_actions.py | 253 |
1 files changed, 146 insertions, 107 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 650b8c8135..6d4e2942ea 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -357,10 +357,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas This function is intentionally not cached because it is called to calculate the unread badge for push notifications and thus the result is expected to change. - Note that this function assumes the user is a member of the room. Because - summary rows are not removed when a user leaves a room, the caller must - filter out those results from the result. - Returns: A map of room ID to notification counts for the given user. """ @@ -373,127 +369,170 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas def _get_unread_counts_by_room_for_user_txn( self, txn: LoggingTransaction, user_id: str ) -> Dict[str, int]: - receipt_types_clause, args = make_in_list_sql_clause( + # To get the badge count of all rooms we need to make three queries: + # 1. Fetch all counts from `event_push_summary`, discarding any stale + # rooms. + # 2. Fetch all notifications from `event_push_actions` that haven't + # been rotated yet. + # 3. Fetch all notifications from `event_push_actions` for the stale + # rooms. + # + # The "stale room" scenario generally happens when there is a new read + # receipt that hasn't yet been processed to update the + # `event_push_summary` table. When that happens we ignore the + # `event_push_summary` table for that room and calculate the count + # manually from `event_push_actions`. + + # We need to only take into account read receipts of these types. + receipt_types_clause, receipt_types_args = make_in_list_sql_clause( self.database_engine, "receipt_type", (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE), ) - args.extend([user_id, user_id]) - - receipts_cte = f""" - WITH all_receipts AS ( - SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering - FROM receipts_linearized - LEFT JOIN events USING (room_id, event_id) - WHERE - {receipt_types_clause} - AND user_id = ? - GROUP BY room_id, thread_id - ) - """ - - receipts_joins = """ - LEFT JOIN ( - SELECT room_id, thread_id, - max_receipt_stream_ordering AS threaded_receipt_stream_ordering - FROM all_receipts - WHERE thread_id IS NOT NULL - ) AS threaded_receipts USING (room_id, thread_id) - LEFT JOIN ( - SELECT room_id, thread_id, - max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering - FROM all_receipts - WHERE thread_id IS NULL - ) AS unthreaded_receipts USING (room_id) - """ - - # First get summary counts by room / thread for the user. We use the max receipt - # stream ordering of both threaded & unthreaded receipts to compare against the - # summary table. - # - # PostgreSQL and SQLite differ in comparing scalar numerics. - if isinstance(self.database_engine, PostgresEngine): - # GREATEST ignores NULLs. - max_clause = """GREATEST( - threaded_receipt_stream_ordering, - unthreaded_receipt_stream_ordering - )""" - else: - # MAX returns NULL if any are NULL, so COALESCE to 0 first. - max_clause = """MAX( - COALESCE(threaded_receipt_stream_ordering, 0), - COALESCE(unthreaded_receipt_stream_ordering, 0) - )""" + # Step 1, fetch all counts from `event_push_summary` for the user. This + # is slightly convoluted as we also need to pull out the stream ordering + # of the most recent receipt of the user in the room (either a thread + # aware receipt or thread unaware receipt) in order to determine + # whether the row in `event_push_summary` is stale. Hence the outer + # GROUP BY and odd join condition against `receipts_linearized`. sql = f""" - {receipts_cte} - SELECT eps.room_id, eps.thread_id, notif_count - FROM event_push_summary AS eps - {receipts_joins} - WHERE user_id = ? - AND notif_count != 0 - AND ( - (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause}) - OR last_receipt_stream_ordering = {max_clause} + SELECT room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering, + MAX(receipt_stream_ordering) + FROM ( + SELECT e.room_id, notif_count, e.stream_ordering, e.thread_id, last_receipt_stream_ordering, + ev.stream_ordering AS receipt_stream_ordering + FROM event_push_summary AS e + INNER JOIN local_current_membership USING (user_id, room_id) + LEFT JOIN receipts_linearized AS r ON ( + e.user_id = r.user_id + AND e.room_id = r.room_id + AND (e.thread_id = r.thread_id OR r.thread_id IS NULL) + AND {receipt_types_clause} ) + LEFT JOIN events AS ev ON (r.event_id = ev.event_id) + WHERE e.user_id = ? and notif_count > 0 + ) AS es + GROUP BY room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering """ - txn.execute(sql, args) - - seen_thread_ids = set() - room_to_count: Dict[str, int] = defaultdict(int) - for room_id, thread_id, notif_count in txn: - room_to_count[room_id] += notif_count - seen_thread_ids.add(thread_id) + txn.execute( + sql, + receipt_types_args + + [ + user_id, + ], + ) - # Now get any event push actions that haven't been rotated using the same OR - # join and filter by receipt and event push summary rotated up to stream ordering. - sql = f""" - {receipts_cte} - SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count - FROM event_push_actions AS epa - {receipts_joins} - WHERE user_id = ? - AND epa.notif = 1 - AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering) - AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) - AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) - GROUP BY epa.room_id, epa.thread_id - """ - txn.execute(sql, args) + room_to_count: Dict[str, int] = defaultdict(int) + stale_room_ids = set() + for row in txn: + room_id = row[0] + notif_count = row[1] + stream_ordering = row[2] + _thread_id = row[3] + last_receipt_stream_ordering = row[4] + receipt_stream_ordering = row[5] + + if last_receipt_stream_ordering is None: + if receipt_stream_ordering is None: + room_to_count[room_id] += notif_count + elif stream_ordering > receipt_stream_ordering: + room_to_count[room_id] += notif_count + else: + # The latest read receipt from the user is after all the rows for + # this room in `event_push_summary`. We ignore them, and + # calculate the count from `event_push_actions` in step 3. + pass + elif last_receipt_stream_ordering == receipt_stream_ordering: + room_to_count[room_id] += notif_count + else: + # The row is stale if `last_receipt_stream_ordering` is set and + # *doesn't* match the latest receipt from the user. + stale_room_ids.add(room_id) - for room_id, thread_id, notif_count in txn: - # Note: only count push actions we have valid summaries for with up to date receipt. - if thread_id not in seen_thread_ids: - continue - room_to_count[room_id] += notif_count + # Discard any stale rooms from `room_to_count`, as we will recalculate + # them in step 3. + for room_id in stale_room_ids: + room_to_count.pop(room_id, None) - thread_id_clause, thread_ids_args = make_in_list_sql_clause( - self.database_engine, "epa.thread_id", seen_thread_ids + # Step 2, basically the same query, except against `event_push_actions` + # and only fetching rows inserted since the last rotation. + rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", ) - # Finally re-check event_push_actions for any rooms not in the summary, ignoring - # the rotated up-to position. This handles the case where a read receipt has arrived - # but not been rotated meaning the summary table is out of date, so we go back to - # the push actions table. sql = f""" - {receipts_cte} - SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count - FROM event_push_actions AS epa - {receipts_joins} - WHERE user_id = ? - AND NOT {thread_id_clause} - AND epa.notif = 1 - AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering) - AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering) - GROUP BY epa.room_id + SELECT room_id, thread_id + FROM ( + SELECT e.room_id, e.stream_ordering, e.thread_id, + ev.stream_ordering AS receipt_stream_ordering + FROM event_push_actions AS e + INNER JOIN local_current_membership USING (user_id, room_id) + LEFT JOIN receipts_linearized AS r ON ( + e.user_id = r.user_id + AND e.room_id = r.room_id + AND (e.thread_id = r.thread_id OR r.thread_id IS NULL) + AND {receipt_types_clause} + ) + LEFT JOIN events AS ev ON (r.event_id = ev.event_id) + WHERE e.user_id = ? and notif > 0 + AND e.stream_ordering > ? + ) AS es + GROUP BY room_id, stream_ordering, thread_id + HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0) """ - args.extend(thread_ids_args) - txn.execute(sql, args) + txn.execute( + sql, + receipt_types_args + [user_id, rotated_upto_stream_ordering], + ) + for room_id, _thread_id in txn: + # Again, we ignore any stale rooms. + if room_id not in stale_room_ids: + # For event push actions it is one notification per row. + room_to_count[room_id] += 1 + + # Step 3, if we have stale rooms then we need to recalculate the counts + # from `event_push_actions`. Again, this is basically the same query as + # above except without a lower bound on stream ordering and only against + # a specific set of rooms. + if stale_room_ids: + room_id_clause, room_id_args = make_in_list_sql_clause( + self.database_engine, + "e.room_id", + stale_room_ids, + ) - for room_id, notif_count in txn: - room_to_count[room_id] += notif_count + sql = f""" + SELECT room_id, thread_id + FROM ( + SELECT e.room_id, e.stream_ordering, e.thread_id, + ev.stream_ordering AS receipt_stream_ordering + FROM event_push_actions AS e + INNER JOIN local_current_membership USING (user_id, room_id) + LEFT JOIN receipts_linearized AS r ON ( + e.user_id = r.user_id + AND e.room_id = r.room_id + AND (e.thread_id = r.thread_id OR r.thread_id IS NULL) + AND {receipt_types_clause} + ) + LEFT JOIN events AS ev ON (r.event_id = ev.event_id) + WHERE e.user_id = ? and notif > 0 + AND {room_id_clause} + ) AS es + GROUP BY room_id, stream_ordering, thread_id + HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0) + """ + txn.execute( + sql, + receipt_types_args + [user_id] + room_id_args, + ) + for room_id, _ in txn: + room_to_count[room_id] += 1 return room_to_count |