summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorBen Banfield-Zanin <benbz@matrix.org>2020-12-16 14:49:53 +0000
committerBen Banfield-Zanin <benbz@matrix.org>2020-12-16 14:49:53 +0000
commit0825299cfcf61079f78b7a6c5e31f5df078c291a (patch)
tree5f469584845d065c79f1f6ed4781d0624e87f4d3 /synapse/push
parentMerge remote-tracking branch 'origin/release-v1.21.2' into bbz/info-mainline-... (diff)
parentAdd 'xmlsec1' to dependency list (diff)
downloadsynapse-github/bbz/info-mainline-1.24.0.tar.xz
Merge remote-tracking branch 'origin/release-v1.24.0' into bbz/info-mainline-1.24.0 github/bbz/info-mainline-1.24.0 bbz/info-mainline-1.24.0
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/baserules.py26
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py39
-rw-r--r--synapse/push/emailpusher.py8
-rw-r--r--synapse/push/httppusher.py21
-rw-r--r--synapse/push/mailer.py57
-rw-r--r--synapse/push/push_rule_evaluator.py20
-rw-r--r--synapse/push/push_tools.py16
-rw-r--r--synapse/push/pusherpool.py26
8 files changed, 158 insertions, 55 deletions
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 8047873ff1..f5788c1de7 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -37,7 +37,7 @@ def list_with_base_rules(rawrules, use_new_defaults=False):
     modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0}
 
     # Remove the modified base rules from the list, They'll be added back
-    # in the default postions in the list.
+    # in the default positions in the list.
     rawrules = [r for r in rawrules if r["priority_class"] >= 0]
 
     # shove the server default rules for each kind onto the end of each
@@ -498,6 +498,30 @@ BASE_APPEND_UNDERRIDE_RULES = [
         ],
         "actions": ["notify", {"set_tweak": "highlight", "value": False}],
     },
+    {
+        "rule_id": "global/underride/.im.vector.jitsi",
+        "conditions": [
+            {
+                "kind": "event_match",
+                "key": "type",
+                "pattern": "im.vector.modular.widgets",
+                "_id": "_type_modular_widgets",
+            },
+            {
+                "kind": "event_match",
+                "key": "content.type",
+                "pattern": "jitsi",
+                "_id": "_content_type_jitsi",
+            },
+            {
+                "kind": "event_match",
+                "key": "state_key",
+                "pattern": "*",
+                "_id": "_is_state_event",
+            },
+        ],
+        "actions": ["notify", {"set_tweak": "highlight", "value": False}],
+    },
 ]
 
 
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index c440f2545c..82a72dc34f 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -15,8 +15,8 @@
 # limitations under the License.
 
 import logging
-from collections import namedtuple
 
+import attr
 from prometheus_client import Counter
 
 from synapse.api.constants import EventTypes, Membership, RelationTypes
@@ -26,7 +26,8 @@ from synapse.events.snapshot import EventContext
 from synapse.state import POWER_KEY
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import register_cache
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import lru_cache
+from synapse.util.caches.lrucache import LruCache
 
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator:
             dict of user_id -> push_rules
         """
         room_id = event.room_id
-        rules_for_room = await self._get_rules_for_room(room_id)
+        rules_for_room = self._get_rules_for_room(room_id)
 
         rules_by_user = await rules_for_room.get_rules(event, context)
 
@@ -138,7 +139,7 @@ class BulkPushRuleEvaluator:
 
         return rules_by_user
 
-    @cached()
+    @lru_cache()
     def _get_rules_for_room(self, room_id):
         """Get the current RulesForRoom object for the given room id
 
@@ -275,12 +276,14 @@ class RulesForRoom:
     the entire cache for the room.
     """
 
-    def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
+    def __init__(
+        self, hs, room_id, rules_for_room_cache: LruCache, room_push_rule_cache_metrics
+    ):
         """
         Args:
             hs (HomeServer)
             room_id (str)
