1 files changed, 67 insertions, 0 deletions
diff --git a/scripts-dev/tail-synapse.py b/scripts-dev/tail-synapse.py
new file mode 100644
index 0000000000..18be711e92
--- /dev/null
+++ b/scripts-dev/tail-synapse.py
@@ -0,0 +1,67 @@
+import requests
+import collections
+import sys
+import time
+import json
+
+Entry = collections.namedtuple("Entry", "name position rows")
+
+ROW_TYPES = {}
+
+
+def row_type_for_columns(name, column_names):
+ column_names = tuple(column_names)
+ row_type = ROW_TYPES.get((name, column_names))
+ if row_type is None:
+ row_type = collections.namedtuple(name, column_names)
+ ROW_TYPES[(name, column_names)] = row_type
+ return row_type
+
+
+def parse_response(content):
+ streams = json.loads(content)
+ result = {}
+ for name, value in streams.items():
+ row_type = row_type_for_columns(name, value["field_names"])
+ position = value["position"]
+ rows = [row_type(*row) for row in value["rows"]]
+ result[name] = Entry(name, position, rows)
+ return result
+
+
+def replicate(server, streams):
+ return parse_response(requests.get(
+ server + "/_synapse/replication",
+ verify=False,
+ params=streams
+ ).content)
+
+
+def main():
+ server = sys.argv[1]
+
+ streams = None
+ while not streams:
+ try:
+ streams = {
+ row.name: row.position
+ for row in replicate(server, {"streams":"-1"})["streams"].rows
+ }
+ except requests.exceptions.ConnectionError as e:
+ time.sleep(0.1)
+
+ while True:
+ try:
+ results = replicate(server, streams)
+ except:
+ sys.stdout.write("connection_lost("+ repr(streams) + ")\n")
+ break
+ for update in results.values():
+ for row in update.rows:
+ sys.stdout.write(repr(row) + "\n")
+ streams[update.name] = update.position
+
+
+
+if __name__=='__main__':
+ main()
|