summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py31
-rw-r--r--synapse/push/emailpusher.py331
-rw-r--r--synapse/push/httppusher.py22
-rw-r--r--synapse/push/mailer.py1003
-rw-r--r--synapse/push/push_tools.py8
-rw-r--r--synapse/push/push_types.py4
-rw-r--r--synapse/push/pusher.py38
-rw-r--r--synapse/push/pusherpool.py7
8 files changed, 46 insertions, 1398 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py

index 34ab637c3d..8249d5e84f 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -52,6 +52,7 @@ from synapse.events.snapshot import EventContext from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.state import POWER_KEY from synapse.storage.databases.main.roommember import EventIdMembership +from synapse.storage.invite_rule import InviteRule from synapse.storage.roommember import ProfileInfo from synapse.synapse_rust.push import FilteredPushRules, PushRuleEvaluator from synapse.types import JsonValue @@ -191,9 +192,17 @@ class BulkPushRuleEvaluator: # if this event is an invite event, we may need to run rules for the user # who's been invited, otherwise they won't get told they've been invited - if event.type == EventTypes.Member and event.membership == Membership.INVITE: + if ( + event.is_state() + and event.type == EventTypes.Member + and event.membership == Membership.INVITE + ): invited = event.state_key - if invited and self.hs.is_mine_id(invited) and invited not in local_users: + invite_config = await self.store.get_invite_config_for_user(invited) + if invite_config.get_invite_rule(event.sender) != InviteRule.ALLOW: + # Invite was blocked or ignored, never notify. + return {} + if self.hs.is_mine_id(invited) and invited not in local_users: local_users.append(invited) if not local_users: @@ -304,9 +313,9 @@ class BulkPushRuleEvaluator: if relation_type == "m.thread" and event.content.get( "m.relates_to", {} ).get("is_falling_back", False): - related_events["m.in_reply_to"][ - "im.vector.is_falling_back" - ] = "" + related_events["m.in_reply_to"]["im.vector.is_falling_back"] = ( + "" + ) return related_events @@ -371,8 +380,9 @@ class BulkPushRuleEvaluator: "Deferred[Tuple[int, Tuple[dict, Optional[int]], Dict[str, Dict[str, JsonValue]], Mapping[str, ProfileInfo]]]", gather_results( ( - run_in_background( # type: ignore[call-arg] - self.store.get_number_joined_users_in_room, event.room_id # type: ignore[arg-type] + run_in_background( # type: ignore[call-overload] + self.store.get_number_joined_users_in_room, + event.room_id, # type: ignore[arg-type] ), run_in_background( self._get_power_levels_and_sender_level, @@ -381,10 +391,10 @@ class BulkPushRuleEvaluator: event_id_to_event, ), run_in_background(self._related_events, event), - run_in_background( # type: ignore[call-arg] + run_in_background( # type: ignore[call-overload] self.store.get_subset_users_in_room_with_profiles, - event.room_id, # type: ignore[arg-type] - rules_by_user.keys(), # type: ignore[arg-type] + event.room_id, + rules_by_user.keys(), ), ), consumeErrors=True, @@ -435,6 +445,7 @@ class BulkPushRuleEvaluator: self._related_event_match_enabled, event.room_version.msc3931_push_features, self.hs.config.experimental.msc1767_enabled, # MSC3931 flag + self.hs.config.experimental.msc4210_enabled, ) for uid, rules in rules_by_user.items(): diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py deleted file mode 100644
index 0a14c534f7..0000000000 --- a/synapse/push/emailpusher.py +++ /dev/null
@@ -1,331 +0,0 @@ -# -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright 2016 OpenMarket Ltd -# Copyright (C) 2023 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# <https://www.gnu.org/licenses/agpl-3.0.html>. -# -# Originally licensed under the Apache License, Version 2.0: -# <http://www.apache.org/licenses/LICENSE-2.0>. -# -# [This file includes modifications made by New Vector Limited] -# -# - -import logging -from typing import TYPE_CHECKING, Dict, List, Optional - -from twisted.internet.error import AlreadyCalled, AlreadyCancelled -from twisted.internet.interfaces import IDelayedCall - -from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.push import Pusher, PusherConfig, PusherConfigException, ThrottleParams -from synapse.push.mailer import Mailer -from synapse.push.push_types import EmailReason -from synapse.storage.databases.main.event_push_actions import EmailPushAction -from synapse.util.threepids import validate_email - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - -# THROTTLE is the minimum time between mail notifications sent for a given room. -# Each room maintains its own throttle counter, but each new mail notification -# sends the pending notifications for all rooms. -THROTTLE_MAX_MS = 24 * 60 * 60 * 1000 # 24h -# THROTTLE_MULTIPLIER = 6 # 10 mins, 1 hour, 6 hours, 24 hours -THROTTLE_MULTIPLIER = 144 # 10 mins, 24 hours - i.e. jump straight to 1 day - -# If no event triggers a notification for this long after the previous, -# the throttle is released. -# 12 hours - a gap of 12 hours in conversation is surely enough to merit a new -# notification when things get going again... -THROTTLE_RESET_AFTER_MS = 12 * 60 * 60 * 1000 - -# does each email include all unread notifs, or just the ones which have happened -# since the last mail? -# XXX: this is currently broken as it includes ones from parted rooms(!) -INCLUDE_ALL_UNREAD_NOTIFS = False - - -class EmailPusher(Pusher): - """ - A pusher that sends email notifications about events (approximately) - when they happen. - This shares quite a bit of code with httpusher: it would be good to - factor out the common parts - """ - - def __init__(self, hs: "HomeServer", pusher_config: PusherConfig, mailer: Mailer): - super().__init__(hs, pusher_config) - self.mailer = mailer - - self.store = self.hs.get_datastores().main - self.email = pusher_config.pushkey - self.timed_call: Optional[IDelayedCall] = None - self.throttle_params: Dict[str, ThrottleParams] = {} - self._inited = False - - self._is_processing = False - - # Make sure that the email is valid. - try: - validate_email(self.email) - except ValueError: - raise PusherConfigException("Invalid email") - - self._delay_before_mail_ms = self.hs.config.email.notif_delay_before_mail_ms - - def on_started(self, should_check_for_notifs: bool) -> None: - """Called when this pusher has been started. - - Args: - should_check_for_notifs: Whether we should immediately - check for push to send. Set to False only if it's known there - is nothing to send - """ - if should_check_for_notifs and self.mailer is not None: - self._start_processing() - - def on_stop(self) -> None: - if self.timed_call: - try: - self.timed_call.cancel() - except (AlreadyCalled, AlreadyCancelled): - pass - self.timed_call = None - - def on_new_receipts(self) -> None: - # We could wake up and cancel the timer but there tend to be quite a - # lot of read receipts so it's probably less work to just let the - # timer fire - pass - - def on_timer(self) -> None: - self.timed_call = None - self._start_processing() - - def _start_processing(self) -> None: - if self._is_processing: - return - - run_as_background_process("emailpush.process", self._process) - - def _pause_processing(self) -> None: - """Used by tests to temporarily pause processing of events. - - Asserts that its not currently processing. - """ - assert not self._is_processing - self._is_processing = True - - def _resume_processing(self) -> None: - """Used by tests to resume processing of events after pausing.""" - assert self._is_processing - self._is_processing = False - self._start_processing() - - async def _process(self) -> None: - # we should never get here if we are already processing - assert not self._is_processing - - try: - self._is_processing = True - - if not self._inited: - # this is our first loop: load up the throttle params - assert self.pusher_id is not None - self.throttle_params = await self.store.get_throttle_params_by_room( - self.pusher_id - ) - self._inited = True - - # if the max ordering changes while we're running _unsafe_process, - # call it again, and so on until we've caught up. - while True: - starting_max_ordering = self.max_stream_ordering - try: - await self._unsafe_process() - except Exception: - logger.exception("Exception processing notifs") - if self.max_stream_ordering == starting_max_ordering: - break - finally: - self._is_processing = False - - async def _unsafe_process(self) -> None: - """ - Main logic of the push loop without the wrapper function that sets - up logging, measures and guards against multiple instances of it - being run. - """ - start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - unprocessed = ( - await self.store.get_unread_push_actions_for_user_in_range_for_email( - self.user_id, start, self.max_stream_ordering - ) - ) - - soonest_due_at: Optional[int] = None - - if not unprocessed: - await self.save_last_stream_ordering_and_success(self.max_stream_ordering) - return - - for push_action in unprocessed: - received_at = push_action.received_ts - if received_at is None: - received_at = 0 - notif_ready_at = received_at + self._delay_before_mail_ms - - room_ready_at = self.room_ready_to_notify_at(push_action.room_id) - - should_notify_at = max(notif_ready_at, room_ready_at) - - if should_notify_at <= self.clock.time_msec(): - # one of our notifications is ready for sending, so we send - # *one* email updating the user on their notifications, - # we then consider all previously outstanding notifications - # to be delivered. - - reason: EmailReason = { - "room_id": push_action.room_id, - "now": self.clock.time_msec(), - "received_at": received_at, - "delay_before_mail_ms": self._delay_before_mail_ms, - "last_sent_ts": self.get_room_last_sent_ts(push_action.room_id), - "throttle_ms": self.get_room_throttle_ms(push_action.room_id), - } - - await self.send_notification(unprocessed, reason) - - await self.save_last_stream_ordering_and_success( - max(ea.stream_ordering for ea in unprocessed) - ) - - # we update the throttle on all the possible unprocessed push actions - for ea in unprocessed: - await self.sent_notif_update_throttle(ea.room_id, ea) - break - else: - if soonest_due_at is None or should_notify_at < soonest_due_at: - soonest_due_at = should_notify_at - - if self.timed_call is not None: - try: - self.timed_call.cancel() - except (AlreadyCalled, AlreadyCancelled): - pass - self.timed_call = None - - if soonest_due_at is not None: - self.timed_call = self.hs.get_reactor().callLater( - self.seconds_until(soonest_due_at), self.on_timer - ) - - async def save_last_stream_ordering_and_success( - self, last_stream_ordering: int - ) -> None: - self.last_stream_ordering = last_stream_ordering - pusher_still_exists = ( - await self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.email, - self.user_id, - last_stream_ordering, - self.clock.time_msec(), - ) - ) - if not pusher_still_exists: - # The pusher has been deleted while we were processing, so - # lets just stop and return. - self.on_stop() - - def seconds_until(self, ts_msec: int) -> float: - secs = (ts_msec - self.clock.time_msec()) / 1000 - return max(secs, 0) - - def get_room_throttle_ms(self, room_id: str) -> int: - if room_id in self.throttle_params: - return self.throttle_params[room_id].throttle_ms - else: - return 0 - - def get_room_last_sent_ts(self, room_id: str) -> int: - if room_id in self.throttle_params: - return self.throttle_params[room_id].last_sent_ts - else: - return 0 - - def room_ready_to_notify_at(self, room_id: str) -> int: - """ - Determines whether throttling should prevent us from sending an email - for the given room - - Returns: - The timestamp when we are next allowed to send an email notif - for this room - """ - last_sent_ts = self.get_room_last_sent_ts(room_id) - throttle_ms = self.get_room_throttle_ms(room_id) - - may_send_at = last_sent_ts + throttle_ms - return may_send_at - - async def sent_notif_update_throttle( - self, room_id: str, notified_push_action: EmailPushAction - ) -> None: - # We have sent a notification, so update the throttle accordingly. - # If the event that triggered the notif happened more than - # THROTTLE_RESET_AFTER_MS after the previous one that triggered a - # notif, we release the throttle. Otherwise, the throttle is increased. - time_of_previous_notifs = await self.store.get_time_of_last_push_action_before( - notified_push_action.stream_ordering - ) - - time_of_this_notifs = notified_push_action.received_ts - - if time_of_previous_notifs is not None and time_of_this_notifs is not None: - gap = time_of_this_notifs - time_of_previous_notifs - else: - # if we don't know the arrival time of one of the notifs (it was not - # stored prior to email notification code) then assume a gap of - # zero which will just not reset the throttle - gap = 0 - - current_throttle_ms = self.get_room_throttle_ms(room_id) - - if gap > THROTTLE_RESET_AFTER_MS: - new_throttle_ms = self._delay_before_mail_ms - else: - if current_throttle_ms == 0: - new_throttle_ms = self._delay_before_mail_ms - else: - new_throttle_ms = min( - current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS - ) - self.throttle_params[room_id] = ThrottleParams( - self.clock.time_msec(), - new_throttle_ms, - ) - assert self.pusher_id is not None - await self.store.set_throttle_params( - self.pusher_id, room_id, self.throttle_params[room_id] - ) - - async def send_notification( - self, push_actions: List[EmailPushAction], reason: EmailReason - ) -> None: - logger.info("Sending notif email for user %r", self.user_id) - - await self.mailer.send_notification_mail( - self.app_id, self.user_id, self.email, push_actions, reason - ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index dd9b64d6ef..7df8a128c9 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py
@@ -127,6 +127,11 @@ class HttpPusher(Pusher): if self.data is None: raise PusherConfigException("'data' key can not be null for HTTP pusher") + # Check if badge counts should be disabled for this push gateway + self.disable_badge_count = self.hs.config.experimental.msc4076_enabled and bool( + self.data.get("org.matrix.msc4076.disable_badge_count", False) + ) + self.name = "%s/%s/%s" % ( pusher_config.user_name, pusher_config.app_id, @@ -200,6 +205,12 @@ class HttpPusher(Pusher): if self._is_processing: return + # Check if we are trying, but failing, to contact the pusher. If so, we + # don't try and start processing immediately and instead wait for the + # retry loop to try again later (which is controlled by the timer). + if self.failing_since and self.timed_call and self.timed_call.active(): + return + run_as_background_process("httppush.process", self._process) async def _process(self) -> None: @@ -461,9 +472,10 @@ class HttpPusher(Pusher): content: JsonDict = { "event_id": event.event_id, "room_id": event.room_id, - "counts": {"unread": badge}, "prio": priority, } + if not self.disable_badge_count: + content["counts"] = {"unread": badge} # event_id_only doesn't include the tweaks, so override them. tweaks = {} else: @@ -478,11 +490,11 @@ class HttpPusher(Pusher): "type": event.type, "sender": event.user_id, "prio": priority, - "counts": { - "unread": badge, - # 'missed_calls': 2 - }, } + if not self.disable_badge_count: + content["counts"] = { + "unread": badge, + } if event.type == "m.room.member" and event.is_state(): content["membership"] = event.content["membership"] content["user_is_target"] = event.state_key == self.user_id diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py deleted file mode 100644
index cf611bd90b..0000000000 --- a/synapse/push/mailer.py +++ /dev/null
@@ -1,1003 +0,0 @@ -# -# This file is licensed under the Affero General Public License (AGPL) version 3. -# -# Copyright 2016 OpenMarket Ltd -# Copyright (C) 2023 New Vector, Ltd -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the GNU Affero General Public License as -# published by the Free Software Foundation, either version 3 of the -# License, or (at your option) any later version. -# -# See the GNU Affero General Public License for more details: -# <https://www.gnu.org/licenses/agpl-3.0.html>. -# -# Originally licensed under the Apache License, Version 2.0: -# <http://www.apache.org/licenses/LICENSE-2.0>. -# -# [This file includes modifications made by New Vector Limited] -# -# - -import logging -import urllib.parse -from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, TypeVar - -import bleach -import jinja2 -from markupsafe import Markup -from prometheus_client import Counter - -from synapse.api.constants import EventContentFields, EventTypes, Membership, RoomTypes -from synapse.api.errors import StoreError -from synapse.config.emailconfig import EmailSubjectConfig -from synapse.events import EventBase -from synapse.push.presentable_names import ( - calculate_room_name, - descriptor_from_member_events, - name_from_member_event, -) -from synapse.push.push_types import ( - EmailReason, - MessageVars, - NotifVars, - RoomVars, - TemplateVars, -) -from synapse.storage.databases.main.event_push_actions import EmailPushAction -from synapse.types import StateMap, UserID -from synapse.types.state import StateFilter -from synapse.util.async_helpers import concurrently_execute -from synapse.visibility import filter_events_for_client - -if TYPE_CHECKING: - from synapse.server import HomeServer - -logger = logging.getLogger(__name__) - -T = TypeVar("T") - -emails_sent_counter = Counter( - "synapse_emails_sent_total", - "Emails sent by type", - ["type"], -) - - -CONTEXT_BEFORE = 1 -CONTEXT_AFTER = 1 - -# From https://github.com/matrix-org/matrix-react-sdk/blob/master/src/HtmlUtils.js -ALLOWED_TAGS = [ - "font", # custom to matrix for IRC-style font coloring - "del", # for markdown - # deliberately no h1/h2 to stop people shouting. - "h3", - "h4", - "h5", - "h6", - "blockquote", - "p", - "a", - "ul", - "ol", - "nl", - "li", - "b", - "i", - "u", - "strong", - "em", - "strike", - "code", - "hr", - "br", - "div", - "table", - "thead", - "caption", - "tbody", - "tr", - "th", - "td", - "pre", -] -ALLOWED_ATTRS = { - # custom ones first: - "font": ["color"], # custom to matrix - "a": ["href", "name", "target"], # remote target: custom to matrix - # We don't currently allow img itself by default, but this - # would make sense if we did - "img": ["src"], -} -# When bleach release a version with this option, we can specify schemes -# ALLOWED_SCHEMES = ["http", "https", "ftp", "mailto"] - - -class Mailer: - def __init__( - self, - hs: "HomeServer", - app_name: str, - template_html: jinja2.Template, - template_text: jinja2.Template, - ): - self.hs = hs - self.template_html = template_html - self.template_text = template_text - - self.send_email_handler = hs.get_send_email_handler() - self.store = self.hs.get_datastores().main - self._state_storage_controller = self.hs.get_storage_controllers().state - self.macaroon_gen = self.hs.get_macaroon_generator() - self.state_handler = self.hs.get_state_handler() - self._storage_controllers = hs.get_storage_controllers() - self.app_name = app_name - self.email_subjects: EmailSubjectConfig = hs.config.email.email_subjects - - logger.info("Created Mailer for app_name %s" % app_name) - - emails_sent_counter.labels("password_reset") - - async def send_password_reset_mail( - self, email_address: str, token: str, client_secret: str, sid: str - ) -> None: - """Send an email with a password reset link to a user - - Args: - email_address: Email address we're sending the password - reset to - token: Unique token generated by the server to verify - the email was received - client_secret: Unique token generated by the client to - group together multiple email sending attempts - sid: The generated session ID - """ - params = {"token": token, "client_secret": client_secret, "sid": sid} - link = ( - self.hs.config.server.public_baseurl - + "_synapse/client/password_reset/email/submit_token?%s" - % urllib.parse.urlencode(params) - ) - - template_vars: TemplateVars = {"link": link} - - emails_sent_counter.labels("password_reset").inc() - - await self.send_email( - email_address, - self.email_subjects.password_reset - % {"server_name": self.hs.config.server.server_name, "app": self.app_name}, - template_vars, - ) - - emails_sent_counter.labels("registration") - - async def send_registration_mail( - self, email_address: str, token: str, client_secret: str, sid: str - ) -> None: - """Send an email with a registration confirmation link to a user - - Args: - email_address: Email address we're sending the registration - link to - token: Unique token generated by the server to verify - the email was received - client_secret: Unique token generated by the client to - group together multiple email sending attempts - sid: The generated session ID - """ - params = {"token": token, "client_secret": client_secret, "sid": sid} - link = ( - self.hs.config.server.public_baseurl - + "_matrix/client/unstable/registration/email/submit_token?%s" - % urllib.parse.urlencode(params) - ) - - template_vars: TemplateVars = {"link": link} - - emails_sent_counter.labels("registration").inc() - - await self.send_email( - email_address, - self.email_subjects.email_validation - % {"server_name": self.hs.config.server.server_name, "app": self.app_name}, - template_vars, - ) - - emails_sent_counter.labels("already_in_use") - - async def send_already_in_use_mail(self, email_address: str) -> None: - """Send an email if the address is already bound to an user account - - Args: - email_address: Email address we're sending to the "already in use" mail - """ - - await self.send_email( - email_address, - self.email_subjects.email_already_in_use - % {"server_name": self.hs.config.server.server_name, "app": self.app_name}, - {}, - ) - - emails_sent_counter.labels("add_threepid") - - async def send_add_threepid_mail( - self, email_address: str, token: str, client_secret: str, sid: str - ) -> None: - """Send an email with a validation link to a user for adding a 3pid to their account - - Args: - email_address: Email address we're sending the validation link to - - token: Unique token generated by the server to verify the email was received - - client_secret: Unique token generated by the client to group together - multiple email sending attempts - - sid: The generated session ID - """ - params = {"token": token, "client_secret": client_secret, "sid": sid} - link = ( - self.hs.config.server.public_baseurl - + "_matrix/client/unstable/add_threepid/email/submit_token?%s" - % urllib.parse.urlencode(params) - ) - - template_vars: TemplateVars = {"link": link} - - emails_sent_counter.labels("add_threepid").inc() - - await self.send_email( - email_address, - self.email_subjects.email_validation - % {"server_name": self.hs.config.server.server_name, "app": self.app_name}, - template_vars, - ) - - emails_sent_counter.labels("notification") - - async def send_notification_mail( - self, - app_id: str, - user_id: str, - email_address: str, - push_actions: Iterable[EmailPushAction], - reason: EmailReason, - ) -> None: - """ - Send email regarding a user's room notifications - - Params: - app_id: The application receiving the notification. - user_id: The user receiving the notification. - email_address: The email address receiving the notification. - push_actions: All outstanding notifications. - reason: The notification that was ready and is the cause of an email - being sent. - """ - rooms_in_order = deduped_ordered_list([pa.room_id for pa in push_actions]) - - notif_events = await self.store.get_events([pa.event_id for pa in push_actions]) - - notifs_by_room: Dict[str, List[EmailPushAction]] = {} - for pa in push_actions: - notifs_by_room.setdefault(pa.room_id, []).append(pa) - - # collect the current state for all the rooms in which we have - # notifications - state_by_room = {} - - try: - user_display_name = await self.store.get_profile_displayname( - UserID.from_string(user_id) - ) - if user_display_name is None: - user_display_name = user_id - except StoreError: - user_display_name = user_id - - async def _fetch_room_state(room_id: str) -> None: - room_state = await self._state_storage_controller.get_current_state_ids( - room_id - ) - state_by_room[room_id] = room_state - - # Run at most 3 of these at once: sync does 10 at a time but email - # notifs are much less realtime than sync so we can afford to wait a bit. - await concurrently_execute(_fetch_room_state, rooms_in_order, 3) - - # actually sort our so-called rooms_in_order list, most recent room first - rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1].received_ts or 0)) - - rooms: List[RoomVars] = [] - - for r in rooms_in_order: - roomvars = await self._get_room_vars( - r, user_id, notifs_by_room[r], notif_events, state_by_room[r] - ) - rooms.append(roomvars) - - reason["room_name"] = await calculate_room_name( - self.store, - state_by_room[reason["room_id"]], - user_id, - fallback_to_members=True, - ) - - if len(notifs_by_room) == 1: - # Only one room has new stuff - room_id = list(notifs_by_room.keys())[0] - - summary_text = await self._make_summary_text_single_room( - room_id, - notifs_by_room[room_id], - state_by_room[room_id], - notif_events, - user_id, - ) - else: - summary_text = await self._make_summary_text( - notifs_by_room, state_by_room, notif_events, reason - ) - - unsubscribe_link = self._make_unsubscribe_link(user_id, app_id, email_address) - - template_vars: TemplateVars = { - "user_display_name": user_display_name, - "unsubscribe_link": unsubscribe_link, - "summary_text": summary_text, - "rooms": rooms, - "reason": reason, - } - - emails_sent_counter.labels("notification").inc() - - await self.send_email( - email_address, summary_text, template_vars, unsubscribe_link - ) - - async def send_email( - self, - email_address: str, - subject: str, - extra_template_vars: TemplateVars, - unsubscribe_link: Optional[str] = None, - ) -> None: - """Send an email with the given information and template text""" - template_vars: TemplateVars = { - "app_name": self.app_name, - "server_name": self.hs.config.server.server_name, - } - - template_vars.update(extra_template_vars) - - html_text = self.template_html.render(**template_vars) - plain_text = self.template_text.render(**template_vars) - - await self.send_email_handler.send_email( - email_address=email_address, - subject=subject, - app_name=self.app_name, - html=html_text, - text=plain_text, - # Include the List-Unsubscribe header which some clients render in the UI. - # Per RFC 2369, this can be a URL or mailto URL. See - # https://www.rfc-editor.org/rfc/rfc2369.html#section-3.2 - # - # It is preferred to use email, but Synapse doesn't support incoming email. - # - # Also include the List-Unsubscribe-Post header from RFC 8058. See - # https://www.rfc-editor.org/rfc/rfc8058.html#section-3.1 - # - # Note that many email clients will not render the unsubscribe link - # unless DKIM, etc. is properly setup. - additional_headers=( - { - "List-Unsubscribe-Post": "List-Unsubscribe=One-Click", - "List-Unsubscribe": f"<{unsubscribe_link}>", - } - if unsubscribe_link - else None - ), - ) - - async def _get_room_vars( - self, - room_id: str, - user_id: str, - notifs: Iterable[EmailPushAction], - notif_events: Dict[str, EventBase], - room_state_ids: StateMap[str], - ) -> RoomVars: - """ - Generate the variables for notifications on a per-room basis. - - Args: - room_id: The room ID - user_id: The user receiving the notification. - notifs: The outstanding push actions for this room. - notif_events: The events related to the above notifications. - room_state_ids: The event IDs of the current room state. - - Returns: - A dictionary to be added to the template context. - """ - - # Check if one of the notifs is an invite event for the user. - is_invite = False - for n in notifs: - ev = notif_events[n.event_id] - if ev.type == EventTypes.Member and ev.state_key == user_id: - if ev.content.get("membership") == Membership.INVITE: - is_invite = True - break - - room_name = await calculate_room_name(self.store, room_state_ids, user_id) - - room_vars: RoomVars = { - "title": room_name, - "hash": string_ordinal_total(room_id), # See sender avatar hash - "notifs": [], - "invite": is_invite, - "link": self._make_room_link(room_id), - "avatar_url": await self._get_room_avatar(room_state_ids), - } - - if not is_invite: - for n in notifs: - notifvars = await self._get_notif_vars( - n, user_id, notif_events[n.event_id], room_state_ids - ) - - # merge overlapping notifs together. - # relies on the notifs being in chronological order. - merge = False - if room_vars["notifs"] and "messages" in room_vars["notifs"][-1]: - prev_messages = room_vars["notifs"][-1]["messages"] - for message in notifvars["messages"]: - pm = list( - filter(lambda pm: pm["id"] == message["id"], prev_messages) - ) - if pm: - if not message["is_historical"]: - pm[0]["is_historical"] = False - merge = True - elif merge: - # we're merging, so append any remaining messages - # in this notif to the previous one - prev_messages.append(message) - - if not merge: - room_vars["notifs"].append(notifvars) - - return room_vars - - async def _get_room_avatar( - self, - room_state_ids: StateMap[str], - ) -> Optional[str]: - """ - Retrieve the avatar url for this room---if it exists. - - Args: - room_state_ids: The event IDs of the current room state. - - Returns: - room's avatar url if it's present and a string; otherwise None. - """ - event_id = room_state_ids.get((EventTypes.RoomAvatar, "")) - if event_id: - ev = await self.store.get_event(event_id) - url = ev.content.get("url") - if isinstance(url, str): - return url - return None - - async def _get_notif_vars( - self, - notif: EmailPushAction, - user_id: str, - notif_event: EventBase, - room_state_ids: StateMap[str], - ) -> NotifVars: - """ - Generate the variables for a single notification. - - Args: - notif: The outstanding notification for this room. - user_id: The user receiving the notification. - notif_event: The event related to the above notification. - room_state_ids: The event IDs of the current room state. - - Returns: - A dictionary to be added to the template context. - """ - - results = await self.store.get_events_around( - notif.room_id, - notif.event_id, - before_limit=CONTEXT_BEFORE, - after_limit=CONTEXT_AFTER, - ) - - ret: NotifVars = { - "link": self._make_notif_link(notif), - "ts": notif.received_ts, - "messages": [], - } - - the_events = await filter_events_for_client( - self._storage_controllers, - user_id, - results.events_before, - ) - the_events.append(notif_event) - - for event in the_events: - messagevars = await self._get_message_vars(notif, event, room_state_ids) - if messagevars is not None: - ret["messages"].append(messagevars) - - return ret - - async def _get_message_vars( - self, notif: EmailPushAction, event: EventBase, room_state_ids: StateMap[str] - ) -> Optional[MessageVars]: - """ - Generate the variables for a single event, if possible. - - Args: - notif: The outstanding notification for this room. - event: The event under consideration. - room_state_ids: The event IDs of the current room state. - - Returns: - A dictionary to be added to the template context, or None if the - event cannot be processed. - """ - if event.type != EventTypes.Message and event.type != EventTypes.Encrypted: - return None - - # Get the sender's name and avatar from the room state. - type_state_key = ("m.room.member", event.sender) - sender_state_event_id = room_state_ids.get(type_state_key) - if sender_state_event_id: - sender_state_event: Optional[EventBase] = await self.store.get_event( - sender_state_event_id - ) - else: - # Attempt to check the historical state for the room. - historical_state = await self._state_storage_controller.get_state_for_event( - event.event_id, StateFilter.from_types((type_state_key,)) - ) - sender_state_event = historical_state.get(type_state_key) - - if sender_state_event: - sender_name = name_from_member_event(sender_state_event) - sender_avatar_url: Optional[str] = sender_state_event.content.get( - "avatar_url" - ) - else: - # No state could be found, fallback to the MXID. - sender_name = event.sender - sender_avatar_url = None - - # 'hash' for deterministically picking default images: use - # sender_hash % the number of default images to choose from - sender_hash = string_ordinal_total(event.sender) - - ret: MessageVars = { - "event_type": event.type, - "is_historical": event.event_id != notif.event_id, - "id": event.event_id, - "ts": event.origin_server_ts, - "sender_name": sender_name, - "sender_avatar_url": sender_avatar_url, - "sender_hash": sender_hash, - } - - # Encrypted messages don't have any additional useful information. - if event.type == EventTypes.Encrypted: - return ret - - msgtype = event.content.get("msgtype") - if not isinstance(msgtype, str): - msgtype = None - - ret["msgtype"] = msgtype - - if msgtype == "m.text": - self._add_text_message_vars(ret, event) - elif msgtype == "m.image": - self._add_image_message_vars(ret, event) - - if "body" in event.content: - ret["body_text_plain"] = event.content["body"] - - return ret - - def _add_text_message_vars( - self, messagevars: MessageVars, event: EventBase - ) -> None: - """ - Potentially add a sanitised message body to the message variables. - - Args: - messagevars: The template context to be modified. - event: The event under consideration. - """ - msgformat = event.content.get("format") - if not isinstance(msgformat, str): - msgformat = None - - formatted_body = event.content.get("formatted_body") - body = event.content.get("body") - - if msgformat == "org.matrix.custom.html" and formatted_body: - messagevars["body_text_html"] = safe_markup(formatted_body) - elif body: - messagevars["body_text_html"] = safe_text(body) - - def _add_image_message_vars( - self, messagevars: MessageVars, event: EventBase - ) -> None: - """ - Potentially add an image URL to the message variables. - - Args: - messagevars: The template context to be modified. - event: The event under consideration. - """ - if "url" in event.content: - messagevars["image_url"] = event.content["url"] - - async def _make_summary_text_single_room( - self, - room_id: str, - notifs: List[EmailPushAction], - room_state_ids: StateMap[str], - notif_events: Dict[str, EventBase], - user_id: str, - ) -> str: - """ - Make a summary text for the email when only a single room has notifications. - - Args: - room_id: The ID of the room. - notifs: The push actions for this room. - room_state_ids: The state map for the room. - notif_events: A map of event ID -> notification event. - user_id: The user receiving the notification. - - Returns: - The summary text. - """ - # If the room has some kind of name, use it, but we don't - # want the generated-from-names one here otherwise we'll - # end up with, "new message from Bob in the Bob room" - room_name = await calculate_room_name( - self.store, room_state_ids, user_id, fallback_to_members=False - ) - - # See if one of the notifs is an invite event for the user - invite_event = None - for n in notifs: - ev = notif_events[n.event_id] - if ev.type == EventTypes.Member and ev.state_key == user_id: - if ev.content.get("membership") == Membership.INVITE: - invite_event = ev - break - - if invite_event: - inviter_member_event_id = room_state_ids.get( - ("m.room.member", invite_event.sender) - ) - inviter_name = invite_event.sender - if inviter_member_event_id: - inviter_member_event = await self.store.get_event( - inviter_member_event_id, allow_none=True - ) - if inviter_member_event: - inviter_name = name_from_member_event(inviter_member_event) - - if room_name is None: - return self.email_subjects.invite_from_person % { - "person": inviter_name, - "app": self.app_name, - } - - # If the room is a space, it gets a slightly different topic. - create_event_id = room_state_ids.get(("m.room.create", "")) - if create_event_id: - create_event = await self.store.get_event( - create_event_id, allow_none=True - ) - if ( - create_event - and create_event.content.get(EventContentFields.ROOM_TYPE) - == RoomTypes.SPACE - ): - return self.email_subjects.invite_from_person_to_space % { - "person": inviter_name, - "space": room_name, - "app": self.app_name, - } - - return self.email_subjects.invite_from_person_to_room % { - "person": inviter_name, - "room": room_name, - "app": self.app_name, - } - - if len(notifs) == 1: - # There is just the one notification, so give some detail - sender_name = None - event = notif_events[notifs[0].event_id] - if ("m.room.member", event.sender) in room_state_ids: - state_event_id = room_state_ids[("m.room.member", event.sender)] - state_event = await self.store.get_event(state_event_id) - sender_name = name_from_member_event(state_event) - - if sender_name is not None and room_name is not None: - return self.email_subjects.message_from_person_in_room % { - "person": sender_name, - "room": room_name, - "app": self.app_name, - } - elif sender_name is not None: - return self.email_subjects.message_from_person % { - "person": sender_name, - "app": self.app_name, - } - - # The sender is unknown, just use the room name (or ID). - return self.email_subjects.messages_in_room % { - "room": room_name or room_id, - "app": self.app_name, - } - else: - # There's more than one notification for this room, so just - # say there are several - if room_name is not None: - return self.email_subjects.messages_in_room % { - "room": room_name, - "app": self.app_name, - } - - return await self._make_summary_text_from_member_events( - room_id, notifs, room_state_ids, notif_events - ) - - async def _make_summary_text( - self, - notifs_by_room: Dict[str, List[EmailPushAction]], - room_state_ids: Dict[str, StateMap[str]], - notif_events: Dict[str, EventBase], - reason: EmailReason, - ) -> str: - """ - Make a summary text for the email when multiple rooms have notifications. - - Args: - notifs_by_room: A map of room ID to the push actions for that room. - room_state_ids: A map of room ID to the state map for that room. - notif_events: A map of event ID -> notification event. - reason: The reason this notification is being sent. - - Returns: - The summary text. - """ - # Stuff's happened in multiple different rooms - # ...but we still refer to the 'reason' room which triggered the mail - if reason["room_name"] is not None: - return self.email_subjects.messages_in_room_and_others % { - "room": reason["room_name"], - "app": self.app_name, - } - - room_id = reason["room_id"] - return await self._make_summary_text_from_member_events( - room_id, notifs_by_room[room_id], room_state_ids[room_id], notif_events - ) - - async def _make_summary_text_from_member_events( - self, - room_id: str, - notifs: List[EmailPushAction], - room_state_ids: StateMap[str], - notif_events: Dict[str, EventBase], - ) -> str: - """ - Make a summary text for the email when only a single room has notifications. - - Args: - room_id: The ID of the room. - notifs: The push actions for this room. - room_state_ids: The state map for the room. - notif_events: A map of event ID -> notification event. - - Returns: - The summary text. - """ - # If the room doesn't have a name, say who the messages - # are from explicitly to avoid, "messages in the Bob room" - - # Find the latest event ID for each sender, note that the notifications - # are already in descending received_ts. - sender_ids = {} - for n in notifs: - sender = notif_events[n.event_id].sender - if sender not in sender_ids: - sender_ids[sender] = n.event_id - - # Get the actual member events (in order to calculate a pretty name for - # the room). - member_event_ids = [] - member_events = {} - for sender_id, event_id in sender_ids.items(): - type_state_key = ("m.room.member", sender_id) - sender_state_event_id = room_state_ids.get(type_state_key) - if sender_state_event_id: - member_event_ids.append(sender_state_event_id) - else: - # Attempt to check the historical state for the room. - historical_state = ( - await self._state_storage_controller.get_state_for_event( - event_id, StateFilter.from_types((type_state_key,)) - ) - ) - sender_state_event = historical_state.get(type_state_key) - if sender_state_event: - member_events[event_id] = sender_state_event - member_events.update(await self.store.get_events(member_event_ids)) - - if not member_events: - # No member events were found! Maybe the room is empty? - # Fallback to the room ID (note that if there was a room name this - # would already have been used previously). - return self.email_subjects.messages_in_room % { - "room": room_id, - "app": self.app_name, - } - - # There was a single sender. - if len(member_events) == 1: - return self.email_subjects.messages_from_person % { - "person": descriptor_from_member_events(member_events.values()), - "app": self.app_name, - } - - # There was more than one sender, use the first one and a tweaked template. - return self.email_subjects.messages_from_person_and_others % { - "person": descriptor_from_member_events(list(member_events.values())[:1]), - "app": self.app_name, - } - - def _make_room_link(self, room_id: str) -> str: - """ - Generate a link to open a room in the web client. - - Args: - room_id: The room ID to generate a link to. - - Returns: - A link to open a room in the web client. - """ - if self.hs.config.email.email_riot_base_url: - base_url = "%s/#/room" % (self.hs.config.email.email_riot_base_url) - elif self.app_name == "Vector": - # need /beta for Universal Links to work on iOS - base_url = "https://vector.im/beta/#/room" - else: - base_url = "https://matrix.to/#" - return "%s/%s" % (base_url, room_id) - - def _make_notif_link(self, notif: EmailPushAction) -> str: - """ - Generate a link to open an event in the web client. - - Args: - notif: The notification to generate a link for. - - Returns: - A link to open the notification in the web client. - """ - if self.hs.config.email.email_riot_base_url: - return "%s/#/room/%s/%s" % ( - self.hs.config.email.email_riot_base_url, - notif.room_id, - notif.event_id, - ) - elif self.app_name == "Vector": - # need /beta for Universal Links to work on iOS - return "https://vector.im/beta/#/room/%s/%s" % ( - notif.room_id, - notif.event_id, - ) - else: - return "https://matrix.to/#/%s/%s" % (notif.room_id, notif.event_id) - - def _make_unsubscribe_link( - self, user_id: str, app_id: str, email_address: str - ) -> str: - """ - Generate a link to unsubscribe from email notifications. - - Args: - user_id: The user receiving the notification. - app_id: The application receiving the notification. - email_address: The email address receiving the notification. - - Returns: - A link to unsubscribe from email notifications. - """ - params = { - "access_token": self.macaroon_gen.generate_delete_pusher_token( - user_id, app_id, email_address - ), - "app_id": app_id, - "pushkey": email_address, - } - - return "%s_synapse/client/unsubscribe?%s" % ( - self.hs.config.server.public_baseurl, - urllib.parse.urlencode(params), - ) - - -def safe_markup(raw_html: str) -> Markup: - """ - Sanitise a raw HTML string to a set of allowed tags and attributes, and linkify any bare URLs. - - Args - raw_html: Unsafe HTML. - - Returns: - A Markup object ready to safely use in a Jinja template. - """ - return Markup( - bleach.linkify( - bleach.clean( - raw_html, - tags=ALLOWED_TAGS, - attributes=ALLOWED_ATTRS, - # bleach master has this, but it isn't released yet - # protocols=ALLOWED_SCHEMES, - strip=True, - ) - ) - ) - - -def safe_text(raw_text: str) -> Markup: - """ - Sanitise text (escape any HTML tags), and then linkify any bare URLs. - - Args - raw_text: Unsafe text which might include HTML markup. - - Returns: - A Markup object ready to safely use in a Jinja template. - """ - return Markup( - bleach.linkify(bleach.clean(raw_text, tags=[], attributes=[], strip=False)) - ) - - -def deduped_ordered_list(it: Iterable[T]) -> List[T]: - seen = set() - ret = [] - for item in it: - if item not in seen: - seen.add(item) - ret.append(item) - return ret - - -def string_ordinal_total(s: str) -> int: - tot = 0 - for c in s: - tot += ord(c) - return tot diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 1ef881f702..3f3e4a9234 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py
@@ -74,9 +74,13 @@ async def get_context_for_event( room_state = [] if ev.content.get("membership") == Membership.INVITE: - room_state = ev.unsigned.get("invite_room_state", []) + invite_room_state = ev.unsigned.get("invite_room_state", []) + if isinstance(invite_room_state, list): + room_state = invite_room_state elif ev.content.get("membership") == Membership.KNOCK: - room_state = ev.unsigned.get("knock_room_state", []) + knock_room_state = ev.unsigned.get("knock_room_state", []) + if isinstance(knock_room_state, list): + room_state = knock_room_state # Ideally we'd reuse the logic in `calculate_room_name`, but that gets # complicated to handle partial events vs pulling events from the DB. diff --git a/synapse/push/push_types.py b/synapse/push/push_types.py
index 201ec97219..57fa926a46 100644 --- a/synapse/push/push_types.py +++ b/synapse/push/push_types.py
@@ -18,9 +18,7 @@ # [This file includes modifications made by New Vector Limited] # # -from typing import List, Optional - -from typing_extensions import TypedDict +from typing import List, Optional, TypedDict class EmailReason(TypedDict, total=False): diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 9a5dd7a9d4..39bfe0dd33 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py
@@ -23,9 +23,7 @@ import logging from typing import TYPE_CHECKING, Callable, Dict, Optional from synapse.push import Pusher, PusherConfig -from synapse.push.emailpusher import EmailPusher from synapse.push.httppusher import HttpPusher -from synapse.push.mailer import Mailer if TYPE_CHECKING: from synapse.server import HomeServer @@ -42,17 +40,6 @@ class PusherFactory: "http": HttpPusher } - logger.info("email enable notifs: %r", hs.config.email.email_enable_notifs) - if hs.config.email.email_enable_notifs: - self.mailers: Dict[str, Mailer] = {} - - self._notif_template_html = hs.config.email.email_notif_template_html - self._notif_template_text = hs.config.email.email_notif_template_text - - self.pusher_types["email"] = self._create_email_pusher - - logger.info("defined email pusher type") - def create_pusher(self, pusher_config: PusherConfig) -> Optional[Pusher]: kind = pusher_config.kind f = self.pusher_types.get(kind, None) @@ -60,28 +47,3 @@ class PusherFactory: return None logger.debug("creating %s pusher for %r", kind, pusher_config) return f(self.hs, pusher_config) - - def _create_email_pusher( - self, _hs: "HomeServer", pusher_config: PusherConfig - ) -> EmailPusher: - app_name = self._app_name_from_pusherdict(pusher_config) - mailer = self.mailers.get(app_name) - if not mailer: - mailer = Mailer( - hs=self.hs, - app_name=app_name, - template_html=self._notif_template_html, - template_text=self._notif_template_text, - ) - self.mailers[app_name] = mailer - return EmailPusher(self.hs, pusher_config, mailer) - - def _app_name_from_pusherdict(self, pusher_config: PusherConfig) -> str: - data = pusher_config.data - - if isinstance(data, dict): - brand = data.get("brand") - if isinstance(brand, str): - return brand - - return self.config.email.email_app_name diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0a7541b4c7..bf80ac97a1 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py
@@ -34,7 +34,6 @@ from synapse.push.pusher import PusherFactory from synapse.replication.http.push import ReplicationRemovePusherRestServlet from synapse.types import JsonDict, RoomStreamToken, StrCollection from synapse.util.async_helpers import concurrently_execute -from synapse.util.threepids import canonicalise_email if TYPE_CHECKING: from synapse.server import HomeServer @@ -122,11 +121,7 @@ class PusherPool: """ if kind == "email": - email_owner = await self.store.get_user_id_by_threepid( - "email", canonicalise_email(pushkey) - ) - if email_owner != user_id: - raise SynapseError(400, "Email not found", Codes.THREEPID_NOT_FOUND) + raise SynapseError(400, "Threepids are not supported on this server", "M_UNSUPPORTED") time_now_msec = self.clock.time_msec()