summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/push/push_tools.py8
-rw-r--r--synapse/storage/databases/main/event_push_actions.py253
2 files changed, 114 insertions, 147 deletions
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 76c7ab6477..1ef881f702 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -29,11 +29,17 @@ from synapse.storage.databases.main import DataStore
 
 async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
     invites = await store.get_invited_rooms_for_local_user(user_id)
+    joins = await store.get_rooms_for_user(user_id)
 
     badge = len(invites)
 
     room_to_count = await store.get_unread_counts_by_room_for_user(user_id)
-    for _room_id, notify_count in room_to_count.items():
+    for room_id, notify_count in room_to_count.items():
+        # room_to_count may include rooms which the user has left,
+        # ignore those.
+        if room_id not in joins:
+            continue
+
         if notify_count == 0:
             continue
 
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index d7aa8a0ee0..3a5666cd9b 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -358,6 +358,10 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         This function is intentionally not cached because it is called to calculate the
         unread badge for push notifications and thus the result is expected to change.
 
+        Note that this function assumes the user is a member of the room. Because
+        summary rows are not removed when a user leaves a room, the caller must
+        filter out those results from the result.
+
         Returns:
             A map of room ID to notification counts for the given user.
         """
@@ -370,170 +374,127 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
     def _get_unread_counts_by_room_for_user_txn(
         self, txn: LoggingTransaction, user_id: str
     ) -> Dict[str, int]:
-        # To get the badge count of all rooms we need to make three queries:
-        #   1. Fetch all counts from `event_push_summary`, discarding any stale
-        #      rooms.
-        #   2. Fetch all notifications from `event_push_actions` that haven't
-        #      been rotated yet.
-        #   3. Fetch all notifications from `event_push_actions` for the stale
-        #      rooms.
-        #
-        # The "stale room" scenario generally happens when there is a new read
-        # receipt that hasn't yet been processed to update the
-        # `event_push_summary` table. When that happens we ignore the
-        # `event_push_summary` table for that room and calculate the count
-        # manually from `event_push_actions`.
-
-        # We need to only take into account read receipts of these types.
-        receipt_types_clause, receipt_types_args = make_in_list_sql_clause(
+        receipt_types_clause, args = make_in_list_sql_clause(
             self.database_engine,
             "receipt_type",
             (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE),
         )
+        args.extend([user_id, user_id])
+
+        receipts_cte = f"""
+            WITH all_receipts AS (
+                SELECT room_id, thread_id, MAX(event_stream_ordering) AS max_receipt_stream_ordering
+                FROM receipts_linearized
+                LEFT JOIN events USING (room_id, event_id)
+                WHERE
+                    {receipt_types_clause}
+                    AND user_id = ?
+                GROUP BY room_id, thread_id
+            )
+        """
+
+        receipts_joins = """
+            LEFT JOIN (
+                SELECT room_id, thread_id,
+                max_receipt_stream_ordering AS threaded_receipt_stream_ordering
+                FROM all_receipts
+                WHERE thread_id IS NOT NULL
+            ) AS threaded_receipts USING (room_id, thread_id)
+            LEFT JOIN (
+                SELECT room_id, thread_id,
+                max_receipt_stream_ordering AS unthreaded_receipt_stream_ordering
+                FROM all_receipts
+                WHERE thread_id IS NULL
+            ) AS unthreaded_receipts USING (room_id)
+        """
+
+        # First get summary counts by room / thread for the user. We use the max receipt
+        # stream ordering of both threaded & unthreaded receipts to compare against the
+        # summary table.
+        #
+        # PostgreSQL and SQLite differ in comparing scalar numerics.
+        if isinstance(self.database_engine, PostgresEngine):
+            # GREATEST ignores NULLs.
+            max_clause = """GREATEST(
+                threaded_receipt_stream_ordering,
+                unthreaded_receipt_stream_ordering
+            )"""
+        else:
+            # MAX returns NULL if any are NULL, so COALESCE to 0 first.
+            max_clause = """MAX(
+                COALESCE(threaded_receipt_stream_ordering, 0),
+                COALESCE(unthreaded_receipt_stream_ordering, 0)
+            )"""
 
-        # Step 1, fetch all counts from `event_push_summary` for the user. This
-        # is slightly convoluted as we also need to pull out the stream ordering
-        # of the most recent receipt of the user in the room (either a thread
-        # aware receipt or thread unaware receipt) in order to determine
-        # whether the row in `event_push_summary` is stale. Hence the outer
-        # GROUP BY and odd join condition against `receipts_linearized`.
         sql = f"""
