summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/push/__init__.py37
-rw-r--r--synapse/push/httppusher.py15
-rw-r--r--synapse/push/pusherpool.py12
-rw-r--r--synapse/storage/pusher.py34
-rw-r--r--synapse/storage/schema/delta/v10.sql1
-rw-r--r--synapse/storage/schema/pusher.sql1
6 files changed, 81 insertions, 19 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index f4795d559c..839f666390 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -31,8 +31,8 @@ class Pusher(object):
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
@@ -42,6 +42,7 @@ class Pusher(object):
         self.app_display_name = app_display_name
         self.device_display_name = device_display_name
         self.pushkey = pushkey
+        self.pushkey_ts = pushkey_ts
         self.data = data
         self.last_token = last_token
         self.last_success = last_success  # not actually used
@@ -98,9 +99,31 @@ class Pusher(object):
 
             processed = False
             if self._should_notify_for_event(single_event):
-                processed = yield self.dispatch_push(single_event)
+                rejected = yield self.dispatch_push(single_event)
+                if not rejected == False:
+                    processed = True
+                    for pk in rejected:
+                        if pk != self.pushkey:
+                            # for sanity, we only remove the pushkey if it
+                            # was the one we actually sent...
+                            logger.warn(
+                                ("Ignoring rejected pushkey %s because we" +
+                                "didn't send it"), (pk,)
+                            )
+                        else:
+                            logger.info(
+                                "Pushkey %s was rejected: removing",
+                                pk
+                            )
+                            yield self.hs.get_pusherpool().remove_pusher(
+                                self.app_id, pk
+                            )
             else:
                 processed = True
+
+            if not self.alive:
+                continue
+
             if processed:
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
@@ -165,6 +188,14 @@ class Pusher(object):
         self.alive = False
 
     def dispatch_push(self, p):
+        """
+        Overridden by implementing classes to actually deliver the notification
+        :param p: The event to notify for as a single event from the event stream
+        :return: If the notification was delivered, an array containing any
+                 pushkeys that were rejected by the push gateway.
+                 False if the notification could not be delivered (ie.
+                 should be retried).
+        """
         pass
 
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f94f673391..bcfa06e2ab 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -25,8 +25,8 @@ logger = logging.getLogger(__name__)
 
 class HttpPusher(Pusher):
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
             user_name,
@@ -34,6 +34,7 @@ class HttpPusher(Pusher):
             app_display_name,
             device_display_name,
             pushkey,
+            pushkey_ts,
             data,
             last_token,
             last_success,
@@ -77,6 +78,7 @@ class HttpPusher(Pusher):
                     {
                         'app_id': self.app_id,
                         'pushkey': self.pushkey,
+                        'pushkeyTs': long(self.pushkey_ts / 1000),
                         'data': self.data_minus_url
                     }
                 ]
@@ -87,10 +89,13 @@ class HttpPusher(Pusher):
     def dispatch_push(self, event):
         notification_dict = self._build_notification_dict(event)
         if not notification_dict:
-            defer.returnValue(True)
+            defer.returnValue([])
         try:
-            yield self.httpCli.post_json_get_json(self.url, notification_dict)
+            resp = yield self.httpCli.post_json_get_json(self.url, notification_dict)
         except:
             logger.exception("Failed to push %s ", self.url)
             defer.returnValue(False)
-        defer.returnValue(True)
+        rejected = []
+        if 'rejected' in resp:
+            rejected = resp['rejected']
+        defer.returnValue(rejected)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d34ef3f6cf..edddc3003e 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -53,6 +53,7 @@ class PusherPool:
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
             "pushkey": pushkey,
+            "pushkey_ts": self.hs.get_clock().time_msec(),
             "data": data,
             "last_token": None,
             "last_success": None,
@@ -75,6 +76,7 @@ class PusherPool:
             app_display_name=app_display_name,
             device_display_name=device_display_name,
             pushkey=pushkey,
+            pushkey_ts=self.hs.get_clock().time_msec(),
             data=json.dumps(data)
         )
         self._refresh_pusher((app_id, pushkey))
@@ -88,6 +90,7 @@ class PusherPool:
                 app_display_name=pusherdict['app_display_name'],
                 device_display_name=pusherdict['device_display_name'],
                 pushkey=pusherdict['pushkey'],
+                pushkey_ts=pusherdict['pushkey_ts'],
                 data=pusherdict['data'],
                 last_token=pusherdict['last_token'],
                 last_success=pusherdict['last_success'],
@@ -118,3 +121,12 @@ class PusherPool:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
                 p.start()
