summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorDavid Baker <dave@matrix.org>2015-01-13 19:48:37 +0000
committerDavid Baker <dave@matrix.org>2015-01-13 19:48:37 +0000
commit2cb30767fa5e428f82c6c3ebced15d568d671c3c (patch)
tree83be3965596504a9e9cf75fe7c0135b5d6bd69d1 /synapse
parentMerge branch 'develop' into pushers (diff)
downloadsynapse-2cb30767fa5e428f82c6c3ebced15d568d671c3c.tar.xz
Honour the 'rejected' return from push gateways
Add a timestamp to push tokens so we know the last time they we
got them from the device. Send it to the push gateways so it can
determine whether its failure is more recent than the token.
Stop and remove pushers that have been rejected.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/push/__init__.py37
-rw-r--r--synapse/push/httppusher.py15
-rw-r--r--synapse/push/pusherpool.py12
-rw-r--r--synapse/storage/pusher.py34
-rw-r--r--synapse/storage/schema/delta/v10.sql1
-rw-r--r--synapse/storage/schema/pusher.sql1
6 files changed, 81 insertions, 19 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index f4795d559c..839f666390 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -31,8 +31,8 @@ class Pusher(object):
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
@@ -42,6 +42,7 @@ class Pusher(object):
         self.app_display_name = app_display_name
         self.device_display_name = device_display_name
         self.pushkey = pushkey
+        self.pushkey_ts = pushkey_ts
         self.data = data
         self.last_token = last_token
         self.last_success = last_success  # not actually used
@@ -98,9 +99,31 @@ class Pusher(object):
 
             processed = False
             if self._should_notify_for_event(single_event):
-                processed = yield self.dispatch_push(single_event)
+                rejected = yield self.dispatch_push(single_event)
+                if not rejected == False:
+                    processed = True
+                    for pk in rejected:
+                        if pk != self.pushkey:
+                            # for sanity, we only remove the pushkey if it
+                            # was the one we actually sent...
+                            logger.warn(
+                                ("Ignoring rejected pushkey %s because we" +
+                                "didn't send it"), (pk,)
+                            )
+                        else:
+                            logger.info(
+                                "Pushkey %s was rejected: removing",
+                                pk
+                            )
+                            yield self.hs.get_pusherpool().remove_pusher(
+                                self.app_id, pk
+                            )
             else:
                 processed = True
+
+            if not self.alive:
+                continue
+
             if processed:
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
@@ -165,6 +188,14 @@ class Pusher(object):
         self.alive = False
 
     def dispatch_push(self, p):
+        """
+        Overridden by implementing classes to actually deliver the notification
+        :param p: The event to notify for as a single event from the event stream
+        :return: If the notification was delivered, an array containing any
+                 pushkeys that were rejected by the push gateway.
+                 False if the notification could not be delivered (ie.
+                 should be retried).
+        """
         pass
 
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f94f673391..bcfa06e2ab 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -25,8 +25,8 @@ logger = logging.getLogger(__name__)
 
 class HttpPusher(Pusher):
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
             user_name,
@@ -34,6 +34,7 @@ class HttpPusher(Pusher):
             app_display_name,
             device_display_name,
             pushkey,
+            pushkey_ts,
             data,
             last_token,
             last_success,
@@ -77,6 +78,7 @@ class HttpPusher(Pusher):
                     {
                         'app_id': self.app_id,
                         'pushkey': self.pushkey,
+                        'pushkeyTs': long(self.pushkey_ts / 1000),
                         'data': self.data_minus_url
                     }
                 ]
@@ -87,10 +89,13 @@ class HttpPusher(Pusher):
     def dispatch_push(self, event):
         notification_dict = self._build_notification_dict(event)
         if not notification_dict:
-            defer.returnValue(True)
+            defer.returnValue([])
         try:
-            yield self.httpCli.post_json_get_json(self.url, notification_dict)
+            resp = yield self.httpCli.post_json_get_json(self.url, notification_dict)
         except:
             logger.exception("Failed to push %s ", self.url)
             defer.returnValue(False)
-        defer.returnValue(True)
+        rejected = []
+        if 'rejected' in resp:
+            rejected = resp['rejected']
+        defer.returnValue(rejected)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d34ef3f6cf..edddc3003e 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -53,6 +53,7 @@ class PusherPool:
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
             "pushkey": pushkey,
+            "pushkey_ts": self.hs.get_clock().time_msec(),
             "data": data,
             "last_token": None,
             "last_success": None,
