summary refs log tree commit diff
path: root/synapse/push
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/push')
-rw-r--r--synapse/push/httppusher.py53
1 files changed, 28 insertions, 25 deletions
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index 57f0a69e03..6950a20632 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -21,6 +21,7 @@ import logging
 import push_rule_evaluator
 import push_tools
 
+from synapse.util.logcontext import LoggingContext
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -85,9 +86,8 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def on_new_notifications(self, min_stream_ordering, max_stream_ordering):
-        with Measure(self.clock, "push.on_new_notifications"):
-            self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
-            yield self._process()
+        self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering)
+        yield self._process()
 
     @defer.inlineCallbacks
     def on_new_receipts(self, min_stream_id, max_stream_id):
@@ -95,16 +95,16 @@ class HttpPusher(object):
 
         # We could check the receipts are actually m.read receipts here,
         # but currently that's the only type of receipt anyway...
-        with Measure(self.clock, "push.on_new_receipts"):
-            badge = yield push_tools.get_badge_count(
-                self.hs.get_datastore(), self.user_id
-            )
-            yield self.send_badge(badge)
+        with LoggingContext("push.on_new_receipts"):
+            with Measure(self.clock, "push.on_new_receipts"):
+                badge = yield push_tools.get_badge_count(
+                    self.hs.get_datastore(), self.user_id
+                )
+            yield self._send_badge(badge)
 
     @defer.inlineCallbacks
     def on_timer(self):
-        with Measure(self.clock, "push.on_timer"):
-            yield self._process()
+        yield self._process()
 
     def on_stop(self):
         if self.timed_call:
@@ -114,20 +114,23 @@ class HttpPusher(object):
     def _process(self):
         if self.processing:
             return
-        try:
-            self.processing = True
-            # if the max ordering changes while we're running _unsafe_process,
-            # call it again, and so on until we've caught up.
-            while True:
-                starting_max_ordering = self.max_stream_ordering
+
+        with LoggingContext("push._process"):
+            with Measure(self.clock, "push._process"):
                 try:
-                    yield self._unsafe_process()
-                except:
-                    logger.exception("Exception processing notifs")
-                if self.max_stream_ordering == starting_max_ordering:
-                    break
-        finally:
-            self.processing = False
+                    self.processing = True
+                    # if the max ordering changes while we're running _unsafe_process,
+                    # call it again, and so on until we've caught up.
+                    while True:
+                        starting_max_ordering = self.max_stream_ordering
+                        try:
+                            yield self._unsafe_process()
+                        except:
+                            logger.exception("Exception processing notifs")
+                        if self.max_stream_ordering == starting_max_ordering:
+                            break
+                finally:
+                    self.processing = False
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
@@ -146,7 +149,7 @@ class HttpPusher(object):
             if processed:
                 self.backoff_delay = HttpPusher.INITIAL_BACKOFF_SEC
                 self.last_stream_ordering = push_action['stream_ordering']
-                self.store.update_pusher_last_stream_ordering_and_success(
+                yield self.store.update_pusher_last_stream_ordering_and_success(
                     self.app_id, self.pushkey, self.user_id,
                     self.last_stream_ordering,
                     self.clock.time_msec()
@@ -291,7 +294,7 @@ class HttpPusher(object):
         defer.returnValue(rejected)
 
     @defer.inlineCallbacks
-    def send_badge(self, badge):
+    def _send_badge(self, badge):
         logger.info("Sending updated badge count %d to %r", badge, self.user_id)
         d = {
             'notification': {