diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index c4cf11d19a..7a2e177d3d 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -173,14 +173,14 @@ class Store(
return (yield self.db_pool.runWithConnection(r))
def execute(self, f, *args, **kwargs):
- return self.runInteraction(f.__name__, f, *args, **kwargs)
+ return self.db.runInteraction(f.__name__, f, *args, **kwargs)
def execute_sql(self, sql, *args):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()
- return self.runInteraction("execute_sql", r)
+ return self.db.runInteraction("execute_sql", r)
def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
@@ -223,7 +223,7 @@ class Porter(object):
def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
- row = yield self.postgres_store.simple_select_one(
+ row = yield self.postgres_store.db.simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
@@ -233,12 +233,14 @@ class Porter(object):
total_to_port = None
if row is None:
if table == "sent_transactions":
- forward_chunk, already_ported, total_to_port = (
- yield self._setup_sent_transactions()
- )
+ (
+ forward_chunk,
+ already_ported,
+ total_to_port,
+ ) = yield self._setup_sent_transactions()
backward_chunk = 0
else:
- yield self.postgres_store.simple_insert(
+ yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
@@ -268,7 +270,7 @@ class Porter(object):
yield self.postgres_store.execute(delete_all)
- yield self.postgres_store.simple_insert(
+ yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
@@ -322,7 +324,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
- yield self.postgres_store.simple_insert(
+ yield self.postgres_store.db.simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
@@ -363,7 +365,7 @@ class Porter(object):
return headers, forward_rows, backward_rows
- headers, frows, brows = yield self.sqlite_store.runInteraction("select", r)
+ headers, frows, brows = yield self.sqlite_store.db.runInteraction("select", r)
if frows or brows:
if frows:
@@ -377,7 +379,7 @@ class Porter(object):
def insert(txn):
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
- self.postgres_store.simple_update_one_txn(
+ self.postgres_store.db.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
@@ -416,7 +418,7 @@ class Porter(object):
return headers, rows
- headers, rows = yield self.sqlite_store.runInteraction("select", r)
+ headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
if rows:
forward_chunk = rows[-1][0] + 1
@@ -433,8 +435,8 @@ class Porter(object):
rows_dict = []
for row in rows:
d = dict(zip(headers, row))
- if "\0" in d['value']:
- logger.warning('dropping search row %s', d)
+ if "\0" in d["value"]:
+ logger.warning("dropping search row %s", d)
else:
rows_dict.append(d)
@@ -454,7 +456,7 @@ class Porter(object):
],
)
- self.postgres_store.simple_update_one_txn(
+ self.postgres_store.db.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
@@ -504,17 +506,14 @@ class Porter(object):
self.progress.set_state("Preparing %s" % config["name"])
conn = self.setup_db(config, engine)
- db_pool = adbapi.ConnectionPool(
- config["name"], **config["args"]
- )
+ db_pool = adbapi.ConnectionPool(config["name"], **config["args"])
hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
store = Store(conn, hs)
- yield store.runInteraction(
- "%s_engine.check_database" % config["name"],
- engine.check_database,
+ yield store.db.runInteraction(
+ "%s_engine.check_database" % config["name"], engine.check_database,
)
return store
@@ -541,7 +540,9 @@ class Porter(object):
self.sqlite_store = yield self.build_db_store(self.sqlite_config)
# Check if all background updates are done, abort if not.
- updates_complete = yield self.sqlite_store.has_completed_background_updates()
+ updates_complete = (
+ yield self.sqlite_store.has_completed_background_updates()
+ )
if not updates_complete:
sys.stderr.write(
"Pending background updates exist in the SQLite3 database."
@@ -582,22 +583,22 @@ class Porter(object):
)
try:
- yield self.postgres_store.runInteraction("alter_table", alter_table)
+ yield self.postgres_store.db.runInteraction("alter_table", alter_table)
except Exception:
# On Error Resume Next
pass
- yield self.postgres_store.runInteraction(
+ yield self.postgres_store.db.runInteraction(
"create_port_table", create_port_table
)
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
- sqlite_tables = yield self.sqlite_store.simple_select_onecol(
+ sqlite_tables = yield self.sqlite_store.db.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)
- postgres_tables = yield self.postgres_store.simple_select_onecol(
+ postgres_tables = yield self.postgres_store.db.simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
@@ -687,11 +688,11 @@ class Porter(object):
rows = txn.fetchall()
headers = [column[0] for column in txn.description]
- ts_ind = headers.index('ts')
+ ts_ind = headers.index("ts")
return headers, [r for r in rows if r[ts_ind] < yesterday]
- headers, rows = yield self.sqlite_store.runInteraction("select", r)
+ headers, rows = yield self.sqlite_store.db.runInteraction("select", r)
rows = self._convert_rows("sent_transactions", headers, rows)
@@ -724,7 +725,7 @@ class Porter(object):
next_chunk = yield self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
- yield self.postgres_store.simple_insert(
+ yield self.postgres_store.db.simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
@@ -737,7 +738,7 @@ class Porter(object):
txn.execute(
"SELECT count(*) FROM sent_transactions" " WHERE ts >= ?", (yesterday,)
)
- size, = txn.fetchone()
+ (size,) = txn.fetchone()
return int(size)
remaining_count = yield self.sqlite_store.execute(get_sent_table_size)
@@ -790,7 +791,7 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
- return self.postgres_store.runInteraction("setup_state_group_id_seq", r)
+ return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)
##############################################
@@ -871,7 +872,7 @@ class CursesProgress(Progress):
duration = int(now) - int(self.start_time)
minutes, seconds = divmod(duration, 60)
- duration_str = '%02dm %02ds' % (minutes, seconds)
+ duration_str = "%02dm %02ds" % (minutes, seconds)
if self.finished:
status = "Time spent: %s (Done!)" % (duration_str,)
@@ -881,7 +882,7 @@ class CursesProgress(Progress):
left = float(self.total_remaining) / self.total_processed
est_remaining = (int(now) - self.start_time) * left
- est_remaining_str = '%02dm %02ds remaining' % divmod(est_remaining, 60)
+ est_remaining_str = "%02dm %02ds remaining" % divmod(est_remaining, 60)
else:
est_remaining_str = "Unknown"
status = "Time spent: %s (est. remaining: %s)" % (
@@ -967,7 +968,7 @@ if __name__ == "__main__":
description="A script to port an existing synapse SQLite database to"
" a new PostgreSQL database."
)
- parser.add_argument("-v", action='store_true')
+ parser.add_argument("-v", action="store_true")
parser.add_argument(
"--sqlite-database",
required=True,
@@ -976,12 +977,12 @@ if __name__ == "__main__":
)
parser.add_argument(
"--postgres-config",
- type=argparse.FileType('r'),
+ type=argparse.FileType("r"),
required=True,
help="The database config file for the PostgreSQL database",
)
parser.add_argument(
- "--curses", action='store_true', help="display a curses based progress UI"
+ "--curses", action="store_true", help="display a curses based progress UI"
)
parser.add_argument(
|