diff --git a/synapse/push/__init__.py b/synapse/push/__init__.py
index f4795d559c..839f666390 100644
--- a/synapse/push/__init__.py
+++ b/synapse/push/__init__.py
@@ -31,8 +31,8 @@ class Pusher(object):
GIVE_UP_AFTER = 24 * 60 * 60 * 1000
def __init__(self, _hs, user_name, app_id,
- app_display_name, device_display_name, pushkey, data,
- last_token, last_success, failing_since):
+ app_display_name, device_display_name, pushkey, pushkey_ts,
+ data, last_token, last_success, failing_since):
self.hs = _hs
self.evStreamHandler = self.hs.get_handlers().event_stream_handler
self.store = self.hs.get_datastore()
@@ -42,6 +42,7 @@ class Pusher(object):
self.app_display_name = app_display_name
self.device_display_name = device_display_name
self.pushkey = pushkey
+ self.pushkey_ts = pushkey_ts
self.data = data
self.last_token = last_token
self.last_success = last_success # not actually used
@@ -98,9 +99,31 @@ class Pusher(object):
processed = False
if self._should_notify_for_event(single_event):
- processed = yield self.dispatch_push(single_event)
+ rejected = yield self.dispatch_push(single_event)
+ if not rejected == False:
+ processed = True
+ for pk in rejected:
+ if pk != self.pushkey:
+ # for sanity, we only remove the pushkey if it
+ # was the one we actually sent...
+ logger.warn(
+ ("Ignoring rejected pushkey %s because we" +
+ "didn't send it"), (pk,)
+ )
+ else:
+ logger.info(
+ "Pushkey %s was rejected: removing",
+ pk
+ )
+ yield self.hs.get_pusherpool().remove_pusher(
+ self.app_id, pk
+ )
else:
processed = True
+
+ if not self.alive:
+ continue
+
if processed:
self.backoff_delay = Pusher.INITIAL_BACKOFF
self.last_token = chunk['end']
@@ -165,6 +188,14 @@ class Pusher(object):
self.alive = False
def dispatch_push(self, p):
+ """
+ Overridden by implementing classes to actually deliver the notification
+ :param p: The event to notify for as a single event from the event stream
+ :return: If the notification was delivered, an array containing any
+ pushkeys that were rejected by the push gateway.
+ False if the notification could not be delivered (ie.
+ should be retried).
+ """
pass
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index f94f673391..bcfa06e2ab 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -25,8 +25,8 @@ logger = logging.getLogger(__name__)
class HttpPusher(Pusher):
def __init__(self, _hs, user_name, app_id,
- app_display_name, device_display_name, pushkey, data,
- last_token, last_success, failing_since):
+ app_display_name, device_display_name, pushkey, pushkey_ts,
+ data, last_token, last_success, failing_since):
super(HttpPusher, self).__init__(
_hs,
user_name,
@@ -34,6 +34,7 @@ class HttpPusher(Pusher):
app_display_name,
device_display_name,
pushkey,
+ pushkey_ts,
data,
last_token,
last_success,
@@ -77,6 +78,7 @@ class HttpPusher(Pusher):
{
'app_id': self.app_id,
'pushkey': self.pushkey,
+ 'pushkeyTs': long(self.pushkey_ts / 1000),
'data': self.data_minus_url
}
]
@@ -87,10 +89,13 @@ class HttpPusher(Pusher):
def dispatch_push(self, event):
notification_dict = self._build_notification_dict(event)
if not notification_dict:
- defer.returnValue(True)
+ defer.returnValue([])
try:
- yield self.httpCli.post_json_get_json(self.url, notification_dict)
+ resp = yield self.httpCli.post_json_get_json(self.url, notification_dict)
except:
logger.exception("Failed to push %s ", self.url)
defer.returnValue(False)
- defer.returnValue(True)
+ rejected = []
+ if 'rejected' in resp:
+ rejected = resp['rejected']
+ defer.returnValue(rejected)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index d34ef3f6cf..edddc3003e 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -53,6 +53,7 @@ class PusherPool:
"app_display_name": app_display_name,
"device_display_name": device_display_name,
"pushkey": pushkey,
+ "pushkey_ts": self.hs.get_clock().time_msec(),
"data": data,
"last_token": None,
"last_success": None,
@@ -75,6 +76,7 @@ class PusherPool:
app_display_name=app_display_name,
device_display_name=device_display_name,
pushkey=pushkey,
+ pushkey_ts=self.hs.get_clock().time_msec(),
data=json.dumps(data)
)
self._refresh_pusher((app_id, pushkey))
@@ -88,6 +90,7 @@ class PusherPool:
app_display_name=pusherdict['app_display_name'],
device_display_name=pusherdict['device_display_name'],
pushkey=pusherdict['pushkey'],
+ pushkey_ts=pusherdict['pushkey_ts'],
data=pusherdict['data'],
last_token=pusherdict['last_token'],
last_success=pusherdict['last_success'],
@@ -118,3 +121,12 @@ class PusherPool:
self.pushers[fullid].stop()
self.pushers[fullid] = p
p.start()
+
+ @defer.inlineCallbacks
+ def remove_pusher(self, app_id, pushkey):
+ fullid = "%s:%s" % (app_id, pushkey)
+ if fullid in self.pushers:
+ logger.info("Stopping pusher %s", fullid)
+ self.pushers[fullid].stop()
+ del self.pushers[fullid]
+ yield self.store.delete_pusher_by_app_id_pushkey(app_id, pushkey)
\ No newline at end of file
|