diff --git a/synapse/storage/databases/main/push_rule.py b/synapse/storage/databases/main/push_rule.py
index 923166974c..37135d431d 100644
--- a/synapse/storage/databases/main/push_rule.py
+++ b/synapse/storage/databases/main/push_rule.py
@@ -28,8 +28,11 @@ from typing import (
cast,
)
+from twisted.internet import defer
+
from synapse.api.errors import StoreError
from synapse.config.homeserver import ExperimentalConfig
+from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.replication.tcp.streams import PushRulesStream
from synapse.storage._base import SQLBaseStore
from synapse.storage.database import (
@@ -51,7 +54,8 @@ from synapse.storage.util.id_generators import (
)
from synapse.synapse_rust.push import FilteredPushRules, PushRule, PushRules
from synapse.types import JsonDict
-from synapse.util import json_encoder
+from synapse.util import json_encoder, unwrapFirstError
+from synapse.util.async_helpers import gather_results
from synapse.util.caches.descriptors import cached, cachedList
from synapse.util.caches.stream_change_cache import StreamChangeCache
@@ -62,20 +66,34 @@ logger = logging.getLogger(__name__)
def _load_rules(
- rawrules: List[JsonDict],
+ rawrules: List[Tuple[str, int, str, str]],
enabled_map: Dict[str, bool],
experimental_config: ExperimentalConfig,
) -> FilteredPushRules:
"""Take the DB rows returned from the DB and convert them into a full
`FilteredPushRules` object.
+
+ Args:
+ rawrules: List of tuples of:
+ * rule ID
+ * Priority lass
+ * Conditions (as serialized JSON)
+ * Actions (as serialized JSON)
+ enabled_map: A dictionary of rule ID to a boolean of whether the rule is
+ enabled. This might not include all rule IDs from rawrules.
+ experimental_config: The `experimental_features` section of the Synapse
+ config. (Used to check if various features are enabled.)
+
+ Returns:
+ A new FilteredPushRules object.
"""
ruleslist = [
PushRule.from_db(
- rule_id=rawrule["rule_id"],
- priority_class=rawrule["priority_class"],
- conditions=rawrule["conditions"],
- actions=rawrule["actions"],
+ rule_id=rawrule[0],
+ priority_class=rawrule[1],
+ conditions=rawrule[2],
+ actions=rawrule[3],
)
for rawrule in rawrules
]
@@ -165,34 +183,44 @@ class PushRulesWorkerStore(
@cached(max_entries=5000)
async def get_push_rules_for_user(self, user_id: str) -> FilteredPushRules:
- rows = await self.db_pool.simple_select_list(
- table="push_rules",
- keyvalues={"user_name": user_id},
- retcols=(
- "user_name",
- "rule_id",
- "priority_class",
- "priority",
- "conditions",
- "actions",
+ rows = cast(
+ List[Tuple[str, int, int, str, str]],
+ await self.db_pool.simple_select_list(
+ table="push_rules",
+ keyvalues={"user_name": user_id},
+ retcols=(
+ "rule_id",
+ "priority_class",
+ "priority",
+ "conditions",
+ "actions",
+ ),
+ desc="get_push_rules_for_user",
),
- desc="get_push_rules_for_user",
)
- rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
+ # Sort by highest priority_class, then highest priority.
+ rows.sort(key=lambda row: (-int(row[1]), -int(row[2])))
enabled_map = await self.get_push_rules_enabled_for_user(user_id)
- return _load_rules(rows, enabled_map, self.hs.config.experimental)
+ return _load_rules(
+ [(row[0], row[1], row[3], row[4]) for row in rows],
+ enabled_map,
+ self.hs.config.experimental,
+ )
async def get_push_rules_enabled_for_user(self, user_id: str) -> Dict[str, bool]:
- results = await self.db_pool.simple_select_list(
- table="push_rules_enable",
- keyvalues={"user_name": user_id},
- retcols=("rule_id", "enabled"),
- desc="get_push_rules_enabled_for_user",
+ results = cast(
+ List[Tuple[str, Optional[Union[int, bool]]]],
+ await self.db_pool.simple_select_list(
+ table="push_rules_enable",
+ keyvalues={"user_name": user_id},
+ retcols=("rule_id", "enabled"),
+ desc="get_push_rules_enabled_for_user",
+ ),
)
- return {r["rule_id"]: bool(r["enabled"]) for r in results}
+ return {r[0]: bool(r[1]) for r in results}
async def have_push_rules_changed_for_user(
self, user_id: str, last_id: int
@@ -221,23 +249,46 @@ class PushRulesWorkerStore(
if not user_ids:
return {}
- raw_rules: Dict[str, List[JsonDict]] = {user_id: [] for user_id in user_ids}
+ raw_rules: Dict[str, List[Tuple[str, int, str, str]]] = {
+ user_id: [] for user_id in user_ids
+ }
- rows = await self.db_pool.simple_select_many_batch(
- table="push_rules",
- column="user_name",
- iterable=user_ids,
- retcols=("*",),
- desc="bulk_get_push_rules",
- batch_size=1000,
+ # gatherResults loses all type information.
+ rows, enabled_map_by_user = await make_deferred_yieldable(
+ gather_results(
+ (
+ cast(
+ "defer.Deferred[List[Tuple[str, str, int, int, str, str]]]",
+ run_in_background(
+ self.db_pool.simple_select_many_batch,
+ table="push_rules",
+ column="user_name",
+ iterable=user_ids,
+ retcols=(
+ "user_name",
+ "rule_id",
+ "priority_class",
+ "priority",
+ "conditions",
+ "actions",
+ ),
+ desc="bulk_get_push_rules",
+ batch_size=1000,
+ ),
+ ),
+ run_in_background(self.bulk_get_push_rules_enabled, user_ids),
+ ),
+ consumeErrors=True,
+ ).addErrback(unwrapFirstError)
)
- rows.sort(key=lambda row: (-int(row["priority_class"]), -int(row["priority"])))
-
- for row in rows:
- raw_rules.setdefault(row["user_name"], []).append(row)
+ # Sort by highest priority_class, then highest priority.
+ rows.sort(key=lambda row: (-int(row[2]), -int(row[3])))
- enabled_map_by_user = await self.bulk_get_push_rules_enabled(user_ids)
+ for user_name, rule_id, priority_class, _, conditions, actions in rows:
+ raw_rules.setdefault(user_name, []).append(
+ (rule_id, priority_class, conditions, actions)
+ )
results: Dict[str, FilteredPushRules] = {}
@@ -256,17 +307,19 @@ class PushRulesWorkerStore(
results: Dict[str, Dict[str, bool]] = {user_id: {} for user_id in user_ids}
- rows = await self.db_pool.simple_select_many_batch(
- table="push_rules_enable",
- column="user_name",
- iterable=user_ids,
- retcols=("user_name", "rule_id", "enabled"),
- desc="bulk_get_push_rules_enabled",
- batch_size=1000,
+ rows = cast(
+ List[Tuple[str, str, Optional[int]]],
+ await self.db_pool.simple_select_many_batch(
+ table="push_rules_enable",
+ column="user_name",
+ iterable=user_ids,
+ retcols=("user_name", "rule_id", "enabled"),
+ desc="bulk_get_push_rules_enabled",
+ batch_size=1000,
+ ),
)
- for row in rows:
- enabled = bool(row["enabled"])
- results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
+ for user_name, rule_id, enabled in rows:
+ results.setdefault(user_name, {})[rule_id] = bool(enabled)
return results
async def get_all_push_rule_updates(
|