summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/storage/event_push_actions.py42
-rw-r--r--synapse/storage/receipts.py2
2 files changed, 38 insertions, 6 deletions
diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9705db5c47..336c03c68a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,8 @@ import ujson as json
 
 logger = logging.getLogger(__name__)
 
+KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000
+
 
 class EventPushActionsStore(SQLBaseStore):
     def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
@@ -224,16 +226,46 @@ class EventPushActionsStore(SQLBaseStore):
             (room_id, event_id)
         )
 
-    def _remove_push_actions_before_txn(self, txn, room_id, user_id,
-                                        topological_ordering):
+    def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+                                            topological_ordering):
+        """
+        Purges old, stale push actions for a user and room before a given
+        topological_ordering
+        Args:
+            txn: The transcation
+            room_id: Room ID to delete from
+            user_id: user ID to delete for
+            topological_ordering: The lowest topological ordering which will
+                                  not be deleted.
+
+        Returns:
+
+        """
         txn.call_after(
             self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
             (room_id, user_id, )
         )
+
+        threshold = self._clock.time_msec() - KEEP_PUSH_ACTIONS_FOR_MS
+
+        # We need to join on the events table to get the received_ts for
+        # event_push_actions and sqlite won't let us use a join in a delete so
+        # we can't just delete where received_ts < x. Furthermore we can
+        # only identify event_push_actions by a tuple of room_id, event_id
+        # we we can't use a subquery.
+        # Instead, we look up the stream ordering for the last event in that
+        # room received before the threshold time and delete event_push_actions
+        # in the room with a stream_odering before that.
         txn.execute(
-            "DELETE FROM event_push_actions"
-            " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
-            (room_id, user_id, topological_ordering,)
+            "DELETE FROM event_push_actions "
+            " WHERE user_id = ? AND room_id = ? AND "
+            " topological_ordering < ? AND stream_ordering < ("
+            "  SELECT stream_ordering FROM events"
+            "  WHERE room_id = ? AND received_ts < ?"
+            "  ORDER BY stream_ordering DESC"
+            "  LIMIT 1"
+            ")",
+            (user_id, room_id, topological_ordering, room_id, threshold)
         )
 
 
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..f1774f0e44 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
         )
 
         if receipt_type == "m.read" and topological_ordering:
-            self._remove_push_actions_before_txn(
+            self._remove_old_push_actions_before_txn(
                 txn,
                 room_id=room_id,
                 user_id=user_id,