summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/action_generator.py12
-rw-r--r--synapse/push/baserules.py28
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py152
-rw-r--r--synapse/push/clientformat.py6
-rw-r--r--synapse/push/emailpusher.py24
-rw-r--r--synapse/push/httppusher.py74
-rw-r--r--synapse/push/mailer.py36
-rw-r--r--synapse/push/presentable_names.py8
-rw-r--r--synapse/push/push_rule_evaluator.py56
-rw-r--r--synapse/push/push_tools.py5
-rw-r--r--synapse/push/pusher.py7
-rw-r--r--synapse/push/pusherpool.py61
12 files changed, 332 insertions, 137 deletions
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index fe09d50d55..a5de75c48a 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -13,13 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
+import logging
 
-from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
+from twisted.internet import defer
 
 from synapse.util.metrics import Measure
 
-import logging
+from .bulk_push_rule_evaluator import BulkPushRuleEvaluator
 
 logger = logging.getLogger(__name__)
 
@@ -40,10 +40,6 @@ class ActionGenerator(object):
     @defer.inlineCallbacks
     def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "action_for_event_by_user"):
-            actions_by_user = yield self.bulk_evaluator.action_for_event_by_user(
+            yield self.bulk_evaluator.action_for_event_by_user(
                 event, context
             )
-
-        context.push_actions = [
-            (uid, actions) for uid, actions in actions_by_user.iteritems()
-        ]
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 85effdfa46..8f0682c948 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,4 +1,5 @@
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
 import copy
 
+from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+
 
 def list_with_base_rules(rawrules):
     """Combine the list of rules set by the user with the default push rules
@@ -38,7 +40,7 @@ def list_with_base_rules(rawrules):
     rawrules = [r for r in rawrules if r['priority_class'] >= 0]
 
     # shove the server default rules for each kind onto the end of each
-    current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
+    current_prio_class = list(PRIORITY_CLASS_INVERSE_MAP)[-1]
 
     ruleslist.extend(make_base_prepend_rules(
         PRIORITY_CLASS_INVERSE_MAP[current_prio_class], modified_base_rules
@@ -238,6 +240,28 @@ BASE_APPEND_OVERRIDE_RULES = [
             }
         ]
     },
+    {
+        'rule_id': 'global/override/.m.rule.roomnotif',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'content.body',
+                'pattern': '@room',
+                '_id': '_roomnotif_content',
+            },
+            {
+                'kind': 'sender_notification_permission',
+                'key': 'room',
+                '_id': '_roomnotif_pl',
+            },
+        ],
+        'actions': [
+            'notify', {
+                'set_tweak': 'highlight',
+                'value': True,
+            }
+        ]
+    }
 ]
 
 
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 9a96e6fe8f..1d14d3639c 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,18 +15,22 @@
 # limitations under the License.
 
 import logging
+from collections import namedtuple
 
-from twisted.internet import defer
+from six import iteritems, itervalues
 
-from .push_rule_evaluator import PushRuleEvaluatorForEvent
+from prometheus_client import Counter
+
+from twisted.internet import defer
 
-from synapse.visibility import filter_events_for_clients_context
 from synapse.api.constants import EventTypes, Membership
-from synapse.util.caches.descriptors import cached
+from synapse.event_auth import get_user_power_level
+from synapse.state import POWER_KEY
 from synapse.util.async import Linearizer
+from synapse.util.caches import register_cache
+from synapse.util.caches.descriptors import cached
 
-from collections import namedtuple
-
+from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
 logger = logging.getLogger(__name__)
 
@@ -33,6 +38,20 @@ logger = logging.getLogger(__name__)
 rules_by_room = {}
 
 
+push_rules_invalidation_counter = Counter(
+    "synapse_push_bulk_push_rule_evaluator_push_rules_invalidation_counter", "")
+push_rules_state_size_counter = Counter(
+    "synapse_push_bulk_push_rule_evaluator_push_rules_state_size_counter", "")
+
+# Measures whether we use the fast path of using state deltas, or if we have to
+# recalculate from scratch
+push_rules_delta_state_cache_metric = register_cache(
+    "cache",
+    "push_rules_delta_state_cache_metric",
+    cache=[],  # Meaningless size, as this isn't a cache that stores values
+)
+
+
 class BulkPushRuleEvaluator(object):
     """Calculates the outcome of push rules for an event for all users in the
     room at once.
@@ -41,6 +60,13 @@ class BulkPushRuleEvaluator(object):
     def __init__(self, hs):
         self.hs = hs
         self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+
+        self.room_push_rule_cache_metrics = register_cache(
+            "cache",
+            "room_push_rule_cache",
+            cache=[],  # Meaningless size, as this isn't a cache that stores values
+        )
 
     @defer.inlineCallbacks
     def _get_rules_for_event(self, event, context):
@@ -79,37 +105,69 @@ class BulkPushRuleEvaluator(object):
         # It's important that RulesForRoom gets added to self._get_rules_for_room.cache
         # before any lookup methods get called on it as otherwise there may be
         # a race if invalidate_all gets called (which assumes its in the cache)
-        return RulesForRoom(self.hs, room_id, self._get_rules_for_room.cache)
+        return RulesForRoom(
+            self.hs, room_id, self._get_rules_for_room.cache,
+            self.room_push_rule_cache_metrics,
+        )
+
+    @defer.inlineCallbacks
+    def _get_power_levels_and_sender_level(self, event, context):
+        prev_state_ids = yield context.get_prev_state_ids(self.store)
+        pl_event_id = prev_state_ids.get(POWER_KEY)
+        if pl_event_id:
+            # fastpath: if there's a power level event, that's all we need, and
+            # not having a power level event is an extreme edge case
+            pl_event = yield self.store.get_event(pl_event_id)
+            auth_events = {POWER_KEY: pl_event}
+        else:
+            auth_events_ids = yield self.auth.compute_auth_events(
+                event, prev_state_ids, for_verification=False,
+            )
+            auth_events = yield self.store.get_events(auth_events_ids)
+            auth_events = {
+                (e.type, e.state_key): e for e in itervalues(auth_events)
+            }
+
+        sender_level = get_user_power_level(event.sender, auth_events)
+
+        pl_event = auth_events.get(POWER_KEY)
+
+        defer.returnValue((pl_event.content if pl_event else {}, sender_level))
 
     @defer.inlineCallbacks
     def action_for_event_by_user(self, event, context):
-        """Given an event and context, evaluate the push rules and return
-        the results
+        """Given an event and context, evaluate the push rules and insert the
+        results into the event_push_actions_staging table.
 
         Returns:
-            dict of user_id -> action
+            Deferred
         """
         rules_by_user = yield self._get_rules_for_event(event, context)
         actions_by_user = {}
 
-        # None of these users can be peeking since this list of users comes
-        # from the set of users in the room, so we know for sure they're all
-        # actually in the room.
-        user_tuples = [(u, False) for u in rules_by_user]
-
-        filtered_by_user = yield filter_events_for_clients_context(
-            self.store, user_tuples, [event], {event.event_id: context}
-        )
-
         room_members = yield self.store.get_joined_users_from_context(
             event, context
         )
 
-        evaluator = PushRuleEvaluatorForEvent(event, len(room_members))
+        (power_levels, sender_power_level) = (
+            yield self._get_power_levels_and_sender_level(event, context)
+        )
+
+        evaluator = PushRuleEvaluatorForEvent(
+            event, len(room_members), sender_power_level, power_levels,
+        )
 
         condition_cache = {}
 
-        for uid, rules in rules_by_user.iteritems():
+        for uid, rules in iteritems(rules_by_user):
+            if event.sender == uid:
+                continue
+
+            if not event.is_state():
+                is_ignored = yield self.store.is_ignored_by(event.sender, uid)
+                if is_ignored:
+                    continue
+
             display_name = None
             profile_info = room_members.get(uid)
             if profile_info:
@@ -121,13 +179,6 @@ class BulkPushRuleEvaluator(object):
                 if event.type == EventTypes.Member and event.state_key == uid:
                     display_name = event.content.get("displayname", None)
 
-            filtered = filtered_by_user[uid]
-            if len(filtered) == 0:
-                continue
-
-            if filtered[0].sender == uid:
-                continue
-
             for rule in rules:
                 if 'enabled' in rule and not rule['enabled']:
                     continue
@@ -138,9 +189,16 @@ class BulkPushRuleEvaluator(object):
                 if matches:
                     actions = [x for x in rule['actions'] if x != 'dont_notify']
                     if actions and 'notify' in actions:
+                        # Push rules say we should notify the user of this event
                         actions_by_user[uid] = actions
                     break
-        defer.returnValue(actions_by_user)
+
+        # Mark in the DB staging area the push actions for users who should be
+        # notified for this event. (This will then get handled when we persist
+        # the event)
+        yield self.store.add_push_actions_to_staging(
+            event.event_id, actions_by_user,
+        )
 
 
 def _condition_checker(evaluator, conditions, uid, display_name, cache):
@@ -170,17 +228,19 @@ class RulesForRoom(object):
     the entire cache for the room.
     """
 
-    def __init__(self, hs, room_id, rules_for_room_cache):
+    def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
         """
         Args:
             hs (HomeServer)
             room_id (str)
             rules_for_room_cache(Cache): The cache object that caches these
                 RoomsForUser objects.
+            room_push_rule_cache_metrics (CacheMetric)
         """
         self.room_id = room_id
         self.is_mine_id = hs.is_mine_id
         self.store = hs.get_datastore()
+        self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
 
         self.linearizer = Linearizer(name="rules_for_room")
 
@@ -222,11 +282,19 @@ class RulesForRoom(object):
         """
         state_group = context.state_group
 
+        if state_group and self.state_group == state_group:
+            logger.debug("Using cached rules for %r", self.room_id)
+            self.room_push_rule_cache_metrics.inc_hits()
+            defer.returnValue(self.rules_by_user)
+
         with (yield self.linearizer.queue(())):
             if state_group and self.state_group == state_group:
                 logger.debug("Using cached rules for %r", self.room_id)
+                self.room_push_rule_cache_metrics.inc_hits()
                 defer.returnValue(self.rules_by_user)
 
+            self.room_push_rule_cache_metrics.inc_misses()
+
             ret_rules_by_user = {}
             missing_member_event_ids = {}
             if state_group and self.state_group == context.prev_group:
@@ -234,8 +302,13 @@ class RulesForRoom(object):
                 # results.
                 ret_rules_by_user = self.rules_by_user
                 current_state_ids = context.delta_ids
+
+                push_rules_delta_state_cache_metric.inc_hits()
             else:
-                current_state_ids = context.current_state_ids
+                current_state_ids = yield context.get_current_state_ids(self.store)
+                push_rules_delta_state_cache_metric.inc_misses()
+
+            push_rules_state_size_counter.inc(len(current_state_ids))
 
             logger.debug(
                 "Looking for member changes in %r %r", state_group, current_state_ids
@@ -282,6 +355,14 @@ class RulesForRoom(object):
                 yield self._update_rules_with_member_event_ids(
                     ret_rules_by_user, missing_member_event_ids, state_group, event
                 )
+            else:
+                # The push rules didn't change but lets update the cache anyway
+                self.update_cache(
+                    self.sequence,
+                    members={},  # There were no membership changes
+                    rules_by_user=ret_rules_by_user,
+                    state_group=state_group
+                )
 
         if logger.isEnabledFor(logging.DEBUG):
             logger.debug(
@@ -324,7 +405,7 @@ class RulesForRoom(object):
         # If the event is a join event then it will be in current state evnts
         # map but not in the DB, so we have to explicitly insert it.
         if event.type == EventTypes.Member:
-            for event_id in member_event_ids.itervalues():
+            for event_id in itervalues(member_event_ids):
                 if event_id == event.event_id:
                     members[event_id] = (event.state_key, event.membership)
 
@@ -332,7 +413,7 @@ class RulesForRoom(object):
             logger.debug("Found members %r: %r", self.room_id, members.values())
 
         interested_in_user_ids = set(
-            user_id for user_id, membership in members.itervalues()
+            user_id for user_id, membership in itervalues(members)
             if membership == Membership.JOIN
         )
 
@@ -344,7 +425,7 @@ class RulesForRoom(object):
         )
 
         user_ids = set(
-            uid for uid, have_pusher in if_users_with_pushers.iteritems() if have_pusher
+            uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
         )
 
         logger.debug("With pushers: %r", user_ids)
@@ -365,7 +446,7 @@ class RulesForRoom(object):
         )
 
         ret_rules_by_user.update(
-            item for item in rules_by_user.iteritems() if item[0] is not None
+            item for item in iteritems(rules_by_user) if item[0] is not None
         )
 
         self.update_cache(sequence, members, ret_rules_by_user, state_group)
@@ -380,6 +461,7 @@ class RulesForRoom(object):
         self.state_group = object()
         self.member_map = {}
         self.rules_by_user = {}
+        push_rules_invalidation_counter.inc()
 
     def update_cache(self, sequence, members, rules_by_user, state_group):
         if sequence == self.sequence:
diff --git a/synapse/push/clientformat.py b/synapse/push/clientformat.py
index e0331b2d2d..ecbf364a5e 100644
--- a/synapse/push/clientformat.py
+++ b/synapse/push/clientformat.py
@@ -13,12 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.push.rulekinds import (
-    PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
-)
-
 import copy
 
+from synapse.push.rulekinds import PRIORITY_CLASS_INVERSE_MAP, PRIORITY_CLASS_MAP
+
 
 def format_push_rules_for_user(user, ruleslist):
     """Converts a list of rawrules and a enabled map into nested dictionaries
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index a69dda7b09..d746371420 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -13,14 +13,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer, reactor
-from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-
 import logging
 
-from synapse.util.metrics import Measure
-from synapse.util.logcontext import LoggingContext
+from twisted.internet import defer
+from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -77,10 +76,13 @@ class EmailPusher(object):
     @defer.inlineCallbacks
     def on_started(self):
         if self.mailer is not None:
-            self.throttle_params = yield self.store.get_throttle_params_by_room(
-                self.pusher_id
-            )
-            yield self._process()
+            try:
+                self.throttle_params = yield self.store.get_throttle_params_by_room(
+                    self.pusher_id
+                )
+                yield self._process()
+            except Exception:
+                logger.exception("Error starting email pusher")
 
     def on_stop(self):
         if self.timed_call:
@@ -121,7 +123,7 @@ class EmailPusher(object):
                         starting_max_ordering = self.max_stream_ordering
                         try:
                             yield self._unsafe_process()
-                        except:
+                        except Exception:
                             logger.exception("Exception processing notifs")
                         if self.max_stream_ordering == starting_max_ordering:
                             break
@@ -196,7 +198,7 @@ class EmailPusher(object):
                     self.timed_call = None
 
         if soonest_due_at is not None:
-            self.timed_call = reactor.callLater(
+            self.timed_call = self.hs.get_reactor().callLater(
                 self.seconds_until(soonest_due_at), self.on_timer
             )
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 8a5d473108..81e18bcf7d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -12,21 +13,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 
-from synapse.push import PusherConfigException
+from prometheus_client import Counter
 
-from twisted.internet import defer, reactor
+from twisted.internet import defer
 from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 
-import logging
-import push_rule_evaluator
-import push_tools
-
+from synapse.push import PusherConfigException
 from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 
+from . import push_rule_evaluator, push_tools
+
 logger = logging.getLogger(__name__)
 
+http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
+
+http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
+
 
 class HttpPusher(object):
     INITIAL_BACKOFF_SEC = 1  # in seconds because that's what Twisted takes
@@ -84,7 +89,10 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def on_started(self):
-        yield self._process()
+        try:
+            yield self._process()
+        except Exception:
+            logger.exception("Error starting http pusher")
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
@@ -131,7 +139,7 @@ class HttpPusher(object):
                         starting_max_ordering = self.max_stream_ordering
                         try:
                             yield self._unsafe_process()
-                        except:
+                        except Exception:
                             logger.exception("Exception processing notifs")
                         if self.max_stream_ordering == starting_max_ordering:
                             break
@@ -151,9 +159,16 @@ class HttpPusher(object):
             self.user_id, self.last_stream_ordering, self.max_stream_ordering
         )
 
+        logger.info(
+            "Processing %i unprocessed push actions for %s starting at "
+            "stream_ordering %s",
+            len(unprocessed), self.name, self.last_stream_ordering,
+        )
+
         for push_action in unprocessed:
             processed = yield self._process_one(push_action)
             if processed:
+                http_push_processed_counter.inc()
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action['stream_ordering']
                 yield self.store.update_pusher_last_stream_ordering_and_success(
@@ -168,6 +183,7 @@ class HttpPusher(object):
                         self.failing_since
                     )
             else:
+                http_push_failed_counter.inc()
                 if not self.failing_since:
                     self.failing_since = self.clock.time_msec()
                     yield self.store.update_pusher_failing_since(
@@ -204,7 +220,9 @@ class HttpPusher(object):
                     )
                 else:
                     logger.info("Push failed: delaying for %ds", self.backoff_delay)
-                    self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+                    self.timed_call = self.hs.get_reactor().callLater(
+                        self.backoff_delay, self.on_timer
+                    )
                     self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
                     break
 
@@ -244,6 +262,26 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _build_notification_dict(self, event, tweaks, badge):
+        if self.data.get('format') == 'event_id_only':
+            d = {
+                'notification': {
+                    'event_id': event.event_id,
+                    'room_id': event.room_id,
+                    'counts': {
+                        'unread': badge,
+                    },
+                    'devices': [
+                        {
+                            'app_id': self.app_id,
+                            'pushkey': self.pushkey,
+                            'pushkey_ts': long(self.pushkey_ts / 1000),
+                            'data': self.data_minus_url,
+                        }
+                    ]
+                }
+            }
+            defer.returnValue(d)
+
         ctx = yield push_tools.get_context_for_event(
             self.store, self.state_handler, event, self.user_id
         )
@@ -275,7 +313,7 @@ class HttpPusher(object):
         if event.type == 'm.room.member':
             d['notification']['membership'] = event.content['membership']
             d['notification']['user_is_target'] = event.state_key == self.user_id
-        if not self.hs.config.push_redact_content and 'content' in event:
+        if self.hs.config.push_include_content and 'content' in event:
             d['notification']['content'] = event.content
 
         # We no longer send aliases separately, instead, we send the human
@@ -294,8 +332,11 @@ class HttpPusher(object):
             defer.returnValue([])
         try:
             resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
-        except:
-            logger.warn("Failed to push %s ", self.url)
+        except Exception:
+            logger.warn(
+                "Failed to push event %s to %s",
+                event.event_id, self.name, exc_info=True,
+            )
             defer.returnValue(False)
         rejected = []
         if 'rejected' in resp:
@@ -304,7 +345,7 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _send_badge(self, badge):
-        logger.info("Sending updated badge count %d to %r", badge, self.user_id)
+        logger.info("Sending updated badge count %d to %s", badge, self.name)
         d = {
             'notification': {
                 'id': '',
@@ -325,8 +366,11 @@ class HttpPusher(object):
         }
         try:
             resp = yield self.http_client.post_json_get_json(self.url, d)
-        except:
-            logger.exception("Failed to push %s ", self.url)
+        except Exception:
+            logger.warn(
+                "Failed to send badge count to %s",
+                self.name, exc_info=True,
+            )
             defer.returnValue(False)
         rejected = []
         if 'rejected' in resp:
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index b5cd9b426a..9d601208fd 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -13,30 +13,31 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-from twisted.mail.smtp import sendmail
-
-import email.utils
 import email.mime.multipart
-from email.mime.text import MIMEText
+import email.utils
+import logging
+import time
+import urllib
 from email.mime.multipart import MIMEMultipart
+from email.mime.text import MIMEText
 
-from synapse.util.async import concurrently_execute
+import bleach
+import jinja2
+
+from twisted.internet import defer
+from twisted.mail.smtp import sendmail
+
+from synapse.api.constants import EventTypes
+from synapse.api.errors import StoreError
 from synapse.push.presentable_names import (
-    calculate_room_name, name_from_member_event, descriptor_from_member_events
+    calculate_room_name,
+    descriptor_from_member_events,
+    name_from_member_event,
 )
 from synapse.types import UserID
-from synapse.api.errors import StoreError
-from synapse.api.constants import EventTypes
+from synapse.util.async import concurrently_execute
 from synapse.visibility import filter_events_for_client
 
-import jinja2
-import bleach
-
-import time
-import urllib
-
-import logging
 logger = logging.getLogger(__name__)
 
 
@@ -229,7 +230,8 @@ class Mailer(object):
                 if room_vars['notifs'] and 'messages' in room_vars['notifs'][-1]:
                     prev_messages = room_vars['notifs'][-1]['messages']
                     for message in notifvars['messages']:
-                        pm = filter(lambda pm: pm['id'] == message['id'], prev_messages)
+                        pm = list(filter(lambda pm: pm['id'] == message['id'],
+                                         prev_messages))
                         if pm:
                             if not message["is_historical"]:
                                 pm[0]["is_historical"] = False
diff --git a/synapse/push/presentable_names.py b/synapse/push/presentable_names.py
index 277da3cd35..eef6e18c2e 100644
--- a/synapse/push/presentable_names.py
+++ b/synapse/push/presentable_names.py
@@ -13,10 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from twisted.internet import defer
-
-import re
 import logging
+import re
+
+from twisted.internet import defer
 
 logger = logging.getLogger(__name__)
 
@@ -113,7 +113,7 @@ def calculate_room_name(store, room_state_ids, user_id, fallback_to_members=True
     # so find out who is in the room that isn't the user.
     if "m.room.member" in room_state_bytype_ids:
         member_events = yield store.get_events(
-            room_state_bytype_ids["m.room.member"].values()
+            list(room_state_bytype_ids["m.room.member"].values())
         )
         all_members = [
             ev for ev in member_events.values()
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 172c27c137..2bd321d530 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -16,6 +17,8 @@
 import logging
 import re
 
+from six import string_types
+
 from synapse.types import UserID
 from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
 from synapse.util.caches.lrucache import LruCache
@@ -29,6 +32,21 @@ INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
 
 
 def _room_member_count(ev, condition, room_member_count):
+    return _test_ineq_condition(condition, room_member_count)
+
+
+def _sender_notification_permission(ev, condition, sender_power_level, power_levels):
+    notif_level_key = condition.get('key')
+    if notif_level_key is None:
+        return False
+
+    notif_levels = power_levels.get('notifications', {})
+    room_notif_level = notif_levels.get(notif_level_key, 50)
+
+    return sender_power_level >= room_notif_level
+
+
+def _test_ineq_condition(condition, number):
     if 'is' not in condition:
         return False
     m = INEQUALITY_EXPR.match(condition['is'])
@@ -41,15 +59,15 @@ def _room_member_count(ev, condition, room_member_count):
     rhs = int(rhs)
 
     if ineq == '' or ineq == '==':
-        return room_member_count == rhs
+        return number == rhs
     elif ineq == '<':
-        return room_member_count < rhs
+        return number < rhs
     elif ineq == '>':
-        return room_member_count > rhs
+        return number > rhs
     elif ineq == '>=':
-        return room_member_count >= rhs
+        return number >= rhs
     elif ineq == '<=':
-        return room_member_count <= rhs
+        return number <= rhs
     else:
         return False
 
@@ -65,9 +83,11 @@ def tweaks_for_actions(actions):
 
 
 class PushRuleEvaluatorForEvent(object):
-    def __init__(self, event, room_member_count):
+    def __init__(self, event, room_member_count, sender_power_level, power_levels):
         self._event = event
         self._room_member_count = room_member_count
+        self._sender_power_level = sender_power_level
+        self._power_levels = power_levels
 
         # Maps strings of e.g. 'content.body' -> event["content"]["body"]
         self._value_cache = _flatten_dict(event)
@@ -81,6 +101,10 @@ class PushRuleEvaluatorForEvent(object):
             return _room_member_count(
                 self._event, condition, self._room_member_count
             )
+        elif condition['kind'] == 'sender_notification_permission':
+            return _sender_notification_permission(
+                self._event, condition, self._sender_power_level, self._power_levels,
+            )
         else:
             return True
 
@@ -128,7 +152,7 @@ class PushRuleEvaluatorForEvent(object):
 
 # Caches (glob, word_boundary) -> regex for push. See _glob_matches
 regex_cache = LruCache(50000 * CACHE_SIZE_FACTOR)
-register_cache("regex_push_cache", regex_cache)
+register_cache("cache", "regex_push_cache", regex_cache)
 
 
 def _glob_matches(glob, value, word_boundary=False):
@@ -183,7 +207,7 @@ def _glob_to_re(glob, word_boundary):
             r,
         )
         if word_boundary:
-            r = r"\b%s\b" % (r,)
+            r = _re_word_boundary(r)
 
             return re.compile(r, flags=re.IGNORECASE)
         else:
@@ -192,7 +216,7 @@ def _glob_to_re(glob, word_boundary):
             return re.compile(r, flags=re.IGNORECASE)
     elif word_boundary:
         r = re.escape(glob)
-        r = r"\b%s\b" % (r,)
+        r = _re_word_boundary(r)
 
         return re.compile(r, flags=re.IGNORECASE)
     else:
@@ -200,11 +224,23 @@ def _glob_to_re(glob, word_boundary):
         return re.compile(r, flags=re.IGNORECASE)
 
 
+def _re_word_boundary(r):
+    """
+    Adds word boundary characters to the start and end of an
+    expression to require that the match occur as a whole word,
+    but do so respecting the fact that strings starting or ending
+    with non-word characters will change word boundaries.
+    """
+    # we can't use \b as it chokes on unicode. however \W seems to be okay
+    # as shorthand for [^0-9A-Za-z_].
+    return r"(^|\W)%s(\W|$)" % (r,)
+
+
 def _flatten_dict(d, prefix=[], result=None):
     if result is None:
         result = {}
     for key, value in d.items():
-        if isinstance(value, basestring):
+        if isinstance(value, string_types):
             result[".".join(prefix + [key])] = value.lower()
         elif hasattr(value, "items"):
             _flatten_dict(value, prefix=(prefix + [key]), result=result)
diff --git a/synapse/push/push_tools.py b/synapse/push/push_tools.py
index 6835f54e97..8049c298c2 100644
--- a/synapse/push/push_tools.py
+++ b/synapse/push/push_tools.py
@@ -14,9 +14,8 @@
 # limitations under the License.
 
 from twisted.internet import defer
-from synapse.push.presentable_names import (
-    calculate_room_name, name_from_member_event
-)
+
+from synapse.push.presentable_names import calculate_room_name, name_from_member_event
 
 
 @defer.inlineCallbacks
diff --git a/synapse/push/pusher.py b/synapse/push/pusher.py
index 491f27bded..fcee6d9d7e 100644
--- a/synapse/push/pusher.py
+++ b/synapse/push/pusher.py
@@ -13,9 +13,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from httppusher import HttpPusher
-
 import logging
+
+from .httppusher import HttpPusher
+
 logger = logging.getLogger(__name__)
 
 # We try importing this if we can (it will fail if we don't
@@ -27,7 +28,7 @@ logger = logging.getLogger(__name__)
 try:
     from synapse.push.emailpusher import EmailPusher
     from synapse.push.mailer import Mailer, load_jinja2_templates
-except:
+except Exception:
     pass
 
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 43cb6e9c01..36bb5bbc65 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -14,13 +14,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from twisted.internet import defer
 
-from .pusher import PusherFactory
-from synapse.util.logcontext import preserve_fn, preserve_context_over_deferred
-from synapse.util.async import run_on_reactor
-
-import logging
+from synapse.push.pusher import PusherFactory
+from synapse.util.logcontext import make_deferred_yieldable, run_in_background
 
 logger = logging.getLogger(__name__)
 
@@ -103,23 +102,28 @@ class PusherPool:
                 yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
     @defer.inlineCallbacks
-    def remove_pushers_by_user(self, user_id, except_access_token_id=None):
-        all = yield self.store.get_all_pushers()
-        logger.info(
-            "Removing all pushers for user %s except access tokens id %r",
-            user_id, except_access_token_id
-        )
-        for p in all:
-            if p['user_name'] == user_id and p['access_token'] != except_access_token_id:
+    def remove_pushers_by_access_token(self, user_id, access_tokens):
+        """Remove the pushers for a given user corresponding to a set of
+        access_tokens.
+
+        Args:
+            user_id (str): user to remove pushers for
+            access_tokens (Iterable[int]): access token *ids* to remove pushers
+                for
+        """
+        tokens = set(access_tokens)
+        for p in (yield self.store.get_pushers_by_user_id(user_id)):
+            if p['access_token'] in tokens:
                 logger.info(
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     p['app_id'], p['pushkey'], p['user_name']
                 )
-                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(
+                    p['app_id'], p['pushkey'], p['user_name'],
+                )
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_id, max_stream_id):
-        yield run_on_reactor()
         try:
             users_affected = yield self.store.get_push_action_users_in_range(
                 min_stream_id, max_stream_id
@@ -131,18 +135,20 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_notifications)(
-                                min_stream_id, max_stream_id
+                            run_in_background(
+                                p.on_new_notifications,
+                                min_stream_id, max_stream_id,
                             )
                         )
 
-            yield preserve_context_over_deferred(defer.gatherResults(deferreds))
-        except:
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
+        except Exception:
             logger.exception("Exception in pusher on_new_notifications")
 
     @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
-        yield run_on_reactor()
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
@@ -158,11 +164,16 @@ class PusherPool:
                 if u in self.pushers:
                     for p in self.pushers[u].values():
                         deferreds.append(
-                            preserve_fn(p.on_new_receipts)(min_stream_id, max_stream_id)
+                            run_in_background(
+                                p.on_new_receipts,
+                                min_stream_id, max_stream_id,
+                            )
                         )
 
-            yield preserve_context_over_deferred(defer.gatherResults(deferreds))
-        except:
+            yield make_deferred_yieldable(
+                defer.gatherResults(deferreds, consumeErrors=True),
+            )
+        except Exception:
             logger.exception("Exception in pusher on_new_receipts")
 
     @defer.inlineCallbacks
@@ -188,7 +199,7 @@ class PusherPool:
         for pusherdict in pushers:
             try:
                 p = self.pusher_factory.create_pusher(pusherdict)
-            except:
+            except Exception:
                 logger.exception("Couldn't start a pusher: caught Exception")
                 continue
             if p:
@@ -201,7 +212,7 @@ class PusherPool:
                 if appid_pushkey in byuser:
                     byuser[appid_pushkey].on_stop()
                 byuser[appid_pushkey] = p
-                preserve_fn(p.on_started)()
+                run_in_background(p.on_started)
 
         logger.info("Started pushers")