summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/__init__.py37
-rw-r--r--synapse/push/httppusher.py15
-rw-r--r--synapse/push/pusherpool.py12
3 files changed, 56 insertions, 8 deletions
diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index f4795d559c..839f666390 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -31,8 +31,8 @@ class Pusher(object):
     GIVE_UP_AFTER = 24 * 60 * 60 * 1000
 
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         self.hs = _hs
         self.evStreamHandler = self.hs.get_handlers().event_stream_handler
         self.store = self.hs.get_datastore()
@@ -42,6 +42,7 @@ class Pusher(object):
         self.app_display_name = app_display_name
         self.device_display_name = device_display_name
         self.pushkey = pushkey
+        self.pushkey_ts = pushkey_ts
         self.data = data
         self.last_token = last_token
         self.last_success = last_success  # not actually used
@@ -98,9 +99,31 @@ class Pusher(object):
 
             processed = False
             if self._should_notify_for_event(single_event):
-                processed = yield self.dispatch_push(single_event)
+                rejected = yield self.dispatch_push(single_event)
+                if not rejected == False:
+                    processed = True
+                    for pk in rejected:
+                        if pk != self.pushkey:
+                            # for sanity, we only remove the pushkey if it
+                            # was the one we actually sent...
+                            logger.warn(
+                                ("Ignoring rejected pushkey %s because we" +
+                                "didn't send it"), (pk,)
+                            )
+                        else:
+                            logger.info(
+                                "Pushkey %s was rejected: removing",
+                                pk
+                            )
+                            yield self.hs.get_pusherpool().remove_pusher(
+                                self.app_id, pk
+                            )
             else:
                 processed = True
+
+            if not self.alive:
+                continue
+
             if processed:
                 self.backoff_delay = Pusher.INITIAL_BACKOFF
                 self.last_token = chunk['end']
@@ -165,6 +188,14 @@ class Pusher(object):
         self.alive = False
 
     def dispatch_push(self, p):
+        """
+        Overridden by implementing classes to actually deliver the notification
+        :param p: The event to notify for as a single event from the event stream
+        :return: If the notification was delivered, an array containing any
+                 pushkeys that were rejected by the push gateway.
+                 False if the notification could not be delivered (ie.
+                 should be retried).
+        """
         pass
 
 
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f94f673391..bcfa06e2ab 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -25,8 +25,8 @@ logger = logging.getLogger(__name__)
 
 class HttpPusher(Pusher):
     def __init__(self, _hs, user_name, app_id,
-                 app_display_name, device_display_name, pushkey, data,
-                 last_token, last_success, failing_since):
+                 app_display_name, device_display_name, pushkey, pushkey_ts,
+                 data, last_token, last_success, failing_since):
         super(HttpPusher, self).__init__(
             _hs,
             user_name,
@@ -34,6 +34,7 @@ class HttpPusher(Pusher):
             app_display_name,
             device_display_name,
             pushkey,
+            pushkey_ts,
             data,
             last_token,
             last_success,
@@ -77,6 +78,7 @@ class HttpPusher(Pusher):
                     {
                         'app_id': self.app_id,
                         'pushkey': self.pushkey,
+                        'pushkeyTs': long(self.pushkey_ts / 1000),
                         'data': self.data_minus_url
                     }
                 ]
@@ -87,10 +89,13 @@ class HttpPusher(Pusher):
     def dispatch_push(self, event):
         notification_dict = self._build_notification_dict(event)
         if not notification_dict:
-            defer.returnValue(True)
+            defer.returnValue([])
         try:
-            yield self.httpCli.post_json_get_json(self.url, notification_dict)
+            resp = yield self.httpCli.post_json_get_json(self.url, notification_dict)
         except:
             logger.exception("Failed to push %s ", self.url)
             defer.returnValue(False)
-        defer.returnValue(True)
+        rejected = []
+        if 'rejected' in resp:
+            rejected = resp['rejected']
+        defer.returnValue(rejected)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d34ef3f6cf..edddc3003e 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -53,6 +53,7 @@ class PusherPool:
             "app_display_name": app_display_name,
             "device_display_name": device_display_name,
             "pushkey": pushkey,
+            "pushkey_ts": self.hs.get_clock().time_msec(),
             "data": data,
             "last_token": None,
             "last_success": None,
@@ -75,6 +76,7 @@ class PusherPool:
             app_display_name=app_display_name,
             device_display_name=device_display_name,
             pushkey=pushkey,
+            pushkey_ts=self.hs.get_clock().time_msec(),
             data=json.dumps(data)
         )
         self._refresh_pusher((app_id, pushkey))
@@ -88,6 +90,7 @@ class PusherPool:
                 app_display_name=pusherdict['app_display_name'],
                 device_display_name=pusherdict['device_display_name'],
                 pushkey=pusherdict['pushkey'],
+                pushkey_ts=pusherdict['pushkey_ts'],
                 data=pusherdict['data'],
                 last_token=pusherdict['last_token'],
                 last_success=pusherdict['last_success'],
@@ -118,3 +121,12 @@ class PusherPool:
                     self.pushers[fullid].stop()
                 self.pushers[fullid] = p
                 p.start()
+
+    @defer.inlineCallbacks
+    def remove_pusher(self, app_id, pushkey):
+        fullid = "%s:%s" % (app_id, pushkey)
+        if fullid in self.pushers:
+            logger.info("Stopping pusher %s", fullid)
+            self.pushers[fullid].stop()
+            del self.pushers[fullid]
+        yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
\ No newline at end of file