summary refs log tree commit diff
path: root/scripts-dev/tail-synapse.py
blob: 7c9985d9f07f7fd5b82c281a0978dc24f6c64caa (plain) (blame)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
import collections
import json
import sys
import time

import requests

# One stream's slice of a parsed replication response: the stream name, the
# position reported for it, and its rows converted to per-stream namedtuples.
Entry = collections.namedtuple("Entry", "name position rows")

# Cache of generated row classes, keyed by (stream name, tuple of column names).
ROW_TYPES = {}


def row_type_for_columns(name, column_names):
    """Return the namedtuple class used for rows of stream ``name``.

    Classes are memoised in ROW_TYPES under ``(name, tuple(column_names))``,
    so repeated calls with the same name and columns yield the same class.
    """
    cache_key = (name, tuple(column_names))
    try:
        return ROW_TYPES[cache_key]
    except KeyError:
        pass
    # First time this (name, columns) pair is seen: build and remember it.
    generated = collections.namedtuple(name, cache_key[1])
    ROW_TYPES[cache_key] = generated
    return generated


def parse_response(content):
    """Decode a replication response body into ``{stream name: Entry}``.

    ``content`` is JSON mapping each stream name to an object carrying
    ``field_names``, ``position`` and ``rows``; every row is unpacked into
    the namedtuple class built from that stream's field names.
    """

    def to_entry(stream_name, payload):
        make_row = row_type_for_columns(stream_name, payload["field_names"])
        position = payload["position"]
        converted = [make_row(*raw) for raw in payload["rows"]]
        return Entry(stream_name, position, converted)

    decoded = json.loads(content)
    return {
        stream_name: to_entry(stream_name, payload)
        for stream_name, payload in decoded.items()
    }


def replicate(server, streams):
    """Fetch one replication response from ``server`` and parse it.

    ``streams`` is sent as the query parameters of a GET request to
    ``<server>/_synapse/replication``. TLS certificate verification is
    disabled for the request. Returns the result of ``parse_response`` on
    the response body.
    """
    endpoint = server + "/_synapse/replication"
    response = requests.get(endpoint, verify=False, params=streams)
    return parse_response(response.content)


def main():
    """Tail the replication streams of the server given on the command line.

    Reads the server base URL from ``sys.argv[1]``; exits with a usage
    message if it is missing. It first polls the ``streams`` listing until
    the server is reachable and reports at least one stream, recording each
    stream's position. It then requests updates from those positions in a
    loop, writing the ``repr`` of every row received to stdout and advancing
    the stored positions. When a request fails for any reason a
    ``connection_lost(...)`` line with the last known positions is written
    and the function returns.
    """
    if len(sys.argv) < 2:
        # Fail with a readable message rather than an IndexError traceback.
        sys.exit("usage: tail-synapse.py <server_url>")
    server = sys.argv[1]

    streams = None
    while not streams:
        try:
            listing = replicate(server, {"streams": "-1"})
        except requests.exceptions.ConnectionError:
            # Server not accepting connections yet; retry shortly.
            time.sleep(0.1)
            continue
        streams = {row.name: row.position for row in listing["streams"].rows}
        if not streams:
            # No streams reported yet: pause before asking again so we do
            # not hammer the server in a tight loop.
            time.sleep(0.1)

    while True:
        try:
            results = replicate(server, streams)
        except Exception:
            # Any failure ends the tail; report where we got to so the
            # positions can be inspected.
            sys.stdout.write("connection_lost(" + repr(streams) + ")\n")
            sys.stdout.flush()
            break
        for update in results.values():
            for row in update.rows:
                sys.stdout.write(repr(row) + "\n")
            streams[update.name] = update.position
        # Flush once per batch so rows appear promptly when stdout is piped.
        sys.stdout.flush()


# Run the tail loop only when executed as a script, not when imported.
if __name__ == '__main__':
    main()