diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 8a5d473108..81e18bcf7d 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
+# Copyright 2017 New Vector Ltd
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -12,21 +13,25 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
+import logging
-from synapse.push import PusherConfigException
+from prometheus_client import Counter
-from twisted.internet import defer, reactor
+from twisted.internet import defer
from twisted.internet.error import AlreadyCalled, AlreadyCancelled
-import logging
-import push_rule_evaluator
-import push_tools
-
+from synapse.push import PusherConfigException
from synapse.util.logcontext import LoggingContext
from synapse.util.metrics import Measure
+from . import push_rule_evaluator, push_tools
+
logger = logging.getLogger(__name__)
+http_push_processed_counter = Counter("synapse_http_httppusher_http_pushes_processed", "")
+
+http_push_failed_counter = Counter("synapse_http_httppusher_http_pushes_failed", "")
+
class HttpPusher(object):
INITIAL_BACKOFF_SEC = 1 # in seconds because that's what Twisted takes
@@ -84,7 +89,10 @@ class HttpPusher(object):
@defer.inlineCallbacks
def on_started(self):
- yield self._process()
+ try:
+ yield self._process()
+ except Exception:
+ logger.exception("Error starting http pusher")
@defer.inlineCallbacks
def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
@@ -131,7 +139,7 @@ class HttpPusher(object):
starting_max_ordering = self.max_stream_ordering
try:
yield self._unsafe_process()
- except:
+ except Exception:
logger.exception("Exception processing notifs")
if self.max_stream_ordering == starting_max_ordering:
break
@@ -151,9 +159,16 @@ class HttpPusher(object):
self.user_id, self.last_stream_ordering, self.max_stream_ordering
)
+ logger.info(
+ "Processing %i unprocessed push actions for %s starting at "
+ "stream_ordering %s",
+ len(unprocessed), self.name, self.last_stream_ordering,
+ )
+
for push_action in unprocessed:
processed = yield self._process_one(push_action)
if processed:
+ http_push_processed_counter.inc()
self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
self.last_stream_ordering = push_action['stream_ordering']
yield self.store.update_pusher_last_stream_ordering_and_success(
@@ -168,6 +183,7 @@ class HttpPusher(object):
self.failing_since
)
else:
+ http_push_failed_counter.inc()
if not self.failing_since:
self.failing_since = self.clock.time_msec()
yield self.store.update_pusher_failing_since(
@@ -204,7 +220,9 @@ class HttpPusher(object):
)
else:
logger.info("Push failed: delaying for %ds", self.backoff_delay)
- self.timed_call = reactor.callLater(self.backoff_delay, self.on_timer)
+ self.timed_call = self.hs.get_reactor().callLater(
+ self.backoff_delay, self.on_timer
+ )
self.backoff_delay = min(self.backoff_delay * 2, self.MAX_BACKOFF_SEC)
break
@@ -244,6 +262,26 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _build_notification_dict(self, event, tweaks, badge):
+ if self.data.get('format') == 'event_id_only':
+ d = {
+ 'notification': {
+ 'event_id': event.event_id,
+ 'room_id': event.room_id,
+ 'counts': {
+ 'unread': badge,
+ },
+ 'devices': [
+ {
+ 'app_id': self.app_id,
+ 'pushkey': self.pushkey,
+ 'pushkey_ts': long(self.pushkey_ts / 1000),
+ 'data': self.data_minus_url,
+ }
+ ]
+ }
+ }
+ defer.returnValue(d)
+
ctx = yield push_tools.get_context_for_event(
self.store, self.state_handler, event, self.user_id
)
@@ -275,7 +313,7 @@ class HttpPusher(object):
if event.type == 'm.room.member':
d['notification']['membership'] = event.content['membership']
d['notification']['user_is_target'] = event.state_key == self.user_id
- if not self.hs.config.push_redact_content and 'content' in event:
+ if self.hs.config.push_include_content and 'content' in event:
d['notification']['content'] = event.content
# We no longer send aliases separately, instead, we send the human
@@ -294,8 +332,11 @@ class HttpPusher(object):
defer.returnValue([])
try:
resp = yield self.http_client.post_json_get_json(self.url, notification_dict)
- except:
- logger.warn("Failed to push %s ", self.url)
+ except Exception:
+ logger.warn(
+ "Failed to push event %s to %s",
+ event.event_id, self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
@@ -304,7 +345,7 @@ class HttpPusher(object):
@defer.inlineCallbacks
def _send_badge(self, badge):
- logger.info("Sending updated badge count %d to %r", badge, self.user_id)
+ logger.info("Sending updated badge count %d to %s", badge, self.name)
d = {
'notification': {
'id': '',
@@ -325,8 +366,11 @@ class HttpPusher(object):
}
try:
resp = yield self.http_client.post_json_get_json(self.url, d)
- except:
- logger.exception("Failed to push %s ", self.url)
+ except Exception:
+ logger.warn(
+ "Failed to send badge count to %s",
+ self.name, exc_info=True,
+ )
defer.returnValue(False)
rejected = []
if 'rejected' in resp:
|