diff options
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/action_generator.py | 16 | ||||
-rw-r--r-- | synapse/push/baserules.py | 23 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 457 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 31 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 75 | ||||
-rw-r--r-- | synapse/push/mailer.py | 95 | ||||
-rw-r--r-- | synapse/push/push_rule_evaluator.py | 154 | ||||
-rw-r--r-- | synapse/push/push_tools.py | 15 | ||||
-rw-r--r-- | synapse/push/pusher.py | 60 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 63 |
10 files changed, 754 insertions, 235 deletions
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index 3f75d3f921..8f619a7a1b 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -15,7 +15,7 @@ from twisted.internet import defer -from .bulk_push_rule_evaluator import evaluator_for_event +from .bulk_push_rule_evaluator import BulkPushRuleEvaluator from synapse.util.metrics import Measure @@ -24,11 +24,12 @@ import logging logger = logging.getLogger(__name__) -class ActionGenerator: +class ActionGenerator(object): def __init__(self, hs): self.hs = hs self.clock = hs.get_clock() self.store = hs.get_datastore() + self.bulk_evaluator = BulkPushRuleEvaluator(hs) # really we want to get all user ids and all profile tags too, # since we want the actions for each profile tag for every user and # also actions for a client with no profile tag for each user. @@ -38,16 +39,7 @@ class ActionGenerator: @defer.inlineCallbacks def handle_push_actions_for_event(self, event, context): - with Measure(self.clock, "evaluator_for_event"): - bulk_evaluator = yield evaluator_for_event( - event, self.hs, self.store, context - ) - with Measure(self.clock, "action_for_event_by_user"): - actions_by_user = yield bulk_evaluator.action_for_event_by_user( + yield self.bulk_evaluator.action_for_event_by_user( event, context ) - - context.push_actions = [ - (uid, actions) for uid, actions in actions_by_user.items() - ] diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 85effdfa46..7a18afe5f9 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -1,4 +1,5 @@ # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -238,6 +239,28 @@ BASE_APPEND_OVERRIDE_RULES = [ } ] }, + { + 'rule_id': 'global/override/.m.rule.roomnotif', + 'conditions': [ + { + 'kind': 'event_match', + 'key': 'content.body', + 'pattern': '@room', + '_id': '_roomnotif_content', + }, + { + 'kind': 'sender_notification_permission', + 'key': 'room', + '_id': '_roomnotif_pl', + }, + ], + 'actions': [ + 'notify', { + 'set_tweak': 'highlight', + 'value': True, + } + ] + } ] diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 78b095c903..7c680659b6 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -19,88 +20,166 @@ from twisted.internet import defer from .push_rule_evaluator import PushRuleEvaluatorForEvent -from synapse.api.constants import EventTypes -from synapse.visibility import filter_events_for_clients_context +from synapse.event_auth import get_user_power_level +from synapse.api.constants import EventTypes, Membership +from synapse.metrics import get_metrics_for +from synapse.util.caches import metrics as cache_metrics +from synapse.util.caches.descriptors import cached +from synapse.util.async import Linearizer +from synapse.state import POWER_KEY + +from collections import namedtuple logger = logging.getLogger(__name__) -@defer.inlineCallbacks -def evaluator_for_event(event, hs, store, context): - rules_by_user = yield store.bulk_get_push_rules_for_room( - event, context - ) - - # if this event is an invite event, we may need to run rules for the user - # who's been invited, otherwise they won't get told they've been invited - if event.type == 'm.room.member' and event.content['membership'] == 'invite': - invited_user = event.state_key - if invited_user and hs.is_mine_id(invited_user): - has_pusher = yield store.user_has_pusher(invited_user) - if has_pusher: - rules_by_user = dict(rules_by_user) - rules_by_user[invited_user] = yield store.get_push_rules_for_user( - invited_user - ) +rules_by_room = {} - defer.returnValue(BulkPushRuleEvaluator( - event.room_id, rules_by_user, store - )) +push_metrics = get_metrics_for(__name__) +push_rules_invalidation_counter = push_metrics.register_counter( + "push_rules_invalidation_counter" +) +push_rules_state_size_counter = push_metrics.register_counter( + "push_rules_state_size_counter" +) -class BulkPushRuleEvaluator: - """ - Runs push rules for all users in a room. - This is faster than running PushRuleEvaluator for each user because it - fetches all the rules for all the users in one (batched) db query - rather than doing multiple queries per-user. It currently uses - the same logic to run the actual rules, but could be optimised further - (see https://matrix.org/jira/browse/SYN-562) +# Measures whether we use the fast path of using state deltas, or if we have to +# recalculate from scratch +push_rules_delta_state_cache_metric = cache_metrics.register_cache( + "cache", + size_callback=lambda: 0, # Meaningless size, as this isn't a cache that stores values + cache_name="push_rules_delta_state_cache_metric", +) + + +class BulkPushRuleEvaluator(object): + """Calculates the outcome of push rules for an event for all users in the + room at once. """ - def __init__(self, room_id, rules_by_user, store): - self.room_id = room_id - self.rules_by_user = rules_by_user - self.store = store + + def __init__(self, hs): + self.hs = hs + self.store = hs.get_datastore() + self.auth = hs.get_auth() + + self.room_push_rule_cache_metrics = cache_metrics.register_cache( + "cache", + size_callback=lambda: 0, # There's not good value for this + cache_name="room_push_rule_cache", + ) @defer.inlineCallbacks - def action_for_event_by_user(self, event, context): - actions_by_user = {} + def _get_rules_for_event(self, event, context): + """This gets the rules for all users in the room at the time of the event, + as well as the push rules for the invitee if the event is an invite. + + Returns: + dict of user_id -> push_rules + """ + room_id = event.room_id + rules_for_room = self._get_rules_for_room(room_id) + + rules_by_user = yield rules_for_room.get_rules(event, context) + + # if this event is an invite event, we may need to run rules for the user + # who's been invited, otherwise they won't get told they've been invited + if event.type == 'm.room.member' and event.content['membership'] == 'invite': + invited = event.state_key + if invited and self.hs.is_mine_id(invited): + has_pusher = yield self.store.user_has_pusher(invited) + if has_pusher: + rules_by_user = dict(rules_by_user) + rules_by_user[invited] = yield self.store.get_push_rules_for_user( + invited + ) - # None of these users can be peeking since this list of users comes - # from the set of users in the room, so we know for sure they're all - # actually in the room. - user_tuples = [ - (u, False) for u in self.rules_by_user.keys() - ] + defer.returnValue(rules_by_user) - filtered_by_user = yield filter_events_for_clients_context( - self.store, user_tuples, [event], {event.event_id: context} + @cached() + def _get_rules_for_room(self, room_id): + """Get the current RulesForRoom object for the given room id + + Returns: + RulesForRoom + """ + # It's important that RulesForRoom gets added to self._get_rules_for_room.cache + # before any lookup methods get called on it as otherwise there may be + # a race if invalidate_all gets called (which assumes its in the cache) + return RulesForRoom( + self.hs, room_id, self._get_rules_for_room.cache, + self.room_push_rule_cache_metrics, ) + @defer.inlineCallbacks + def _get_power_levels_and_sender_level(self, event, context): + pl_event_id = context.prev_state_ids.get(POWER_KEY) + if pl_event_id: + # fastpath: if there's a power level event, that's all we need, and + # not having a power level event is an extreme edge case + pl_event = yield self.store.get_event(pl_event_id) + auth_events = {POWER_KEY: pl_event} + else: + auth_events_ids = yield self.auth.compute_auth_events( + event, context.prev_state_ids, for_verification=False, + ) + auth_events = yield self.store.get_events(auth_events_ids) + auth_events = { + (e.type, e.state_key): e for e in auth_events.itervalues() + } + + sender_level = get_user_power_level(event.sender, auth_events) + + pl_event = auth_events.get(POWER_KEY) + + defer.returnValue((pl_event.content if pl_event else {}, sender_level)) + + @defer.inlineCallbacks + def action_for_event_by_user(self, event, context): + """Given an event and context, evaluate the push rules and insert the + results into the event_push_actions_staging table. + + Returns: + Deferred + """ + rules_by_user = yield self._get_rules_for_event(event, context) + actions_by_user = {} + room_members = yield self.store.get_joined_users_from_context( event, context ) - evaluator = PushRuleEvaluatorForEvent(event, len(room_members)) + (power_levels, sender_power_level) = ( + yield self._get_power_levels_and_sender_level(event, context) + ) + + evaluator = PushRuleEvaluatorForEvent( + event, len(room_members), sender_power_level, power_levels, + ) condition_cache = {} - for uid, rules in self.rules_by_user.items(): - display_name = room_members.get(uid, {}).get("display_name", None) + for uid, rules in rules_by_user.iteritems(): + if event.sender == uid: + continue + + if not event.is_state(): + is_ignored = yield self.store.is_ignored_by(event.sender, uid) + if is_ignored: + continue + + display_name = None + profile_info = room_members.get(uid) + if profile_info: + display_name = profile_info.display_name + if not display_name: # Handle the case where we are pushing a membership event to # that user, as they might not be already joined. if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) - filtered = filtered_by_user[uid] - if len(filtered) == 0: - continue - - if filtered[0].sender == uid: - continue - for rule in rules: if 'enabled' in rule and not rule['enabled']: continue @@ -111,9 +190,16 @@ class BulkPushRuleEvaluator: if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: + # Push rules say we should notify the user of this event actions_by_user[uid] = actions break - defer.returnValue(actions_by_user) + + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) + yield self.store.add_push_actions_to_staging( + event.event_id, actions_by_user, + ) def _condition_checker(evaluator, conditions, uid, display_name, cache): @@ -134,3 +220,264 @@ def _condition_checker(evaluator, conditions, uid, display_name, cache): return False return True + + +class RulesForRoom(object): + """Caches push rules for users in a room. + + This efficiently handles users joining/leaving the room by not invalidating + the entire cache for the room. + """ + + def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics): + """ + Args: + hs (HomeServer) + room_id (str) + rules_for_room_cache(Cache): The cache object that caches these + RoomsForUser objects. + room_push_rule_cache_metrics (CacheMetric) + """ + self.room_id = room_id + self.is_mine_id = hs.is_mine_id + self.store = hs.get_datastore() + self.room_push_rule_cache_metrics = room_push_rule_cache_metrics + + self.linearizer = Linearizer(name="rules_for_room") + + self.member_map = {} # event_id -> (user_id, state) + self.rules_by_user = {} # user_id -> rules + + # The last state group we updated the caches for. If the state_group of + # a new event comes along, we know that we can just return the cached + # result. + # On invalidation of the rules themselves (if the user changes them), + # we invalidate everything and set state_group to `object()` + self.state_group = object() + + # A sequence number to keep track of when we're allowed to update the + # cache. We bump the sequence number when we invalidate the cache. If + # the sequence number changes while we're calculating stuff we should + # not update the cache with it. + self.sequence = 0 + + # A cache of user_ids that we *know* aren't interesting, e.g. user_ids + # owned by AS's, or remote users, etc. (I.e. users we will never need to + # calculate push for) + # These never need to be invalidated as we will never set up push for + # them. + self.uninteresting_user_set = set() + + # We need to be clever on the invalidating caches callbacks, as + # otherwise the invalidation callback holds a reference to the object, + # potentially causing it to leak. + # To get around this we pass a function that on invalidations looks ups + # the RoomsForUser entry in the cache, rather than keeping a reference + # to self around in the callback. + self.invalidate_all_cb = _Invalidation(rules_for_room_cache, room_id) + + @defer.inlineCallbacks + def get_rules(self, event, context): + """Given an event context return the rules for all users who are + currently in the room. + """ + state_group = context.state_group + + if state_group and self.state_group == state_group: + logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() + defer.returnValue(self.rules_by_user) + + with (yield self.linearizer.queue(())): + if state_group and self.state_group == state_group: + logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() + defer.returnValue(self.rules_by_user) + + self.room_push_rule_cache_metrics.inc_misses() + + ret_rules_by_user = {} + missing_member_event_ids = {} + if state_group and self.state_group == context.prev_group: + # If we have a simple delta then we can reuse most of the previous + # results. + ret_rules_by_user = self.rules_by_user + current_state_ids = context.delta_ids + + push_rules_delta_state_cache_metric.inc_hits() + else: + current_state_ids = context.current_state_ids + push_rules_delta_state_cache_metric.inc_misses() + + push_rules_state_size_counter.inc_by(len(current_state_ids)) + + logger.debug( + "Looking for member changes in %r %r", state_group, current_state_ids + ) + + # Loop through to see which member events we've seen and have rules + # for and which we need to fetch + for key in current_state_ids: + typ, user_id = key + if typ != EventTypes.Member: + continue + + if user_id in self.uninteresting_user_set: + continue + + if not self.is_mine_id(user_id): + self.uninteresting_user_set.add(user_id) + continue + + if self.store.get_if_app_services_interested_in_user(user_id): + self.uninteresting_user_set.add(user_id) + continue + + event_id = current_state_ids[key] + + res = self.member_map.get(event_id, None) + if res: + user_id, state = res + if state == Membership.JOIN: + rules = self.rules_by_user.get(user_id, None) + if rules: + ret_rules_by_user[user_id] = rules + continue + + # If a user has left a room we remove their push rule. If they + # joined then we readd it later in _update_rules_with_member_event_ids + ret_rules_by_user.pop(user_id, None) + missing_member_event_ids[user_id] = event_id + + if missing_member_event_ids: + # If we have some memebr events we haven't seen, look them up + # and fetch push rules for them if appropriate. + logger.debug("Found new member events %r", missing_member_event_ids) + yield self._update_rules_with_member_event_ids( + ret_rules_by_user, missing_member_event_ids, state_group, event + ) + else: + # The push rules didn't change but lets update the cache anyway + self.update_cache( + self.sequence, + members={}, # There were no membership changes + rules_by_user=ret_rules_by_user, + state_group=state_group + ) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug( + "Returning push rules for %r %r", + self.room_id, ret_rules_by_user.keys(), + ) + defer.returnValue(ret_rules_by_user) + + @defer.inlineCallbacks + def _update_rules_with_member_event_ids(self, ret_rules_by_user, member_event_ids, + state_group, event): + """Update the partially filled rules_by_user dict by fetching rules for + any newly joined users in the `member_event_ids` list. + + Args: + ret_rules_by_user (dict): Partiallly filled dict of push rules. Gets + updated with any new rules. + member_event_ids (list): List of event ids for membership events that + have happened since the last time we filled rules_by_user + state_group: The state group we are currently computing push rules + for. Used when updating the cache. + """ + sequence = self.sequence + + rows = yield self.store._simple_select_many_batch( + table="room_memberships", + column="event_id", + iterable=member_event_ids.values(), + retcols=('user_id', 'membership', 'event_id'), + keyvalues={}, + batch_size=500, + desc="_get_rules_for_member_event_ids", + ) + + members = { + row["event_id"]: (row["user_id"], row["membership"]) + for row in rows + } + + # If the event is a join event then it will be in current state evnts + # map but not in the DB, so we have to explicitly insert it. + if event.type == EventTypes.Member: + for event_id in member_event_ids.itervalues(): + if event_id == event.event_id: + members[event_id] = (event.state_key, event.membership) + + if logger.isEnabledFor(logging.DEBUG): + logger.debug("Found members %r: %r", self.room_id, members.values()) + + interested_in_user_ids = set( + user_id for user_id, membership in members.itervalues() + if membership == Membership.JOIN + ) + + logger.debug("Joined: %r", interested_in_user_ids) + + if_users_with_pushers = yield self.store.get_if_users_have_pushers( + interested_in_user_ids, + on_invalidate=self.invalidate_all_cb, + ) + + user_ids = set( + uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher + ) + + logger.debug("With pushers: %r", user_ids) + + users_with_receipts = yield self.store.get_users_with_read_receipts_in_room( + self.room_id, on_invalidate=self.invalidate_all_cb, + ) + + logger.debug("With receipts: %r", users_with_receipts) + + # any users with pushers must be ours: they have pushers + for uid in users_with_receipts: + if uid in interested_in_user_ids: + user_ids.add(uid) + + rules_by_user = yield self.store.bulk_get_push_rules( + user_ids, on_invalidate=self.invalidate_all_cb, + ) + + ret_rules_by_user.update( + item for item in rules_by_user.iteritems() if item[0] is not None + ) + + self.update_cache(sequence, members, ret_rules_by_user, state_group) + + def invalidate_all(self): + # Note: Don't hand this function directly to an invalidation callback + # as it keeps a reference to self and will stop this instance from being + # GC'd if it gets dropped from the rules_to_user cache. Instead use + # `self.invalidate_all_cb` + logger.debug("Invalidating RulesForRoom for %r", self.room_id) + self.sequence += 1 + self.state_group = object() + self.member_map = {} + self.rules_by_user = {} + push_rules_invalidation_counter.inc() + + def update_cache(self, sequence, members, rules_by_user, state_group): + if sequence == self.sequence: + self.member_map.update(members) + self.rules_by_user = rules_by_user + self.state_group = state_group + + +class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))): + # We rely on _CacheContext implementing __eq__ and __hash__ sensibly, + # which namedtuple does for us (i.e. two _CacheContext are the same if + # their caches and keys match). This is important in particular to + # dedupe when we add callbacks to lru cache nodes, otherwise the number + # of callbacks would grow. + def __call__(self): + rules = self.cache.get(self.room_id, None, update_metrics=False) + if rules: + rules.invalidate_all() diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index 2eb325c7c7..ba7286cb72 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -21,7 +21,6 @@ import logging from synapse.util.metrics import Measure from synapse.util.logcontext import LoggingContext -from mailer import Mailer logger = logging.getLogger(__name__) @@ -56,8 +55,10 @@ class EmailPusher(object): This shares quite a bit of code with httpusher: it would be good to factor out the common parts """ - def __init__(self, hs, pusherdict): + def __init__(self, hs, pusherdict, mailer): self.hs = hs + self.mailer = mailer + self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() self.pusher_id = pusherdict['id'] @@ -73,23 +74,16 @@ class EmailPusher(object): self.processing = False - if self.hs.config.email_enable_notifs: - if 'data' in pusherdict and 'brand' in pusherdict['data']: - app_name = pusherdict['data']['brand'] - else: - app_name = self.hs.config.email_app_name - - self.mailer = Mailer(self.hs, app_name) - else: - self.mailer = None - @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() + try: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + except Exception: + logger.exception("Error starting email pusher") def on_stop(self): if self.timed_call: @@ -130,7 +124,7 @@ class EmailPusher(object): starting_max_ordering = self.max_stream_ordering try: yield self._unsafe_process() - except: + except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: break @@ -218,7 +212,8 @@ class EmailPusher(object): ) def seconds_until(self, ts_msec): - return (ts_msec - self.clock.time_msec()) / 1000 + secs = (ts_msec - self.clock.time_msec()) / 1000 + return max(secs, 0) def get_room_throttle_ms(self, room_id): if room_id in self.throttle_params: diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index c0f8176e3d..b077e1a446 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,21 +13,30 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - -from synapse.push import PusherConfigException +import logging from twisted.internet import defer, reactor from twisted.internet.error import AlreadyCalled, AlreadyCancelled -import logging -import push_rule_evaluator -import push_tools - +from . import push_rule_evaluator +from . import push_tools +import synapse +from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) +metrics = synapse.metrics.get_metrics_for(__name__) + +http_push_processed_counter = metrics.register_counter( + "http_pushes_processed", +) + +http_push_failed_counter = metrics.register_counter( + "http_pushes_failed", +) + class HttpPusher(object): INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes @@ -84,7 +94,10 @@ class HttpPusher(object): @defer.inlineCallbacks def on_started(self): - yield self._process() + try: + yield self._process() + except Exception: + logger.exception("Error starting http pusher") @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): @@ -131,7 +144,7 @@ class HttpPusher(object): starting_max_ordering = self.max_stream_ordering try: yield self._unsafe_process() - except: + except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: break @@ -151,9 +164,16 @@ class HttpPusher(object): self.user_id, self.last_stream_ordering, self.max_stream_ordering ) + logger.info( + "Processing %i unprocessed push actions for %s starting at " + "stream_ordering %s", + len(unprocessed), self.name, self.last_stream_ordering, + ) + for push_action in unprocessed: processed = yield self._process_one(push_action) if processed: + http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action['stream_ordering'] yield self.store.update_pusher_last_stream_ordering_and_success( @@ -168,6 +188,7 @@ class HttpPusher(object): self.failing_since ) else: + http_push_failed_counter.inc() if not self.failing_since: self.failing_since = self.clock.time_msec() yield self.store.update_pusher_failing_since( @@ -244,6 +265,26 @@ class HttpPusher(object): @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): + if self.data.get('format') == 'event_id_only': + d = { + 'notification': { + 'event_id': event.event_id, + 'room_id': event.room_id, + 'counts': { + 'unread': badge, + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + } + ] + } + } + defer.returnValue(d) + ctx = yield push_tools.get_context_for_event( self.store, self.state_handler, event, self.user_id ) @@ -275,7 +316,7 @@ class HttpPusher(object): if event.type == 'm.room.member': d['notification']['membership'] = event.content['membership'] d['notification']['user_is_target'] = event.state_key == self.user_id - if 'content' in event: + if self.hs.config.push_include_content and 'content' in event: d['notification']['content'] = event.content # We no longer send aliases separately, instead, we send the human @@ -294,8 +335,11 @@ class HttpPusher(object): defer.returnValue([]) try: resp = yield self.http_client.post_json_get_json(self.url, notification_dict) - except: - logger.warn("Failed to push %s ", self.url) + except Exception: + logger.warn( + "Failed to push event %s to %s", + event.event_id, self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: @@ -304,7 +348,7 @@ class HttpPusher(object): @defer.inlineCallbacks def _send_badge(self, badge): - logger.info("Sending updated badge count %d to %r", badge, self.user_id) + logger.info("Sending updated badge count %d to %s", badge, self.name) d = { 'notification': { 'id': '', @@ -325,8 +369,11 @@ class HttpPusher(object): } try: resp = yield self.http_client.post_json_get_json(self.url, d) - except: - logger.exception("Failed to push %s ", self.url) + except Exception: + logger.warn( + "Failed to send badge count to %s", + self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index 62d794f22b..b5cd9b426a 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -78,23 +78,17 @@ ALLOWED_ATTRS = { class Mailer(object): - def __init__(self, hs, app_name): + def __init__(self, hs, app_name, notif_template_html, notif_template_text): self.hs = hs + self.notif_template_html = notif_template_html + self.notif_template_text = notif_template_text + self.store = self.hs.get_datastore() self.macaroon_gen = self.hs.get_macaroon_generator() self.state_handler = self.hs.get_state_handler() - loader = jinja2.FileSystemLoader(self.hs.config.email_template_dir) self.app_name = app_name + logger.info("Created Mailer for app_name %s" % app_name) - env = jinja2.Environment(loader=loader) - env.filters["format_ts"] = format_ts_filter - env.filters["mxc_to_http"] = self.mxc_to_http_filter - self.notif_template_html = env.get_template( - self.hs.config.email_notif_template_html - ) - self.notif_template_text = env.get_template( - self.hs.config.email_notif_template_text - ) @defer.inlineCallbacks def send_notification_mail(self, app_id, user_id, email_address, @@ -139,7 +133,7 @@ class Mailer(object): @defer.inlineCallbacks def _fetch_room_state(room_id): - room_state = yield self.state_handler.get_current_state_ids(room_id) + room_state = yield self.store.get_current_state_ids(room_id) state_by_room[room_id] = room_state # Run at most 3 of these at once: sync does 10 at a time but email @@ -200,7 +194,11 @@ class Mailer(object): yield sendmail( self.hs.config.email_smtp_host, raw_from, raw_to, multipart_msg.as_string(), - port=self.hs.config.email_smtp_port + port=self.hs.config.email_smtp_port, + requireAuthentication=self.hs.config.email_smtp_user is not None, + username=self.hs.config.email_smtp_user, + password=self.hs.config.email_smtp_pass, + requireTransportSecurity=self.hs.config.require_transport_security ) @defer.inlineCallbacks @@ -477,28 +475,6 @@ class Mailer(object): urllib.urlencode(params), ) - def mxc_to_http_filter(self, value, width, height, resize_method="crop"): - if value[0:6] != "mxc://": - return "" - - serverAndMediaId = value[6:] - fragment = None - if '#' in serverAndMediaId: - (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1) - fragment = "#" + fragment - - params = { - "width": width, - "height": height, - "method": resize_method, - } - return "%s_matrix/media/v1/thumbnail/%s?%s%s" % ( - self.hs.config.public_baseurl, - serverAndMediaId, - urllib.urlencode(params), - fragment or "", - ) - def safe_markup(raw_html): return jinja2.Markup(bleach.linkify(bleach.clean( @@ -539,3 +515,52 @@ def string_ordinal_total(s): def format_ts_filter(value, format): return time.strftime(format, time.localtime(value / 1000)) + + +def load_jinja2_templates(config): + """Load the jinja2 email templates from disk + + Returns: + (notif_template_html, notif_template_text) + """ + logger.info("loading jinja2") + + loader = jinja2.FileSystemLoader(config.email_template_dir) + env = jinja2.Environment(loader=loader) + env.filters["format_ts"] = format_ts_filter + env.filters["mxc_to_http"] = _create_mxc_to_http_filter(config) + + notif_template_html = env.get_template( + config.email_notif_template_html + ) + notif_template_text = env.get_template( + config.email_notif_template_text + ) + + return notif_template_html, notif_template_text + + +def _create_mxc_to_http_filter(config): + def mxc_to_http_filter(value, width, height, resize_method="crop"): + if value[0:6] != "mxc://": + return "" + + serverAndMediaId = value[6:] + fragment = None + if '#' in serverAndMediaId: + (serverAndMediaId, fragment) = serverAndMediaId.split('#', 1) + fragment = "#" + fragment + + params = { + "width": width, + "height": height, + "method": resize_method, + } + return "%s_matrix/media/v1/thumbnail/%s?%s%s" % ( + config.public_baseurl, + serverAndMediaId, + urllib.urlencode(params), + fragment or "", + ) + + return mxc_to_http_filter diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 4db76f18bd..3601f2d365 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -17,6 +18,7 @@ import logging import re from synapse.types import UserID +from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache from synapse.util.caches.lrucache import LruCache logger = logging.getLogger(__name__) @@ -28,6 +30,21 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") def _room_member_count(ev, condition, room_member_count): + return _test_ineq_condition(condition, room_member_count) + + +def _sender_notification_permission(ev, condition, sender_power_level, power_levels): + notif_level_key = condition.get('key') + if notif_level_key is None: + return False + + notif_levels = power_levels.get('notifications', {}) + room_notif_level = notif_levels.get(notif_level_key, 50) + + return sender_power_level >= room_notif_level + + +def _test_ineq_condition(condition, number): if 'is' not in condition: return False m = INEQUALITY_EXPR.match(condition['is']) @@ -40,15 +57,15 @@ def _room_member_count(ev, condition, room_member_count): rhs = int(rhs) if ineq == '' or ineq == '==': - return room_member_count == rhs + return number == rhs elif ineq == '<': - return room_member_count < rhs + return number < rhs elif ineq == '>': - return room_member_count > rhs + return number > rhs elif ineq == '>=': - return room_member_count >= rhs + return number >= rhs elif ineq == '<=': - return room_member_count <= rhs + return number <= rhs else: return False @@ -64,9 +81,11 @@ def tweaks_for_actions(actions): class PushRuleEvaluatorForEvent(object): - def __init__(self, event, room_member_count): + def __init__(self, event, room_member_count, sender_power_level, power_levels): self._event = event self._room_member_count = room_member_count + self._sender_power_level = sender_power_level + self._power_levels = power_levels # Maps strings of e.g. 'content.body' -> event["content"]["body"] self._value_cache = _flatten_dict(event) @@ -80,6 +99,10 @@ class PushRuleEvaluatorForEvent(object): return _room_member_count( self._event, condition, self._room_member_count ) + elif condition['kind'] == 'sender_notification_permission': + return _sender_notification_permission( + self._event, condition, self._sender_power_level, self._power_levels, + ) else: return True @@ -125,6 +148,11 @@ class PushRuleEvaluatorForEvent(object): return self._value_cache.get(dotted_key, None) +# Caches (glob, word_boundary) -> regex for push. See _glob_matches +regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR) +register_cache("regex_push_cache", regex_cache) + + def _glob_matches(glob, value, word_boundary=False): """Tests if value matches glob. @@ -137,47 +165,78 @@ def _glob_matches(glob, value, word_boundary=False): Returns: bool """ + try: - if IS_GLOB.search(glob): - r = re.escape(glob) - - r = r.replace(r'\*', '.*?') - r = r.replace(r'\?', '.') - - # handle [abc], [a-z] and [!a-z] style ranges. - r = GLOB_REGEX.sub( - lambda x: ( - '[%s%s]' % ( - x.group(1) and '^' or '', - x.group(2).replace(r'\\\-', '-') - ) - ), - r, - ) - if word_boundary: - r = r"\b%s\b" % (r,) - r = _compile_regex(r) - - return r.search(value) - else: - r = r + "$" - r = _compile_regex(r) - - return r.match(value) - elif word_boundary: - r = re.escape(glob) - r = r"\b%s\b" % (r,) - r = _compile_regex(r) - - return r.search(value) - else: - return value.lower() == glob.lower() + r = regex_cache.get((glob, word_boundary), None) + if not r: + r = _glob_to_re(glob, word_boundary) + regex_cache[(glob, word_boundary)] = r + return r.search(value) except re.error: logger.warn("Failed to parse glob to regex: %r", glob) return False -def _flatten_dict(d, prefix=[], result={}): +def _glob_to_re(glob, word_boundary): + """Generates regex for a given glob. + + Args: + glob (string) + word_boundary (bool): Whether to match against word boundaries or entire + string. Defaults to False. + + Returns: + regex object + """ + if IS_GLOB.search(glob): + r = re.escape(glob) + + r = r.replace(r'\*', '.*?') + r = r.replace(r'\?', '.') + + # handle [abc], [a-z] and [!a-z] style ranges. + r = GLOB_REGEX.sub( + lambda x: ( + '[%s%s]' % ( + x.group(1) and '^' or '', + x.group(2).replace(r'\\\-', '-') + ) + ), + r, + ) + if word_boundary: + r = _re_word_boundary(r) + + return re.compile(r, flags=re.IGNORECASE) + else: + r = "^" + r + "$" + + return re.compile(r, flags=re.IGNORECASE) + elif word_boundary: + r = re.escape(glob) + r = _re_word_boundary(r) + + return re.compile(r, flags=re.IGNORECASE) + else: + r = "^" + re.escape(glob) + "$" + return re.compile(r, flags=re.IGNORECASE) + + +def _re_word_boundary(r): + """ + Adds word boundary characters to the start and end of an + expression to require that the match occur as a whole word, + but do so respecting the fact that strings starting or ending + with non-word characters will change word boundaries. + """ + # we can't use \b as it chokes on unicode. however \W seems to be okay + # as shorthand for [^0-9A-Za-z_]. + return r"(^|\W)%s(\W|$)" % (r,) + + +def _flatten_dict(d, prefix=[], result=None): + if result is None: + result = {} for key, value in d.items(): if isinstance(value, basestring): result[".".join(prefix + [key])] = value.lower() @@ -185,16 +244,3 @@ def _flatten_dict(d, prefix=[], result={}): _flatten_dict(value, prefix=(prefix + [key]), result=result) return result - - -regex_cache = LruCache(5000) - - -def _compile_regex(regex_str): - r = regex_cache.get(regex_str, None) - if r: - return r - - r = re.compile(regex_str, flags=re.IGNORECASE) - regex_cache[regex_str] = r - return r diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index a27476bbad..6835f54e97 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -17,15 +17,12 @@ from twisted.internet import defer from synapse.push.presentable_names import ( calculate_room_name, name_from_member_event ) -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred @defer.inlineCallbacks def get_badge_count(store, user_id): - invites, joins = yield preserve_context_over_deferred(defer.gatherResults([ - preserve_fn(store.get_invited_rooms_for_user)(user_id), - preserve_fn(store.get_rooms_for_user)(user_id), - ], consumeErrors=True)) + invites = yield store.get_invited_rooms_for_user(user_id) + joins = yield store.get_rooms_for_user(user_id) my_receipts_by_room = yield store.get_receipts_for_user( user_id, "m.read", @@ -33,13 +30,13 @@ def get_badge_count(store, user_id): badge = len(invites) - for r in joins: - if r.room_id in my_receipts_by_room: - last_unread_event_id = my_receipts_by_room[r.room_id] + for room_id in joins: + if room_id in my_receipts_by_room: + last_unread_event_id = my_receipts_by_room[room_id] notifs = yield ( store.get_unread_event_push_actions_by_room_for_user( - r.room_id, user_id, last_unread_event_id + room_id, user_id, last_unread_event_id ) ) # return one badge count per conversation, as count per diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index de9c33b936..5aa6667e91 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -13,7 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. -from httppusher import HttpPusher +from .httppusher import HttpPusher import logging logger = logging.getLogger(__name__) @@ -26,22 +26,54 @@ logger = logging.getLogger(__name__) # process works fine) try: from synapse.push.emailpusher import EmailPusher -except: + from synapse.push.mailer import Mailer, load_jinja2_templates +except Exception: pass -def create_pusher(hs, pusherdict): - logger.info("trying to create_pusher for %r", pusherdict) +class PusherFactory(object): + def __init__(self, hs): + self.hs = hs - PUSHER_TYPES = { - "http": HttpPusher, - } + self.pusher_types = { + "http": HttpPusher, + } - logger.info("email enable notifs: %r", hs.config.email_enable_notifs) - if hs.config.email_enable_notifs: - PUSHER_TYPES["email"] = EmailPusher - logger.info("defined email pusher type") + logger.info("email enable notifs: %r", hs.config.email_enable_notifs) + if hs.config.email_enable_notifs: + self.mailers = {} # app_name -> Mailer - if pusherdict['kind'] in PUSHER_TYPES: - logger.info("found pusher") - return PUSHER_TYPES[pusherdict['kind']](hs, pusherdict) + templates = load_jinja2_templates(hs.config) + self.notif_template_html, self.notif_template_text = templates + + self.pusher_types["email"] = self._create_email_pusher + + logger.info("defined email pusher type") + + def create_pusher(self, pusherdict): + logger.info("trying to create_pusher for %r", pusherdict) + + if pusherdict['kind'] in self.pusher_types: + logger.info("found pusher") + return self.pusher_types[pusherdict['kind']](self.hs, pusherdict) + + def _create_email_pusher(self, _hs, pusherdict): + app_name = self._app_name_from_pusherdict(pusherdict) + mailer = self.mailers.get(app_name) + if not mailer: + mailer = Mailer( + hs=self.hs, + app_name=app_name, + notif_template_html=self.notif_template_html, + notif_template_text=self.notif_template_text, + ) + self.mailers[app_name] = mailer + return EmailPusher(self.hs, pusherdict, mailer) + + def _app_name_from_pusherdict(self, pusherdict): + if 'data' in pusherdict and 'brand' in pusherdict['data']: + app_name = pusherdict['data']['brand'] + else: + app_name = self.hs.config.email_app_name + + return app_name diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 3837be523d..750d11ca38 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -14,13 +14,13 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -import pusher -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred +from synapse.push.pusher import PusherFactory from synapse.util.async import run_on_reactor - -import logging +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -28,6 +28,7 @@ logger = logging.getLogger(__name__) class PusherPool: def __init__(self, _hs): self.hs = _hs + self.pusher_factory = PusherFactory(_hs) self.start_pushers = _hs.config.start_pushers self.store = self.hs.get_datastore() self.clock = self.hs.get_clock() @@ -48,7 +49,7 @@ class PusherPool: # will then get pulled out of the database, # recreated, added and started: this means we have only one # code path adding pushers. - pusher.create_pusher(self.hs, { + self.pusher_factory.create_pusher({ "id": None, "user_name": user_id, "kind": kind, @@ -102,19 +103,25 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user(self, user_id, except_access_token_id=None): - all = yield self.store.get_all_pushers() - logger.info( - "Removing all pushers for user %s except access tokens id %r", - user_id, except_access_token_id - ) - for p in all: - if p['user_name'] == user_id and p['access_token'] != except_access_token_id: + def remove_pushers_by_access_token(self, user_id, access_tokens): + """Remove the pushers for a given user corresponding to a set of + access_tokens. + + Args: + user_id (str): user to remove pushers for + access_tokens (Iterable[int]): access token *ids* to remove pushers + for + """ + tokens = set(access_tokens) + for p in (yield self.store.get_pushers_by_user_id(user_id)): + if p['access_token'] in tokens: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] ) - yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher( + p['app_id'], p['pushkey'], p['user_name'], + ) @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): @@ -130,13 +137,16 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_notifications)( - min_stream_id, max_stream_id + run_in_background( + p.on_new_notifications, + min_stream_id, max_stream_id, ) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) - except: + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) + except Exception: logger.exception("Exception in pusher on_new_notifications") @defer.inlineCallbacks @@ -157,11 +167,16 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) + run_in_background( + p.on_new_receipts, + min_stream_id, max_stream_id, + ) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) - except: + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) + except Exception: logger.exception("Exception in pusher on_new_receipts") @defer.inlineCallbacks @@ -186,8 +201,8 @@ class PusherPool: logger.info("Starting %d pushers", len(pushers)) for pusherdict in pushers: try: - p = pusher.create_pusher(self.hs, pusherdict) - except: + p = self.pusher_factory.create_pusher(pusherdict) + except Exception: logger.exception("Couldn't start a pusher: caught Exception") continue if p: @@ -200,7 +215,7 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - preserve_fn(p.on_started)() + run_in_background(p.on_started) logger.info("Started pushers") |