summary refs log tree commit diff
path: root/scripts/synapse_port_db
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/synapse_port_db')
-rwxr-xr-xscripts/synapse_port_db136
1 files changed, 86 insertions, 50 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index a2a0f364cf..efd04da2d6 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -19,6 +19,7 @@ from twisted.enterprise import adbapi
 
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.storage.engines import create_engine
+from synapse.storage.prepare_database import prepare_database
 
 import argparse
 import curses
@@ -37,6 +38,7 @@ BOOLEAN_COLUMNS = {
     "rooms": ["is_public"],
     "event_edges": ["is_state"],
     "presence_list": ["accepted"],
+    "presence_stream": ["currently_active"],
 }
 
 
@@ -212,6 +214,10 @@ class Porter(object):
 
         self.progress.add_table(table, postgres_size, table_size)
 
+        if table == "event_search":
+            yield self.handle_search_table(postgres_size, table_size, next_chunk)
+            return
+
         select = (
             "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
             % (table,)
@@ -230,60 +236,95 @@ class Porter(object):
             if rows:
                 next_chunk = rows[-1][0] + 1
 
-                if table == "event_search":
-                    # We have to treat event_search differently since it has a
-                    # different structure in the two different databases.
-                    def insert(txn):
-                        sql = (
-                            "INSERT INTO event_search (event_id, room_id, key, sender, vector)"
-                            " VALUES (?,?,?,?,to_tsvector('english', ?))"
-                        )
+                self._convert_rows(table, headers, rows)
 
-                        rows_dict = [
-                            dict(zip(headers, row))
-                            for row in rows
-                        ]
-
-                        txn.executemany(sql, [
-                            (
-                                row["event_id"],
-                                row["room_id"],
-                                row["key"],
-                                row["sender"],
-                                row["value"],
-                            )
-                            for row in rows_dict
-                        ])
-
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
-                        )
-                else:
-                    self._convert_rows(table, headers, rows)
+                def insert(txn):
+                    self.postgres_store.insert_many_txn(
+                        txn, table, headers[1:], rows
+                    )
 
-                    def insert(txn):
-                        self.postgres_store.insert_many_txn(
-                            txn, table, headers[1:], rows
-                        )
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": table},
+                        updatevalues={"rowid": next_chunk},
+                    )
+
+                yield self.postgres_store.execute(insert)
+
+                postgres_size += len(rows)
+
+                self.progress.update(table, postgres_size)
+            else:
+                return
+
+    @defer.inlineCallbacks
+    def handle_search_table(self, postgres_size, table_size, next_chunk):
+        select = (
+            "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
+            " FROM event_search as es"
+            " INNER JOIN events AS e USING (event_id, room_id)"
+            " WHERE es.rowid >= ?"
+            " ORDER BY es.rowid LIMIT ?"
+        )
+
+        while True:
+            def r(txn):
+                txn.execute(select, (next_chunk, self.batch_size,))
+                rows = txn.fetchall()
+                headers = [column[0] for column in txn.description]
+
+                return headers, rows
+
+            headers, rows = yield self.sqlite_store.runInteraction("select", r)
+
+            if rows:
+                next_chunk = rows[-1][0] + 1
+
+                # We have to treat event_search differently since it has a
+                # different structure in the two different databases.
+                def insert(txn):
+                    sql = (
+                        "INSERT INTO event_search (event_id, room_id, key,"
+                        " sender, vector, origin_server_ts, stream_ordering)"
+                        " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
+                    )
 
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
+                    rows_dict = [
+                        dict(zip(headers, row))
+                        for row in rows
+                    ]
+
+                    txn.executemany(sql, [
+                        (
+                            row["event_id"],
+                            row["room_id"],
+                            row["key"],
+                            row["sender"],
+                            row["value"],
+                            row["origin_server_ts"],
+                            row["stream_ordering"],
                         )
+                        for row in rows_dict
+                    ])
+
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": "event_search"},
+                        updatevalues={"rowid": next_chunk},
+                    )
 
                 yield self.postgres_store.execute(insert)
 
                 postgres_size += len(rows)
 
-                self.progress.update(table, postgres_size)
+                self.progress.update("event_search", postgres_size)
+
             else:
                 return
 
+
     def setup_db(self, db_config, database_engine):
         db_conn = database_engine.module.connect(
             **{
@@ -292,7 +333,7 @@ class Porter(object):
             }
         )
 
-        database_engine.prepare_database(db_conn)
+        prepare_database(db_conn, database_engine, config=None)
 
         db_conn.commit()
 
@@ -309,8 +350,8 @@ class Porter(object):
                 **self.postgres_config["args"]
             )
 
-            sqlite_engine = create_engine(FakeConfig(sqlite_config))
-            postgres_engine = create_engine(FakeConfig(postgres_config))
+            sqlite_engine = create_engine(sqlite_config)
+            postgres_engine = create_engine(postgres_config)
 
             self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
             self.postgres_store = Store(postgres_db_pool, postgres_engine)
@@ -792,8 +833,3 @@ if __name__ == "__main__":
     if end_error_exec_info:
         exc_type, exc_value, exc_traceback = end_error_exec_info
         traceback.print_exception(exc_type, exc_value, exc_traceback)
-
-
-class FakeConfig:
-    def __init__(self, database_config):
-        self.database_config = database_config