diff options
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/__init__.py | 423 | ||||
-rw-r--r-- | synapse/push/baserules.py | 97 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 146 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 152 | ||||
-rw-r--r-- | synapse/push/rulekinds.py | 8 |
5 files changed, 826 insertions, 0 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py new file mode 100644 index 0000000000..418a348a58 --- /dev/null +++ b/synapse/push/__init__.py @@ -0,0 +1,423 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.streams.config import PaginationConfig +from synapse.types import StreamToken, UserID + +import synapse.util.async +import baserules + +import logging +import json +import re + +logger = logging.getLogger(__name__) + + +class Pusher(object): + INITIAL_BACKOFF = 1000 + MAX_BACKOFF = 60 * 60 * 1000 + GIVE_UP_AFTER = 24 * 60 * 60 * 1000 + DEFAULT_ACTIONS = ['notify'] + + INEQUALITY_EXPR = re.compile("^([=<>]*)([0-9]*)$") + + def __init__(self, _hs, profile_tag, user_name, app_id, + app_display_name, device_display_name, pushkey, pushkey_ts, + data, last_token, last_success, failing_since): + self.hs = _hs + self.evStreamHandler = self.hs.get_handlers().event_stream_handler + self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() + self.profile_tag = profile_tag + self.user_name = user_name + self.app_id = app_id + self.app_display_name = app_display_name + self.device_display_name = device_display_name + self.pushkey = pushkey + self.pushkey_ts = pushkey_ts + self.data = data + self.last_token = last_token + self.last_success = last_success # not actually used + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.failing_since = failing_since + self.alive = True + + # The last value of last_active_time that we saw + self.last_last_active_time = 0 + self.has_unread = True + + @defer.inlineCallbacks + def _actions_for_event(self, ev): + """ + This should take into account notification settings that the user + has configured both globally and per-room when we have the ability + to do such things. + """ + if ev['user_id'] == self.user_name: + # let's assume you probably know about messages you sent yourself + defer.returnValue(['dont_notify']) + + if ev['type'] == 'm.room.member': + if ev['state_key'] != self.user_name: + defer.returnValue(['dont_notify']) + + rawrules = yield self.store.get_push_rules_for_user_name(self.user_name) + + for r in rawrules: + r['conditions'] = json.loads(r['conditions']) + r['actions'] = json.loads(r['actions']) + + user = UserID.from_string(self.user_name) + + rules = baserules.list_with_base_rules(rawrules, user) + + # get *our* member event for display name matching + member_events_for_room = yield self.store.get_current_state( + room_id=ev['room_id'], + event_type='m.room.member', + state_key=None + ) + my_display_name = None + room_member_count = 0 + for mev in member_events_for_room: + if mev.content['membership'] != 'join': + continue + + # This loop does two things: + # 1) Find our current display name + if mev.state_key == self.user_name and 'displayname' in mev.content: + my_display_name = mev.content['displayname'] + + # and 2) Get the number of people in that room + room_member_count += 1 + + for r in rules: + matches = True + + conditions = r['conditions'] + actions = r['actions'] + + for c in conditions: + matches &= self._event_fulfills_condition( + ev, c, display_name=my_display_name, + room_member_count=room_member_count + ) + # ignore rules with no actions (we have an explict 'dont_notify' + if len(actions) == 0: + logger.warn( + "Ignoring rule id %s with no actions for user %s" % + (r['rule_id'], r['user_name']) + ) + continue + if matches: + defer.returnValue(actions) + + defer.returnValue(Pusher.DEFAULT_ACTIONS) + + @staticmethod + def _glob_to_regexp(glob): + r = re.escape(glob) + r = re.sub(r'\\\*', r'.*?', r) + r = re.sub(r'\\\?', r'.', r) + + # handle [abc], [a-z] and [!a-z] style ranges. + r = re.sub(r'\\\[(\\\!|)(.*)\\\]', + lambda x: ('[%s%s]' % (x.group(1) and '^' or '', + re.sub(r'\\\-', '-', x.group(2)))), r) + return r + + def _event_fulfills_condition(self, ev, condition, display_name, room_member_count): + if condition['kind'] == 'event_match': + if 'pattern' not in condition: + logger.warn("event_match condition with no pattern") + return False + # XXX: optimisation: cache our pattern regexps + if condition['key'] == 'content.body': + r = r'\b%s\b' % self._glob_to_regexp(condition['pattern']) + else: + r = r'^%s$' % self._glob_to_regexp(condition['pattern']) + val = _value_for_dotted_key(condition['key'], ev) + if val is None: + return False + return re.search(r, val, flags=re.IGNORECASE) is not None + + elif condition['kind'] == 'device': + if 'profile_tag' not in condition: + return True + return condition['profile_tag'] == self.profile_tag + + elif condition['kind'] == 'contains_display_name': + # This is special because display names can be different + # between rooms and so you can't really hard code it in a rule. + # Optimisation: we should cache these names and update them from + # the event stream. + if 'content' not in ev or 'body' not in ev['content']: + return False + if not display_name: + return False + return re.search( + "\b%s\b" % re.escape(display_name), ev['content']['body'], + flags=re.IGNORECASE + ) is not None + + elif condition['kind'] == 'room_member_count': + if 'is' not in condition: + return False + m = Pusher.INEQUALITY_EXPR.match(condition['is']) + if not m: + return False + ineq = m.group(1) + rhs = m.group(2) + if not rhs.isdigit(): + return False + rhs = int(rhs) + + if ineq == '' or ineq == '==': + return room_member_count == rhs + elif ineq == '<': + return room_member_count < rhs + elif ineq == '>': + return room_member_count > rhs + elif ineq == '>=': + return room_member_count >= rhs + elif ineq == '<=': + return room_member_count <= rhs + else: + return False + else: + return True + + @defer.inlineCallbacks + def get_context_for_event(self, ev): + name_aliases = yield self.store.get_room_name_and_aliases( + ev['room_id'] + ) + + ctx = {'aliases': name_aliases[1]} + if name_aliases[0] is not None: + ctx['name'] = name_aliases[0] + + their_member_events_for_room = yield self.store.get_current_state( + room_id=ev['room_id'], + event_type='m.room.member', + state_key=ev['user_id'] + ) + for mev in their_member_events_for_room: + if mev.content['membership'] == 'join' and 'displayname' in mev.content: + dn = mev.content['displayname'] + if dn is not None: + ctx['sender_display_name'] = dn + + defer.returnValue(ctx) + + @defer.inlineCallbacks + def start(self): + if not self.last_token: + # First-time setup: get a token to start from (we can't + # just start from no token, ie. 'now' + # because we need the result to be reproduceable in case + # we fail to dispatch the push) + config = PaginationConfig(from_token=None, limit='1') + chunk = yield self.evStreamHandler.get_stream( + self.user_name, config, timeout=0) + self.last_token = chunk['end'] + self.store.update_pusher_last_token( + self.user_name, self.pushkey, self.last_token) + logger.info("Pusher %s for user %s starting from token %s", + self.pushkey, self.user_name, self.last_token) + + while self.alive: + from_tok = StreamToken.from_string(self.last_token) + config = PaginationConfig(from_token=from_tok, limit='1') + chunk = yield self.evStreamHandler.get_stream( + self.user_name, config, + timeout=100*365*24*60*60*1000, affect_presence=False + ) + + # limiting to 1 may get 1 event plus 1 presence event, so + # pick out the actual event + single_event = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + single_event = c + break + if not single_event: + self.last_token = chunk['end'] + continue + + if not self.alive: + continue + + processed = False + actions = yield self._actions_for_event(single_event) + tweaks = _tweaks_for_actions(actions) + + if len(actions) == 0: + logger.warn("Empty actions! Using default action.") + actions = Pusher.DEFAULT_ACTIONS + if 'notify' not in actions and 'dont_notify' not in actions: + logger.warn("Neither notify nor dont_notify in actions: adding default") + actions.extend(Pusher.DEFAULT_ACTIONS) + if 'dont_notify' in actions: + logger.debug( + "%s for %s: dont_notify", + single_event['event_id'], self.user_name + ) + processed = True + else: + rejected = yield self.dispatch_push(single_event, tweaks) + self.has_unread = True + if isinstance(rejected, list) or isinstance(rejected, tuple): + processed = True + for pk in rejected: + if pk != self.pushkey: + # for sanity, we only remove the pushkey if it + # was the one we actually sent... + logger.warn( + ("Ignoring rejected pushkey %s because we" + " didn't send it"), pk + ) + else: + logger.info( + "Pushkey %s was rejected: removing", + pk + ) + yield self.hs.get_pusherpool().remove_pusher( + self.app_id, pk + ) + + if not self.alive: + continue + + if processed: + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + self.store.update_pusher_last_token_and_success( + self.user_name, + self.pushkey, + self.last_token, + self.clock.time_msec() + ) + if self.failing_since: + self.failing_since = None + self.store.update_pusher_failing_since( + self.user_name, + self.pushkey, + self.failing_since) + else: + if not self.failing_since: + self.failing_since = self.clock.time_msec() + self.store.update_pusher_failing_since( + self.user_name, + self.pushkey, + self.failing_since + ) + + if (self.failing_since and + self.failing_since < + self.clock.time_msec() - Pusher.GIVE_UP_AFTER): + # we really only give up so that if the URL gets + # fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, " + "pushkey %s", + self.user_name, self.pushkey) + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + self.store.update_pusher_last_token( + self.user_name, + self.pushkey, + self.last_token + ) + + self.failing_since = None + self.store.update_pusher_failing_since( + self.user_name, + self.pushkey, + self.failing_since + ) + else: + logger.warn("Failed to dispatch push for user %s " + "(failing for %dms)." + "Trying again in %dms", + self.user_name, + self.clock.time_msec() - self.failing_since, + self.backoff_delay) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *= 2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF + + def stop(self): + self.alive = False + + def dispatch_push(self, p, tweaks): + """ + Overridden by implementing classes to actually deliver the notification + Args: + p: The event to notify for as a single event from the event stream + Returns: If the notification was delivered, an array containing any + pushkeys that were rejected by the push gateway. + False if the notification could not be delivered (ie. + should be retried). + """ + pass + + def reset_badge_count(self): + pass + + def presence_changed(self, state): + """ + We clear badge counts whenever a user's last_active time is bumped + This is by no means perfect but I think it's the best we can do + without read receipts. + """ + if 'last_active' in state.state: + last_active = state.state['last_active'] + if last_active > self.last_last_active_time: + self.last_last_active_time = last_active + if self.has_unread: + logger.info("Resetting badge count for %s", self.user_name) + self.reset_badge_count() + self.has_unread = False + + +def _value_for_dotted_key(dotted_key, event): + parts = dotted_key.split(".") + val = event + while len(parts) > 0: + if parts[0] not in val: + return None + val = val[parts[0]] + parts = parts[1:] + return val + + +def _tweaks_for_actions(actions): + tweaks = {} + for a in actions: + if not isinstance(a, dict): + continue + if 'set_tweak' in a and 'value' in a: + tweaks[a['set_tweak']] = a['value'] + return tweaks + + +class PusherConfigException(Exception): + def __init__(self, msg): + super(PusherConfigException, self).__init__(msg) diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py new file mode 100644 index 0000000000..162d265f66 --- /dev/null +++ b/synapse/push/baserules.py @@ -0,0 +1,97 @@ +from synapse.push.rulekinds import PRIORITY_CLASS_MAP, PRIORITY_CLASS_INVERSE_MAP + + +def list_with_base_rules(rawrules, user_name): + ruleslist = [] + + # shove the server default rules for each kind onto the end of each + current_prio_class = PRIORITY_CLASS_INVERSE_MAP.keys()[-1] + for r in rawrules: + if r['priority_class'] < current_prio_class: + while r['priority_class'] < current_prio_class: + ruleslist.extend(make_base_rules( + user_name, + PRIORITY_CLASS_INVERSE_MAP[current_prio_class] + )) + current_prio_class -= 1 + + ruleslist.append(r) + + while current_prio_class > 0: + ruleslist.extend(make_base_rules( + user_name, + PRIORITY_CLASS_INVERSE_MAP[current_prio_class] + )) + current_prio_class -= 1 + + return ruleslist + + +def make_base_rules(user, kind): + rules = [] + + if kind == 'override': + rules = make_base_override_rules() + elif kind == 'content': + rules = make_base_content_rules(user) + + for r in rules: + r['priority_class'] = PRIORITY_CLASS_MAP[kind] + r['default'] = True + + return rules + + +def make_base_content_rules(user): + return [ + { + 'conditions': [ + { + 'kind': 'event_match', + 'key': 'content.body', + 'pattern': user.localpart, # Matrix ID match + } + ], + 'actions': [ + 'notify', + { + 'set_tweak': 'sound', + 'value': 'default', + } + ] + }, + ] + + +def make_base_override_rules(): + return [ + { + 'conditions': [ + { + 'kind': 'contains_display_name' + } + ], + 'actions': [ + 'notify', + { + 'set_tweak': 'sound', + 'value': 'default' + } + ] + }, + { + 'conditions': [ + { + 'kind': 'room_member_count', + 'is': '2' + } + ], + 'actions': [ + 'notify', + { + 'set_tweak': 'sound', + 'value': 'default' + } + ] + } + ] diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py new file mode 100644 index 0000000000..5788db4eba --- /dev/null +++ b/synapse/push/httppusher.py @@ -0,0 +1,146 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.push import Pusher, PusherConfigException +from synapse.http.client import SimpleHttpClient + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + + +class HttpPusher(Pusher): + def __init__(self, _hs, profile_tag, user_name, app_id, + app_display_name, device_display_name, pushkey, pushkey_ts, + data, last_token, last_success, failing_since): + super(HttpPusher, self).__init__( + _hs, + profile_tag, + user_name, + app_id, + app_display_name, + device_display_name, + pushkey, + pushkey_ts, + data, + last_token, + last_success, + failing_since + ) + if 'url' not in data: + raise PusherConfigException( + "'url' required in data for HTTP pusher" + ) + self.url = data['url'] + self.httpCli = SimpleHttpClient(self.hs) + self.data_minus_url = {} + self.data_minus_url.update(self.data) + del self.data_minus_url['url'] + + @defer.inlineCallbacks + def _build_notification_dict(self, event, tweaks): + # we probably do not want to push for every presence update + # (we may want to be able to set up notifications when specific + # people sign in, but we'd want to only deliver the pertinent ones) + # Actually, presence events will not get this far now because we + # need to filter them out in the main Pusher code. + if 'event_id' not in event: + defer.returnValue(None) + + ctx = yield self.get_context_for_event(event) + + d = { + 'notification': { + 'id': event['event_id'], + 'type': event['type'], + 'sender': event['user_id'], + 'counts': { # -- we don't mark messages as read yet so + # we have no way of knowing + # Just set the badge to 1 until we have read receipts + 'unread': 1, + # 'missed_calls': 2 + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + 'tweaks': tweaks + } + ] + } + } + if event['type'] == 'm.room.member': + d['notification']['membership'] = event['content']['membership'] + if 'content' in event: + d['notification']['content'] = event['content'] + + if len(ctx['aliases']): + d['notification']['room_alias'] = ctx['aliases'][0] + if 'sender_display_name' in ctx and len(ctx['sender_display_name']) > 0: + d['notification']['sender_display_name'] = ctx['sender_display_name'] + if 'name' in ctx and len(ctx['name']) > 0: + d['notification']['room_name'] = ctx['name'] + + defer.returnValue(d) + + @defer.inlineCallbacks + def dispatch_push(self, event, tweaks): + notification_dict = yield self._build_notification_dict(event, tweaks) + if not notification_dict: + defer.returnValue([]) + try: + resp = yield self.httpCli.post_json_get_json(self.url, notification_dict) + except: + logger.exception("Failed to push %s ", self.url) + defer.returnValue(False) + rejected = [] + if 'rejected' in resp: + rejected = resp['rejected'] + defer.returnValue(rejected) + + @defer.inlineCallbacks + def reset_badge_count(self): + d = { + 'notification': { + 'id': '', + 'type': None, + 'sender': '', + 'counts': { + 'unread': 0, + 'missed_calls': 0 + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + } + ] + } + } + try: + resp = yield self.httpCli.post_json_get_json(self.url, d) + except: + logger.exception("Failed to push %s ", self.url) + defer.returnValue(False) + rejected = [] + if 'rejected' in resp: + rejected = resp['rejected'] + defer.returnValue(rejected) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py new file mode 100644 index 0000000000..5a525befd7 --- /dev/null +++ b/synapse/push/pusherpool.py @@ -0,0 +1,152 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from httppusher import HttpPusher +from synapse.push import PusherConfigException + +import logging +import json + +logger = logging.getLogger(__name__) + + +class PusherPool: + def __init__(self, _hs): + self.hs = _hs + self.store = self.hs.get_datastore() + self.pushers = {} + self.last_pusher_started = -1 + + distributor = self.hs.get_distributor() + distributor.observe( + "user_presence_changed", self.user_presence_changed + ) + + @defer.inlineCallbacks + def user_presence_changed(self, user, state): + user_name = user.to_string() + + # until we have read receipts, pushers use this to reset a user's + # badge counters to zero + for p in self.pushers.values(): + if p.user_name == user_name: + yield p.presence_changed(state) + + @defer.inlineCallbacks + def start(self): + pushers = yield self.store.get_all_pushers() + for p in pushers: + p['data'] = json.loads(p['data']) + self._start_pushers(pushers) + + @defer.inlineCallbacks + def add_pusher(self, user_name, profile_tag, kind, app_id, + app_display_name, device_display_name, pushkey, lang, data): + # we try to create the pusher just to validate the config: it + # will then get pulled out of the database, + # recreated, added and started: this means we have only one + # code path adding pushers. + self._create_pusher({ + "user_name": user_name, + "kind": kind, + "profile_tag": profile_tag, + "app_id": app_id, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "pushkey": pushkey, + "pushkey_ts": self.hs.get_clock().time_msec(), + "lang": lang, + "data": data, + "last_token": None, + "last_success": None, + "failing_since": None + }) + yield self._add_pusher_to_store( + user_name, profile_tag, kind, app_id, + app_display_name, device_display_name, + pushkey, lang, data + ) + + @defer.inlineCallbacks + def _add_pusher_to_store(self, user_name, profile_tag, kind, app_id, + app_display_name, device_display_name, + pushkey, lang, data): + yield self.store.add_pusher( + user_name=user_name, + profile_tag=profile_tag, + kind=kind, + app_id=app_id, + app_display_name=app_display_name, + device_display_name=device_display_name, + pushkey=pushkey, + pushkey_ts=self.hs.get_clock().time_msec(), + lang=lang, + data=json.dumps(data) + ) + self._refresh_pusher((app_id, pushkey)) + + def _create_pusher(self, pusherdict): + if pusherdict['kind'] == 'http': + return HttpPusher( + self.hs, + profile_tag=pusherdict['profile_tag'], + user_name=pusherdict['user_name'], + app_id=pusherdict['app_id'], + app_display_name=pusherdict['app_display_name'], + device_display_name=pusherdict['device_display_name'], + pushkey=pusherdict['pushkey'], + pushkey_ts=pusherdict['pushkey_ts'], + data=pusherdict['data'], + last_token=pusherdict['last_token'], + last_success=pusherdict['last_success'], + failing_since=pusherdict['failing_since'] + ) + else: + raise PusherConfigException( + "Unknown pusher type '%s' for user %s" % + (pusherdict['kind'], pusherdict['user_name']) + ) + + @defer.inlineCallbacks + def _refresh_pusher(self, app_id_pushkey): + p = yield self.store.get_pushers_by_app_id_and_pushkey( + app_id_pushkey + ) + p['data'] = json.loads(p['data']) + + self._start_pushers([p]) + + def _start_pushers(self, pushers): + logger.info("Starting %d pushers", len(pushers)) + for pusherdict in pushers: + p = self._create_pusher(pusherdict) + if p: + fullid = "%s:%s" % (pusherdict['app_id'], pusherdict['pushkey']) + if fullid in self.pushers: + self.pushers[fullid].stop() + self.pushers[fullid] = p + p.start() + + @defer.inlineCallbacks + def remove_pusher(self, app_id, pushkey): + fullid = "%s:%s" % (app_id, pushkey) + if fullid in self.pushers: + logger.info("Stopping pusher %s", fullid) + self.pushers[fullid].stop() + del self.pushers[fullid] + yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey) diff --git a/synapse/push/rulekinds.py b/synapse/push/rulekinds.py new file mode 100644 index 0000000000..660aa4e10e --- /dev/null +++ b/synapse/push/rulekinds.py @@ -0,0 +1,8 @@ +PRIORITY_CLASS_MAP = { + 'underride': 1, + 'sender': 2, + 'room': 3, + 'content': 4, + 'override': 5, +} +PRIORITY_CLASS_INVERSE_MAP = {v: k for k, v in PRIORITY_CLASS_MAP.items()} |