summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2021-02-17 16:31:57 +0000
committerRichard van der Hoff <richard@matrix.org>2021-02-17 16:31:57 +0000
commit7b7831bb6363a625c97446298838c66abfeb6b8b (patch)
tree39ec72cc7b8985858012f5d77fb89796fb04ff43 /synapse/push
parentEnsure that we never stop reconnecting to redis (#9391) (diff)
parentReorganize CONTRIBUTING.md documentation. (#9281) (diff)
downloadsynapse-7b7831bb6363a625c97446298838c66abfeb6b8b.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py9
-rw-r--r--synapse/push/emailpusher.py26
-rw-r--r--synapse/push/httppusher.py23
-rw-r--r--synapse/push/mailer.py213
-rw-r--r--synapse/push/pusherpool.py9
5 files changed, 214 insertions, 66 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9018f9e20b..c016a83909 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -144,8 +144,7 @@ class BulkPushRuleEvaluator:
 
     @lru_cache()
     def _get_rules_for_room(self, room_id: str) -> "RulesForRoom":
-        """Get the current RulesForRoom object for the given room id
-        """
+        """Get the current RulesForRoom object for the given room id"""
         # It's important that RulesForRoom gets added to self._get_rules_for_room.cache
         # before any lookup methods get called on it as otherwise there may be
         # a race if invalidate_all gets called (which assumes its in the cache)
@@ -252,7 +251,9 @@ class BulkPushRuleEvaluator:
         # notified for this event. (This will then get handled when we persist
         # the event)
         await self.store.add_push_actions_to_staging(
-            event.event_id, actions_by_user, count_as_unread,
+            event.event_id,
+            actions_by_user,
+            count_as_unread,
         )
 
 
@@ -524,7 +525,7 @@ class RulesForRoom:
 class _Invalidation:
     # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules,
     # which means that it it is stored on the bulk_get_push_rules cache entry. In order
-    # to ensure that we don't accumulate lots of redunant callbacks on the cache entry,
+    # to ensure that we don't accumulate lots of redundant callbacks on the cache entry,
     # we need to ensure that two _Invalidation objects are "equal" if they refer to the
     # same `cache` and `room_id`.
     #
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index 4ac1b31748..5fec2aaf5d 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -116,8 +116,7 @@ class EmailPusher(Pusher):
         self._is_processing = True
 
     def _resume_processing(self) -> None:
-        """Used by tests to resume processing of events after pausing.
-        """
+        """Used by tests to resume processing of events after pausing."""
         assert self._is_processing
         self._is_processing = False
         self._start_processing()
@@ -157,8 +156,10 @@ class EmailPusher(Pusher):
         being run.
         """
         start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering
-        unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_email(
-            self.user_id, start, self.max_stream_ordering
+        unprocessed = (
+            await self.store.get_unread_push_actions_for_user_in_range_for_email(
+                self.user_id, start, self.max_stream_ordering
+            )
         )
 
         soonest_due_at = None  # type: Optional[int]
@@ -222,12 +223,14 @@ class EmailPusher(Pusher):
         self, last_stream_ordering: int
     ) -> None:
         self.last_stream_ordering = last_stream_ordering
-        pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
-            self.app_id,
-            self.email,
-            self.user_id,
-            last_stream_ordering,
-            self.clock.time_msec(),
+        pusher_still_exists = (
+            await self.store.update_pusher_last_stream_ordering_and_success(
+                self.app_id,
+                self.email,
+                self.user_id,
+                last_stream_ordering,
+                self.clock.time_msec(),
+            )
         )
         if not pusher_still_exists:
             # The pusher has been deleted while we were processing, so
@@ -298,7 +301,8 @@ class EmailPusher(Pusher):
                     current_throttle_ms * THROTTLE_MULTIPLIER, THROTTLE_MAX_MS
                 )
         self.throttle_params[room_id] = ThrottleParams(
-            self.clock.time_msec(), new_throttle_ms,
+            self.clock.time_msec(),
+            new_throttle_ms,
         )
         assert self.pusher_id is not None
         await self.store.set_throttle_params(
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 9fa26fe9f8..ed911f106a 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -181,8 +181,10 @@ class HttpPusher(Pusher):
         Never call this directly: use _process which will only allow this to
         run once per pusher.
         """
-        unprocessed = await self.store.get_unread_push_actions_for_user_in_range_for_http(
-            self.user_id, self.last_stream_ordering, self.max_stream_ordering
+        unprocessed = (
+            await self.store.get_unread_push_actions_for_user_in_range_for_http(
+                self.user_id, self.last_stream_ordering, self.max_stream_ordering
+            )
         )
 
         logger.info(
@@ -209,12 +211,14 @@ class HttpPusher(Pusher):
                 http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action["stream_ordering"]
-                pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success(
-                    self.app_id,
-                    self.pushkey,
-                    self.user_id,
-                    self.last_stream_ordering,
-                    self.clock.time_msec(),
+                pusher_still_exists = (
+                    await self.store.update_pusher_last_stream_ordering_and_success(
+                        self.app_id,
+                        self.pushkey,
+                        self.user_id,
+                        self.last_stream_ordering,
+                        self.clock.time_msec(),
+                    )
                 )
                 if not pusher_still_exists:
                     # The pusher has been deleted while we were processing, so
@@ -295,7 +299,8 @@ class HttpPusher(Pusher):
                     # for sanity, we only remove the pushkey if it
                     # was the one we actually sent...
                     logger.warning(
-                        ("Ignoring rejected pushkey %s because we didn't send it"), pk,
+                        ("Ignoring rejected pushkey %s because we didn't send it"),
+                        pk,
                     )
                 else:
                     logger.info("Pushkey %s was rejected: removing", pk)
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 8a6dcff30d..d10201b6b3 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -34,6 +34,7 @@ from synapse.push.presentable_names import (
     descriptor_from_member_events,
     name_from_member_event,
 )
+from synapse.storage.state import StateFilter
 from synapse.types import StateMap, UserID
 from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
@@ -110,6 +111,7 @@ class Mailer:
 
         self.sendmail = self.hs.get_sendmail()
         self.store = self.hs.get_datastore()
+        self.state_store = self.hs.get_storage().state
         self.macaroon_gen = self.hs.get_macaroon_generator()
         self.state_handler = self.hs.get_state_handler()
         self.storage = hs.get_storage()
@@ -217,7 +219,17 @@ class Mailer:
         push_actions: Iterable[Dict[str, Any]],
         reason: Dict[str, Any],
     ) -> None:
-        """Send email regarding a user's room notifications"""
+        """
+        Send email regarding a user's room notifications
+
+        Params:
+            app_id: The application receiving the notification.
+            user_id: The user receiving the notification.
+            email_address: The email address receiving the notification.
+            push_actions: All outstanding notifications.
+            reason: The notification that was ready and is the cause of an email
+                being sent.
+        """
         rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions])
 
         notif_events = await self.store.get_events(
@@ -241,7 +253,7 @@ class Mailer:
         except StoreError:
             user_display_name = user_id
 
-        async def _fetch_room_state(room_id):
+        async def _fetch_room_state(room_id: str) -> None:
             room_state = await self.store.get_current_state_ids(room_id)
             state_by_room[room_id] = room_state
 
@@ -255,7 +267,7 @@ class Mailer:
         rooms = []
 
         for r in rooms_in_order:
-            roomvars = await self.get_room_vars(
+            roomvars = await self._get_room_vars(
                 r, user_id, notifs_by_room[r], notif_events, state_by_room[r]
             )
             rooms.append(roomvars)
@@ -271,7 +283,7 @@ class Mailer:
             # Only one room has new stuff
             room_id = list(notifs_by_room.keys())[0]
 
-            summary_text = await self.make_summary_text_single_room(
+            summary_text = await self._make_summary_text_single_room(
                 room_id,
                 notifs_by_room[room_id],
                 state_by_room[room_id],
@@ -279,13 +291,13 @@ class Mailer:
                 user_id,
             )
         else:
-            summary_text = await self.make_summary_text(
+            summary_text = await self._make_summary_text(
                 notifs_by_room, state_by_room, notif_events, reason
             )
 
         template_vars = {
             "user_display_name": user_display_name,
-            "unsubscribe_link": self.make_unsubscribe_link(
+            "unsubscribe_link": self._make_unsubscribe_link(
                 user_id, app_id, email_address
             ),
             "summary_text": summary_text,
@@ -349,7 +361,7 @@ class Mailer:
             )
         )
 
-    async def get_room_vars(
+    async def _get_room_vars(
         self,
         room_id: str,
         user_id: str,
@@ -357,6 +369,20 @@ class Mailer:
         notif_events: Dict[str, EventBase],
         room_state_ids: StateMap[str],
     ) -> Dict[str, Any]:
+        """
+        Generate the variables for notifications on a per-room basis.
+
+        Args:
+            room_id: The room ID
+            user_id: The user receiving the notification.
+            notifs: The outstanding push actions for this room.
+            notif_events: The events related to the above notifications.
+            room_state_ids: The event IDs of the current room state.
+
+        Returns:
+             A dictionary to be added to the template context.
+        """
+
         # Check if one of the notifs is an invite event for the user.
         is_invite = False
         for n in notifs:
@@ -373,12 +399,12 @@ class Mailer:
             "hash": string_ordinal_total(room_id),  # See sender avatar hash
             "notifs": [],
             "invite": is_invite,
-            "link": self.make_room_link(room_id),
+            "link": self._make_room_link(room_id),
         }  # type: Dict[str, Any]
 
         if not is_invite:
             for n in notifs:
-                notifvars = await self.get_notif_vars(
+                notifvars = await self._get_notif_vars(
                     n, user_id, notif_events[n["event_id"]], room_state_ids
                 )
 
@@ -405,13 +431,26 @@ class Mailer:
 
         return room_vars
 
-    async def get_notif_vars(
+    async def _get_notif_vars(
         self,
         notif: Dict[str, Any],
         user_id: str,
         notif_event: EventBase,
         room_state_ids: StateMap[str],
     ) -> Dict[str, Any]:
+        """
+        Generate the variables for a single notification.
+
+        Args:
+            notif: The outstanding notification for this room.
+            user_id: The user receiving the notification.
+            notif_event: The event related to the above notification.
+            room_state_ids: The event IDs of the current room state.
+
+        Returns:
+             A dictionary to be added to the template context.
+        """
+
         results = await self.store.get_events_around(
             notif["room_id"],
             notif["event_id"],
@@ -420,7 +459,7 @@ class Mailer:
         )
 
         ret = {
-            "link": self.make_notif_link(notif),
+            "link": self._make_notif_link(notif),
             "ts": notif["received_ts"],
             "messages": [],
         }
@@ -431,22 +470,51 @@ class Mailer:
         the_events.append(notif_event)
 
         for event in the_events:
-            messagevars = await self.get_message_vars(notif, event, room_state_ids)
+            messagevars = await self._get_message_vars(notif, event, room_state_ids)
             if messagevars is not None:
                 ret["messages"].append(messagevars)
 
         return ret
 
-    async def get_message_vars(
+    async def _get_message_vars(
         self, notif: Dict[str, Any], event: EventBase, room_state_ids: StateMap[str]
     ) -> Optional[Dict[str, Any]]:
+        """
+        Generate the variables for a single event, if possible.
+
+        Args:
+            notif: The outstanding notification for this room.
+            event: The event under consideration.
+            room_state_ids: The event IDs of the current room state.
+
+        Returns:
+             A dictionary to be added to the template context, or None if the
+             event cannot be processed.
+        """
         if event.type != EventTypes.Message and event.type != EventTypes.Encrypted:
             return None
 
-        sender_state_event_id = room_state_ids[("m.room.member", event.sender)]
-        sender_state_event = await self.store.get_event(sender_state_event_id)
-        sender_name = name_from_member_event(sender_state_event)
-        sender_avatar_url = sender_state_event.content.get("avatar_url")
+        # Get the sender's name and avatar from the room state.
+        type_state_key = ("m.room.member", event.sender)
+        sender_state_event_id = room_state_ids.get(type_state_key)
+        if sender_state_event_id:
+            sender_state_event = await self.store.get_event(
+                sender_state_event_id
+            )  # type: Optional[EventBase]
+        else:
+            # Attempt to check the historical state for the room.
+            historical_state = await self.state_store.get_state_for_event(
+                event.event_id, StateFilter.from_types((type_state_key,))
+            )
+            sender_state_event = historical_state.get(type_state_key)
+
+        if sender_state_event:
+            sender_name = name_from_member_event(sender_state_event)
+            sender_avatar_url = sender_state_event.content.get("avatar_url")
+        else:
+            # No state could be found, fallback to the MXID.
+            sender_name = event.sender
+            sender_avatar_url = None
 
         # 'hash' for deterministically picking default images: use
         # sender_hash % the number of default images to choose from
@@ -471,18 +539,25 @@ class Mailer:
         ret["msgtype"] = msgtype
 
         if msgtype == "m.text":
-            self.add_text_message_vars(ret, event)
+            self._add_text_message_vars(ret, event)
         elif msgtype == "m.image":
-            self.add_image_message_vars(ret, event)
+            self._add_image_message_vars(ret, event)
 
         if "body" in event.content:
             ret["body_text_plain"] = event.content["body"]
 
         return ret
 
-    def add_text_message_vars(
+    def _add_text_message_vars(
         self, messagevars: Dict[str, Any], event: EventBase
     ) -> None:
+        """
+        Potentially add a sanitised message body to the message variables.
+
+        Args:
+            messagevars: The template context to be modified.
+            event: The event under consideration.
+        """
         msgformat = event.content.get("format")
 
         messagevars["format"] = msgformat
@@ -495,16 +570,20 @@ class Mailer:
         elif body:
             messagevars["body_text_html"] = safe_text(body)
 
-    def add_image_message_vars(
+    def _add_image_message_vars(
         self, messagevars: Dict[str, Any], event: EventBase
     ) -> None:
         """
         Potentially add an image URL to the message variables.
+
+        Args:
+            messagevars: The template context to be modified.
+            event: The event under consideration.
         """
         if "url" in event.content:
             messagevars["image_url"] = event.content["url"]
 
-    async def make_summary_text_single_room(
+    async def _make_summary_text_single_room(
         self,
         room_id: str,
         notifs: List[Dict[str, Any]],
@@ -517,7 +596,7 @@ class Mailer:
 
         Args:
             room_id: The ID of the room.
-            notifs: The notifications for this room.
+            notifs: The push actions for this room.
             room_state_ids: The state map for the room.
             notif_events: A map of event ID -> notification event.
             user_id: The user receiving the notification.
@@ -600,11 +679,11 @@ class Mailer:
                     "app": self.app_name,
                 }
 
-            return await self.make_summary_text_from_member_events(
+            return await self._make_summary_text_from_member_events(
                 room_id, notifs, room_state_ids, notif_events
             )
 
-    async def make_summary_text(
+    async def _make_summary_text(
         self,
         notifs_by_room: Dict[str, List[Dict[str, Any]]],
         room_state_ids: Dict[str, StateMap[str]],
@@ -615,7 +694,7 @@ class Mailer:
         Make a summary text for the email when multiple rooms have notifications.
 
         Args:
-            notifs_by_room: A map of room ID to the notifications for that room.
+            notifs_by_room: A map of room ID to the push actions for that room.
             room_state_ids: A map of room ID to the state map for that room.
             notif_events: A map of event ID -> notification event.
             reason: The reason this notification is being sent.
@@ -632,11 +711,11 @@ class Mailer:
             }
 
         room_id = reason["room_id"]
-        return await self.make_summary_text_from_member_events(
+        return await self._make_summary_text_from_member_events(
             room_id, notifs_by_room[room_id], room_state_ids[room_id], notif_events
         )
 
-    async def make_summary_text_from_member_events(
+    async def _make_summary_text_from_member_events(
         self,
         room_id: str,
         notifs: List[Dict[str, Any]],
@@ -648,7 +727,7 @@ class Mailer:
 
         Args:
             room_id: The ID of the room.
-            notifs: The notifications for this room.
+            notifs: The push actions for this room.
             room_state_ids: The state map for the room.
             notif_events: A map of event ID -> notification event.
 
@@ -657,14 +736,45 @@ class Mailer:
         """
         # If the room doesn't have a name, say who the messages
         # are from explicitly to avoid, "messages in the Bob room"
-        sender_ids = {notif_events[n["event_id"]].sender for n in notifs}
 
-        member_events = await self.store.get_events(
-            [room_state_ids[("m.room.member", s)] for s in sender_ids]
-        )
+        # Find the latest event ID for each sender, note that the notifications
+        # are already in descending received_ts.
+        sender_ids = {}
+        for n in notifs:
+            sender = notif_events[n["event_id"]].sender
+            if sender not in sender_ids:
+                sender_ids[sender] = n["event_id"]
+
+        # Get the actual member events (in order to calculate a pretty name for
+        # the room).
+        member_event_ids = []
+        member_events = {}
+        for sender_id, event_id in sender_ids.items():
+            type_state_key = ("m.room.member", sender_id)
+            sender_state_event_id = room_state_ids.get(type_state_key)
+            if sender_state_event_id:
+                member_event_ids.append(sender_state_event_id)
+            else:
+                # Attempt to check the historical state for the room.
+                historical_state = await self.state_store.get_state_for_event(
+                    event_id, StateFilter.from_types((type_state_key,))
+                )
+                sender_state_event = historical_state.get(type_state_key)
+                if sender_state_event:
+                    member_events[event_id] = sender_state_event
+        member_events.update(await self.store.get_events(member_event_ids))
+
+        if not member_events:
+            # No member events were found! Maybe the room is empty?
+            # Fallback to the room ID (note that if there was a room name this
+            # would already have been used previously).
+            return self.email_subjects.messages_in_room % {
+                "room": room_id,
+                "app": self.app_name,
+            }
 
         # There was a single sender.
-        if len(sender_ids) == 1:
+        if len(member_events) == 1:
             return self.email_subjects.messages_from_person % {
                 "person": descriptor_from_member_events(member_events.values()),
                 "app": self.app_name,
@@ -676,7 +786,16 @@ class Mailer:
             "app": self.app_name,
         }
 
-    def make_room_link(self, room_id: str) -> str:
+    def _make_room_link(self, room_id: str) -> str:
+        """
+        Generate a link to open a room in the web client.
+
+        Args:
+            room_id: The room ID to generate a link to.
+
+        Returns:
+             A link to open a room in the web client.
+        """
         if self.hs.config.email_riot_base_url:
             base_url = "%s/#/room" % (self.hs.config.email_riot_base_url)
         elif self.app_name == "Vector":
@@ -686,7 +805,16 @@ class Mailer:
             base_url = "https://matrix.to/#"
         return "%s/%s" % (base_url, room_id)
 
-    def make_notif_link(self, notif: Dict[str, str]) -> str:
+    def _make_notif_link(self, notif: Dict[str, str]) -> str:
+        """
+        Generate a link to open an event in the web client.
+
+        Args:
+            notif: The notification to generate a link for.
+
+        Returns:
+             A link to open the notification in the web client.
+        """
         if self.hs.config.email_riot_base_url:
             return "%s/#/room/%s/%s" % (
                 self.hs.config.email_riot_base_url,
@@ -702,9 +830,20 @@ class Mailer:
         else:
             return "https://matrix.to/#/%s/%s" % (notif["room_id"], notif["event_id"])
 
-    def make_unsubscribe_link(
+    def _make_unsubscribe_link(
         self, user_id: str, app_id: str, email_address: str
     ) -> str:
+        """
+        Generate a link to unsubscribe from email notifications.
+
+        Args:
+            user_id: The user receiving the notification.
+            app_id: The application receiving the notification.
+            email_address: The email address receiving the notification.
+
+        Returns:
+             A link to unsubscribe from email notifications.
+        """
         params = {
             "access_token": self.macaroon_gen.generate_delete_pusher_token(user_id),
             "app_id": app_id,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index eed16dbfb5..ae1145be0e 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -78,8 +78,7 @@ class PusherPool:
         self.pushers = {}  # type: Dict[str, Dict[str, Pusher]]
 
     def start(self) -> None:
-        """Starts the pushers off in a background process.
-        """
+        """Starts the pushers off in a background process."""
         if not self._should_start_pushers:
             logger.info("Not starting pushers because they are disabled in the config")
             return
@@ -297,8 +296,7 @@ class PusherPool:
         return pusher
 
     async def _start_pushers(self) -> None:
-        """Start all the pushers
-        """
+        """Start all the pushers"""
         pushers = await self.store.get_all_pushers()
 
         # Stagger starting up the pushers so we don't completely drown the
@@ -335,7 +333,8 @@ class PusherPool:
             return None
         except Exception:
             logger.exception(
-                "Couldn't start pusher id %i: caught Exception", pusher_config.id,
+                "Couldn't start pusher id %i: caught Exception",
+                pusher_config.id,
             )
             return None