diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 8047873ff1..f5788c1de7 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -37,7 +37,7 @@ def list_with_base_rules(rawrules, use_new_defaults=False):
modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0}
# Remove the modified base rules from the list, They'll be added back
- # in the default postions in the list.
+ # in the default positions in the list.
rawrules = [r for r in rawrules if r["priority_class"] >= 0]
# shove the server default rules for each kind onto the end of each
@@ -498,6 +498,30 @@ BASE_APPEND_UNDERRIDE_RULES = [
],
"actions": ["notify", {"set_tweak": "highlight", "value": False}],
},
+ {
+ "rule_id": "global/underride/.im.vector.jitsi",
+ "conditions": [
+ {
+ "kind": "event_match",
+ "key": "type",
+ "pattern": "im.vector.modular.widgets",
+ "_id": "_type_modular_widgets",
+ },
+ {
+ "kind": "event_match",
+ "key": "content.type",
+ "pattern": "jitsi",
+ "_id": "_content_type_jitsi",
+ },
+ {
+ "kind": "event_match",
+ "key": "state_key",
+ "pattern": "*",
+ "_id": "_is_state_event",
+ },
+ ],
+ "actions": ["notify", {"set_tweak": "highlight", "value": False}],
+ },
]
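The new underride rule above matches `im.vector.modular.widgets` state events whose `content.type` is `jitsi` (the `*` pattern accepts any state key) and notifies without highlighting. Below is a minimal sketch of how those three `event_match` conditions apply to such an event, using `fnmatch` as a stand-in for Synapse's glob matching; the helper names are illustrative, not Synapse APIs.

# Sketch only: not Synapse's PushRuleEvaluatorForEvent.
from fnmatch import fnmatch


def _flatten(event: dict, prefix: str = "") -> dict:
    """Flatten nested event fields into dotted keys, e.g. "content.type"."""
    out = {}
    for k, v in event.items():
        key = f"{prefix}{k}"
        if isinstance(v, dict):
            out.update(_flatten(v, key + "."))
        else:
            out[key] = v
    return out


def matches_jitsi_rule(event: dict) -> bool:
    flat = _flatten(event)
    conditions = [
        ("type", "im.vector.modular.widgets"),
        ("content.type", "jitsi"),
        ("state_key", "*"),
    ]
    return all(
        key in flat and fnmatch(str(flat[key]), pattern)
        for key, pattern in conditions
    )


widget_event = {
    "type": "im.vector.modular.widgets",
    "state_key": "jitsi_widget_1",
    "content": {"type": "jitsi", "url": "https://jitsi.example.org"},
}
assert matches_jitsi_rule(widget_event)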
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index c440f2545c..82a72dc34f 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -15,8 +15,8 @@
# limitations under the License.
import logging
-from collections import namedtuple
+import attr
from prometheus_client import Counter
from synapse.api.constants import EventTypes, Membership, RelationTypes
@@ -26,7 +26,8 @@ from synapse.events.snapshot import EventContext
from synapse.state import POWER_KEY
from synapse.util.async_helpers import Linearizer
from synapse.util.caches import register_cache
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import lru_cache
+from synapse.util.caches.lrucache import LruCache
from .push_rule_evaluator import PushRuleEvaluatorForEvent
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator:
dict of user_id -> push_rules
"""
room_id = event.room_id
- rules_for_room = await self._get_rules_for_room(room_id)
+ rules_for_room = self._get_rules_for_room(room_id)
rules_by_user = await rules_for_room.get_rules(event, context)
@@ -138,7 +139,7 @@ class BulkPushRuleEvaluator:
return rules_by_user
- @cached()
+ @lru_cache()
def _get_rules_for_room(self, room_id):
"""Get the current RulesForRoom object for the given room id
@@ -275,12 +276,14 @@ class RulesForRoom:
the entire cache for the room.
"""
- def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
+ def __init__(
+ self, hs, room_id, rules_for_room_cache: LruCache, room_push_rule_cache_metrics
+ ):
"""
Args:
hs (HomeServer)
room_id (str)
- rules_for_room_cache(Cache): The cache object that caches these
+ rules_for_room_cache: The cache object that caches these
RoomsForUser objects.
room_push_rule_cache_metrics (CacheMetric)
"""
@@ -390,12 +393,12 @@ class RulesForRoom:
continue
# If a user has left a room we remove their push rule. If they
- # joined then we readd it later in _update_rules_with_member_event_ids
+ # joined then we re-add it later in _update_rules_with_member_event_ids
ret_rules_by_user.pop(user_id, None)
missing_member_event_ids[user_id] = event_id
if missing_member_event_ids:
- # If we have some memebr events we haven't seen, look them up
+ # If we have some member events we haven't seen, look them up
# and fetch push rules for them if appropriate.
logger.debug("Found new member events %r", missing_member_event_ids)
await self._update_rules_with_member_event_ids(
@@ -489,12 +492,20 @@ class RulesForRoom:
self.state_group = state_group
-class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
- # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
- # which namedtuple does for us (i.e. two _CacheContext are the same if
- # their caches and keys match). This is important in particular to
- # dedupe when we add callbacks to lru cache nodes, otherwise the number
- # of callbacks would grow.
+@attr.attrs(slots=True, frozen=True)
+class _Invalidation:
+ # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules,
+ # which means that it is stored on the bulk_get_push_rules cache entry. In order
+ # to ensure that we don't accumulate lots of redundant callbacks on the cache entry,
+ # we need to ensure that two _Invalidation objects are "equal" if they refer to the
+ # same `cache` and `room_id`.
+ #
+ # attrs provides suitable __hash__ and __eq__ methods, provided we remember to
+ # set `frozen=True`.
+
+ cache = attr.ib(type=LruCache)
+ room_id = attr.ib(type=str)
+
def __call__(self):
rules = self.cache.get(self.room_id, None, update_metrics=False)
if rules:
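The comment above spells out the dedupe requirement; the sketch below (with a hypothetical FakeCache standing in for LruCache) shows that two frozen attrs instances built from the same cache and room id collapse to one entry in a set, which is what keeps the per-entry callback list from growing.

import attr


class FakeCache:
    """Stands in for Synapse's LruCache: hashable by identity, like any object."""

    def __init__(self):
        self._data = {}

    def set(self, key, value):
        self._data[key] = value

    def get(self, key, default=None):
        return self._data.get(key, default)

    def invalidate(self, key):
        self._data.pop(key, None)


@attr.attrs(slots=True, frozen=True)
class _FakeInvalidation:
    cache = attr.ib()
    room_id = attr.ib(type=str)

    def __call__(self):
        self.cache.invalidate(self.room_id)


shared_cache = FakeCache()
shared_cache.set("!room:example.org", ["some rules"])

callbacks = {
    _FakeInvalidation(shared_cache, "!room:example.org"),
    _FakeInvalidation(shared_cache, "!room:example.org"),
}
assert len(callbacks) == 1               # deduped by value, not identity
next(iter(callbacks))()                  # firing it drops the cached entry
assert shared_cache.get("!room:example.org") is None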
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 28bd8ab748..c6763971ee 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -18,6 +18,7 @@ import logging
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
logger = logging.getLogger(__name__)
@@ -91,7 +92,12 @@ class EmailPusher:
pass
self.timed_call = None
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_ordering = max_token.stream
+
if self.max_stream_ordering:
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering
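As the comment notes, the pusher now receives a `RoomStreamToken` and only ever reads its minimum stream ordering. A self-contained sketch of how that keeps the pusher's high-water mark monotonic; the token class below is a hand-rolled stand-in, not Synapse's `RoomStreamToken`:

from dataclasses import dataclass, field
from typing import Dict


@dataclass(frozen=True)
class FakeRoomStreamToken:
    stream: int                              # minimum stream ordering
    instance_map: Dict[str, int] = field(default_factory=dict)  # per-writer positions


class FakePusher:
    def __init__(self) -> None:
        self.max_stream_ordering = 0

    def on_new_notifications(self, max_token: FakeRoomStreamToken) -> None:
        # Ignore the per-writer (vector clock) component, as the real
        # pushers do; only the minimum stream ordering is used.
        self.max_stream_ordering = max(max_token.stream, self.max_stream_ordering)


p = FakePusher()
p.on_new_notifications(FakeRoomStreamToken(stream=42, instance_map={"worker1": 45}))
assert p.max_stream_ordering == 42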
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 26706bf3e1..eff0975b6a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -23,6 +23,7 @@ from synapse.api.constants import EventTypes
from synapse.logging import opentracing
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.push import PusherConfigException
+from synapse.types import RoomStreamToken
from . import push_rule_evaluator, push_tools
@@ -74,6 +75,7 @@ class HttpPusher:
self.failing_since = pusherdict["failing_since"]
self.timed_call = None
self._is_processing = False
+ self._group_unread_count_by_room = hs.config.push_group_unread_count_by_room
# This is the highest stream ordering we know it's safe to process.
# When new events arrive, we'll be given a window of new events: we
@@ -114,7 +116,12 @@ class HttpPusher:
if should_check_for_notifs:
self._start_processing()
- def on_new_notifications(self, max_stream_ordering):
+ def on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_ordering = max_token.stream
+
self.max_stream_ordering = max(
max_stream_ordering, self.max_stream_ordering or 0
)
@@ -130,7 +137,11 @@ class HttpPusher:
async def _update_badge(self):
# XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
# to be largely redundant. perhaps we can remove it.
- badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+ badge = await push_tools.get_badge_count(
+ self.hs.get_datastore(),
+ self.user_id,
+ group_by_room=self._group_unread_count_by_room,
+ )
await self._send_badge(badge)
def on_timer(self):
@@ -277,7 +288,11 @@ class HttpPusher:
return True
tweaks = push_rule_evaluator.tweaks_for_actions(push_action["actions"])
- badge = await push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
+ badge = await push_tools.get_badge_count(
+ self.hs.get_datastore(),
+ self.user_id,
+ group_by_room=self._group_unread_count_by_room,
+ )
event = await self.store.get_event(push_action["event_id"], allow_none=True)
if event is None:
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 455a1acb46..38195c8eea 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -24,7 +24,7 @@ from typing import Iterable, List, TypeVar
import bleach
import jinja2
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
from synapse.api.errors import StoreError
from synapse.config.emailconfig import EmailSubjectConfig
from synapse.logging.context import make_deferred_yieldable
@@ -317,9 +317,14 @@ class Mailer:
async def get_room_vars(
self, room_id, user_id, notifs, notif_events, room_state_ids
):
- my_member_event_id = room_state_ids[("m.room.member", user_id)]
- my_member_event = await self.store.get_event(my_member_event_id)
- is_invite = my_member_event.content["membership"] == "invite"
+ # Check if one of the notifs is an invite event for the user.
+ is_invite = False
+ for n in notifs:
+ ev = notif_events[n["event_id"]]
+ if ev.type == EventTypes.Member and ev.state_key == user_id:
+ if ev.content.get("membership") == Membership.INVITE:
+ is_invite = True
+ break
room_name = await calculate_room_name(self.store, room_state_ids, user_id)
@@ -387,8 +392,8 @@ class Mailer:
return ret
async def get_message_vars(self, notif, event, room_state_ids):
- if event.type != EventTypes.Message:
- return
+ if event.type != EventTypes.Message and event.type != EventTypes.Encrypted:
+ return None
sender_state_event_id = room_state_ids[("m.room.member", event.sender)]
sender_state_event = await self.store.get_event(sender_state_event_id)
@@ -399,10 +404,8 @@ class Mailer:
# sender_hash % the number of default images to choose from
sender_hash = string_ordinal_total(event.sender)
- msgtype = event.content.get("msgtype")
-
ret = {
- "msgtype": msgtype,
+ "event_type": event.type,
"is_historical": event.event_id != notif["event_id"],
"id": event.event_id,
"ts": event.origin_server_ts,
@@ -411,6 +414,14 @@ class Mailer:
"sender_hash": sender_hash,
}
+ # Encrypted messages don't have any additional useful information.
+ if event.type == EventTypes.Encrypted:
+ return ret
+
+ msgtype = event.content.get("msgtype")
+
+ ret["msgtype"] = msgtype
+
if msgtype == "m.text":
self.add_text_message_vars(ret, event)
elif msgtype == "m.image":
@@ -455,16 +466,26 @@ class Mailer:
self.store, room_state_ids[room_id], user_id, fallback_to_members=False
)
- my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)]
- my_member_event = await self.store.get_event(my_member_event_id)
- if my_member_event.content["membership"] == "invite":
- inviter_member_event_id = room_state_ids[room_id][
- ("m.room.member", my_member_event.sender)
- ]
- inviter_member_event = await self.store.get_event(
- inviter_member_event_id
+ # See if one of the notifs is an invite event for the user
+ invite_event = None
+ for n in notifs_by_room[room_id]:
+ ev = notif_events[n["event_id"]]
+ if ev.type == EventTypes.Member and ev.state_key == user_id:
+ if ev.content.get("membership") == Membership.INVITE:
+ invite_event = ev
+ break
+
+ if invite_event:
+ inviter_member_event_id = room_state_ids[room_id].get(
+ ("m.room.member", invite_event.sender)
)
- inviter_name = name_from_member_event(inviter_member_event)
+ inviter_name = invite_event.sender
+ if inviter_member_event_id:
+ inviter_member_event = await self.store.get_event(
+ inviter_member_event_id, allow_none=True
+ )
+ if inviter_member_event:
+ inviter_name = name_from_member_event(inviter_member_event)
if room_name is None:
return self.email_subjects.invite_from_person % {
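Both mailer changes above replace a lookup of the user's current membership in room state with a scan of the notified events themselves. A small sketch of that loop, with plain dicts standing in for Synapse's `EventBase` (the field names are illustrative):

MEMBER_TYPE = "m.room.member"
INVITE = "invite"


def find_invite_event(notifs, notif_events, user_id):
    """Return the invite membership event aimed at user_id, if any notif is one."""
    for n in notifs:
        ev = notif_events[n["event_id"]]
        if (
            ev["type"] == MEMBER_TYPE
            and ev.get("state_key") == user_id
            and ev.get("content", {}).get("membership") == INVITE
        ):
            return ev
    return None


notif_events = {
    "$1": {
        "type": MEMBER_TYPE,
        "state_key": "@alice:example.org",
        "sender": "@bob:example.org",
        "content": {"membership": INVITE},
    }
}
assert find_invite_event([{"event_id": "$1"}], notif_events, "@alice:example.org")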
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 709ace01e5..2ce9e444ab 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -16,11 +16,10 @@
import logging
import re
-from typing import Any, Dict, List, Pattern, Union
+from typing import Any, Dict, List, Optional, Pattern, Tuple, Union
from synapse.events import EventBase
from synapse.types import UserID
-from synapse.util.caches import register_cache
from synapse.util.caches.lrucache import LruCache
logger = logging.getLogger(__name__)
@@ -174,20 +173,21 @@ class PushRuleEvaluatorForEvent:
# Similar to _glob_matches, but do not treat display_name as a glob.
r = regex_cache.get((display_name, False, True), None)
if not r:
- r = re.escape(display_name)
- r = _re_word_boundary(r)
- r = re.compile(r, flags=re.IGNORECASE)
+ r1 = re.escape(display_name)
+ r1 = _re_word_boundary(r1)
+ r = re.compile(r1, flags=re.IGNORECASE)
regex_cache[(display_name, False, True)] = r
- return r.search(body)
+ return bool(r.search(body))
- def _get_value(self, dotted_key: str) -> str:
+ def _get_value(self, dotted_key: str) -> Optional[str]:
return self._value_cache.get(dotted_key, None)
# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches
-regex_cache = LruCache(50000)
-register_cache("cache", "regex_push_cache", regex_cache)
+regex_cache = LruCache(
+ 50000, "regex_push_cache"
+) # type: LruCache[Tuple[str, bool, bool], Pattern]
def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
@@ -205,7 +205,7 @@ def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool:
if not r:
r = _glob_to_re(glob, word_boundary)
regex_cache[(glob, True, word_boundary)] = r
- return r.search(value)
+ return bool(r.search(value))
except re.error:
logger.warning("Failed to parse glob to regex: %r", glob)
return False
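The regex cache now lives directly in an `LruCache` keyed on `(pattern, is_glob, word_boundary)`, and both matchers coerce the `re.Match` result to a `bool`. A sketch of the same caching pattern using `functools.lru_cache` as a stand-in; the word-boundary wrapper below only approximates `_re_word_boundary`:

import re
from functools import lru_cache
from typing import Pattern


@lru_cache(maxsize=50000)
def _compiled(pattern: str, is_glob: bool, word_boundary: bool) -> Pattern:
    # is_glob only disambiguates cache entries here (literal vs. glob),
    # mirroring the tuple key used above.
    r = re.escape(pattern)
    if word_boundary:
        r = r"(^|\W)%s(\W|$)" % (r,)
    return re.compile(r, flags=re.IGNORECASE)


def display_name_mentioned(display_name: str, body: str) -> bool:
    # Returning bool() rather than the Match object keeps the declared
    # return type honest, which is what the patch above changes.
    return bool(_compiled(display_name, False, True).search(body))


assert display_name_mentioned("Alice", "hey alice, ping")
assert not display_name_mentioned("Alice", "malice aforethought")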
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index d0145666bf..6e7c880dc0 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -12,12 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
from synapse.push.presentable_names import calculate_room_name, name_from_member_event
from synapse.storage import Storage
+from synapse.storage.databases.main import DataStore
-async def get_badge_count(store, user_id):
+async def get_badge_count(store: DataStore, user_id: str, group_by_room: bool) -> int:
invites = await store.get_invited_rooms_for_local_user(user_id)
joins = await store.get_rooms_for_user(user_id)
@@ -34,9 +34,15 @@ async def get_badge_count(store, user_id):
room_id, user_id, last_unread_event_id
)
)
- # return one badge count per conversation, as count per
- # message is so noisy as to be almost useless
- badge += 1 if notifs["notify_count"] else 0
+ if notifs["notify_count"] == 0:
+ continue
+
+ if group_by_room:
+ # return one badge count per conversation
+ badge += 1
+ else:
+ # increment the badge count by the number of unread messages in the room
+ badge += notifs["notify_count"]
return badge
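Worked example of the badge arithmetic above: with `group_by_room` each room with unread notifications contributes one, otherwise the room's full `notify_count` is added. The counts below are made up for illustration.

def badge_for(notify_counts, group_by_room: bool) -> int:
    badge = 0
    for notify_count in notify_counts:
        if notify_count == 0:
            continue
        # Either one per active conversation, or the raw unread count.
        badge += 1 if group_by_room else notify_count
    return badge


room_counts = [0, 3, 7]                                     # unread notifications per joined room
assert badge_for(room_counts, group_by_room=True) == 2      # two rooms with activity
assert badge_for(room_counts, group_by_room=False) == 10    # 3 + 7 messages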
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 76150e117b..f325964983 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,11 +19,15 @@ from typing import TYPE_CHECKING, Dict, Union
from prometheus_client import Gauge
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+ run_as_background_process,
+ wrap_as_background_process,
+)
from synapse.push import PusherConfigException
from synapse.push.emailpusher import EmailPusher
from synapse.push.httppusher import HttpPusher
from synapse.push.pusher import PusherFactory
+from synapse.types import RoomStreamToken
from synapse.util.async_helpers import concurrently_execute
if TYPE_CHECKING:
@@ -186,15 +190,31 @@ class PusherPool:
)
await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
- async def on_new_notifications(self, max_stream_id: int):
+ def on_new_notifications(self, max_token: RoomStreamToken):
if not self.pushers:
# nothing to do here.
return
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_id = max_token.stream
+
if max_stream_id < self._last_room_stream_id_seen:
# Nothing to do
return
+ # We only start a new background process if necessary rather than
+ # optimistically (to cut down on overhead).
+ self._on_new_notifications(max_token)
+
+ @wrap_as_background_process("on_new_notifications")
+ async def _on_new_notifications(self, max_token: RoomStreamToken):
+ # We just use the minimum stream ordering and ignore the vector clock
+ # component. This is safe to do as long as we *always* ignore the vector
+ # clock components.
+ max_stream_id = max_token.stream
+
prev_stream_id = self._last_room_stream_id_seen
self._last_room_stream_id_seen = max_stream_id
@@ -214,7 +234,7 @@ class PusherPool:
if u in self.pushers:
for p in self.pushers[u].values():
- p.on_new_notifications(max_stream_id)
+ p.on_new_notifications(max_token)
except Exception:
logger.exception("Exception in pusher on_new_notifications")
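The synchronous `on_new_notifications` entry point above does its cheap checks inline and only then hands off to a background coroutine via `wrap_as_background_process`. A rough, self-contained sketch of that shape, with `asyncio.create_task` standing in for Synapse's background-process wrapper; all names below are made up:

import asyncio


class DummyPusher:
    def __init__(self) -> None:
        self.seen = 0

    def on_new_notifications(self, stream_id: int) -> None:
        self.seen = stream_id


class FakePusherPool:
    def __init__(self) -> None:
        self.pushers = {}          # user_id -> {pushkey: pusher}
        self._last_seen = 0

    def on_new_notifications(self, stream_id: int) -> None:
        # Cheap synchronous checks first; only spawn the background task
        # when there is actually something to process.
        if not self.pushers or stream_id < self._last_seen:
            return
        asyncio.create_task(self._on_new_notifications(stream_id))

    async def _on_new_notifications(self, stream_id: int) -> None:
        self._last_seen = stream_id
        for user_pushers in self.pushers.values():
            for p in user_pushers.values():
                p.on_new_notifications(stream_id)


async def _demo() -> None:
    pool = FakePusherPool()
    pusher = DummyPusher()
    pool.pushers = {"@alice:example.org": {"pushkey1": pusher}}
    pool.on_new_notifications(10)
    await asyncio.sleep(0)          # give the background task a chance to run
    assert pusher.seen == 10


asyncio.run(_demo())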