diff options
Diffstat (limited to 'synapse/push/httppusher.py')
-rw-r--r-- | synapse/push/httppusher.py | 74 |
1 files changed, 59 insertions, 15 deletions
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 8a5d473108..81e18bcf7d 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd +# Copyright 2017 New Vector Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,21 +13,25 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging -from synapse.push import PusherConfigException +from prometheus_client import Counter -from twisted.internet import defer, reactor +from twisted.internet import defer from twisted.internet.error import AlreadyCalled, AlreadyCancelled -import logging -import push_rule_evaluator -import push_tools - +from synapse.push import PusherConfigException from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure +from . import push_rule_evaluator, push_tools + logger = logging.getLogger(__name__) +http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "") + +http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "") + class HttpPusher(object): INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes @@ -84,7 +89,10 @@ class HttpPusher(object): @defer.inlineCallbacks def on_started(self): - yield self._process() + try: + yield self._process() + except Exception: + logger.exception("Error starting http pusher") @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): @@ -131,7 +139,7 @@ class HttpPusher(object): starting_max_ordering = self.max_stream_ordering try: yield self._unsafe_process() - except: + except Exception: logger.exception("Exception processing notifs") if self.max_stream_ordering == starting_max_ordering: break @@ -151,9 +159,16 @@ class HttpPusher(object): self.user_id, self.last_stream_ordering, self.max_stream_ordering ) + logger.info( + "Processing %i unprocessed push actions for %s starting at " + "stream_ordering %s", + len(unprocessed), self.name, self.last_stream_ordering, + ) + for push_action in unprocessed: processed = yield self._process_one(push_action) if processed: + http_push_processed_counter.inc() self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action['stream_ordering'] yield self.store.update_pusher_last_stream_ordering_and_success( @@ -168,6 +183,7 @@ class HttpPusher(object): self.failing_since ) else: + http_push_failed_counter.inc() if not self.failing_since: self.failing_since = self.clock.time_msec() yield self.store.update_pusher_failing_since( @@ -204,7 +220,9 @@ class HttpPusher(object): ) else: logger.info("Push failed: delaying for %ds", self.backoff_delay) - self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer) + self.timed_call = self.hs.get_reactor().callLater( + self.backoff_delay, self.on_timer + ) self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC) break @@ -244,6 +262,26 @@ class HttpPusher(object): @defer.inlineCallbacks def _build_notification_dict(self, event, tweaks, badge): + if self.data.get('format') == 'event_id_only': + d = { + 'notification': { + 'event_id': event.event_id, + 'room_id': event.room_id, + 'counts': { + 'unread': badge, + }, + 'devices': [ + { + 'app_id': self.app_id, + 'pushkey': self.pushkey, + 'pushkey_ts': long(self.pushkey_ts / 1000), + 'data': self.data_minus_url, + } + ] + } + } + defer.returnValue(d) + ctx = yield push_tools.get_context_for_event( self.store, self.state_handler, event, self.user_id ) @@ -275,7 +313,7 @@ class HttpPusher(object): if event.type == 'm.room.member': d['notification']['membership'] = event.content['membership'] d['notification']['user_is_target'] = event.state_key == self.user_id - if not self.hs.config.push_redact_content and 'content' in event: + if self.hs.config.push_include_content and 'content' in event: d['notification']['content'] = event.content # We no longer send aliases separately, instead, we send the human @@ -294,8 +332,11 @@ class HttpPusher(object): defer.returnValue([]) try: resp = yield self.http_client.post_json_get_json(self.url, notification_dict) - except: - logger.warn("Failed to push %s ", self.url) + except Exception: + logger.warn( + "Failed to push event %s to %s", + event.event_id, self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: @@ -304,7 +345,7 @@ class HttpPusher(object): @defer.inlineCallbacks def _send_badge(self, badge): - logger.info("Sending updated badge count %d to %r", badge, self.user_id) + logger.info("Sending updated badge count %d to %s", badge, self.name) d = { 'notification': { 'id': '', @@ -325,8 +366,11 @@ class HttpPusher(object): } try: resp = yield self.http_client.post_json_get_json(self.url, d) - except: - logger.exception("Failed to push %s ", self.url) + except Exception: + logger.warn( + "Failed to send badge count to %s", + self.name, exc_info=True, + ) defer.returnValue(False) rejected = [] if 'rejected' in resp: |