From 149fa411e24a30b738094e40f02f18c2b9230cbe Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 20 May 2016 15:25:12 +0100 Subject: Only delete push actions after 30 days --- synapse/storage/event_push_actions.py | 42 ++++++++++++++++++++++++++++++----- synapse/storage/receipts.py | 2 +- 2 files changed, 38 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 9705db5c47..336c03c68a 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,8 @@ import ujson as json logger = logging.getLogger(__name__) +KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000 + class EventPushActionsStore(SQLBaseStore): def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): @@ -224,16 +226,46 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) - def _remove_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering): + def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + """ + Purges old, stale push actions for a user and room before a given + topological_ordering + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + topological_ordering: The lowest topological ordering which will + not be deleted. + + Returns: + + """ txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, (room_id, user_id, ) ) + + threshold = self._clock.time_msec() - KEEP_PUSH_ACTIONS_FOR_MS + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. txn.execute( - "DELETE FROM event_push_actions" - " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", - (room_id, user_id, topological_ordering,) + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " topological_ordering < ? AND stream_ordering < (" + " SELECT stream_ordering FROM events" + " WHERE room_id = ? AND received_ts < ?" + " ORDER BY stream_ordering DESC" + " LIMIT 1" + ")", + (user_id, room_id, topological_ordering, room_id, threshold) ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index fdcf28f3e1..f1774f0e44 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore): ) if receipt_type == "m.read" and topological_ordering: - self._remove_push_actions_before_txn( + self._remove_old_push_actions_before_txn( txn, room_id=room_id, user_id=user_id, -- cgit 1.4.1