summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--scripts/graph_tracer.py116
-rwxr-xr-xsynapse/app/homeserver.py10
-rw-r--r--synapse/handlers/federation.py1
-rw-r--r--synapse/rest/client/v1/room.py12
-rw-r--r--synapse/util/traceutil.py280
5 files changed, 417 insertions, 2 deletions
diff --git a/scripts/graph_tracer.py b/scripts/graph_tracer.py
new file mode 100644
index 0000000000..ba6b62389c
--- /dev/null
+++ b/scripts/graph_tracer.py
@@ -0,0 +1,116 @@
+import fileinput
+import pydot
+
+nodes = {}
+edges = set()
+
+graph = pydot.Dot(graph_name="call_graph", graph_type="digraph")
+
+names = {}
+times = {}
+deferreds = {}
+deferred_edges = set()
+
+root_id = None
+
+for line in fileinput.input():
+    try:
+        if " calls " in line:
+            start, end = line.split(" calls ")
+            start, end = start.strip(), end.strip()
+            edges.add((start, end))
+            print start, end
+        if " named " in line:
+            node_id, name = line.split(" named ")
+            names[node_id.strip()] = name.strip()
+
+            if name.strip() == "Deferred synapse.rest.client.v1.room.RoomSendEventRestServlet.on_PUT":
+                root_id = node_id
+        if " in " in line:
+            node_id, d = line.split(" in ")
+            deferreds[node_id.strip()] = d.strip()
+        if " time " in line:
+            node_id, ms = line.split(" time ")
+            times[node_id.strip()] = int(ms.strip())
+        if " waits on " in line:
+            start, end = line.split(" waits on ")
+            start, end = start.strip(), end.strip()
+            deferred_edges.add((start, end))
+            print start, end
+    except Exception as e:
+        print "failed %s to parse '%s'" % (e.message, line)
+
+
+# deferreds_root = set(deferreds.values())
+# for parent, child in deferred_edges:
+#     deferreds_root.discard(child)
+#
+# deferred_tree = {
+#     d: {}
+#     for d in deferreds_root
+# }
+#
+# def populate(root, tree):
+#     for leaf in deferred_edges.get(root, []):
+#         populate(leaf, tree.setdefault(leaf, {}))
+#
+#
+# for d in deferreds_root:
+#     tree = deferred_tree.setdefault(d, {})
+#     populate(d, tree)
+
+print deferred_edges
+print root_id
+
+def is_in_deferred(d):
+    while True:
+        if d == root_id:
+            return True
+
+        for start, end in deferred_edges:
+            if d == end:
+                d = start
+                break
+        else:
+            return False
+
+for node_id, name in names.items():
+    # if times.get(node_id, 100) < 5:
+    #     continue
+
+    if node_id in deferreds:
+        if not is_in_deferred(deferreds[node_id]):
+            continue
+    else:
+        if not is_in_deferred(node_id):
+            continue
+
+    node = pydot.Node(node_id, label=name)
+
+    # if node_id in deferreds:
+    #     clusters[deferreds[node_id]].add_node(node)
+    # elif node_id in clusters:
+    #     clusters[node_id].add_node(node)
+    # else:
+    #     graph.add_node(node)
+    graph.add_node(node)
+    nodes[node_id] = node
+
+    # print node_id
+
+for parent, child in edges:
+    if child not in nodes:
+        print child, "not a node"
+        continue
+
+    if parent not in nodes:
+        print parent, "not a node"
+        continue
+
+    edge = pydot.Edge(nodes[parent], nodes[child])
+    graph.add_edge(edge)
+
+
+file_prefix = "call_graph_out"
+graph.write('%s.dot' % file_prefix, format='raw', prog='dot')
+graph.write_svg("%s.svg" % file_prefix, prog='dot')
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 3649406efb..c7d7bf2fe1 100755
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -51,6 +51,8 @@ from synapse.rest.client.v2_alpha import ClientV2AlphaRestResource
 from daemonize import Daemonize
 import twisted.manhole.telnet
 
+from synapse.util.traceutil import Tracer
+
 import synapse
 
 import logging
@@ -61,6 +63,7 @@ import subprocess
 import sqlite3
 import syweb
 
+
 logger = logging.getLogger(__name__)
 
 
@@ -399,8 +402,13 @@ class SynapseService(service.Service):
 
 
 def run(hs):
-
     def in_thread():
+        try:
+            tracer = Tracer()
+            sys.settrace(tracer.process)
+        except Exception:
+            logger.exception("Failed to start tracer")
+
         with LoggingContext("run"):
             change_resource_limit(hs.config.soft_file_limit)
 
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ae4e9b316d..14c416fc68 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -73,7 +73,6 @@ class FederationHandler(BaseHandler):
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
 
