From 6f6a4bfc079c51f130630237cf86488179bae63e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 14:24:01 +0100 Subject: Rename dont_push into mark_unread --- synapse/rest/client/v1/push_rule.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/rest/client/v1/push_rule.py') diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 9fd4908136..c27e05d1dc 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2014-2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -267,7 +267,7 @@ def _check_actions(actions): raise InvalidRuleException("No actions found") for a in actions: - if a in ["notify", "dont_notify", "coalesce"]: + if a in ["notify", "dont_notify", "coalesce", "mark_unread"]: pass elif isinstance(a, dict) and "set_tweak" in a: pass -- cgit 1.5.1 From df3323a7cfe831813c00df32c85b983587f8529e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 10 Jun 2020 20:32:01 +0100 Subject: Use temporary prefixes as per the MSC --- synapse/handlers/sync.py | 4 +++- synapse/rest/client/v1/push_rule.py | 2 +- synapse/storage/data_stores/main/event_push_actions.py | 2 +- 3 files changed, 5 insertions(+), 3 deletions(-) (limited to 'synapse/rest/client/v1/push_rule.py') diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index cec0ca427e..5a38f3e9a8 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1895,7 +1895,9 @@ class SyncHandler(object): if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] - unread_notifications["unread_count"] = notifs["unread_count"] + unread_notifications["org.matrix.msc2625.unread_count"] = ( + notifs["unread_count"] + ) sync_result_builder.joined.append(room_sync) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index c27e05d1dc..f563b3dc35 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -267,7 +267,7 @@ def _check_actions(actions): raise InvalidRuleException("No actions found") for a in actions: - if a in ["notify", "dont_notify", "coalesce", "mark_unread"]: + if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]: pass elif isinstance(a, dict) and "set_tweak" in a: pass diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 7ba741cce0..52dcc7be47 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -454,7 +454,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): # can be used to insert into the `event_push_actions_staging` table. def _gen_entry(user_id, actions): is_highlight = 1 if _action_has_highlight(actions) else 0 - notif = 0 if "mark_unread" in actions else 1 + notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1 return ( event_id, # event_id column user_id, # user_id column -- cgit 1.5.1 From 74d3e177f0443f27e670f0b99299d715c58fd238 Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Wed, 1 Jul 2020 11:08:25 +0100 Subject: Back out MSC2625 implementation (#7761) --- changelog.d/7673.feature | 1 - changelog.d/7716.feature | 1 - changelog.d/7761.feature | 1 + synapse/handlers/sync.py | 3 - synapse/push/bulk_push_rule_evaluator.py | 7 +- synapse/push/push_tools.py | 5 +- synapse/rest/client/v1/push_rule.py | 4 +- .../storage/data_stores/main/event_push_actions.py | 133 +++++---------------- .../delta/58/07push_summary_unread_count.sql | 23 ---- tests/replication/slave/storage/test_events.py | 19 +-- tests/storage/test_event_push_actions.py | 45 +++---- 11 files changed, 53 insertions(+), 189 deletions(-) delete mode 100644 changelog.d/7673.feature delete mode 100644 changelog.d/7716.feature create mode 100644 changelog.d/7761.feature delete mode 100644 synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql (limited to 'synapse/rest/client/v1/push_rule.py') diff --git a/changelog.d/7673.feature b/changelog.d/7673.feature deleted file mode 100644 index ecc3ffd8d5..0000000000 --- a/changelog.d/7673.feature +++ /dev/null @@ -1 +0,0 @@ -Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625). diff --git a/changelog.d/7716.feature b/changelog.d/7716.feature deleted file mode 100644 index ecc3ffd8d5..0000000000 --- a/changelog.d/7716.feature +++ /dev/null @@ -1 +0,0 @@ -Add a per-room counter for unread messages in responses to `/sync` requests. Implements [MSC2625](https://github.com/matrix-org/matrix-doc/pull/2625). diff --git a/changelog.d/7761.feature b/changelog.d/7761.feature new file mode 100644 index 0000000000..c97864677a --- /dev/null +++ b/changelog.d/7761.feature @@ -0,0 +1 @@ +Add unread messages count to sync responses. diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 0b82aa72a6..4c7524493e 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1893,9 +1893,6 @@ class SyncHandler(object): if notifs is not None: unread_notifications["notification_count"] = notifs["notify_count"] unread_notifications["highlight_count"] = notifs["highlight_count"] - unread_notifications["org.matrix.msc2625.unread_count"] = notifs[ - "unread_count" - ] sync_result_builder.joined.append(room_sync) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 5b00602a56..43ffe6faf0 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -189,11 +189,8 @@ class BulkPushRuleEvaluator(object): ) if matches: actions = [x for x in rule["actions"] if x != "dont_notify"] - if ( - "notify" in actions - or "org.matrix.msc2625.mark_unread" in actions - ): - # Push rules say we should act on this event. + if actions and "notify" in actions: + # Push rules say we should notify the user of this event actions_by_user[uid] = actions break diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 4ea683fee0..5dae4648c0 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -39,10 +39,7 @@ def get_badge_count(store, user_id): ) # return one badge count per conversation, as count per # message is so noisy as to be almost useless - # We're populating this badge using the unread_count (instead of the - # notify_count) as this badge is the number of missed messages, not the - # number of missed notifications. - badge += 1 if notifs.get("unread_count") else 0 + badge += 1 if notifs["notify_count"] else 0 return badge diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index f563b3dc35..9fd4908136 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -1,5 +1,5 @@ # -*- coding: utf-8 -*- -# Copyright 2014-2020 The Matrix.org Foundation C.I.C. +# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -267,7 +267,7 @@ def _check_actions(actions): raise InvalidRuleException("No actions found") for a in actions: - if a in ["notify", "dont_notify", "coalesce", "org.matrix.msc2625.mark_unread"]: + if a in ["notify", "dont_notify", "coalesce"]: pass elif isinstance(a, dict) and "set_tweak" in a: pass diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py index 815d52ab4c..bc9f4f08ea 100644 --- a/synapse/storage/data_stores/main/event_push_actions.py +++ b/synapse/storage/data_stores/main/event_push_actions.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- -# Copyright 2015-2020 The Matrix.org Foundation C.I.C. +# Copyright 2015 OpenMarket Ltd +# Copyright 2018 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,9 +15,7 @@ # limitations under the License. import logging -from typing import Dict, Tuple -import attr from canonicaljson import json from twisted.internet import defer @@ -37,16 +36,6 @@ DEFAULT_HIGHLIGHT_ACTION = [ ] -@attr.s -class EventPushSummary: - """Summary of pending event push actions for a given user in a given room.""" - - unread_count = attr.ib(type=int) - stream_ordering = attr.ib(type=int) - old_user_id = attr.ib(type=str) - notif_count = attr.ib(type=int) - - def _serialize_action(actions, is_highlight): """Custom serializer for actions. This allows us to "compress" common actions. @@ -123,7 +112,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): txn.execute(sql, (room_id, last_read_event_id)) results = txn.fetchall() if len(results) == 0: - return {"notify_count": 0, "highlight_count": 0, "unread_count": 0} + return {"notify_count": 0, "highlight_count": 0} stream_ordering = results[0][0] @@ -133,42 +122,25 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_unread_counts_by_pos_txn(self, txn, room_id, user_id, stream_ordering): - # First get number of actions, grouped on whether the action notifies. + # First get number of notifications. + # We don't need to put a notif=1 clause as all rows always have + # notif=1 sql = ( - "SELECT count(*), notif" + "SELECT count(*)" " FROM event_push_actions ea" " WHERE" " user_id = ?" " AND room_id = ?" " AND stream_ordering > ?" - " GROUP BY notif" ) - txn.execute(sql, (user_id, room_id, stream_ordering)) - rows = txn.fetchall() - # We should get a maximum number of two rows: one for notif = 0, which is the - # number of actions that contribute to the unread_count but not to the - # notify_count, and one for notif = 1, which is the number of actions that - # contribute to both counters. If one or both rows don't appear, then the - # value for the matching counter should be 0. - unread_count = 0 - notify_count = 0 - for row in rows: - # We always increment unread_count because actions that notify also - # contribute to it. - unread_count += row[0] - if row[1] == 1: - notify_count = row[0] - elif row[1] != 0: - logger.warning( - "Unexpected value %d for column 'notif' in table" - " 'event_push_actions'", - row[1], - ) + txn.execute(sql, (user_id, room_id, stream_ordering)) + row = txn.fetchone() + notify_count = row[0] if row else 0 txn.execute( """ - SELECT notif_count, unread_count FROM event_push_summary + SELECT notif_count FROM event_push_summary WHERE room_id = ? AND user_id = ? AND stream_ordering > ? """, (room_id, user_id, stream_ordering), @@ -176,7 +148,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): rows = txn.fetchall() if rows: notify_count += rows[0][0] - unread_count += rows[0][1] # Now get the number of highlights sql = ( @@ -193,11 +164,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): row = txn.fetchone() highlight_count = row[0] if row else 0 - return { - "unread_count": unread_count, - "notify_count": notify_count, - "highlight_count": highlight_count, - } + return {"notify_count": notify_count, "highlight_count": highlight_count} @defer.inlineCallbacks def get_push_action_users_in_range(self, min_stream_ordering, max_stream_ordering): @@ -255,7 +222,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -284,7 +250,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" " ORDER BY ep.stream_ordering ASC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -357,7 +322,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -386,7 +350,6 @@ class EventPushActionsWorkerStore(SQLBaseStore): " AND ep.user_id = ?" " AND ep.stream_ordering > ?" " AND ep.stream_ordering <= ?" - " AND ep.notif = 1" " ORDER BY ep.stream_ordering DESC LIMIT ?" ) args = [user_id, user_id, min_stream_ordering, max_stream_ordering, limit] @@ -436,7 +399,7 @@ class EventPushActionsWorkerStore(SQLBaseStore): def _get_if_maybe_push_in_range_for_user_txn(txn): sql = """ SELECT 1 FROM event_push_actions - WHERE user_id = ? AND stream_ordering > ? AND notif = 1 + WHERE user_id = ? AND stream_ordering > ? LIMIT 1 """ @@ -465,15 +428,14 @@ class EventPushActionsWorkerStore(SQLBaseStore): return # This is a helper function for generating the necessary tuple that - # can be used to insert into the `event_push_actions_staging` table. + # can be used to inert into the `event_push_actions_staging` table. def _gen_entry(user_id, actions): is_highlight = 1 if _action_has_highlight(actions) else 0 - notif = 0 if "org.matrix.msc2625.mark_unread" in actions else 1 return ( event_id, # event_id column user_id, # user_id column _serialize_action(actions, is_highlight), # actions column - notif, # notif column + 1, # notif column is_highlight, # highlight column ) @@ -855,51 +817,24 @@ class EventPushActionsStore(EventPushActionsWorkerStore): # Calculate the new counts that should be upserted into event_push_summary sql = """ SELECT user_id, room_id, - coalesce(old.%s, 0) + upd.cnt, + coalesce(old.notif_count, 0) + upd.notif_count, upd.stream_ordering, old.user_id FROM ( - SELECT user_id, room_id, count(*) as cnt, + SELECT user_id, room_id, count(*) as notif_count, max(stream_ordering) as stream_ordering FROM event_push_actions WHERE ? <= stream_ordering AND stream_ordering < ? AND highlight = 0 - %s GROUP BY user_id, room_id ) AS upd LEFT JOIN event_push_summary AS old USING (user_id, room_id) """ - # First get the count of unread messages. - txn.execute( - sql % ("unread_count", ""), - (old_rotate_stream_ordering, rotate_to_stream_ordering), - ) - - # We need to merge both lists into a single object because we might not have the - # same amount of rows in each of them. In this case we use a dict indexed on the - # user ID and room ID to make it easier to populate. - summaries = {} # type: Dict[Tuple[str, str], EventPushSummary] - for row in txn: - summaries[(row[0], row[1])] = EventPushSummary( - unread_count=row[2], - stream_ordering=row[3], - old_user_id=row[4], - notif_count=0, - ) - - # Then get the count of notifications. - txn.execute( - sql % ("notif_count", "AND notif = 1"), - (old_rotate_stream_ordering, rotate_to_stream_ordering), - ) - - # notif_rows is populated based on a subset of the query used to populate - # unread_rows, so we can be sure that there will be no KeyError here. - for row in txn: - summaries[(row[0], row[1])].notif_count = row[2] + txn.execute(sql, (old_rotate_stream_ordering, rotate_to_stream_ordering)) + rows = txn.fetchall() - logger.info("Rotating notifications, handling %d rows", len(summaries)) + logger.info("Rotating notifications, handling %d rows", len(rows)) # If the `old.user_id` above is NULL then we know there isn't already an # entry in the table, so we simply insert it. Otherwise we update the @@ -909,34 +844,22 @@ class EventPushActionsStore(EventPushActionsWorkerStore): table="event_push_summary", values=[ { - "user_id": user_id, - "room_id": room_id, - "notif_count": summary.notif_count, - "unread_count": summary.unread_count, - "stream_ordering": summary.stream_ordering, + "user_id": row[0], + "room_id": row[1], + "notif_count": row[2], + "stream_ordering": row[3], } - for ((user_id, room_id), summary) in summaries.items() - if summary.old_user_id is None + for row in rows + if row[4] is None ], ) txn.executemany( """ - UPDATE event_push_summary - SET notif_count = ?, unread_count = ?, stream_ordering = ? + UPDATE event_push_summary SET notif_count = ?, stream_ordering = ? WHERE user_id = ? AND room_id = ? """, - ( - ( - summary.notif_count, - summary.unread_count, - summary.stream_ordering, - user_id, - room_id, - ) - for ((user_id, room_id), summary) in summaries.items() - if summary.old_user_id is not None - ), + ((row[2], row[3], row[0], row[1]) for row in rows if row[4] is not None), ) txn.execute( diff --git a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql b/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql deleted file mode 100644 index f1459ef7f0..0000000000 --- a/synapse/storage/data_stores/main/schema/delta/58/07push_summary_unread_count.sql +++ /dev/null @@ -1,23 +0,0 @@ -/* Copyright 2020 The Matrix.org Foundation C.I.C - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - --- Store the number of unread messages, i.e. messages that triggered either a notify --- action or a mark_unread one. -ALTER TABLE event_push_summary ADD COLUMN unread_count BIGINT NOT NULL DEFAULT 0; - --- Pre-populate the new column with the count of pending notifications. --- We expect event_push_summary to be relatively small, so we can do this update --- synchronously without impacting Synapse's startup time too much. -UPDATE event_push_summary SET unread_count = notif_count; \ No newline at end of file diff --git a/tests/replication/slave/storage/test_events.py b/tests/replication/slave/storage/test_events.py index cd8680e812..1a88c7fb80 100644 --- a/tests/replication/slave/storage/test_events.py +++ b/tests/replication/slave/storage/test_events.py @@ -160,7 +160,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 0, "notify_count": 0, "unread_count": 0}, + {"highlight_count": 0, "notify_count": 0}, ) self.persist( @@ -173,7 +173,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 0, "notify_count": 1, "unread_count": 1}, + {"highlight_count": 0, "notify_count": 1}, ) self.persist( @@ -188,20 +188,7 @@ class SlavedEventStoreTestCase(BaseSlavedStoreTestCase): self.check( "get_unread_event_push_actions_by_room_for_user", [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 1, "notify_count": 2, "unread_count": 2}, - ) - - self.persist( - type="m.room.message", - msgtype="m.text", - body="world", - push_actions=[(USER_ID_2, ["org.matrix.msc2625.mark_unread"])], - ) - self.replicate() - self.check( - "get_unread_event_push_actions_by_room_for_user", - [ROOM_ID, USER_ID_2, event1.event_id], - {"highlight_count": 1, "notify_count": 2, "unread_count": 3}, + {"highlight_count": 1, "notify_count": 2}, ) def test_get_rooms_for_user_with_stream_ordering(self): diff --git a/tests/storage/test_event_push_actions.py b/tests/storage/test_event_push_actions.py index 303dc8571c..b45bc9c115 100644 --- a/tests/storage/test_event_push_actions.py +++ b/tests/storage/test_event_push_actions.py @@ -22,10 +22,6 @@ import tests.utils USER_ID = "@user:example.com" -MARK_UNREAD = [ - "org.matrix.msc2625.mark_unread", - {"set_tweak": "highlight", "value": False}, -] PlAIN_NOTIF = ["notify", {"set_tweak": "highlight", "value": False}] HIGHLIGHT = [ "notify", @@ -59,17 +55,13 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): user_id = "@user1235:example.com" @defer.inlineCallbacks - def _assert_counts(unread_count, notif_count, highlight_count): + def _assert_counts(noitf_count, highlight_count): counts = yield self.store.db.runInteraction( "", self.store._get_unread_counts_by_pos_txn, room_id, user_id, 0 ) self.assertEquals( counts, - { - "unread_count": unread_count, - "notify_count": notif_count, - "highlight_count": highlight_count, - }, + {"notify_count": noitf_count, "highlight_count": highlight_count}, ) @defer.inlineCallbacks @@ -104,23 +96,23 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): stream, ) - yield _assert_counts(0, 0, 0) + yield _assert_counts(0, 0) yield _inject_actions(1, PlAIN_NOTIF) - yield _assert_counts(1, 1, 0) + yield _assert_counts(1, 0) yield _rotate(2) - yield _assert_counts(1, 1, 0) + yield _assert_counts(1, 0) yield _inject_actions(3, PlAIN_NOTIF) - yield _assert_counts(2, 2, 0) + yield _assert_counts(2, 0) yield _rotate(4) - yield _assert_counts(2, 2, 0) + yield _assert_counts(2, 0) yield _inject_actions(5, PlAIN_NOTIF) yield _mark_read(3, 3) - yield _assert_counts(1, 1, 0) + yield _assert_counts(1, 0) yield _mark_read(5, 5) - yield _assert_counts(0, 0, 0) + yield _assert_counts(0, 0) yield _inject_actions(6, PlAIN_NOTIF) yield _rotate(7) @@ -129,22 +121,17 @@ class EventPushActionsStoreTestCase(tests.unittest.TestCase): table="event_push_actions", keyvalues={"1": 1}, desc="" ) - yield _assert_counts(1, 1, 0) + yield _assert_counts(1, 0) yield _mark_read(7, 7) - yield _assert_counts(0, 0, 0) + yield _assert_counts(0, 0) - yield _inject_actions(8, MARK_UNREAD) - yield _assert_counts(1, 0, 0) + yield _inject_actions(8, HIGHLIGHT) + yield _assert_counts(1, 1) yield _rotate(9) - yield _assert_counts(1, 0, 0) - - yield _inject_actions(10, HIGHLIGHT) - yield _assert_counts(2, 1, 1) - yield _rotate(11) - yield _assert_counts(2, 1, 1) - yield _rotate(12) - yield _assert_counts(2, 1, 1) + yield _assert_counts(1, 1) + yield _rotate(10) + yield _assert_counts(1, 1) @defer.inlineCallbacks def test_find_first_stream_ordering_after_ts(self): -- cgit 1.5.1 From e2f1cccc8aec83f265220283c1f9d3707d49bb7e Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Mon, 3 Aug 2020 11:52:52 +0100 Subject: Fix PUT /pushrules to use the right rule IDs --- synapse/rest/client/v1/push_rule.py | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) (limited to 'synapse/rest/client/v1/push_rule.py') diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 9fd4908136..f66b8fa7c4 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -25,7 +25,7 @@ from synapse.http.servlet import ( parse_json_value_from_request, parse_string, ) -from synapse.push.baserules import BASE_RULE_IDS +from synapse.push.baserules import BASE_RULE_IDS, NEW_RULE_IDS from synapse.push.clientformat import format_push_rules_for_user from synapse.push.rulekinds import PRIORITY_CLASS_MAP from synapse.rest.client.v2_alpha._base import client_patterns @@ -45,6 +45,8 @@ class PushRuleRestServlet(RestServlet): self.notifier = hs.get_notifier() self._is_worker = hs.config.worker_app is not None + self.users_new_default_push_rules = hs.config.users_new_default_push_rules + async def on_PUT(self, request, path): if self._is_worker: raise Exception("Cannot handle PUT /push_rules on worker") @@ -179,7 +181,12 @@ class PushRuleRestServlet(RestServlet): rule_id = spec["rule_id"] is_default_rule = rule_id.startswith(".") if is_default_rule: - if namespaced_rule_id not in BASE_RULE_IDS: + if user_id in self.users_new_default_push_rules: + rule_ids = NEW_RULE_IDS + else: + rule_ids = BASE_RULE_IDS + + if namespaced_rule_id not in rule_ids: raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,)) return self.store.set_push_rule_actions( user_id, namespaced_rule_id, actions, is_default_rule -- cgit 1.5.1 From dd11f575a29b59aced6cfa7ea7b9faea6f968f8d Mon Sep 17 00:00:00 2001 From: Brendan Abolivier Date: Thu, 6 Aug 2020 10:52:26 +0100 Subject: Incorporate review --- synapse/config/server.py | 3 +++ synapse/push/baserules.py | 20 ++++---------------- synapse/rest/client/v1/push_rule.py | 4 ++-- synapse/storage/data_stores/main/push_rule.py | 6 +++--- 4 files changed, 12 insertions(+), 21 deletions(-) (limited to 'synapse/rest/client/v1/push_rule.py') diff --git a/synapse/config/server.py b/synapse/config/server.py index 68d143410f..00fa07c225 100644 --- a/synapse/config/server.py +++ b/synapse/config/server.py @@ -540,6 +540,9 @@ class ServerConfig(Config): if not isinstance(self.users_new_default_push_rules, list): raise ConfigError("'users_new_default_push_rules' must be a list") + # Turn the list into a set to improve lookup speed. + self.users_new_default_push_rules = set(self.users_new_default_push_rules) + def has_tls_listener(self) -> bool: return any(listener.tls for listener in self.listeners) diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 172fd00f19..8047873ff1 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -24,6 +24,8 @@ def list_with_base_rules(rawrules, use_new_defaults=False): Args: rawrules(list): The rules the user has modified or set. + use_new_defaults(bool): Whether to use the new experimental default rules when + appending or prepending default rules. Returns: A new list with the rules set by the user combined with the defaults. @@ -125,11 +127,7 @@ def make_base_prepend_rules(kind, modified_base_rules, use_new_defaults=False): rules = [] if kind == "override": - rules = ( - NEW_PREPEND_OVERRIDE_RULES - if use_new_defaults - else BASE_PREPEND_OVERRIDE_RULES - ) + rules = BASE_PREPEND_OVERRIDE_RULES # Copy the rules before modifying them rules = copy.deepcopy(rules) @@ -171,16 +169,6 @@ BASE_PREPEND_OVERRIDE_RULES = [ ] -NEW_PREPEND_OVERRIDE_RULES = [ - { - "rule_id": "global/override/.m.rule.master", - "enabled": False, - "conditions": [], - "actions": [], - } -] - - BASE_APPEND_OVERRIDE_RULES = [ { "rule_id": "global/override/.m.rule.suppress_notices", @@ -573,7 +561,7 @@ for r in BASE_APPEND_CONTENT_RULES: r["default"] = True NEW_RULE_IDS.add(r["rule_id"]) -for r in NEW_PREPEND_OVERRIDE_RULES: +for r in BASE_PREPEND_OVERRIDE_RULES: r["priority_class"] = PRIORITY_CLASS_MAP["override"] r["default"] = True NEW_RULE_IDS.add(r["rule_id"]) diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index f66b8fa7c4..00831879f3 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -45,7 +45,7 @@ class PushRuleRestServlet(RestServlet): self.notifier = hs.get_notifier() self._is_worker = hs.config.worker_app is not None - self.users_new_default_push_rules = hs.config.users_new_default_push_rules + self._users_new_default_push_rules = hs.config.users_new_default_push_rules async def on_PUT(self, request, path): if self._is_worker: @@ -181,7 +181,7 @@ class PushRuleRestServlet(RestServlet): rule_id = spec["rule_id"] is_default_rule = rule_id.startswith(".") if is_default_rule: - if user_id in self.users_new_default_push_rules: + if user_id in self._users_new_default_push_rules: rule_ids = NEW_RULE_IDS else: rule_ids = BASE_RULE_IDS diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py index d644a0b8ce..6b650f8ba8 100644 --- a/synapse/storage/data_stores/main/push_rule.py +++ b/synapse/storage/data_stores/main/push_rule.py @@ -105,7 +105,7 @@ class PushRulesWorkerStore( prefilled_cache=push_rules_prefill, ) - self.users_new_default_push_rules = hs.config.users_new_default_push_rules + self._users_new_default_push_rules = hs.config.users_new_default_push_rules @abc.abstractmethod def get_max_push_rules_stream_id(self): @@ -136,7 +136,7 @@ class PushRulesWorkerStore( enabled_map = yield self.get_push_rules_enabled_for_user(user_id) - use_new_defaults = user_id in self.users_new_default_push_rules + use_new_defaults = user_id in self._users_new_default_push_rules rules = _load_rules(rows, enabled_map, use_new_defaults) @@ -198,7 +198,7 @@ class PushRulesWorkerStore( enabled_map_by_user = yield self.bulk_get_push_rules_enabled(user_ids) for user_id, rules in results.items(): - use_new_defaults = user_id in self.users_new_default_push_rules + use_new_defaults = user_id in self._users_new_default_push_rules results[user_id] = _load_rules( rules, enabled_map_by_user.get(user_id, {}), use_new_defaults, -- cgit 1.5.1 From b069b78bb4fc9ce005cc84099e208497a0789ddc Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Fri, 14 Aug 2020 10:30:16 -0400 Subject: Convert pusher databases to async/await. (#8075) --- changelog.d/8075.misc | 1 + synapse/rest/client/v1/push_rule.py | 9 +-- synapse/storage/databases/main/push_rule.py | 80 ++++++++++++------------ synapse/storage/databases/main/pusher.py | 95 ++++++++++++++--------------- 4 files changed, 90 insertions(+), 95 deletions(-) create mode 100644 changelog.d/8075.misc (limited to 'synapse/rest/client/v1/push_rule.py') diff --git a/changelog.d/8075.misc b/changelog.d/8075.misc new file mode 100644 index 0000000000..dfe4c03171 --- /dev/null +++ b/changelog.d/8075.misc @@ -0,0 +1 @@ +Convert various parts of the codebase to async/await. diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index 00831879f3..e2df638cc5 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -13,7 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. - from synapse.api.errors import ( NotFoundError, StoreError, @@ -163,7 +162,7 @@ class PushRuleRestServlet(RestServlet): stream_id, _ = self.store.get_push_rules_stream_token() self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) - def set_rule_attr(self, user_id, spec, val): + async def set_rule_attr(self, user_id, spec, val): if spec["attr"] == "enabled": if isinstance(val, dict) and "enabled" in val: val = val["enabled"] @@ -173,7 +172,9 @@ class PushRuleRestServlet(RestServlet): # bools directly, so let's not break them. raise SynapseError(400, "Value for 'enabled' must be boolean") namespaced_rule_id = _namespaced_rule_id_from_spec(spec) - return self.store.set_push_rule_enabled(user_id, namespaced_rule_id, val) + return await self.store.set_push_rule_enabled( + user_id, namespaced_rule_id, val + ) elif spec["attr"] == "actions": actions = val.get("actions") _check_actions(actions) @@ -188,7 +189,7 @@ class PushRuleRestServlet(RestServlet): if namespaced_rule_id not in rule_ids: raise SynapseError(404, "Unknown rule %r" % (namespaced_rule_id,)) - return self.store.set_push_rule_actions( + return await self.store.set_push_rule_actions( user_id, namespaced_rule_id, actions, is_default_rule ) else: diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index 6aa5802977..c2289a9557 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -32,7 +32,7 @@ from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException from synapse.storage.util.id_generators import ChainedIdGenerator from synapse.util import json_encoder -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache logger = logging.getLogger(__name__) @@ -115,9 +115,9 @@ class PushRulesWorkerStore( """ raise NotImplementedError() - @cachedInlineCallbacks(max_entries=5000) - def get_push_rules_for_user(self, user_id): - rows = yield self.db_pool.simple_select_list( + @cached(max_entries=5000) + async def get_push_rules_for_user(self, user_id): + rows = await self.db_pool.simple_select_list( table="push_rules", keyvalues={"user_name": user_id}, retcols=( @@ -133,17 +133,15 @@ class PushRulesWorkerStore( rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) - enabled_map = yield self.get_push_rules_enabled_for_user(user_id) + enabled_map = await self.get_push_rules_enabled_for_user(user_id) use_new_defaults = user_id in self._users_new_default_push_rules - rules = _load_rules(rows, enabled_map, use_new_defaults) + return _load_rules(rows, enabled_map, use_new_defaults) - return rules - - @cachedInlineCallbacks(max_entries=5000) - def get_push_rules_enabled_for_user(self, user_id): - results = yield self.db_pool.simple_select_list( + @cached(max_entries=5000) + async def get_push_rules_enabled_for_user(self, user_id): + results = await self.db_pool.simple_select_list( table="push_rules_enable", keyvalues={"user_name": user_id}, retcols=("user_name", "rule_id", "enabled"), @@ -202,14 +200,15 @@ class PushRulesWorkerStore( return results - @defer.inlineCallbacks - def copy_push_rule_from_room_to_room(self, new_room_id, user_id, rule): + async def copy_push_rule_from_room_to_room( + self, new_room_id: str, user_id: str, rule: dict + ) -> None: """Copy a single push rule from one room to another for a specific user. Args: - new_room_id (str): ID of the new room. - user_id (str): ID of user the push rule belongs to. - rule (Dict): A push rule. + new_room_id: ID of the new room. + user_id : ID of user the push rule belongs to. + rule: A push rule. """ # Create new rule id rule_id_scope = "/".join(rule["rule_id"].split("/")[:-1]) @@ -221,7 +220,7 @@ class PushRulesWorkerStore( condition["pattern"] = new_room_id # Add the rule for the new room - yield self.add_push_rule( + await self.add_push_rule( user_id=user_id, rule_id=new_rule_id, priority_class=rule["priority_class"], @@ -229,20 +228,19 @@ class PushRulesWorkerStore( actions=rule["actions"], ) - @defer.inlineCallbacks - def copy_push_rules_from_room_to_room_for_user( - self, old_room_id, new_room_id, user_id - ): + async def copy_push_rules_from_room_to_room_for_user( + self, old_room_id: str, new_room_id: str, user_id: str + ) -> None: """Copy all of the push rules from one room to another for a specific user. Args: - old_room_id (str): ID of the old room. - new_room_id (str): ID of the new room. - user_id (str): ID of user to copy push rules for. + old_room_id: ID of the old room. + new_room_id: ID of the new room. + user_id: ID of user to copy push rules for. """ # Retrieve push rules for this user - user_push_rules = yield self.get_push_rules_for_user(user_id) + user_push_rules = await self.get_push_rules_for_user(user_id) # Get rules relating to the old room and copy them to the new room for rule in user_push_rules: @@ -251,7 +249,7 @@ class PushRulesWorkerStore( (c.get("key") == "room_id" and c.get("pattern") == old_room_id) for c in conditions ): - yield self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) + await self.copy_push_rule_from_room_to_room(new_room_id, user_id, rule) @cachedList( cached_method_name="get_push_rules_enabled_for_user", @@ -328,8 +326,7 @@ class PushRulesWorkerStore( class PushRuleStore(PushRulesWorkerStore): - @defer.inlineCallbacks - def add_push_rule( + async def add_push_rule( self, user_id, rule_id, @@ -338,13 +335,13 @@ class PushRuleStore(PushRulesWorkerStore): actions, before=None, after=None, - ): + ) -> None: conditions_json = json_encoder.encode(conditions) actions_json = json_encoder.encode(actions) with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids if before or after: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_add_push_rule_relative_txn", self._add_push_rule_relative_txn, stream_id, @@ -358,7 +355,7 @@ class PushRuleStore(PushRulesWorkerStore): after, ) else: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_add_push_rule_highest_priority_txn", self._add_push_rule_highest_priority_txn, stream_id, @@ -542,16 +539,15 @@ class PushRuleStore(PushRulesWorkerStore): }, ) - @defer.inlineCallbacks - def delete_push_rule(self, user_id, rule_id): + async def delete_push_rule(self, user_id: str, rule_id: str) -> None: """ Delete a push rule. Args specify the row to be deleted and can be any of the columns in the push_rule table, but below are the standard ones Args: - user_id (str): The matrix ID of the push rule owner - rule_id (str): The rule_id of the rule to be deleted + user_id: The matrix ID of the push rule owner + rule_id: The rule_id of the rule to be deleted """ def delete_push_rule_txn(txn, stream_id, event_stream_ordering): @@ -565,18 +561,17 @@ class PushRuleStore(PushRulesWorkerStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "delete_push_rule", delete_push_rule_txn, stream_id, event_stream_ordering, ) - @defer.inlineCallbacks - def set_push_rule_enabled(self, user_id, rule_id, enabled): + async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None: with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "_set_push_rule_enabled_txn", self._set_push_rule_enabled_txn, stream_id, @@ -607,8 +602,9 @@ class PushRuleStore(PushRulesWorkerStore): op="ENABLE" if enabled else "DISABLE", ) - @defer.inlineCallbacks - def set_push_rule_actions(self, user_id, rule_id, actions, is_default_rule): + async def set_push_rule_actions( + self, user_id, rule_id, actions, is_default_rule + ) -> None: actions_json = json_encoder.encode(actions) def set_push_rule_actions_txn(txn, stream_id, event_stream_ordering): @@ -649,7 +645,7 @@ class PushRuleStore(PushRulesWorkerStore): with self._push_rules_stream_id_gen.get_next() as ids: stream_id, event_stream_ordering = ids - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "set_push_rule_actions", set_push_rule_actions_txn, stream_id, diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index 8b793d1487..1126fd0751 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -19,10 +19,8 @@ from typing import Iterable, Iterator, List, Tuple from canonicaljson import encode_canonical_json -from twisted.internet import defer - from synapse.storage._base import SQLBaseStore, db_to_json -from synapse.util.caches.descriptors import cachedInlineCallbacks, cachedList +from synapse.util.caches.descriptors import cached, cachedList logger = logging.getLogger(__name__) @@ -34,23 +32,22 @@ class PusherWorkerStore(SQLBaseStore): Drops any rows whose data cannot be decoded """ for r in rows: - dataJson = r["data"] + data_json = r["data"] try: - r["data"] = db_to_json(dataJson) + r["data"] = db_to_json(data_json) except Exception as e: logger.warning( "Invalid JSON in data for pusher %d: %s, %s", r["id"], - dataJson, + data_json, e.args[0], ) continue yield r - @defer.inlineCallbacks - def user_has_pusher(self, user_id): - ret = yield self.db_pool.simple_select_one_onecol( + async def user_has_pusher(self, user_id): + ret = await self.db_pool.simple_select_one_onecol( "pushers", {"user_name": user_id}, "id", allow_none=True ) return ret is not None @@ -61,9 +58,8 @@ class PusherWorkerStore(SQLBaseStore): def get_pushers_by_user_id(self, user_id): return self.get_pushers_by({"user_name": user_id}) - @defer.inlineCallbacks - def get_pushers_by(self, keyvalues): - ret = yield self.db_pool.simple_select_list( + async def get_pushers_by(self, keyvalues): + ret = await self.db_pool.simple_select_list( "pushers", keyvalues, [ @@ -87,16 +83,14 @@ class PusherWorkerStore(SQLBaseStore): ) return self._decode_pushers_rows(ret) - @defer.inlineCallbacks - def get_all_pushers(self): + async def get_all_pushers(self): def get_pushers(txn): txn.execute("SELECT * FROM pushers") rows = self.db_pool.cursor_to_dict(txn) return self._decode_pushers_rows(rows) - rows = yield self.db_pool.runInteraction("get_all_pushers", get_pushers) - return rows + return await self.db_pool.runInteraction("get_all_pushers", get_pushers) async def get_all_updated_pushers_rows( self, instance_name: str, last_id: int, current_id: int, limit: int @@ -164,8 +158,8 @@ class PusherWorkerStore(SQLBaseStore): "get_all_updated_pushers_rows", get_all_updated_pushers_rows_txn ) - @cachedInlineCallbacks(num_args=1, max_entries=15000) - def get_if_user_has_pusher(self, user_id): + @cached(num_args=1, max_entries=15000) + async def get_if_user_has_pusher(self, user_id): # This only exists for the cachedList decorator raise NotImplementedError() @@ -186,34 +180,38 @@ class PusherWorkerStore(SQLBaseStore): return result - @defer.inlineCallbacks - def update_pusher_last_stream_ordering( + async def update_pusher_last_stream_ordering( self, app_id, pushkey, user_id, last_stream_ordering - ): - yield self.db_pool.simple_update_one( + ) -> None: + await self.db_pool.simple_update_one( "pushers", {"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, {"last_stream_ordering": last_stream_ordering}, desc="update_pusher_last_stream_ordering", ) - @defer.inlineCallbacks - def update_pusher_last_stream_ordering_and_success( - self, app_id, pushkey, user_id, last_stream_ordering, last_success - ): + async def update_pusher_last_stream_ordering_and_success( + self, + app_id: str, + pushkey: str, + user_id: str, + last_stream_ordering: int, + last_success: int, + ) -> bool: """Update the last stream ordering position we've processed up to for the given pusher. Args: - app_id (str) - pushkey (str) - last_stream_ordering (int) - last_success (int) + app_id + pushkey + user_id + last_stream_ordering + last_success Returns: - Deferred[bool]: True if the pusher still exists; False if it has been deleted. + True if the pusher still exists; False if it has been deleted. """ - updated = yield self.db_pool.simple_update( + updated = await self.db_pool.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={ @@ -225,18 +223,18 @@ class PusherWorkerStore(SQLBaseStore): return bool(updated) - @defer.inlineCallbacks - def update_pusher_failing_since(self, app_id, pushkey, user_id, failing_since): - yield self.db_pool.simple_update( + async def update_pusher_failing_since( + self, app_id, pushkey, user_id, failing_since + ) -> None: + await self.db_pool.simple_update( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, updatevalues={"failing_since": failing_since}, desc="update_pusher_failing_since", ) - @defer.inlineCallbacks - def get_throttle_params_by_room(self, pusher_id): - res = yield self.db_pool.simple_select_list( + async def get_throttle_params_by_room(self, pusher_id): + res = await self.db_pool.simple_select_list( "pusher_throttle", {"pusher": pusher_id}, ["room_id", "last_sent_ts", "throttle_ms"], @@ -252,11 +250,10 @@ class PusherWorkerStore(SQLBaseStore): return params_by_room - @defer.inlineCallbacks - def set_throttle_params(self, pusher_id, room_id, params): + async def set_throttle_params(self, pusher_id, room_id, params) -> None: # no need to lock because `pusher_throttle` has a primary key on # (pusher, room_id) so simple_upsert will retry - yield self.db_pool.simple_upsert( + await self.db_pool.simple_upsert( "pusher_throttle", {"pusher": pusher_id, "room_id": room_id}, params, @@ -269,8 +266,7 @@ class PusherStore(PusherWorkerStore): def get_pushers_stream_token(self): return self._pushers_id_gen.get_current_token() - @defer.inlineCallbacks - def add_pusher( + async def add_pusher( self, user_id, access_token, @@ -284,11 +280,11 @@ class PusherStore(PusherWorkerStore): data, last_stream_ordering, profile_tag="", - ): + ) -> None: with self._pushers_id_gen.get_next() as stream_id: # no need to lock because `pushers` has a unique key on # (app_id, pushkey, user_name) so simple_upsert will retry - yield self.db_pool.simple_upsert( + await self.db_pool.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, values={ @@ -313,15 +309,16 @@ class PusherStore(PusherWorkerStore): if user_has_pusher is not True: # invalidate, since we the user might not have had a pusher before - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "add_pusher", self._invalidate_cache_and_stream, self.get_if_user_has_pusher, (user_id,), ) - @defer.inlineCallbacks - def delete_pusher_by_app_id_pushkey_user_id(self, app_id, pushkey, user_id): + async def delete_pusher_by_app_id_pushkey_user_id( + self, app_id, pushkey, user_id + ) -> None: def delete_pusher_txn(txn, stream_id): self._invalidate_cache_and_stream( txn, self.get_if_user_has_pusher, (user_id,) @@ -348,6 +345,6 @@ class PusherStore(PusherWorkerStore): ) with self._pushers_id_gen.get_next() as stream_id: - yield self.db_pool.runInteraction( + await self.db_pool.runInteraction( "delete_pusher", delete_pusher_txn, stream_id ) -- cgit 1.5.1 From c9c544cda5748ab106464a8f58031bd60d1aba7b Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Wed, 19 Aug 2020 13:41:51 +0100 Subject: Remove `ChainedIdGenerator`. (#8123) It's just a thin wrapper around two ID gens to make `get_current_token` and `get_next` return tuples. This can easily be replaced by calling the appropriate methods on the underlying ID gens directly. --- changelog.d/8123.misc | 1 + synapse/replication/slave/storage/push_rule.py | 10 ++-- synapse/replication/tcp/streams/_base.py | 2 +- synapse/rest/client/v1/push_rule.py | 2 +- synapse/storage/databases/main/push_rule.py | 36 +++++++------- synapse/storage/util/id_generators.py | 68 +------------------------- synapse/streams/events.py | 2 +- 7 files changed, 26 insertions(+), 95 deletions(-) create mode 100644 changelog.d/8123.misc (limited to 'synapse/rest/client/v1/push_rule.py') diff --git a/changelog.d/8123.misc b/changelog.d/8123.misc new file mode 100644 index 0000000000..7245122896 --- /dev/null +++ b/changelog.d/8123.misc @@ -0,0 +1 @@ +Remove `ChainedIdGenerator`. diff --git a/synapse/replication/slave/storage/push_rule.py b/synapse/replication/slave/storage/push_rule.py index 590187df46..90d90833f9 100644 --- a/synapse/replication/slave/storage/push_rule.py +++ b/synapse/replication/slave/storage/push_rule.py @@ -14,6 +14,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +from synapse.replication.slave.storage._slaved_id_tracker import SlavedIdTracker from synapse.replication.tcp.streams import PushRulesStream from synapse.storage.databases.main.push_rule import PushRulesWorkerStore @@ -21,16 +22,13 @@ from .events import SlavedEventStore class SlavedPushRuleStore(SlavedEventStore, PushRulesWorkerStore): - def get_push_rules_stream_token(self): - return ( - self._push_rules_stream_id_gen.get_current_token(), - self._stream_id_gen.get_current_token(), - ) - def get_max_push_rules_stream_id(self): return self._push_rules_stream_id_gen.get_current_token() def process_replication_rows(self, stream_name, instance_name, token, rows): + # We assert this for the benefit of mypy + assert isinstance(self._push_rules_stream_id_gen, SlavedIdTracker) + if stream_name == PushRulesStream.NAME: self._push_rules_stream_id_gen.advance(token) for row in rows: diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py index 1e92d52165..8c3caf30c9 100644 --- a/synapse/replication/tcp/streams/_base.py +++ b/synapse/replication/tcp/streams/_base.py @@ -352,7 +352,7 @@ class PushRulesStream(Stream): ) def _current_token(self, instance_name: str) -> int: - push_rules_token, _ = self.store.get_push_rules_stream_token() + push_rules_token = self.store.get_max_push_rules_stream_id() return push_rules_token diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py index e2df638cc5..e781a3bcf4 100644 --- a/synapse/rest/client/v1/push_rule.py +++ b/synapse/rest/client/v1/push_rule.py @@ -159,7 +159,7 @@ class PushRuleRestServlet(RestServlet): return 200, {} def notify_user(self, user_id): - stream_id, _ = self.store.get_push_rules_stream_token() + stream_id = self.store.get_max_push_rules_stream_id() self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id]) async def set_rule_attr(self, user_id, spec, val): diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py index c2289a9557..a585e54812 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py @@ -30,7 +30,7 @@ from synapse.storage.databases.main.pusher import PusherWorkerStore from synapse.storage.databases.main.receipts import ReceiptsWorkerStore from synapse.storage.databases.main.roommember import RoomMemberWorkerStore from synapse.storage.push_rule import InconsistentRuleException, RuleNotFoundException -from synapse.storage.util.id_generators import ChainedIdGenerator +from synapse.storage.util.id_generators import StreamIdGenerator from synapse.util import json_encoder from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -82,9 +82,9 @@ class PushRulesWorkerStore( super(PushRulesWorkerStore, self).__init__(database, db_conn, hs) if hs.config.worker.worker_app is None: - self._push_rules_stream_id_gen = ChainedIdGenerator( - self._stream_id_gen, db_conn, "push_rules_stream", "stream_id" - ) # type: Union[ChainedIdGenerator, SlavedIdTracker] + self._push_rules_stream_id_gen = StreamIdGenerator( + db_conn, "push_rules_stream", "stream_id" + ) # type: Union[StreamIdGenerator, SlavedIdTracker] else: self._push_rules_stream_id_gen = SlavedIdTracker( db_conn, "push_rules_stream", "stream_id" @@ -338,8 +338,9 @@ class PushRuleStore(PushRulesWorkerStore): ) -> None: conditions_json = json_encoder.encode(conditions) actions_json = json_encoder.encode(actions) - with self._push_rules_stream_id_gen.get_next() as ids: - stream_id, event_stream_ordering = ids + with self._push_rules_stream_id_gen.get_next() as stream_id: + event_stream_ordering = self._stream_id_gen.get_current_token() + if before or after: await self.db_pool.runInteraction( "_add_push_rule_relative_txn", @@ -559,8 +560,9 @@ class PushRuleStore(PushRulesWorkerStore): txn, stream_id, event_stream_ordering, user_id, rule_id, op="DELETE" ) - with self._push_rules_stream_id_gen.get_next() as ids: - stream_id, event_stream_ordering = ids + with self._push_rules_stream_id_gen.get_next() as stream_id: + event_stream_ordering = self._stream_id_gen.get_current_token() + await self.db_pool.runInteraction( "delete_push_rule", delete_push_rule_txn, @@ -569,8 +571,9 @@ class PushRuleStore(PushRulesWorkerStore): ) async def set_push_rule_enabled(self, user_id, rule_id, enabled) -> None: - with self._push_rules_stream_id_gen.get_next() as ids: - stream_id, event_stream_ordering = ids + with self._push_rules_stream_id_gen.get_next() as stream_id: + event_stream_ordering = self._stream_id_gen.get_current_token() + await self.db_pool.runInteraction( "_set_push_rule_enabled_txn", self._set_push_rule_enabled_txn, @@ -643,8 +646,9 @@ class PushRuleStore(PushRulesWorkerStore): data={"actions": actions_json}, ) - with self._push_rules_stream_id_gen.get_next() as ids: - stream_id, event_stream_ordering = ids + with self._push_rules_stream_id_gen.get_next() as stream_id: + event_stream_ordering = self._stream_id_gen.get_current_token() + await self.db_pool.runInteraction( "set_push_rule_actions", set_push_rule_actions_txn, @@ -673,11 +677,5 @@ class PushRuleStore(PushRulesWorkerStore): self.push_rules_stream_cache.entity_has_changed, user_id, stream_id ) - def get_push_rules_stream_token(self): - """Get the position of the push rules stream. - Returns a pair of a stream id for the push_rules stream and the - room stream ordering it corresponds to.""" - return self._push_rules_stream_id_gen.get_current_token() - def get_max_push_rules_stream_id(self): - return self.get_push_rules_stream_token()[0] + return self._push_rules_stream_id_gen.get_current_token() diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 8276a755e5..0bf772d4d1 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -16,7 +16,7 @@ import contextlib import threading from collections import deque -from typing import Dict, Set, Tuple +from typing import Dict, Set from typing_extensions import Deque @@ -167,72 +167,6 @@ class StreamIdGenerator(object): return self.get_current_token() -class ChainedIdGenerator(object): - """Used to generate new stream ids where the stream must be kept in sync - with another stream. It generates pairs of IDs, the first element is an - integer ID for this stream, the second element is the ID for the stream - that this stream needs to be kept in sync with.""" - - def __init__(self, chained_generator, db_conn, table, column): - self.chained_generator = chained_generator - self._table = table - self._lock = threading.Lock() - self._current_max = _load_current_id(db_conn, table, column) - self._unfinished_ids = deque() # type: Deque[Tuple[int, int]] - - def get_next(self): - """ - Usage: - with stream_id_gen.get_next() as (stream_id, chained_id): - # ... persist event ... - """ - with self._lock: - self._current_max += 1 - next_id = self._current_max - chained_id = self.chained_generator.get_current_token() - - self._unfinished_ids.append((next_id, chained_id)) - - @contextlib.contextmanager - def manager(): - try: - yield (next_id, chained_id) - finally: - with self._lock: - self._unfinished_ids.remove((next_id, chained_id)) - - return manager() - - def get_current_token(self): - """Returns the maximum stream id such that all stream ids less than or - equal to it have been successfully persisted. - """ - with self._lock: - if self._unfinished_ids: - stream_id, chained_id = self._unfinished_ids[0] - return stream_id - 1, chained_id - - return self._current_max, self.chained_generator.get_current_token() - - def advance(self, token: int): - """Stub implementation for advancing the token when receiving updates - over replication; raises an exception as this instance should be the - only source of updates. - """ - - raise Exception( - "Attempted to advance token on source for table %r", self._table - ) - - def get_current_token_for_writer(self, instance_name: str) -> Tuple[int, int]: - """Returns the position of the given writer. - - For streams with single writers this is equivalent to - `get_current_token`. - """ - return self.get_current_token() - - class MultiWriterIdGenerator: """An ID generator that tracks a stream that can have multiple writers. diff --git a/synapse/streams/events.py b/synapse/streams/events.py index 393e34b9fb..7ab46f42bf 100644 --- a/synapse/streams/events.py +++ b/synapse/streams/events.py @@ -39,7 +39,7 @@ class EventSources(object): self.store = hs.get_datastore() def get_current_token(self) -> StreamToken: - push_rules_key, _ = self.store.get_push_rules_stream_token() + push_rules_key = self.store.get_max_push_rules_stream_id() to_device_key = self.store.get_to_device_stream_token() device_list_key = self.store.get_device_stream_token() groups_key = self.store.get_group_stream_token() -- cgit 1.5.1