+
+    @defer.inlineCallbacks
+    def remove_pusher(self, app_id, pushkey):
+        fullid = "%s:%s" % (app_id, pushkey)
+        if fullid in self.pushers:
+            logger.info("Stopping pusher %s", fullid)
+            self.pushers[fullid].stop()
+            del self.pushers[fullid]
+        yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
\ No newline at end of file
diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py
index 9b5170a5f7..bfc4980256 100644
--- a/synapse/storage/pusher.py
+++ b/synapse/storage/pusher.py
@@ -30,7 +30,7 @@ class PusherStore(SQLBaseStore):
     def get_pushers_by_app_id_and_pushkey(self, app_id_and_pushkey):
         sql = (
             "SELECT id, user_name, kind, app_id,"
-            "app_display_name, device_display_name, pushkey, data, "
+            "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers "
             "WHERE app_id = ? AND pushkey = ?"
@@ -49,10 +49,11 @@ class PusherStore(SQLBaseStore):
                 "app_display_name": r[4],
                 "device_display_name": r[5],
                 "pushkey": r[6],
-                "data": r[7],
-                "last_token": r[8],
-                "last_success": r[9],
-                "failing_since": r[10]
+                "pushkey_ts": r[7],
+                "data": r[8],
+                "last_token": r[9],
+                "last_success": r[10],
+                "failing_since": r[11]
             }
             for r in rows
         ]
@@ -63,7 +64,7 @@ class PusherStore(SQLBaseStore):
     def get_all_pushers(self):
         sql = (
             "SELECT id, user_name, kind, app_id,"
-            "app_display_name, device_display_name, pushkey, data, "
+            "app_display_name, device_display_name, pushkey, ts, data, "
             "last_token, last_success, failing_since "
             "FROM pushers"
         )
@@ -79,10 +80,11 @@ class PusherStore(SQLBaseStore):
                 "app_display_name": r[4],
                 "device_display_name": r[5],
                 "pushkey": r[6],
-                "data": r[7],
-                "last_token": r[8],
-                "last_success": r[9],
-                "failing_since": r[10]
+                "pushkey_ts": r[7],
+                "data": r[8],
+                "last_token": r[9],
+                "last_success": r[10],
+                "failing_since": r[11]
             }
             for r in rows
         ]
@@ -91,7 +93,8 @@ class PusherStore(SQLBaseStore):
 
     @defer.inlineCallbacks
     def add_pusher(self, user_name, kind, app_id,
-                   app_display_name, device_display_name, pushkey, data):
+                   app_display_name, device_display_name,
+                   pushkey, pushkey_ts, data):
         try:
             yield self._simple_upsert(
                 PushersTable.table_name,
@@ -104,6 +107,7 @@ class PusherStore(SQLBaseStore):
                     kind=kind,
                     app_display_name=app_display_name,
                     device_display_name=device_display_name,
+                    ts=pushkey_ts,
                     data=data
                 ))
         except Exception as e:
@@ -111,6 +115,13 @@ class PusherStore(SQLBaseStore):
             raise StoreError(500, "Problem creating pusher.")
 
     @defer.inlineCallbacks
+    def delete_pusher_by_app_id_pushkey(self, app_id, pushkey):
+        yield self._simple_delete_one(
+            PushersTable.table_name,
+            dict(app_id=app_id, pushkey=pushkey)
+        )
+
+    @defer.inlineCallbacks
     def update_pusher_last_token(self, user_name, pushkey, last_token):
         yield self._simple_update_one(
             PushersTable.table_name,
@@ -147,6 +158,7 @@ class PushersTable(Table):
         "app_display_name",
         "device_display_name",
         "pushkey",
+        "pushkey_ts",
         "data",
         "last_token",
         "last_success",
diff --git a/synapse/storage/schema/delta/v10.sql b/synapse/storage/schema/delta/v10.sql
index 799e48d780..a991e4eb11 100644
--- a/synapse/storage/schema/delta/v10.sql
+++ b/synapse/storage/schema/delta/v10.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
   data blob,
   last_token TEXT,
   last_success BIGINT,
diff --git a/synapse/storage/schema/pusher.sql b/synapse/storage/schema/pusher.sql
index 799e48d780..a991e4eb11 100644
--- a/synapse/storage/schema/pusher.sql
+++ b/synapse/storage/schema/pusher.sql
@@ -21,6 +21,7 @@ CREATE TABLE IF NOT EXISTS pushers (
   app_display_name varchar(64) NOT NULL,
   device_display_name varchar(128) NOT NULL,
   pushkey blob NOT NULL,
+  ts BIGINT NOT NULL,
   data blob,
   last_token TEXT,
   last_success BIGINT,