summary refs log tree commit diff
path: root/synapse/storage/databases/main/push_rule.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases/main/push_rule.py')
-rw-r--r--synapse/storage/databases/main/push_rule.py149
1 files changed, 101 insertions, 48 deletions
diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py

index 923166974c..37135d431d 100644 --- a/synapse/storage/databases/main/push_rule.py +++ b/synapse/storage/databases/main/push_rule.py
@@ -28,8 +28,11 @@ from typing import ( cast, ) +from twisted.internet import defer + from synapse.api.errors import StoreError from synapse.config.homeserver import ExperimentalConfig +from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.replication.tcp.streams import PushRulesStream from synapse.storage._base import SQLBaseStore from synapse.storage.database import ( @@ -51,7 +54,8 @@ from synapse.storage.util.id_generators import ( ) from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules from synapse.types import JsonDict -from synapse.util import json_encoder +from synapse.util import json_encoder, unwrapFirstError +from synapse.util.async_helpers import gather_results from synapse.util.caches.descriptors import cached, cachedList from synapse.util.caches.stream_change_cache import StreamChangeCache @@ -62,20 +66,34 @@ logger = logging.getLogger(__name__) def _load_rules( - rawrules: List[JsonDict], + rawrules: List[Tuple[str, int, str, str]], enabled_map: Dict[str, bool], experimental_config: ExperimentalConfig, ) -> FilteredPushRules: """Take the DB rows returned from the DB and convert them into a full `FilteredPushRules` object. + + Args: + rawrules: List of tuples of: + * rule ID + * Priority lass + * Conditions (as serialized JSON) + * Actions (as serialized JSON) + enabled_map: A dictionary of rule ID to a boolean of whether the rule is + enabled. This might not include all rule IDs from rawrules. + experimental_config: The `experimental_features` section of the Synapse + config. (Used to check if various features are enabled.) + + Returns: + A new FilteredPushRules object. """ ruleslist = [ PushRule.from_db( - rule_id=rawrule["rule_id"], - priority_class=rawrule["priority_class"], - conditions=rawrule["conditions"], - actions=rawrule["actions"], + rule_id=rawrule[0], + priority_class=rawrule[1], + conditions=rawrule[2], + actions=rawrule[3], ) for rawrule in rawrules ] @@ -165,34 +183,44 @@ class PushRulesWorkerStore( @cached(max_entries=5000) async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules: - rows = await self.db_pool.simple_select_list( - table="push_rules", - keyvalues={"user_name": user_id}, - retcols=( - "user_name", - "rule_id", - "priority_class", - "priority", - "conditions", - "actions", + rows = cast( + List[Tuple[str, int, int, str, str]], + await self.db_pool.simple_select_list( + table="push_rules", + keyvalues={"user_name": user_id}, + retcols=( + "rule_id", + "priority_class", + "priority", + "conditions", + "actions", + ), + desc="get_push_rules_for_user", ), - desc="get_push_rules_for_user", ) - rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) + # Sort by highest priority_class, then highest priority. + rows.sort(key=lambda row: (-int(row[1]), -int(row[2]))) enabled_map = await self.get_push_rules_enabled_for_user(user_id) - return _load_rules(rows, enabled_map, self.hs.config.experimental) + return _load_rules( + [(row[0], row[1], row[3], row[4]) for row in rows], + enabled_map, + self.hs.config.experimental, + ) async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]: - results = await self.db_pool.simple_select_list( - table="push_rules_enable", - keyvalues={"user_name": user_id}, - retcols=("rule_id", "enabled"), - desc="get_push_rules_enabled_for_user", + results = cast( + List[Tuple[str, Optional[Union[int, bool]]]], + await self.db_pool.simple_select_list( + table="push_rules_enable", + keyvalues={"user_name": user_id}, + retcols=("rule_id", "enabled"), + desc="get_push_rules_enabled_for_user", + ), ) - return {r["rule_id"]: bool(r["enabled"]) for r in results} + return {r[0]: bool(r[1]) for r in results} async def have_push_rules_changed_for_user( self, user_id: str, last_id: int @@ -221,23 +249,46 @@ class PushRulesWorkerStore( if not user_ids: return {} - raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids} + raw_rules: Dict[str, List[Tuple[str, int, str, str]]] = { + user_id: [] for user_id in user_ids + } - rows = await self.db_pool.simple_select_many_batch( - table="push_rules", - column="user_name", - iterable=user_ids, - retcols=("*",), - desc="bulk_get_push_rules", - batch_size=1000, + # gatherResults loses all type information. + rows, enabled_map_by_user = await make_deferred_yieldable( + gather_results( + ( + cast( + "defer.Deferred[List[Tuple[str, str, int, int, str, str]]]", + run_in_background( + self.db_pool.simple_select_many_batch, + table="push_rules", + column="user_name", + iterable=user_ids, + retcols=( + "user_name", + "rule_id", + "priority_class", + "priority", + "conditions", + "actions", + ), + desc="bulk_get_push_rules", + batch_size=1000, + ), + ), + run_in_background(self.bulk_get_push_rules_enabled, user_ids), + ), + consumeErrors=True, + ).addErrback(unwrapFirstError) ) - rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"]))) - - for row in rows: - raw_rules.setdefault(row["user_name"], []).append(row) + # Sort by highest priority_class, then highest priority. + rows.sort(key=lambda row: (-int(row[2]), -int(row[3]))) - enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids) + for user_name, rule_id, priority_class, _, conditions, actions in rows: + raw_rules.setdefault(user_name, []).append( + (rule_id, priority_class, conditions, actions) + ) results: Dict[str, FilteredPushRules] = {} @@ -256,17 +307,19 @@ class PushRulesWorkerStore( results: Dict[str, Dict[str, bool]] = {user_id: {} for user_id in user_ids} - rows = await self.db_pool.simple_select_many_batch( - table="push_rules_enable", - column="user_name", - iterable=user_ids, - retcols=("user_name", "rule_id", "enabled"), - desc="bulk_get_push_rules_enabled", - batch_size=1000, + rows = cast( + List[Tuple[str, str, Optional[int]]], + await self.db_pool.simple_select_many_batch( + table="push_rules_enable", + column="user_name", + iterable=user_ids, + retcols=("user_name", "rule_id", "enabled"), + desc="bulk_get_push_rules_enabled", + batch_size=1000, + ), ) - for row in rows: - enabled = bool(row["enabled"]) - results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled + for user_name, rule_id, enabled in rows: + results.setdefault(user_name, {})[rule_id] = bool(enabled) return results async def get_all_push_rule_updates(