From 149fa411e24a30b738094e40f02f18c2b9230cbe Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 20 May 2016 15:25:12 +0100 Subject: Only delete push actions after 30 days --- synapse/storage/event_push_actions.py | 42 ++++++++++++++++++++++++++++++----- synapse/storage/receipts.py | 2 +- 2 files changed, 38 insertions(+), 6 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 9705db5c47..336c03c68a 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,6 +22,8 @@ import ujson as json logger = logging.getLogger(__name__) +KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000 + class EventPushActionsStore(SQLBaseStore): def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): @@ -224,16 +226,46 @@ class EventPushActionsStore(SQLBaseStore): (room_id, event_id) ) - def _remove_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering): + def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, + topological_ordering): + """ + Purges old, stale push actions for a user and room before a given + topological_ordering + Args: + txn: The transcation + room_id: Room ID to delete from + user_id: user ID to delete for + topological_ordering: The lowest topological ordering which will + not be deleted. + + Returns: + + """ txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, (room_id, user_id, ) ) + + threshold = self._clock.time_msec() - KEEP_PUSH_ACTIONS_FOR_MS + + # We need to join on the events table to get the received_ts for + # event_push_actions and sqlite won't let us use a join in a delete so + # we can't just delete where received_ts < x. Furthermore we can + # only identify event_push_actions by a tuple of room_id, event_id + # we we can't use a subquery. + # Instead, we look up the stream ordering for the last event in that + # room received before the threshold time and delete event_push_actions + # in the room with a stream_odering before that. txn.execute( - "DELETE FROM event_push_actions" - " WHERE room_id = ? AND user_id = ? AND topological_ordering < ?", - (room_id, user_id, topological_ordering,) + "DELETE FROM event_push_actions " + " WHERE user_id = ? AND room_id = ? AND " + " topological_ordering < ? AND stream_ordering < (" + " SELECT stream_ordering FROM events" + " WHERE room_id = ? AND received_ts < ?" + " ORDER BY stream_ordering DESC" + " LIMIT 1" + ")", + (user_id, room_id, topological_ordering, room_id, threshold) ) diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index fdcf28f3e1..f1774f0e44 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -297,7 +297,7 @@ class ReceiptsStore(SQLBaseStore): ) if receipt_type == "m.read" and topological_ordering: - self._remove_push_actions_before_txn( + self._remove_old_push_actions_before_txn( txn, room_id=room_id, user_id=user_id, -- cgit 1.4.1 From d4503e25ed01b6053bd5bb503f858a2ab934e350 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 20 May 2016 17:56:10 +0100 Subject: Make deleting push actions more efficient There's no index on received_ts, so manually binary search using the stream_ordering index, and only update it once an hour. --- synapse/storage/__init__.py | 9 +++++ synapse/storage/_base.py | 1 - synapse/storage/event_push_actions.py | 71 +++++++++++++++++++++++++++++------ 3 files changed, 68 insertions(+), 13 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index d970fde9e8..49feb77779 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -88,6 +88,7 @@ class DataStore(RoomMemberStore, RoomStore, def __init__(self, db_conn, hs): self.hs = hs + self._clock = hs.get_clock() self.database_engine = hs.database_engine self.client_ip_last_seen = Cache( @@ -173,6 +174,14 @@ class DataStore(RoomMemberStore, RoomStore, prefilled_cache=push_rules_prefill, ) + cur = db_conn.cursor() + self._find_stream_orderings_for_times_txn(cur) + cur.close() + + self.find_stream_orderings_looping_call = self._clock.looping_call( + self._find_stream_orderings_for_times, 60 * 60 * 1000 + ) + super(DataStore, self).__init__(hs) def take_presence_startup_info(self): diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index e0d7098692..56a0dd80f3 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -153,7 +153,6 @@ class SQLBaseStore(object): def __init__(self, hs): self.hs = hs self._db_pool = hs.get_db_pool() - self._clock = hs.get_clock() self._previous_txn_total_time = 0 self._current_txn_total_time = 0 diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 336c03c68a..4425d4bce5 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -22,10 +22,12 @@ import ujson as json logger = logging.getLogger(__name__) -KEEP_PUSH_ACTIONS_FOR_MS = 30 * 24 * 60 * 60 * 1000 - class EventPushActionsStore(SQLBaseStore): + def __init__(self, hs): + self.stream_ordering_month_ago = None + super(EventPushActionsStore, self).__init__(hs) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -237,9 +239,6 @@ class EventPushActionsStore(SQLBaseStore): user_id: user ID to delete for topological_ordering: The lowest topological ordering which will not be deleted. - - Returns: - """ txn.call_after( self.get_unread_event_push_actions_by_room_for_user.invalidate_many, @@ -259,15 +258,63 @@ class EventPushActionsStore(SQLBaseStore): txn.execute( "DELETE FROM event_push_actions " " WHERE user_id = ? AND room_id = ? AND " - " topological_ordering < ? AND stream_ordering < (" - " SELECT stream_ordering FROM events" - " WHERE room_id = ? AND received_ts < ?" - " ORDER BY stream_ordering DESC" - " LIMIT 1" - ")", - (user_id, room_id, topological_ordering, room_id, threshold) + " topological_ordering < ? AND stream_ordering < ?" + (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) + ) + + @defer.inlineCallbacks + def _find_stream_orderings_for_times(self): + yield self.runInteraction( + "_find_stream_orderings_for_times", + self._find_stream_orderings_for_times_txn + ) + + def _find_stream_orderings_for_times_txn(self, txn): + logger.info("Searching for stream ordering 1 month ago") + self.stream_ordering_month_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 30 * 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 month ago: it's %d", + self.stream_ordering_month_ago ) + def _find_first_stream_ordering_after_ts_txn(self, txn, ts): + """ + Find the stream_ordering of the first event that was received after + a given timestamp. This is relatively slow as there is no index on + received_ts but we can then use this to delete push actions before + this. + + received_ts must necessarily be in the same order as stream_ordering + and stream_ordering is indexed, so we manually binary search using + stream_ordering + """ + txn.execute("SELECT MAX(stream_ordering) FROM events") + max_stream_ordering = txn.fetchone()[0] + + range_start = 0 + range_end = max_stream_ordering + + sql = ( + "SELECT received_ts FROM events" + " WHERE stream_ordering > ?" + " ORDER BY stream_ordering" + " LIMIT 1" + ) + + while range_end - range_start > 1: + middle = int((range_end + range_start) / 2) + txn.execute(sql, (middle,)) + middle_ts = txn.fetchone()[0] + if ts > middle_ts: + range_start = middle + else: + range_end = middle + logger.info("done: picking %d from %d and %d", range_end, range_start, range_end) + + return range_end + def _action_has_highlight(actions): for action in actions: -- cgit 1.4.1 From 18d68bfee40078e4195adc43c93872d4a6facaf8 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 20 May 2016 17:58:09 +0100 Subject: Handle empty events table --- synapse/storage/event_push_actions.py | 3 +++ 1 file changed, 3 insertions(+) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 4425d4bce5..d630a846c9 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -293,6 +293,9 @@ class EventPushActionsStore(SQLBaseStore): txn.execute("SELECT MAX(stream_ordering) FROM events") max_stream_ordering = txn.fetchone()[0] + if max_stream_ordering is None: + return 0 + range_start = 0 range_end = max_stream_ordering -- cgit 1.4.1 From ccffb0965de8a567ba56eeae5c16945921268802 Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 20 May 2016 17:59:10 +0100 Subject: Remove stale line --- synapse/storage/event_push_actions.py | 2 -- 1 file changed, 2 deletions(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index d630a846c9..cb38d5f525 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -245,8 +245,6 @@ class EventPushActionsStore(SQLBaseStore): (room_id, user_id, ) ) - threshold = self._clock.time_msec() - KEEP_PUSH_ACTIONS_FOR_MS - # We need to join on the events table to get the received_ts for # event_push_actions and sqlite won't let us use a join in a delete so # we can't just delete where received_ts < x. Furthermore we can -- cgit 1.4.1 From c2da3406fcadefa3efbd302ae22a21c7ffbac9ce Mon Sep 17 00:00:00 2001 From: David Baker Date: Fri, 20 May 2016 18:03:31 +0100 Subject: Oops, missing comma --- synapse/storage/event_push_actions.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index cb38d5f525..88f9b4b642 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -256,7 +256,7 @@ class EventPushActionsStore(SQLBaseStore): txn.execute( "DELETE FROM event_push_actions " " WHERE user_id = ? AND room_id = ? AND " - " topological_ordering < ? AND stream_ordering < ?" + " topological_ordering < ? AND stream_ordering < ?", (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) ) -- cgit 1.4.1 From 31b5395ab6f1bec6a7198f2302ec43a891c95d03 Mon Sep 17 00:00:00 2001 From: David Baker Date: Mon, 23 May 2016 16:32:01 +0100 Subject: Remove debug logging --- synapse/storage/event_push_actions.py | 1 - 1 file changed, 1 deletion(-) (limited to 'synapse') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 88f9b4b642..4dae51a172 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -312,7 +312,6 @@ class EventPushActionsStore(SQLBaseStore): range_start = middle else: range_end = middle - logger.info("done: picking %d from %d and %d", range_end, range_start, range_end) return range_end -- cgit 1.4.1