diff options
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 35 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 52 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 41 | ||||
-rw-r--r-- | synapse/push/mailer.py | 155 | ||||
-rw-r--r-- | synapse/push/presentable_names.py | 36 | ||||
-rw-r--r-- | synapse/push/push_rule_evaluator.py | 77 | ||||
-rw-r--r-- | synapse/push/push_tools.py | 11 | ||||
-rw-r--r-- | synapse/push/pusher.py | 12 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 59 |
9 files changed, 258 insertions, 220 deletions
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 22491f3700..e75d964ac8 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -51,6 +51,7 @@ push_rules_delta_state_cache_metric = register_cache( "cache", "push_rules_delta_state_cache_metric", cache=[], # Meaningless size, as this isn't a cache that stores values + resizable=False, ) @@ -67,7 +68,8 @@ class BulkPushRuleEvaluator(object): self.room_push_rule_cache_metrics = register_cache( "cache", "room_push_rule_cache", - cache=[], # Meaningless size, as this isn't a cache that stores values + cache=[], # Meaningless size, as this isn't a cache that stores values, + resizable=False, ) @defer.inlineCallbacks @@ -79,7 +81,7 @@ class BulkPushRuleEvaluator(object): dict of user_id -> push_rules """ room_id = event.room_id - rules_for_room = self._get_rules_for_room(room_id) + rules_for_room = yield self._get_rules_for_room(room_id) rules_by_user = yield rules_for_room.get_rules(event, context) @@ -116,7 +118,7 @@ class BulkPushRuleEvaluator(object): @defer.inlineCallbacks def _get_power_levels_and_sender_level(self, event, context): - prev_state_ids = yield context.get_prev_state_ids(self.store) + prev_state_ids = yield context.get_prev_state_ids() pl_event_id = prev_state_ids.get(POWER_KEY) if pl_event_id: # fastpath: if there's a power level event, that's all we need, and @@ -149,9 +151,10 @@ class BulkPushRuleEvaluator(object): room_members = yield self.store.get_joined_users_from_context(event, context) - (power_levels, sender_power_level) = ( - yield self._get_power_levels_and_sender_level(event, context) - ) + ( + power_levels, + sender_power_level, + ) = yield self._get_power_levels_and_sender_level(event, context) evaluator = PushRuleEvaluatorForEvent( event, len(room_members), sender_power_level, power_levels @@ -303,7 +306,7 @@ class RulesForRoom(object): push_rules_delta_state_cache_metric.inc_hits() else: - current_state_ids = yield context.get_current_state_ids(self.store) + current_state_ids = yield context.get_current_state_ids() push_rules_delta_state_cache_metric.inc_misses() push_rules_state_size_counter.inc(len(current_state_ids)) @@ -385,15 +388,7 @@ class RulesForRoom(object): """ sequence = self.sequence - rows = yield self.store._simple_select_many_batch( - table="room_memberships", - column="event_id", - iterable=member_event_ids.values(), - retcols=("user_id", "membership", "event_id"), - keyvalues={}, - batch_size=500, - desc="_get_rules_for_member_event_ids", - ) + rows = yield self.store.get_membership_from_event_ids(member_event_ids.values()) members = {row["event_id"]: (row["user_id"], row["membership"]) for row in rows} @@ -407,11 +402,11 @@ class RulesForRoom(object): if logger.isEnabledFor(logging.DEBUG): logger.debug("Found members %r: %r", self.room_id, members.values()) - interested_in_user_ids = set( + interested_in_user_ids = { user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN - ) + } logger.debug("Joined: %r", interested_in_user_ids) @@ -419,9 +414,9 @@ class RulesForRoom(object): interested_in_user_ids, on_invalidate=self.invalidate_all_cb ) - user_ids = set( + user_ids = { uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher - ) + } logger.debug("With pushers: %r", user_ids) diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 42e5b0c0a5..568c13eaea 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -15,7 +15,6 @@ import logging -from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled from synapse.metrics.background_process_metrics import run_as_background_process @@ -132,8 +131,7 @@ class EmailPusher(object): self._is_processing = False self._start_processing() - @defer.inlineCallbacks - def _process(self): + async def _process(self): # we should never get here if we are already processing assert not self._is_processing @@ -142,7 +140,7 @@ class EmailPusher(object): if self.throttle_params is None: # this is our first loop: load up the throttle params - self.throttle_params = yield self.store.get_throttle_params_by_room( + self.throttle_params = await self.store.get_throttle_params_by_room( self.pusher_id ) @@ -151,7 +149,7 @@ class EmailPusher(object): while True: starting_max_ordering = self.max_stream_ordering try: - yield self._unsafe_process() + await self._unsafe_process() except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: @@ -159,8 +157,7 @@ class EmailPusher(object): finally: self._is_processing = False - @defer.inlineCallbacks - def _unsafe_process(self): + async def _unsafe_process(self): """ Main logic of the push loop without the wrapper function that sets up logging, measures and guards against multiple instances of it @@ -168,12 +165,12 @@ class EmailPusher(object): """ start = 0 if INCLUDE_ALL_UNREAD_NOTIFS else self.last_stream_ordering fn = self.store.get_unread_push_actions_for_user_in_range_for_email - unprocessed = yield fn(self.user_id, start, self.max_stream_ordering) + unprocessed = await fn(self.user_id, start, self.max_stream_ordering) soonest_due_at = None if not unprocessed: - yield self.save_last_stream_ordering_and_success(self.max_stream_ordering) + await self.save_last_stream_ordering_and_success(self.max_stream_ordering) return for push_action in unprocessed: @@ -201,15 +198,15 @@ class EmailPusher(object): "throttle_ms": self.get_room_throttle_ms(push_action["room_id"]), } - yield self.send_notification(unprocessed, reason) + await self.send_notification(unprocessed, reason) - yield self.save_last_stream_ordering_and_success( - max([ea["stream_ordering"] for ea in unprocessed]) + await self.save_last_stream_ordering_and_success( + max(ea["stream_ordering"] for ea in unprocessed) ) # we update the throttle on all the possible unprocessed push actions for ea in unprocessed: - yield self.sent_notif_update_throttle(ea["room_id"], ea) + await self.sent_notif_update_throttle(ea["room_id"], ea) break else: if soonest_due_at is None or should_notify_at < soonest_due_at: @@ -227,21 +224,18 @@ class EmailPusher(object): self.seconds_until(soonest_due_at), self.on_timer ) - @defer.inlineCallbacks - def save_last_stream_ordering_and_success(self, last_stream_ordering): + async def save_last_stream_ordering_and_success(self, last_stream_ordering): if last_stream_ordering is None: # This happens if we haven't yet processed anything return self.last_stream_ordering = last_stream_ordering - pusher_still_exists = ( - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.email, - self.user_id, - last_stream_ordering, - self.clock.time_msec(), - ) + pusher_still_exists = await self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.email, + self.user_id, + last_stream_ordering, + self.clock.time_msec(), ) if not pusher_still_exists: # The pusher has been deleted while we were processing, so @@ -277,13 +271,12 @@ class EmailPusher(object): may_send_at = last_sent_ts + throttle_ms return may_send_at - @defer.inlineCallbacks - def sent_notif_update_throttle(self, room_id, notified_push_action): + async def sent_notif_update_throttle(self, room_id, notified_push_action): # We have sent a notification, so update the throttle accordingly. # If the event that triggered the notif happened more than # THROTTLE_RESET_AFTER_MS after the previous one that triggered a # notif, we release the throttle. Otherwise, the throttle is increased. - time_of_previous_notifs = yield self.store.get_time_of_last_push_action_before( + time_of_previous_notifs = await self.store.get_time_of_last_push_action_before( notified_push_action["stream_ordering"] ) @@ -312,14 +305,13 @@ class EmailPusher(object): "last_sent_ts": self.clock.time_msec(), "throttle_ms": new_throttle_ms, } - yield self.store.set_throttle_params( + await self.store.set_throttle_params( self.pusher_id, room_id, self.throttle_params[room_id] ) - @defer.inlineCallbacks - def send_notification(self, push_actions, reason): + async def send_notification(self, push_actions, reason): logger.info("Sending notif email for user %r", self.user_id) - yield self.mailer.send_notification_mail( + await self.mailer.send_notification_mail( self.app_id, self.user_id, self.email, push_actions, reason ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 6299587808..eaaa7afc91 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -15,8 +15,6 @@ # limitations under the License. import logging -import six - from prometheus_client import Counter from twisted.internet import defer @@ -28,9 +26,6 @@ from synapse.push import PusherConfigException from . import push_rule_evaluator, push_tools -if six.PY3: - long = int - logger = logging.getLogger(__name__) http_push_processed_counter = Counter( @@ -64,6 +59,7 @@ class HttpPusher(object): def __init__(self, hs, pusherdict): self.hs = hs self.store = self.hs.get_datastore() + self.storage = self.hs.get_storage() self.clock = self.hs.get_clock() self.state_handler = self.hs.get_state_handler() self.user_id = pusherdict["user_name"] @@ -102,7 +98,7 @@ class HttpPusher(object): if "url" not in self.data: raise PusherConfigException("'url' required in data for HTTP pusher") self.url = self.data["url"] - self.http_client = hs.get_simple_http_client() + self.http_client = hs.get_proxied_http_client() self.data_minus_url = {} self.data_minus_url.update(self.data) del self.data_minus_url["url"] @@ -210,14 +206,12 @@ class HttpPusher(object): http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action["stream_ordering"] - pusher_still_exists = ( - yield self.store.update_pusher_last_stream_ordering_and_success( - self.app_id, - self.pushkey, - self.user_id, - self.last_stream_ordering, - self.clock.time_msec(), - ) + pusher_still_exists = yield self.store.update_pusher_last_stream_ordering_and_success( + self.app_id, + self.pushkey, + self.user_id, + self.last_stream_ordering, + self.clock.time_msec(), ) if not pusher_still_exists: # The pusher has been deleted while we were processing, so @@ -246,8 +240,8 @@ class HttpPusher(object): # we really only give up so that if the URL gets # fixed, we don't suddenly deliver a load # of old notifications. - logger.warn( - "Giving up on a notification to user %s, " "pushkey %s", + logger.warning( + "Giving up on a notification to user %s, pushkey %s", self.user_id, self.pushkey, ) @@ -299,9 +293,8 @@ class HttpPusher(object): if pk != self.pushkey: # for sanity, we only remove the pushkey if it # was the one we actually sent... - logger.warn( - ("Ignoring rejected pushkey %s because we" " didn't send it"), - pk, + logger.warning( + ("Ignoring rejected pushkey %s because we didn't send it"), pk, ) else: logger.info("Pushkey %s was rejected: removing", pk) @@ -320,7 +313,7 @@ class HttpPusher(object): { "app_id": self.app_id, "pushkey": self.pushkey, - "pushkey_ts": long(self.pushkey_ts / 1000), + "pushkey_ts": int(self.pushkey_ts / 1000), "data": self.data_minus_url, } ], @@ -329,7 +322,7 @@ class HttpPusher(object): return d ctx = yield push_tools.get_context_for_event( - self.store, self.state_handler, event, self.user_id + self.storage, self.state_handler, event, self.user_id ) d = { @@ -349,7 +342,7 @@ class HttpPusher(object): { "app_id": self.app_id, "pushkey": self.pushkey, - "pushkey_ts": long(self.pushkey_ts / 1000), + "pushkey_ts": int(self.pushkey_ts / 1000), "data": self.data_minus_url, "tweaks": tweaks, } @@ -400,7 +393,7 @@ class HttpPusher(object): Args: badge (int): number of unread messages """ - logger.info("Sending updated badge count %d to %s", badge, self.name) + logger.debug("Sending updated badge count %d to %s", badge, self.name) d = { "notification": { "id": "", @@ -411,7 +404,7 @@ class HttpPusher(object): { "app_id": self.app_id, "pushkey": self.pushkey, - "pushkey_ts": long(self.pushkey_ts / 1000), + "pushkey_ts": int(self.pushkey_ts / 1000), "data": self.data_minus_url, } ], diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 3dfd527849..d57a66a697 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -19,14 +19,13 @@ import logging import time from email.mime.multipart import MIMEMultipart from email.mime.text import MIMEText +from typing import Iterable, List, TypeVar from six.moves import urllib import bleach import jinja2 -from twisted.internet import defer - from synapse.api.constants import EventTypes from synapse.api.errors import StoreError from synapse.logging.context import make_deferred_yieldable @@ -41,9 +40,11 @@ from synapse.visibility import filter_events_for_client logger = logging.getLogger(__name__) +T = TypeVar("T") + MESSAGE_FROM_PERSON_IN_ROOM = ( - "You have a message on %(app)s from %(person)s " "in the %(room)s room..." + "You have a message on %(app)s from %(person)s in the %(room)s room..." ) MESSAGE_FROM_PERSON = "You have a message on %(app)s from %(person)s..." MESSAGES_FROM_PERSON = "You have messages on %(app)s from %(person)s..." @@ -55,7 +56,7 @@ MESSAGES_FROM_PERSON_AND_OTHERS = ( "You have messages on %(app)s from %(person)s and others..." ) INVITE_FROM_PERSON_TO_ROOM = ( - "%(person)s has invited you to join the " "%(room)s room on %(app)s..." + "%(person)s has invited you to join the %(room)s room on %(app)s..." ) INVITE_FROM_PERSON = "%(person)s has invited you to chat on %(app)s..." @@ -119,12 +120,12 @@ class Mailer(object): self.store = self.hs.get_datastore() self.macaroon_gen = self.hs.get_macaroon_generator() self.state_handler = self.hs.get_state_handler() + self.storage = hs.get_storage() self.app_name = app_name logger.info("Created Mailer for app_name %s" % app_name) - @defer.inlineCallbacks - def send_password_reset_mail(self, email_address, token, client_secret, sid): + async def send_password_reset_mail(self, email_address, token, client_secret, sid): """Send an email with a password reset link to a user Args: @@ -136,22 +137,22 @@ class Mailer(object): group together multiple email sending attempts sid (str): The generated session ID """ + params = {"token": token, "client_secret": client_secret, "sid": sid} link = ( self.hs.config.public_baseurl - + "_matrix/client/unstable/password_reset/email/submit_token" - "?token=%s&client_secret=%s&sid=%s" % (token, client_secret, sid) + + "_matrix/client/unstable/password_reset/email/submit_token?%s" + % urllib.parse.urlencode(params) ) template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Password Reset" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_registration_mail(self, email_address, token, client_secret, sid): + async def send_registration_mail(self, email_address, token, client_secret, sid): """Send an email with a registration confirmation link to a user Args: @@ -163,28 +164,56 @@ class Mailer(object): group together multiple email sending attempts sid (str): The generated session ID """ + params = {"token": token, "client_secret": client_secret, "sid": sid} link = ( self.hs.config.public_baseurl - + "_matrix/client/unstable/registration/email/submit_token" - "?token=%s&client_secret=%s&sid=%s" % (token, client_secret, sid) + + "_matrix/client/unstable/registration/email/submit_token?%s" + % urllib.parse.urlencode(params) ) template_vars = {"link": link} - yield self.send_email( + await self.send_email( email_address, "[%s] Register your Email Address" % self.hs.config.server_name, template_vars, ) - @defer.inlineCallbacks - def send_notification_mail( + async def send_add_threepid_mail(self, email_address, token, client_secret, sid): + """Send an email with a validation link to a user for adding a 3pid to their account + + Args: + email_address (str): Email address we're sending the validation link to + + token (str): Unique token generated by the server to verify the email was received + + client_secret (str): Unique token generated by the client to group together + multiple email sending attempts + + sid (str): The generated session ID + """ + params = {"token": token, "client_secret": client_secret, "sid": sid} + link = ( + self.hs.config.public_baseurl + + "_matrix/client/unstable/add_threepid/email/submit_token?%s" + % urllib.parse.urlencode(params) + ) + + template_vars = {"link": link} + + await self.send_email( + email_address, + "[%s] Validate Your Email" % self.hs.config.server_name, + template_vars, + ) + + async def send_notification_mail( self, app_id, user_id, email_address, push_actions, reason ): """Send email regarding a user's room notifications""" rooms_in_order = deduped_ordered_list([pa["room_id"] for pa in push_actions]) - notif_events = yield self.store.get_events( + notif_events = await self.store.get_events( [pa["event_id"] for pa in push_actions] ) @@ -197,7 +226,7 @@ class Mailer(object): state_by_room = {} try: - user_display_name = yield self.store.get_profile_displayname( + user_display_name = await self.store.get_profile_displayname( UserID.from_string(user_id).localpart ) if user_display_name is None: @@ -205,14 +234,13 @@ class Mailer(object): except StoreError: user_display_name = user_id - @defer.inlineCallbacks - def _fetch_room_state(room_id): - room_state = yield self.store.get_current_state_ids(room_id) + async def _fetch_room_state(room_id): + room_state = await self.store.get_current_state_ids(room_id) state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email # notifs are much less realtime than sync so we can afford to wait a bit. - yield concurrently_execute(_fetch_room_state, rooms_in_order, 3) + await concurrently_execute(_fetch_room_state, rooms_in_order, 3) # actually sort our so-called rooms_in_order list, most recent room first rooms_in_order.sort(key=lambda r: -(notifs_by_room[r][-1]["received_ts"] or 0)) @@ -220,19 +248,19 @@ class Mailer(object): rooms = [] for r in rooms_in_order: - roomvars = yield self.get_room_vars( + roomvars = await self.get_room_vars( r, user_id, notifs_by_room[r], notif_events, state_by_room[r] ) rooms.append(roomvars) - reason["room_name"] = yield calculate_room_name( + reason["room_name"] = await calculate_room_name( self.store, state_by_room[reason["room_id"]], user_id, fallback_to_members=True, ) - summary_text = yield self.make_summary_text( + summary_text = await self.make_summary_text( notifs_by_room, state_by_room, notif_events, user_id, reason ) @@ -247,12 +275,11 @@ class Mailer(object): "reason": reason, } - yield self.send_email( + await self.send_email( email_address, "[%s] %s" % (self.app_name, summary_text), template_vars ) - @defer.inlineCallbacks - def send_email(self, email_address, subject, template_vars): + async def send_email(self, email_address, subject, template_vars): """Send an email with the given information and template text""" try: from_string = self.hs.config.email_notif_from % {"app": self.app_name} @@ -280,9 +307,9 @@ class Mailer(object): multipart_msg.attach(text_part) multipart_msg.attach(html_part) - logger.info("Sending email notification to %s" % email_address) + logger.info("Sending email to %s" % email_address) - yield make_deferred_yieldable( + await make_deferred_yieldable( self.sendmail( self.hs.config.email_smtp_host, raw_from, @@ -297,13 +324,14 @@ class Mailer(object): ) ) - @defer.inlineCallbacks - def get_room_vars(self, room_id, user_id, notifs, notif_events, room_state_ids): + async def get_room_vars( + self, room_id, user_id, notifs, notif_events, room_state_ids + ): my_member_event_id = room_state_ids[("m.room.member", user_id)] - my_member_event = yield self.store.get_event(my_member_event_id) + my_member_event = await self.store.get_event(my_member_event_id) is_invite = my_member_event.content["membership"] == "invite" - room_name = yield calculate_room_name(self.store, room_state_ids, user_id) + room_name = await calculate_room_name(self.store, room_state_ids, user_id) room_vars = { "title": room_name, @@ -315,7 +343,7 @@ class Mailer(object): if not is_invite: for n in notifs: - notifvars = yield self.get_notif_vars( + notifvars = await self.get_notif_vars( n, user_id, notif_events[n["event_id"]], room_state_ids ) @@ -342,9 +370,8 @@ class Mailer(object): return room_vars - @defer.inlineCallbacks - def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): - results = yield self.store.get_events_around( + async def get_notif_vars(self, notif, user_id, notif_event, room_state_ids): + results = await self.store.get_events_around( notif["room_id"], notif["event_id"], before_limit=CONTEXT_BEFORE, @@ -357,25 +384,24 @@ class Mailer(object): "messages": [], } - the_events = yield filter_events_for_client( - self.store, user_id, results["events_before"] + the_events = await filter_events_for_client( + self.storage, user_id, results["events_before"] ) the_events.append(notif_event) for event in the_events: - messagevars = yield self.get_message_vars(notif, event, room_state_ids) + messagevars = await self.get_message_vars(notif, event, room_state_ids) if messagevars is not None: ret["messages"].append(messagevars) return ret - @defer.inlineCallbacks - def get_message_vars(self, notif, event, room_state_ids): + async def get_message_vars(self, notif, event, room_state_ids): if event.type != EventTypes.Message: return sender_state_event_id = room_state_ids[("m.room.member", event.sender)] - sender_state_event = yield self.store.get_event(sender_state_event_id) + sender_state_event = await self.store.get_event(sender_state_event_id) sender_name = name_from_member_event(sender_state_event) sender_avatar_url = sender_state_event.content.get("avatar_url") @@ -425,8 +451,7 @@ class Mailer(object): return messagevars - @defer.inlineCallbacks - def make_summary_text( + async def make_summary_text( self, notifs_by_room, room_state_ids, notif_events, user_id, reason ): if len(notifs_by_room) == 1: @@ -436,17 +461,17 @@ class Mailer(object): # If the room has some kind of name, use it, but we don't # want the generated-from-names one here otherwise we'll # end up with, "new message from Bob in the Bob room" - room_name = yield calculate_room_name( + room_name = await calculate_room_name( self.store, room_state_ids[room_id], user_id, fallback_to_members=False ) my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)] - my_member_event = yield self.store.get_event(my_member_event_id) + my_member_event = await self.store.get_event(my_member_event_id) if my_member_event.content["membership"] == "invite": inviter_member_event_id = room_state_ids[room_id][ ("m.room.member", my_member_event.sender) ] - inviter_member_event = yield self.store.get_event( + inviter_member_event = await self.store.get_event( inviter_member_event_id ) inviter_name = name_from_member_event(inviter_member_event) @@ -471,7 +496,7 @@ class Mailer(object): state_event_id = room_state_ids[room_id][ ("m.room.member", event.sender) ] - state_event = yield self.store.get_event(state_event_id) + state_event = await self.store.get_event(state_event_id) sender_name = name_from_member_event(state_event) if sender_name is not None and room_name is not None: @@ -494,15 +519,13 @@ class Mailer(object): # If the room doesn't have a name, say who the messages # are from explicitly to avoid, "messages in the Bob room" sender_ids = list( - set( - [ - notif_events[n["event_id"]].sender - for n in notifs_by_room[room_id] - ] - ) + { + notif_events[n["event_id"]].sender + for n in notifs_by_room[room_id] + } ) - member_events = yield self.store.get_events( + member_events = await self.store.get_events( [ room_state_ids[room_id][("m.room.member", s)] for s in sender_ids @@ -525,16 +548,16 @@ class Mailer(object): else: # If the reason room doesn't have a name, say who the messages # are from explicitly to avoid, "messages in the Bob room" + room_id = reason["room_id"] + sender_ids = list( - set( - [ - notif_events[n["event_id"]].sender - for n in notifs_by_room[reason["room_id"]] - ] - ) + { + notif_events[n["event_id"]].sender + for n in notifs_by_room[room_id] + } ) - member_events = yield self.store.get_events( + member_events = await self.store.get_events( [room_state_ids[room_id][("m.room.member", s)] for s in sender_ids] ) @@ -608,10 +631,10 @@ def safe_text(raw_text): ) -def deduped_ordered_list(l): +def deduped_ordered_list(it: Iterable[T]) -> List[T]: seen = set() ret = [] - for item in l: + for item in it: if item not in seen: seen.add(item) ret.append(item) diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py index 16a7e8e31d..0644a13cfc 100644 --- a/synapse/push/presentable_names.py +++ b/synapse/push/presentable_names.py @@ -18,6 +18,8 @@ import re from twisted.internet import defer +from synapse.api.constants import EventTypes + logger = logging.getLogger(__name__) # intentionally looser than what aliases we allow to be registered since @@ -50,17 +52,17 @@ def calculate_room_name( (string or None) A human readable name for the room. """ # does it have a name? - if ("m.room.name", "") in room_state_ids: + if (EventTypes.Name, "") in room_state_ids: m_room_name = yield store.get_event( - room_state_ids[("m.room.name", "")], allow_none=True + room_state_ids[(EventTypes.Name, "")], allow_none=True ) if m_room_name and m_room_name.content and m_room_name.content["name"]: return m_room_name.content["name"] # does it have a canonical alias? - if ("m.room.canonical_alias", "") in room_state_ids: + if (EventTypes.CanonicalAlias, "") in room_state_ids: canon_alias = yield store.get_event( - room_state_ids[("m.room.canonical_alias", "")], allow_none=True + room_state_ids[(EventTypes.CanonicalAlias, "")], allow_none=True ) if ( canon_alias @@ -74,32 +76,22 @@ def calculate_room_name( # for an event type, so rearrange the data structure room_state_bytype_ids = _state_as_two_level_dict(room_state_ids) - # right then, any aliases at all? - if "m.room.aliases" in room_state_bytype_ids: - m_room_aliases = room_state_bytype_ids["m.room.aliases"] - for alias_id in m_room_aliases.values(): - alias_event = yield store.get_event(alias_id, allow_none=True) - if alias_event and alias_event.content.get("aliases"): - the_aliases = alias_event.content["aliases"] - if len(the_aliases) > 0 and _looks_like_an_alias(the_aliases[0]): - return the_aliases[0] - if not fallback_to_members: return None my_member_event = None - if ("m.room.member", user_id) in room_state_ids: + if (EventTypes.Member, user_id) in room_state_ids: my_member_event = yield store.get_event( - room_state_ids[("m.room.member", user_id)], allow_none=True + room_state_ids[(EventTypes.Member, user_id)], allow_none=True ) if ( my_member_event is not None and my_member_event.content["membership"] == "invite" ): - if ("m.room.member", my_member_event.sender) in room_state_ids: + if (EventTypes.Member, my_member_event.sender) in room_state_ids: inviter_member_event = yield store.get_event( - room_state_ids[("m.room.member", my_member_event.sender)], + room_state_ids[(EventTypes.Member, my_member_event.sender)], allow_none=True, ) if inviter_member_event: @@ -114,9 +106,9 @@ def calculate_room_name( # we're going to have to generate a name based on who's in the room, # so find out who is in the room that isn't the user. - if "m.room.member" in room_state_bytype_ids: + if EventTypes.Member in room_state_bytype_ids: member_events = yield store.get_events( - list(room_state_bytype_ids["m.room.member"].values()) + list(room_state_bytype_ids[EventTypes.Member].values()) ) all_members = [ ev @@ -138,9 +130,9 @@ def calculate_room_name( # self-chat, peeked room with 1 participant, # or inbound invite, or outbound 3PID invite. if all_members[0].sender == user_id: - if "m.room.third_party_invite" in room_state_bytype_ids: + if EventTypes.ThirdPartyInvite in room_state_bytype_ids: third_party_invites = room_state_bytype_ids[ - "m.room.third_party_invite" + EventTypes.ThirdPartyInvite ].values() if len(third_party_invites) > 0: diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 5ed9147de4..11032491af 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -16,11 +16,13 @@ import logging import re +from typing import Pattern from six import string_types +from synapse.events import EventBase from synapse.types import UserID -from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache +from synapse.util.caches import register_cache from synapse.util.caches.lrucache import LruCache logger = logging.getLogger(__name__) @@ -56,18 +58,18 @@ def _test_ineq_condition(condition, number): rhs = m.group(2) if not rhs.isdigit(): return False - rhs = int(rhs) + rhs_int = int(rhs) if ineq == "" or ineq == "==": - return number == rhs + return number == rhs_int elif ineq == "<": - return number < rhs + return number < rhs_int elif ineq == ">": - return number > rhs + return number > rhs_int elif ineq == ">=": - return number >= rhs + return number >= rhs_int elif ineq == "<=": - return number <= rhs + return number <= rhs_int else: return False @@ -83,7 +85,13 @@ def tweaks_for_actions(actions): class PushRuleEvaluatorForEvent(object): - def __init__(self, event, room_member_count, sender_power_level, power_levels): + def __init__( + self, + event: EventBase, + room_member_count: int, + sender_power_level: int, + power_levels: dict, + ): self._event = event self._room_member_count = room_member_count self._sender_power_level = sender_power_level @@ -92,7 +100,7 @@ class PushRuleEvaluatorForEvent(object): # Maps strings of e.g. 'content.body' -> event["content"]["body"] self._value_cache = _flatten_dict(event) - def matches(self, condition, user_id, display_name): + def matches(self, condition: dict, user_id: str, display_name: str) -> bool: if condition["kind"] == "event_match": return self._event_match(condition, user_id) elif condition["kind"] == "contains_display_name": @@ -106,7 +114,7 @@ class PushRuleEvaluatorForEvent(object): else: return True - def _event_match(self, condition, user_id): + def _event_match(self, condition: dict, user_id: str) -> bool: pattern = condition.get("pattern", None) if not pattern: @@ -117,7 +125,7 @@ class PushRuleEvaluatorForEvent(object): pattern = UserID.from_string(user_id).localpart if not pattern: - logger.warn("event_match condition with no pattern") + logger.warning("event_match condition with no pattern") return False # XXX: optimisation: cache our pattern regexps @@ -134,7 +142,7 @@ class PushRuleEvaluatorForEvent(object): return _glob_matches(pattern, haystack) - def _contains_display_name(self, display_name): + def _contains_display_name(self, display_name: str) -> bool: if not display_name: return False @@ -142,51 +150,52 @@ class PushRuleEvaluatorForEvent(object): if not body: return False - return _glob_matches(display_name, body, word_boundary=True) + # Similar to _glob_matches, but do not treat display_name as a glob. + r = regex_cache.get((display_name, False, True), None) + if not r: + r = re.escape(display_name) + r = _re_word_boundary(r) + r = re.compile(r, flags=re.IGNORECASE) + regex_cache[(display_name, False, True)] = r + + return r.search(body) - def _get_value(self, dotted_key): + def _get_value(self, dotted_key: str) -> str: return self._value_cache.get(dotted_key, None) -# Caches (glob, word_boundary) -> regex for push. See _glob_matches -regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR) +# Caches (string, is_glob, word_boundary) -> regex for push. See _glob_matches +regex_cache = LruCache(50000) register_cache("cache", "regex_push_cache", regex_cache) -def _glob_matches(glob, value, word_boundary=False): +def _glob_matches(glob: str, value: str, word_boundary: bool = False) -> bool: """Tests if value matches glob. Args: - glob (string) - value (string): String to test against glob. - word_boundary (bool): Whether to match against word boundaries or entire + glob + value: String to test against glob. + word_boundary: Whether to match against word boundaries or entire string. Defaults to False. - - Returns: - bool """ try: - r = regex_cache.get((glob, word_boundary), None) + r = regex_cache.get((glob, True, word_boundary), None) if not r: r = _glob_to_re(glob, word_boundary) - regex_cache[(glob, word_boundary)] = r + regex_cache[(glob, True, word_boundary)] = r return r.search(value) except re.error: - logger.warn("Failed to parse glob to regex: %r", glob) + logger.warning("Failed to parse glob to regex: %r", glob) return False -def _glob_to_re(glob, word_boundary): +def _glob_to_re(glob: str, word_boundary: bool) -> Pattern: """Generates regex for a given glob. Args: - glob (string) - word_boundary (bool): Whether to match against word boundaries or entire - string. Defaults to False. - - Returns: - regex object + glob + word_boundary: Whether to match against word boundaries or entire string. """ if IS_GLOB.search(glob): r = re.escape(glob) @@ -219,7 +228,7 @@ def _glob_to_re(glob, word_boundary): return re.compile(r, flags=re.IGNORECASE) -def _re_word_boundary(r): +def _re_word_boundary(r: str) -> str: """ Adds word boundary characters to the start and end of an expression to require that the match occur as a whole word, diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index a54051a726..5dae4648c0 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -16,11 +16,12 @@ from twisted.internet import defer from synapse.push.presentable_names import calculate_room_name, name_from_member_event +from synapse.storage import Storage @defer.inlineCallbacks def get_badge_count(store, user_id): - invites = yield store.get_invited_rooms_for_user(user_id) + invites = yield store.get_invited_rooms_for_local_user(user_id) joins = yield store.get_rooms_for_user(user_id) my_receipts_by_room = yield store.get_receipts_for_user(user_id, "m.read") @@ -43,22 +44,22 @@ def get_badge_count(store, user_id): @defer.inlineCallbacks -def get_context_for_event(store, state_handler, ev, user_id): +def get_context_for_event(storage: Storage, state_handler, ev, user_id): ctx = {} - room_state_ids = yield store.get_state_ids_for_event(ev.event_id) + room_state_ids = yield storage.state.get_state_ids_for_event(ev.event_id) # we no longer bother setting room_alias, and make room_name the # human-readable name instead, be that m.room.name, an alias or # a list of people in the room name = yield calculate_room_name( - store, room_state_ids, user_id, fallback_to_single_member=False + storage.main, room_state_ids, user_id, fallback_to_single_member=False ) if name: ctx["name"] = name sender_state_event_id = room_state_ids[("m.room.member", ev.sender)] - sender_state_event = yield store.get_event(sender_state_event_id) + sender_state_event = yield storage.main.get_event(sender_state_event_id) ctx["sender_display_name"] = name_from_member_event(sender_state_event) return ctx diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index f277aeb131..8ad0bf5936 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -80,9 +80,11 @@ class PusherFactory(object): return EmailPusher(self.hs, pusherdict, mailer) def _app_name_from_pusherdict(self, pusherdict): - if "data" in pusherdict and "brand" in pusherdict["data"]: - app_name = pusherdict["data"]["brand"] - else: - app_name = self.config.email_app_name + data = pusherdict["data"] - return app_name + if isinstance(data, dict): + brand = data.get("brand") + if isinstance(brand, str): + return brand + + return self.config.email_app_name diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 08e840fdc2..88d203aa44 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -15,11 +15,17 @@ # limitations under the License. import logging +from collections import defaultdict +from threading import Lock +from typing import Dict, Tuple, Union from twisted.internet import defer +from synapse.metrics import LaterGauge from synapse.metrics.background_process_metrics import run_as_background_process from synapse.push import PusherConfigException +from synapse.push.emailpusher import EmailPusher +from synapse.push.httppusher import HttpPusher from synapse.push.pusher import PusherFactory from synapse.util.async_helpers import concurrently_execute @@ -47,7 +53,29 @@ class PusherPool: self._should_start_pushers = _hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() - self.pushers = {} + + # map from user id to app_id:pushkey to pusher + self.pushers = {} # type: Dict[str, Dict[str, Union[HttpPusher, EmailPusher]]] + + # a lock for the pushers dict, since `count_pushers` is called from an different + # and we otherwise get concurrent modification errors + self._pushers_lock = Lock() + + def count_pushers(): + results = defaultdict(int) # type: Dict[Tuple[str, str], int] + with self._pushers_lock: + for pushers in self.pushers.values(): + for pusher in pushers.values(): + k = (type(pusher).__name__, pusher.app_id) + results[k] += 1 + return results + + LaterGauge( + name="synapse_pushers", + desc="the number of active pushers", + labels=["kind", "app_id"], + caller=count_pushers, + ) def start(self): """Starts the pushers off in a background process. @@ -103,9 +131,7 @@ class PusherPool: # create the pusher setting last_stream_ordering to the current maximum # stream ordering in event_push_actions, so it will process # pushes from this point onwards. - last_stream_ordering = ( - yield self.store.get_latest_push_action_stream_ordering() - ) + last_stream_ordering = yield self.store.get_latest_push_action_stream_ordering() yield self.store.add_pusher( user_id=user_id, @@ -193,7 +219,7 @@ class PusherPool: min_stream_id - 1, max_stream_id ) # This returns a tuple, user_id is at index 3 - users_affected = set([r[3] for r in updated_receipts]) + users_affected = {r[3] for r in updated_receipts} for u in users_affected: if u in self.pushers: @@ -234,7 +260,6 @@ class PusherPool: Deferred """ pushers = yield self.store.get_all_pushers() - logger.info("Starting %d pushers", len(pushers)) # Stagger starting up the pushers so we don't completely drown the # process on start up. @@ -247,7 +272,7 @@ class PusherPool: """Start the given pusher Args: - pusherdict (dict): + pusherdict (dict): dict with the values pulled from the db table Returns: Deferred[EmailPusher|HttpPusher] @@ -256,7 +281,8 @@ class PusherPool: p = self.pusher_factory.create_pusher(pusherdict) except PusherConfigException as e: logger.warning( - "Pusher incorrectly configured user=%s, appid=%s, pushkey=%s: %s", + "Pusher incorrectly configured id=%i, user=%s, appid=%s, pushkey=%s: %s", + pusherdict["id"], pusherdict.get("user_name"), pusherdict.get("app_id"), pusherdict.get("pushkey"), @@ -264,18 +290,21 @@ class PusherPool: ) return except Exception: - logger.exception("Couldn't start a pusher: caught Exception") + logger.exception( + "Couldn't start pusher id %i: caught Exception", pusherdict["id"], + ) return if not p: return appid_pushkey = "%s:%s" % (pusherdict["app_id"], pusherdict["pushkey"]) - byuser = self.pushers.setdefault(pusherdict["user_name"], {}) - if appid_pushkey in byuser: - byuser[appid_pushkey].on_stop() - byuser[appid_pushkey] = p + with self._pushers_lock: + byuser = self.pushers.setdefault(pusherdict["user_name"], {}) + if appid_pushkey in byuser: + byuser[appid_pushkey].on_stop() + byuser[appid_pushkey] = p # Check if there *may* be push to process. We do this as this check is a # lot cheaper to do than actually fetching the exact rows we need to @@ -304,7 +333,9 @@ class PusherPool: if appid_pushkey in byuser: logger.info("Stopping pusher %s / %s", user_id, appid_pushkey) byuser[appid_pushkey].on_stop() - del byuser[appid_pushkey] + with self._pushers_lock: + del byuser[appid_pushkey] + yield self.store.delete_pusher_by_app_id_pushkey_user_id( app_id, pushkey, user_id ) |