diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index a1b7711098..57c4d70466 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -12,6 +12,80 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+"""
+This module implements the push rules & notifications portion of the Matrix
+specification.
+
+There's a few related features:
+
+* Push notifications (i.e. email or outgoing requests to a Push Gateway).
+* Calculation of unread notifications (for /sync and /notifications).
+
+When Synapse receives a new event (locally, via the Client-Server API, or via
+federation), the following occurs:
+
+1. The push rules get evaluated to generate a set of per-user actions.
+2. The event is persisted into the database.
+3. (In the background) The notifier is notified about the new event.
+
+The per-user actions are initially stored in the event_push_actions_staging table,
+before getting moved into the event_push_actions table when the event is persisted.
+The event_push_actions table is periodically summarised into the event_push_summary
+and event_push_summary_stream_ordering tables.
+
+Since push actions block an event from being persisted the generation of push
+actions is performance sensitive.
+
+The general interaction of the classes are:
+
+ +---------------------------------------------+
+ | FederationEventHandler/EventCreationHandler |
+ +---------------------------------------------+
+ |
+ v
+ +-----------------------+ +---------------------------+
+ | BulkPushRuleEvaluator |---->| PushRuleEvaluatorForEvent |
+ +-----------------------+ +---------------------------+
+ |
+ v
+ +-----------------------------+
+ | EventPushActionsWorkerStore |
+ +-----------------------------+
+
+The notifier notifies the pusher pool of the new event, which checks for affected
+users. Each user-configured pusher of the affected users then performs the
+previously calculated action.
+
+The general interaction of the classes are:
+
+ +----------+
+ | Notifier |
+ +----------+
+ |
+ v
+ +------------+ +--------------+
+ | PusherPool |---->| PusherConfig |
+ +------------+ +--------------+
+ |
+ | +---------------+
+ +<--->| PusherFactory |
+ | +---------------+
+ v
+ +------------------------+ +-----------------------------------------------+
+ | EmailPusher/HttpPusher |---->| EventPushActionsWorkerStore/PusherWorkerStore |
+ +------------------------+ +-----------------------------------------------+
+ |
+ v
+ +-------------------------+
+ | Mailer/SimpleHttpClient |
+ +-------------------------+
+
+The Pusher instance also calls out to various utilities for generating payloads
+(or email templates), but those interactions are not detailed in this diagram
+(and are specific to the type of pusher).
+
+"""
+
import abc
from typing import TYPE_CHECKING, Any, Dict, Optional
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
deleted file mode 100644
index 60758df016..0000000000
--- a/synapse/push/action_generator.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2015 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-import logging
-from typing import TYPE_CHECKING
-
-from synapse.events import EventBase
-from synapse.events.snapshot import EventContext
-from synapse.push.bulk_push_rule_evaluator import BulkPushRuleEvaluator
-from synapse.util.metrics import Measure
-
-if TYPE_CHECKING:
- from synapse.server import HomeServer
-
-logger = logging.getLogger(__name__)
-
-
-class ActionGenerator:
- def __init__(self, hs: "HomeServer"):
- self.clock = hs.get_clock()
- self.bulk_evaluator = BulkPushRuleEvaluator(hs)
- # really we want to get all user ids and all profile tags too,
- # since we want the actions for each profile tag for every user and
- # also actions for a client with no profile tag for each user.
- # Currently the event stream doesn't support profile tags on an
- # event stream, so we just run the rules for a client with no profile
- # tag (ie. we just need all the users).
-
- async def handle_push_actions_for_event(
- self, event: EventBase, context: EventContext
- ) -> None:
- with Measure(self.clock, "action_for_event_by_user"):
- await self.bulk_evaluator.action_for_event_by_user(event, context)
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index b07cf2eee7..4ac2c546bf 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -21,7 +21,7 @@ from prometheus_client import Counter
from synapse.api.constants import EventTypes, Membership, RelationTypes
from synapse.event_auth import get_user_power_level
-from synapse.events import EventBase
+from synapse.events import EventBase, relation_from_event
from synapse.events.snapshot import EventContext
from synapse.state import POWER_KEY
from synapse.storage.databases.main.roommember import EventIdMembership
@@ -29,6 +29,7 @@ from synapse.util.async_helpers import Linearizer
from synapse.util.caches import CacheMetric, register_cache
from synapse.util.caches.descriptors import lru_cache
from synapse.util.caches.lrucache import LruCache
+from synapse.util.metrics import measure_func
from .push_rule_evaluator import PushRuleEvaluatorForEvent
@@ -77,8 +78,8 @@ def _should_count_as_unread(event: EventBase, context: EventContext) -> bool:
return False
# Exclude edits.
- relates_to = event.content.get("m.relates_to", {})
- if relates_to.get("rel_type") == RelationTypes.REPLACE:
+ relates_to = relation_from_event(event)
+ if relates_to and relates_to.rel_type == RelationTypes.REPLACE:
return False
# Mark events that have a non-empty string body as unread.
@@ -105,6 +106,7 @@ class BulkPushRuleEvaluator:
def __init__(self, hs: "HomeServer"):
self.hs = hs
self.store = hs.get_datastores().main
+ self.clock = hs.get_clock()
self._event_auth_handler = hs.get_event_auth_handler()
# Used by `RulesForRoom` to ensure only one thing mutates the cache at a
@@ -185,6 +187,7 @@ class BulkPushRuleEvaluator:
return pl_event.content if pl_event else {}, sender_level
+ @measure_func("action_for_event_by_user")
async def action_for_event_by_user(
self, event: EventBase, context: EventContext
) -> None:
@@ -192,6 +195,10 @@ class BulkPushRuleEvaluator:
should increment the unread count, and insert the results into the
event_push_actions_staging table.
"""
+ if event.internal_metadata.is_outlier():
+ # This can happen due to out of band memberships
+ return
+
count_as_unread = _should_count_as_unread(event, context)
rules_by_user = await self._get_rules_for_event(event, context)
@@ -208,8 +215,6 @@ class BulkPushRuleEvaluator:
event, len(room_members), sender_power_level, power_levels
)
- condition_cache: Dict[str, bool] = {}
-
# If the event is not a state event check if any users ignore the sender.
if not event.is_state():
ignorers = await self.store.ignored_by(event.sender)
@@ -247,8 +252,8 @@ class BulkPushRuleEvaluator:
if "enabled" in rule and not rule["enabled"]:
continue
- matches = _condition_checker(
- evaluator, rule["conditions"], uid, display_name, condition_cache
+ matches = evaluator.check_conditions(
+ rule["conditions"], uid, display_name
)
if matches:
actions = [x for x in rule["actions"] if x != "dont_notify"]
@@ -267,32 +272,6 @@ class BulkPushRuleEvaluator:
)
-def _condition_checker(
- evaluator: PushRuleEvaluatorForEvent,
- conditions: List[dict],
- uid: str,
- display_name: Optional[str],
- cache: Dict[str, bool],
-) -> bool:
- for cond in conditions:
- _cache_key = cond.get("_cache_key", None)
- if _cache_key:
- res = cache.get(_cache_key, None)
- if res is False:
- return False
- elif res is True:
- continue
-
- res = evaluator.matches(cond, uid, display_name)
- if _cache_key:
- cache[_cache_key] = bool(res)
-
- if not res:
- return False
-
- return True
-
-
MemberMap = Dict[str, Optional[EventIdMembership]]
Rule = Dict[str, dict]
RulesByUser = Dict[str, List[Rule]]
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index f617c759e6..54db6b5612 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -129,9 +129,55 @@ class PushRuleEvaluatorForEvent:
# Maps strings of e.g. 'content.body' -> event["content"]["body"]
self._value_cache = _flatten_dict(event)
+ # Maps cache keys to final values.
+ self._condition_cache: Dict[str, bool] = {}
+
+ def check_conditions(
+ self, conditions: List[dict], uid: str, display_name: Optional[str]
+ ) -> bool:
+ """
+ Returns true if a user's conditions/user ID/display name match the event.
+
+ Args:
+ conditions: The user's conditions to match.
+ uid: The user's MXID.
+ display_name: The display name.
+
+ Returns:
+ True if all conditions match the event, False otherwise.
+ """
+ for cond in conditions:
+ _cache_key = cond.get("_cache_key", None)
+ if _cache_key:
+ res = self._condition_cache.get(_cache_key, None)
+ if res is False:
+ return False
+ elif res is True:
+ continue
+
+ res = self.matches(cond, uid, display_name)
+ if _cache_key:
+ self._condition_cache[_cache_key] = bool(res)
+
+ if not res:
+ return False
+
+ return True
+
def matches(
self, condition: Dict[str, Any], user_id: str, display_name: Optional[str]
) -> bool:
+ """
+ Returns true if a user's condition/user ID/display name match the event.
+
+ Args:
+ condition: The user's condition to match.
+ uid: The user's MXID.
+ display_name: The display name, or None if there is not one.
+
+ Returns:
+ True if the condition matches the event, False otherwise.
+ """
if condition["kind"] == "event_match":
return self._event_match(condition, user_id)
elif condition["kind"] == "contains_display_name":
@@ -146,6 +192,16 @@ class PushRuleEvaluatorForEvent:
return True
def _event_match(self, condition: dict, user_id: str) -> bool:
+ """
+ Check an "event_match" push rule condition.
+
+ Args:
+ condition: The "event_match" push rule condition to match.
+ user_id: The user's MXID.
+
+ Returns:
+ True if the condition matches the event, False otherwise.
+ """
pattern = condition.get("pattern", None)
if not pattern:
@@ -167,13 +223,22 @@ class PushRuleEvaluatorForEvent:
return _glob_matches(pattern, body, word_boundary=True)
else:
- haystack = self._get_value(condition["key"])
+ haystack = self._value_cache.get(condition["key"], None)
if haystack is None:
return False
return _glob_matches(pattern, haystack)
def _contains_display_name(self, display_name: Optional[str]) -> bool:
+ """
+ Check an "event_match" push rule condition.
+
+ Args:
+ display_name: The display name, or None if there is not one.
+
+ Returns:
+ True if the display name is found in the event body, False otherwise.
+ """
if not display_name:
return False
@@ -191,9 +256,6 @@ class PushRuleEvaluatorForEvent:
return bool(r.search(body))
- def _get_value(self, dotted_key: str) -> Optional[str]:
- return self._value_cache.get(dotted_key, None)
-
# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
regex_cache: LruCache[Tuple[str, bool, bool], Pattern] = LruCache(
|