diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 9be4869360..d695885649 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -13,60 +13,188 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from synapse.push import Pusher, PusherConfigException
+from synapse.push import PusherConfigException
-from twisted.internet import defer
+from twisted.internet import defer, reactor
import logging
+import push_rule_evaluator
+import push_tools
logger = logging.getLogger(__name__)
-class HttpPusher(Pusher):
- def __init__(self, _hs, user_id, app_id,
- app_display_name, device_display_name, pushkey, pushkey_ts,
- data, last_token, last_success, failing_since):
- super(HttpPusher, self).__init__(
- _hs,
- user_id,
- app_id,
- app_display_name,
- device_display_name,
- pushkey,
- pushkey_ts,
- data,
- last_token,
- last_success,
- failing_since
+class HttpPusher(object):
+ INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
+ MAX_BACKOFF_SEC = 60 * 60
+
+ # This one's in ms because we compare it against the clock
+ GIVE_UP_AFTER_MS = 24 * 60 * 60 * 1000
+
+ def __init__(self, hs, pusherdict):
+ self.hs = hs
+ self.store = self.hs.get_datastore()
+ self.clock = self.hs.get_clock()
+ self.user_id = pusherdict['user_name']
+ self.app_id = pusherdict['app_id']
+ self.app_display_name = pusherdict['app_display_name']
+ self.device_display_name = pusherdict['device_display_name']
+ self.pushkey = pusherdict['pushkey']
+ self.pushkey_ts = pusherdict['ts']
+ self.data = pusherdict['data']
+ self.last_stream_ordering = pusherdict['last_stream_ordering']
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.failing_since = pusherdict['failing_since']
+ self.timed_call = None
+
+ # This is the highest stream ordering we know it's safe to process.
+ # When new events arrive, we'll be given a window of new events: we
+ # should honour this rather than just looking for anything higher
+ # because of potential out-of-order event serialisation. This starts
+ # off as None though as we don't know any better.
+ self.max_stream_ordering = None
+
+ if 'data' not in pusherdict:
+ raise PusherConfigException(
+ "No 'data' key for HTTP pusher"
+ )
+ self.data = pusherdict['data']
+
+ self.name = "%s/%s/%s" % (
+ pusherdict['user_name'],
+ pusherdict['app_id'],
+ pusherdict['pushkey'],
)
- if 'url' not in data:
+
+ if 'url' not in self.data:
raise PusherConfigException(
"'url' required in data for HTTP pusher"
)
- self.url = data['url']
- self.http_client = _hs.get_simple_http_client()
+ self.url = self.data['url']
+ self.http_client = hs.get_simple_http_client()
self.data_minus_url = {}
self.data_minus_url.update(self.data)
del self.data_minus_url['url']
+ def on_started(self):
+ self._process()
+
+ def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
+ self.max_stream_ordering = max_stream_ordering
+ self._process()
+
+ def on_timer(self):
+ self._process()
+
+ def on_stop(self):
+ if self.timed_call:
+ self.timed_call.cancel()
+
@defer.inlineCallbacks
- def _build_notification_dict(self, event, tweaks, badge):
- # we probably do not want to push for every presence update
- # (we may want to be able to set up notifications when specific
- # people sign in, but we'd want to only deliver the pertinent ones)
- # Actually, presence events will not get this far now because we
- # need to filter them out in the main Pusher code.
- if 'event_id' not in event:
- defer.returnValue(None)
+ def _process(self):
+ unprocessed = yield self.store.get_unread_push_actions_for_user_in_range(
+ self.user_id, self.last_stream_ordering, self.max_stream_ordering
+ )
+
+ for push_action in unprocessed:
+ processed = yield self._process_one(push_action)
+ if processed:
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.last_stream_ordering = push_action['stream_ordering']
+ self.store.update_pusher_last_stream_ordering_and_success(
+ self.app_id, self.pushkey, self.user_id,
+ self.last_stream_ordering,
+ self.clock.time_msec()
+ )
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
+ else:
+ self.failing_since = self.clock.time_msec()
+ yield self.store.update_pusher_failing_since(
+ self.app_id, self.pushkey, self.user_id,
+ self.failing_since
+ )
+
+ if (
+ self.failing_since and
+ self.failing_since <
+ self.clock.time_msec() - HttpPusher.GIVE_UP_AFTER
+ ):
+ # we really only give up so that if the URL gets
+ # fixed, we don't suddenly deliver a load
+ # of old notifications.
+ logger.warn("Giving up on a notification to user %s, "
+ "pushkey %s",
+ self.user_id, self.pushkey)
+ self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
+ self.last_stream_ordering = push_action['stream_ordering']
+ yield self.store.update_pusher_last_stream_ordering(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.last_stream_ordering
+ )
+
+ self.failing_since = None
+ yield self.store.update_pusher_failing_since(
+ self.app_id,
+ self.pushkey,
+ self.user_id,
+ self.failing_since
+ )
+ else:
+ logger.info("Push failed: delaying for %ds", self.backoff_delay)
+ self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.backoff_delay = min(self.backoff_delay, self.MAX_BACKOFF_SEC)
+ break
+
+ @defer.inlineCallbacks
+ def _process_one(self, push_action):
+ if 'notify' not in push_action['actions']:
+ defer.returnValue(True)
- ctx = yield self.get_context_for_event(event)
+ tweaks = push_rule_evaluator.PushRuleEvaluator.tweaks_for_actions(push_action['actions'])
+ badge = yield push_tools.get_badge_count(self.hs, self.user_id)
+
+ event = yield self.store.get_event(push_action['event_id'], allow_none=True)
+ if event is None:
+ defer.returnValue(True) # It's been redacted
+ rejected = yield self.dispatch_push(event, tweaks, badge)
+ if rejected is False:
+ defer.returnValue(False)
+
+ if isinstance(rejected, list) or isinstance(rejected, tuple):
+ for pk in rejected:
+ if pk != self.pushkey:
+ # for sanity, we only remove the pushkey if it
+ # was the one we actually sent...
+ logger.warn(
+ ("Ignoring rejected pushkey %s because we"
+ " didn't send it"), pk
+ )
+ else:
+ logger.info(
+ "Pushkey %s was rejected: removing",
+ pk
+ )
+ yield self.hs.get_pusherpool().remove_pusher(
+ self.app_id, pk, self.user_id
+ )
+ defer.returnValue(True)
+
+ @defer.inlineCallbacks
+ def _build_notification_dict(self, event, tweaks, badge):
+ ctx = yield push_tools.get_context_for_event(self.hs, event)
d = {
'notification': {
- 'id': event['event_id'],
- 'room_id': event['room_id'],
- 'type': event['type'],
- 'sender': event['user_id'],
+ 'id': event.event_id,
+ 'room_id': event.room_id,
+ 'type': event.type,
+ 'sender': event.user_id,
'counts': { # -- we don't mark messages as read yet so
# we have no way of knowing
# Just set the badge to 1 until we have read receipts
@@ -84,11 +212,11 @@ class HttpPusher(Pusher):
]
}
}
- if event['type'] == 'm.room.member':
- d['notification']['membership'] = event['content']['membership']
- d['notification']['user_is_target'] = event['state_key'] == self.user_id
+ if event.type == 'm.room.member':
+ d['notification']['membership'] = event.content['membership']
+ d['notification']['user_is_target'] = event.state_key == self.user_id
if 'content' in event:
- d['notification']['content'] = event['content']
+ d['notification']['content'] = event.content
if len(ctx['aliases']):
d['notification']['room_alias'] = ctx['aliases'][0]
|