summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2016-02-10 14:52:06 +0000
committerErik Johnston <erik@matrix.org>2016-02-10 14:52:06 +0000
commit907c1faf1e4655659d9a9db1615e911640b9f383 (patch)
treee9f18d4f3044cab76ab5ae119505ea651194db5e /synapse/push
parentMerge pull request #476 from koobs/patch-1 (diff)
parentUpdate CHANGES (diff)
downloadsynapse-907c1faf1e4655659d9a9db1615e911640b9f383.tar.xz
Merge branch 'release-v0.13.0' of github.com:matrix-org/synapse v0.13.0
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py210
-rw-r--r--synapse/push/action_generator.py48
-rw-r--r--synapse/push/baserules.py399
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py162
-rw-r--r--synapse/push/httppusher.py22
-rw-r--r--synapse/push/push_rule_evaluator.py310
-rw-r--r--synapse/push/pusherpool.py50
-rw-r--r--synapse/push/rulekinds.py2
8 files changed, 780 insertions, 423 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index e7c964bcd2..8da2d8716c 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,6 +17,8 @@ from twisted.internet import defer
 
 from synapse.streams.config import PaginationConfig
 from synapse.types import StreamToken
+from synapse.util.logcontext import LoggingContext
+from synapse.util.metrics import Measure
 
 import synapse.util.async
 import push_rule_evaluator as push_rule_evaluator
@@ -27,12 +29,25 @@ import random
 logger = logging.getLogger(__name__)
 
 
