summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2015-03-12 16:49:27 +0000
committerErik Johnston <erik@matrix.org>2015-03-12 16:52:02 +0000
commitd1ae594ae5eef90a9f1d1dc1dc0be3779917f20c (patch)
treed419ce0ccdb5d5b2e29726f326916f67363d268a /synapse
parentFix bug in logging. (diff)
downloadsynapse-d1ae594ae5eef90a9f1d1dc1dc0be3779917f20c.tar.xz
Add a utility class that can be used to generate a twisted deferred aware call graph
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/app/homeserver.py10
-rw-r--r--synapse/handlers/federation.py1
-rw-r--r--synapse/rest/client/v1/room.py12
-rw-r--r--synapse/util/traceutil.py280
4 files changed, 301 insertions, 2 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py

index 3649406efb..c7d7bf2fe1 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py
@@ -51,6 +51,8 @@ from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource from daemonize import Daemonize import twisted.manhole.telnet +from synapse.util.traceutil import Tracer + import synapse import logging @@ -61,6 +63,7 @@ import subprocess import sqlite3 import syweb + logger = logging.getLogger(__name__) @@ -399,8 +402,13 @@ class SynapseService(service.Service): def run(hs): - def in_thread(): + try: + tracer = Tracer() + sys.settrace(tracer.process) + except Exception: + logger.exception("Failed to start tracer") + with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ae4e9b316d..14c416fc68 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py
@@ -73,7 +73,6 @@ class FederationHandler(BaseHandler): # When joining a room we need to queue any events for that room up self.room_queues = {} - @log_function @defer.inlineCallbacks def handle_new_event(self, event, destinations): """ Takes in an event from the client to server side, that has already diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 0346afb1b4..f5666f99aa 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py
@@ -165,6 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet): defer.returnValue((200, {})) +def trace(f): + f.should_trace = True + f.root_trace = True + return f + + # TODO: Needs unit testing for generic events + feedback class RoomSendEventRestServlet(ClientV1RestServlet): @@ -175,7 +181,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, room_id, event_type, txn_id=None): + import inspect + frame = inspect.currentframe() + logger.info("Frame: %s", id(frame)) user, client = yield self.auth.get_user_by_req(request) + logger.info("Frame: %s", id(inspect.currentframe())) content = _parse_json(request) msg_handler = self.handlers.message_handler @@ -189,12 +199,14 @@ class RoomSendEventRestServlet(ClientV1RestServlet): client=client, txn_id=txn_id, ) + logger.info("Frame: %s", id(inspect.currentframe())) defer.returnValue((200, {"event_id": event.event_id})) def on_GET(self, request, room_id, event_type, txn_id): return (200, "Not implemented") + @trace @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, txn_id): try: diff --git a/synapse/util/traceutil.py b/synapse/util/traceutil.py new file mode 100644
index 0000000000..e313912801 --- /dev/null +++ b/synapse/util/traceutil.py
@@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import inspect +import logging + + +logger = logging.getLogger("Tracer") + + +class Tracer(object): + + def __init__(self): + self.interested_deferreds = set() + + self.next_id = 1 + + self.deferred_to_current_frames = {} + + def process(self, frame, event, arg): + if event == 'call': + return self.process_call(frame) + + def handle_inline_callbacks(self, frm): + argvalues = inspect.getargvalues(frm) + generator = argvalues.locals["g"] + deferred = argvalues.locals["deferred"] + + if not hasattr(deferred, "syn_trace_defer_id"): + trace_id = self.get_next_id() + deferred.syn_trace_defer_id = trace_id + logger.info( + "%s named Deferred %s", + trace_id, + self.get_name_for_frame(generator.gi_frame) + ) + + logger.info("%s start %d", trace_id, int(time.time() * 1000)) + + def do(res): + logger.info("%s end %d", trace_id, int(time.time() * 1000)) + return res + + deferred.addBoth(do) + + back = frm.f_back + while back: + try: + name = self.get_name_for_frame(back) + if name == "twisted.internet.defer._inlineCallbacks": + argvalues = inspect.getargvalues(back) + deferred = argvalues.locals["deferred"] + + d_id = getattr(deferred, "syn_trace_defer_id", None) + if d_id: + logger.info("%s in %s", trace_id, d_id) + curr_stack = self.deferred_to_current_frames.setdefault( + d_id, [] + ) + + if curr_stack: + logger.info("%s calls %s", curr_stack[-1], trace_id) + else: + logger.info("%s calls %s", d_id, trace_id) + break + + except: + pass + + back = back.f_back + + def are_interested(self, name): + if not name.startswith("synapse"): + return False + if name.startswith("synapse.util.logcontext"): + return False + if name.startswith("synapse.util.logutils"): + return False + if name.startswith("synapse.util.traceutil"): + return False + if name.startswith("synapse.events.FrozenEvent.get"): + return False + if name.startswith("synapse.events.EventBuilder.get"): + return False + if name.startswith("synapse.types"): + return False + if name.startswith("synapse.util.frozenutils.freeze"): + return False + if name.startswith("synapse.util.frozenutils.<dictcomp>"): + return False + if name.startswith("synapse.util.Clock"): + return False + + if name.endswith("__repr__") or name.endswith("__str__"): + return False + if name.endswith("<genexpr>"): + return False + + return True + + def process_call(self, frame): + should_trace = False + + try: + name = self.get_name_for_frame(frame) + if name == "twisted.internet.defer._inlineCallbacks": + self.handle_inline_callbacks(frame) + return + + if not self.are_interested(name): + return + + back_name = self.get_name_for_frame(frame.f_back) + + if name == "synapse.api.auth.Auth.get_user_by_req": + logger.info( + "synapse.api.auth.Auth.get_user_by_req %s", + back_name + ) + + try: + if back_name == "twisted.internet.defer._inlineCallbacks": + def ret(f, event, result): + if event != "return": + return + + argvalues = inspect.getargvalues(frame.f_back) + deferred = argvalues.locals["deferred"] + + try: + logger.info( + "%s waits on %s", + deferred.syn_trace_defer_id, + result.syn_trace_defer_id + ) + except: + pass + return ret + if back_name == "twisted.internet.defer.unwindGenerator": + return + except: + pass + + try: + func = getattr(frame.f_locals["self"], frame.f_code.co_name) + if inspect.isgeneratorfunction(func): + return + except: + pass + + try: + func = frame.f_globals[frame.f_code.co_name] + if inspect.isgeneratorfunction(func): + return + except: + pass + except: + return + + back = frame + names = [] + + seen_deferreds = [] + bottom_deferred = None + while back: + try: + name = self.get_name_for_frame(back) + if name.startswith("synapse"): + names.append(name) + + # if name.startswith("twisted.internet.defer"): + # logger.info("Name: %s", name) + + if name == "twisted.internet.defer._inlineCallbacks": + argvalues = inspect.getargvalues(back) + deferred = argvalues.locals["deferred"] + + d_id = getattr(deferred, "syn_trace_defer_id", None) + if d_id: + seen_deferreds.append(d_id) + if not bottom_deferred: + bottom_deferred = deferred + if d_id in self.interested_deferreds: + should_trace = True + break + + func = getattr(back.f_locals["self"], back.f_code.co_name) + + if hasattr(func, "should_trace") or hasattr(func.im_func, "should_trace"): + should_trace = True + break + + func.root_trace + should_trace = True + + break + except: + pass + + back = back.f_back + + if not should_trace: + return + + frame_id = self.get_next_id() + name = self.get_name_for_frame(frame) + logger.info("%s named %s", frame_id, name) + + self.interested_deferreds.update(seen_deferreds) + + names.reverse() + + if bottom_deferred: + self.deferred_frames.setdefault( + bottom_deferred.syn_trace_defer_id, [] + ).append(names) + + logger.info("%s in %s", frame_id, bottom_deferred.syn_trace_defer_id) + + if not hasattr(bottom_deferred, "syn_trace_registered_cb"): + bottom_deferred.syn_trace_registered_cb = True + + def do(res): + return res + + bottom_deferred.addBoth(do) + + curr_stack = self.deferred_to_current_frames.setdefault( + bottom_deferred.syn_trace_defer_id, [] + ) + + if curr_stack: + logger.info("%s calls %s", curr_stack[-1], frame_id) + else: + logger.info("%s calls %s", bottom_deferred.syn_trace_defer_id, frame_id) + + curr_stack.append(frame_id) + + logger.info("%s start %d", frame_id, int(time.time() * 1000)) + + def p(frame, event, arg): + if event == "return": + curr_stack.pop() + + logger.info("%s end %d", frame_id, int(time.time() * 1000)) + + return p + + def get_name_for_frame(self, frame): + module_name = frame.f_globals["__name__"] + cls_instance = frame.f_locals.get("self", None) + if cls_instance: + cls_name = cls_instance.__class__.__name__ + name = "%s.%s.%s" % ( + module_name, cls_name, frame.f_code.co_name + ) + else: + name = "%s.%s" % ( + module_name, frame.f_code.co_name + ) + return name + + def get_next_id(self): + i = self.next_id + self.next_id += 1 + return i