diff options
author | David Baker <dbkr@matrix.org> | 2014-11-21 12:21:00 +0000 |
---|---|---|
committer | David Baker <dbkr@matrix.org> | 2014-11-21 12:21:00 +0000 |
commit | eb6aedf92c0fe467fd4724623262907ad78573bb (patch) | |
tree | 69b1f04952ffd7dd82b6643a56f1bc4e34c2087b | |
parent | Merge branch 'develop' into pushers (diff) | |
download | synapse-eb6aedf92c0fe467fd4724623262907ad78573bb.tar.xz |
More work on pushers. Attempt to do HTTP pokes. Not sure if the actual HTTP pokes work or not yet but the retry semantics are pretty good.
-rw-r--r-- | synapse/http/client.py | 19 | ||||
-rw-r--r-- | synapse/push/__init__.py | 58 | ||||
-rw-r--r-- | synapse/push/httppusher.py | 55 | ||||
-rw-r--r-- | synapse/push/pusherpool.py | 8 | ||||
-rw-r--r-- | synapse/storage/pusher.py | 26 | ||||
-rw-r--r-- | synapse/storage/schema/delta/v7.sql | 2 | ||||
-rw-r--r-- | synapse/storage/schema/pusher.sql | 2 |
7 files changed, 150 insertions, 20 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py index 048a428905..82e80385ce 100644 --- a/synapse/http/client.py +++ b/synapse/http/client.py @@ -61,6 +61,25 @@ class SimpleHttpClient(object): defer.returnValue(json.loads(body)) @defer.inlineCallbacks + def post_json_get_json(self, uri, post_json): + json_str = json.dumps(post_json) + + logger.info("HTTP POST %s -> %s", json_str, uri) + + response = yield self.agent.request( + "POST", + uri.encode("ascii"), + headers=Headers({ + "Content-Type": ["application/json"] + }), + bodyProducer=FileBodyProducer(StringIO(json_str)) + ) + + body = yield readBody(response) + + defer.returnValue(json.loads(body)) + + @defer.inlineCallbacks def get_json(self, uri, args={}): """ Get's some json from the given host and path diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py index df0b91a8e9..a96f0f0183 100644 --- a/synapse/push/__init__.py +++ b/synapse/push/__init__.py @@ -26,12 +26,15 @@ logger = logging.getLogger(__name__) class Pusher(object): INITIAL_BACKOFF = 1000 - MAX_BACKOFF = 10 * 60 * 1000 + MAX_BACKOFF = 60 * 60 * 1000 + GIVE_UP_AFTER = 24 * 60 * 60 * 1000 - def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token): + def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, + last_token, last_success, failing_since): self.hs = _hs self.evStreamHandler = self.hs.get_handlers().event_stream_handler self.store = self.hs.get_datastore() + self.clock = self.hs.get_clock() self.user_name = user_name self.app = app self.app_display_name = app_display_name @@ -40,6 +43,7 @@ class Pusher(object): self.data = data self.last_token = last_token self.backoff_delay = Pusher.INITIAL_BACKOFF + self.failing_since = None @defer.inlineCallbacks def start(self): @@ -58,17 +62,51 @@ class Pusher(object): config = PaginationConfig(from_token=from_tok, limit='1') chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000) - if (self.dispatchPush(chunk['chunk'][0])): + # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event + singleEvent = None + for c in chunk['chunk']: + if 'event_id' in c: # Hmmm... + singleEvent = c + break + if not singleEvent: + continue + + ret = yield self.dispatchPush(singleEvent) + if (ret): self.backoff_delay = Pusher.INITIAL_BACKOFF self.last_token = chunk['end'] - self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token) + self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey, + self.last_token, self.clock.time_msec()) + if self.failing_since: + self.failing_since = None + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) else: - logger.warn("Failed to dispatch push for user %s. Trying again in %dms", - self.user_name, self.backoff_delay) - yield synapse.util.async.sleep(self.backoff_delay / 1000.0) - self.backoff_delay *=2 - if self.backoff_delay > Pusher.MAX_BACKOFF: - self.backoff_delay = Pusher.MAX_BACKOFF + if not self.failing_since: + self.failing_since = self.clock.time_msec() + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) + + if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER: + # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load + # of old notifications. + logger.warn("Giving up on a notification to user %s, pushkey %s", + self.user_name, self.pushkey) + self.backoff_delay = Pusher.INITIAL_BACKOFF + self.last_token = chunk['end'] + self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token) + + self.failing_since = None + self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since) + else: + logger.warn("Failed to dispatch push for user %s (failing for %dms)." + "Trying again in %dms", + self.user_name, + self.clock.time_msec() - self.failing_since, + self.backoff_delay + ) + yield synapse.util.async.sleep(self.backoff_delay / 1000.0) + self.backoff_delay *=2 + if self.backoff_delay > Pusher.MAX_BACKOFF: + self.backoff_delay = Pusher.MAX_BACKOFF class PusherConfigException(Exception): diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index f3c3ca8191..33d735b974 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -14,13 +14,17 @@ # limitations under the License. from synapse.push import Pusher, PusherConfigException +from synapse.http.client import SimpleHttpClient + +from twisted.internet import defer import logging logger = logging.getLogger(__name__) class HttpPusher(Pusher): - def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token): + def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, + last_token, last_success, failing_since): super(HttpPusher, self).__init__(_hs, user_name, app, @@ -28,12 +32,55 @@ class HttpPusher(Pusher): device_display_name, pushkey, data, - last_token) + last_token, + last_success, + failing_since) if 'url' not in data: raise PusherConfigException("'url' required in data for HTTP pusher") self.url = data['url'] + self.httpCli = SimpleHttpClient(self.hs) + self.data_minus_url = {} + self.data_minus_url.update(self.data) + del self.data_minus_url['url'] + + def _build_notification_dict(self, event): + # we probably do not want to push for every presence update + # (we may want to be able to set up notifications when specific + # people sign in, but we'd want to only deliver the pertinent ones) + # Actually, presence events will not get this far now because we + # need to filter them out in the main Pusher code. + if 'event_id' not in event: + return None + + return { + 'notification': { + 'transition' : 'new', # everything is new for now: we don't have read receipts + 'id': event['event_id'], + 'type': event['type'], + 'from': event['user_id'], + # we may have to fetch this over federation and we can't trust it anyway: is it worth it? + #'fromDisplayName': 'Steve Stevington' + }, + #'counts': { -- we don't mark messages as read yet so we have no way of knowing + # 'unread': 1, + # 'missedCalls': 2 + # }, + 'devices': { + self.pushkey: { + 'data' : self.data_minus_url + } + } + } + @defer.inlineCallbacks def dispatchPush(self, event): - print event - return True + notificationDict = self._build_notification_dict(event) + if not notificationDict: + defer.returnValue(True) + try: + yield self.httpCli.post_json_get_json(self.url, notificationDict) + except: + logger.exception("Failed to push %s ", self.url) + defer.returnValue(False) + defer.returnValue(True) diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py index 436040f123..3fa5a4c4ff 100644 --- a/synapse/push/pusherpool.py +++ b/synapse/push/pusherpool.py @@ -45,7 +45,9 @@ class PusherPool: "device_display_name": device_display_name, "pushkey": pushkey, "data": data, - "last_token": None + "last_token": None, + "last_success": None, + "failing_since": None }) self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data) @@ -69,7 +71,9 @@ class PusherPool: device_display_name=pusherdict['device_display_name'], pushkey=pusherdict['pushkey'], data=pusherdict['data'], - last_token=pusherdict['last_token'] + last_token=pusherdict['last_token'], + last_success=pusherdict['last_success'], + failing_since=pusherdict['failing_since'] ) else: raise PusherConfigException("Unknown pusher type '%s' for user %s" % diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index 047a5f42d9..ce158c4b18 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -29,7 +29,8 @@ class PusherStore(SQLBaseStore): @defer.inlineCallbacks def get_all_pushers_after_id(self, min_id): sql = ( - "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, last_token " + "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, " + "last_token, last_success, failing_since " "FROM pushers " "WHERE id > ?" ) @@ -46,8 +47,9 @@ class PusherStore(SQLBaseStore): "device_display_name": r[5], "pushkey": r[6], "data": r[7], - "last_token": r[8] - + "last_token": r[8], + "last_success": r[9], + "failing_since": r[10] } for r in rows ] @@ -79,6 +81,20 @@ class PusherStore(SQLBaseStore): {'last_token': last_token} ) + @defer.inlineCallbacks + def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success): + yield self._simple_update_one(PushersTable.table_name, + {'user_name': user_name, 'pushkey': pushkey}, + {'last_token': last_token, 'last_success': last_success} + ) + + @defer.inlineCallbacks + def update_pusher_failing_since(self, user_name, pushkey, failing_since): + yield self._simple_update_one(PushersTable.table_name, + {'user_name': user_name, 'pushkey': pushkey}, + {'failing_since': failing_since} + ) + class PushersTable(Table): table_name = "pushers" @@ -92,7 +108,9 @@ class PushersTable(Table): "device_display_name", "pushkey", "data", - "last_token" + "last_token", + "last_success", + "failing_since" ] EntryType = collections.namedtuple("PusherEntry", fields) \ No newline at end of file diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql index 7f6852485d..e83f7e7436 100644 --- a/synapse/storage/schema/delta/v7.sql +++ b/synapse/storage/schema/delta/v7.sql @@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers ( pushkey blob NOT NULL, data text, last_token TEXT, + last_success BIGINT, + failing_since BIGINT, FOREIGN KEY(user_name) REFERENCES users(name), UNIQUE (user_name, pushkey) ); diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql index 7f6852485d..e83f7e7436 100644 --- a/synapse/storage/schema/pusher.sql +++ b/synapse/storage/schema/pusher.sql @@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers ( pushkey blob NOT NULL, data text, last_token TEXT, + last_success BIGINT, + failing_since BIGINT, FOREIGN KEY(user_name) REFERENCES users(name), UNIQUE (user_name, pushkey) ); |