+_NEXT_ID = 1
+
+
+def _get_next_id():
+    global _NEXT_ID
+    _id = _NEXT_ID
+    _NEXT_ID += 1
+    return _id
+
+
+# Pushers could now be moved to pull out of the event_push_actions table instead
+# of listening on the event stream: this would avoid them having to run the
+# rules again.
 class Pusher(object):
     INITIAL_BACKOFF = 1000
     MAX_BACKOFF = 60 * 60 * 1000
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
-    def __init__(self, _hs, profile_tag, user_name, app_id,
+    def __init__(self, _hs, profile_tag, user_id, app_id,
                  app_display_name, device_display_name, pushkey, pushkey_ts,
                  data, last_token, last_success, failing_since):
         self.hs = _hs
@@ -40,7 +55,7 @@ class Pusher(object):
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
         self.profile_tag = profile_tag
-        self.user_name = user_name
+        self.user_id = user_id
         self.app_id = app_id
         self.app_display_name = app_display_name
         self.device_display_name = device_display_name
@@ -52,6 +67,9 @@ class Pusher(object):
         self.backoff_delay = Pusher.INITIAL_BACKOFF
         self.failing_since = failing_since
         self.alive = True
+        self.badge = None
+
+        self.name = "Pusher-%d" % (_get_next_id(),)
 
         # The last value of last_active_time that we saw
         self.last_last_active_time = 0
@@ -82,64 +100,82 @@ class Pusher(object):
 
     @defer.inlineCallbacks
     def start(self):
-        if not self.last_token:
-            # First-time setup: get a token to start from (we can't
-            # just start from no token, ie. 'now'
-            # because we need the result to be reproduceable in case
-            # we fail to dispatch the push)
-            config = PaginationConfig(from_token=None, limit='1')
-            chunk = yield self.evStreamHandler.get_stream(
-                self.user_name, config, timeout=0, affect_presence=False,
-                only_room_events=True
-            )
-            self.last_token = chunk['end']
-            self.store.update_pusher_last_token(
-                self.app_id, self.pushkey, self.user_name, self.last_token
-            )
-            logger.info("Pusher %s for user %s starting from token %s",
-                        self.pushkey, self.user_name, self.last_token)
-
-        wait = 0
-        while self.alive:
-            try:
-                if wait > 0:
-                    yield synapse.util.async.sleep(wait)
-                yield self.get_and_dispatch()
-                wait = 0
-            except:
-                if wait == 0:
-                    wait = 1
-                else:
-                    wait = min(wait * 2, 1800)
-                logger.exception(
-                    "Exception in pusher loop for pushkey %s. Pausing for %ds",
-                    self.pushkey, wait
+        with LoggingContext(self.name):
+            if not self.last_token:
+                # First-time setup: get a token to start from (we can't
+                # just start from no token, ie. 'now'
+                # because we need the result to be reproduceable in case
+                # we fail to dispatch the push)
+                config = PaginationConfig(from_token=None, limit='1')
+                chunk = yield self.evStreamHandler.get_stream(
+                    self.user_id, config, timeout=0, affect_presence=False
+                )
+                self.last_token = chunk['end']
+                yield self.store.update_pusher_last_token(
+                    self.app_id, self.pushkey, self.user_id, self.last_token
+                )
+                logger.info("New pusher %s for user %s starting from token %s",
+                            self.pushkey, self.user_id, self.last_token)
+
+            else:
+                logger.info(
+                    "Old pusher %s for user %s starting",
+                    self.pushkey, self.user_id,
                 )
 
+            wait = 0
+            while self.alive:
+                try:
+                    if wait > 0:
+                        yield synapse.util.async.sleep(wait)
+                    with Measure(self.clock, "push"):
+                        yield self.get_and_dispatch()
+                    wait = 0
+                except:
+                    if wait == 0:
+                        wait = 1
+                    else:
+                        wait = min(wait * 2, 1800)
+                    logger.exception(
+                        "Exception in pusher loop for pushkey %s. Pausing for %ds",
+                        self.pushkey, wait
+                    )
+
     @defer.inlineCallbacks
     def get_and_dispatch(self):
         from_tok = StreamToken.from_string(self.last_token)
         config = PaginationConfig(from_token=from_tok, limit='1')
         timeout = (300 + random.randint(-60, 60)) * 1000
         chunk = yield self.evStreamHandler.get_stream(
-            self.user_name, config, timeout=timeout, affect_presence=False,
-            only_room_events=True
+            self.user_id, config, timeout=timeout, affect_presence=False,
+            only_keys=("room", "receipt",),
         )
 
         # limiting to 1 may get 1 event plus 1 presence event, so
         # pick out the actual event
         single_event = None
+        read_receipt = None
         for c in chunk['chunk']:
             if 'event_id' in c:  # Hmmm...
                 single_event = c
-                break
+            elif c['type'] == 'm.receipt':
+                read_receipt = c
+
+        have_updated_badge = False
+        if read_receipt:
+            for receipt_part in read_receipt['content'].values():
+                if 'm.read' in receipt_part:
+                    if self.user_id in receipt_part['m.read'].keys():
+                        have_updated_badge = True
+
         if not single_event:
+            if have_updated_badge:
+                yield self.update_badge()
             self.last_token = chunk['end']
-            logger.debug("Event stream timeout for pushkey %s", self.pushkey)
             yield self.store.update_pusher_last_token(
                 self.app_id,
                 self.pushkey,
-                self.user_name,
+                self.user_id,
                 self.last_token
             )
             return
@@ -150,29 +186,16 @@ class Pusher(object):
         processed = False
 
         rule_evaluator = yield \
-            push_rule_evaluator.evaluator_for_user_name_and_profile_tag(
-                self.user_name, self.profile_tag, single_event['room_id'], self.store
+            push_rule_evaluator.evaluator_for_user_id_and_profile_tag(
+                self.user_id, self.profile_tag, single_event['room_id'], self.store
             )
 
         actions = yield rule_evaluator.actions_for_event(single_event)
         tweaks = rule_evaluator.tweaks_for_actions(actions)
 
-        if len(actions) == 0:
-            logger.warn("Empty actions! Using default action.")
-            actions = Pusher.DEFAULT_ACTIONS
-
-        if 'notify' not in actions and 'dont_notify' not in actions:
-            logger.warn("Neither notify nor dont_notify in actions: adding default")
-            actions.extend(Pusher.DEFAULT_ACTIONS)
-
-        if 'dont_notify' in actions:
-            logger.debug(
-                "%s for %s: dont_notify",
-                single_event['event_id'], self.user_name
-            )
-            processed = True
-        else:
-            rejected = yield self.dispatch_push(single_event, tweaks)
+        if 'notify' in actions:
+            self.badge = yield self._get_badge_count()
+            rejected = yield self.dispatch_push(single_event, tweaks, self.badge)
             self.has_unread = True
             if isinstance(rejected, list) or isinstance(rejected, tuple):
                 processed = True
@@ -190,8 +213,12 @@ class Pusher(object):
                             pk
                         )
                         yield self.hs.get_pusherpool().remove_pusher(
-                            self.app_id, pk, self.user_name
+                            self.app_id, pk, self.user_id
                         )
+        else:
+            if have_updated_badge:
+                yield self.update_badge()
+            processed = True
 
         if not self.alive:
             return
@@ -202,7 +229,7 @@ class Pusher(object):
             yield self.store.update_pusher_last_token_and_success(
                 self.app_id,
                 self.pushkey,
-                self.user_name,
+                self.user_id,
                 self.last_token,
                 self.clock.time_msec()
             )
@@ -211,7 +238,7 @@ class Pusher(object):
                 yield self.store.update_pusher_failing_since(
                     self.app_id,
                     self.pushkey,
-                    self.user_name,
+                    self.user_id,
                     self.failing_since)
         else:
             if not self.failing_since:
@@ -219,7 +246,7 @@ class Pusher(object):
                 yield self.store.update_pusher_failing_since(
                     self.app_id,
                     self.pushkey,
-                    self.user_name,
+                    self.user_id,
                     self.failing_since
                 )
 
@@ -231,13 +258,13 @@ class Pusher(object):
                 # of old notifications.
                 logger.warn("Giving up on a notification to user %s, "
                             "pushkey %s",
-                            self.user_name, self.pushkey)
+                            self.user_id, self.pushkey)
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
                 yield self.store.update_pusher_last_token(
                     self.app_id,
                     self.pushkey,
-                    self.user_name,
+                    self.user_id,
                     self.last_token
                 )
 
@@ -245,14 +272,14 @@ class Pusher(object):
                 yield self.store.update_pusher_failing_since(
                     self.app_id,
                     self.pushkey,
-                    self.user_name,
+                    self.user_id,
                     self.failing_since
                 )
             else:
                 logger.warn("Failed to dispatch push for user %s "
                             "(failing for %dms)."
                             "Trying again in %dms",
-                            self.user_name,
+                            self.user_id,
                             self.clock.time_msec() - self.failing_since,
                             self.backoff_delay)
                 yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
@@ -263,7 +290,7 @@ class Pusher(object):
     def stop(self):
         self.alive = False
 
-    def dispatch_push(self, p, tweaks):
+    def dispatch_push(self, p, tweaks, badge):
         """
         Overridden by implementing classes to actually deliver the notification
         Args:
@@ -275,23 +302,44 @@ class Pusher(object):
         """
         pass
 
-    def reset_badge_count(self):
-        pass
+    @defer.inlineCallbacks
+    def update_badge(self):
+        new_badge = yield self._get_badge_count()
+        if self.badge != new_badge:
+            self.badge = new_badge
+            yield self.send_badge(self.badge)
 
-    def presence_changed(self, state):
+    def send_badge(self, badge):
         """
-        We clear badge counts whenever a user's last_active time is bumped
-        This is by no means perfect but I think it's the best we can do
-        without read receipts.
+        Overridden by implementing classes to send an updated badge count
         """
-        if 'last_active' in state.state:
-            last_active = state.state['last_active']
-            if last_active > self.last_last_active_time:
-                self.last_last_active_time = last_active
-                if self.has_unread:
-                    logger.info("Resetting badge count for %s", self.user_name)
-                    self.reset_badge_count()
-                    self.has_unread = False
+        pass
+
+    @defer.inlineCallbacks
+    def _get_badge_count(self):
+        invites, joins = yield defer.gatherResults([
+            self.store.get_invites_for_user(self.user_id),
+            self.store.get_rooms_for_user(self.user_id),
+        ], consumeErrors=True)
+
+        my_receipts_by_room = yield self.store.get_receipts_for_user(
+            self.user_id,
+            "m.read",
+        )
+
+        badge = len(invites)
+
+        for r in joins:
+            if r.room_id in my_receipts_by_room:
+                last_unread_event_id = my_receipts_by_room[r.room_id]
+
+                notifs = yield (
+                    self.store.get_unread_event_push_actions_by_room_for_user(
+                        r.room_id, self.user_id, last_unread_event_id
+                    )
+                )
+                badge += notifs["notify_count"]
+        defer.returnValue(badge)
 
 
 class PusherConfigException(Exception):
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
new file mode 100644
index 0000000000..e0da0868ec
--- /dev/null
+++ b/synapse/push/action_generator.py
@@ -0,0 +1,48 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+import bulk_push_rule_evaluator
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class ActionGenerator:
+    def __init__(self, hs):
+        self.hs = hs
+        self.store = hs.get_datastore()
+        # really we want to get all user ids and all profile tags too,
+        # since we want the actions for each profile tag for every user and
+        # also actions for a client with no profile tag for each user.
+        # Currently the event stream doesn't support profile tags on an
+        # event stream, so we just run the rules for a client with no profile
+        # tag (ie. we just need all the users).
+
+    @defer.inlineCallbacks
+    def handle_push_actions_for_event(self, event, context, handler):
+        bulk_evaluator = yield bulk_push_rule_evaluator.evaluator_for_room_id(
+            event.room_id, self.hs, self.store
+        )
+
+        actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+            event, handler, context.current_state
+        )
+
+        context.push_actions = [
+            (uid, None, actions) for uid, actions in actions_by_user.items()
+        ]
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 7f76382a17..0832c77cb4 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -1,4 +1,4 @@
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,27 +15,25 @@
 from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP
 
 
