-rw-r--r-- | synapse/handlers/message.py              | 114
-rw-r--r-- | synapse/push/action_generator.py         |  17
-rw-r--r-- | synapse/push/bulk_push_rule_evaluator.py |  85
-rw-r--r-- | synapse/util/logutils.py                 |  13
4 files changed, 134 insertions, 95 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c41dafdef5..0b1aa8218e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -29,6 +29,7 @@ from synapse.util import unwrapFirstError
 from synapse.util.async import concurrently_execute
 from synapse.util.caches.snapshot_cache import SnapshotCache
 from synapse.util.logcontext import PreserveLoggingContext, preserve_fn
+from synapse.util.logutils import log_duration
 from synapse.visibility import filter_events_for_client

 from ._base import BaseHandler
@@ -734,43 +735,47 @@ class MessageHandler(BaseHandler):

     @defer.inlineCallbacks
     def _create_new_client_event(self, builder, prev_event_ids=None):
-        if prev_event_ids:
-            prev_events = yield self.store.add_event_hashes(prev_event_ids)
-            prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
-            depth = prev_max_depth + 1
-        else:
-            latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
-                builder.room_id,
-            )
-
-            if latest_ret:
-                depth = max([d for _, _, d in latest_ret]) + 1
+        with log_duration("prev_events"):
+            if prev_event_ids:
+                prev_events = yield self.store.add_event_hashes(prev_event_ids)
+                prev_max_depth = yield self.store.get_max_depth_of_events(prev_event_ids)
+                depth = prev_max_depth + 1
             else:
-                depth = 1
+                latest_ret = yield self.store.get_latest_event_ids_and_hashes_in_room(
+                    builder.room_id,
+                )
+
+                if latest_ret:
+                    depth = max([d for _, _, d in latest_ret]) + 1
+                else:
+                    depth = 1

-            prev_events = [
-                (event_id, prev_hashes)
-                for event_id, prev_hashes, _ in latest_ret
-            ]
+                prev_events = [
+                    (event_id, prev_hashes)
+                    for event_id, prev_hashes, _ in latest_ret
+                ]

         builder.prev_events = prev_events
         builder.depth = depth

         state_handler = self.state_handler

-        context = yield state_handler.compute_event_context(builder)
+        with log_duration("context"):
+            context = yield state_handler.compute_event_context(builder)

         if builder.is_state():
             builder.prev_state = yield self.store.add_event_hashes(
                 context.prev_state_events
             )

-        yield self.auth.add_auth_events(builder, context)
+        with log_duration("add_auth"):
+            yield self.auth.add_auth_events(builder, context)

-        signing_key = self.hs.config.signing_key[0]
-        add_hashes_and_signatures(
-            builder, self.server_name, signing_key
-        )
+        with log_duration("signing"):
+            signing_key = self.hs.config.signing_key[0]
+            add_hashes_and_signatures(
+                builder, self.server_name, signing_key
+            )

         event = builder.build()

@@ -798,7 +803,8 @@ class MessageHandler(BaseHandler):
         self.ratelimit(requester)

         try:
-            self.auth.check(event, auth_events=context.current_state)
+            with log_duration("auth_check"):
+                self.auth.check(event, auth_events=context.current_state)
         except AuthError as err:
             logger.warn("Denying new event %r because %s", event, err)
             raise err
@@ -882,42 +888,48 @@ class MessageHandler(BaseHandler):
                     "Changing the room create event is forbidden",
                 )

-        action_generator = ActionGenerator(self.hs)
-        yield action_generator.handle_push_actions_for_event(
-            event, context
-        )
+        with log_duration("action_generator"):
+            action_generator = ActionGenerator(self.hs)
+            yield action_generator.handle_push_actions_for_event(
+                event, context
+            )

-        (event_stream_id, max_stream_id) = yield self.store.persist_event(
-            event, context=context
-        )
+        with log_duration("persist_event"):
+            (event_stream_id, max_stream_id) = yield self.store.persist_event(
+                event, context=context
+            )

         # this intentionally does not yield: we don't care about the result
         # and don't need to wait for it.
-        preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
-            event_stream_id, max_stream_id
-        )
+        with log_duration("pusherpool"):
+            preserve_fn(self.hs.get_pusherpool().on_new_notifications)(
+                event_stream_id, max_stream_id
+            )

         destinations = set()
-        for k, s in context.current_state.items():
-            try:
-                if k[0] == EventTypes.Member:
-                    if s.content["membership"] == Membership.JOIN:
-                        destinations.add(get_domain_from_id(s.state_key))
-            except SynapseError:
-                logger.warn(
-                    "Failed to get destination from event %s", s.event_id
-                )
+        with log_duration("destination"):
+            for k, s in context.current_state.items():
+                try:
+                    if k[0] == EventTypes.Member:
+                        if s.content["membership"] == Membership.JOIN:
+                            destinations.add(get_domain_from_id(s.state_key))
+                except SynapseError:
+                    logger.warn(
+                        "Failed to get destination from event %s", s.event_id
+                    )

-        with PreserveLoggingContext():
-            # Don't block waiting on waking up all the listeners.
-            self.notifier.on_new_room_event(
-                event, event_stream_id, max_stream_id,
-                extra_users=extra_users
-            )
+        with log_duration("on_new_room_event"):
+            with PreserveLoggingContext():
+                # Don't block waiting on waking up all the listeners.
+                self.notifier.on_new_room_event(
+                    event, event_stream_id, max_stream_id,
+                    extra_users=extra_users
+                )

         # If invite, remove room_state from unsigned before sending.
         event.unsigned.pop("invite_room_state", None)

-        federation_handler.handle_new_event(
-            event, destinations=destinations,
-        )
+        with log_duration("handle_new_event"):
+            federation_handler.handle_new_event(
+                event, destinations=destinations,
+            )
diff --git a/synapse/push/action_generator.py b/synapse/push/action_generator.py
index 9b208668b6..e2b74f5aa2 100644
--- a/synapse/push/action_generator.py
+++ b/synapse/push/action_generator.py
@@ -17,6 +17,7 @@ from twisted.internet import defer

 from .bulk_push_rule_evaluator import evaluator_for_event

+from synapse.util.logutils import log_duration
 from synapse.util.metrics import Measure

 import logging
@@ -39,13 +40,15 @@ class ActionGenerator:
     @defer.inlineCallbacks
     def handle_push_actions_for_event(self, event, context):
         with Measure(self.clock, "handle_push_actions_for_event"):
-            bulk_evaluator = yield evaluator_for_event(
-                event, self.hs, self.store
-            )
-
-            actions_by_user = yield bulk_evaluator.action_for_event_by_user(
-                event, context.current_state
-            )
+            with log_duration("evaluator_for_event"):
+                bulk_evaluator = yield evaluator_for_event(
+                    event, self.hs, self.store
+                )
+
+            with log_duration("action_for_event_by_user"):
+                actions_by_user = yield bulk_evaluator.action_for_event_by_user(
+                    event, context.current_state
+                )

             context.push_actions = [
                 (uid, actions) for uid, actions in actions_by_user.items()
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 25f2fb9da4..bdffc3c90f 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -23,6 +23,7 @@ from .push_rule_evaluator import PushRuleEvaluatorForEvent

 from synapse.api.constants import EventTypes
 from synapse.visibility import filter_events_for_clients
+from synapse.util.logutils import log_duration


 logger = logging.getLogger(__name__)
@@ -37,36 +38,39 @@ def decode_rule_json(rule):

 @defer.inlineCallbacks
 def _get_rules(room_id, user_ids, store):
-    rules_by_user = yield store.bulk_get_push_rules(user_ids)
-    rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)
+    with log_duration("bulk_get_push_rules"):
+        rules_by_user = yield store.bulk_get_push_rules(user_ids)
+    with log_duration("bulk_get_push_rules_enabled"):
+        rules_enabled_by_user = yield store.bulk_get_push_rules_enabled(user_ids)

     rules_by_user = {k: v for k, v in rules_by_user.items() if v is not None}

-    rules_by_user = {
-        uid: list_with_base_rules([
-            decode_rule_json(rule_list)
-            for rule_list in rules_by_user.get(uid, [])
-        ])
-        for uid in user_ids
-    }
+    with log_duration("list_with_base_rules"):
+        rules_by_user = {
+            uid: list_with_base_rules([
+                decode_rule_json(rule_list)
+                for rule_list in rules_by_user.get(uid, [])
+            ])
+            for uid in user_ids
+        }

     # We apply the rules-enabled map here: bulk_get_push_rules doesn't
     # fetch disabled rules, but this won't account for any server default
     # rules the user has disabled, so we need to do this too.
-    for uid in user_ids:
-        user_enabled_map = rules_enabled_by_user.get(uid)
-        if not user_enabled_map:
-            continue
+    with log_duration("apply_the_rules_enabled"):
+        for uid in user_ids:
+            user_enabled_map = rules_enabled_by_user.get(uid)
+            if not user_enabled_map:
+                continue

-        for i, rule in enumerate(rules_by_user[uid]):
-            rule_id = rule['rule_id']
-
-            if rule_id in user_enabled_map:
-                if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
-                    # Rules are cached across users.
-                    rule = dict(rule)
-                    rule['enabled'] = bool(user_enabled_map[rule_id])
-                    rules_by_user[uid][i] = rule
+            for i, rule in enumerate(rules_by_user[uid]):
+                rule_id = rule['rule_id']
+                if rule_id in user_enabled_map:
+                    if rule.get('enabled', True) != bool(user_enabled_map[rule_id]):
+                        # Rules are cached across users.
+                        rule = dict(rule)
+                        rule['enabled'] = bool(user_enabled_map[rule_id])
+                        rules_by_user[uid][i] = rule

     defer.returnValue(rules_by_user)

@@ -77,36 +81,43 @@ def evaluator_for_event(event, hs, store):

     # users in the room who have pushers need to get push rules run because
     # that's how their pushers work
-    users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)
+    with log_duration("get_users_with_pushers_in_room"):
+        users_with_pushers = yield store.get_users_with_pushers_in_room(room_id)

     # We also will want to generate notifs for other people in the room so
     # their unread counts are correct in the event stream, but to avoid
     # generating them for bot / AS users etc, we only do so for people who've
    # sent a read receipt into the room.
-    all_in_room = yield store.get_users_in_room(room_id)
-    all_in_room = set(all_in_room)
+    with log_duration("get_users_in_room"):
+        all_in_room = yield store.get_users_in_room(room_id)
+    with log_duration("all_in_room"):
+        all_in_room = set(all_in_room)

-    receipts = yield store.get_receipts_for_room(room_id, "m.read")
+    with log_duration("get_receipts_for_room"):
+        receipts = yield store.get_receipts_for_room(room_id, "m.read")

     # any users with pushers must be ours: they have pushers
-    user_ids = set(users_with_pushers)
-    for r in receipts:
-        if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room:
-            user_ids.add(r['user_id'])
+    with log_duration("get_mine_pushers"):
+        user_ids = set(users_with_pushers)
+        for r in receipts:
+            if hs.is_mine_id(r['user_id']) and r['user_id'] in all_in_room:
+                user_ids.add(r['user_id'])

     # if this event is an invite event, we may need to run rules for the user
     # who's been invited, otherwise they won't get told they've been invited
-    if event.type == 'm.room.member' and event.content['membership'] == 'invite':
-        invited_user = event.state_key
-        if invited_user and hs.is_mine_id(invited_user):
-            has_pusher = yield store.user_has_pusher(invited_user)
-            if has_pusher:
-                user_ids.add(invited_user)
+    with log_duration("add_invite"):
+        if event.type == 'm.room.member' and event.content['membership'] == 'invite':
+            invited_user = event.state_key
+            if invited_user and hs.is_mine_id(invited_user):
+                has_pusher = yield store.user_has_pusher(invited_user)
+                if has_pusher:
+                    user_ids.add(invited_user)

     user_ids = list(user_ids)

-    rules_by_user = yield _get_rules(room_id, user_ids, store)
+    with log_duration("_get_rules"):
+        rules_by_user = yield _get_rules(room_id, user_ids, store)

     defer.returnValue(BulkPushRuleEvaluator(
         room_id, rules_by_user, user_ids, store
diff --git a/synapse/util/logutils.py b/synapse/util/logutils.py
index 3a83828d25..5324c2815b 100644
--- a/synapse/util/logutils.py
+++ b/synapse/util/logutils.py
@@ -21,10 +21,23 @@ import logging
 import inspect
 import time

+from contextlib import contextmanager
+

 _TIME_FUNC_ID = 0


+logger = logging.getLogger(__name__)
+
+
+@contextmanager
+def log_duration(name):
+    start = time.time() * 1000
+    yield
+    end = time.time() * 1000
+    logger.info("Timings: %s took %dms", name, int(end - start))
+
+
 def _log_debug_as_f(f, msg, msg_args):
     name = f.__module__
     logger = logging.getLogger(name)
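
For reference, the helper added in synapse/util/logutils.py amounts to the small context manager below. This is a minimal, self-contained sketch rather than the exact Synapse module: the logging.basicConfig() call, the "build_event" label and the time.sleep() workload are illustrative stand-ins added here for demonstration.

from contextlib import contextmanager
import logging
import time

logger = logging.getLogger(__name__)


@contextmanager
def log_duration(name):
    # Log the wall-clock time, in milliseconds, spent inside the with-block.
    start = time.time() * 1000
    yield
    end = time.time() * 1000
    logger.info("Timings: %s took %dms", name, int(end - start))


if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Wrapping a slow step produces a single log line such as
    # "Timings: build_event took 40ms".
    with log_duration("build_event"):
        time.sleep(0.04)  # stand-in for the real work being timed

Because most of the instrumented blocks in this diff wrap a yield inside an @defer.inlineCallbacks generator, the reported figure is wall-clock time including any wait on the underlying deferred, which appears to be the intent for this kind of latency debugging. Note that time.time() is not monotonic, so a system clock adjustment while a block is running would skew that one reading.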