From 7be1065b8f6b6a4e865342f7fdadd00474b624ca Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Apr 2016 11:26:15 +0100 Subject: Add extra Measure --- synapse/push/httppusher.py | 30 ++++++++++++++++-------------- 1 file changed, 16 insertions(+), 14 deletions(-) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 57f0a69e03..0ceff661e7 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -114,20 +114,22 @@ class HttpPusher(object): def _process(self): if self.processing: return - try: - self.processing = True - # if the max ordering changes while we're running _unsafe_process, - # call it again, and so on until we've caught up. - while True: - starting_max_ordering = self.max_stream_ordering - try: - yield self._unsafe_process() - except: - logger.exception("Exception processing notifs") - if self.max_stream_ordering == starting_max_ordering: - break - finally: - self.processing = False + + with Measure(self.clock, "push._process"): + try: + self.processing = True + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering + try: + yield self._unsafe_process() + except: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self.processing = False @defer.inlineCallbacks def _unsafe_process(self): -- cgit 1.4.1 From 96bcfb29c7b8cb04f4b887c518016cb968645cb9 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Apr 2016 11:26:33 +0100 Subject: Add index --- synapse/storage/schema/delta/31/pushers_index.sql | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) create mode 100644 synapse/storage/schema/delta/31/pushers_index.sql diff --git a/synapse/storage/schema/delta/31/pushers_index.sql b/synapse/storage/schema/delta/31/pushers_index.sql new file mode 100644 index 0000000000..9027bccc69 --- /dev/null +++ b/synapse/storage/schema/delta/31/pushers_index.sql @@ -0,0 +1,18 @@ +/* Copyright 2016 OpenMarket Ltd + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + CREATE INDEX event_push_actions_stream_ordering on event_push_actions( + stream_ordering, user_id + ); -- cgit 1.4.1 From 56da835eafa23c911749174d1d9a4e89d9b34643 Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Apr 2016 11:33:50 +0100 Subject: Add necessary logging contexts --- synapse/push/httppusher.py | 49 +++++++++++++++++++++++----------------------- 1 file changed, 25 insertions(+), 24 deletions(-) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index 0ceff661e7..e80b7dae51 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -21,6 +21,7 @@ import logging import push_rule_evaluator import push_tools +from synapse.util.logcontext import LoggingContext from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -85,9 +86,8 @@ class HttpPusher(object): @defer.inlineCallbacks def on_new_notifications(self, min_stream_ordering, max_stream_ordering): - with Measure(self.clock, "push.on_new_notifications"): - self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) - yield self._process() + self.max_stream_ordering = max(max_stream_ordering, self.max_stream_ordering) + yield self._process() @defer.inlineCallbacks def on_new_receipts(self, min_stream_id, max_stream_id): @@ -95,16 +95,16 @@ class HttpPusher(object): # We could check the receipts are actually m.read receipts here, # but currently that's the only type of receipt anyway... - with Measure(self.clock, "push.on_new_receipts"): - badge = yield push_tools.get_badge_count( - self.hs.get_datastore(), self.user_id - ) + with LoggingContext("push._process"): + with Measure(self.clock, "push.on_new_receipts"): + badge = yield push_tools.get_badge_count( + self.hs.get_datastore(), self.user_id + ) yield self.send_badge(badge) @defer.inlineCallbacks def on_timer(self): - with Measure(self.clock, "push.on_timer"): - yield self._process() + yield self._process() def on_stop(self): if self.timed_call: @@ -115,21 +115,22 @@ class HttpPusher(object): if self.processing: return - with Measure(self.clock, "push._process"): - try: - self.processing = True - # if the max ordering changes while we're running _unsafe_process, - # call it again, and so on until we've caught up. - while True: - starting_max_ordering = self.max_stream_ordering - try: - yield self._unsafe_process() - except: - logger.exception("Exception processing notifs") - if self.max_stream_ordering == starting_max_ordering: - break - finally: - self.processing = False + with LoggingContext("push._process"): + with Measure(self.clock, "push._process"): + try: + self.processing = True + # if the max ordering changes while we're running _unsafe_process, + # call it again, and so on until we've caught up. + while True: + starting_max_ordering = self.max_stream_ordering + try: + yield self._unsafe_process() + except: + logger.exception("Exception processing notifs") + if self.max_stream_ordering == starting_max_ordering: + break + finally: + self.processing = False @defer.inlineCallbacks def _unsafe_process(self): -- cgit 1.4.1 From d213d69fe301a1166516c2f56c50d7a379deaf6e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Apr 2016 11:36:23 +0100 Subject: Add desc arg --- synapse/storage/pusher.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/synapse/storage/pusher.py b/synapse/storage/pusher.py index e64c0dce0a..e5755c0aea 100644 --- a/synapse/storage/pusher.py +++ b/synapse/storage/pusher.py @@ -137,7 +137,11 @@ class PusherStore(SQLBaseStore): users = yield self.get_users_in_room(room_id) result = yield self._simple_select_many_batch( - 'pushers', 'user_name', users, ['user_name'] + table='pushers', + column='user_name', + iterable=users, + retcols=['user_name'], + desc='get_users_with_pushers_in_room' ) defer.returnValue([r['user_name'] for r in result]) -- cgit 1.4.1 From 2ae91a9e2f5632bcaa32154d457ddd39a96e981e Mon Sep 17 00:00:00 2001 From: Erik Johnston Date: Thu, 14 Apr 2016 11:37:50 +0100 Subject: Make send_badge private --- synapse/push/httppusher.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py index e80b7dae51..b939d889fb 100644 --- a/synapse/push/httppusher.py +++ b/synapse/push/httppusher.py @@ -100,7 +100,7 @@ class HttpPusher(object): badge = yield push_tools.get_badge_count( self.hs.get_datastore(), self.user_id ) - yield self.send_badge(badge) + yield self._send_badge(badge) @defer.inlineCallbacks def on_timer(self): @@ -294,7 +294,7 @@ class HttpPusher(object): defer.returnValue(rejected) @defer.inlineCallbacks - def send_badge(self, badge): + def _send_badge(self, badge): logger.info("Sending updated badge count %d to %r", badge, self.user_id) d = { 'notification': { -- cgit 1.4.1