summary refs log tree commit diff
path: root/synapse/push/push_rule_evaluator.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push/push_rule_evaluator.py')
-rw-r--r--synapse/push/push_rule_evaluator.py361
1 files changed, 0 insertions, 361 deletions
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
deleted file mode 100644
index f8176c5a42..0000000000
--- a/synapse/push/push_rule_evaluator.py
+++ /dev/null
@@ -1,361 +0,0 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2017 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-import re
-from typing import (
-    Any,
-    Dict,
-    List,
-    Mapping,
-    Optional,
-    Pattern,
-    Sequence,
-    Set,
-    Tuple,
-    Union,
-)
-
-from matrix_common.regex import glob_to_regex, to_word_pattern
-
-from synapse.events import EventBase
-from synapse.types import UserID
-from synapse.util.caches.lrucache import LruCache
-
-logger = logging.getLogger(__name__)
-
-
-GLOB_REGEX = re.compile(r"\\\[(\\\!|)(.*)\\\]")
-IS_GLOB = re.compile(r"[\?\*\[\]]")
-INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
-
-
-def _room_member_count(condition: Mapping[str, Any], room_member_count: int) -> bool:
-    return _test_ineq_condition(condition, room_member_count)
-
-
-def _sender_notification_permission(
-    condition: Mapping[str, Any],
-    sender_power_level: Optional[int],
-    power_levels: Dict[str, Union[int, Dict[str, int]]],
-) -> bool:
-    if sender_power_level is None:
-        return False
-
-    notif_level_key = condition.get("key")
-    if notif_level_key is None:
-        return False
-
-    notif_levels = power_levels.get("notifications", {})
-    assert isinstance(notif_levels, dict)
-    room_notif_level = notif_levels.get(notif_level_key, 50)
-
-    return sender_power_level >= room_notif_level
-
-
-def _test_ineq_condition(condition: Mapping[str, Any], number: int) -> bool:
-    if "is" not in condition:
-        return False
-    m = INEQUALITY_EXPR.match(condition["is"])
-    if not m:
-        return False
-    ineq = m.group(1)
-    rhs = m.group(2)
-    if not rhs.isdigit():
-        return False
-    rhs_int = int(rhs)
-
-    if ineq == "" or ineq == "==":
-        return number == rhs_int
-    elif ineq == "<":
-        return number < rhs_int
-    elif ineq == ">":
-        return number > rhs_int
-    elif ineq == ">=":
-        return number >= rhs_int
-    elif ineq == "<=":
-        return number <= rhs_int
-    else:
-        return False
-
-
-def tweaks_for_actions(actions: List[Union[str, Dict]]) -> Dict[str, Any]:
-    """
-    Converts a list of actions into a `tweaks` dict (which can then be passed to
-        the push gateway).
-
-    This function ignores all actions other than `set_tweak` actions, and treats
-    absent `value`s as `True`, which agrees with the only spec-defined treatment
-    of absent `value`s (namely, for `highlight` tweaks).
-
-    Args:
-        actions: list of actions
-            e.g. [
-                {"set_tweak": "a", "value": "AAA"},
-                {"set_tweak": "b", "value": "BBB"},
-                {"set_tweak": "highlight"},
-                "notify"
-            ]
-
-    Returns:
-        dictionary of tweaks for those actions
-            e.g. {"a": "AAA", "b": "BBB", "highlight": True}
-    """
-    tweaks = {}
-    for a in actions:
-        if not isinstance(a, dict):
-            continue
-        if "set_tweak" in a:
-            # value is allowed to be absent in which case the value assumed
-            # should be True.
-            tweaks[a["set_tweak"]] = a.get("value", True)
-    return tweaks
-
-
-class PushRuleEvaluatorForEvent:
-    def __init__(
-        self,
-        event: EventBase,
-        room_member_count: int,
-        sender_power_level: Optional[int],
-        power_levels: Dict[str, Union[int, Dict[str, int]]],
-        relations: Dict[str, Set[Tuple[str, str]]],
-        relations_match_enabled: bool,
-    ):
-        self._event = event
-        self._room_member_count = room_member_count
-        self._sender_power_level = sender_power_level
-        self._power_levels = power_levels
-        self._relations = relations
-        self._relations_match_enabled = relations_match_enabled
-
-        # Maps strings of e.g. 'content.body' -> event["content"]["body"]
-        self._value_cache = _flatten_dict(event)
-
-        # Maps cache keys to final values.
-        self._condition_cache: Dict[str, bool] = {}
-
-    def check_conditions(
-        self, conditions: Sequence[Mapping], uid: str, display_name: Optional[str]
-    ) -> bool:
-        """
-        Returns true if a user's conditions/user ID/display name match the event.
-
-        Args:
-            conditions: The user's conditions to match.
-            uid: The user's MXID.
-            display_name: The display name.
-
-        Returns:
-             True if all conditions match the event, False otherwise.
-        """
-        for cond in conditions:
-            _cache_key = cond.get("_cache_key", None)
-            if _cache_key:
-                res = self._condition_cache.get(_cache_key, None)
-                if res is False:
-                    return False
-                elif res is True:
-                    continue
-
-            res = self.matches(cond, uid, display_name)
-            if _cache_key:
-                self._condition_cache[_cache_key] = bool(res)
-
-            if not res:
-                return False
-
-        return True
-
-    def matches(
-        self, condition: Mapping[str, Any], user_id: str, display_name: Optional[str]
-    ) -> bool:
-        """
-        Returns true if a user's condition/user ID/display name match the event.
-
-        Args:
-            condition: The user's condition to match.
-            uid: The user's MXID.
-            display_name: The display name, or None if there is not one.
-
-        Returns:
-             True if the condition matches the event, False otherwise.
-        """
-        if condition["kind"] == "event_match":
-            return self._event_match(condition, user_id)
-        elif condition["kind"] == "contains_display_name":
-            return self._contains_display_name(display_name)
-        elif condition["kind"] == "room_member_count":
-            return _room_member_count(condition, self._room_member_count)
-        elif condition["kind"] == "sender_notification_permission":
-            return _sender_notification_permission(
-                condition, self._sender_power_level, self._power_levels
-            )
-        elif (
-            condition["kind"] == "org.matrix.msc3772.relation_match"
-            and self._relations_match_enabled
-        ):
-            return self._relation_match(condition, user_id)
-        else:
-            # XXX This looks incorrect -- we have reached an unknown condition
-            #     kind and are unconditionally returning that it matches. Note
-            #     that it seems possible to provide a condition to the /pushrules
-            #     endpoint with an unknown kind, see _rule_tuple_from_request_object.
-            return True
-
-    def _event_match(self, condition: Mapping, user_id: str) -> bool:
-        """
-        Check an "event_match" push rule condition.
-
-        Args:
-            condition: The "event_match" push rule condition to match.
-            user_id: The user's MXID.
-
-        Returns:
-             True if the condition matches the event, False otherwise.
-        """
-        pattern = condition.get("pattern", None)
-
-        if not pattern:
-            pattern_type = condition.get("pattern_type", None)
-            if pattern_type == "user_id":
-                pattern = user_id
-            elif pattern_type == "user_localpart":
-                pattern = UserID.from_string(user_id).localpart
-
-        if not pattern:
-            logger.warning("event_match condition with no pattern")
-            return False
-
-        # XXX: optimisation: cache our pattern regexps
-        if condition["key"] == "content.body":
-            body = self._event.content.get("body", None)
-            if not body or not isinstance(body, str):
-                return False
-
-            return _glob_matches(pattern, body, word_boundary=True)
-        else:
-            haystack = self._value_cache.get(condition["key"], None)
-            if haystack is None:
-                return False
-
-            return _glob_matches(pattern, haystack)
-
-    def _contains_display_name(self, display_name: Optional[str]) -> bool:
-        """
-        Check an "event_match" push rule condition.
-
-        Args:
-            display_name: The display name, or None if there is not one.
-
-        Returns:
-             True if the display name is found in the event body, False otherwise.
-        """
-        if not display_name:
-            return False
-
-        body = self._event.content.get("body", None)
-        if not body or not isinstance(body, str):
-            return False
-
-        # Similar to _glob_matches, but do not treat display_name as a glob.
-        r = regex_cache.get((display_name, False, True), None)
-        if not r:
-            r1 = re.escape(display_name)
-            r1 = to_word_pattern(r1)
-            r = re.compile(r1, flags=re.IGNORECASE)
-            regex_cache[(display_name, False, True)] = r
-
-        return bool(r.search(body))
-
-    def _relation_match(self, condition: Mapping, user_id: str) -> bool:
-        """
-        Check an "relation_match" push rule condition.
-
-        Args:
-            condition: The "event_match" push rule condition to match.
-            user_id: The user's MXID.
-
-        Returns:
-             True if the condition matches the event, False otherwise.
-        """
-        rel_type = condition.get("rel_type")
-        if not rel_type:
-            logger.warning("relation_match condition missing rel_type")
-            return False
-
-        sender_pattern = condition.get("sender")
-        if sender_pattern is None:
-            sender_type = condition.get("sender_type")
-            if sender_type == "user_id":
-                sender_pattern = user_id
-        type_pattern = condition.get("type")
-
-        # If any other relations matches, return True.
-        for sender, event_type in self._relations.get(rel_type, ()):
-            if sender_pattern and not _glob_matches(sender_pattern, sender):
-                continue
-            if type_pattern and not _glob_matches(type_pattern, event_type):
-                continue
-            # All values must have matched.
-            return True
-
-        # No relations matched.
-        return False
-
-
-# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
-regex_cache: LruCache[Tuple[str, bool, bool], Pattern] = LruCache(
-    50000, "regex_push_cache"
-)
-
-
-def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
-    """Tests if value matches glob.
-
-    Args:
-        glob
-        value: String to test against glob.
-        word_boundary: Whether to match against word boundaries or entire
-            string. Defaults to False.
-    """
-
-    try:
-        r = regex_cache.get((glob, True, word_boundary), None)
-        if not r:
-            r = glob_to_regex(glob, word_boundary=word_boundary)
-            regex_cache[(glob, True, word_boundary)] = r
-        return bool(r.search(value))
-    except re.error:
-        logger.warning("Failed to parse glob to regex: %r", glob)
-        return False
-
-
-def _flatten_dict(
-    d: Union[EventBase, Mapping[str, Any]],
-    prefix: Optional[List[str]] = None,
-    result: Optional[Dict[str, str]] = None,
-) -> Dict[str, str]:
-    if prefix is None:
-        prefix = []
-    if result is None:
-        result = {}
-    for key, value in d.items():
-        if isinstance(value, str):
-            result[".".join(prefix + [key])] = value.lower()
-        elif isinstance(value, Mapping):
-            _flatten_dict(value, prefix=(prefix + [key]), result=result)
-
-    return result