diff options
author | Richard van der Hoff <richard@matrix.org> | 2021-02-17 16:31:57 +0000 |
---|---|---|
committer | Richard van der Hoff <richard@matrix.org> | 2021-02-17 16:31:57 +0000 |
commit | 7b7831bb6363a625c97446298838c66abfeb6b8b (patch) | |
tree | 39ec72cc7b8985858012f5d77fb89796fb04ff43 /synapse/push | |
parent | Ensure that we never stop reconnecting to redis (#9391) (diff) | |
parent | Reorganize CONTRIBUTING.md documentation. (#9281) (diff) | |
download | synapse-7b7831bb6363a625c97446298838c66abfeb6b8b.tar.xz |
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 9 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 26 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 23 | ||||
-rw-r--r-- | synapse/push/mailer.py | 213 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 9 |
5 files changed, 214 insertions, 66 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 9018f9e20b..c016a83909 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -144,8 +144,7 @@ class BulkPushRuleEvaluator: @lru_cache() def _get_rules_for_room(self, room_id: str) -> "RulesForRoom": - """Get the current RulesForRoom object for the given room id - """ + """Get the current RulesForRoom object for the given room id""" # It's important that RulesForRoom gets added to self._get_rules_for_room.cache # before any lookup methods get called on it as otherwise there may be # a race if invalidate_all gets called (which assumes its in the cache) @@ -252,7 +251,9 @@ class BulkPushRuleEvaluator: # notified for this event. (This will then get handled when we persist # the event) await self.store.add_push_actions_to_staging( - event.event_id, actions_by_user, count_as_unread, + event.event_id, + actions_by_user, + count_as_unread, ) @@ -524,7 +525,7 @@ class RulesForRoom: class _Invalidation: # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules, # which means that it it is stored on the bulk_get_push_rules cache entry. In order - # to ensure that we don't accumulate lots of redunant callbacks on the cache entry, + # to ensure that we don't accumulate lots of redundant callbacks on the cache entry, # we need to ensure that two _Invalidation objects are "equal" if they refer to the # same `cache` and `room_id`. # diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 4ac1b31748..5fec2aaf5d 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -116,8 +116,7 @@ class EmailPusher(Pusher): self._is_processing = True def _resume_processing(self) -> None: - """Used by tests to resume processing of events after pausing. - """ + """Used by tests to resume processing of events after pausing.""" assert self._is_processing self._is_processing = False self._start_processing() @@ -157,8 +156,10 @@ class EmailPusher(Pusher): being run. """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering - unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email( - self.user_id, start, self.max_stream_ordering + unprocessed = ( + await self.store.get_unread_push_actions_for_user_in_range_for_email( + self.user_id, start, self.max_stream_ordering + ) ) soonest_due_at = None # type: Optional[int] @@ -222,12 +223,14 @@ class EmailPusher(Pusher): self, last_stream_ordering: int ) -> None: self.last_stream_ordering = last_stream_ordering - pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.email, - self.user_id, - last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + await self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.email, + self.user_id, + last_stream_ordering, + self.clock.time_msec(), + ) ) if not pusher_still_exists: # The pusher has been deleted while we were processing, so @@ -298,7 +301,8 @@ class EmailPusher(Pusher): current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS ) self.throttle_params[room_id] = ThrottleParams( - self.clock.time_msec(), new_throttle_ms, + self.clock.time_msec(), + new_throttle_ms, ) assert self.pusher_id is not None await self.store.set_throttle_params( diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 9fa26fe9f8..ed911f106a 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -181,8 +181,10 @@ class HttpPusher(Pusher): Never call this directly: use _process which will only allow this to run once per pusher. """ - unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http( - self.user_id, self.last_stream_ordering, self.max_stream_ordering + unprocessed = ( + await self.store.get_unread_push_actions_for_user_in_range_for_http( + self.user_id, self.last_stream_ordering, self.max_stream_ordering + ) ) logger.info( @@ -209,12 +211,14 @@ class HttpPusher(Pusher): http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.pushkey, - self.user_id, - self.last_stream_ordering, - self.clock.time_msec(), + pusher_still_exists = ( + await self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering, + self.clock.time_msec(), + ) ) if not pusher_still_exists: # The pusher has been deleted while we were processing, so @@ -295,7 +299,8 @@ class HttpPusher(Pusher): # for sanity, we only remove the pushkey if it # was the one we actually sent... logger.warning( - ("Ignoring rejected pushkey %s because we didn't send it"), pk, + ("Ignoring rejected pushkey %s because we didn't send it"), + pk, ) else: logger.info("Pushkey %s was rejected: removing", pk) diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 8a6dcff30d..d10201b6b3 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -34,6 +34,7 @@ from synapse.push.presentable_names import ( descriptor_from_member_events, name_from_member_event, ) +from synapse.storage.state import StateFilter from synapse.types import StateMap, UserID from synapse.util.async_helpers import concurrently_execute from synapse.visibility import filter_events_for_client @@ -110,6 +111,7 @@ class Mailer: self.sendmail = self.hs.get_sendmail() self.store = self.hs.get_datastore() + self.state_store = self.hs.get_storage().state self.macaroon_gen = self.hs.get_macaroon_generator() self.state_handler = self.hs.get_state_handler() self.storage = hs.get_storage() @@ -217,7 +219,17 @@ class Mailer: push_actions: Iterable[Dict[str, Any]], reason: Dict[str, Any], ) -> None: - """Send email regarding a user's room notifications""" + """ + Send email regarding a user's room notifications + + Params: + app_id: The application receiving the notification. + user_id: The user receiving the notification. + email_address: The email address receiving the notification. + push_actions: All outstanding notifications. + reason: The notification that was ready and is the cause of an email + being sent. + """ rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions]) notif_events = await self.store.get_events( @@ -241,7 +253,7 @@ class Mailer: except StoreError: user_display_name = user_id - async def _fetch_room_state(room_id): + async def _fetch_room_state(room_id: str) -> None: room_state = await self.store.get_current_state_ids(room_id) state_by_room[room_id] = room_state @@ -255,7 +267,7 @@ class Mailer: rooms = [] for r in rooms_in_order: - roomvars = await self.get_room_vars( + roomvars = await self._get_room_vars( r, user_id, notifs_by_room[r], notif_events, state_by_room[r] ) rooms.append(roomvars) @@ -271,7 +283,7 @@ class Mailer: # Only one room has new stuff room_id = list(notifs_by_room.keys())[0] - summary_text = await self.make_summary_text_single_room( + summary_text = await self._make_summary_text_single_room( room_id, notifs_by_room[room_id], state_by_room[room_id], @@ -279,13 +291,13 @@ class Mailer: user_id, ) else: - summary_text = await self.make_summary_text( + summary_text = await self._make_summary_text( notifs_by_room, state_by_room, notif_events, reason ) template_vars = { "user_display_name": user_display_name, - "unsubscribe_link": self.make_unsubscribe_link( + "unsubscribe_link": self._make_unsubscribe_link( user_id, app_id, email_address ), "summary_text": summary_text, @@ -349,7 +361,7 @@ class Mailer: ) ) - async def get_room_vars( + async def _get_room_vars( self, room_id: str, user_id: str, @@ -357,6 +369,20 @@ class Mailer: notif_events: Dict[str, EventBase], room_state_ids: StateMap[str], ) -> Dict[str, Any]: + """ + Generate the variables for notifications on a per-room basis. + + Args: + room_id: The room ID + user_id: The user receiving the notification. + notifs: The outstanding push actions for this room. + notif_events: The events related to the above notifications. + room_state_ids: The event IDs of the current room state. + + Returns: + A dictionary to be added to the template context. + """ + # Check if one of the notifs is an invite event for the user. is_invite = False for n in notifs: @@ -373,12 +399,12 @@ class Mailer: "hash": string_ordinal_total(room_id), # See sender avatar hash "notifs": [], "invite": is_invite, - "link": self.make_room_link(room_id), + "link": self._make_room_link(room_id), } # type: Dict[str, Any] if not is_invite: for n in notifs: - notifvars = await self.get_notif_vars( + notifvars = await self._get_notif_vars( n, user_id, notif_events[n["event_id"]], room_state_ids ) @@ -405,13 +431,26 @@ class Mailer: return room_vars - async def get_notif_vars( + async def _get_notif_vars( self, notif: Dict[str, Any], user_id: str, notif_event: EventBase, room_state_ids: StateMap[str], ) -> Dict[str, Any]: + """ + Generate the variables for a single notification. + + Args: + notif: The outstanding notification for this room. + user_id: The user receiving the notification. + notif_event: The event related to the above notification. + room_state_ids: The event IDs of the current room state. + + Returns: + A dictionary to be added to the template context. + """ + results = await self.store.get_events_around( notif["room_id"], notif["event_id"], @@ -420,7 +459,7 @@ class Mailer: ) ret = { - "link": self.make_notif_link(notif), + "link": self._make_notif_link(notif), "ts": notif["received_ts"], "messages": [], } @@ -431,22 +470,51 @@ class Mailer: the_events.append(notif_event) for event in the_events: - messagevars = await self.get_message_vars(notif, event, room_state_ids) + messagevars = await self._get_message_vars(notif, event, room_state_ids) if messagevars is not None: ret["messages"].append(messagevars) return ret - async def get_message_vars( + async def _get_message_vars( self, notif: Dict[str, Any], event: EventBase, room_state_ids: StateMap[str] ) -> Optional[Dict[str, Any]]: + """ + Generate the variables for a single event, if possible. + + Args: + notif: The outstanding notification for this room. + event: The event under consideration. + room_state_ids: The event IDs of the current room state. + + Returns: + A dictionary to be added to the template context, or None if the + event cannot be processed. + """ if event.type != EventTypes.Message and event.type != EventTypes.Encrypted: return None - sender_state_event_id = room_state_ids[("m.room.member", event.sender)] - sender_state_event = await self.store.get_event(sender_state_event_id) - sender_name = name_from_member_event(sender_state_event) - sender_avatar_url = sender_state_event.content.get("avatar_url") + # Get the sender's name and avatar from the room state. + type_state_key = ("m.room.member", event.sender) + sender_state_event_id = room_state_ids.get(type_state_key) + if sender_state_event_id: + sender_state_event = await self.store.get_event( + sender_state_event_id + ) # type: Optional[EventBase] + else: + # Attempt to check the historical state for the room. + historical_state = await self.state_store.get_state_for_event( + event.event_id, StateFilter.from_types((type_state_key,)) + ) + sender_state_event = historical_state.get(type_state_key) + + if sender_state_event: + sender_name = name_from_member_event(sender_state_event) + sender_avatar_url = sender_state_event.content.get("avatar_url") + else: + # No state could be found, fallback to the MXID. + sender_name = event.sender + sender_avatar_url = None # 'hash' for deterministically picking default images: use # sender_hash % the number of default images to choose from @@ -471,18 +539,25 @@ class Mailer: ret["msgtype"] = msgtype if msgtype == "m.text": - self.add_text_message_vars(ret, event) + self._add_text_message_vars(ret, event) elif msgtype == "m.image": - self.add_image_message_vars(ret, event) + self._add_image_message_vars(ret, event) if "body" in event.content: ret["body_text_plain"] = event.content["body"] return ret - def add_text_message_vars( + def _add_text_message_vars( self, messagevars: Dict[str, Any], event: EventBase ) -> None: + """ + Potentially add a sanitised message body to the message variables. + + Args: + messagevars: The template context to be modified. + event: The event under consideration. + """ msgformat = event.content.get("format") messagevars["format"] = msgformat @@ -495,16 +570,20 @@ class Mailer: elif body: messagevars["body_text_html"] = safe_text(body) - def add_image_message_vars( + def _add_image_message_vars( self, messagevars: Dict[str, Any], event: EventBase ) -> None: """ Potentially add an image URL to the message variables. + + Args: + messagevars: The template context to be modified. + event: The event under consideration. """ if "url" in event.content: messagevars["image_url"] = event.content["url"] - async def make_summary_text_single_room( + async def _make_summary_text_single_room( self, room_id: str, notifs: List[Dict[str, Any]], @@ -517,7 +596,7 @@ class Mailer: Args: room_id: The ID of the room. - notifs: The notifications for this room. + notifs: The push actions for this room. room_state_ids: The state map for the room. notif_events: A map of event ID -> notification event. user_id: The user receiving the notification. @@ -600,11 +679,11 @@ class Mailer: "app": self.app_name, } - return await self.make_summary_text_from_member_events( + return await self._make_summary_text_from_member_events( room_id, notifs, room_state_ids, notif_events ) - async def make_summary_text( + async def _make_summary_text( self, notifs_by_room: Dict[str, List[Dict[str, Any]]], room_state_ids: Dict[str, StateMap[str]], @@ -615,7 +694,7 @@ class Mailer: Make a summary text for the email when multiple rooms have notifications. Args: - notifs_by_room: A map of room ID to the notifications for that room. + notifs_by_room: A map of room ID to the push actions for that room. room_state_ids: A map of room ID to the state map for that room. notif_events: A map of event ID -> notification event. reason: The reason this notification is being sent. @@ -632,11 +711,11 @@ class Mailer: } room_id = reason["room_id"] - return await self.make_summary_text_from_member_events( + return await self._make_summary_text_from_member_events( room_id, notifs_by_room[room_id], room_state_ids[room_id], notif_events ) - async def make_summary_text_from_member_events( + async def _make_summary_text_from_member_events( self, room_id: str, notifs: List[Dict[str, Any]], @@ -648,7 +727,7 @@ class Mailer: Args: room_id: The ID of the room. - notifs: The notifications for this room. + notifs: The push actions for this room. room_state_ids: The state map for the room. notif_events: A map of event ID -> notification event. @@ -657,14 +736,45 @@ class Mailer: """ # If the room doesn't have a name, say who the messages # are from explicitly to avoid, "messages in the Bob room" - sender_ids = {notif_events[n["event_id"]].sender for n in notifs} - member_events = await self.store.get_events( - [room_state_ids[("m.room.member", s)] for s in sender_ids] - ) + # Find the latest event ID for each sender, note that the notifications + # are already in descending received_ts. + sender_ids = {} + for n in notifs: + sender = notif_events[n["event_id"]].sender + if sender not in sender_ids: + sender_ids[sender] = n["event_id"] + + # Get the actual member events (in order to calculate a pretty name for + # the room). + member_event_ids = [] + member_events = {} + for sender_id, event_id in sender_ids.items(): + type_state_key = ("m.room.member", sender_id) + sender_state_event_id = room_state_ids.get(type_state_key) + if sender_state_event_id: + member_event_ids.append(sender_state_event_id) + else: + # Attempt to check the historical state for the room. + historical_state = await self.state_store.get_state_for_event( + event_id, StateFilter.from_types((type_state_key,)) + ) + sender_state_event = historical_state.get(type_state_key) + if sender_state_event: + member_events[event_id] = sender_state_event + member_events.update(await self.store.get_events(member_event_ids)) + + if not member_events: + # No member events were found! Maybe the room is empty? + # Fallback to the room ID (note that if there was a room name this + # would already have been used previously). + return self.email_subjects.messages_in_room % { + "room": room_id, + "app": self.app_name, + } # There was a single sender. - if len(sender_ids) == 1: + if len(member_events) == 1: return self.email_subjects.messages_from_person % { "person": descriptor_from_member_events(member_events.values()), "app": self.app_name, @@ -676,7 +786,16 @@ class Mailer: "app": self.app_name, } - def make_room_link(self, room_id: str) -> str: + def _make_room_link(self, room_id: str) -> str: + """ + Generate a link to open a room in the web client. + + Args: + room_id: The room ID to generate a link to. + + Returns: + A link to open a room in the web client. + """ if self.hs.config.email_riot_base_url: base_url = "%s/#/room" % (self.hs.config.email_riot_base_url) elif self.app_name == "Vector": @@ -686,7 +805,16 @@ class Mailer: base_url = "https://matrix.to/#" return "%s/%s" % (base_url, room_id) - def make_notif_link(self, notif: Dict[str, str]) -> str: + def _make_notif_link(self, notif: Dict[str, str]) -> str: + """ + Generate a link to open an event in the web client. + + Args: + notif: The notification to generate a link for. + + Returns: + A link to open the notification in the web client. + """ if self.hs.config.email_riot_base_url: return "%s/#/room/%s/%s" % ( self.hs.config.email_riot_base_url, @@ -702,9 +830,20 @@ class Mailer: else: return "https://matrix.to/#/%s/%s" % (notif["room_id"], notif["event_id"]) - def make_unsubscribe_link( + def _make_unsubscribe_link( self, user_id: str, app_id: str, email_address: str ) -> str: + """ + Generate a link to unsubscribe from email notifications. + + Args: + user_id: The user receiving the notification. + app_id: The application receiving the notification. + email_address: The email address receiving the notification. + + Returns: + A link to unsubscribe from email notifications. + """ params = { "access_token": self.macaroon_gen.generate_delete_pusher_token(user_id), "app_id": app_id, diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index eed16dbfb5..ae1145be0e 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -78,8 +78,7 @@ class PusherPool: self.pushers = {} # type: Dict[str, Dict[str, Pusher]] def start(self) -> None: - """Starts the pushers off in a background process. - """ + """Starts the pushers off in a background process.""" if not self._should_start_pushers: logger.info("Not starting pushers because they are disabled in the config") return @@ -297,8 +296,7 @@ class PusherPool: return pusher async def _start_pushers(self) -> None: - """Start all the pushers - """ + """Start all the pushers""" pushers = await self.store.get_all_pushers() # Stagger starting up the pushers so we don't completely drown the @@ -335,7 +333,8 @@ class PusherPool: return None except Exception: logger.exception( - "Couldn't start pusher id %i: caught Exception", pusher_config.id, + "Couldn't start pusher id %i: caught Exception", + pusher_config.id, ) return None |