Diffstat (limited to 'scripts/synapse_port_db')
-rwxr-xr-x  scripts/synapse_port_db | 136
1 file changed, 86 insertions, 50 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index a2a0f364cf..efd04da2d6 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -19,6 +19,7 @@ from twisted.enterprise import adbapi
 
 from synapse.storage._base import LoggingTransaction, SQLBaseStore
 from synapse.storage.engines import create_engine
+from synapse.storage.prepare_database import prepare_database
 
 import argparse
 import curses
@@ -37,6 +38,7 @@ BOOLEAN_COLUMNS = {
     "rooms": ["is_public"],
     "event_edges": ["is_state"],
     "presence_list": ["accepted"],
+    "presence_stream": ["currently_active"],
 }
 
 
@@ -212,6 +214,10 @@ class Porter(object):
 
         self.progress.add_table(table, postgres_size, table_size)
 
+        if table == "event_search":
+            yield self.handle_search_table(postgres_size, table_size, next_chunk)
+            return
+
         select = (
             "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
             % (table,)
@@ -230,60 +236,95 @@ class Porter(object):
             if rows:
                 next_chunk = rows[-1][0] + 1
 
-                if table == "event_search":
-                    # We have to treat event_search differently since it has a
-                    # different structure in the two different databases.
-                    def insert(txn):
-                        sql = (
-                            "INSERT INTO event_search (event_id, room_id, key, sender, vector)"
-                            " VALUES (?,?,?,?,to_tsvector('english', ?))"
-                        )
+                self._convert_rows(table, headers, rows)
 
-                        rows_dict = [
-                            dict(zip(headers, row))
-                            for row in rows
-                        ]
-
-                        txn.executemany(sql, [
-                            (
-                                row["event_id"],
-                                row["room_id"],
-                                row["key"],
-                                row["sender"],
-                                row["value"],
-                            )
-                            for row in rows_dict
-                        ])
-
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
-                        )
-                else:
-                    self._convert_rows(table, headers, rows)
+                def insert(txn):
+                    self.postgres_store.insert_many_txn(
+                        txn, table, headers[1:], rows
+                    )
 
-                    def insert(txn):
-                        self.postgres_store.insert_many_txn(
-                            txn, table, headers[1:], rows
-                        )
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": table},
+                        updatevalues={"rowid": next_chunk},
+                    )
+
+                yield self.postgres_store.execute(insert)
+
+                postgres_size += len(rows)
+
+                self.progress.update(table, postgres_size)
+            else:
+                return
+
+    @defer.inlineCallbacks
+    def handle_search_table(self, postgres_size, table_size, next_chunk):
+        select = (
+            "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
+            " FROM event_search as es"
+            " INNER JOIN events AS e USING (event_id, room_id)"
+            " WHERE es.rowid >= ?"
+            " ORDER BY es.rowid LIMIT ?"
+        )
+
+        while True:
+            def r(txn):
+                txn.execute(select, (next_chunk, self.batch_size,))
+                rows = txn.fetchall()
+                headers = [column[0] for column in txn.description]
+
+                return headers, rows
+
+            headers, rows = yield self.sqlite_store.runInteraction("select", r)
+
+            if rows:
+                next_chunk = rows[-1][0] + 1
+
+                # We have to treat event_search differently since it has a
+                # different structure in the two different databases.
+                def insert(txn):
+                    sql = (
+                        "INSERT INTO event_search (event_id, room_id, key,"
+                        " sender, vector, origin_server_ts, stream_ordering)"
+                        " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
+                    )
 
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
+                    rows_dict = [
+                        dict(zip(headers, row))
+                        for row in rows
+                    ]
+
+                    txn.executemany(sql, [
+                        (
+                            row["event_id"],
+                            row["room_id"],
+                            row["key"],
+                            row["sender"],
+                            row["value"],
+                            row["origin_server_ts"],
+                            row["stream_ordering"],
                         )
+                        for row in rows_dict
+                    ])
+
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": "event_search"},
+                        updatevalues={"rowid": next_chunk},
+                    )
 
                 yield self.postgres_store.execute(insert)
 
                 postgres_size += len(rows)
 
-                self.progress.update(table, postgres_size)
+                self.progress.update("event_search", postgres_size)
+
             else:
                 return
 
+
     def setup_db(self, db_config, database_engine):
         db_conn = database_engine.module.connect(
             **{
@@ -292,7 +333,7 @@ class Porter(object):
             }
         )
 
-        database_engine.prepare_database(db_conn)
+        prepare_database(db_conn, database_engine, config=None)
 
         db_conn.commit()
 
@@ -309,8 +350,8 @@ class Porter(object):
             **self.postgres_config["args"]
        )
 
-        sqlite_engine = create_engine(FakeConfig(sqlite_config))
-        postgres_engine = create_engine(FakeConfig(postgres_config))
+        sqlite_engine = create_engine(sqlite_config)
+        postgres_engine = create_engine(postgres_config)
 
         self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
         self.postgres_store = Store(postgres_db_pool, postgres_engine)
@@ -792,8 +833,3 @@ if __name__ == "__main__":
         if end_error_exec_info:
             exc_type, exc_value, exc_traceback = end_error_exec_info
             traceback.print_exception(exc_type, exc_value, exc_traceback)
-
-
-class FakeConfig:
-    def __init__(self, database_config):
-        self.database_config = database_config