From fac3c03087b14abf7b9857f937fd3311e0619a6f Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 1 Feb 2017 18:27:24 +0000 Subject: Be more agressive about purging old room event_push_actions --- synapse/storage/event_push_actions.py | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 7de3e8c58c..522d0114cb 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -450,8 +450,12 @@ class EventPushActionsStore(SQLBaseStore): def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, topological_ordering): """ - Purges old, stale push actions for a user and room before a given - topological_ordering + Purges old push actions for a user and room before a given + topological_ordering. + + We however keep a months worth of highlighted notifications, so that + users can still get a list of recent highlights. + Args: txn: The transcation room_id: Room ID to delete from @@ -475,7 +479,8 @@ class EventPushActionsStore(SQLBaseStore): txn.execute( "DELETE FROM event_push_actions " " WHERE user_id = ? AND room_id = ? AND " - " topological_ordering < ? AND stream_ordering < ?", + " topological_ordering < ?" + " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) ) -- cgit 1.4.1 From 095b45c1653bc93788a45b891f29efc30f0b1b07 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Fri, 3 Feb 2017 18:12:53 +0000 Subject: Aggregate event push actions --- synapse/replication/slave/storage/events.py | 6 + synapse/storage/event_push_actions.py | 267 ++++++++++++++++----- synapse/storage/receipts.py | 1 + .../storage/schema/delta/40/event_push_summary.sql | 37 +++ tests/storage/test_event_push_actions.py | 86 +++++++ 5 files changed, 340 insertions(+), 57 deletions(-) create mode 100644 synapse/storage/schema/delta/40/event_push_summary.sql (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/replication/slave/storage/events.py b/synapse/replication/slave/storage/events.py index d72ff6055c..622b2d8540 100644 --- a/synapse/replication/slave/storage/events.py +++ b/synapse/replication/slave/storage/events.py @@ -85,6 +85,12 @@ class SlavedEventStore(BaseSlavedStore): get_unread_event_push_actions_by_room_for_user = ( EventPushActionsStore.__dict__["get_unread_event_push_actions_by_room_for_user"] ) + _get_unread_counts_by_receipt_txn = ( + DataStore._get_unread_counts_by_receipt_txn.__func__ + ) + _get_unread_counts_by_pos_txn = ( + DataStore._get_unread_counts_by_pos_txn.__func__ + ) _get_state_group_for_events = ( StateStore.__dict__["_get_state_group_for_events"] ) diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 522d0114cb..300571b78b 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -15,6 +15,7 @@ from ._base import SQLBaseStore from twisted.internet import defer +from synapse.util.async import sleep from synapse.util.caches.descriptors import cachedInlineCallbacks from synapse.types import RoomStreamToken from .stream import lower_bound @@ -29,7 +30,6 @@ class EventPushActionsStore(SQLBaseStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" def __init__(self, hs): - self.stream_ordering_month_ago = None super(EventPushActionsStore, self).__init__(hs) self.register_background_index_update( @@ -47,6 +47,9 @@ class EventPushActionsStore(SQLBaseStore): where_clause="highlight=1" ) + self._doing_notif_rotation = False + self._clock.looping_call(self._rotate_notifs, 60 * 1000) + def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ Args: @@ -77,66 +80,89 @@ class EventPushActionsStore(SQLBaseStore): def get_unread_event_push_actions_by_room_for_user( self, room_id, user_id, last_read_event_id ): - def _get_unread_event_push_actions_by_room(txn): - sql = ( - "SELECT stream_ordering, topological_ordering" - " FROM events" - " WHERE room_id = ? AND event_id = ?" - ) - txn.execute( - sql, (room_id, last_read_event_id) - ) - results = txn.fetchall() - if len(results) == 0: - return {"notify_count": 0, "highlight_count": 0} - - stream_ordering = results[0][0] - topological_ordering = results[0][1] - token = RoomStreamToken( - topological_ordering, stream_ordering - ) - - # First get number of notifications. - # We don't need to put a notif=1 clause as all rows always have - # notif=1 - sql = ( - "SELECT count(*)" - " FROM event_push_actions ea" - " WHERE" - " user_id = ?" - " AND room_id = ?" - " AND %s" - ) % (lower_bound(token, self.database_engine, inclusive=False),) + ret = yield self.runInteraction( + "get_unread_event_push_actions_by_room", + self._get_unread_counts_by_receipt_txn, + room_id, user_id, last_read_event_id + ) + defer.returnValue(ret) - txn.execute(sql, (user_id, room_id)) - row = txn.fetchone() - notify_count = row[0] if row else 0 + def _get_unread_counts_by_receipt_txn(self, txn, room_id, user_id, + last_read_event_id): + sql = ( + "SELECT stream_ordering, topological_ordering" + " FROM events" + " WHERE room_id = ? AND event_id = ?" + ) + txn.execute( + sql, (room_id, last_read_event_id) + ) + results = txn.fetchall() + if len(results) == 0: + return {"notify_count": 0, "highlight_count": 0} - # Now get the number of highlights - sql = ( - "SELECT count(*)" - " FROM event_push_actions ea" - " WHERE" - " highlight = 1" - " AND user_id = ?" - " AND room_id = ?" - " AND %s" - ) % (lower_bound(token, self.database_engine, inclusive=False),) + stream_ordering = results[0][0] + topological_ordering = results[0][1] - txn.execute(sql, (user_id, room_id)) - row = txn.fetchone() - highlight_count = row[0] if row else 0 + return self._get_unread_counts_by_pos_txn( + txn, room_id, user_id, topological_ordering, stream_ordering + ) - return { - "notify_count": notify_count, - "highlight_count": highlight_count, - } + def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, topological_ordering, + stream_ordering): + token = RoomStreamToken( + topological_ordering, stream_ordering + ) - ret = yield self.runInteraction( - "get_unread_event_push_actions_by_room", - _get_unread_event_push_actions_by_room + # First get number of notifications. + # We don't need to put a notif=1 clause as all rows always have + # notif=1 + sql = ( + "SELECT count(*)" + " FROM event_push_actions ea" + " WHERE" + " user_id = ?" + " AND room_id = ?" + " AND %s" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + txn.execute(sql, (user_id, room_id)) + row = txn.fetchone() + notify_count = row[0] if row else 0 + + summary_notif_count = self._simple_select_one_onecol_txn( + txn, + table="event_push_summary", + keyvalues={ + "user_id": user_id, + "room_id": room_id, + }, + retcol="notif_count", + allow_none=True, ) - defer.returnValue(ret) + + if summary_notif_count: + notify_count += summary_notif_count + + # Now get the number of highlights + sql = ( + "SELECT count(*)" + " FROM event_push_actions ea" + " WHERE" + " highlight = 1" + " AND user_id = ?" + " AND room_id = ?" + " AND %s" + ) % (lower_bound(token, self.database_engine, inclusive=False),) + + txn.execute(sql, (user_id, room_id)) + row = txn.fetchone() + highlight_count = row[0] if row else 0 + + return { + "notify_count": notify_count, + "highlight_count": highlight_count, + } @defer.inlineCallbacks def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): @@ -448,7 +474,7 @@ class EventPushActionsStore(SQLBaseStore): ) def _remove_old_push_actions_before_txn(self, txn, room_id, user_id, - topological_ordering): + topological_ordering, stream_ordering): """ Purges old push actions for a user and room before a given topological_ordering. @@ -479,11 +505,16 @@ class EventPushActionsStore(SQLBaseStore): txn.execute( "DELETE FROM event_push_actions " " WHERE user_id = ? AND room_id = ? AND " - " topological_ordering < ?" + " topological_ordering <= ?" " AND ((stream_ordering < ? AND highlight = 1) or highlight = 0)", (user_id, room_id, topological_ordering, self.stream_ordering_month_ago) ) + txn.execute(""" + DELETE FROM event_push_summary + WHERE room_id = ? AND user_id = ? AND stream_ordering <= ? + """, (room_id, user_id, stream_ordering)) + @defer.inlineCallbacks def _find_stream_orderings_for_times(self): yield self.runInteraction( @@ -500,6 +531,14 @@ class EventPushActionsStore(SQLBaseStore): "Found stream ordering 1 month ago: it's %d", self.stream_ordering_month_ago ) + logger.info("Searching for stream ordering 1 day ago") + self.stream_ordering_day_ago = self._find_first_stream_ordering_after_ts_txn( + txn, self._clock.time_msec() - 24 * 60 * 60 * 1000 + ) + logger.info( + "Found stream ordering 1 day ago: it's %d", + self.stream_ordering_day_ago + ) def _find_first_stream_ordering_after_ts_txn(self, txn, ts): """ @@ -539,6 +578,120 @@ class EventPushActionsStore(SQLBaseStore): return range_end + @defer.inlineCallbacks + def _rotate_notifs(self): + if self._doing_notif_rotation or self.stream_ordering_day_ago is None: + return + self._doing_notif_rotation = True + + try: + while True: + logger.info("Rotating notifications") + + caught_up = yield self.runInteraction( + "_rotate_notifs", + self._rotate_notifs_txn + ) + if caught_up: + break + yield sleep(1) + finally: + self._doing_notif_rotation = False + + def _rotate_notifs_txn(self, txn): + """Archives older notifications into event_push_summary. Returns whether + the archiving process has caught up or not. + """ + + # We want to make sure that we only ever do this one at a time + self.database_engine.lock_table(txn, "event_push_summary") + + # We don't to try and rotate millions of rows at once, so we cap the + # maximum stream ordering we'll rotate before. + txn.execute(""" + SELECT stream_ordering FROM event_push_actions + ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 + """) + stream_row = txn.fetchone() + if stream_row: + offset_stream_ordering, = stream_row + rotate_to_stream_ordering = min( + self.stream_ordering_day_ago, offset_stream_ordering + ) + caught_up = offset_stream_ordering >= self.stream_ordering_day_ago + else: + rotate_to_stream_ordering = self.stream_ordering_day_ago + caught_up = True + + self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering) + + # We have caught up iff we were limited by `stream_ordering_day_ago` + return caught_up + + def _rotate_notifs_before_txn(self, txn, rotate_to_stream_ordering): + old_rotate_stream_ordering = self._simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + + # Calculate the new counts that should be upserted into event_push_summary + sql = """ + SELECT user_id, room_id, + coalesce(old.notif_count, 0) + upd.notif_count, + upd.stream_ordering, + old.user_id + FROM ( + SELECT user_id, room_id, count(*) as notif_count, + max(stream_ordering) as stream_ordering + FROM event_push_actions + WHERE ? <= stream_ordering AND stream_ordering < ? + AND highlight = 0 + GROUP BY user_id, room_id + ) AS upd + LEFT JOIN event_push_summary AS old USING (user_id, room_id) + """ + + txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,)) + rows = txn.fetchall() + + # If the `old.user_id` above is NULL then we know there isn't already an + # entry in the table, so we simply insert it. Otherwise we update the + # existing table. + self._simple_insert_many_txn( + txn, + table="event_push_summary", + values=[ + { + "user_id": row[0], + "room_id": row[1], + "notif_count": row[2], + "stream_ordering": row[3], + } + for row in rows if row[4] is None + ] + ) + + txn.executemany( + """ + UPDATE event_push_summary SET notif_count = ?, stream_ordering = ? + WHERE user_id = ? AND room_id = ? + """, + ((row[2], row[3], row[0], row[1],) for row in rows if row[4] is not None) + ) + + txn.execute( + "DELETE FROM event_push_actions" + " WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0", + (old_rotate_stream_ordering, rotate_to_stream_ordering,) + ) + + txn.execute( + "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", + (rotate_to_stream_ordering,) + ) + def _action_has_highlight(actions): for action in actions: diff --git a/synapse/storage/receipts.py b/synapse/storage/receipts.py index f72d15f5ed..5cf41501ea 100644 --- a/synapse/storage/receipts.py +++ b/synapse/storage/receipts.py @@ -351,6 +351,7 @@ class ReceiptsStore(SQLBaseStore): room_id=room_id, user_id=user_id, topological_ordering=topological_ordering, + stream_ordering=stream_ordering, ) return True diff --git a/synapse/storage/schema/delta/40/event_push_summary.sql b/synapse/storage/schema/delta/40/event_push_summary.sql new file mode 100644 index 0000000000..3918f0b794 --- /dev/null +++ b/synapse/storage/schema/delta/40/event_push_summary.sql @@ -0,0 +1,37 @@ +/* Copyright 2017 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- Aggregate of old notification counts that have been deleted out of the +-- main event_push_actions table. This count does not include those that were +-- highlights, as they remain in the event_push_actions table. +CREATE TABLE event_push_summary ( + user_id TEXT NOT NULL, + room_id TEXT NOT NULL, + notif_count BIGINT NOT NULL, + stream_ordering BIGINT NOT NULL +); + +CREATE INDEX event_push_summary_user_rm ON event_push_summary(user_id, room_id); + + +-- The stream ordering up to which we have aggregated the event_push_actions +-- table into event_push_summary +CREATE TABLE event_push_summary_stream_ordering ( + Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE, -- Makes sure this table only has one row. + stream_ordering BIGINT NOT NULL, + CHECK (Lock='X') +); + +INSERT INTO event_push_summary_stream_ordering (stream_ordering) VALUES (0); diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index e9044afa2e..3135488353 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -17,9 +17,15 @@ from twisted.internet import defer import tests.unittest import tests.utils +from mock import Mock USER_ID = "@user:example.com" +PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}] +HIGHLIGHT = [ + "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"} +] + class EventPushActionsStoreTestCase(tests.unittest.TestCase): @@ -39,3 +45,83 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): yield self.store.get_unread_push_actions_for_user_in_range_for_email( USER_ID, 0, 1000, 20 ) + + @defer.inlineCallbacks + def test_count_aggregation(self): + room_id = "!foo:example.com" + user_id = "@user1235:example.com" + + @defer.inlineCallbacks + def _assert_counts(noitf_count, highlight_count): + counts = yield self.store.runInteraction( + "", self.store._get_unread_counts_by_pos_txn, + room_id, user_id, 0, 0 + ) + self.assertEquals( + counts, + {"notify_count": noitf_count, "highlight_count": highlight_count} + ) + + def _inject_actions(stream, action): + event = Mock() + event.room_id = room_id + event.event_id = "$test:example.com" + event.internal_metadata.stream_ordering = stream + event.depth = stream + + tuples = [(user_id, action)] + + return self.store.runInteraction( + "", self.store._set_push_actions_for_event_and_users_txn, + event, tuples + ) + + def _rotate(stream): + return self.store.runInteraction( + "", self.store._rotate_notifs_before_txn, stream + ) + + def _mark_read(stream, depth): + return self.store.runInteraction( + "", self.store._remove_old_push_actions_before_txn, + room_id, user_id, depth, stream + ) + + yield _assert_counts(0, 0) + yield _inject_actions(1, PlAIN_NOTIF) + yield _assert_counts(1, 0) + yield _rotate(2) + yield _assert_counts(1, 0) + + yield _inject_actions(3, PlAIN_NOTIF) + yield _assert_counts(2, 0) + yield _rotate(4) + yield _assert_counts(2, 0) + + yield _inject_actions(5, PlAIN_NOTIF) + yield _mark_read(3, 3) + yield _assert_counts(1, 0) + + yield _mark_read(5, 5) + yield _assert_counts(0, 0) + + yield _inject_actions(6, PlAIN_NOTIF) + yield _rotate(7) + + yield self.store._simple_delete( + table="event_push_actions", + keyvalues={"1": 1}, + desc="", + ) + + yield _assert_counts(1, 0) + + yield _mark_read(7, 7) + yield _assert_counts(0, 0) + + yield _inject_actions(8, HIGHLIGHT) + yield _assert_counts(1, 1) + yield _rotate(9) + yield _assert_counts(1, 1) + yield _rotate(10) + yield _assert_counts(1, 1) -- cgit 1.4.1 From ce3c8df6dff456cdc548f1bc40a47f4b2a09a0a6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Feb 2017 13:41:24 +0000 Subject: Less aggressive timers --- synapse/storage/event_push_actions.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 300571b78b..1b7888a914 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -48,7 +48,7 @@ class EventPushActionsStore(SQLBaseStore): ) self._doing_notif_rotation = False - self._clock.looping_call(self._rotate_notifs, 60 * 1000) + self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000) def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ @@ -594,7 +594,7 @@ class EventPushActionsStore(SQLBaseStore): ) if caught_up: break - yield sleep(1) + yield sleep(5) finally: self._doing_notif_rotation = False -- cgit 1.4.1 From e6acf0c399b1baadc56f57d8398643a4cb1de56a Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Feb 2017 16:37:25 +0000 Subject: Store the default push actions in a more efficient manner --- synapse/storage/event_push_actions.py | 51 +++++++++++++++++++++++++++-------- 1 file changed, 40 insertions(+), 11 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 1b7888a914..db20b7de20 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -26,6 +26,32 @@ import ujson as json logger = logging.getLogger(__name__) +DEFAULT_NOTIF_ACITON = ["notify", {"set_tweak": "highlight", "value": False}] +DEFAULT_HIGHLIGHT_ACITON = [ + "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"} +] + + +def _serialize_action(actions, is_highlight): + if is_highlight: + if actions == DEFAULT_HIGHLIGHT_ACITON: + return "" + else: + if actions == DEFAULT_NOTIF_ACITON: + return "" + return json.dumps(actions) + + +def _deserialize_action(actions, is_highlight): + if actions: + return json.loads(actions) + + if is_highlight: + return DEFAULT_HIGHLIGHT_ACITON + else: + return DEFAULT_NOTIF_ACITON + + class EventPushActionsStore(SQLBaseStore): EPA_HIGHLIGHT_INDEX = "epa_highlight_index" @@ -58,15 +84,17 @@ class EventPushActionsStore(SQLBaseStore): """ values = [] for uid, actions in tuples: + is_highlight = 1 if _action_has_highlight(actions) else 0 + values.append({ 'room_id': event.room_id, 'event_id': event.event_id, 'user_id': uid, - 'actions': json.dumps(actions), + 'actions': _serialize_action(actions, is_highlight), 'stream_ordering': event.internal_metadata.stream_ordering, 'topological_ordering': event.depth, 'notif': 1, - 'highlight': 1 if _action_has_highlight(actions) else 0, + 'highlight': is_highlight, }) for uid, __ in tuples: @@ -202,7 +230,8 @@ class EventPushActionsStore(SQLBaseStore): # find rooms that have a read receipt in them and return the next # push actions sql = ( - "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions" + "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," + " ep.highlight " " FROM (" " SELECT room_id," " MAX(topological_ordering) as topological_ordering," @@ -243,7 +272,7 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " e.received_ts" + " ep.highlight " " FROM event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" @@ -272,7 +301,7 @@ class EventPushActionsStore(SQLBaseStore): "event_id": row[0], "room_id": row[1], "stream_ordering": row[2], - "actions": json.loads(row[3]), + "actions": _deserialize_action(row[3], row[4]), } for row in after_read_receipt + no_read_receipt ] @@ -311,7 +340,7 @@ class EventPushActionsStore(SQLBaseStore): def get_after_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " e.received_ts" + " ep.highlight, e.received_ts" " FROM (" " SELECT room_id," " MAX(topological_ordering) as topological_ordering," @@ -353,7 +382,7 @@ class EventPushActionsStore(SQLBaseStore): def get_no_receipt(txn): sql = ( "SELECT ep.event_id, ep.room_id, ep.stream_ordering, ep.actions," - " e.received_ts" + " ep.highlight, e.received_ts" " FROM event_push_actions AS ep" " INNER JOIN events AS e USING (room_id, event_id)" " WHERE" @@ -383,8 +412,8 @@ class EventPushActionsStore(SQLBaseStore): "event_id": row[0], "room_id": row[1], "stream_ordering": row[2], - "actions": json.loads(row[3]), - "received_ts": row[4], + "actions": _deserialize_action(row[3], row[4]), + "received_ts": row[5], } for row in after_read_receipt + no_read_receipt ] @@ -418,7 +447,7 @@ class EventPushActionsStore(SQLBaseStore): sql = ( "SELECT epa.event_id, epa.room_id," " epa.stream_ordering, epa.topological_ordering," - " epa.actions, epa.profile_tag, e.received_ts" + " epa.actions, epa.highlight, epa.profile_tag, e.received_ts" " FROM event_push_actions epa, events e" " WHERE epa.event_id = e.event_id" " AND epa.user_id = ? %s" @@ -433,7 +462,7 @@ class EventPushActionsStore(SQLBaseStore): "get_push_actions_for_user", f ) for pa in push_actions: - pa["actions"] = json.loads(pa["actions"]) + pa["actions"] = _deserialize_action(pa["actions"], pa["highlight"]) defer.returnValue(push_actions) @defer.inlineCallbacks -- cgit 1.4.1 From 502ae6c663e9b96950d1b474fbec7cf200e4e8a1 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 14 Feb 2017 16:54:37 +0000 Subject: Comment --- synapse/storage/event_push_actions.py | 21 ++++++++++++++------- 1 file changed, 14 insertions(+), 7 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index db20b7de20..ea6722df63 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -26,30 +26,37 @@ import ujson as json logger = logging.getLogger(__name__) -DEFAULT_NOTIF_ACITON = ["notify", {"set_tweak": "highlight", "value": False}] -DEFAULT_HIGHLIGHT_ACITON = [ +DEFAULT_NOTIF_ACTION = ["notify", {"set_tweak": "highlight", "value": False}] +DEFAULT_HIGHLIGHT_ACTION = [ "notify", {"set_tweak": "sound", "value": "default"}, {"set_tweak": "highlight"} ] def _serialize_action(actions, is_highlight): + """Custom serializer for actions. This allows us to "compress" common actions. + + We use the fact that most users have the same actions for notifs (and for + highlights). We replaces these default actions with the emtpy string. + """ if is_highlight: - if actions == DEFAULT_HIGHLIGHT_ACITON: - return "" + if actions == DEFAULT_HIGHLIGHT_ACTION: + return "" # We use empty string as the column is non-NULL else: - if actions == DEFAULT_NOTIF_ACITON: + if actions == DEFAULT_NOTIF_ACTION: return "" return json.dumps(actions) def _deserialize_action(actions, is_highlight): + """Custom deserializer for actions. This allows us to "compress" common actions + """ if actions: return json.loads(actions) if is_highlight: - return DEFAULT_HIGHLIGHT_ACITON + return DEFAULT_HIGHLIGHT_ACTION else: - return DEFAULT_NOTIF_ACITON + return DEFAULT_NOTIF_ACTION class EventPushActionsStore(SQLBaseStore): -- cgit 1.4.1 From 138e030cfe6e00956d79eed0702ff3f284692357 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 16 Feb 2017 15:03:36 +0000 Subject: Comment --- synapse/storage/event_push_actions.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index ea6722df63..808c9d22fc 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -36,7 +36,10 @@ def _serialize_action(actions, is_highlight): """Custom serializer for actions. This allows us to "compress" common actions. We use the fact that most users have the same actions for notifs (and for - highlights). We replaces these default actions with the emtpy string. + highlights). + We store these default actions as the empty string rather than the full JSON. + Since the empty string isn't valid JSON there is no risk of this clashing with + any real JSON actions """ if is_highlight: if actions == DEFAULT_HIGHLIGHT_ACTION: -- cgit 1.4.1 From 699be7d1be5238172677c98078c2a973ab28fb9d Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Sat, 18 Feb 2017 14:41:31 +0000 Subject: Fix up notif rotation --- synapse/storage/event_push_actions.py | 36 +++++++++++++++++++++-------------- 1 file changed, 22 insertions(+), 14 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 808c9d22fc..52d2bd3ffb 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -168,19 +168,13 @@ class EventPushActionsStore(SQLBaseStore): row = txn.fetchone() notify_count = row[0] if row else 0 - summary_notif_count = self._simple_select_one_onecol_txn( - txn, - table="event_push_summary", - keyvalues={ - "user_id": user_id, - "room_id": room_id, - }, - retcol="notif_count", - allow_none=True, - ) - - if summary_notif_count: - notify_count += summary_notif_count + txn.execute(""" + SELECT notif_count FROM event_push_summary + WHERE room_id = ? AND user_id = ? AND stream_ordering > ? + """, (room_id, user_id, stream_ordering,)) + rows = txn.fetchall() + if rows: + notify_count += rows[0][0] # Now get the number of highlights sql = ( @@ -645,12 +639,20 @@ class EventPushActionsStore(SQLBaseStore): # We want to make sure that we only ever do this one at a time self.database_engine.lock_table(txn, "event_push_summary") + old_rotate_stream_ordering = self._simple_select_one_onecol_txn( + txn, + table="event_push_summary_stream_ordering", + keyvalues={}, + retcol="stream_ordering", + ) + # We don't to try and rotate millions of rows at once, so we cap the # maximum stream ordering we'll rotate before. txn.execute(""" SELECT stream_ordering FROM event_push_actions + WHERE stream_ordering > ? ORDER BY stream_ordering ASC LIMIT 1 OFFSET 50000 - """) + """, (old_rotate_stream_ordering,)) stream_row = txn.fetchone() if stream_row: offset_stream_ordering, = stream_row @@ -662,6 +664,8 @@ class EventPushActionsStore(SQLBaseStore): rotate_to_stream_ordering = self.stream_ordering_day_ago caught_up = True + logger.info("Rotating notifications up to: %s", rotate_to_stream_ordering) + self._rotate_notifs_before_txn(txn, rotate_to_stream_ordering) # We have caught up iff we were limited by `stream_ordering_day_ago` @@ -695,6 +699,8 @@ class EventPushActionsStore(SQLBaseStore): txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering,)) rows = txn.fetchall() + logger.info("Rotating notifications, handling %d rows", len(rows)) + # If the `old.user_id` above is NULL then we know there isn't already an # entry in the table, so we simply insert it. Otherwise we update the # existing table. @@ -726,6 +732,8 @@ class EventPushActionsStore(SQLBaseStore): (old_rotate_stream_ordering, rotate_to_stream_ordering,) ) + logger.info("Rotating notifications, deleted %s push actions", txn.rowcount) + txn.execute( "UPDATE event_push_summary_stream_ordering SET stream_ordering = ?", (rotate_to_stream_ordering,) -- cgit 1.4.1 From b7442c3e2b333616093af76ab7848fa82b6490a9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Tue, 21 Feb 2017 13:59:25 +0000 Subject: Store looping call --- synapse/storage/event_push_actions.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index 52d2bd3ffb..cde141e20d 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -84,7 +84,9 @@ class EventPushActionsStore(SQLBaseStore): ) self._doing_notif_rotation = False - self._clock.looping_call(self._rotate_notifs, 30 * 60 * 1000) + self._rotate_notif_loop = self._clock.looping_call( + self._rotate_notifs, 30 * 60 * 1000 + ) def _set_push_actions_for_event_and_users_txn(self, txn, event, tuples): """ -- cgit 1.4.1 From b2d20e94fa0a2efa84a084ddededb52d89af64c6 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 22 Feb 2017 14:23:54 +0000 Subject: Remove lock from rotate notifs --- synapse/storage/event_push_actions.py | 3 --- 1 file changed, 3 deletions(-) (limited to 'synapse/storage/event_push_actions.py') diff --git a/synapse/storage/event_push_actions.py b/synapse/storage/event_push_actions.py index cde141e20d..14543b4269 100644 --- a/synapse/storage/event_push_actions.py +++ b/synapse/storage/event_push_actions.py @@ -638,9 +638,6 @@ class EventPushActionsStore(SQLBaseStore): the archiving process has caught up or not. """ - # We want to make sure that we only ever do this one at a time - self.database_engine.lock_table(txn, "event_push_summary") - old_rotate_stream_ordering = self._simple_select_one_onecol_txn( txn, table="event_push_summary_stream_ordering", -- cgit 1.4.1