summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py74
-rw-r--r--synapse/push/action_generator.py44
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py45
-rw-r--r--synapse/push/push_rule_evaluator.py70
4 files changed, 152 insertions, 81 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py

index a1b7711098..57c4d70466 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py
@@ -12,6 +12,80 @@ # See the License for the specific language governing permissions and # limitations under the License. +""" +This module implements the push rules & notifications portion of the Matrix +specification. + +There's a few related features: + +* Push notifications (i.e. email or outgoing requests to a Push Gateway). +* Calculation of unread notifications (for /sync and /notifications). + +When Synapse receives a new event (locally, via the Client-Server API, or via +federation), the following occurs: + +1. The push rules get evaluated to generate a set of per-user actions. +2. The event is persisted into the database. +3. (In the background) The notifier is notified about the new event. + +The per-user actions are initially stored in the event_push_actions_staging table, +before getting moved into the event_push_actions table when the event is persisted. +The event_push_actions table is periodically summarised into the event_push_summary +and event_push_summary_stream_ordering tables. + +Since push actions block an event from being persisted the generation of push +actions is performance sensitive. + +The general interaction of the classes are: + + +---------------------------------------------+ + | FederationEventHandler/EventCreationHandler | + +---------------------------------------------+ + | + v + +-----------------------+ +---------------------------+ + | BulkPushRuleEvaluator |---->| PushRuleEvaluatorForEvent | + +-----------------------+ +---------------------------+ + | + v + +-----------------------------+ + | EventPushActionsWorkerStore | + +-----------------------------+ + +The notifier notifies the pusher pool of the new event, which checks for affected +users. Each user-configured pusher of the affected users then performs the +previously calculated action. + +The general interaction of the classes are: + + +----------+ + | Notifier | + +----------+ + | + v + +------------+ +--------------+ + | PusherPool |---->| PusherConfig | + +------------+ +--------------+ + | + | +---------------+ + +<--->| PusherFactory | + | +---------------+ + v + +------------------------+ +-----------------------------------------------+ + | EmailPusher/HttpPusher |---->| EventPushActionsWorkerStore/PusherWorkerStore | + +------------------------+ +-----------------------------------------------+ + | + v + +-------------------------+ + | Mailer/SimpleHttpClient | + +-------------------------+ + +The Pusher instance also calls out to various utilities for generating payloads +(or email templates), but those interactions are not detailed in this diagram +(and are specific to the type of pusher). + +""" + import abc from typing import TYPE_CHECKING, Any, Dict, Optional diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py deleted file mode 100644
index 60758df016..0000000000 --- a/synapse/push/action_generator.py +++ /dev/null
@@ -1,44 +0,0 @@ -# Copyright 2015 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import logging -from typing import TYPE_CHECKING - -from synapse.events import EventBase -from synapse.events.snapshot import EventContext -from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator -from synapse.util.metrics import Measure - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - - -class ActionGenerator: - def __init__(self, hs: "HomeServer"): - self.clock = hs.get_clock() - self.bulk_evaluator = BulkPushRuleEvaluator(hs) - # really we want to get all user ids and all profile tags too, - # since we want the actions for each profile tag for every user and - # also actions for a client with no profile tag for each user. - # Currently the event stream doesn't support profile tags on an - # event stream, so we just run the rules for a client with no profile - # tag (ie. we just need all the users). - - async def handle_push_actions_for_event( - self, event: EventBase, context: EventContext - ) -> None: - with Measure(self.clock, "action_for_event_by_user"): - await self.bulk_evaluator.action_for_event_by_user(event, context) diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index b07cf2eee7..4ac2c546bf 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -21,7 +21,7 @@ from prometheus_client import Counter from synapse.api.constants import EventTypes, Membership, RelationTypes from synapse.event_auth import get_user_power_level -from synapse.events import EventBase +from synapse.events import EventBase, relation_from_event from synapse.events.snapshot import EventContext from synapse.state import POWER_KEY from synapse.storage.databases.main.roommember import EventIdMembership @@ -29,6 +29,7 @@ from synapse.util.async_helpers import Linearizer from synapse.util.caches import CacheMetric, register_cache from synapse.util.caches.descriptors import lru_cache from synapse.util.caches.lrucache import LruCache +from synapse.util.metrics import measure_func from .push_rule_evaluator import PushRuleEvaluatorForEvent @@ -77,8 +78,8 @@ def _should_count_as_unread(event: EventBase, context: EventContext) -> bool: return False # Exclude edits. - relates_to = event.content.get("m.relates_to", {}) - if relates_to.get("rel_type") == RelationTypes.REPLACE: + relates_to = relation_from_event(event) + if relates_to and relates_to.rel_type == RelationTypes.REPLACE: return False # Mark events that have a non-empty string body as unread. @@ -105,6 +106,7 @@ class BulkPushRuleEvaluator: def __init__(self, hs: "HomeServer"): self.hs = hs self.store = hs.get_datastores().main + self.clock = hs.get_clock() self._event_auth_handler = hs.get_event_auth_handler() # Used by `RulesForRoom` to ensure only one thing mutates the cache at a @@ -185,6 +187,7 @@ class BulkPushRuleEvaluator: return pl_event.content if pl_event else {}, sender_level + @measure_func("action_for_event_by_user") async def action_for_event_by_user( self, event: EventBase, context: EventContext ) -> None: @@ -192,6 +195,10 @@ class BulkPushRuleEvaluator: should increment the unread count, and insert the results into the event_push_actions_staging table. """ + if event.internal_metadata.is_outlier(): + # This can happen due to out of band memberships + return + count_as_unread = _should_count_as_unread(event, context) rules_by_user = await self._get_rules_for_event(event, context) @@ -208,8 +215,6 @@ class BulkPushRuleEvaluator: event, len(room_members), sender_power_level, power_levels ) - condition_cache: Dict[str, bool] = {} - # If the event is not a state event check if any users ignore the sender. if not event.is_state(): ignorers = await self.store.ignored_by(event.sender) @@ -247,8 +252,8 @@ class BulkPushRuleEvaluator: if "enabled" in rule and not rule["enabled"]: continue - matches = _condition_checker( - evaluator, rule["conditions"], uid, display_name, condition_cache + matches = evaluator.check_conditions( + rule["conditions"], uid, display_name ) if matches: actions = [x for x in rule["actions"] if x != "dont_notify"] @@ -267,32 +272,6 @@ class BulkPushRuleEvaluator: ) -def _condition_checker( - evaluator: PushRuleEvaluatorForEvent, - conditions: List[dict], - uid: str, - display_name: Optional[str], - cache: Dict[str, bool], -) -> bool: - for cond in conditions: - _cache_key = cond.get("_cache_key", None) - if _cache_key: - res = cache.get(_cache_key, None) - if res is False: - return False - elif res is True: - continue - - res = evaluator.matches(cond, uid, display_name) - if _cache_key: - cache[_cache_key] = bool(res) - - if not res: - return False - - return True - - MemberMap = Dict[str, Optional[EventIdMembership]] Rule = Dict[str, dict] RulesByUser = Dict[str, List[Rule]] diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index f617c759e6..54db6b5612 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py
@@ -129,9 +129,55 @@ class PushRuleEvaluatorForEvent: # Maps strings of e.g. 'content.body' -> event["content"]["body"] self._value_cache = _flatten_dict(event) + # Maps cache keys to final values. + self._condition_cache: Dict[str, bool] = {} + + def check_conditions( + self, conditions: List[dict], uid: str, display_name: Optional[str] + ) -> bool: + """ + Returns true if a user's conditions/user ID/display name match the event. + + Args: + conditions: The user's conditions to match. + uid: The user's MXID. + display_name: The display name. + + Returns: + True if all conditions match the event, False otherwise. + """ + for cond in conditions: + _cache_key = cond.get("_cache_key", None) + if _cache_key: + res = self._condition_cache.get(_cache_key, None) + if res is False: + return False + elif res is True: + continue + + res = self.matches(cond, uid, display_name) + if _cache_key: + self._condition_cache[_cache_key] = bool(res) + + if not res: + return False + + return True + def matches( self, condition: Dict[str, Any], user_id: str, display_name: Optional[str] ) -> bool: + """ + Returns true if a user's condition/user ID/display name match the event. + + Args: + condition: The user's condition to match. + uid: The user's MXID. + display_name: The display name, or None if there is not one. + + Returns: + True if the condition matches the event, False otherwise. + """ if condition["kind"] == "event_match": return self._event_match(condition, user_id) elif condition["kind"] == "contains_display_name": @@ -146,6 +192,16 @@ class PushRuleEvaluatorForEvent: return True def _event_match(self, condition: dict, user_id: str) -> bool: + """ + Check an "event_match" push rule condition. + + Args: + condition: The "event_match" push rule condition to match. + user_id: The user's MXID. + + Returns: + True if the condition matches the event, False otherwise. + """ pattern = condition.get("pattern", None) if not pattern: @@ -167,13 +223,22 @@ class PushRuleEvaluatorForEvent: return _glob_matches(pattern, body, word_boundary=True) else: - haystack = self._get_value(condition["key"]) + haystack = self._value_cache.get(condition["key"], None) if haystack is None: return False return _glob_matches(pattern, haystack) def _contains_display_name(self, display_name: Optional[str]) -> bool: + """ + Check an "event_match" push rule condition. + + Args: + display_name: The display name, or None if there is not one. + + Returns: + True if the display name is found in the event body, False otherwise. + """ if not display_name: return False @@ -191,9 +256,6 @@ class PushRuleEvaluatorForEvent: return bool(r.search(body)) - def _get_value(self, dotted_key: str) -> Optional[str]: - return self._value_cache.get(dotted_key, None) - # Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches regex_cache: LruCache[Tuple[str, bool, bool], Pattern] = LruCache(