diff options
Diffstat (limited to 'scripts/synapse_port_db')
-rwxr-xr-x | scripts/synapse_port_db | 123 |
1 files changed, 81 insertions, 42 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db index 253a6ef6c7..efd04da2d6 100755 --- a/scripts/synapse_port_db +++ b/scripts/synapse_port_db @@ -214,6 +214,10 @@ class Porter(object): self.progress.add_table(table, postgres_size, table_size) + if table == "event_search": + yield self.handle_search_table(postgres_size, table_size, next_chunk) + return + select = ( "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?" % (table,) @@ -232,60 +236,95 @@ class Porter(object): if rows: next_chunk = rows[-1][0] + 1 - if table == "event_search": - # We have to treat event_search differently since it has a - # different structure in the two different databases. - def insert(txn): - sql = ( - "INSERT INTO event_search (event_id, room_id, key, sender, vector)" - " VALUES (?,?,?,?,to_tsvector('english', ?))" - ) + self._convert_rows(table, headers, rows) - rows_dict = [ - dict(zip(headers, row)) - for row in rows - ] - - txn.executemany(sql, [ - ( - row["event_id"], - row["room_id"], - row["key"], - row["sender"], - row["value"], - ) - for row in rows_dict - ]) - - self.postgres_store._simple_update_one_txn( - txn, - table="port_from_sqlite3", - keyvalues={"table_name": table}, - updatevalues={"rowid": next_chunk}, - ) - else: - self._convert_rows(table, headers, rows) + def insert(txn): + self.postgres_store.insert_many_txn( + txn, table, headers[1:], rows + ) - def insert(txn): - self.postgres_store.insert_many_txn( - txn, table, headers[1:], rows - ) + self.postgres_store._simple_update_one_txn( + txn, + table="port_from_sqlite3", + keyvalues={"table_name": table}, + updatevalues={"rowid": next_chunk}, + ) + + yield self.postgres_store.execute(insert) + + postgres_size += len(rows) + + self.progress.update(table, postgres_size) + else: + return + + @defer.inlineCallbacks + def handle_search_table(self, postgres_size, table_size, next_chunk): + select = ( + "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering" + " FROM event_search as es" + " INNER JOIN events AS e USING (event_id, room_id)" + " WHERE es.rowid >= ?" + " ORDER BY es.rowid LIMIT ?" + ) - self.postgres_store._simple_update_one_txn( - txn, - table="port_from_sqlite3", - keyvalues={"table_name": table}, - updatevalues={"rowid": next_chunk}, + while True: + def r(txn): + txn.execute(select, (next_chunk, self.batch_size,)) + rows = txn.fetchall() + headers = [column[0] for column in txn.description] + + return headers, rows + + headers, rows = yield self.sqlite_store.runInteraction("select", r) + + if rows: + next_chunk = rows[-1][0] + 1 + + # We have to treat event_search differently since it has a + # different structure in the two different databases. + def insert(txn): + sql = ( + "INSERT INTO event_search (event_id, room_id, key," + " sender, vector, origin_server_ts, stream_ordering)" + " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)" + ) + + rows_dict = [ + dict(zip(headers, row)) + for row in rows + ] + + txn.executemany(sql, [ + ( + row["event_id"], + row["room_id"], + row["key"], + row["sender"], + row["value"], + row["origin_server_ts"], + row["stream_ordering"], ) + for row in rows_dict + ]) + + self.postgres_store._simple_update_one_txn( + txn, + table="port_from_sqlite3", + keyvalues={"table_name": "event_search"}, + updatevalues={"rowid": next_chunk}, + ) yield self.postgres_store.execute(insert) postgres_size += len(rows) - self.progress.update(table, postgres_size) + self.progress.update("event_search", postgres_size) + else: return + def setup_db(self, db_config, database_engine): db_conn = database_engine.module.connect( **{ |