diff options
Diffstat (limited to 'synapse/push')
-rw-r--r-- | synapse/push/httppusher.py | 53 |
1 files changed, 28 insertions, 25 deletions
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 57f0a69e03..6950a20632 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -21,6 +21,7 @@ import logging import push_rule_evaluator import push_tools +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -85,9 +86,8 @@ class HttpPusher(object): @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): - with Measure(self.clock, "push.on_new_notifications"): - self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) - yield self._process() + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + yield self._process() @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id): @@ -95,16 +95,16 @@ class HttpPusher(object): # We could check the receipts are actually m.read receipts here, # but currently that's the only type of receipt anyway... - with Measure(self.clock, "push.on_new_receipts"): - badge = yield push_tools.get_badge_count( - self.hs.get_datastore(), self.user_id - ) - yield self.send_badge(badge) + with LoggingContext("push.on_new_receipts"): + with Measure(self.clock, "push.on_new_receipts"): + badge = yield push_tools.get_badge_count( + self.hs.get_datastore(), self.user_id + ) + yield self._send_badge(badge) @defer.inlineCallbacks def on_timer(self): - with Measure(self.clock, "push.on_timer"): - yield self._process() + yield self._process() def on_stop(self): if self.timed_call: @@ -114,20 +114,23 @@ class HttpPusher(object): def _process(self): if self.processing: return - try: - self.processing = True - # if the max ordering changes while we're running _unsafe_process, - # call it again, and so on until we've caught up. - while True: - starting_max_ordering = self.max_stream_ordering + + with LoggingContext("push._process"): + with Measure(self.clock, "push._process"): try: - yield self._unsafe_process() - except: - logger.exception("Exception processing notifs") - if self.max_stream_ordering == starting_max_ordering: - break - finally: - self.processing = False + self.processing = True + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering + try: + yield self._unsafe_process() + except: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self.processing = False @defer.inlineCallbacks def _unsafe_process(self): @@ -146,7 +149,7 @@ class HttpPusher(object): if processed: self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC self.last_stream_ordering = push_action['stream_ordering'] - self.store.update_pusher_last_stream_ordering_and_success( + yield self.store.update_pusher_last_stream_ordering_and_success( self.app_id, self.pushkey, self.user_id, self.last_stream_ordering, self.clock.time_msec() @@ -291,7 +294,7 @@ class HttpPusher(object): defer.returnValue(rejected) @defer.inlineCallbacks - def send_badge(self, badge): + def _send_badge(self, badge): logger.info("Sending updated badge count %d to %r", badge, self.user_id) d = { 'notification': { |