-            rules_for_room_cache(Cache): The cache object that caches these
+            rules_for_room_cache: The cache object that caches these
                 RoomsForUser objects.
             room_push_rule_cache_metrics (CacheMetric)
         """
@@ -390,12 +393,12 @@ class RulesForRoom:
                     continue
 
                 # If a user has left a room we remove their push rule. If they
-                # joined then we readd it later in _update_rules_with_member_event_ids
+                # joined then we re-add it later in _update_rules_with_member_event_ids
                 ret_rules_by_user.pop(user_id, None)
                 missing_member_event_ids[user_id] = event_id
 
             if missing_member_event_ids:
-                # If we have some memebr events we haven't seen, look them up
+                # If we have some member events we haven't seen, look them up
                 # and fetch push rules for them if appropriate.
                 logger.debug("Found new member events %r", missing_member_event_ids)
                 await self._update_rules_with_member_event_ids(
@@ -489,12 +492,20 @@ class RulesForRoom:
             self.state_group = state_group
 
 
-class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
-    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
-    # which namedtuple does for us (i.e. two _CacheContext are the same if
-    # their caches and keys match). This is important in particular to
-    # dedupe when we add callbacks to lru cache nodes, otherwise the number
-    # of callbacks would grow.
+@attr.attrs(slots=True, frozen=True)
+class _Invalidation:
+    # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules,
+    # which means that it it is stored on the bulk_get_push_rules cache entry. In order
+    # to ensure that we don't accumulate lots of redunant callbacks on the cache entry,
+    # we need to ensure that two _Invalidation objects are "equal" if they refer to the
+    # same `cache` and `room_id`.
+    #
+    # attrs provides suitable __hash__ and __eq__ methods, provided we remember to
+    # set `frozen=True`.
+
+    cache = attr.ib(type=LruCache)
+    room_id = attr.ib(type=str)
+
     def __call__(self):
         rules = self.cache.get(self.room_id, None, update_metrics=False)
         if rules:
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 28bd8ab748..c6763971ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,6 +18,7 @@ import logging
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
 
 logger = logging.getLogger(__name__)
 
@@ -91,7 +92,12 @@ class EmailPusher:
                 pass
             self.timed_call = None
 
-    def on_new_notifications(self, max_stream_ordering):
+    def on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
         if self.max_stream_ordering:
             self.max_stream_ordering = max(
                 max_stream_ordering, self.max_stream_ordering
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 26706bf3e1..eff0975b6a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes
 from synapse.logging import opentracing
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.push import PusherConfigException
+from synapse.types import RoomStreamToken
 
 from . import push_rule_evaluator, push_tools
 
@@ -74,6 +75,7 @@ class HttpPusher:
         self.failing_since = pusherdict["failing_since"]
         self.timed_call = None
         self._is_processing = False
+        self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
 
         # This is the highest stream ordering we know it's safe to process.
         # When new events arrive, we'll be given a window of new events: we
@@ -114,7 +116,12 @@ class HttpPusher:
         if should_check_for_notifs:
             self._start_processing()
 
-    def on_new_notifications(self, max_stream_ordering):
+    def on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_ordering = max_token.stream
+
         self.max_stream_ordering = max(
             max_stream_ordering, self.max_stream_ordering or 0
         )
@@ -130,7 +137,11 @@ class HttpPusher:
     async def _update_badge(self):
         # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
         # to be largely redundant. perhaps we can remove it.
-        badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+        badge = await push_tools.get_badge_count(
+            self.hs.get_datastore(),
+            self.user_id,
+            group_by_room=self._group_unread_count_by_room,
+        )
         await self._send_badge(badge)
 
     def on_timer(self):
@@ -277,7 +288,11 @@ class HttpPusher:
             return True
 
         tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"])
-        badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+        badge = await push_tools.get_badge_count(
+            self.hs.get_datastore(),
+            self.user_id,
+            group_by_room=self._group_unread_count_by_room,
+        )
 
         event = await self.store.get_event(push_action["event_id"], allow_none=True)
         if event is None:
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 455a1acb46..38195c8eea 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -24,7 +24,7 @@ from typing import Iterable, List, TypeVar
 import bleach
 import jinja2
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import StoreError
 from synapse.config.emailconfig import EmailSubjectConfig
 from synapse.logging.context import make_deferred_yieldable
@@ -317,9 +317,14 @@ class Mailer:
     async def get_room_vars(
         self, room_id, user_id, notifs, notif_events, room_state_ids
     ):
-        my_member_event_id = room_state_ids[("m.room.member", user_id)]
-        my_member_event = await self.store.get_event(my_member_event_id)
-        is_invite = my_member_event.content["membership"] == "invite"
+        # Check if one of the notifs is an invite event for the user.
+        is_invite = False
+        for n in notifs:
+            ev = notif_events[n["event_id"]]
+            if ev.type == EventTypes.Member and ev.state_key == user_id:
+                if ev.content.get("membership") == Membership.INVITE:
+                    is_invite = True
+                    break
 
         room_name = await calculate_room_name(self.store, room_state_ids, user_id)
 
@@ -387,8 +392,8 @@ class Mailer:
         return ret
 
     async def get_message_vars(self, notif, event, room_state_ids):
-        if event.type != EventTypes.Message:
-            return
+        if event.type != EventTypes.Message and event.type != EventTypes.Encrypted:
+            return None
 
         sender_state_event_id = room_state_ids[("m.room.member", event.sender)]
         sender_state_event = await self.store.get_event(sender_state_event_id)
@@ -399,10 +404,8 @@ class Mailer:
         # sender_hash % the number of default images to choose from
         sender_hash = string_ordinal_total(event.sender)
 
-        msgtype = event.content.get("msgtype")
-
         ret = {
-            "msgtype": msgtype,
+            "event_type": event.type,
             "is_historical": event.event_id != notif["event_id"],
             "id": event.event_id,
             "ts": event.origin_server_ts,
@@ -411,6 +414,14 @@ class Mailer:
             "sender_hash": sender_hash,
         }
 
+        # Encrypted messages don't have any additional useful information.
+        if event.type == EventTypes.Encrypted:
+            return ret
+
+        msgtype = event.content.get("msgtype")
+
+        ret["msgtype"] = msgtype
+
         if msgtype == "m.text":
             self.add_text_message_vars(ret, event)
         elif msgtype == "m.image":
@@ -455,16 +466,26 @@ class Mailer:
                 self.store, room_state_ids[room_id], user_id, fallback_to_members=False
             )
 
-            my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)]
-            my_member_event = await self.store.get_event(my_member_event_id)
-            if my_member_event.content["membership"] == "invite":
-                inviter_member_event_id = room_state_ids[room_id][
-                    ("m.room.member", my_member_event.sender)
-                ]
-                inviter_member_event = await self.store.get_event(
-                    inviter_member_event_id
+            # See if one of the notifs is an invite event for the user
+            invite_event = None
+            for n in notifs_by_room[room_id]:
+                ev = notif_events[n["event_id"]]
+                if ev.type == EventTypes.Member and ev.state_key == user_id:
+                    if ev.content.get("membership") == Membership.INVITE:
+                        invite_event = ev
+                        break
+
+            if invite_event:
+                inviter_member_event_id = room_state_ids[room_id].get(
+                    ("m.room.member", invite_event.sender)
                 )
-                inviter_name = name_from_member_event(inviter_member_event)
+                inviter_name = invite_event.sender
+                if inviter_member_event_id:
+                    inviter_member_event = await self.store.get_event(
+                        inviter_member_event_id, allow_none=True
+                    )
+                    if inviter_member_event:
+                        inviter_name = name_from_member_event(inviter_member_event)
 
                 if room_name is None:
                     return self.email_subjects.invite_from_person % {
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 709ace01e5..2ce9e444ab 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -16,11 +16,10 @@
 
 import logging
 import re
-from typing import Any, Dict, List, Pattern, Union
+from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
 
 from synapse.events import EventBase
 from synapse.types import UserID
-from synapse.util.caches import register_cache
 from synapse.util.caches.lrucache import LruCache
 
 logger = logging.getLogger(__name__)
@@ -174,20 +173,21 @@ class PushRuleEvaluatorForEvent:
         # Similar to _glob_matches, but do not treat display_name as a glob.
         r = regex_cache.get((display_name, False, True), None)
         if not r:
-            r = re.escape(display_name)
-            r = _re_word_boundary(r)
-            r = re.compile(r, flags=re.IGNORECASE)
+            r1 = re.escape(display_name)
+            r1 = _re_word_boundary(r1)
+            r = re.compile(r1, flags=re.IGNORECASE)
             regex_cache[(display_name, False, True)] = r
 
-        return r.search(body)
+        return bool(r.search(body))
 
-    def _get_value(self, dotted_key: str) -> str:
+    def _get_value(self, dotted_key: str) -> Optional[str]:
         return self._value_cache.get(dotted_key, None)
 
 
 # Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
-regex_cache = LruCache(50000)
-register_cache("cache", "regex_push_cache", regex_cache)
+regex_cache = LruCache(
+    50000, "regex_push_cache"
+)  # type: LruCache[Tuple[str, bool, bool], Pattern]
 
 
 def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
@@ -205,7 +205,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
         if not r:
             r = _glob_to_re(glob, word_boundary)
             regex_cache[(glob, True, word_boundary)] = r
-        return r.search(value)
+        return bool(r.search(value))
     except re.error:
         logger.warning("Failed to parse glob to regex: %r", glob)
         return False
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index d0145666bf..6e7c880dc0 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -12,12 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 from synapse.storage import Storage
+from synapse.storage.databases.main import DataStore
 
 
-async def get_badge_count(store, user_id):
+async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
     invites = await store.get_invited_rooms_for_local_user(user_id)
     joins = await store.get_rooms_for_user(user_id)
 
@@ -34,9 +34,15 @@ async def get_badge_count(store, user_id):
                     room_id, user_id, last_unread_event_id
                 )
             )
-            # return one badge count per conversation, as count per
-            # message is so noisy as to be almost useless
-            badge += 1 if notifs["notify_count"] else 0
+            if notifs["notify_count"] == 0:
+                continue
+
+            if group_by_room:
+                # return one badge count per conversation
+                badge += 1
+            else:
+                # increment the badge count by the number of unread messages in the room
+                badge += notifs["notify_count"]
     return badge
 
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 76150e117b..f325964983 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,11 +19,15 @@ from typing import TYPE_CHECKING, Dict, Union
 
 from prometheus_client import Gauge
 
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.push import PusherConfigException
 from synapse.push.emailpusher import EmailPusher
 from synapse.push.httppusher import HttpPusher
 from synapse.push.pusher import PusherFactory
+from synapse.types import RoomStreamToken
 from synapse.util.async_helpers import concurrently_execute
 
 if TYPE_CHECKING:
@@ -186,15 +190,31 @@ class PusherPool:
                 )
                 await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, max_stream_id: int):
+    def on_new_notifications(self, max_token: RoomStreamToken):
         if not self.pushers:
             # nothing to do here.
             return
 
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_id = max_token.stream
+
         if max_stream_id < self._last_room_stream_id_seen:
             # Nothing to do
             return
 
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._on_new_notifications(max_token)
+
+    @wrap_as_background_process("on_new_notifications")
+    async def _on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_id = max_token.stream
+
         prev_stream_id = self._last_room_stream_id_seen
         self._last_room_stream_id_seen = max_stream_id
 
@@ -214,7 +234,7 @@ class PusherPool:
 
                 if u in self.pushers:
                     for p in self.pushers[u].values():
-                        p.on_new_notifications(max_stream_id)
+                        p.on_new_notifications(max_token)
 
         except Exception:
             logger.exception("Exception in pusher on_new_notifications")