-def list_with_base_rules(rawrules, user_name):
+def list_with_base_rules(rawrules):
     ruleslist = []
 
     # shove the server default rules for each kind onto the end of each
     current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1]
 
     ruleslist.extend(make_base_prepend_rules(
-        user_name, PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
+        PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
     ))
 
     for r in rawrules:
         if r['priority_class'] < current_prio_class:
             while r['priority_class'] < current_prio_class:
                 ruleslist.extend(make_base_append_rules(
-                    user_name,
                     PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
                 ))
                 current_prio_class -= 1
                 if current_prio_class > 0:
                     ruleslist.extend(make_base_prepend_rules(
-                        user_name,
                         PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
                     ))
 
@@ -43,223 +41,240 @@ def list_with_base_rules(rawrules, user_name):
 
     while current_prio_class > 0:
         ruleslist.extend(make_base_append_rules(
-            user_name,
             PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
         ))
         current_prio_class -= 1
         if current_prio_class > 0:
             ruleslist.extend(make_base_prepend_rules(
-                user_name,
                 PRIORITY_CLASS_INVERSE_MAP[current_prio_class]
             ))
 
     return ruleslist
 
 
-def make_base_append_rules(user, kind):
+def make_base_append_rules(kind):
     rules = []
 
     if kind == 'override':
-        rules = make_base_append_override_rules()
+        rules = BASE_APPEND_OVRRIDE_RULES
     elif kind == 'underride':
-        rules = make_base_append_underride_rules(user)
+        rules = BASE_APPEND_UNDERRIDE_RULES
     elif kind == 'content':
-        rules = make_base_append_content_rules(user)
-
-    for r in rules:
-        r['priority_class'] = PRIORITY_CLASS_MAP[kind]
-        r['default'] = True  # Deprecated, left for backwards compat
+        rules = BASE_APPEND_CONTENT_RULES
 
     return rules
 
 
-def make_base_prepend_rules(user, kind):
+def make_base_prepend_rules(kind):
     rules = []
 
     if kind == 'override':
-        rules = make_base_prepend_override_rules()
-
-    for r in rules:
-        r['priority_class'] = PRIORITY_CLASS_MAP[kind]
-        r['default'] = True  # Deprecated, left for backwards compat
+        rules = BASE_PREPEND_OVERRIDE_RULES
 
     return rules
 
 
-def make_base_append_content_rules(user):
-    return [
-        {
-            'rule_id': 'global/content/.m.rule.contains_user_name',
-            'conditions': [
-                {
-                    'kind': 'event_match',
-                    'key': 'content.body',
-                    'pattern': user.localpart,  # Matrix ID match
-                }
-            ],
-            'actions': [
-                'notify',
-                {
-                    'set_tweak': 'sound',
-                    'value': 'default',
-                }, {
-                    'set_tweak': 'highlight'
-                }
-            ]
-        },
-    ]
+BASE_APPEND_CONTENT_RULES = [
+    {
+        'rule_id': 'global/content/.m.rule.contains_user_name',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'content.body',
+                'pattern_type': 'user_localpart'
+            }
+        ],
+        'actions': [
+            'notify',
+            {
+                'set_tweak': 'sound',
+                'value': 'default',
+            }, {
+                'set_tweak': 'highlight'
+            }
+        ]
+    },
+]
+
+
+BASE_PREPEND_OVERRIDE_RULES = [
+    {
+        'rule_id': 'global/override/.m.rule.master',
+        'enabled': False,
+        'conditions': [],
+        'actions': [
+            "dont_notify"
+        ]
+    }
+]
+
+
+BASE_APPEND_OVRRIDE_RULES = [
+    {
+        'rule_id': 'global/override/.m.rule.suppress_notices',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'content.msgtype',
+                'pattern': 'm.notice',
+                '_id': '_suppress_notices',
+            }
+        ],
+        'actions': [
+            'dont_notify',
+        ]
+    }
+]
+
 
