diff options
-rw-r--r-- | scripts/graph_tracer.py | 116 | ||||
-rwxr-xr-x | synapse/app/homeserver.py | 10 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 1 | ||||
-rw-r--r-- | synapse/rest/client/v1/room.py | 12 | ||||
-rw-r--r-- | synapse/util/traceutil.py | 280 |
5 files changed, 417 insertions, 2 deletions
diff --git a/scripts/graph_tracer.py b/scripts/graph_tracer.py new file mode 100644 index 0000000000..ba6b62389c --- /dev/null +++ b/scripts/graph_tracer.py @@ -0,0 +1,116 @@ +import fileinput +import pydot + +nodes = {} +edges = set() + +graph = pydot.Dot(graph_name="call_graph", graph_type="digraph") + +names = {} +times = {} +deferreds = {} +deferred_edges = set() + +root_id = None + +for line in fileinput.input(): + try: + if " calls " in line: + start, end = line.split(" calls ") + start, end = start.strip(), end.strip() + edges.add((start, end)) + print start, end + if " named " in line: + node_id, name = line.split(" named ") + names[node_id.strip()] = name.strip() + + if name.strip() == "Deferred synapse.rest.client.v1.room.RoomSendEventRestServlet.on_PUT": + root_id = node_id + if " in " in line: + node_id, d = line.split(" in ") + deferreds[node_id.strip()] = d.strip() + if " time " in line: + node_id, ms = line.split(" time ") + times[node_id.strip()] = int(ms.strip()) + if " waits on " in line: + start, end = line.split(" waits on ") + start, end = start.strip(), end.strip() + deferred_edges.add((start, end)) + print start, end + except Exception as e: + print "failed %s to parse '%s'" % (e.message, line) + + +# deferreds_root = set(deferreds.values()) +# for parent, child in deferred_edges: +# deferreds_root.discard(child) +# +# deferred_tree = { +# d: {} +# for d in deferreds_root +# } +# +# def populate(root, tree): +# for leaf in deferred_edges.get(root, []): +# populate(leaf, tree.setdefault(leaf, {})) +# +# +# for d in deferreds_root: +# tree = deferred_tree.setdefault(d, {}) +# populate(d, tree) + +print deferred_edges +print root_id + +def is_in_deferred(d): + while True: + if d == root_id: + return True + + for start, end in deferred_edges: + if d == end: + d = start + break + else: + return False + +for node_id, name in names.items(): + # if times.get(node_id, 100) < 5: + # continue + + if node_id in deferreds: + if not is_in_deferred(deferreds[node_id]): + continue + else: + if not is_in_deferred(node_id): + continue + + node = pydot.Node(node_id, label=name) + + # if node_id in deferreds: + # clusters[deferreds[node_id]].add_node(node) + # elif node_id in clusters: + # clusters[node_id].add_node(node) + # else: + # graph.add_node(node) + graph.add_node(node) + nodes[node_id] = node + + # print node_id + +for parent, child in edges: + if child not in nodes: + print child, "not a node" + continue + + if parent not in nodes: + print parent, "not a node" + continue + + edge = pydot.Edge(nodes[parent], nodes[child]) + graph.add_edge(edge) + + +file_prefix = "call_graph_out" +graph.write('%s.dot' % file_prefix, format='raw', prog='dot') +graph.write_svg("%s.svg" % file_prefix, prog='dot') diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py index 3649406efb..c7d7bf2fe1 100755 --- a/synapse/app/homeserver.py +++ b/synapse/app/homeserver.py @@ -51,6 +51,8 @@ from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource from daemonize import Daemonize import twisted.manhole.telnet +from synapse.util.traceutil import Tracer + import synapse import logging @@ -61,6 +63,7 @@ import subprocess import sqlite3 import syweb + logger = logging.getLogger(__name__) @@ -399,8 +402,13 @@ class SynapseService(service.Service): def run(hs): - def in_thread(): + try: + tracer = Tracer() + sys.settrace(tracer.process) + except Exception: + logger.exception("Failed to start tracer") + with LoggingContext("run"): change_resource_limit(hs.config.soft_file_limit) diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index ae4e9b316d..14c416fc68 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -73,7 +73,6 @@ class FederationHandler(BaseHandler): # When joining a room we need to queue any events for that room up self.room_queues = {} - @log_function @defer.inlineCallbacks def handle_new_event(self, event, destinations): """ Takes in an event from the client to server side, that has already diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 0346afb1b4..f5666f99aa 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -165,6 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet): defer.returnValue((200, {})) +def trace(f): + f.should_trace = True + f.root_trace = True + return f + + # TODO: Needs unit testing for generic events + feedback class RoomSendEventRestServlet(ClientV1RestServlet): @@ -175,7 +181,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet): @defer.inlineCallbacks def on_POST(self, request, room_id, event_type, txn_id=None): + import inspect + frame = inspect.currentframe() + logger.info("Frame: %s", id(frame)) user, client = yield self.auth.get_user_by_req(request) + logger.info("Frame: %s", id(inspect.currentframe())) content = _parse_json(request) msg_handler = self.handlers.message_handler @@ -189,12 +199,14 @@ class RoomSendEventRestServlet(ClientV1RestServlet): client=client, txn_id=txn_id, ) + logger.info("Frame: %s", id(inspect.currentframe())) defer.returnValue((200, {"event_id": event.event_id})) def on_GET(self, request, room_id, event_type, txn_id): return (200, "Not implemented") + @trace @defer.inlineCallbacks def on_PUT(self, request, room_id, event_type, txn_id): try: diff --git a/synapse/util/traceutil.py b/synapse/util/traceutil.py new file mode 100644 index 0000000000..e313912801 --- /dev/null +++ b/synapse/util/traceutil.py @@ -0,0 +1,280 @@ +# -*- coding: utf-8 -*- +# Copyright 2015 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import time + +import inspect +import logging + + +logger = logging.getLogger("Tracer") + + +class Tracer(object): + + def __init__(self): + self.interested_deferreds = set() + + self.next_id = 1 + + self.deferred_to_current_frames = {} + + def process(self, frame, event, arg): + if event == 'call': + return self.process_call(frame) + + def handle_inline_callbacks(self, frm): + argvalues = inspect.getargvalues(frm) + generator = argvalues.locals["g"] + deferred = argvalues.locals["deferred"] + + if not hasattr(deferred, "syn_trace_defer_id"): + trace_id = self.get_next_id() + deferred.syn_trace_defer_id = trace_id + logger.info( + "%s named Deferred %s", + trace_id, + self.get_name_for_frame(generator.gi_frame) + ) + + logger.info("%s start %d", trace_id, int(time.time() * 1000)) + + def do(res): + logger.info("%s end %d", trace_id, int(time.time() * 1000)) + return res + + deferred.addBoth(do) + + back = frm.f_back + while back: + try: + name = self.get_name_for_frame(back) + if name == "twisted.internet.defer._inlineCallbacks": + argvalues = inspect.getargvalues(back) + deferred = argvalues.locals["deferred"] + + d_id = getattr(deferred, "syn_trace_defer_id", None) + if d_id: + logger.info("%s in %s", trace_id, d_id) + curr_stack = self.deferred_to_current_frames.setdefault( + d_id, [] + ) + + if curr_stack: + logger.info("%s calls %s", curr_stack[-1], trace_id) + else: + logger.info("%s calls %s", d_id, trace_id) + break + + except: + pass + + back = back.f_back + + def are_interested(self, name): + if not name.startswith("synapse"): + return False + if name.startswith("synapse.util.logcontext"): + return False + if name.startswith("synapse.util.logutils"): + return False + if name.startswith("synapse.util.traceutil"): + return False + if name.startswith("synapse.events.FrozenEvent.get"): + return False + if name.startswith("synapse.events.EventBuilder.get"): + return False + if name.startswith("synapse.types"): + return False + if name.startswith("synapse.util.frozenutils.freeze"): + return False + if name.startswith("synapse.util.frozenutils.<dictcomp>"): + return False + if name.startswith("synapse.util.Clock"): + return False + + if name.endswith("__repr__") or name.endswith("__str__"): + return False + if name.endswith("<genexpr>"): + return False + + return True + + def process_call(self, frame): + should_trace = False + + try: + name = self.get_name_for_frame(frame) + if name == "twisted.internet.defer._inlineCallbacks": + self.handle_inline_callbacks(frame) + return + + if not self.are_interested(name): + return + + back_name = self.get_name_for_frame(frame.f_back) + + if name == "synapse.api.auth.Auth.get_user_by_req": + logger.info( + "synapse.api.auth.Auth.get_user_by_req %s", + back_name + ) + + try: + if back_name == "twisted.internet.defer._inlineCallbacks": + def ret(f, event, result): + if event != "return": + return + + argvalues = inspect.getargvalues(frame.f_back) + deferred = argvalues.locals["deferred"] + + try: + logger.info( + "%s waits on %s", + deferred.syn_trace_defer_id, + result.syn_trace_defer_id + ) + except: + pass + return ret + if back_name == "twisted.internet.defer.unwindGenerator": + return + except: + pass + + try: + func = getattr(frame.f_locals["self"], frame.f_code.co_name) + if inspect.isgeneratorfunction(func): + return + except: + pass + + try: + func = frame.f_globals[frame.f_code.co_name] + if inspect.isgeneratorfunction(func): + return + except: + pass + except: + return + + back = frame + names = [] + + seen_deferreds = [] + bottom_deferred = None + while back: + try: + name = self.get_name_for_frame(back) + if name.startswith("synapse"): + names.append(name) + + # if name.startswith("twisted.internet.defer"): + # logger.info("Name: %s", name) + + if name == "twisted.internet.defer._inlineCallbacks": + argvalues = inspect.getargvalues(back) + deferred = argvalues.locals["deferred"] + + d_id = getattr(deferred, "syn_trace_defer_id", None) + if d_id: + seen_deferreds.append(d_id) + if not bottom_deferred: + bottom_deferred = deferred + if d_id in self.interested_deferreds: + should_trace = True + break + + func = getattr(back.f_locals["self"], back.f_code.co_name) + + if hasattr(func, "should_trace") or hasattr(func.im_func, "should_trace"): + should_trace = True + break + + func.root_trace + should_trace = True + + break + except: + pass + + back = back.f_back + + if not should_trace: + return + + frame_id = self.get_next_id() + name = self.get_name_for_frame(frame) + logger.info("%s named %s", frame_id, name) + + self.interested_deferreds.update(seen_deferreds) + + names.reverse() + + if bottom_deferred: + self.deferred_frames.setdefault( + bottom_deferred.syn_trace_defer_id, [] + ).append(names) + + logger.info("%s in %s", frame_id, bottom_deferred.syn_trace_defer_id) + + if not hasattr(bottom_deferred, "syn_trace_registered_cb"): + bottom_deferred.syn_trace_registered_cb = True + + def do(res): + return res + + bottom_deferred.addBoth(do) + + curr_stack = self.deferred_to_current_frames.setdefault( + bottom_deferred.syn_trace_defer_id, [] + ) + + if curr_stack: + logger.info("%s calls %s", curr_stack[-1], frame_id) + else: + logger.info("%s calls %s", bottom_deferred.syn_trace_defer_id, frame_id) + + curr_stack.append(frame_id) + + logger.info("%s start %d", frame_id, int(time.time() * 1000)) + + def p(frame, event, arg): + if event == "return": + curr_stack.pop() + + logger.info("%s end %d", frame_id, int(time.time() * 1000)) + + return p + + def get_name_for_frame(self, frame): + module_name = frame.f_globals["__name__"] + cls_instance = frame.f_locals.get("self", None) + if cls_instance: + cls_name = cls_instance.__class__.__name__ + name = "%s.%s.%s" % ( + module_name, cls_name, frame.f_code.co_name + ) + else: + name = "%s.%s" % ( + module_name, frame.f_code.co_name + ) + return name + + def get_next_id(self): + i = self.next_id + self.next_id += 1 + return i |