summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorDavid Baker <dbkr@matrix.org>2014-11-21 12:21:00 +0000
committerDavid Baker <dbkr@matrix.org>2014-11-21 12:21:00 +0000
commiteb6aedf92c0fe467fd4724623262907ad78573bb (patch)
tree69b1f04952ffd7dd82b6643a56f1bc4e34c2087b /synapse
parentMerge branch 'develop' into pushers (diff)
downloadsynapse-eb6aedf92c0fe467fd4724623262907ad78573bb.tar.xz
More work on pushers. Attempt to do HTTP pokes. Not sure if the actual HTTP pokes work or not yet but the retry semantics are pretty good.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/http/client.py19
-rw-r--r--synapse/push/__init__.py58
-rw-r--r--synapse/push/httppusher.py55
-rw-r--r--synapse/push/pusherpool.py8
-rw-r--r--synapse/storage/pusher.py26
-rw-r--r--synapse/storage/schema/delta/v7.sql2
-rw-r--r--synapse/storage/schema/pusher.sql2
7 files changed, 150 insertions, 20 deletions
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 048a428905..82e80385ce 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -61,6 +61,25 @@ class SimpleHttpClient(object):
         defer.returnValue(json.loads(body))
 
     @defer.inlineCallbacks
+    def post_json_get_json(self, uri, post_json):
+        json_str = json.dumps(post_json)
+
+        logger.info("HTTP POST %s -> %s", json_str, uri)
+
+        response = yield self.agent.request(
+            "POST",
+            uri.encode("ascii"),
+            headers=Headers({
+                "Content-Type": ["application/json"]
+            }),
+            bodyProducer=FileBodyProducer(StringIO(json_str))
+        )
+
+        body = yield readBody(response)
+
+        defer.returnValue(json.loads(body))
+
+    @defer.inlineCallbacks
     def get_json(self, uri, args={}):
         """ Get's some json from the given host and path
 
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index df0b91a8e9..a96f0f0183 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -26,12 +26,15 @@ logger = logging.getLogger(__name__)
 
 class Pusher(object):
     INITIAL_BACKOFF = 1000
-    MAX_BACKOFF = 10 * 60 * 1000
+    MAX_BACKOFF = 60 * 60 * 1000
+    GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
-    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
+    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+                 last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
+        self.clock = self.hs.get_clock()
         self.user_name = user_name
         self.app = app
         self.app_display_name = app_display_name
@@ -40,6 +43,7 @@ class Pusher(object):
         self.data = data
         self.last_token = last_token
         self.backoff_delay = Pusher.INITIAL_BACKOFF
+        self.failing_since = None
 
     @defer.inlineCallbacks
     def start(self):
@@ -58,17 +62,51 @@ class Pusher(object):
             config = PaginationConfig(from_token=from_tok, limit='1')
             chunk = yield self.evStreamHandler.get_stream(self.user_name, config, timeout=100*365*24*60*60*1000)
 
-            if (self.dispatchPush(chunk['chunk'][0])):
+            # limiting to 1 may get 1 event plus 1 presence event, so pick out the actual event
+            singleEvent = None
+            for c in chunk['chunk']:
+                if 'event_id' in c: # Hmmm...
+                    singleEvent = c
+                    break
+            if not singleEvent:
+                continue
+
+            ret = yield self.dispatchPush(singleEvent)
+            if (ret):
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
-                self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+                self.store.update_pusher_last_token_and_success(self.user_name, self.pushkey,
+                                                                self.last_token, self.clock.time_msec())
+                if self.failing_since:
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
             else:
-                logger.warn("Failed to dispatch push for user %s. Trying again in %dms",
-                            self.user_name, self.backoff_delay)
-                yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
-                self.backoff_delay *=2
-                if self.backoff_delay > Pusher.MAX_BACKOFF:
-                    self.backoff_delay = Pusher.MAX_BACKOFF
+                if not self.failing_since:
+                    self.failing_since = self.clock.time_msec()
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+
+                if self.failing_since and self.failing_since < self.clock.time_msec() - Pusher.GIVE_UP_AFTER:
+                    # we really only give up so that if the URL gets fixed, we don't suddenly deliver a load
+                    # of old notifications.
+                    logger.warn("Giving up on a notification to user %s, pushkey %s",
+                                self.user_name, self.pushkey)
+                    self.backoff_delay = Pusher.INITIAL_BACKOFF
+                    self.last_token = chunk['end']
+                    self.store.update_pusher_last_token(self.user_name, self.pushkey, self.last_token)
+
+                    self.failing_since = None
+                    self.store.update_pusher_failing_since(self.user_name, self.pushkey, self.failing_since)
+                else:
+                    logger.warn("Failed to dispatch push for user %s (failing for %dms)."
+                                "Trying again in %dms",
+                            self.user_name,
+                            self.clock.time_msec() - self.failing_since,
+                            self.backoff_delay
+                    )
+                    yield synapse.util.async.sleep(self.backoff_delay / 1000.0)
+                    self.backoff_delay *=2
+                    if self.backoff_delay > Pusher.MAX_BACKOFF:
+                        self.backoff_delay = Pusher.MAX_BACKOFF
 
 
 class PusherConfigException(Exception):
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f3c3ca8191..33d735b974 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -14,13 +14,17 @@
 # limitations under the License.
 
 from synapse.push import Pusher, PusherConfigException
+from synapse.http.client import SimpleHttpClient
+
+from twisted.internet import defer
 
 import logging
 
 logger = logging.getLogger(__name__)
 
 class HttpPusher(Pusher):
-    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data, last_token):
+    def __init__(self, _hs, user_name, app, app_display_name, device_display_name, pushkey, data,
+                 last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(_hs,
                                          user_name,
                                          app,
@@ -28,12 +32,55 @@ class HttpPusher(Pusher):
                                          device_display_name,
                                          pushkey,
                                          data,
-                                         last_token)
+                                         last_token,
+                                         last_success,
+                                         failing_since)
         if 'url' not in data:
             raise PusherConfigException("'url' required in data for HTTP pusher")
         self.url = data['url']
+        self.httpCli = SimpleHttpClient(self.hs)
+        self.data_minus_url = {}
+        self.data_minus_url.update(self.data)
+        del self.data_minus_url['url']
+
+    def _build_notification_dict(self, event):
+        # we probably do not want to push for every presence update
+        # (we may want to be able to set up notifications when specific
+        # people sign in, but we'd want to only deliver the pertinent ones)
+        # Actually, presence events will not get this far now because we
+        # need to filter them out in the main Pusher code.
+        if 'event_id' not in event:
+            return None
+
+        return {
+           'notification': {
+               'transition' : 'new', # everything is new for now: we don't have read receipts
+               'id': event['event_id'],
+               'type': event['type'],
+               'from': event['user_id'],
+               # we may have to fetch this over federation and we can't trust it anyway: is it worth it?
+               #'fromDisplayName': 'Steve Stevington'
+           },
+           #'counts': { -- we don't mark messages as read yet so we have no way of knowing
+           #    'unread': 1,
+           #    'missedCalls': 2
+           # },
+           'devices': {
+               self.pushkey: {
+                   'data' : self.data_minus_url
+                }
+           }
+        }
 
+    @defer.inlineCallbacks
     def dispatchPush(self, event):
-        print event
-        return True
+        notificationDict = self._build_notification_dict(event)
+        if not notificationDict:
+            defer.returnValue(True)
+        try:
+            yield self.httpCli.post_json_get_json(self.url, notificationDict)
+        except:
+            logger.exception("Failed to push %s ", self.url)
+            defer.returnValue(False)
+        defer.returnValue(True)
 
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 436040f123..3fa5a4c4ff 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -45,7 +45,9 @@ class PusherPool:
             "device_display_name": device_display_name,
             "pushkey": pushkey,
             "data": data,
-            "last_token": None
+            "last_token": None,
+            "last_success": None,
+            "failing_since": None
         })
         self._add_pusher_to_store(user_name, kind, app, app_display_name, device_display_name, pushkey, data)
 
@@ -69,7 +71,9 @@ class PusherPool:
                                device_display_name=pusherdict['device_display_name'],
                                pushkey=pusherdict['pushkey'],
                                data=pusherdict['data'],
-                               last_token=pusherdict['last_token']
+                               last_token=pusherdict['last_token'],
+                               last_success=pusherdict['last_success'],
+                               failing_since=pusherdict['failing_since']
                                )
         else:
             raise PusherConfigException("Unknown pusher type '%s' for user %s" %
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 047a5f42d9..ce158c4b18 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -29,7 +29,8 @@ class PusherStore(SQLBaseStore):
     @defer.inlineCallbacks
     def get_all_pushers_after_id(self, min_id):
         sql = (
-            "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, last_token "
+            "SELECT id, user_name, kind, app, app_display_name, device_display_name, pushkey, data, "
+            "last_token, last_success, failing_since "
             "FROM pushers "
             "WHERE id > ?"
         )
@@ -46,8 +47,9 @@ class PusherStore(SQLBaseStore):
                 "device_display_name": r[5],
                 "pushkey": r[6],
                 "data": r[7],
-                "last_token": r[8]
-
+                "last_token": r[8],
+                "last_success": r[9],
+                "failing_since": r[10]
             }
             for r in rows
         ]
@@ -79,6 +81,20 @@ class PusherStore(SQLBaseStore):
                                       {'last_token': last_token}
         )
 
+    @defer.inlineCallbacks
+    def update_pusher_last_token_and_success(self, user_name, pushkey, last_token, last_success):
+        yield self._simple_update_one(PushersTable.table_name,
+                                      {'user_name': user_name, 'pushkey': pushkey},
+                                      {'last_token': last_token, 'last_success': last_success}
+        )
+
+    @defer.inlineCallbacks
+    def update_pusher_failing_since(self, user_name, pushkey, failing_since):
+        yield self._simple_update_one(PushersTable.table_name,
+                                      {'user_name': user_name, 'pushkey': pushkey},
+                                      {'failing_since': failing_since}
+        )
+
 
 class PushersTable(Table):
     table_name = "pushers"
@@ -92,7 +108,9 @@ class PushersTable(Table):
         "device_display_name",
         "pushkey",
         "data",
-        "last_token"
+        "last_token",
+        "last_success",
+        "failing_since"
     ]
 
     EntryType = collections.namedtuple("PusherEntry", fields)
\ No newline at end of file
diff --git a/synapse/storage/schema/delta/v7.sql b/synapse/storage/schema/delta/v7.sql
index 7f6852485d..e83f7e7436 100644
--- a/synapse/storage/schema/delta/v7.sql
+++ b/synapse/storage/schema/delta/v7.sql
@@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers (
   pushkey blob NOT NULL,
   data text,
   last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
   FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (user_name, pushkey)
 );
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index 7f6852485d..e83f7e7436 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -23,6 +23,8 @@ CREATE TABLE IF NOT EXISTS pushers (
   pushkey blob NOT NULL,
   data text,
   last_token TEXT,
+  last_success BIGINT,
+  failing_since BIGINT,
   FOREIGN KEY(user_name) REFERENCES users(name),
   UNIQUE (user_name, pushkey)
 );