diff options
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/action_generator.py | 12 | ||||
-rw-r--r-- | synapse/push/baserules.py | 28 | ||||
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py | 152 | ||||
-rw-r--r-- | synapse/push/clientformat.py | 6 | ||||
-rw-r--r-- | synapse/push/emailpusher.py | 24 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 74 | ||||
-rw-r--r-- | synapse/push/mailer.py | 36 | ||||
-rw-r--r-- | synapse/push/presentable_names.py | 8 | ||||
-rw-r--r-- | synapse/push/push_rule_evaluator.py | 56 | ||||
-rw-r--r-- | synapse/push/push_tools.py | 5 | ||||
-rw-r--r-- | synapse/push/pusher.py | 7 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 61 |
12 files changed, 332 insertions, 137 deletions
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py index fe09d50d55..a5de75c48a 100644 --- a/synapse/push/action_generator.py +++ b/synapse/push/action_generator.py @@ -13,13 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer +import logging -from .bulk_push_rule_evaluator import BulkPushRuleEvaluator +from twisted.internet import defer from synapse.util.metrics import Measure -import logging +from .bulk_push_rule_evaluator import BulkPushRuleEvaluator logger = logging.getLogger(__name__) @@ -40,10 +40,6 @@ class ActionGenerator(object): @defer.inlineCallbacks def handle_push_actions_for_event(self, event, context): with Measure(self.clock, "action_for_event_by_user"): - actions_by_user = yield self.bulk_evaluator.action_for_event_by_user( + yield self.bulk_evaluator.action_for_event_by_user( event, context ) - - context.push_actions = [ - (uid, actions) for uid, actions in actions_by_user.iteritems() - ] diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py index 85effdfa46..8f0682c948 100644 --- a/synapse/push/baserules.py +++ b/synapse/push/baserules.py @@ -1,4 +1,5 @@ # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP import copy +from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP + def list_with_base_rules(rawrules): """Combine the list of rules set by the user with the default push rules @@ -38,7 +40,7 @@ def list_with_base_rules(rawrules): rawrules = [r for r in rawrules if r['priority_class'] >= 0] # shove the server default rules for each kind onto the end of each - current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] + current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1] ruleslist.extend(make_base_prepend_rules( PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules @@ -238,6 +240,28 @@ BASE_APPEND_OVERRIDE_RULES = [ } ] }, + { + 'rule_id': 'global/override/.m.rule.roomnotif', + 'conditions': [ + { + 'kind': 'event_match', + 'key': 'content.body', + 'pattern': '@room', + '_id': '_roomnotif_content', + }, + { + 'kind': 'sender_notification_permission', + 'key': 'room', + '_id': '_roomnotif_pl', + }, + ], + 'actions': [ + 'notify', { + 'set_tweak': 'highlight', + 'value': True, + } + ] + } ] diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py index 9a96e6fe8f..1d14d3639c 100644 --- a/synapse/push/bulk_push_rule_evaluator.py +++ b/synapse/push/bulk_push_rule_evaluator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,18 +15,22 @@ # limitations under the License. import logging +from collections import namedtuple -from twisted.internet import defer +from six import iteritems, itervalues -from .push_rule_evaluator import PushRuleEvaluatorForEvent +from prometheus_client import Counter + +from twisted.internet import defer -from synapse.visibility import filter_events_for_clients_context from synapse.api.constants import EventTypes, Membership -from synapse.util.caches.descriptors import cached +from synapse.event_auth import get_user_power_level +from synapse.state import POWER_KEY from synapse.util.async import Linearizer +from synapse.util.caches import register_cache +from synapse.util.caches.descriptors import cached -from collections import namedtuple - +from .push_rule_evaluator import PushRuleEvaluatorForEvent logger = logging.getLogger(__name__) @@ -33,6 +38,20 @@ logger = logging.getLogger(__name__) rules_by_room = {} +push_rules_invalidation_counter = Counter( + "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "") +push_rules_state_size_counter = Counter( + "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "") + +# Measures whether we use the fast path of using state deltas, or if we have to +# recalculate from scratch +push_rules_delta_state_cache_metric = register_cache( + "cache", + "push_rules_delta_state_cache_metric", + cache=[], # Meaningless size, as this isn't a cache that stores values +) + + class BulkPushRuleEvaluator(object): """Calculates the outcome of push rules for an event for all users in the room at once. @@ -41,6 +60,13 @@ class BulkPushRuleEvaluator(object): def __init__(self, hs): self.hs = hs self.store = hs.get_datastore() + self.auth = hs.get_auth() + + self.room_push_rule_cache_metrics = register_cache( + "cache", + "room_push_rule_cache", + cache=[], # Meaningless size, as this isn't a cache that stores values + ) @defer.inlineCallbacks def _get_rules_for_event(self, event, context): @@ -79,37 +105,69 @@ class BulkPushRuleEvaluator(object): # It's important that RulesForRoom gets added to self._get_rules_for_room.cache # before any lookup methods get called on it as otherwise there may be # a race if invalidate_all gets called (which assumes its in the cache) - return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache) + return RulesForRoom( + self.hs, room_id, self._get_rules_for_room.cache, + self.room_push_rule_cache_metrics, + ) + + @defer.inlineCallbacks + def _get_power_levels_and_sender_level(self, event, context): + prev_state_ids = yield context.get_prev_state_ids(self.store) + pl_event_id = prev_state_ids.get(POWER_KEY) + if pl_event_id: + # fastpath: if there's a power level event, that's all we need, and + # not having a power level event is an extreme edge case + pl_event = yield self.store.get_event(pl_event_id) + auth_events = {POWER_KEY: pl_event} + else: + auth_events_ids = yield self.auth.compute_auth_events( + event, prev_state_ids, for_verification=False, + ) + auth_events = yield self.store.get_events(auth_events_ids) + auth_events = { + (e.type, e.state_key): e for e in itervalues(auth_events) + } + + sender_level = get_user_power_level(event.sender, auth_events) + + pl_event = auth_events.get(POWER_KEY) + + defer.returnValue((pl_event.content if pl_event else {}, sender_level)) @defer.inlineCallbacks def action_for_event_by_user(self, event, context): - """Given an event and context, evaluate the push rules and return - the results + """Given an event and context, evaluate the push rules and insert the + results into the event_push_actions_staging table. Returns: - dict of user_id -> action + Deferred """ rules_by_user = yield self._get_rules_for_event(event, context) actions_by_user = {} - # None of these users can be peeking since this list of users comes - # from the set of users in the room, so we know for sure they're all - # actually in the room. - user_tuples = [(u, False) for u in rules_by_user] - - filtered_by_user = yield filter_events_for_clients_context( - self.store, user_tuples, [event], {event.event_id: context} - ) - room_members = yield self.store.get_joined_users_from_context( event, context ) - evaluator = PushRuleEvaluatorForEvent(event, len(room_members)) + (power_levels, sender_power_level) = ( + yield self._get_power_levels_and_sender_level(event, context) + ) + + evaluator = PushRuleEvaluatorForEvent( + event, len(room_members), sender_power_level, power_levels, + ) condition_cache = {} - for uid, rules in rules_by_user.iteritems(): + for uid, rules in iteritems(rules_by_user): + if event.sender == uid: + continue + + if not event.is_state(): + is_ignored = yield self.store.is_ignored_by(event.sender, uid) + if is_ignored: + continue + display_name = None profile_info = room_members.get(uid) if profile_info: @@ -121,13 +179,6 @@ class BulkPushRuleEvaluator(object): if event.type == EventTypes.Member and event.state_key == uid: display_name = event.content.get("displayname", None) - filtered = filtered_by_user[uid] - if len(filtered) == 0: - continue - - if filtered[0].sender == uid: - continue - for rule in rules: if 'enabled' in rule and not rule['enabled']: continue @@ -138,9 +189,16 @@ class BulkPushRuleEvaluator(object): if matches: actions = [x for x in rule['actions'] if x != 'dont_notify'] if actions and 'notify' in actions: + # Push rules say we should notify the user of this event actions_by_user[uid] = actions break - defer.returnValue(actions_by_user) + + # Mark in the DB staging area the push actions for users who should be + # notified for this event. (This will then get handled when we persist + # the event) + yield self.store.add_push_actions_to_staging( + event.event_id, actions_by_user, + ) def _condition_checker(evaluator, conditions, uid, display_name, cache): @@ -170,17 +228,19 @@ class RulesForRoom(object): the entire cache for the room. """ - def __init__(self, hs, room_id, rules_for_room_cache): + def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics): """ Args: hs (HomeServer) room_id (str) rules_for_room_cache(Cache): The cache object that caches these RoomsForUser objects. + room_push_rule_cache_metrics (CacheMetric) """ self.room_id = room_id self.is_mine_id = hs.is_mine_id self.store = hs.get_datastore() + self.room_push_rule_cache_metrics = room_push_rule_cache_metrics self.linearizer = Linearizer(name="rules_for_room") @@ -222,11 +282,19 @@ class RulesForRoom(object): """ state_group = context.state_group + if state_group and self.state_group == state_group: + logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() + defer.returnValue(self.rules_by_user) + with (yield self.linearizer.queue(())): if state_group and self.state_group == state_group: logger.debug("Using cached rules for %r", self.room_id) + self.room_push_rule_cache_metrics.inc_hits() defer.returnValue(self.rules_by_user) + self.room_push_rule_cache_metrics.inc_misses() + ret_rules_by_user = {} missing_member_event_ids = {} if state_group and self.state_group == context.prev_group: @@ -234,8 +302,13 @@ class RulesForRoom(object): # results. ret_rules_by_user = self.rules_by_user current_state_ids = context.delta_ids + + push_rules_delta_state_cache_metric.inc_hits() else: - current_state_ids = context.current_state_ids + current_state_ids = yield context.get_current_state_ids(self.store) + push_rules_delta_state_cache_metric.inc_misses() + + push_rules_state_size_counter.inc(len(current_state_ids)) logger.debug( "Looking for member changes in %r %r", state_group, current_state_ids @@ -282,6 +355,14 @@ class RulesForRoom(object): yield self._update_rules_with_member_event_ids( ret_rules_by_user, missing_member_event_ids, state_group, event ) + else: + # The push rules didn't change but lets update the cache anyway + self.update_cache( + self.sequence, + members={}, # There were no membership changes + rules_by_user=ret_rules_by_user, + state_group=state_group + ) if logger.isEnabledFor(logging.DEBUG): logger.debug( @@ -324,7 +405,7 @@ class RulesForRoom(object): # If the event is a join event then it will be in current state evnts # map but not in the DB, so we have to explicitly insert it. if event.type == EventTypes.Member: - for event_id in member_event_ids.itervalues(): + for event_id in itervalues(member_event_ids): if event_id == event.event_id: members[event_id] = (event.state_key, event.membership) @@ -332,7 +413,7 @@ class RulesForRoom(object): logger.debug("Found members %r: %r", self.room_id, members.values()) interested_in_user_ids = set( - user_id for user_id, membership in members.itervalues() + user_id for user_id, membership in itervalues(members) if membership == Membership.JOIN ) @@ -344,7 +425,7 @@ class RulesForRoom(object): ) user_ids = set( - uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher + uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher ) logger.debug("With pushers: %r", user_ids) @@ -365,7 +446,7 @@ class RulesForRoom(object): ) ret_rules_by_user.update( - item for item in rules_by_user.iteritems() if item[0] is not None + item for item in iteritems(rules_by_user) if item[0] is not None ) self.update_cache(sequence, members, ret_rules_by_user, state_group) @@ -380,6 +461,7 @@ class RulesForRoom(object): self.state_group = object() self.member_map = {} self.rules_by_user = {} + push_rules_invalidation_counter.inc() def update_cache(self, sequence, members, rules_by_user, state_group): if sequence == self.sequence: diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py index e0331b2d2d..ecbf364a5e 100644 --- a/synapse/push/clientformat.py +++ b/synapse/push/clientformat.py @@ -13,12 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.push.rulekinds import ( - PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP -) - import copy +from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP + def format_push_rules_for_user(user, ruleslist): """Converts a list of rawrules and a enabled map into nested dictionaries diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py index a69dda7b09..d746371420 100644 --- a/synapse/push/emailpusher.py +++ b/synapse/push/emailpusher.py @@ -13,14 +13,13 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer, reactor -from twisted.internet.error import AlreadyCalled, AlreadyCancelled - import logging -from synapse.util.metrics import Measure -from synapse.util.logcontext import LoggingContext +from twisted.internet import defer +from twisted.internet.error import AlreadyCalled, AlreadyCancelled +from synapse.util.logcontext import LoggingContext +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -77,10 +76,13 @@ class EmailPusher(object): @defer.inlineCallbacks def on_started(self): if self.mailer is not None: - self.throttle_params = yield self.store.get_throttle_params_by_room( - self.pusher_id - ) - yield self._process() + try: + self.throttle_params = yield self.store.get_throttle_params_by_room( + self.pusher_id + ) + yield self._process() + except Exception: + logger.exception("Error starting email pusher") def on_stop(self): if self.timed_call: @@ -121,7 +123,7 @@ class EmailPusher(object): starting_max_ordering = self.max_stream_ordering try: yield self._unsafe_process() - except: + except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: break @@ -196,7 +198,7 @@ class EmailPusher(object): self.timed_call = None if soonest_due_at is not None: - self.timed_call = reactor.callLater( + self.timed_call = self.hs.get_reactor().callLater( self.seconds_until(soonest_due_at), self.on_timer ) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 8a5d473108..81e18bcf7d 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,21 +13,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging -from synapse.push import PusherConfigException +from prometheus_client import Counter -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled -import logging -import push_rule_evaluator -import push_tools - +from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure +from . import push_rule_evaluator, push_tools + logger = logging.getLogger(__name__) +http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "") + +http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "") + class HttpPusher(object): INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes @@ -84,7 +89,10 @@ class HttpPusher(object): @defer.inlineCallbacks def on_started(self): - yield self._process() + try: + yield self._process() + except Exception: + logger.exception("Error starting http pusher") @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): @@ -131,7 +139,7 @@ class HttpPusher(object): starting_max_ordering = self.max_stream_ordering try: yield self._unsafe_process() - except: + except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: break @@ -151,9 +159,16 @@ class HttpPusher(object): self.user_id, self.last_stream_ordering, self.max_stream_ordering ) + logger.info( + "Processing %i unprocessed push actions for %s starting at " + "stream_ordering %s", + len(unprocessed), self.name, self.last_stream_ordering, + ) + for push_action in unprocessed: processed = yield self._process_one(push_action) if processed: + http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action['stream_ordering'] yield self.store.update_pusher_last_stream_ordering_and_success( @@ -168,6 +183,7 @@ class HttpPusher(object): self.failing_since ) else: + http_push_failed_counter.inc() if not self.failing_since: self.failing_since = self.clock.time_msec() yield self.store.update_pusher_failing_since( @@ -204,7 +220,9 @@ class HttpPusher(object): ) else: logger.info("Push failed: delaying for %ds", self.backoff_delay) - self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) + self.timed_call = self.hs.get_reactor().callLater( + self.backoff_delay, self.on_timer + ) self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) break @@ -244,6 +262,26 @@ class HttpPusher(object): @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): + if self.data.get('format') == 'event_id_only': + d = { + 'notification': { + 'event_id': event.event_id, + 'room_id': event.room_id, + 'counts': { + 'unread': badge, + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + } + ] + } + } + defer.returnValue(d) + ctx = yield push_tools.get_context_for_event( self.store, self.state_handler, event, self.user_id ) @@ -275,7 +313,7 @@ class HttpPusher(object): if event.type == 'm.room.member': d['notification']['membership'] = event.content['membership'] d['notification']['user_is_target'] = event.state_key == self.user_id - if not self.hs.config.push_redact_content and 'content' in event: + if self.hs.config.push_include_content and 'content' in event: d['notification']['content'] = event.content # We no longer send aliases separately, instead, we send the human @@ -294,8 +332,11 @@ class HttpPusher(object): defer.returnValue([]) try: resp = yield self.http_client.post_json_get_json(self.url, notification_dict) - except: - logger.warn("Failed to push %s ", self.url) + except Exception: + logger.warn( + "Failed to push event %s to %s", + event.event_id, self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: @@ -304,7 +345,7 @@ class HttpPusher(object): @defer.inlineCallbacks def _send_badge(self, badge): - logger.info("Sending updated badge count %d to %r", badge, self.user_id) + logger.info("Sending updated badge count %d to %s", badge, self.name) d = { 'notification': { 'id': '', @@ -325,8 +366,11 @@ class HttpPusher(object): } try: resp = yield self.http_client.post_json_get_json(self.url, d) - except: - logger.exception("Failed to push %s ", self.url) + except Exception: + logger.warn( + "Failed to send badge count to %s", + self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py index b5cd9b426a..9d601208fd 100644 --- a/synapse/push/mailer.py +++ b/synapse/push/mailer.py @@ -13,30 +13,31 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer -from twisted.mail.smtp import sendmail - -import email.utils import email.mime.multipart -from email.mime.text import MIMEText +import email.utils +import logging +import time +import urllib from email.mime.multipart import MIMEMultipart +from email.mime.text import MIMEText -from synapse.util.async import concurrently_execute +import bleach +import jinja2 + +from twisted.internet import defer +from twisted.mail.smtp import sendmail + +from synapse.api.constants import EventTypes +from synapse.api.errors import StoreError from synapse.push.presentable_names import ( - calculate_room_name, name_from_member_event, descriptor_from_member_events + calculate_room_name, + descriptor_from_member_events, + name_from_member_event, ) from synapse.types import UserID -from synapse.api.errors import StoreError -from synapse.api.constants import EventTypes +from synapse.util.async import concurrently_execute from synapse.visibility import filter_events_for_client -import jinja2 -import bleach - -import time -import urllib - -import logging logger = logging.getLogger(__name__) @@ -229,7 +230,8 @@ class Mailer(object): if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]: prev_messages = room_vars['notifs'][-1]['messages'] for message in notifvars['messages']: - pm = filter(lambda pm: pm['id'] == message['id'], prev_messages) + pm = list(filter(lambda pm: pm['id'] == message['id'], + prev_messages)) if pm: if not message["is_historical"]: pm[0]["is_historical"] = False diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py index 277da3cd35..eef6e18c2e 100644 --- a/synapse/push/presentable_names.py +++ b/synapse/push/presentable_names.py @@ -13,10 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from twisted.internet import defer - -import re import logging +import re + +from twisted.internet import defer logger = logging.getLogger(__name__) @@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True # so find out who is in the room that isn't the user. if "m.room.member" in room_state_bytype_ids: member_events = yield store.get_events( - room_state_bytype_ids["m.room.member"].values() + list(room_state_bytype_ids["m.room.member"].values()) ) all_members = [ ev for ev in member_events.values() diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py index 172c27c137..2bd321d530 100644 --- a/synapse/push/push_rule_evaluator.py +++ b/synapse/push/push_rule_evaluator.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +17,8 @@ import logging import re +from six import string_types + from synapse.types import UserID from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache from synapse.util.caches.lrucache import LruCache @@ -29,6 +32,21 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") def _room_member_count(ev, condition, room_member_count): + return _test_ineq_condition(condition, room_member_count) + + +def _sender_notification_permission(ev, condition, sender_power_level, power_levels): + notif_level_key = condition.get('key') + if notif_level_key is None: + return False + + notif_levels = power_levels.get('notifications', {}) + room_notif_level = notif_levels.get(notif_level_key, 50) + + return sender_power_level >= room_notif_level + + +def _test_ineq_condition(condition, number): if 'is' not in condition: return False m = INEQUALITY_EXPR.match(condition['is']) @@ -41,15 +59,15 @@ def _room_member_count(ev, condition, room_member_count): rhs = int(rhs) if ineq == '' or ineq == '==': - return room_member_count == rhs + return number == rhs elif ineq == '<': - return room_member_count < rhs + return number < rhs elif ineq == '>': - return room_member_count > rhs + return number > rhs elif ineq == '>=': - return room_member_count >= rhs + return number >= rhs elif ineq == '<=': - return room_member_count <= rhs + return number <= rhs else: return False @@ -65,9 +83,11 @@ def tweaks_for_actions(actions): class PushRuleEvaluatorForEvent(object): - def __init__(self, event, room_member_count): + def __init__(self, event, room_member_count, sender_power_level, power_levels): self._event = event self._room_member_count = room_member_count + self._sender_power_level = sender_power_level + self._power_levels = power_levels # Maps strings of e.g. 'content.body' -> event["content"]["body"] self._value_cache = _flatten_dict(event) @@ -81,6 +101,10 @@ class PushRuleEvaluatorForEvent(object): return _room_member_count( self._event, condition, self._room_member_count ) + elif condition['kind'] == 'sender_notification_permission': + return _sender_notification_permission( + self._event, condition, self._sender_power_level, self._power_levels, + ) else: return True @@ -128,7 +152,7 @@ class PushRuleEvaluatorForEvent(object): # Caches (glob, word_boundary) -> regex for push. See _glob_matches regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR) -register_cache("regex_push_cache", regex_cache) +register_cache("cache", "regex_push_cache", regex_cache) def _glob_matches(glob, value, word_boundary=False): @@ -183,7 +207,7 @@ def _glob_to_re(glob, word_boundary): r, ) if word_boundary: - r = r"\b%s\b" % (r,) + r = _re_word_boundary(r) return re.compile(r, flags=re.IGNORECASE) else: @@ -192,7 +216,7 @@ def _glob_to_re(glob, word_boundary): return re.compile(r, flags=re.IGNORECASE) elif word_boundary: r = re.escape(glob) - r = r"\b%s\b" % (r,) + r = _re_word_boundary(r) return re.compile(r, flags=re.IGNORECASE) else: @@ -200,11 +224,23 @@ def _glob_to_re(glob, word_boundary): return re.compile(r, flags=re.IGNORECASE) +def _re_word_boundary(r): + """ + Adds word boundary characters to the start and end of an + expression to require that the match occur as a whole word, + but do so respecting the fact that strings starting or ending + with non-word characters will change word boundaries. + """ + # we can't use \b as it chokes on unicode. however \W seems to be okay + # as shorthand for [^0-9A-Za-z_]. + return r"(^|\W)%s(\W|$)" % (r,) + + def _flatten_dict(d, prefix=[], result=None): if result is None: result = {} for key, value in d.items(): - if isinstance(value, basestring): + if isinstance(value, string_types): result[".".join(prefix + [key])] = value.lower() elif hasattr(value, "items"): _flatten_dict(value, prefix=(prefix + [key]), result=result) diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py index 6835f54e97..8049c298c2 100644 --- a/synapse/push/push_tools.py +++ b/synapse/push/push_tools.py @@ -14,9 +14,8 @@ # limitations under the License. from twisted.internet import defer -from synapse.push.presentable_names import ( - calculate_room_name, name_from_member_event -) + +from synapse.push.presentable_names import calculate_room_name, name_from_member_event @defer.inlineCallbacks diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py index 491f27bded..fcee6d9d7e 100644 --- a/synapse/push/pusher.py +++ b/synapse/push/pusher.py @@ -13,9 +13,10 @@ # See the License for the specific language governing permissions and # limitations under the License. -from httppusher import HttpPusher - import logging + +from .httppusher import HttpPusher + logger = logging.getLogger(__name__) # We try importing this if we can (it will fail if we don't @@ -27,7 +28,7 @@ logger = logging.getLogger(__name__) try: from synapse.push.emailpusher import EmailPusher from synapse.push.mailer import Mailer, load_jinja2_templates -except: +except Exception: pass diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 43cb6e9c01..36bb5bbc65 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -14,13 +14,12 @@ # See the License for the specific language governing permissions and # limitations under the License. +import logging + from twisted.internet import defer -from .pusher import PusherFactory -from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred -from synapse.util.async import run_on_reactor - -import logging +from synapse.push.pusher import PusherFactory +from synapse.util.logcontext import make_deferred_yieldable, run_in_background logger = logging.getLogger(__name__) @@ -103,23 +102,28 @@ class PusherPool: yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) @defer.inlineCallbacks - def remove_pushers_by_user(self, user_id, except_access_token_id=None): - all = yield self.store.get_all_pushers() - logger.info( - "Removing all pushers for user %s except access tokens id %r", - user_id, except_access_token_id - ) - for p in all: - if p['user_name'] == user_id and p['access_token'] != except_access_token_id: + def remove_pushers_by_access_token(self, user_id, access_tokens): + """Remove the pushers for a given user corresponding to a set of + access_tokens. + + Args: + user_id (str): user to remove pushers for + access_tokens (Iterable[int]): access token *ids* to remove pushers + for + """ + tokens = set(access_tokens) + for p in (yield self.store.get_pushers_by_user_id(user_id)): + if p['access_token'] in tokens: logger.info( "Removing pusher for app id %s, pushkey %s, user %s", p['app_id'], p['pushkey'], p['user_name'] ) - yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name']) + yield self.remove_pusher( + p['app_id'], p['pushkey'], p['user_name'], + ) @defer.inlineCallbacks def on_new_notifications(self, min_stream_id, max_stream_id): - yield run_on_reactor() try: users_affected = yield self.store.get_push_action_users_in_range( min_stream_id, max_stream_id @@ -131,18 +135,20 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_notifications)( - min_stream_id, max_stream_id + run_in_background( + p.on_new_notifications, + min_stream_id, max_stream_id, ) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) - except: + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) + except Exception: logger.exception("Exception in pusher on_new_notifications") @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids): - yield run_on_reactor() try: # Need to subtract 1 from the minimum because the lower bound here # is not inclusive @@ -158,11 +164,16 @@ class PusherPool: if u in self.pushers: for p in self.pushers[u].values(): deferreds.append( - preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id) + run_in_background( + p.on_new_receipts, + min_stream_id, max_stream_id, + ) ) - yield preserve_context_over_deferred(defer.gatherResults(deferreds)) - except: + yield make_deferred_yieldable( + defer.gatherResults(deferreds, consumeErrors=True), + ) + except Exception: logger.exception("Exception in pusher on_new_receipts") @defer.inlineCallbacks @@ -188,7 +199,7 @@ class PusherPool: for pusherdict in pushers: try: p = self.pusher_factory.create_pusher(pusherdict) - except: + except Exception: logger.exception("Couldn't start a pusher: caught Exception") continue if p: @@ -201,7 +212,7 @@ class PusherPool: if appid_pushkey in byuser: byuser[appid_pushkey].on_stop() byuser[appid_pushkey] = p - preserve_fn(p.on_started)() + run_in_background(p.on_started) logger.info("Started pushers") |