diff options
-rw-r--r-- | synapse/api/errors.py | 1 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 2 | ||||
-rw-r--r-- | synapse/http/client.py | 19 | ||||
-rw-r--r-- | synapse/push/__init__.py | 114 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 86 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 98 | ||||
-rw-r--r-- | synapse/rest/__init__.py | 3 | ||||
-rw-r--r-- | synapse/rest/pusher.py | 71 | ||||
-rw-r--r-- | synapse/server.py | 5 | ||||
-rw-r--r-- | synapse/storage/__init__.py | 4 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 116 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v7.sql | 30 | ||||
-rw-r--r-- | synapse/storage/schema/pusher.sql | 30 |
13 files changed, 577 insertions, 2 deletions
diff --git a/synapse/api/errors.py b/synapse/api/errors.py index 581439ceb3..64784bf212 100644 --- a/synapse/api/errors.py +++ b/synapse/api/errors.py @@ -34,6 +34,7 @@ class Codes(object): LIMIT_EXCEEDED = "M_LIMIT_EXCEEDED" CAPTCHA_NEEDED = "M_CAPTCHA_NEEDED" CAPTCHA_INVALID = "M_CAPTCHA_INVALID" + MISSING_PARAM = "M_MISSING_PARAM" class CodeMessageException(Exception): diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 855fe8e170..d24c934b3f 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -242,6 +242,8 @@ def setup(): bind_port = None hs.start_listening(bind_port, config.unsecure_port) + hs.get_pusherpool().start() + if config.daemonize: print config.pid_file daemon = Daemonize( diff --git a/synapse/http/client.py b/synapse/http/client.py index 048a428905..82e80385ce 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -61,6 +61,25 @@ class SimpleHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks + def post_json_get_json(self, uri, post_json): + json_str = json.dumps(post_json) + + logger.info("HTTP POST %s -> %s", json_str, uri) + + response = yield self.agent.request( + "POST", + uri.encode("ascii"), + headers=Headers({ + "Content-Type": ["application/json"] + }), + bodyProducer=FileBodyProducer(StringIO(json_str)) + ) + + body = yield readBody(response) + + defer.returnValue(json.loads(body)) + + @defer.inlineCallbacks def get_json(self, uri, args={}): """ Get's some json from the given host and path diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py new file mode 100644 index 0000000000..a96f0f0183 --- /dev/null +++ b/synapse/push/__init__.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.streams.config import PaginationConfig +from synapse.types import StreamToken + +import synapse.util.async + +import logging + +logger = logging.getLogger(__name__) + +class Pusher(object): + INITIAL_BACKOFF = 1000 + MAX_BACKOFF = 60 * 60 * 1000 + GIVE_UP_AFTER = 24 * 60 * 60 * 1000 + + def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, + last_token, last_success, failing_since): + self.hs = _hs + self.evStreamHandler = self.hs.get_handlers().event_stream_handler + self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() + self.user_name = user_name + self.app = app + self.app_display_name = app_display_name + self.device_display_name = device_display_name + self.pushkey = pushkey + self.data = data + self.last_token = last_token + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.failing_since = None + + @defer.inlineCallbacks + def start(self): + if not self.last_token: + # First-time setup: get a token to start from (we can't just start from no token, ie. 'now' + # because we need the result to be reproduceable in case we fail to dispatch the push) + config = PaginationConfig(from_token=None, limit='1') + chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=0) + self.last_token = chunk['end'] + self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token) + logger.info("Pusher %s for user %s starting from token %s", + self.pushkey, self.user_name, self.last_token) + + while True: + from_tok = StreamToken.from_string(self.last_token) + config = PaginationConfig(from_token=from_tok, limit='1') + chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000) + + # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event + singleEvent = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + singleEvent = c + break + if not singleEvent: + continue + + ret = yield self.dispatchPush(singleEvent) + if (ret): + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey, + self.last_token, self.clock.time_msec()) + if self.failing_since: + self.failing_since = None + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) + else: + if not self.failing_since: + self.failing_since = self.clock.time_msec() + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) + + if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER: + # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, pushkey %s", + self.user_name, self.pushkey) + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token) + + self.failing_since = None + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) + else: + logger.warn("Failed to dispatch push for user %s (failing for %dms)." + "Trying again in %dms", + self.user_name, + self.clock.time_msec() - self.failing_since, + self.backoff_delay + ) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *=2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF + + +class PusherConfigException(Exception): + def __init__(self, msg): + super(PusherConfigException, self).__init__(msg) \ No newline at end of file diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py new file mode 100644 index 0000000000..33d735b974 --- /dev/null +++ b/synapse/push/httppusher.py @@ -0,0 +1,86 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from synapse.push import Pusher, PusherConfigException +from synapse.http.client import SimpleHttpClient + +from twisted.internet import defer + +import logging + +logger = logging.getLogger(__name__) + +class HttpPusher(Pusher): + def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, + last_token, last_success, failing_since): + super(HttpPusher, self).__init__(_hs, + user_name, + app, + app_display_name, + device_display_name, + pushkey, + data, + last_token, + last_success, + failing_since) + if 'url' not in data: + raise PusherConfigException("'url' required in data for HTTP pusher") + self.url = data['url'] + self.httpCli = SimpleHttpClient(self.hs) + self.data_minus_url = {} + self.data_minus_url.update(self.data) + del self.data_minus_url['url'] + + def _build_notification_dict(self, event): + # we probably do not want to push for every presence update + # (we may want to be able to set up notifications when specific + # people sign in, but we'd want to only deliver the pertinent ones) + # Actually, presence events will not get this far now because we + # need to filter them out in the main Pusher code. + if 'event_id' not in event: + return None + + return { + 'notification': { + 'transition' : 'new', # everything is new for now: we don't have read receipts + 'id': event['event_id'], + 'type': event['type'], + 'from': event['user_id'], + # we may have to fetch this over federation and we can't trust it anyway: is it worth it? + #'fromDisplayName': 'Steve Stevington' + }, + #'counts': { -- we don't mark messages as read yet so we have no way of knowing + # 'unread': 1, + # 'missedCalls': 2 + # }, + 'devices': { + self.pushkey: { + 'data' : self.data_minus_url + } + } + } + + @defer.inlineCallbacks + def dispatchPush(self, event): + notificationDict = self._build_notification_dict(event) + if not notificationDict: + defer.returnValue(True) + try: + yield self.httpCli.post_json_get_json(self.url, notificationDict) + except: + logger.exception("Failed to push %s ", self.url) + defer.returnValue(False) + defer.returnValue(True) + diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py new file mode 100644 index 0000000000..3fa5a4c4ff --- /dev/null +++ b/synapse/push/pusherpool.py @@ -0,0 +1,98 @@ +#!/usr/bin/env python +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from httppusher import HttpPusher +from synapse.push import PusherConfigException + +import logging +import json + +logger = logging.getLogger(__name__) + +class PusherPool: + def __init__(self, _hs): + self.hs = _hs + self.store = self.hs.get_datastore() + self.pushers = [] + self.last_pusher_started = -1 + + def start(self): + self._pushers_added() + + def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data): + # we try to create the pusher just to validate the config: it will then get pulled out of the database, + # recreated, added and started: this means we have only one code path adding pushers. + self._create_pusher({ + "user_name": user_name, + "kind": kind, + "app": app, + "app_display_name": app_display_name, + "device_display_name": device_display_name, + "pushkey": pushkey, + "data": data, + "last_token": None, + "last_success": None, + "failing_since": None + }) + self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data) + + @defer.inlineCallbacks + def _add_pusher_to_store(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data): + yield self.store.add_pusher(user_name=user_name, + kind=kind, + app=app, + app_display_name=app_display_name, + device_display_name=device_display_name, + pushkey=pushkey, + data=json.dumps(data)) + self._pushers_added() + + def _create_pusher(self, pusherdict): + if pusherdict['kind'] == 'http': + return HttpPusher(self.hs, + user_name=pusherdict['user_name'], + app=pusherdict['app'], + app_display_name=pusherdict['app_display_name'], + device_display_name=pusherdict['device_display_name'], + pushkey=pusherdict['pushkey'], + data=pusherdict['data'], + last_token=pusherdict['last_token'], + last_success=pusherdict['last_success'], + failing_since=pusherdict['failing_since'] + ) + else: + raise PusherConfigException("Unknown pusher type '%s' for user %s" % + (pusherdict['kind'], pusherdict['user_name'])) + + @defer.inlineCallbacks + def _pushers_added(self): + pushers = yield self.store.get_all_pushers_after_id(self.last_pusher_started) + for p in pushers: + p['data'] = json.loads(p['data']) + if (len(pushers)): + self.last_pusher_started = pushers[-1]['id'] + + self._start_pushers(pushers) + + def _start_pushers(self, pushers): + logger.info("Starting %d pushers", (len(pushers))) + for pusherdict in pushers: + p = self._create_pusher(pusherdict) + if p: + self.pushers.append(p) + p.start() \ No newline at end of file diff --git a/synapse/rest/__init__.py b/synapse/rest/__init__.py index e391e5678d..c38cf27690 100644 --- a/synapse/rest/__init__.py +++ b/synapse/rest/__init__.py @@ -16,7 +16,7 @@ from . import ( room, events, register, login, profile, presence, initial_sync, directory, - voip, admin, + voip, admin, pusher, ) @@ -45,3 +45,4 @@ class RestServletFactory(object): directory.register_servlets(hs, client_resource) voip.register_servlets(hs, client_resource) admin.register_servlets(hs, client_resource) + pusher.register_servlets(hs, client_resource) diff --git a/synapse/rest/pusher.py b/synapse/rest/pusher.py new file mode 100644 index 0000000000..85d0d1c8cd --- /dev/null +++ b/synapse/rest/pusher.py @@ -0,0 +1,71 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from twisted.internet import defer + +from synapse.api.errors import SynapseError, Codes +from synapse.push import PusherConfigException +from base import RestServlet, client_path_pattern + +import json + + +class PusherRestServlet(RestServlet): + PATTERN = client_path_pattern("/pushers/(?P<pushkey>[\w]*)$") + + @defer.inlineCallbacks + def on_PUT(self, request, pushkey): + user = yield self.auth.get_user_by_req(request) + + content = _parse_json(request) + + reqd = ['kind', 'app', 'app_display_name', 'device_display_name', 'data'] + missing = [] + for i in reqd: + if i not in content: + missing.append(i) + if len(missing): + raise SynapseError(400, "Missing parameters: "+','.join(missing), errcode=Codes.MISSING_PARAM) + + pusher_pool = self.hs.get_pusherpool() + try: + pusher_pool.add_pusher(user_name=user.to_string(), + kind=content['kind'], + app=content['app'], + app_display_name=content['app_display_name'], + device_display_name=content['device_display_name'], + pushkey=pushkey, + data=content['data']) + except PusherConfigException as pce: + raise SynapseError(400, "Config Error: "+pce.message, errcode=Codes.MISSING_PARAM) + + defer.returnValue((200, {})) + + def on_OPTIONS(self, request): + return (200, {}) + +# XXX: C+ped from rest/room.py - surely this should be common? +def _parse_json(request): + try: + content = json.loads(request.content.read()) + if type(content) != dict: + raise SynapseError(400, "Content must be a JSON object.", + errcode=Codes.NOT_JSON) + return content + except ValueError: + raise SynapseError(400, "Content not JSON.", errcode=Codes.NOT_JSON) + +def register_servlets(hs, http_server): + PusherRestServlet(hs).register(http_server) diff --git a/synapse/server.py b/synapse/server.py index da0a44433a..cfbe7d5e38 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -36,6 +36,7 @@ from synapse.util.lockutils import LockManager from synapse.streams.events import EventSources from synapse.api.ratelimiting import Ratelimiter from synapse.crypto.keyring import Keyring +from synapse.push.pusherpool import PusherPool class BaseHomeServer(object): @@ -82,6 +83,7 @@ class BaseHomeServer(object): 'ratelimiter', 'keyring', 'event_validator', + 'pusherpool' ] def __init__(self, hostname, **kwargs): @@ -228,6 +230,9 @@ class HomeServer(BaseHomeServer): def build_event_validator(self): return EventValidator(self) + def build_pusherpool(self): + return PusherPool(self) + def register_servlets(self): """ Register all servlets associated with this HomeServer. """ diff --git a/synapse/storage/__init__.py b/synapse/storage/__init__.py index f15e3dfe62..642e5e289e 100644 --- a/synapse/storage/__init__.py +++ b/synapse/storage/__init__.py @@ -33,6 +33,7 @@ from .stream import StreamStore from .transactions import TransactionStore from .keys import KeyStore from .event_federation import EventFederationStore +from .pusher import PusherStore from .state import StateStore from .signatures import SignatureStore @@ -62,6 +63,7 @@ SCHEMAS = [ "state", "event_edges", "event_signatures", + "pusher" ] @@ -81,7 +83,7 @@ class DataStore(RoomMemberStore, RoomStore, RegistrationStore, StreamStore, ProfileStore, FeedbackStore, PresenceStore, TransactionStore, DirectoryStore, KeyStore, StateStore, SignatureStore, - EventFederationStore, ): + EventFederationStore, PusherStore, ): def __init__(self, hs): super(DataStore, self).__init__(hs) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py new file mode 100644 index 0000000000..ce158c4b18 --- /dev/null +++ b/synapse/storage/pusher.py @@ -0,0 +1,116 @@ +# -*- coding: utf-8 -*- +# Copyright 2014 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import collections + +from ._base import SQLBaseStore, Table +from twisted.internet import defer + +from sqlite3 import IntegrityError +from synapse.api.errors import StoreError + +import logging + +logger = logging.getLogger(__name__) + +class PusherStore(SQLBaseStore): + @defer.inlineCallbacks + def get_all_pushers_after_id(self, min_id): + sql = ( + "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, " + "last_token, last_success, failing_since " + "FROM pushers " + "WHERE id > ?" + ) + + rows = yield self._execute(None, sql, min_id) + + ret = [ + { + "id": r[0], + "user_name": r[1], + "kind": r[2], + "app": r[3], + "app_display_name": r[4], + "device_display_name": r[5], + "pushkey": r[6], + "data": r[7], + "last_token": r[8], + "last_success": r[9], + "failing_since": r[10] + } + for r in rows + ] + + defer.returnValue(ret) + + @defer.inlineCallbacks + def add_pusher(self, user_name, kind, app, app_display_name, device_display_name, pushkey, data): + try: + yield self._simple_insert(PushersTable.table_name, dict( + user_name=user_name, + kind=kind, + app=app, + app_display_name=app_display_name, + device_display_name=device_display_name, + pushkey=pushkey, + data=data + )) + except IntegrityError: + raise StoreError(409, "Pushkey in use.") + except Exception as e: + logger.error("create_pusher with failed: %s", e) + raise StoreError(500, "Problem creating pusher.") + + @defer.inlineCallbacks + def update_pusher_last_token(self, user_name, pushkey, last_token): + yield self._simple_update_one(PushersTable.table_name, + {'user_name': user_name, 'pushkey': pushkey}, + {'last_token': last_token} + ) + + @defer.inlineCallbacks + def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success): + yield self._simple_update_one(PushersTable.table_name, + {'user_name': user_name, 'pushkey': pushkey}, + {'last_token': last_token, 'last_success': last_success} + ) + + @defer.inlineCallbacks + def update_pusher_failing_since(self, user_name, pushkey, failing_since): + yield self._simple_update_one(PushersTable.table_name, + {'user_name': user_name, 'pushkey': pushkey}, + {'failing_since': failing_since} + ) + + +class PushersTable(Table): + table_name = "pushers" + + fields = [ + "id", + "user_name", + "kind", + "app" + "app_display_name", + "device_display_name", + "pushkey", + "data", + "last_token", + "last_success", + "failing_since" + ] + + EntryType = collections.namedtuple("PusherEntry", fields) \ No newline at end of file diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql new file mode 100644 index 0000000000..e83f7e7436 --- /dev/null +++ b/synapse/storage/schema/delta/v7.sql @@ -0,0 +1,30 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-- Push notification endpoints that users have configured +CREATE TABLE IF NOT EXISTS pushers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_name TEXT NOT NULL, + kind varchar(8) NOT NULL, + app varchar(64) NOT NULL, + app_display_name varchar(64) NOT NULL, + device_display_name varchar(128) NOT NULL, + pushkey blob NOT NULL, + data text, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + FOREIGN KEY(user_name) REFERENCES users(name), + UNIQUE (user_name, pushkey) +); diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql new file mode 100644 index 0000000000..e83f7e7436 --- /dev/null +++ b/synapse/storage/schema/pusher.sql @@ -0,0 +1,30 @@ +/* Copyright 2014 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +-- Push notification endpoints that users have configured +CREATE TABLE IF NOT EXISTS pushers ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + user_name TEXT NOT NULL, + kind varchar(8) NOT NULL, + app varchar(64) NOT NULL, + app_display_name varchar(64) NOT NULL, + device_display_name varchar(128) NOT NULL, + pushkey blob NOT NULL, + data text, + last_token TEXT, + last_success BIGINT, + failing_since BIGINT, + FOREIGN KEY(user_name) REFERENCES users(name), + UNIQUE (user_name, pushkey) +); |