diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index bee525197f..ae5e1810fc 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -35,31 +35,29 @@ from synapse.logging.context import (
make_deferred_yieldable,
run_in_background,
)
-from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
-from synapse.storage.data_stores.main.deviceinbox import (
- DeviceInboxBackgroundUpdateStore,
-)
-from synapse.storage.data_stores.main.devices import DeviceBackgroundUpdateStore
-from synapse.storage.data_stores.main.events_bg_updates import (
+from synapse.storage.database import DatabasePool, make_conn
+from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
+from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
+from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
+from synapse.storage.databases.main.events_bg_updates import (
EventsBackgroundUpdatesStore,
)
-from synapse.storage.data_stores.main.media_repository import (
+from synapse.storage.databases.main.media_repository import (
MediaRepositoryBackgroundUpdateStore,
)
-from synapse.storage.data_stores.main.registration import (
+from synapse.storage.databases.main.registration import (
RegistrationBackgroundUpdateStore,
find_max_generated_user_id_localpart,
)
-from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
-from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
-from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
-from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
-from synapse.storage.data_stores.main.stats import StatsStore
-from synapse.storage.data_stores.main.user_directory import (
+from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
+from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
+from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
+from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
+from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.databases.main.user_directory import (
UserDirectoryBackgroundUpdateStore,
)
-from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
-from synapse.storage.database import Database, make_conn
+from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
from synapse.storage.engines import create_engine
from synapse.storage.prepare_database import prepare_database
from synapse.util import Clock
@@ -175,14 +173,14 @@ class Store(
StatsStore,
):
def execute(self, f, *args, **kwargs):
- return self.db.runInteraction(f.__name__, f, *args, **kwargs)
+ return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
def execute_sql(self, sql, *args):
def r(txn):
txn.execute(sql, args)
return txn.fetchall()
- return self.db.runInteraction("execute_sql", r)
+ return self.db_pool.runInteraction("execute_sql", r)
def insert_many_txn(self, txn, table, headers, rows):
sql = "INSERT INTO %s (%s) VALUES (%s)" % (
@@ -227,7 +225,7 @@ class Porter(object):
async def setup_table(self, table):
if table in APPEND_ONLY_TABLES:
# It's safe to just carry on inserting.
- row = await self.postgres_store.db.simple_select_one(
+ row = await self.postgres_store.db_pool.simple_select_one(
table="port_from_sqlite3",
keyvalues={"table_name": table},
retcols=("forward_rowid", "backward_rowid"),
@@ -244,7 +242,7 @@ class Porter(object):
) = await self._setup_sent_transactions()
backward_chunk = 0
else:
- await self.postgres_store.db.simple_insert(
+ await self.postgres_store.db_pool.simple_insert(
table="port_from_sqlite3",
values={
"table_name": table,
@@ -274,7 +272,7 @@ class Porter(object):
await self.postgres_store.execute(delete_all)
- await self.postgres_store.db.simple_insert(
+ await self.postgres_store.db_pool.simple_insert(
table="port_from_sqlite3",
values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
)
@@ -318,7 +316,7 @@ class Porter(object):
if table == "user_directory_stream_pos":
# We need to make sure there is a single row, `(X, null), as that is
# what synapse expects to be there.
- await self.postgres_store.db.simple_insert(
+ await self.postgres_store.db_pool.simple_insert(
table=table, values={"stream_id": None}
)
self.progress.update(table, table_size) # Mark table as done
@@ -359,7 +357,7 @@ class Porter(object):
return headers, forward_rows, backward_rows
- headers, frows, brows = await self.sqlite_store.db.runInteraction(
+ headers, frows, brows = await self.sqlite_store.db_pool.runInteraction(
"select", r
)
@@ -375,7 +373,7 @@ class Porter(object):
def insert(txn):
self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
- self.postgres_store.db.simple_update_one_txn(
+ self.postgres_store.db_pool.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": table},
@@ -413,7 +411,7 @@ class Porter(object):
return headers, rows
- headers, rows = await self.sqlite_store.db.runInteraction("select", r)
+ headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
if rows:
forward_chunk = rows[-1][0] + 1
@@ -451,7 +449,7 @@ class Porter(object):
],
)
- self.postgres_store.db.simple_update_one_txn(
+ self.postgres_store.db_pool.simple_update_one_txn(
txn,
table="port_from_sqlite3",
keyvalues={"table_name": "event_search"},
@@ -494,7 +492,7 @@ class Porter(object):
db_conn, allow_outdated_version=allow_outdated_version
)
prepare_database(db_conn, engine, config=self.hs_config)
- store = Store(Database(hs, db_config, engine), db_conn, hs)
+ store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)
db_conn.commit()
return store
@@ -502,7 +500,7 @@ class Porter(object):
async def run_background_updates_on_postgres(self):
# Manually apply all background updates on the PostgreSQL database.
postgres_ready = (
- await self.postgres_store.db.updates.has_completed_background_updates()
+ await self.postgres_store.db_pool.updates.has_completed_background_updates()
)
if not postgres_ready:
@@ -511,9 +509,9 @@ class Porter(object):
self.progress.set_state("Running background updates on PostgreSQL")
while not postgres_ready:
- await self.postgres_store.db.updates.do_next_background_update(100)
+ await self.postgres_store.db_pool.updates.do_next_background_update(100)
postgres_ready = await (
- self.postgres_store.db.updates.has_completed_background_updates()
+ self.postgres_store.db_pool.updates.has_completed_background_updates()
)
async def run(self):
@@ -534,7 +532,7 @@ class Porter(object):
# Check if all background updates are done, abort if not.
updates_complete = (
- await self.sqlite_store.db.updates.has_completed_background_updates()
+ await self.sqlite_store.db_pool.updates.has_completed_background_updates()
)
if not updates_complete:
end_error = (
@@ -576,22 +574,24 @@ class Porter(object):
)
try:
- await self.postgres_store.db.runInteraction("alter_table", alter_table)
+ await self.postgres_store.db_pool.runInteraction(
+ "alter_table", alter_table
+ )
except Exception:
# On Error Resume Next
pass
- await self.postgres_store.db.runInteraction(
+ await self.postgres_store.db_pool.runInteraction(
"create_port_table", create_port_table
)
# Step 2. Get tables.
self.progress.set_state("Fetching tables")
- sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
+ sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
)
- postgres_tables = await self.postgres_store.db.simple_select_onecol(
+ postgres_tables = await self.postgres_store.db_pool.simple_select_onecol(
table="information_schema.tables",
keyvalues={},
retcol="distinct table_name",
@@ -692,7 +692,7 @@ class Porter(object):
return headers, [r for r in rows if r[ts_ind] < yesterday]
- headers, rows = await self.sqlite_store.db.runInteraction("select", r)
+ headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
rows = self._convert_rows("sent_transactions", headers, rows)
@@ -725,7 +725,7 @@ class Porter(object):
next_chunk = await self.sqlite_store.execute(get_start_id)
next_chunk = max(max_inserted_rowid + 1, next_chunk)
- await self.postgres_store.db.simple_insert(
+ await self.postgres_store.db_pool.simple_insert(
table="port_from_sqlite3",
values={
"table_name": "sent_transactions",
@@ -794,14 +794,14 @@ class Porter(object):
next_id = curr_id + 1
txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
- return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)
+ return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
def _setup_user_id_seq(self):
def r(txn):
next_id = find_max_generated_user_id_localpart(txn) + 1
txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
- return self.postgres_store.db.runInteraction("setup_user_id_seq", r)
+ return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
##############################################
|