summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py410
-rw-r--r--synapse/push/baserules.py48
-rw-r--r--synapse/push/httppusher.py146
-rw-r--r--synapse/push/pusherpool.py152
4 files changed, 756 insertions, 0 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
new file mode 100644
index 0000000000..28e5dae81d
--- /dev/null
+++ b/synapse/push/__init__.py
@@ -0,0 +1,410 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from synapse.streams.config import PaginationConfig
+from synapse.types import StreamToken, UserID
+
+import synapse.util.async
+import baserules
+
+import logging
+import fnmatch
+import json
+import re
+
+logger = logging.getLogger(__name__)
+
+
+class Pusher(object):
+    INITIAL_BACKOFF = 1000
+    MAX_BACKOFF = 60 * 60 * 1000
+    GIVE_UP_AFTER = 24 * 60 * 60 * 1000
+    DEFAULT_ACTIONS = ['notify']
+
+    INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$")
+
+    def __init__(self, _hs, instance_handle, user_name, app_id,
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
+        self.hs = _hs
+        self.evStreamHandler = self.hs.get_handlers().event_stream_handler
+        self.store = self.hs.get_datastore()
+        self.clock = self.hs.get_clock()
+        self.instance_handle = instance_handle
+        self.user_name = user_name
+        self.app_id = app_id
+        self.app_display_name = app_display_name
+        self.device_display_name = device_display_name
+        self.pushkey = pushkey
+        self.pushkey_ts = pushkey_ts
+        self.data = data
+        self.last_token = last_token
+        self.last_success = last_success  # not actually used
+        self.backoff_delay = Pusher.INITIAL_BACKOFF
+        self.failing_since = failing_since
+        self.alive = True
+
+        # The last value of last_active_time that we saw
+        self.last_last_active_time = 0
+        self.has_unread = True
+
+    @defer.inlineCallbacks
+    def _actions_for_event(self, ev):
+        """
+        This should take into account notification settings that the user
+        has configured both globally and per-room when we have the ability
+        to do such things.
+        """
+        if ev['user_id'] == self.user_name:
+            # let's assume you probably know about messages you sent yourself
+            defer.returnValue(['dont_notify'])
+
+        if ev['type'] == 'm.room.member':
+            if ev['state_key'] != self.user_name:
+                defer.returnValue(['dont_notify'])
+
+        rules = yield self.store.get_push_rules_for_user_name(self.user_name)
+
+        for r in rules:
+            r['conditions'] = json.loads(r['conditions'])
+            r['actions'] = json.loads(r['actions'])
+
+        user_name_localpart = UserID.from_string(self.user_name).localpart
+
+        rules.extend(baserules.make_base_rules(user_name_localpart))
+
+        # get *our* member event for display name matching
+        member_events_for_room = yield self.store.get_current_state(
+            room_id=ev['room_id'],
+            event_type='m.room.member',
+            state_key=None
+        )
+        my_display_name = None
+        room_member_count = 0
+        for mev in member_events_for_room:
+            if mev.content['membership'] != 'join':
+                continue
+
+            # This loop does two things:
+            # 1) Find our current display name
+            if mev.state_key == self.user_name and 'displayname' in mev.content:
+                my_display_name = mev.content['displayname']
+
+            # and 2) Get the number of people in that room
+            room_member_count += 1
+
+        for r in rules:
+            matches = True
+
+            conditions = r['conditions']
+            actions = r['actions']
+
+            for c in conditions:
+                matches &= self._event_fulfills_condition(
+                    ev, c, display_name=my_display_name,
+                    room_member_count=room_member_count
+                )
+            # ignore rules with no actions (we have an explict 'dont_notify'
+            if len(actions) == 0:
+                logger.warn(
+                    "Ignoring rule id %s with no actions for user %s" %
+                    (r['rule_id'], r['user_name'])
+                )
+                continue
+            if matches:
+                defer.returnValue(actions)
+
+        defer.returnValue(Pusher.DEFAULT_ACTIONS)
+
+    def _event_fulfills_condition(self, ev, condition, display_name, room_member_count):
+        if condition['kind'] == 'event_match':
+            if 'pattern' not in condition:
+                logger.warn("event_match condition with no pattern")
+                return False
+            pat = condition['pattern']
+
+            if pat.strip("*?[]") == pat:
+                # no special glob characters so we assume the user means
+                # 'contains this string' rather than 'is this string'
+                pat = "*%s*" % (pat,)
+
+            val = _value_for_dotted_key(condition['key'], ev)
+            if val is None:
+                return False
+            return fnmatch.fnmatch(val.upper(), pat.upper())
+        elif condition['kind'] == 'device':
+            if 'instance_handle' not in condition:
+                return True
+            return condition['instance_handle'] == self.instance_handle
+        elif condition['kind'] == 'contains_display_name':
+            # This is special because display names can be different
+            # between rooms and so you can't really hard code it in a rule.
+            # Optimisation: we should cache these names and update them from
+            # the event stream.
+            if 'content' not in ev or 'body' not in ev['content']:
+                return False
+            if not display_name:
+                return False
+            return fnmatch.fnmatch(
+                ev['content']['body'].upper(), "*%s*" % (display_name.upper(),)
+            )
+        elif condition['kind'] == 'room_member_count':
+            if 'is' not in condition:
+                return False
+            m = Pusher.INEQUALITY_EXPR.match(condition['is'])
+            if not m:
+                return False
+            ineq = m.group(1)
+            rhs = m.group(2)
+            if not rhs.isdigit():
+                return False
+            rhs = int(rhs)
+
+            if ineq == '' or ineq == '==':
+                return room_member_count == rhs
+            elif ineq == '<':
+                return room_member_count < rhs
+            elif ineq == '>':
+                return room_member_count > rhs
+            elif ineq == '>=':
+                return room_member_count >= rhs
+            elif ineq == '<=':
+                return room_member_count <= rhs
+            else:
+                return False
+        else:
+            return True
+
+    @defer.inlineCallbacks
+    def get_context_for_event(self, ev):
+        name_aliases = yield self.store.get_room_name_and_aliases(
+            ev['room_id']
+        )
+
+        ctx = {'aliases': name_aliases[1]}
+        if name_aliases[0] is not None:
+            ctx['name'] = name_aliases[0]
+
+        their_member_events_for_room = yield self.store.get_current_state(
+            room_id=ev['room_id'],
+            event_type='m.room.member',
+            state_key=ev['user_id']
+        )
+        for mev in their_member_events_for_room:
+            if mev.content['membership'] == 'join' and 'displayname' in mev.content:
+                dn = mev.content['displayname']
+                if dn is not None:
+                    ctx['sender_display_name'] = dn
+
+        defer.returnValue(ctx)
+
+    @defer.inlineCallbacks
+    def start(self):
+        if not self.last_token:
+            # First-time setup: get a token to start from (we can't
+            # just start from no token, ie. 'now'
+            # because we need the result to be reproduceable in case
+            # we fail to dispatch the push)
+            config = PaginationConfig(from_token=None, limit='1')
+            chunk = yield self.evStreamHandler.get_stream(
+                self.user_name, config, timeout=0)
+            self.last_token = chunk['end']
+            self.store.update_pusher_last_token(
+                self.user_name, self.pushkey, self.last_token)
+            logger.info("Pusher %s for user %s starting from token %s",
+                        self.pushkey, self.user_name, self.last_token)
+
+        while self.alive:
+            from_tok = StreamToken.from_string(self.last_token)
+            config = PaginationConfig(from_token=from_tok, limit='1')
+            chunk = yield self.evStreamHandler.get_stream(
+                self.user_name, config,
+                timeout=100*365*24*60*60*1000, affect_presence=False
+            )
+
+            # limiting to 1 may get 1 event plus 1 presence event, so
+            # pick out the actual event
+            single_event = None
+            for c in chunk['chunk']:
+                if 'event_id' in c:  # Hmmm...
+                    single_event = c
+                    break
+            if not single_event:
+                self.last_token = chunk['end']
+                continue
+
+            if not self.alive:
+                continue
+
+            processed = False
+            actions = yield self._actions_for_event(single_event)
+            tweaks = _tweaks_for_actions(actions)
+
+            if len(actions) == 0:
+                logger.warn("Empty actions! Using default action.")
+                actions = Pusher.DEFAULT_ACTIONS
+            if 'notify' not in actions and 'dont_notify' not in actions:
+                logger.warn("Neither notify nor dont_notify in actions: adding default")
+                actions.extend(Pusher.DEFAULT_ACTIONS)
+            if 'dont_notify' in actions:
+                logger.debug(
+                    "%s for %s: dont_notify",
+                    single_event['event_id'], self.user_name
+                )
+                processed = True
+            else:
+                rejected = yield self.dispatch_push(single_event, tweaks)
+                self.has_unread = True
+                if isinstance(rejected, list) or isinstance(rejected, tuple):
+                    processed = True
+                    for pk in rejected:
+                        if pk != self.pushkey:
+                            # for sanity, we only remove the pushkey if it
+                            # was the one we actually sent...
+                            logger.warn(
+                                ("Ignoring rejected pushkey %s because we"
+                                 " didn't send it"), pk
+                            )
+                        else:
+                            logger.info(
+                                "Pushkey %s was rejected: removing",
+                                pk
+                            )
+                            yield self.hs.get_pusherpool().remove_pusher(
+                                self.app_id, pk
+                            )
+
+            if not self.alive:
+                continue
+
+            if processed:
+                self.backoff_delay = Pusher.INITIAL_BACKOFF
+                self.last_token = chunk['end']
+                self.store.update_pusher_last_token_and_success(
+                    self.user_name,
+                    self.pushkey,
+                    self.last_token,
+                    self.clock.time_msec()
+                )
+                if self.failing_since:
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(
+                        self.user_name,
+                        self.pushkey,
+                        self.failing_since)
+            else:
+                if not self.failing_since:
+                    self.failing_since = self.clock.time_msec()
+                    self.store.update_pusher_failing_since(
+                        self.user_name,
+                        self.pushkey,
+                        self.failing_since
+                    )
+
+                if (self.failing_since and
+                   self.failing_since <
+                   self.clock.time_msec() - Pusher.GIVE_UP_AFTER):
+                    # we really only give up so that if the URL gets
+                    # fixed, we don't suddenly deliver a load
+                    # of old notifications.
+                    logger.warn("Giving up on a notification to user %s, "
+                                "pushkey %s",
+                                self.user_name, self.pushkey)
+                    self.backoff_delay = Pusher.INITIAL_BACKOFF
+                    self.last_token = chunk['end']
+                    self.store.update_pusher_last_token(
+                        self.user_name,
+                        self.pushkey,
+                        self.last_token
+                    )
+
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(
+                        self.user_name,
+                        self.pushkey,
+                        self.failing_since
+                    )
+                else:
+                    logger.warn("Failed to dispatch push for user %s "
+                                "(failing for %dms)."
+                                "Trying again in %dms",
+                                self.user_name,
+                                self.clock.time_msec() - self.failing_since,
+                                self.backoff_delay)
+                    yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+                    self.backoff_delay *= 2
+                    if self.backoff_delay > Pusher.MAX_BACKOFF:
+                        self.backoff_delay = Pusher.MAX_BACKOFF
+
+    def stop(self):
+        self.alive = False
+
+    def dispatch_push(self, p, tweaks):
+        """
+        Overridden by implementing classes to actually deliver the notification
+        Args:
+            p: The event to notify for as a single event from the event stream
+        Returns: If the notification was delivered, an array containing any
+                 pushkeys that were rejected by the push gateway.
+                 False if the notification could not be delivered (ie.
+                 should be retried).
+        """
+        pass
+
+    def reset_badge_count(self):
+        pass
+
+    def presence_changed(self, state):
+        """
+        We clear badge counts whenever a user's last_active time is bumped
+        This is by no means perfect but I think it's the best we can do
+        without read receipts.
+        """
+        if 'last_active' in state.state:
+            last_active = state.state['last_active']
+            if last_active > self.last_last_active_time:
+                self.last_last_active_time = last_active
+                if self.has_unread:
+                    logger.info("Resetting badge count for %s", self.user_name)
+                    self.reset_badge_count()
+                    self.has_unread = False
+
+
+def _value_for_dotted_key(dotted_key, event):
+    parts = dotted_key.split(".")
+    val = event
+    while len(parts) > 0:
+        if parts[0] not in val:
+            return None
+        val = val[parts[0]]
+        parts = parts[1:]
+    return val
+
+
+def _tweaks_for_actions(actions):
+    tweaks = {}
+    for a in actions:
+        if not isinstance(a, dict):
+            continue
+        if 'set_sound' in a:
+            tweaks['sound'] = a['set_sound']
+    return tweaks
+
+
+class PusherConfigException(Exception):
+    def __init__(self, msg):
+        super(PusherConfigException, self).__init__(msg)
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
new file mode 100644
index 0000000000..382de118e0
--- /dev/null
+++ b/synapse/push/baserules.py
@@ -0,0 +1,48 @@
+def make_base_rules(user_name):
+    rules = [
+        {
+            'conditions': [
+                {
+                    'kind': 'event_match',
+                    'key': 'content.body',
+                    'pattern': '*%s*' % (user_name,), # Matrix ID match
+                }
+            ],
+            'actions': [
+                'notify',
+                {
+                    'set_sound': 'default'
+                }
+            ]
+        },
+        {
+            'conditions': [
+                {
+                    'kind': 'contains_display_name'
+                }
+            ],
+            'actions': [
+                'notify',
+                {
+                    'set_sound': 'default'
+                }
+            ]
+        },
+        {
+            'conditions': [
+                {
+                    'kind': 'room_member_count',
+                    'is': '2'
+                }
+            ],
+            'actions': [
+                'notify',
+                {
+                    'set_sound': 'default'
+                }
+            ]
+        }
+    ]
+    for r in rules:
+        r['priority_class'] = 0
+    return rules
\ No newline at end of file
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
new file mode 100644
index 0000000000..7c6953c989
--- /dev/null
+++ b/synapse/push/httppusher.py
@@ -0,0 +1,146 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from synapse.push import Pusher, PusherConfigException
+from synapse.http.client import SimpleHttpClient
+
+from twisted.internet import defer
+
+import logging
+
+logger = logging.getLogger(__name__)
+
+
+class HttpPusher(Pusher):
+    def __init__(self, _hs, instance_handle, user_name, app_id,
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
+        super(HttpPusher, self).__init__(
+            _hs,
+            instance_handle,
+            user_name,
+            app_id,
+            app_display_name,
+            device_display_name,
+            pushkey,
+            pushkey_ts,
+            data,
+            last_token,
+            last_success,
+            failing_since
+        )
+        if 'url' not in data:
+            raise PusherConfigException(
+                "'url' required in data for HTTP pusher"
+            )
+        self.url = data['url']
+        self.httpCli = SimpleHttpClient(self.hs)
+        self.data_minus_url = {}
+        self.data_minus_url.update(self.data)
+        del self.data_minus_url['url']
+
+    @defer.inlineCallbacks
+    def _build_notification_dict(self, event, tweaks):
+        # we probably do not want to push for every presence update
+        # (we may want to be able to set up notifications when specific
+        # people sign in, but we'd want to only deliver the pertinent ones)
+        # Actually, presence events will not get this far now because we
+        # need to filter them out in the main Pusher code.
+        if 'event_id' not in event:
+            defer.returnValue(None)
+
+        ctx = yield self.get_context_for_event(event)
+
+        d = {
+            'notification': {
+                'id': event['event_id'],
+                'type': event['type'],
+                'sender': event['user_id'],
+                'counts': {  # -- we don't mark messages as read yet so
+                             # we have no way of knowing
+                    # Just set the badge to 1 until we have read receipts
+                    'unread': 1,
+                    # 'missed_calls': 2
+                },
+                'devices': [
+                    {
+                        'app_id': self.app_id,
+                        'pushkey': self.pushkey,
+                        'pushkey_ts': long(self.pushkey_ts / 1000),
+                        'data': self.data_minus_url,
+                        'tweaks': tweaks
+                    }
+                ]
+            }
+        }
+        if event['type'] == 'm.room.member':
+            d['notification']['membership'] = event['content']['membership']
+        if 'content' in event:
+            d['notification']['content'] = event['content']
+
+        if len(ctx['aliases']):
+            d['notification']['room_alias'] = ctx['aliases'][0]
+        if 'sender_display_name' in ctx and len(ctx['sender_display_name']) > 0:
+            d['notification']['sender_display_name'] = ctx['sender_display_name']
+        if 'name' in ctx and len(ctx['name']) > 0:
+            d['notification']['room_name'] = ctx['name']
+
+        defer.returnValue(d)
+
+    @defer.inlineCallbacks
+    def dispatch_push(self, event, tweaks):
+        notification_dict = yield self._build_notification_dict(event, tweaks)
+        if not notification_dict:
+            defer.returnValue([])
+        try:
+            resp = yield self.httpCli.post_json_get_json(self.url, notification_dict)
+        except:
+            logger.exception("Failed to push %s ", self.url)
+            defer.returnValue(False)
+        rejected = []
+        if 'rejected' in resp:
+            rejected = resp['rejected']
+        defer.returnValue(rejected)
+
+    @defer.inlineCallbacks
+    def reset_badge_count(self):
+        d = {
+            'notification': {
+                'id': '',
+                'type': None,
+                'sender': '',
+                'counts': {
+                    'unread': 0,
+                    'missed_calls': 0
+                },
+                'devices': [
+                    {
+                        'app_id': self.app_id,
+                        'pushkey': self.pushkey,
+                        'pushkey_ts': long(self.pushkey_ts / 1000),
+                        'data': self.data_minus_url,
+                    }
+                ]
+            }
+        }
+        try:
+            resp = yield self.httpCli.post_json_get_json(self.url, d)
+        except:
+            logger.exception("Failed to push %s ", self.url)
+            defer.returnValue(False)
+        rejected = []
+        if 'rejected' in resp:
+            rejected = resp['rejected']
+        defer.returnValue(rejected)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
new file mode 100644
index 0000000000..4892c21e7b
--- /dev/null
+++ b/synapse/push/pusherpool.py
@@ -0,0 +1,152 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer
+
+from httppusher import HttpPusher
+from synapse.push import PusherConfigException
+
+import logging
+import json
+
+logger = logging.getLogger(__name__)
+
+
+class PusherPool:
+    def __init__(self, _hs):
+        self.hs = _hs
+        self.store = self.hs.get_datastore()
+        self.pushers = {}
+        self.last_pusher_started = -1
+
+        distributor = self.hs.get_distributor()
+        distributor.observe(
+            "user_presence_changed", self.user_presence_changed
+        )
+
+    @defer.inlineCallbacks
+    def user_presence_changed(self, user, state):
+        user_name = user.to_string()
+
+        # until we have read receipts, pushers use this to reset a user's
+        # badge counters to zero
+        for p in self.pushers.values():
+            if p.user_name == user_name:
+                yield p.presence_changed(state)
+
+    @defer.inlineCallbacks
+    def start(self):
+        pushers = yield self.store.get_all_pushers()
+        for p in pushers:
+            p['data'] = json.loads(p['data'])
+        self._start_pushers(pushers)
+
+    @defer.inlineCallbacks
+    def add_pusher(self, user_name, instance_handle, kind, app_id,
+                   app_display_name, device_display_name, pushkey, lang, data):
+        # we try to create the pusher just to validate the config: it
+        # will then get pulled out of the database,
+        # recreated, added and started: this means we have only one
+        # code path adding pushers.
+        self._create_pusher({
+            "user_name": user_name,
+            "kind": kind,
+            "instance_handle": instance_handle,
+            "app_id": app_id,
+            "app_display_name": app_display_name,
+            "device_display_name": device_display_name,
+            "pushkey": pushkey,
+            "pushkey_ts": self.hs.get_clock().time_msec(),
+            "lang": lang,
+            "data": data,
+            "last_token": None,
+            "last_success": None,
+            "failing_since": None
+        })
+        yield self._add_pusher_to_store(
+            user_name, instance_handle, kind, app_id,
+            app_display_name, device_display_name,
+            pushkey, lang, data
+        )
+
+    @defer.inlineCallbacks
+    def _add_pusher_to_store(self, user_name, instance_handle, kind, app_id,
+                             app_display_name, device_display_name,
+                             pushkey, lang, data):
+        yield self.store.add_pusher(
+            user_name=user_name,
+            instance_handle=instance_handle,
+            kind=kind,
+            app_id=app_id,
+            app_display_name=app_display_name,
+            device_display_name=device_display_name,
+            pushkey=pushkey,
+            pushkey_ts=self.hs.get_clock().time_msec(),
+            lang=lang,
+            data=json.dumps(data)
+        )
+        self._refresh_pusher((app_id, pushkey))
+
+    def _create_pusher(self, pusherdict):
+        if pusherdict['kind'] == 'http':
+            return HttpPusher(
+                self.hs,
+                instance_handle=pusherdict['instance_handle'],
+                user_name=pusherdict['user_name'],
+                app_id=pusherdict['app_id'],
+                app_display_name=pusherdict['app_display_name'],
+                device_display_name=pusherdict['device_display_name'],
+                pushkey=pusherdict['pushkey'],
+                pushkey_ts=pusherdict['pushkey_ts'],
+                data=pusherdict['data'],
+                last_token=pusherdict['last_token'],
+                last_success=pusherdict['last_success'],
+                failing_since=pusherdict['failing_since']
+            )
+        else:
+            raise PusherConfigException(
+                "Unknown pusher type '%s' for user %s" %
+                (pusherdict['kind'], pusherdict['user_name'])
+            )
+
+    @defer.inlineCallbacks
+    def _refresh_pusher(self, app_id_pushkey):
+        p = yield self.store.get_pushers_by_app_id_and_pushkey(
+            app_id_pushkey
+        )
+        p['data'] = json.loads(p['data'])
+
+        self._start_pushers([p])
+
+    def _start_pushers(self, pushers):
+        logger.info("Starting %d pushers", len(pushers))
+        for pusherdict in pushers:
+            p = self._create_pusher(pusherdict)
+            if p:
+                fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey'])
+                if fullid in self.pushers:
+                    self.pushers[fullid].stop()
+                self.pushers[fullid] = p
+                p.start()
+
+    @defer.inlineCallbacks
+    def remove_pusher(self, app_id, pushkey):
+        fullid = "%s:%s" % (app_id, pushkey)
+        if fullid in self.pushers:
+            logger.info("Stopping pusher %s", fullid)
+            self.pushers[fullid].stop()
+            del self.pushers[fullid]
+        yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)