+BASE_APPEND_UNDERRIDE_RULES = [
+    {
+        'rule_id': 'global/underride/.m.rule.call',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'type',
+                'pattern': 'm.call.invite',
+                '_id': '_call',
+            }
+        ],
+        'actions': [
+            'notify',
+            {
+                'set_tweak': 'sound',
+                'value': 'ring'
+            }, {
+                'set_tweak': 'highlight',
+                'value': False
+            }
+        ]
+    },
+    {
+        'rule_id': 'global/underride/.m.rule.contains_display_name',
+        'conditions': [
+            {
+                'kind': 'contains_display_name'
+            }
+        ],
+        'actions': [
+            'notify',
+            {
+                'set_tweak': 'sound',
+                'value': 'default'
+            }, {
+                'set_tweak': 'highlight'
+            }
+        ]
+    },
+    {
+        'rule_id': 'global/underride/.m.rule.room_one_to_one',
+        'conditions': [
+            {
+                'kind': 'room_member_count',
+                'is': '2',
+                '_id': 'member_count',
+            },
+            {
+                'kind': 'event_match',
+                'key': 'type',
+                'pattern': 'm.room.message',
+                '_id': '_message',
+            }
+        ],
+        'actions': [
+            'notify',
+            {
+                'set_tweak': 'sound',
+                'value': 'default'
+            }, {
+                'set_tweak': 'highlight',
+                'value': False
+            }
+        ]
+    },
+    {
+        'rule_id': 'global/underride/.m.rule.invite_for_me',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'type',
+                'pattern': 'm.room.member',
+                '_id': '_member',
+            },
+            {
+                'kind': 'event_match',
+                'key': 'content.membership',
+                'pattern': 'invite',
+                '_id': '_invite_member',
+            },
+            {
+                'kind': 'event_match',
+                'key': 'state_key',
+                'pattern_type': 'user_id'
+            },
+        ],
+        'actions': [
+            'notify',
+            {
+                'set_tweak': 'sound',
+                'value': 'default'
+            }, {
+                'set_tweak': 'highlight',
+                'value': False
+            }
+        ]
+    },
+    # This is too simple: https://matrix.org/jira/browse/SYN-607
+    # Removing for now
+    # {
+    #     'rule_id': 'global/underride/.m.rule.member_event',
+    #     'conditions': [
+    #         {
+    #             'kind': 'event_match',
+    #             'key': 'type',
+    #             'pattern': 'm.room.member',
+    #             '_id': '_member',
+    #         }
+    #     ],
+    #     'actions': [
+    #         'notify', {
+    #             'set_tweak': 'highlight',
+    #             'value': False
+    #         }
+    #     ]
+    # },
+    {
+        'rule_id': 'global/underride/.m.rule.message',
+        'conditions': [
+            {
+                'kind': 'event_match',
+                'key': 'type',
+                'pattern': 'm.room.message',
+                '_id': '_message',
+            }
+        ],
+        'actions': [
+            'notify', {
+                'set_tweak': 'highlight',
+                'value': False
+            }
+        ]
+    }
+]
 
-def make_base_prepend_override_rules():
-    return [
-        {
-            'rule_id': 'global/override/.m.rule.master',
-            'enabled': False,
-            'conditions': [],
-            'actions': [
-                "dont_notify"
-            ]
-        }
-    ]
 
+for r in BASE_APPEND_CONTENT_RULES:
+    r['priority_class'] = PRIORITY_CLASS_MAP['content']
+    r['default'] = True
 
-def make_base_append_override_rules():
-    return [
-        {
-            'rule_id': 'global/override/.m.rule.suppress_notices',
-            'conditions': [
-                {
-                    'kind': 'event_match',
-                    'key': 'content.msgtype',
-                    'pattern': 'm.notice',
-                }
-            ],
-            'actions': [
-                'dont_notify',
-            ]
-        }
-    ]
+for r in BASE_PREPEND_OVERRIDE_RULES:
+    r['priority_class'] = PRIORITY_CLASS_MAP['override']
+    r['default'] = True
 
+for r in BASE_APPEND_OVRRIDE_RULES:
+    r['priority_class'] = PRIORITY_CLASS_MAP['override']
+    r['default'] = True
 
