diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py
index 9705db5c47..336c03c68a 100644
--- a/synapse/storage/event_push_actions.py
+++ b/synapse/storage/event_push_actions.py
@@ -22,6 +22,8 @@ import ujson as json
logger = logging.getLogger(__name__)
+KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000
+
class EventPushActionsStore(SQLBaseStore):
def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples):
@@ -224,16 +226,46 @@ class EventPushActionsStore(SQLBaseStore):
(room_id, event_id)
)
- def _remove_push_actions_before_txn(self, txn, room_id, user_id,
- topological_ordering):
+ def _remove_old_push_actions_before_txn(self, txn, room_id, user_id,
+ topological_ordering):
+ """
+ Purges old, stale push actions for a user and room before a given
+ topological_ordering
+ Args:
+ txn: The transcation
+ room_id: Room ID to delete from
+ user_id: user ID to delete for
+ topological_ordering: The lowest topological ordering which will
+ not be deleted.
+
+ Returns:
+
+ """
txn.call_after(
self.get_unread_event_push_actions_by_room_for_user.invalidate_many,
(room_id, user_id, )
)
+
+ threshold = self._clock.time_msec() - KEEP_PUSH_ACTIONS_FOR_MS
+
+ # We need to join on the events table to get the received_ts for
+ # event_push_actions and sqlite won't let us use a join in a delete so
+ # we can't just delete where received_ts < x. Furthermore we can
+ # only identify event_push_actions by a tuple of room_id, event_id
+ # we we can't use a subquery.
+ # Instead, we look up the stream ordering for the last event in that
+ # room received before the threshold time and delete event_push_actions
+ # in the room with a stream_odering before that.
txn.execute(
- "DELETE FROM event_push_actions"
- " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?",
- (room_id, user_id, topological_ordering,)
+ "DELETE FROM event_push_actions "
+ " WHERE user_id = ? AND room_id = ? AND "
+ " topological_ordering < ? AND stream_ordering < ("
+ " SELECT stream_ordering FROM events"
+ " WHERE room_id = ? AND received_ts < ?"
+ " ORDER BY stream_ordering DESC"
+ " LIMIT 1"
+ ")",
+ (user_id, room_id, topological_ordering, room_id, threshold)
)
diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py
index fdcf28f3e1..f1774f0e44 100644
--- a/synapse/storage/receipts.py
+++ b/synapse/storage/receipts.py
@@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore):
)
if receipt_type == "m.read" and topological_ordering:
- self._remove_push_actions_before_txn(
+ self._remove_old_push_actions_before_txn(
txn,
room_id=room_id,
user_id=user_id,
|