summary refs log tree commit diff
path: root/scripts-dev
diff options
context:
space:
mode:
authorMark Haines <mjark@negativecurvature.net>2016-03-01 15:08:24 +0000
committerMark Haines <mjark@negativecurvature.net>2016-03-01 15:08:24 +0000
commita612ce66597f2d3837c468803044e0400e385fe6 (patch)
tree0b5734ff804d1b7e24ea0039783695055f49f556 /scripts-dev
parentMerge pull request #613 from matrix-org/markjh/yield (diff)
parentAdd a /replication API for extracting the updates that happened on (diff)
downloadsynapse-a612ce66597f2d3837c468803044e0400e385fe6.tar.xz
Merge pull request #489 from matrix-org/markjh/replication
Add a /replication API for extracting the updates that happened on synapse.
Diffstat (limited to 'scripts-dev')
-rw-r--r--scripts-dev/tail-synapse.py67
1 files changed, 67 insertions, 0 deletions
diff --git a/scripts-dev/tail-synapse.py b/scripts-dev/tail-synapse.py
new file mode 100644
index 0000000000..18be711e92
--- /dev/null
+++ b/scripts-dev/tail-synapse.py
@@ -0,0 +1,67 @@
+import requests
+import collections
+import sys
+import time
+import json
+
+Entry = collections.namedtuple("Entry", "name position rows")
+
+ROW_TYPES = {}
+
+
+def row_type_for_columns(name, column_names):
+    column_names = tuple(column_names)
+    row_type = ROW_TYPES.get((name, column_names))
+    if row_type is None:
+        row_type = collections.namedtuple(name, column_names)
+        ROW_TYPES[(name, column_names)] = row_type
+    return row_type
+
+
+def parse_response(content):
+    streams = json.loads(content)
+    result = {}
+    for name, value in streams.items():
+        row_type = row_type_for_columns(name, value["field_names"])
+        position = value["position"]
+        rows = [row_type(*row) for row in value["rows"]]
+        result[name] = Entry(name, position, rows)
+    return result
+
+
+def replicate(server, streams):
+    return parse_response(requests.get(
+        server + "/_synapse/replication",
+        verify=False,
+        params=streams
+    ).content)
+
+
+def main():
+    server = sys.argv[1]
+
+    streams = None
+    while not streams:
+        try:
+            streams = {
+                row.name: row.position
+                for row in replicate(server, {"streams":"-1"})["streams"].rows
+            }
+        except requests.exceptions.ConnectionError as e:
+            time.sleep(0.1)
+
+    while True:
+        try:
+            results = replicate(server, streams)
+        except:
+            sys.stdout.write("connection_lost("+ repr(streams) + ")\n")
+            break
+        for update in results.values():
+            for row in update.rows:
+                sys.stdout.write(repr(row) + "\n")
+            streams[update.name] = update.position
+
+
+
+if __name__=='__main__':
+    main()