summary refs log tree commit diff
diff options
context:
space:
mode:
authorPatrick Cloke <patrickc@matrix.org>2022-05-26 16:07:39 -0400
committerPatrick Cloke <patrickc@matrix.org>2022-06-13 09:57:05 -0400
commit580fbb740fd5a601b76efebce1f4e6055842d912 (patch)
tree9818e48d3ac91f9bb39bff77f8816f49340508b7
parentUse the ranged receipts table when fetching receipts for /sync. (diff)
downloadsynapse-580fbb740fd5a601b76efebce1f4e6055842d912.tar.xz
Discard notifications in ranges.
-rw-r--r--synapse/storage/databases/main/event_push_actions.py38
-rw-r--r--synapse/storage/databases/main/receipts.py13
2 files changed, 38 insertions, 13 deletions
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index b019979350..b0b3695012 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -927,8 +927,13 @@ class EventPushActionsWorkerStore(SQLBaseStore):
             (rotate_to_stream_ordering,),
         )
 
-    def _remove_old_push_actions_before_txn(
-        self, txn: LoggingTransaction, room_id: str, user_id: str, stream_ordering: int
+    def _remove_old_push_actions_txn(
+        self,
+        txn: LoggingTransaction,
+        room_id: str,
+        user_id: str,
+        end_stream_ordering: int,
+        start_stream_ordering: Optional[int],
     ) -> None:
         """
         Purges old push actions for a user and room before a given
@@ -957,20 +962,33 @@ class EventPushActionsWorkerStore(SQLBaseStore):
         # Instead, we look up the stream ordering for the last event in that
         # room received before the threshold time and delete event_push_actions
         # in the room with a stream_odering before that.
+        if start_stream_ordering is None:
+            stream_ordering_clause = "stream_ordering <= ?"
+            stream_ordering_args: Tuple[int, ...] = (end_stream_ordering,)
+        else:
+            stream_ordering_clause = "stream_ordering >= ? AND stream_ordering <= ?"
+            stream_ordering_args = (start_stream_ordering, end_stream_ordering)
+
         txn.execute(
-            "DELETE FROM event_push_actions "
-            " WHERE user_id = ? AND room_id = ? AND "
-            " stream_ordering <= ?"
-            " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)",
-            (user_id, room_id, stream_ordering, self.stream_ordering_month_ago),
+            f"""
+            DELETE FROM event_push_actions
+            WHERE user_id = ? AND room_id = ?
+            AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)
+            AND {stream_ordering_clause}
+            """,
+            (user_id, room_id, self.stream_ordering_month_ago) + stream_ordering_args,
         )
 
+        # XXX What to do about these summaries? They're currently updated daily.
+        #     Deleting a chunk of them if any region overlaps seems suspect.
+        #     Maybe we can do a daily update to limit the damage? That would not
+        #     give true unread status per event, however.
         txn.execute(
-            """
+            f"""
             DELETE FROM event_push_summary
-            WHERE room_id = ? AND user_id = ? AND stream_ordering <= ?
+            WHERE room_id = ? AND user_id = ? AND {stream_ordering_clause}
         """,
-            (room_id, user_id, stream_ordering),
+            (room_id, user_id) + stream_ordering_args,
         )
 
 
diff --git a/synapse/storage/databases/main/receipts.py b/synapse/storage/databases/main/receipts.py
index c2381017b0..abf1dfaecd 100644
--- a/synapse/storage/databases/main/receipts.py
+++ b/synapse/storage/databases/main/receipts.py
@@ -672,13 +672,20 @@ class ReceiptsWorkerStore(SQLBaseStore):
 
         # When updating a local users read receipt, remove any push actions
         # which resulted from the receipt's event and all earlier events.
+        #
+        # XXX Can the stream orderings from local users not be known? Maybe if
+        #     events are purged (retention?)
+        #
+        # XXX Do we need to differentiate between an unbounded start
+        #     (start_event_id == None) vs. being unable to find the event
+        #     (start_stream_ordering == None)?
         if (
             self.hs.is_mine_id(user_id)
             and receipt_type in (ReceiptTypes.READ, ReceiptTypes.READ_PRIVATE)
-            and end_stream_ordering is not None
+            and (start_stream_ordering is not None or end_stream_ordering is not None)
         ):
-            self._remove_old_push_actions_before_txn(  # type: ignore[attr-defined]
-                txn, room_id=room_id, user_id=user_id, stream_ordering=end_stream_ordering
+            self._remove_old_push_actions_txn(  # type: ignore[attr-defined]
+                txn, room_id, user_id, end_stream_ordering, start_stream_ordering
             )
 
         return rx_ts