-def make_base_append_underride_rules(user):
-    return [
-        {
-            'rule_id': 'global/underride/.m.rule.call',
-            'conditions': [
-                {
-                    'kind': 'event_match',
-                    'key': 'type',
-                    'pattern': 'm.call.invite',
-                }
-            ],
-            'actions': [
-                'notify',
-                {
-                    'set_tweak': 'sound',
-                    'value': 'ring'
-                }, {
-                    'set_tweak': 'highlight',
-                    'value': False
-                }
-            ]
-        },
-        {
-            'rule_id': 'global/underride/.m.rule.contains_display_name',
-            'conditions': [
-                {
-                    'kind': 'contains_display_name'
-                }
-            ],
-            'actions': [
-                'notify',
-                {
-                    'set_tweak': 'sound',
-                    'value': 'default'
-                }, {
-                    'set_tweak': 'highlight'
-                }
-            ]
-        },
-        {
-            'rule_id': 'global/underride/.m.rule.room_one_to_one',
-            'conditions': [
-                {
-                    'kind': 'room_member_count',
-                    'is': '2'
-                }
-            ],
-            'actions': [
-                'notify',
-                {
-                    'set_tweak': 'sound',
-                    'value': 'default'
-                }, {
-                    'set_tweak': 'highlight',
-                    'value': False
-                }
-            ]
-        },
-        {
-            'rule_id': 'global/underride/.m.rule.invite_for_me',
-            'conditions': [
-                {
-                    'kind': 'event_match',
-                    'key': 'type',
-                    'pattern': 'm.room.member',
-                },
-                {
-                    'kind': 'event_match',
-                    'key': 'content.membership',
-                    'pattern': 'invite',
-                },
-                {
-                    'kind': 'event_match',
-                    'key': 'state_key',
-                    'pattern': user.to_string(),
-                },
-            ],
-            'actions': [
-                'notify',
-                {
-                    'set_tweak': 'sound',
-                    'value': 'default'
-                }, {
-                    'set_tweak': 'highlight',
-                    'value': False
-                }
-            ]
-        },
-        {
-            'rule_id': 'global/underride/.m.rule.member_event',
-            'conditions': [
-                {
-                    'kind': 'event_match',
-                    'key': 'type',
-                    'pattern': 'm.room.member',
-                }
-            ],
-            'actions': [
-                'notify', {
-                    'set_tweak': 'highlight',
-                    'value': False
-                }
-            ]
-        },
-        {
-            'rule_id': 'global/underride/.m.rule.message',
-            'enabled': False,
-            'conditions': [
-                {
-                    'kind': 'event_match',
-                    'key': 'type',
-                    'pattern': 'm.room.message',
-                }
-            ],
-            'actions': [
-                'notify', {
-                    'set_tweak': 'highlight',
-                    'value': False
-                }
-            ]
-        }
-    ]
+for r in BASE_APPEND_UNDERRIDE_RULES:
+    r['priority_class'] = PRIORITY_CLASS_MAP['underride']
+    r['default'] = True
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
new file mode 100644
index 0000000000..8ac5ceb9ef
--- /dev/null
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -0,0 +1,162 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+import ujson as json
+
+from twisted.internet import defer
+
+import baserules
+from push_rule_evaluator import PushRuleEvaluatorForEvent
+
+from synapse.api.constants import EventTypes
+
+
+logger = logging.getLogger(__name__)
+
+
+def decode_rule_json(rule):
+    rule['conditions'] = json.loads(rule['conditions'])
+    rule['actions'] = json.loads(rule['actions'])
+    return rule
+
+
+@defer.inlineCallbacks
+def _get_rules(room_id, user_ids, store):
+    rules_by_user = yield store.bulk_get_push_rules(user_ids)
+    rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
+
+    rules_by_user = {
+        uid: baserules.list_with_base_rules([
+            decode_rule_json(rule_list)
+            for rule_list in rules_by_user.get(uid, [])
+        ])
+        for uid in user_ids
+    }
+
+    # We apply the rules-enabled map here: bulk_get_push_rules doesn't
+    # fetch disabled rules, but this won't account for any server default
+    # rules the user has disabled, so we need to do this too.
+    for uid in user_ids:
+        if uid not in rules_enabled_by_user:
+            continue
+
+        user_enabled_map = rules_enabled_by_user[uid]
+
+        for i, rule in enumerate(rules_by_user[uid]):
+            rule_id = rule['rule_id']
+
+            if rule_id in user_enabled_map:
+                if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
+                    # Rules are cached across users.
+                    rule = dict(rule)
+                    rule['enabled'] = bool(user_enabled_map[rule_id])
+                    rules_by_user[uid][i] = rule
+
+    defer.returnValue(rules_by_user)
+
+
+@defer.inlineCallbacks
+def evaluator_for_room_id(room_id, hs, store):
+    results = yield store.get_receipts_for_room(room_id, "m.read")
+    user_ids = [
+        row["user_id"] for row in results
+        if hs.is_mine_id(row["user_id"])
+    ]
+    rules_by_user = yield _get_rules(room_id, user_ids, store)
+
+    defer.returnValue(BulkPushRuleEvaluator(
+        room_id, rules_by_user, user_ids, store
+    ))
+
+
+class BulkPushRuleEvaluator:
+    """
+    Runs push rules for all users in a room.
+    This is faster than running PushRuleEvaluator for each user because it
+    fetches all the rules for all the users in one (batched) db query
+    rather than doing multiple queries per-user. It currently uses
+    the same logic to run the actual rules, but could be optimised further
+    (see https://matrix.org/jira/browse/SYN-562)
+    """
+    def __init__(self, room_id, rules_by_user, users_in_room, store):
+        self.room_id = room_id
+        self.rules_by_user = rules_by_user
+        self.users_in_room = users_in_room
+        self.store = store
+
+    @defer.inlineCallbacks
+    def action_for_event_by_user(self, event, handler, current_state):
+        actions_by_user = {}
+
+        users_dict = yield self.store.are_guests(self.rules_by_user.keys())
+
+        filtered_by_user = yield handler._filter_events_for_clients(
+            users_dict.items(), [event], {event.event_id: current_state}
+        )
+
+        evaluator = PushRuleEvaluatorForEvent(event, len(self.users_in_room))
+
+        condition_cache = {}
+
+        display_names = {}
+        for ev in current_state.values():
+            nm = ev.content.get("displayname", None)
+            if nm and ev.type == EventTypes.Member:
+                display_names[ev.state_key] = nm
+
+        for uid, rules in self.rules_by_user.items():
+            display_name = display_names.get(uid, None)
+
+            filtered = filtered_by_user[uid]
+            if len(filtered) == 0:
+                continue
+
+            if filtered[0].sender == uid:
+                continue
+
+            for rule in rules:
+                if 'enabled' in rule and not rule['enabled']:
+                    continue
+
+                matches = _condition_checker(
+                    evaluator, rule['conditions'], uid, display_name, condition_cache
+                )
+                if matches:
+                    actions = [x for x in rule['actions'] if x != 'dont_notify']
+                    if actions and 'notify' in actions:
+                        actions_by_user[uid] = actions
+                    break
+        defer.returnValue(actions_by_user)
+
+
+def _condition_checker(evaluator, conditions, uid, display_name, cache):
+    for cond in conditions:
+        _id = cond.get("_id", None)
+        if _id:
+            res = cache.get(_id, None)
+            if res is False:
+                return False
+            elif res is True:
+                continue
+
+        res = evaluator.matches(cond, uid, display_name, None)
+        if _id:
+            cache[_id] = bool(res)
+
+        if not res:
+            return False
+
+    return True
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 5160775e59..cdc4494928 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -23,13 +23,13 @@ logger = logging.getLogger(__name__)
 
 
 class HttpPusher(Pusher):
