summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2022-08-15 09:33:17 -0400
committerGitHub <noreply@github.com>2022-08-15 09:33:17 -0400
commit46bd7f4ed9020bbed459c03a11c26d7f7c3093b0 (patch)
tree7e10433b52e7a452674895daac245e98f62e226a
parentUpdate locked versions of mypy and mypy-zope (#13521) (diff)
downloadsynapse-46bd7f4ed9020bbed459c03a11c26d7f7c3093b0.tar.xz
Clarifications for event push action processing. (#13485)
* Clarifies comments.
* Fixes an erroneous comment (about return type) added in #13455
  (ec24813220f9d54108924dc04aecd24555277b99).
* Clarifies the name of a variable.
* Simplifies logic of pulling out the latest join for the requesting user.
-rw-r--r--changelog.d/13485.misc1
-rw-r--r--synapse/storage/databases/main/event_push_actions.py53
-rw-r--r--synapse/storage/databases/main/receipts.py2
3 files changed, 35 insertions, 21 deletions
diff --git a/changelog.d/13485.misc b/changelog.d/13485.misc
new file mode 100644
index 0000000000..c75712b9ff
--- /dev/null
+++ b/changelog.d/13485.misc
@@ -0,0 +1 @@
+Add comments about how event push actions are rotated.
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 161aad0f89..f62aa45ca1 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -227,7 +227,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         user_id: str,
     ) -> NotifCounts:
         """Get the notification count, the highlight count and the unread message count
-        for a given user in a given room after the given read receipt.
+        for a given user in a given room after their latest read receipt.
 
         Note that this function assumes the user to be a current member of the room,
         since it's either called by the sync handler to handle joined room entries, or by
@@ -238,9 +238,8 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             user_id: The user to retrieve the counts for.
 
         Returns
-            A dict containing the counts mentioned earlier in this docstring,
-            respectively under the keys "notify_count", "highlight_count" and
-            "unread_count".
+            A NotifCounts object containing the notification count, the highlight count
+            and the unread message count.
         """
         return await self.db_pool.runInteraction(
             "get_unread_event_push_actions_by_room",
@@ -255,6 +254,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         room_id: str,
         user_id: str,
     ) -> NotifCounts:
+        # Get the stream ordering of the user's latest receipt in the room.
         result = self.get_last_receipt_for_user_txn(
             txn,
             user_id,
@@ -266,13 +266,11 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             ),
         )
 
-        stream_ordering = None
         if result:
             _, stream_ordering = result
 
-        if stream_ordering is None:
-            # Either last_read_event_id is None, or it's an event we don't have (e.g.
-            # because it's been purged), in which case retrieve the stream ordering for
+        else:
+            # If the user has no receipts in the room, retrieve the stream ordering for
             # the latest membership event from this user in this room (which we assume is
             # a join).
             event_id = self.db_pool.simple_select_one_onecol_txn(
@@ -289,10 +287,26 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
         )
 
     def _get_unread_counts_by_pos_txn(
-        self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        user_id: str,
+        receipt_stream_ordering: int,
     ) -> NotifCounts:
         """Get the number of unread messages for a user/room that have happened
         since the given stream ordering.
+
+        Args:
+            txn: The database transaction.
+            room_id: The room ID to get unread counts for.
+            user_id: The user ID to get unread counts for.
+            receipt_stream_ordering: The stream ordering of the user's latest
+                receipt in the room. If there are no receipts, the stream ordering
+                of the user's join event.
+
+        Returns
+            A NotifCounts object containing the notification count, the highlight count
+            and the unread message count.
         """
 
         counts = NotifCounts()
@@ -320,7 +334,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                     OR last_receipt_stream_ordering = ?
                 )
             """,
-            (room_id, user_id, stream_ordering, stream_ordering),
+            (room_id, user_id, receipt_stream_ordering, receipt_stream_ordering),
         )
         row = txn.fetchone()
 
@@ -338,17 +352,20 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
                 AND stream_ordering > ?
                 AND highlight = 1
         """
-        txn.execute(sql, (user_id, room_id, stream_ordering))
+        txn.execute(sql, (user_id, room_id, receipt_stream_ordering))
         row = txn.fetchone()
         if row:
             counts.highlight_count += row[0]
 
         # Finally we need to count push actions that aren't included in the
-        # summary returned above, e.g. recent events that haven't been
-        # summarised yet, or the summary is empty due to a recent read receipt.
-        stream_ordering = max(stream_ordering, summary_stream_ordering)
+        # summary returned above. This might be due to recent events that haven't
+        # been summarised yet or the summary is out of date due to a recent read
+        # receipt.
+        start_unread_stream_ordering = max(
+            receipt_stream_ordering, summary_stream_ordering
+        )
         notify_count, unread_count = self._get_notif_unread_count_for_user_room(
-            txn, room_id, user_id, stream_ordering
+            txn, room_id, user_id, start_unread_stream_ordering
         )
 
         counts.notify_count += notify_count
@@ -1151,8 +1168,6 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             txn: The database transaction.
             old_rotate_stream_ordering: The previous maximum event stream ordering.
             rotate_to_stream_ordering: The new maximum event stream ordering to summarise.
-
-        Returns whether the archiving process has caught up or not.
         """
 
         # Calculate the new counts that should be upserted into event_push_summary
@@ -1238,9 +1253,7 @@ class EventPushActionsWorkerStore(ReceiptsWorkerStore, StreamWorkerStore, SQLBas
             (rotate_to_stream_ordering,),
         )
 
-    async def _remove_old_push_actions_that_have_rotated(
-        self,
-    ) -> None:
+    async def _remove_old_push_actions_that_have_rotated(self) -> None:
         """Clear out old push actions that have been summarised."""
 
         # We want to clear out anything that is older than a day that *has* already
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index 0090c9f225..124c70ad37 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -161,7 +161,7 @@ class ReceiptsWorkerStore(SQLBaseStore):
             receipt_type: The receipt types to fetch.
 
         Returns:
-            The latest receipt, if one exists.
+            The event ID and stream ordering of the latest receipt, if one exists.
         """
 
         clause, args = make_in_list_sql_clause(