-            SELECT room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering,
-                MAX(receipt_stream_ordering)
-            FROM (
-                SELECT e.room_id, notif_count, e.stream_ordering, e.thread_id, last_receipt_stream_ordering,
-                    ev.stream_ordering AS receipt_stream_ordering
-                FROM event_push_summary AS e
-                INNER JOIN local_current_membership USING (user_id, room_id)
-                LEFT JOIN receipts_linearized AS r ON (
-                    e.user_id = r.user_id
-                    AND e.room_id = r.room_id
-                    AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
-                    AND {receipt_types_clause}
+            {receipts_cte}
+            SELECT eps.room_id, eps.thread_id, notif_count
+            FROM event_push_summary AS eps
+            {receipts_joins}
+            WHERE user_id = ?
+                AND notif_count != 0
+                AND (
+                    (last_receipt_stream_ordering IS NULL AND stream_ordering > {max_clause})
+                    OR last_receipt_stream_ordering = {max_clause}
                 )
-                LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
-                WHERE e.user_id = ? and notif_count > 0
-            ) AS es
-            GROUP BY room_id, notif_count, stream_ordering, thread_id, last_receipt_stream_ordering
         """
+        txn.execute(sql, args)
 
-        txn.execute(
-            sql,
-            receipt_types_args
-            + [
-                user_id,
-            ],
-        )
-
+        seen_thread_ids = set()
         room_to_count: Dict[str, int] = defaultdict(int)
-        stale_room_ids = set()
-        for row in txn:
-            room_id = row[0]
-            notif_count = row[1]
-            stream_ordering = row[2]
-            _thread_id = row[3]
-            last_receipt_stream_ordering = row[4]
-            receipt_stream_ordering = row[5]
-
-            if last_receipt_stream_ordering is None:
-                if receipt_stream_ordering is None:
-                    room_to_count[room_id] += notif_count
-                elif stream_ordering > receipt_stream_ordering:
-                    room_to_count[room_id] += notif_count
-                else:
-                    # The latest read receipt from the user is after all the rows for
-                    # this room in `event_push_summary`. We ignore them, and
-                    # calculate the count from `event_push_actions` in step 3.
-                    pass
-            elif last_receipt_stream_ordering == receipt_stream_ordering:
-                room_to_count[room_id] += notif_count
-            else:
-                # The row is stale if `last_receipt_stream_ordering` is set and
-                # *doesn't* match the latest receipt from the user.
-                stale_room_ids.add(room_id)
 
-        # Discard any stale rooms from `room_to_count`, as we will recalculate
-        # them in step 3.
-        for room_id in stale_room_ids:
-            room_to_count.pop(room_id, None)
+        for room_id, thread_id, notif_count in txn:
+            room_to_count[room_id] += notif_count
+            seen_thread_ids.add(thread_id)
 
-        # Step 2, basically the same query, except against `event_push_actions`
-        # and only fetching rows inserted since the last rotation.
-        rotated_upto_stream_ordering = self.db_pool.simple_select_one_onecol_txn(
-            txn,
-            table="event_push_summary_stream_ordering",
-            keyvalues={},
-            retcol="stream_ordering",
+        # Now get any event push actions that haven't been rotated using the same OR
+        # join and filter by receipt and event push summary rotated up to stream ordering.
+        sql = f"""
+            {receipts_cte}
+            SELECT epa.room_id, epa.thread_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+            FROM event_push_actions AS epa
+            {receipts_joins}
+            WHERE user_id = ?
+                AND epa.notif = 1
+                AND stream_ordering > (SELECT stream_ordering FROM event_push_summary_stream_ordering)
+                AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+                AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+            GROUP BY epa.room_id, epa.thread_id
+        """
+        txn.execute(sql, args)
+
+        for room_id, thread_id, notif_count in txn:
+            # Note: only count push actions we have valid summaries for with up to date receipt.
+            if thread_id not in seen_thread_ids:
+                continue
+            room_to_count[room_id] += notif_count
+
+        thread_id_clause, thread_ids_args = make_in_list_sql_clause(
+            self.database_engine, "epa.thread_id", seen_thread_ids
         )
 
+        # Finally re-check event_push_actions for any rooms not in the summary, ignoring
+        # the rotated up-to position. This handles the case where a read receipt has arrived
+        # but not been rotated meaning the summary table is out of date, so we go back to
+        # the push actions table.
         sql = f"""
-            SELECT room_id, thread_id
-            FROM (
-                SELECT e.room_id, e.stream_ordering, e.thread_id,
-                    ev.stream_ordering AS receipt_stream_ordering
-                FROM event_push_actions AS e
-                INNER JOIN local_current_membership USING (user_id, room_id)
-                LEFT JOIN receipts_linearized AS r ON (
-                    e.user_id = r.user_id
-                    AND e.room_id = r.room_id
-                    AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
-                    AND {receipt_types_clause}
-                )
-                LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
-                WHERE e.user_id = ? and notif > 0
-                    AND e.stream_ordering > ?
-            ) AS es
-            GROUP BY room_id, stream_ordering, thread_id
-            HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0)
+            {receipts_cte}
+            SELECT epa.room_id, COUNT(CASE WHEN epa.notif = 1 THEN 1 END) AS notif_count
+            FROM event_push_actions AS epa
+            {receipts_joins}
+            WHERE user_id = ?
+            AND NOT {thread_id_clause}
+            AND epa.notif = 1
+            AND (threaded_receipt_stream_ordering IS NULL OR stream_ordering > threaded_receipt_stream_ordering)
+            AND (unthreaded_receipt_stream_ordering IS NULL OR stream_ordering > unthreaded_receipt_stream_ordering)
+            GROUP BY epa.room_id
         """
 
-        txn.execute(
-            sql,
-            receipt_types_args + [user_id, rotated_upto_stream_ordering],
-        )
-        for room_id, _thread_id in txn:
-            # Again, we ignore any stale rooms.
-            if room_id not in stale_room_ids:
-                # For event push actions it is one notification per row.
-                room_to_count[room_id] += 1
-
-        # Step 3, if we have stale rooms then we need to recalculate the counts
-        # from `event_push_actions`. Again, this is basically the same query as
-        # above except without a lower bound on stream ordering and only against
-        # a specific set of rooms.
-        if stale_room_ids:
-            room_id_clause, room_id_args = make_in_list_sql_clause(
-                self.database_engine,
-                "e.room_id",
-                stale_room_ids,
-            )
+        args.extend(thread_ids_args)
+        txn.execute(sql, args)
 
-            sql = f"""
-                SELECT room_id, thread_id
-                FROM (
-                    SELECT e.room_id, e.stream_ordering, e.thread_id,
-                        ev.stream_ordering AS receipt_stream_ordering
-                    FROM event_push_actions AS e
-                    INNER JOIN local_current_membership USING (user_id, room_id)
-                    LEFT JOIN receipts_linearized AS r ON (
-                        e.user_id = r.user_id
-                        AND e.room_id = r.room_id
-                        AND (e.thread_id = r.thread_id OR r.thread_id IS NULL)
-                        AND {receipt_types_clause}
-                    )
-                    LEFT JOIN events AS ev ON (r.event_id = ev.event_id)
-                    WHERE e.user_id = ? and notif > 0
-                        AND {room_id_clause}
-                ) AS es
-                GROUP BY room_id, stream_ordering, thread_id
-                HAVING stream_ordering > COALESCE(MAX(receipt_stream_ordering), 0)
-            """
-            txn.execute(
-                sql,
-                receipt_types_args + [user_id] + room_id_args,
-            )
-            for room_id, _ in txn:
-                room_to_count[room_id] += 1
+        for room_id, notif_count in txn:
+            room_to_count[room_id] += notif_count
 
         return room_to_count