summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
authorErik Johnston <erikj@element.io>2024-01-11 11:52:13 +0000
committerGitHub <noreply@github.com>2024-01-11 11:52:13 +0000
commitb11f7b5122061d4908b3328689486bc16dc58445 (patch)
treef937c9d6f2e1b34653d7adc3b4024bafbbda7b0e /synapse/storage
parentBump pillow from 10.1.0 to 10.2.0 (#16802) (diff)
downloadsynapse-b11f7b5122061d4908b3328689486bc16dc58445.tar.xz
Improve DB performance of calculating badge counts for push. (#16756)
The crux of the change is to try and make the queries simpler and pull
out fewer rows. Before, there were quite a few joins against subqueries,
which caused postgres to pull out more rows than necessary.

Instead, let's simplify the query and do some of the filtering out in
Python instead, letting Postgres do better optimizations now that it
doesn't have to deal with joins against subqueries.

Review note: this is a complete rewrite of the function, so not sure how
useful the diff is.

---------

Co-authored-by: Andrew Morgan <1342360+anoadragon453@users.noreply.github.com>
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/event_push_actions.py253
1 files changed, 146 insertions, 107 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 650b8c8135..6d4e2942ea 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -357,10 +357,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         This function is intentionally not cached because it is called to calculate the
         unread badge for push notifications and thus the result is expected to change.
 
-        Note that this function assumes the user is a member of the room. Because
-        summary rows are not removed when a user leaves a room, the caller must
-        filter out those results from the result.
-
         Returns:
             A map of room ID to notification counts for the given user.
         """
@@ -373,127 +369,170 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
     def _get_unread_counts_by_room_for_user_txn(
         self, txn: LoggingTransaction, user_id: str
     ) -> Dict[str, int]:
-        receipt_types_clause, args = make_in_list_sql_clause(
+        # To get the badge count of all rooms we need to make three queries:
+        #   1. Fetch all counts from `event_push_summary`, discarding any stale
+        #      rooms.
+        #   2. Fetch all notifications from `event_push_actions` that haven't
+        #      been rotated yet.
+        #   3. Fetch all notifications from `event_push_actions` for the stale
+        #      rooms.
+        #
+        # The "stale room" scenario generally happens when there is a new read
+        # receipt that hasn't yet been processed to update the
+        # `event_push_summary` table. When that happens we ignore the
+        # `event_push_summary` table for that room and calculate the count
+        # manually from `event_push_actions`.
+
+        # We need to only take into account read receipts of these types.
+        receipt_types_clause, receipt_types_args = make_in_list_sql_clause(
             self.database_engine,
             "receipt_type",
             (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
         )
-        args.extend([user_id, user_id])
-
-        receipts_cte = f"""
-            WITH all_receipts AS (
-                SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
-                FROM receipts_linearized
-                LEFT JOIN events USING (room_id, event_id)
-                WHERE
-                    {receipt_types_clause}
-                    AND user_id = ?
-                GROUP BY room_id, thread_id
-            )
-        """
-
-        receipts_joins = """
-            LEFT JOIN (
-                SELECT room_id, thread_id,
-                max_receipt_stream_ordering AS threaded_receipt_stream_ordering
-                FROM all_receipts
-                WHERE thread_id IS NOT NULL
-            ) AS threaded_receipts USING (room_id, thread_id)
-            LEFT JOIN (
-                SELECT room_id, thread_id,
-                max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering
-                FROM all_receipts
-                WHERE thread_id IS NULL
-            ) AS unthreaded_receipts USING (room_id)
-        """
-
-        # First get summary counts by room / thread for the user. We use the max receipt
-        # stream ordering of both threaded & unthreaded receipts to compare against the
-        # summary table.
-        #
-        # PostgreSQL and SQLite differ in comparing scalar numerics.
-        if isinstance(self.database_engine, PostgresEngine):
-            # GREATEST ignores NULLs.
-            max_clause = """GREATEST(
-                threaded_receipt_stream_ordering,
-                unthreaded_receipt_stream_ordering
-            )"""
-        else:
-            # MAX returns NULL if any are NULL, so COALESCE to 0 first.
-            max_clause = """MAX(
-                COALESCE(threaded_receipt_stream_ordering, 0),
-                COALESCE(unthreaded_receipt_stream_ordering, 0)
-            )"""
 
+        # Step 1, fetch all counts from `event_push_summary` for the user. This
+        # is slightly convoluted as we also need to pull out the stream ordering
+        # of the most recent receipt of the user in the room (either a thread
+        # aware receipt or thread unaware receipt) in order to determine
+        # whether the row in `event_push_summary` is stale. Hence the outer
+        # GROUP BY and odd join condition against `receipts_linearized`.
         sql = f"""
-            {receipts_cte}
-            SELECT eps.room_id, eps.thread_id, notif_count
-            FROM event_push_summary AS eps
-            {receipts_joins}
-            WHERE user_id = ?
-                AND notif_count != 0
-                AND (
-                    (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause})
-                    OR last_receipt_stream_ordering = {max_clause}
+            SELECT room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering,
+                MAX(receipt_stream_ordering)
+            FROM (
+                SELECT e.room_id, notif_count, e.stream_ordering, e.thread_id, last_receipt_stream_ordering,
+                    ev.stream_ordering AS receipt_stream_ordering
+                FROM event_push_summary AS e
+                INNER JOIN local_current_membership USING (user_id, room_id)
+                LEFT JOIN receipts_linearized AS r ON (
+                    e.user_id = r.user_id
+                    AND e.room_id = r.room_id
+                    AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
+                    AND {receipt_types_clause}
                 )
+                LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
+                WHERE e.user_id = ? and notif_count > 0
+            ) AS es
+            GROUP BY room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering
         """
-        txn.execute(sql, args)
-
-        seen_thread_ids = set()
-        room_to_count: Dict[str, int] = defaultdict(int)
 
-        for room_id, thread_id, notif_count in txn:
-            room_to_count[room_id] += notif_count
-            seen_thread_ids.add(thread_id)
+        txn.execute(
+            sql,
+            receipt_types_args
+            + [
+                user_id,
+            ],
+        )
 
-        # Now get any event push actions that haven't been rotated using the same OR
-        # join and filter by receipt and event push summary rotated up to stream ordering.
-        sql = f"""
-            {receipts_cte}
-            SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
-            FROM event_push_actions AS epa
-            {receipts_joins}
-            WHERE user_id = ?
-                AND epa.notif = 1
-                AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
-                AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
-                AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
-            GROUP BY epa.room_id, epa.thread_id
-        """
-        txn.execute(sql, args)
+        room_to_count: Dict[str, int] = defaultdict(int)
+        stale_room_ids = set()
+        for row in txn:
+            room_id = row[0]
+            notif_count = row[1]
+            stream_ordering = row[2]
+            _thread_id = row[3]
+            last_receipt_stream_ordering = row[4]
+            receipt_stream_ordering = row[5]
+
+            if last_receipt_stream_ordering is None:
+                if receipt_stream_ordering is None:
+                    room_to_count[room_id] += notif_count
+                elif stream_ordering > receipt_stream_ordering:
+                    room_to_count[room_id] += notif_count
+                else:
+                    # The latest read receipt from the user is after all the rows for
+                    # this room in `event_push_summary`. We ignore them, and
+                    # calculate the count from `event_push_actions` in step 3.
+                    pass
+            elif last_receipt_stream_ordering == receipt_stream_ordering:
+                room_to_count[room_id] += notif_count
+            else:
+                # The row is stale if `last_receipt_stream_ordering` is set and
+                # *doesn't* match the latest receipt from the user.
+                stale_room_ids.add(room_id)
 
-        for room_id, thread_id, notif_count in txn:
-            # Note: only count push actions we have valid summaries for with up to date receipt.
-            if thread_id not in seen_thread_ids:
-                continue
-            room_to_count[room_id] += notif_count
+        # Discard any stale rooms from `room_to_count`, as we will recalculate
+        # them in step 3.
+        for room_id in stale_room_ids:
+            room_to_count.pop(room_id, None)
 
-        thread_id_clause, thread_ids_args = make_in_list_sql_clause(
-            self.database_engine, "epa.thread_id", seen_thread_ids
+        # Step 2, basically the same query, except against `event_push_actions`
+        # and only fetching rows inserted since the last rotation.
+        rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
+            txn,
+            table="event_push_summary_stream_ordering",
+            keyvalues={},
+            retcol="stream_ordering",
         )
 
-        # Finally re-check event_push_actions for any rooms not in the summary, ignoring
-        # the rotated up-to position. This handles the case where a read receipt has arrived
-        # but not been rotated meaning the summary table is out of date, so we go back to
-        # the push actions table.
         sql = f"""
-            {receipts_cte}
-            SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
-            FROM event_push_actions AS epa
-            {receipts_joins}
-            WHERE user_id = ?
-            AND NOT {thread_id_clause}
-            AND epa.notif = 1
-            AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
-            AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
-            GROUP BY epa.room_id
+            SELECT room_id, thread_id
+            FROM (
+                SELECT e.room_id, e.stream_ordering, e.thread_id,
+                    ev.stream_ordering AS receipt_stream_ordering
+                FROM event_push_actions AS e
+                INNER JOIN local_current_membership USING (user_id, room_id)
+                LEFT JOIN receipts_linearized AS r ON (
+                    e.user_id = r.user_id
+                    AND e.room_id = r.room_id
+                    AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
+                    AND {receipt_types_clause}
+                )
+                LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
+                WHERE e.user_id = ? and notif > 0
+                    AND e.stream_ordering > ?
+            ) AS es
+            GROUP BY room_id, stream_ordering, thread_id
+            HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0)
         """
 
-        args.extend(thread_ids_args)
-        txn.execute(sql, args)
+        txn.execute(
+            sql,
+            receipt_types_args + [user_id, rotated_upto_stream_ordering],
+        )
+        for room_id, _thread_id in txn:
+            # Again, we ignore any stale rooms.
+            if room_id not in stale_room_ids:
+                # For event push actions it is one notification per row.
+                room_to_count[room_id] += 1
+
+        # Step 3, if we have stale rooms then we need to recalculate the counts
+        # from `event_push_actions`. Again, this is basically the same query as
+        # above except without a lower bound on stream ordering and only against
+        # a specific set of rooms.
+        if stale_room_ids:
+            room_id_clause, room_id_args = make_in_list_sql_clause(
+                self.database_engine,
+                "e.room_id",
+                stale_room_ids,
+            )
 
-        for room_id, notif_count in txn:
-            room_to_count[room_id] += notif_count
+            sql = f"""
+                SELECT room_id, thread_id
+                FROM (
+                    SELECT e.room_id, e.stream_ordering, e.thread_id,
+                        ev.stream_ordering AS receipt_stream_ordering
+                    FROM event_push_actions AS e
+                    INNER JOIN local_current_membership USING (user_id, room_id)
+                    LEFT JOIN receipts_linearized AS r ON (
+                        e.user_id = r.user_id
+                        AND e.room_id = r.room_id
+                        AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
+                        AND {receipt_types_clause}
+                    )
+                    LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
+                    WHERE e.user_id = ? and notif > 0
+                        AND {room_id_clause}
+                ) AS es
+                GROUP BY room_id, stream_ordering, thread_id
+                HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0)
+            """
+            txn.execute(
+                sql,
+                receipt_types_args + [user_id] + room_id_args,
+            )
+            for room_id, _ in txn:
+                room_to_count[room_id] += 1
 
         return room_to_count