@@ -75,6 +76,7 @@ class PusherPool:
             app_display_name=app_display_name,
             device_display_name=device_display_name,
             pushkey=pushkey,
+            pushkey_ts=self.hs.get_clock().time_msec(),
             data=json.dumps(data)
         )
         self._refresh_pusher((app_id, pushkey))
@@ -88,6 +90,7 @@ class PusherPool:
                 app_display_name=pusherdict['app_display_name'],
                 device_display_name=pusherdict['device_display_name'],
                 pushkey=pusherdict['pushkey'],
+                pushkey_ts=pusherdict['pushkey_ts'],
                 data=pusherdict['data'],
                 last_token=pusherdict['last_token'],
                 last_success=pusherdict['last_success'],
@@ -118,3 +121,12 @@ class PusherPool:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
                 p.start()
+
+    @defer.inlineCallbacks
+    def remove_pusher(self, app_id, pushkey):
+        fullid = "%s:%s" % (app_id, pushkey)
+        if fullid in self.pushers:
+            logger.info("Stopping pusher %s", fullid)
+            self.pushers[fullid].stop()
+            del self.pushers[fullid]
+        yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
\ No newline at end of file
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9b5170a5f7..bfc4980256 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -30,7 +30,7 @@ class PusherStore(SQLBaseStore):
     def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
         sql = (
             "SELECT id, user_name, kind, app_id,"
-            "app_display_name, device_display_name, pushkey, data, "
+            "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers "
             "WHERE app_id = ? AND pushkey = ?"
@@ -49,10 +49,11 @@ class PusherStore(SQLBaseStore):
                 "app_display_name": r[4],
                 "device_display_name": r[5],
                 "pushkey": r[6],
-                "data": r[7],
-                "last_token": r[8],
-                "last_success": r[9],
-                "failing_since": r[10]
+                "pushkey_ts": r[7],
+                "data": r[8],
+                "last_token": r[9],
+                "last_success": r[10],
+                "failing_since": r[11]
             }
             for r in rows
         ]
@@ -63,7 +64,7 @@ class PusherStore(SQLBaseStore):
     def get_all_pushers(self):
         sql = (
             "SELECT id, user_name, kind, app_id,"
-            "app_display_name, device_display_name, pushkey, data, "
+            "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers"
         )
@@ -79,10 +80,11 @@ class PusherStore(SQLBaseStore):
                 "app_display_name": r[4],
                 "device_display_name": r[5],
                 "pushkey": r[6],
-                "data": r[7],
-                "last_token": r[8],
-                "last_success": r[9],
-                "failing_since": r[10]
+                "pushkey_ts": r[7],
+                "data": r[8],
+                "last_token": r[9],
+                "last_success": r[10],
+                "failing_since": r[11]
             }
             for r in rows
         ]
@@ -91,7 +93,8 @@ class PusherStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def add_pusher(self, user_name, kind, app_id,
-                   app_display_name, device_display_name, pushkey, data):
+                   app_display_name, device_display_name,
+                   pushkey, pushkey_ts, data):
         try:
             yield self._simple_upsert(
                 PushersTable.table_name,
@@ -104,6 +107,7 @@ class PusherStore(SQLBaseStore):
                     kind=kind,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
+                    ts=pushkey_ts,
                     data=data
                 ))
         except Exception as e:
@@ -111,6 +115,13 @@ class PusherStore(SQLBaseStore):
             raise StoreError(500, "Problem creating pusher.")
 
     @defer.inlineCallbacks
+    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+        yield self._simple_delete_one(
+            PushersTable.table_name,
+            dict(app_id=app_id, pushkey=pushkey)
+        )
+
+    @defer.inlineCallbacks
     def update_pusher_last_token(self, user_name, pushkey, last_token):
         yield self._simple_update_one(
             PushersTable.table_name,
@@ -147,6 +158,7 @@ class PushersTable(Table):
         "app_display_name",
         "device_display_name",
         "pushkey",
+        "pushkey_ts",
         "data",
         "last_token",
         "last_success",
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
index 799e48d780..a991e4eb11 100644
--- a/synapse/storage/schema/delta/v10.sql
+++ b/synapse/storage/schema/delta/v10.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
   data blob,
   last_token TEXT,
   last_success BIGINT,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index 799e48d780..a991e4eb11 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
   data blob,
   last_token TEXT,
   last_success BIGINT,