-    @log_function
     @defer.inlineCallbacks
     def handle_new_event(self, event, destinations):
         """ Takes in an event from the client to server side, that has already
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 0346afb1b4..f5666f99aa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -165,6 +165,12 @@ class RoomStateEventRestServlet(ClientV1RestServlet):
         defer.returnValue((200, {}))
 
 
+def trace(f):
+    f.should_trace = True
+    f.root_trace = True
+    return f
+
+
 # TODO: Needs unit testing for generic events + feedback
 class RoomSendEventRestServlet(ClientV1RestServlet):
 
@@ -175,7 +181,11 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
 
     @defer.inlineCallbacks
     def on_POST(self, request, room_id, event_type, txn_id=None):
+        import inspect
+        frame = inspect.currentframe()
+        logger.info("Frame: %s", id(frame))
         user, client = yield self.auth.get_user_by_req(request)
+        logger.info("Frame: %s", id(inspect.currentframe()))
         content = _parse_json(request)
 
         msg_handler = self.handlers.message_handler
@@ -189,12 +199,14 @@ class RoomSendEventRestServlet(ClientV1RestServlet):
             client=client,
             txn_id=txn_id,
         )
+        logger.info("Frame: %s", id(inspect.currentframe()))
 
         defer.returnValue((200, {"event_id": event.event_id}))
 
     def on_GET(self, request, room_id, event_type, txn_id):
         return (200, "Not implemented")
 
+    @trace
     @defer.inlineCallbacks
     def on_PUT(self, request, room_id, event_type, txn_id):
         try:
diff --git a/synapse/util/traceutil.py b/synapse/util/traceutil.py
new file mode 100644
index 0000000000..e313912801
--- /dev/null
+++ b/synapse/util/traceutil.py
@@ -0,0 +1,280 @@
+# -*- coding: utf-8 -*-
+# Copyright 2015 OpenMarket Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import time
+
+import inspect
+import logging
+
+
+logger = logging.getLogger("Tracer")
+
+
+class Tracer(object):
+
+    def __init__(self):
+        self.interested_deferreds = set()
+
+        self.next_id = 1
+
+        self.deferred_to_current_frames = {}
+
+    def process(self, frame, event, arg):
+        if event == 'call':
+            return self.process_call(frame)
+
+    def handle_inline_callbacks(self, frm):
+        argvalues = inspect.getargvalues(frm)
+        generator = argvalues.locals["g"]
+        deferred = argvalues.locals["deferred"]
+
+        if not hasattr(deferred, "syn_trace_defer_id"):
+            trace_id = self.get_next_id()
+            deferred.syn_trace_defer_id = trace_id
+            logger.info(
+                "%s named Deferred %s",
+                trace_id,
+                self.get_name_for_frame(generator.gi_frame)
+            )
+
+            logger.info("%s start %d", trace_id, int(time.time() * 1000))
+
+            def do(res):
+                logger.info("%s end %d", trace_id, int(time.time() * 1000))
+                return res
+
+            deferred.addBoth(do)
+
+            back = frm.f_back
+            while back:
+                try:
+                    name = self.get_name_for_frame(back)
+                    if name == "twisted.internet.defer._inlineCallbacks":
+                        argvalues = inspect.getargvalues(back)
+                        deferred = argvalues.locals["deferred"]
+
+                        d_id = getattr(deferred, "syn_trace_defer_id", None)
+                        if d_id:
+                            logger.info("%s in %s", trace_id, d_id)
+                            curr_stack = self.deferred_to_current_frames.setdefault(
+                                d_id, []
+                            )
+
+                            if curr_stack:
+                                logger.info("%s calls %s", curr_stack[-1], trace_id)
+                            else:
+                                logger.info("%s calls %s", d_id, trace_id)
+                            break
+
+                except:
+                    pass
+
+                back = back.f_back
+
+    def are_interested(self, name):
+        if not name.startswith("synapse"):
+            return False
+        if name.startswith("synapse.util.logcontext"):
+            return False
+        if name.startswith("synapse.util.logutils"):
+            return False
+        if name.startswith("synapse.util.traceutil"):
+            return False
+        if name.startswith("synapse.events.FrozenEvent.get"):
+            return False
+        if name.startswith("synapse.events.EventBuilder.get"):
+            return False
+        if name.startswith("synapse.types"):
+            return False
+        if name.startswith("synapse.util.frozenutils.freeze"):
+            return False
+        if name.startswith("synapse.util.frozenutils.<dictcomp>"):
+            return False
+        if name.startswith("synapse.util.Clock"):
+            return False
+
+        if name.endswith("__repr__") or name.endswith("__str__"):
+            return False
+        if name.endswith("<genexpr>"):
+            return False
+
+        return True
+
+    def process_call(self, frame):
+        should_trace = False
+
+        try:
+            name = self.get_name_for_frame(frame)
+            if name == "twisted.internet.defer._inlineCallbacks":
+                self.handle_inline_callbacks(frame)
+                return
+
+            if not self.are_interested(name):
+                return
+
+            back_name = self.get_name_for_frame(frame.f_back)
+
+            if name == "synapse.api.auth.Auth.get_user_by_req":
+                logger.info(
+                    "synapse.api.auth.Auth.get_user_by_req %s",
+                    back_name
+                )
+
+            try:
+                if back_name == "twisted.internet.defer._inlineCallbacks":
+                    def ret(f, event, result):
+                        if event != "return":
+                            return
+
+                        argvalues = inspect.getargvalues(frame.f_back)
+                        deferred = argvalues.locals["deferred"]
+
+                        try:
+                            logger.info(
+                                "%s waits on %s",
+                                deferred.syn_trace_defer_id,
+                                result.syn_trace_defer_id
+                            )
+                        except:
+                            pass
+                    return ret
+                if back_name == "twisted.internet.defer.unwindGenerator":
+                    return
+            except:
+                pass
+
+            try:
+                func = getattr(frame.f_locals["self"], frame.f_code.co_name)
+                if inspect.isgeneratorfunction(func):
+                    return
+            except:
+                pass
+
+            try:
+                func = frame.f_globals[frame.f_code.co_name]
+                if inspect.isgeneratorfunction(func):
+                    return
+            except:
+                pass
+        except:
+            return
+
+        back = frame
+        names = []
+
+        seen_deferreds = []
+        bottom_deferred = None
+        while back:
+            try:
+                name = self.get_name_for_frame(back)
+                if name.startswith("synapse"):
+                    names.append(name)
+
+                # if name.startswith("twisted.internet.defer"):
+                #     logger.info("Name: %s", name)
+
+                if name == "twisted.internet.defer._inlineCallbacks":
+                    argvalues = inspect.getargvalues(back)
+                    deferred = argvalues.locals["deferred"]
+
+                    d_id = getattr(deferred, "syn_trace_defer_id", None)
+                    if d_id:
+                        seen_deferreds.append(d_id)
+                        if not bottom_deferred:
+                            bottom_deferred = deferred
+                    if d_id in self.interested_deferreds:
+                        should_trace = True
+                        break
+
+                func = getattr(back.f_locals["self"], back.f_code.co_name)
+
+                if hasattr(func, "should_trace") or hasattr(func.im_func, "should_trace"):
+                    should_trace = True
+                    break
+
+                func.root_trace
+                should_trace = True
+
+                break
+            except:
+                pass
+
+            back = back.f_back
+
+        if not should_trace:
+            return
+
+        frame_id = self.get_next_id()
+        name = self.get_name_for_frame(frame)
+        logger.info("%s named %s", frame_id, name)
+
+        self.interested_deferreds.update(seen_deferreds)
+
+        names.reverse()
+
+        if bottom_deferred:
+            self.deferred_frames.setdefault(
+                bottom_deferred.syn_trace_defer_id, []
+            ).append(names)
+
+            logger.info("%s in %s", frame_id, bottom_deferred.syn_trace_defer_id)
+
+            if not hasattr(bottom_deferred, "syn_trace_registered_cb"):
+                bottom_deferred.syn_trace_registered_cb = True
+
+                def do(res):
+                    return res
+
+                bottom_deferred.addBoth(do)
+
+            curr_stack = self.deferred_to_current_frames.setdefault(
+                bottom_deferred.syn_trace_defer_id, []
+            )
+
+            if curr_stack:
+                logger.info("%s calls %s", curr_stack[-1], frame_id)
+            else:
+                logger.info("%s calls %s", bottom_deferred.syn_trace_defer_id, frame_id)
+
+            curr_stack.append(frame_id)
+
+            logger.info("%s start %d", frame_id, int(time.time() * 1000))
+
+            def p(frame, event, arg):
+                if event == "return":
+                    curr_stack.pop()
+
+                    logger.info("%s end %d", frame_id, int(time.time() * 1000))
+
+            return p
+
+    def get_name_for_frame(self, frame):
+        module_name = frame.f_globals["__name__"]
+        cls_instance = frame.f_locals.get("self", None)
+        if cls_instance:
+            cls_name = cls_instance.__class__.__name__
+            name = "%s.%s.%s" % (
+                module_name, cls_name, frame.f_code.co_name
+            )
+        else:
+            name = "%s.%s" % (
+                module_name, frame.f_code.co_name
+            )
+        return name
+
+    def get_next_id(self):
+        i = self.next_id
+        self.next_id += 1
+        return i