diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 253a6ef6c7..efd04da2d6 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -214,6 +214,10 @@ class Porter(object):
         self.progress.add_table(table, postgres_size, table_size)
 
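+        # event_search has a different structure in the two databases, so it is
+        # ported by a dedicated handler rather than a straight row copy.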
+        if table == "event_search":
+            yield self.handle_search_table(postgres_size, table_size, next_chunk)
+            return
+
         select = (
             "SELECT rowid, * FROM %s WHERE rowid >= ? ORDER BY rowid LIMIT ?"
             % (table,)
@@ -232,60 +236,95 @@ class Porter(object):
             if rows:
                 next_chunk = rows[-1][0] + 1
 
-                if table == "event_search":
-                    # We have to treat event_search differently since it has a
-                    # different structure in the two different databases.
-                    def insert(txn):
-                        sql = (
-                            "INSERT INTO event_search (event_id, room_id, key, sender, vector)"
-                            " VALUES (?,?,?,?,to_tsvector('english', ?))"
-                        )
+                self._convert_rows(table, headers, rows)
 
-                        rows_dict = [
-                            dict(zip(headers, row))
-                            for row in rows
-                        ]
-
-                        txn.executemany(sql, [
-                            (
-                                row["event_id"],
-                                row["room_id"],
-                                row["key"],
-                                row["sender"],
-                                row["value"],
-                            )
-                            for row in rows_dict
-                        ])
-
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
-                        )
-                else:
-                    self._convert_rows(table, headers, rows)
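+                # For ordinary tables, bulk-insert the converted batch and bump the
+                # saved rowid in the same transaction.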
+                def insert(txn):
+                    self.postgres_store.insert_many_txn(
+                        txn, table, headers[1:], rows
+                    )
 
-                    def insert(txn):
-                        self.postgres_store.insert_many_txn(
-                            txn, table, headers[1:], rows
-                        )
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": table},
+                        updatevalues={"rowid": next_chunk},
+                    )
+
+                yield self.postgres_store.execute(insert)
+
+                postgres_size += len(rows)
+
+                self.progress.update(table, postgres_size)
+            else:
+                return
+
+    @defer.inlineCallbacks
+    def handle_search_table(self, postgres_size, table_size, next_chunk):
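+        # event_search is ported with a JOIN against events so that each row also
+        # carries origin_server_ts and stream_ordering for the PostgreSQL schema.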
+        select = (
+            "SELECT es.rowid, es.*, e.origin_server_ts, e.stream_ordering"
+            " FROM event_search as es"
+            " INNER JOIN events AS e USING (event_id, room_id)"
+            " WHERE es.rowid >= ?"
+            " ORDER BY es.rowid LIMIT ?"
+        )
 
-                        self.postgres_store._simple_update_one_txn(
-                            txn,
-                            table="port_from_sqlite3",
-                            keyvalues={"table_name": table},
-                            updatevalues={"rowid": next_chunk},
+        while True:
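+            # Pull the next batch of rows out of the SQLite database.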
+            def r(txn):
+                txn.execute(select, (next_chunk, self.batch_size,))
+                rows = txn.fetchall()
+                headers = [column[0] for column in txn.description]
+
+                return headers, rows
+
+            headers, rows = yield self.sqlite_store.runInteraction("select", r)
+
+            if rows:
+                next_chunk = rows[-1][0] + 1
+
+                # We have to treat event_search differently since it has a
+                # different structure in the two different databases.
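+                # On the PostgreSQL side the searched text is rebuilt with to_tsvector()
+                # as each row is inserted.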
+                def insert(txn):
+                    sql = (
+                        "INSERT INTO event_search (event_id, room_id, key,"
+                        " sender, vector, origin_server_ts, stream_ordering)"
+                        " VALUES (?,?,?,?,to_tsvector('english', ?),?,?)"
+                    )
+
+                    rows_dict = [
+                        dict(zip(headers, row))
+                        for row in rows
+                    ]
+
+                    txn.executemany(sql, [
+                        (
+                            row["event_id"],
+                            row["room_id"],
+                            row["key"],
+                            row["sender"],
+                            row["value"],
+                            row["origin_server_ts"],
+                            row["stream_ordering"],
                         )
+                        for row in rows_dict
+                    ])
+
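+                    # Record how far we have got in the port_from_sqlite3 bookkeeping
+                    # table.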
+                    self.postgres_store._simple_update_one_txn(
+                        txn,
+                        table="port_from_sqlite3",
+                        keyvalues={"table_name": "event_search"},
+                        updatevalues={"rowid": next_chunk},
+                    )
 
                 yield self.postgres_store.execute(insert)
 
                 postgres_size += len(rows)
 
-                self.progress.update(table, postgres_size)
+ self.progress.update("event_search", postgres_size)
+
             else:
                 return
+
     def setup_db(self, db_config, database_engine):
         db_conn = database_engine.module.connect(
             **{