-    def __init__(self, _hs, profile_tag, user_name, app_id,
+    def __init__(self, _hs, profile_tag, user_id, app_id,
                  app_display_name, device_display_name, pushkey, pushkey_ts,
                  data, last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
             profile_tag,
-            user_name,
+            user_id,
             app_id,
             app_display_name,
             device_display_name,
@@ -51,7 +51,7 @@ class HttpPusher(Pusher):
         del self.data_minus_url['url']
 
     @defer.inlineCallbacks
-    def _build_notification_dict(self, event, tweaks):
+    def _build_notification_dict(self, event, tweaks, badge):
         # we probably do not want to push for every presence update
         # (we may want to be able to set up notifications when specific
         # people sign in, but we'd want to only deliver the pertinent ones)
@@ -71,7 +71,7 @@ class HttpPusher(Pusher):
                 'counts': {  # -- we don't mark messages as read yet so
                              # we have no way of knowing
                     # Just set the badge to 1 until we have read receipts
-                    'unread': 1,
+                    'unread': badge,
                     # 'missed_calls': 2
                 },
                 'devices': [
@@ -87,7 +87,7 @@ class HttpPusher(Pusher):
         }
         if event['type'] == 'm.room.member':
             d['notification']['membership'] = event['content']['membership']
-            d['notification']['user_is_target'] = event['state_key'] == self.user_name
+            d['notification']['user_is_target'] = event['state_key'] == self.user_id
         if 'content' in event:
             d['notification']['content'] = event['content']
 
@@ -101,8 +101,8 @@ class HttpPusher(Pusher):
         defer.returnValue(d)
 
     @defer.inlineCallbacks
-    def dispatch_push(self, event, tweaks):
-        notification_dict = yield self._build_notification_dict(event, tweaks)
+    def dispatch_push(self, event, tweaks, badge):
+        notification_dict = yield self._build_notification_dict(event, tweaks, badge)
         if not notification_dict:
             defer.returnValue([])
         try:
@@ -116,15 +116,15 @@ class HttpPusher(Pusher):
         defer.returnValue(rejected)
 
     @defer.inlineCallbacks
-    def reset_badge_count(self):
+    def send_badge(self, badge):
+        logger.info("Sending updated badge count %d to %r", badge, self.user_id)
         d = {
             'notification': {
                 'id': '',
                 'type': None,
                 'sender': '',
                 'counts': {
-                    'unread': 0,
-                    'missed_calls': 0
+                    'unread': badge
                 },
                 'devices': [
                     {
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 92c7fd048f..2a2b4437dc 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -1,5 +1,5 @@
 # -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,40 +15,71 @@
 
 from twisted.internet import defer
 
-from synapse.types import UserID
-
 import baserules
 
 import logging
 import simplejson as json
 import re
 
+from synapse.types import UserID
+from synapse.util.caches.lrucache import LruCache
+
 logger = logging.getLogger(__name__)
 
 
+GLOB_REGEX = re.compile(r'\\\[(\\\!|)(.*)\\\]')
+IS_GLOB = re.compile(r'[\?\*\[\]]')
+INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
+
+
 @defer.inlineCallbacks
-def evaluator_for_user_name_and_profile_tag(user_name, profile_tag, room_id, store):
-    rawrules = yield store.get_push_rules_for_user(user_name)
-    enabled_map = yield store.get_push_rules_enabled_for_user(user_name)
+def evaluator_for_user_id_and_profile_tag(user_id, profile_tag, room_id, store):
+    rawrules = yield store.get_push_rules_for_user(user_id)
+    enabled_map = yield store.get_push_rules_enabled_for_user(user_id)
     our_member_event = yield store.get_current_state(
         room_id=room_id,
         event_type='m.room.member',
-        state_key=user_name,
+        state_key=user_id,
     )
 
     defer.returnValue(PushRuleEvaluator(
-        user_name, profile_tag, rawrules, enabled_map,
+        user_id, profile_tag, rawrules, enabled_map,
         room_id, our_member_event, store
     ))
 
 
+def _room_member_count(ev, condition, room_member_count):
+    if 'is' not in condition:
+        return False
+    m = INEQUALITY_EXPR.match(condition['is'])
+    if not m:
+        return False
+    ineq = m.group(1)
+    rhs = m.group(2)
+    if not rhs.isdigit():
+        return False
+    rhs = int(rhs)
+
+    if ineq == '' or ineq == '==':
+        return room_member_count == rhs
+    elif ineq == '<':
+        return room_member_count < rhs
+    elif ineq == '>':
+        return room_member_count > rhs
+    elif ineq == '>=':
+        return room_member_count >= rhs
+    elif ineq == '<=':
+        return room_member_count <= rhs
+    else:
+        return False
+
+
 class PushRuleEvaluator:
-    DEFAULT_ACTIONS = ['dont_notify']
-    INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
+    DEFAULT_ACTIONS = []
 
-    def __init__(self, user_name, profile_tag, raw_rules, enabled_map, room_id,
+    def __init__(self, user_id, profile_tag, raw_rules, enabled_map, room_id,
                  our_member_event, store):
-        self.user_name = user_name
+        self.user_id = user_id
         self.profile_tag = profile_tag
         self.room_id = room_id
         self.our_member_event = our_member_event
@@ -61,8 +92,7 @@ class PushRuleEvaluator:
             rule['actions'] = json.loads(raw_rule['actions'])
             rules.append(rule)
 
-        user = UserID.from_string(self.user_name)
-        self.rules = baserules.list_with_base_rules(rules, user)
+        self.rules = baserules.list_with_base_rules(rules)
 
         self.enabled_map = enabled_map
 
@@ -83,9 +113,9 @@ class PushRuleEvaluator:
         has configured both globally and per-room when we have the ability
         to do such things.
         """
-        if ev['user_id'] == self.user_name:
+        if ev['user_id'] == self.user_id:
             # let's assume you probably know about messages you sent yourself
-            defer.returnValue(['dont_notify'])
+            defer.returnValue([])
 
         room_id = ev['room_id']
 
@@ -98,127 +128,195 @@ class PushRuleEvaluator:
         room_members = yield self.store.get_users_in_room(room_id)
         room_member_count = len(room_members)
 
+        evaluator = PushRuleEvaluatorForEvent(ev, room_member_count)
+
         for r in self.rules:
-            if r['rule_id'] in self.enabled_map:
-                r['enabled'] = self.enabled_map[r['rule_id']]
-            elif 'enabled' not in r:
-                r['enabled'] = True
-            if not r['enabled']:
+            enabled = self.enabled_map.get(r['rule_id'], None)
+            if enabled is not None and not enabled:
+                continue
+
+            if not r.get("enabled", True):
                 continue
-            matches = True
 
             conditions = r['conditions']
             actions = r['actions']
 
-            for c in conditions:
-                matches &= self._event_fulfills_condition(
-                    ev, c, display_name=my_display_name,
-                    room_member_count=room_member_count
-                )
-            logger.debug(
-                "Rule %s %s",
-                r['rule_id'], "matches" if matches else "doesn't match"
-            )
             # ignore rules with no actions (we have an explict 'dont_notify')
             if len(actions) == 0:
                 logger.warn(
                     "Ignoring rule id %s with no actions for user %s",
-                    r['rule_id'], self.user_name
+                    r['rule_id'], self.user_id
                 )
                 continue
+
+            matches = True
+            for c in conditions:
+                matches = evaluator.matches(
+                    c, self.user_id, my_display_name, self.profile_tag
+                )
+                if not matches:
+                    break
+
+            logger.debug(
+                "Rule %s %s",
+                r['rule_id'], "matches" if matches else "doesn't match"
+            )
+
             if matches:
-                logger.info(
+                logger.debug(
                     "%s matches for user %s, event %s",
-                    r['rule_id'], self.user_name, ev['event_id']
+                    r['rule_id'], self.user_id, ev['event_id']
                 )
+
+                # filter out dont_notify as we treat an empty actions list
+                # as dont_notify, and this doesn't take up a row in our database
+                actions = [x for x in actions if x != 'dont_notify']
+
                 defer.returnValue(actions)
 
-        logger.info(
+        logger.debug(
             "No rules match for user %s, event %s",
-            self.user_name, ev['event_id']
+            self.user_id, ev['event_id']
         )
         defer.returnValue(PushRuleEvaluator.DEFAULT_ACTIONS)
 
-    @staticmethod
-    def _glob_to_regexp(glob):
-        r = re.escape(glob)
-        r = re.sub(r'\\\*', r'.*?', r)
-        r = re.sub(r'\\\?', r'.', r)
-
-        # handle [abc], [a-z] and [!a-z] style ranges.
-        r = re.sub(r'\\\[(\\\!|)(.*)\\\]',
-                   lambda x: ('[%s%s]' % (x.group(1) and '^' or '',
-                                          re.sub(r'\\\-', '-', x.group(2)))), r)
-        return r
 
-    def _event_fulfills_condition(self, ev, condition, display_name, room_member_count):
-        if condition['kind'] == 'event_match':
-            if 'pattern' not in condition:
-                logger.warn("event_match condition with no pattern")
-                return False
-            # XXX: optimisation: cache our pattern regexps
-            if condition['key'] == 'content.body':
-                r = r'\b%s\b' % self._glob_to_regexp(condition['pattern'])
-            else:
-                r = r'^%s$' % self._glob_to_regexp(condition['pattern'])
-            val = _value_for_dotted_key(condition['key'], ev)
-            if val is None:
-                return False
-            return re.search(r, val, flags=re.IGNORECASE) is not None
+class PushRuleEvaluatorForEvent(object):
+    def __init__(self, event, room_member_count):
+        self._event = event
+        self._room_member_count = room_member_count
 
+        # Maps strings of e.g. 'content.body' -> event["content"]["body"]
+        self._value_cache = _flatten_dict(event)
+
+    def matches(self, condition, user_id, display_name, profile_tag):
+        if condition['kind'] == 'event_match':
+            return self._event_match(condition, user_id)
         elif condition['kind'] == 'device':
             if 'profile_tag' not in condition:
                 return True
-            return condition['profile_tag'] == self.profile_tag
-
+            return condition['profile_tag'] == profile_tag
         elif condition['kind'] == 'contains_display_name':
-            # This is special because display names can be different
-            # between rooms and so you can't really hard code it in a rule.
-            # Optimisation: we should cache these names and update them from
-            # the event stream.
-            if 'content' not in ev or 'body' not in ev['content']:
-                return False
-            if not display_name:
-                return False
-            return re.search(
-                r"\b%s\b" % re.escape(display_name), ev['content']['body'],
-                flags=re.IGNORECASE
-            ) is not None
-
+            return self._contains_display_name(display_name)
         elif condition['kind'] == 'room_member_count':
-            if 'is' not in condition:
-                return False
-            m = PushRuleEvaluator.INEQUALITY_EXPR.match(condition['is'])
-            if not m:
+            return _room_member_count(
+                self._event, condition, self._room_member_count
+            )
+        else:
+            return True
+
+    def _event_match(self, condition, user_id):
+        pattern = condition.get('pattern', None)
+
+        if not pattern:
+            pattern_type = condition.get('pattern_type', None)
+            if pattern_type == "user_id":
+                pattern = user_id
+            elif pattern_type == "user_localpart":
+                pattern = UserID.from_string(user_id).localpart
+
+        if not pattern:
+            logger.warn("event_match condition with no pattern")
+            return False
+
+        # XXX: optimisation: cache our pattern regexps
+        if condition['key'] == 'content.body':
+            body = self._event["content"].get("body", None)
+            if not body:
                 return False
-            ineq = m.group(1)
-            rhs = m.group(2)
-            if not rhs.isdigit():
+
+            return _glob_matches(pattern, body, word_boundary=True)
+        else:
+            haystack = self._get_value(condition['key'])
+            if haystack is None:
                 return False
-            rhs = int(rhs)
-
-            if ineq == '' or ineq == '==':
-                return room_member_count == rhs
-            elif ineq == '<':
-                return room_member_count < rhs
-            elif ineq == '>':
-                return room_member_count > rhs
-            elif ineq == '>=':
-                return room_member_count >= rhs
-            elif ineq == '<=':
-                return room_member_count <= rhs
+
+            return _glob_matches(pattern, haystack)
+
+    def _contains_display_name(self, display_name):
+        if not display_name:
+            return False
+
+        body = self._event["content"].get("body", None)
+        if not body:
+            return False
+
+        return _glob_matches(display_name, body, word_boundary=True)
+
+    def _get_value(self, dotted_key):
+        return self._value_cache.get(dotted_key, None)
+
+
+def _glob_matches(glob, value, word_boundary=False):
+    """Tests if value matches glob.
+
+    Args:
+        glob (string)
+        value (string): String to test against glob.
+        word_boundary (bool): Whether to match against word boundaries or entire
+            string. Defaults to False.
+
+    Returns:
+        bool
+    """
+    try:
+        if IS_GLOB.search(glob):
+            r = re.escape(glob)
+
+            r = r.replace(r'\*', '.*?')
+            r = r.replace(r'\?', '.')
+
+            # handle [abc], [a-z] and [!a-z] style ranges.
+            r = GLOB_REGEX.sub(
+                lambda x: (
+                    '[%s%s]' % (
+                        x.group(1) and '^' or '',
+                        x.group(2).replace(r'\\\-', '-')
+                    )
+                ),
+                r,
+            )
+            if word_boundary:
+                r = r"\b%s\b" % (r,)
+                r = _compile_regex(r)
+
+                return r.search(value)
             else:
-                return False
+                r = r + "$"
+                r = _compile_regex(r)
+
+                return r.match(value)
+        elif word_boundary:
+            r = re.escape(glob)
+            r = r"\b%s\b" % (r,)
+            r = _compile_regex(r)
+
+            return r.search(value)
         else:
-            return True
+            return value.lower() == glob.lower()
+    except re.error:
+        logger.warn("Failed to parse glob to regex: %r", glob)
+        return False
+
+
+def _flatten_dict(d, prefix=[], result={}):
+    for key, value in d.items():
+        if isinstance(value, basestring):
+            result[".".join(prefix + [key])] = value.lower()
+        elif hasattr(value, "items"):
+            _flatten_dict(value, prefix=(prefix + [key]), result=result)
+
+    return result
 
 
-def _value_for_dotted_key(dotted_key, event):
-    parts = dotted_key.split(".")
-    val = event
-    while len(parts) > 0:
-        if parts[0] not in val:
-            return None
-        val = val[parts[0]]
-        parts = parts[1:]
-    return val
+regex_cache = LruCache(5000)
+
+
+def _compile_regex(regex_str):
+    r = regex_cache.get(regex_str, None)
+    if r:
+        return r
+
+    r = re.compile(regex_str, flags=re.IGNORECASE)
+    regex_cache[regex_str] = r
+    return r
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index e012c565ee..d7dcb2de4b 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 # -*- coding: utf-8 -*-
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -18,6 +18,7 @@ from twisted.internet import defer
 
 from httppusher import HttpPusher
 from synapse.push import PusherConfigException
+from synapse.util.logcontext import preserve_fn
 
 import logging
 
@@ -31,35 +32,20 @@ class PusherPool:
         self.pushers = {}
         self.last_pusher_started = -1
 
-        distributor = self.hs.get_distributor()
-        distributor.observe(
-            "user_presence_changed", self.user_presence_changed
-        )
-
-    @defer.inlineCallbacks
-    def user_presence_changed(self, user, state):
-        user_name = user.to_string()
-
-        # until we have read receipts, pushers use this to reset a user's
-        # badge counters to zero
-        for p in self.pushers.values():
-            if p.user_name == user_name:
-                yield p.presence_changed(state)
-
     @defer.inlineCallbacks
     def start(self):
         pushers = yield self.store.get_all_pushers()
         self._start_pushers(pushers)
 
     @defer.inlineCallbacks
-    def add_pusher(self, user_name, access_token, profile_tag, kind, app_id,
+    def add_pusher(self, user_id, access_token, profile_tag, kind, app_id,
                    app_display_name, device_display_name, pushkey, lang, data):
         # we try to create the pusher just to validate the config: it
         # will then get pulled out of the database,
         # recreated, added and started: this means we have only one
         # code path adding pushers.
         self._create_pusher({
-            "user_name": user_name,
+            "user_name": user_id,
             "kind": kind,
             "profile_tag": profile_tag,
             "app_id": app_id,
@@ -74,7 +60,7 @@ class PusherPool:
             "failing_since": None
         })
         yield self._add_pusher_to_store(
-            user_name, access_token, profile_tag, kind, app_id,
+            user_id, access_token, profile_tag, kind, app_id,
             app_display_name, device_display_name,
             pushkey, lang, data
         )
@@ -91,7 +77,7 @@ class PusherPool:
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     app_id, pushkey, p['user_name']
                 )
-                self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
     @defer.inlineCallbacks
     def remove_pushers_by_user(self, user_id):
@@ -106,14 +92,14 @@ class PusherPool:
                     "Removing pusher for app id %s, pushkey %s, user %s",
                     p['app_id'], p['pushkey'], p['user_name']
                 )
-                self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
+                yield self.remove_pusher(p['app_id'], p['pushkey'], p['user_name'])
 
     @defer.inlineCallbacks
-    def _add_pusher_to_store(self, user_name, access_token, profile_tag, kind,
+    def _add_pusher_to_store(self, user_id, access_token, profile_tag, kind,
                              app_id, app_display_name, device_display_name,
                              pushkey, lang, data):
         yield self.store.add_pusher(
-            user_name=user_name,
+            user_id=user_id,
             access_token=access_token,
             profile_tag=profile_tag,
             kind=kind,
@@ -125,14 +111,14 @@ class PusherPool:
             lang=lang,
             data=data,
         )
-        self._refresh_pusher(app_id, pushkey, user_name)
+        yield self._refresh_pusher(app_id, pushkey, user_id)
 
     def _create_pusher(self, pusherdict):
         if pusherdict['kind'] == 'http':
             return HttpPusher(
                 self.hs,
                 profile_tag=pusherdict['profile_tag'],
-                user_name=pusherdict['user_name'],
+                user_id=pusherdict['user_name'],
                 app_id=pusherdict['app_id'],
                 app_display_name=pusherdict['app_display_name'],
                 device_display_name=pusherdict['device_display_name'],
@@ -150,14 +136,14 @@ class PusherPool:
             )
 
     @defer.inlineCallbacks
-    def _refresh_pusher(self, app_id, pushkey, user_name):
+    def _refresh_pusher(self, app_id, pushkey, user_id):
         resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
             app_id, pushkey
         )
 
         p = None
         for r in resultlist:
-            if r['user_name'] == user_name:
+            if r['user_name'] == user_id:
                 p = r
 
         if p:
@@ -181,17 +167,17 @@ class PusherPool:
                 if fullid in self.pushers:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
-                p.start()
+                preserve_fn(p.start)()
 
         logger.info("Started pushers")
 
     @defer.inlineCallbacks
-    def remove_pusher(self, app_id, pushkey, user_name):
-        fullid = "%s:%s:%s" % (app_id, pushkey, user_name)
+    def remove_pusher(self, app_id, pushkey, user_id):
+        fullid = "%s:%s:%s" % (app_id, pushkey, user_id)
         if fullid in self.pushers:
             logger.info("Stopping pusher %s", fullid)
             self.pushers[fullid].stop()
             del self.pushers[fullid]
-        yield self.store.delete_pusher_by_app_id_pushkey_user_name(
-            app_id, pushkey, user_name
+        yield self.store.delete_pusher_by_app_id_pushkey_user_id(
+            app_id, pushkey, user_id
         )
diff --git a/synapse/push/rulekinds.py b/synapse/push/rulekinds.py
index 4c591aa638..4cae48ac07 100644
--- a/synapse/push/rulekinds.py
+++ b/synapse/push/rulekinds.py
@@ -1,4 +1,4 @@
-# Copyright 2015 OpenMarket Ltd
+# Copyright 2015, 2016 OpenMarket Ltd
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.