diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index d695885649..0d5450bc01 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -76,15 +76,25 @@ class HttpPusher(object):
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
+ @defer.inlineCallbacks
def on_started(self):
- self._process()
+ yield self._process()
+ @defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
self.max_stream_ordering = max_stream_ordering
- self._process()
+ yield self._process()
+
+ @defer.inlineCallbacks
+ def on_new_receipts(self, min_stream_id, max_stream_id):
+ # We could check the receipts are actually m.read receipts here,
+ # but currently that's the only type of receipt anyway...
+ badge = yield push_tools.get_badge_count(self.hs, self.user_id)
+ yield self.send_badge(badge)
+ @defer.inlineCallbacks
def on_timer(self):
- self._process()
+ yield self._process()
def on_stop(self):
if self.timed_call:
@@ -106,22 +116,24 @@ class HttpPusher(object):
self.last_stream_ordering,
self.clock.time_msec()
)
- self.failing_since = None
- yield self.store.update_pusher_failing_since(
- self.app_id, self.pushkey, self.user_id,
- self.failing_since
- )
+ if self.failing_since:
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
else:
- self.failing_since = self.clock.time_msec()
- yield self.store.update_pusher_failing_since(
- self.app_id, self.pushkey, self.user_id,
- self.failing_since
- )
+ if not self.failing_since:
+ self.failing_since = self.clock.time_msec()
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
if (
self.failing_since and
self.failing_since <
- self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER
+ self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER_MS
):
# we really only give up so that if the URL gets
# fixed, we don't suddenly deliver a load
@@ -148,7 +160,7 @@ class HttpPusher(object):
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
- self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC)
+ self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break
@defer.inlineCallbacks
@@ -191,7 +203,8 @@ class HttpPusher(object):
d = {
'notification': {
- 'id': event.event_id,
+ 'id': event.event_id, # deprecated: remove soon
+ 'event_id': event.event_id,
'room_id': event.room_id,
'type': event.type,
'sender': event.user_id,
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index b67ad455ea..7b1ce81e9a 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -126,11 +126,29 @@ class PusherPool:
for u in users_affected:
if u in self.pushers:
for p in self.pushers[u].values():
- p.on_new_notifications(min_stream_id, max_stream_id)
+ yield p.on_new_notifications(min_stream_id, max_stream_id)
except:
logger.exception("Exception in pusher on_new_notifications")
@defer.inlineCallbacks
+ def on_new_receipts(self, min_stream_id, max_stream_id, affected_room_ids):
+ yield run_on_reactor()
+ try:
+ # Need to subtract 1 from the minimum because the lower bound here
+ # is not inclusive
+ updated_receipts = yield self.store.get_all_updated_receipts(
+ min_stream_id - 1, max_stream_id
+ )
+ # This returns a tuple, user_id is at index 3
+ users_affected = set([r[3] for r in updated_receipts])
+ for u in users_affected:
+ if u in self.pushers:
+ for p in self.pushers[u].values():
+ yield p.on_new_receipts(min_stream_id, max_stream_id)
+ except:
+ logger.exception("Exception in pusher on_new_receipts")
+
+ @defer.inlineCallbacks
def _refresh_pusher(self, app_id, pushkey, user_id):
resultlist = yield self.store.get_pushers_by_app_id_and_pushkey(
app_id, pushkey
|