summary refs log tree commit diff
path: root/scripts/synapse_port_db
diff options
context:
space:
mode:
Diffstat (limited to 'scripts/synapse_port_db')
-rwxr-xr-xscripts/synapse_port_db123
1 files changed, 81 insertions, 42 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 253a6ef6c7..efd04da2d6 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -214,6 +214,10 @@ class Porter(object):
 
         self.progress.add_table(table, postgres_size, table_size)
 
+        if table == "event_search":
+            yield self.handle_search_table(postgres_size, table_size, next_chunk)
+            return
+
         select = (
             "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
             % (table,)
@@ -232,60 +236,95 @@ class Porter(object):
             if rows:
                 next_chunk = rows[-1][0] + 1
 
-                if table == "event_search":
-                    # We have to treat event_search differently since it has a
-                    # different structure in the two different databases.
-                    def insert(txn):
-                        sql = (
-                            "INSERT INTO event_search (event_id, room_id, key, sender, vector)"
-                            " VALUES (?,?,?,?,to_tsvector('english', ?))"
-                        )
+                self._convert_rows(table, headers, rows)
 
-                        rows_dict = [
-                            dict(zip(headers, row))
-                            for row in rows
-                        ]
-
-                        txn.executemany(sql, [
-                            (
-                                row["event_id"],
-                                row["room_id"],
-                                row["key"],
-                                row["sender"],
-                                row["value"],
-                            )
-                            for row in rows_dict
-                        ])
-
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
-                        )
-                else:
-                    self._convert_rows(table, headers, rows)
+                def insert(txn):
+                    self.postgres_store.insert_many_txn(
+                        txn, table, headers[1:], rows
+                    )
 
-                    def insert(txn):
-                        self.postgres_store.insert_many_txn(
-                            txn, table, headers[1:], rows
-                        )
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": table},
+                        updatevalues={"rowid": next_chunk},
+                    )
+
+                yield self.postgres_store.execute(insert)
+
+                postgres_size += len(rows)
+
+                self.progress.update(table, postgres_size)
+            else:
+                return
+
+    @defer.inlineCallbacks
+    def handle_search_table(self, postgres_size, table_size, next_chunk):
+        select = (
+            "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
+            " FROM event_search as es"
+            " INNER JOIN events AS e USING (event_id, room_id)"
+            " WHERE es.rowid >= ?"
+            " ORDER BY es.rowid LIMIT ?"
+        )
 
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
+        while True:
+            def r(txn):
+                txn.execute(select, (next_chunk, self.batch_size,))
+                rows = txn.fetchall()
+                headers = [column[0] for column in txn.description]
+
+                return headers, rows
+
+            headers, rows = yield self.sqlite_store.runInteraction("select", r)
+
+            if rows:
+                next_chunk = rows[-1][0] + 1
+
+                # We have to treat event_search differently since it has a
+                # different structure in the two different databases.
+                def insert(txn):
+                    sql = (
+                        "INSERT INTO event_search (event_id, room_id, key,"
+                        " sender, vector, origin_server_ts, stream_ordering)"
+                        " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
+                    )
+
+                    rows_dict = [
+                        dict(zip(headers, row))
+                        for row in rows
+                    ]
+
+                    txn.executemany(sql, [
+                        (
+                            row["event_id"],
+                            row["room_id"],
+                            row["key"],
+                            row["sender"],
+                            row["value"],
+                            row["origin_server_ts"],
+                            row["stream_ordering"],
                         )
+                        for row in rows_dict
+                    ])
+
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": "event_search"},
+                        updatevalues={"rowid": next_chunk},
+                    )
 
                 yield self.postgres_store.execute(insert)
 
                 postgres_size += len(rows)
 
-                self.progress.update(table, postgres_size)
+                self.progress.update("event_search", postgres_size)
+
             else:
                 return
 
+
     def setup_db(self, db_config, database_engine):
         db_conn = database_engine.module.connect(
             **{