summary refs log tree commit diff
path: root/scripts-dev/storage_inheritance.py
blob: ec06d886dfdc435f3505baddc7aa989b72497985 (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#! /usr/bin/env python3
import argparse
import os
import re
import subprocess
import sys
import tempfile
from typing import Iterable, Optional, Set

import networkx


def scrape_storage_classes(root: Optional[str] = None) -> str:
    """Grep for classes ending with "Store" and extract their list of parents.

    Args:
        root: Directory in which `rg` is run; it must contain the `synapse` and
            `tests` directories that are searched. Defaults to the parent of the
            directory holding this script, i.e. the checkout the script lives in,
            so the script no longer depends on one developer's home directory.

    Returns:
        The stdout from `rg` as a single string.

    Raises:
        FileNotFoundError: if `rg` is not installed or `root` does not exist.
        subprocess.CalledProcessError: if `rg` exits with a non-zero status
            (ripgrep does this when nothing matches).
    """

    # TODO: this is a big hack which assumes that each Store class has a unique name.
    #   That assumption is wrong: there are two DirectoryStores, one in
    #   synapse/replication/slave/storage/directory.py and the other in
    #   synapse/storage/databases/main/directory.py
    #   Would be nice to have a way to account for this.

    if root is None:
        # This script lives in <checkout>/scripts-dev/, so go up one directory.
        script_dir = os.path.dirname(os.path.abspath(__file__))
        root = os.path.dirname(script_dir)

    return subprocess.check_output(
        [
            "rg",
            "-o",
            "--no-line-number",
            "--no-filename",
            "--multiline",
            # Lazily match everything from `class ...Store(` up to the first `):`
            # at the end of a line, which may span several lines.
            r"class .*Store\((.|\n)*?\):$",
            "synapse",
            "tests",
        ],
        cwd=root,
    ).decode()


# Matches a class header that fits on a single line, e.g. `class Foo(Bar, Baz):`.
# Group 1 is the class name; group 2 is the raw text between the parentheses.
oneline_class_pattern = re.compile(r"^class (.*)\((.*)\):$")
# Matches the first line of a class header whose parent list continues on the
# following lines, e.g. `class Foo(`. Group 1 is the class name.
opening_class_pattern = re.compile(r"^class (.*)\($")


def _iter_parent_names(text: str) -> Iterable[str]:
    """Yield the parent class names in a comma-separated piece of a class header."""
    # Discard a trailing `# comment` so it cannot end up inside a node name.
    text = text.split("#", 1)[0]
    for entry in text.split(","):
        entry = entry.strip()
        # Blank entries come from `class FooStore():` or a trailing comma, and
        # `metaclass=...` is a keyword argument; neither names a parent class.
        if entry and "metaclass" not in entry:
            yield entry


def load_graph(lines: Iterable[str]) -> networkx.DiGraph:
    """Process the output of scrape_storage_classes to build an inheritance graph.

    Every time a class C is created that explicitly inherits from a parent P, we add an
    edge C -> P.

    Two header layouts are understood: the whole header on one line
    (`class C(P, Q):`), and an opening `class C(` line followed by lines listing
    parents and a closing `):` line. Blank lines and lines starting with `#` are
    ignored.

    Args:
        lines: The individual lines of the scraped class headers.

    Returns:
        A directed graph containing one edge per (child, parent) pair. Classes
        that list no parents contribute no edges and therefore no nodes.

    Raises:
        AssertionError: if a line that looks like a parent entry appears outside
            of a multi-line class header.
    """
    G = networkx.DiGraph()
    # Name of the class whose multi-line header we are currently inside, if any.
    child: Optional[str] = None

    for line in lines:
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        if (match := oneline_class_pattern.match(line)) is not None:
            name, parents = match.groups()
            for parent in _iter_parent_names(parents):
                G.add_edge(name, parent)
            child = None
        elif (match := opening_class_pattern.match(line)) is not None:
            (child,) = match.groups()
        elif line == "):":
            child = None
        else:
            if child is None:
                # Raised explicitly (rather than via `assert`) so that the check
                # still runs under `python -O`.
                raise AssertionError(repr(line))
            for parent in _iter_parent_names(line):
                G.add_edge(child, parent)

    return G


def select_vertices_of_interest(G: networkx.DiGraph, target: Optional[str]) -> Set[str]:
    """Choose which nodes of the inheritance graph should be drawn.

    With no TARGET, every node of G is selected. With a TARGET, the selection is
    the TARGET itself plus everything reachable from it; since edges point from
    child to parent, that is the TARGET and all of its ancestors.

    G must be a DAG, and a TARGET that is not None must be a node of G.
    """
    assert networkx.is_directed_acyclic_graph(G)

    if target is None:
        return set(G.nodes)

    # Edges run child -> parent, so graph descendants are inheritance ancestors.
    selected: Set[str] = networkx.descendants(G, target)
    selected.add(target)
    return selected


def generate_dot_source(G: networkx.DiGraph, nodes: Set[str]) -> str:
    """Return graphviz dot source for the part of G restricted to `nodes`.

    An edge is emitted only when both of its endpoints are in `nodes`; edges are
    written in the order in which `G.edges` yields them.
    """
    preamble = """\
strict digraph {
    rankdir="LR";
    node [shape=box];

"""
    edge_statements = [
        f"   {src} -> {dst};\n"
        for (src, dst) in G.edges
        if src in nodes and dst in nodes
    ]
    return preamble + "".join(edge_statements) + "}\n"


def render_png(dot_source: str, destination: Optional[str]) -> str:
    """Pipe `dot_source` through graphviz's `dot` to produce a png.

    Args:
        dot_source: The graph description, fed to `dot` on stdin.
        destination: Path to write the image to. If None, a temporary file is
            created and a warning naming it is printed to stderr; the file is
            not cleaned up afterwards.

    Returns:
        The path the image was written to.

    Raises:
        subprocess.CalledProcessError: if `dot` exits with a non-zero status.
    """
    if destination is None:
        # Only the path is needed; `dot` opens the file itself.
        fd, destination = tempfile.mkstemp()
        os.close(fd)
        print("Warning: writing to", destination, "which will persist", file=sys.stderr)

    command = ["dot", "-o", destination, "-Tpng"]
    subprocess.run(command, input=dot_source, encoding="utf-8", check=True)
    return destination


def show_graph(location: str) -> None:
    """Open the file at `location` via `xdg-open`.

    Raises:
        subprocess.CalledProcessError: if `xdg-open` exits with a non-zero status.
    """
    command = ["xdg-open", location]
    subprocess.run(command, check=True)


def main(parser: argparse.ArgumentParser, args: argparse.Namespace) -> int:
    """Run the scrape -> graph -> dot -> png pipeline described by `args`.

    Args:
        parser: Used only to print the help text when the arguments are unusable.
        args: Parsed command line; reads `output`, `show` and `target`.

    Returns:
        `os.EX_USAGE` if neither an output file nor `--show` was requested,
        otherwise `os.EX_OK` once the image has been rendered.
    """
    # Rendering is pointless unless the image is kept or displayed.
    if not args.output and not args.show:
        parser.print_help(file=sys.stderr)
        print("Must either --output or --show, or both.", file=sys.stderr)
        return os.EX_USAGE

    scraped_lines = scrape_storage_classes().split("\n")
    graph = load_graph(scraped_lines)
    wanted = select_vertices_of_interest(graph, args.target)
    image_path = render_png(generate_dot_source(graph, wanted), args.output)

    if args.show:
        show_graph(image_path)
    return os.EX_OK


def build_parser() -> argparse.ArgumentParser:
    """Construct the command-line parser for this script.

    The resulting namespace has three attributes:
        target: optional positional class name (None when omitted).
        output: path given to `--output` as a plain string (None when omitted).
        show: True when `--show` was passed, otherwise False.
    """
    parser = argparse.ArgumentParser(
        description="Visualise the inheritance of Synapse's storage classes. Requires "
        "ripgrep (https://github.com/BurntSushi/ripgrep) as 'rg'; graphviz "
        "(https://graphviz.org/) for the 'dot' program; and networkx "
        "(https://networkx.org/). Requires Python 3.8+ for the walrus "
        "operator."
    )
    parser.add_argument(
        "target",
        nargs="?",
        help="Show only TARGET and its ancestors. Otherwise, show the entire hierarchy.",
    )
    # No `nargs=1` here: that would store a one-element list, and the path is
    # later handed to `dot` as a single command-line argument, which must be a
    # string.
    parser.add_argument(
        "--output",
        help="Render inheritance graph to a png file.",
    )
    parser.add_argument(
        "--show",
        action="store_true",
        help="Open the inheritance graph in an image viewer.",
    )
    return parser


if __name__ == "__main__":
    # main() receives the parser as well so it can print usage on bad arguments.
    cli = build_parser()
    sys.exit(main(cli, cli.parse_args()))