summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--scripts/port_from_sqlite_to_postgres.py158
1 files changed, 120 insertions, 38 deletions
diff --git a/scripts/port_from_sqlite_to_postgres.py b/scripts/port_from_sqlite_to_postgres.py
index f4b6ed0681..19e35bf806 100644
--- a/scripts/port_from_sqlite_to_postgres.py
+++ b/scripts/port_from_sqlite_to_postgres.py
@@ -122,8 +122,8 @@ class Store(object):
 
         return self.db_pool.runWithConnection(r)
 
-    def execute(self, f):
-        return self.runInteraction(f.__name__, f)
+    def execute(self, f, *args, **kwargs):
+        return self.runInteraction(f.__name__, f, *args, **kwargs)
 
     def insert_many_txn(self, txn, table, headers, rows):
         sql = "INSERT INTO %s (%s) VALUES (%s)" % (
@@ -347,9 +347,118 @@ class Porter(object):
     def __init__(self, **kwargs):
         self.__dict__.update(kwargs)
 
+    def convert_rows(self, table, headers, rows):
+        bool_col_names = BOOLEAN_COLUMNS.get(table, [])
+
+        bool_cols = [
+            i for i, h in enumerate(headers) if h in bool_col_names
+        ]
+
+        def conv(j, col):
+            if j in bool_cols:
+                return bool(col)
+            return col
+
+        for i, row in enumerate(rows):
+            rows[i] = tuple(
+                self.postgres_store.database_engine.encode_parameter(
+                    conv(j, col)
+                )
+                for j, col in enumerate(row)
+                if j > 0
+            )
+
     @defer.inlineCallbacks
     def handle_table(self, table):
-        if table in APPEND_ONLY_TABLES:
+        def delete_all(txn):
+            txn.execute(
+                "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
+                (table,)
+            )
+            txn.execute("TRUNCATE %s CASCADE" % (table,))
+
+        def get_table_size(txn):
+            txn.execute("SELECT count(*) FROM %s" % (table,))
+            size, = txn.fetchone()
+            return int(size)
+
+        if table == "sent_transactions":
+            # This is a big table, and we really only need some of the recent
+            # data
+            yield self.postgres_store.execute(delete_all)
+
+            # Only save things from the last day
+            yesterday = 1429114568820 #int(time.time()*1000) - 86400000
+
+            # And save the max transaction id from each destination
+            select = (
+                "SELECT rowid, * FROM sent_transactions WHERE rowid IN ("
+                "SELECT max(rowid) FROM sent_transactions"
+                " GROUP BY destination"
+                ")"
+            )
+
+            def r(txn):
+                txn.execute(select)
+                rows = txn.fetchall()
+                headers = [column[0] for column in txn.description]
+
+                ts_ind = headers.index('ts')
+
+                return headers, [r for r in rows if r[ts_ind] < yesterday]
+
+            headers, rows = yield self.sqlite_store.runInteraction(
+                "select", r,
+            )
+
+            self.convert_rows(table, headers, rows)
+
+            inserted_rows = len(rows)
+            max_inserted_rowid = max(r[0] for r in rows)
+
+            def insert(txn):
+                self.postgres_store.insert_many_txn(
+                    txn, table, headers[1:], rows
+                )
+
+            yield self.postgres_store.execute(insert)
+
+            def get_start_id(txn):
+                txn.execute(
+                    "SELECT rowid FROM sent_transactions WHERE ts >= ?"
+                    " ORDER BY rowid ASC LIMIT 1",
+                    (yesterday,)
+                )
+
+                rows = txn.fetchall()
+                if rows:
+                    return rows[0][0]
+                else:
+                    return 1
+
+            next_chunk = yield self.sqlite_store.execute(get_start_id)
+            next_chunk = max(max_inserted_rowid + 1, next_chunk)
+
+            yield self.postgres_store._simple_insert(
+                table="port_from_sqlite3",
+                values={"table_name": table, "rowid": next_chunk}
+            )
+
+            def get_sent_table_size(txn):
+                txn.execute(
+                    "SELECT count(*) FROM sent_transactions"
+                    " WHERE ts >= ?",
+                    (yesterday,)
+                )
+                size, = txn.fetchone()
+                return int(size)
+
+            table_size = yield self.sqlite_store.execute(
+                get_sent_table_size
+            )
+
+            table_size += inserted_rows
+        elif table in APPEND_ONLY_TABLES:
             # It's safe to just carry on inserting.
             next_chunk = yield self.postgres_store._simple_select_one_onecol(
                 table="port_from_sqlite3",
@@ -365,28 +474,18 @@ class Porter(object):
                 )
 
                 next_chunk = 1
+
+            table_size = yield self.sqlite_store.execute(get_table_size)
         else:
-            def delete_all(txn):
-                txn.execute(
-                    "DELETE FROM port_from_sqlite3 WHERE table_name = %s",
-                    (table,)
-                )
-                txn.execute("TRUNCATE %s CASCADE" % (table,))
-                self.postgres_store._simple_insert_txn(
-                    txn,
-                    table="port_from_sqlite3",
-                    values={"table_name": table, "rowid": 0}
-                )
             yield self.postgres_store.execute(delete_all)
+            self.postgres_store._simple_insert(
+                table="port_from_sqlite3",
+                values={"table_name": table, "rowid": 0}
+            )
 
+            table_size = yield self.sqlite_store.execute(get_table_size)
             next_chunk = 1
 
-        def get_table_size(txn):
-            txn.execute("SELECT count(*) FROM %s" % (table,))
-            size, = txn.fetchone()
-            return int(size)
-
-        table_size = yield self.sqlite_store.execute(get_table_size)
         postgres_size = yield self.postgres_store.execute(get_table_size)
 
         if not table_size:
@@ -399,8 +498,6 @@ class Porter(object):
             % (table,)
         )
 
-        bool_col_names = BOOLEAN_COLUMNS.get(table, [])
-
         while True:
             def r(txn):
                 txn.execute(select, (next_chunk, self.batch_size,))
@@ -412,24 +509,9 @@ class Porter(object):
             headers, rows = yield self.sqlite_store.runInteraction("select", r)
 
             if rows:
-                bool_cols = [
-                    i for i, h in enumerate(headers) if h in bool_col_names
-                ]
                 next_chunk = rows[-1][0] + 1
 
-                def conv(j, col):
-                    if j in bool_cols:
-                        return bool(col)
-                    return col
-
-                for i, row in enumerate(rows):
-                    rows[i] = tuple(
-                        self.postgres_store.database_engine.encode_parameter(
-                            conv(j, col)
-                        )
-                        for j, col in enumerate(row)
-                        if j > 0
-                    )
+                self.convert_rows(table, headers, rows)
 
                 def insert(txn):
                     self.postgres_store.insert_many_txn(