summary refs log tree commit diff
path: root/scripts/synapse_port_db
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2020-08-05 21:38:57 +0100
committerGitHub <noreply@github.com>2020-08-05 21:38:57 +0100
commita7bdf98d01d2225a479753a85ba81adf02b16a32 (patch)
tree11443d9f4d89d9cd33d34d4146fd62e6393a87c6 /scripts/synapse_port_db
parentStop the parent process flushing the logs on exit (#8012) (diff)
downloadsynapse-a7bdf98d01d2225a479753a85ba81adf02b16a32.tar.xz
Rename database classes to make some sense (#8033)
Diffstat (limited to 'scripts/synapse_port_db')
-rwxr-xr-xscripts/synapse_port_db78
1 files changed, 39 insertions, 39 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index bee525197f..ae5e1810fc 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -35,31 +35,29 @@ from synapse.logging.context import (
     make_deferred_yieldable,
     run_in_background,
 )
-from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
-from synapse.storage.data_stores.main.deviceinbox import (
-    DeviceInboxBackgroundUpdateStore,
-)
-from synapse.storage.data_stores.main.devices import DeviceBackgroundUpdateStore
-from synapse.storage.data_stores.main.events_bg_updates import (
+from synapse.storage.database import DatabasePool, make_conn
+from synapse.storage.databases.main.client_ips import ClientIpBackgroundUpdateStore
+from synapse.storage.databases.main.deviceinbox import DeviceInboxBackgroundUpdateStore
+from synapse.storage.databases.main.devices import DeviceBackgroundUpdateStore
+from synapse.storage.databases.main.events_bg_updates import (
     EventsBackgroundUpdatesStore,
 )
-from synapse.storage.data_stores.main.media_repository import (
+from synapse.storage.databases.main.media_repository import (
     MediaRepositoryBackgroundUpdateStore,
 )
-from synapse.storage.data_stores.main.registration import (
+from synapse.storage.databases.main.registration import (
     RegistrationBackgroundUpdateStore,
     find_max_generated_user_id_localpart,
 )
-from synapse.storage.data_stores.main.room import RoomBackgroundUpdateStore
-from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
-from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
-from synapse.storage.data_stores.main.state import MainStateBackgroundUpdateStore
-from synapse.storage.data_stores.main.stats import StatsStore
-from synapse.storage.data_stores.main.user_directory import (
+from synapse.storage.databases.main.room import RoomBackgroundUpdateStore
+from synapse.storage.databases.main.roommember import RoomMemberBackgroundUpdateStore
+from synapse.storage.databases.main.search import SearchBackgroundUpdateStore
+from synapse.storage.databases.main.state import MainStateBackgroundUpdateStore
+from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.databases.main.user_directory import (
     UserDirectoryBackgroundUpdateStore,
 )
-from synapse.storage.data_stores.state.bg_updates import StateBackgroundUpdateStore
-from synapse.storage.database import Database, make_conn
+from synapse.storage.databases.state.bg_updates import StateBackgroundUpdateStore
 from synapse.storage.engines import create_engine
 from synapse.storage.prepare_database import prepare_database
 from synapse.util import Clock
@@ -175,14 +173,14 @@ class Store(
     StatsStore,
 ):
     def execute(self, f, *args, **kwargs):
-        return self.db.runInteraction(f.__name__, f, *args, **kwargs)
+        return self.db_pool.runInteraction(f.__name__, f, *args, **kwargs)
 
     def execute_sql(self, sql, *args):
         def r(txn):
             txn.execute(sql, args)
             return txn.fetchall()
 
-        return self.db.runInteraction("execute_sql", r)
+        return self.db_pool.runInteraction("execute_sql", r)
 
     def insert_many_txn(self, txn, table, headers, rows):
         sql = "INSERT INTO %s (%s) VALUES (%s)" % (
@@ -227,7 +225,7 @@ class Porter(object):
     async def setup_table(self, table):
         if table in APPEND_ONLY_TABLES:
             # It's safe to just carry on inserting.
-            row = await self.postgres_store.db.simple_select_one(
+            row = await self.postgres_store.db_pool.simple_select_one(
                 table="port_from_sqlite3",
                 keyvalues={"table_name": table},
                 retcols=("forward_rowid", "backward_rowid"),
@@ -244,7 +242,7 @@ class Porter(object):
                     ) = await self._setup_sent_transactions()
                     backward_chunk = 0
                 else:
-                    await self.postgres_store.db.simple_insert(
+                    await self.postgres_store.db_pool.simple_insert(
                         table="port_from_sqlite3",
                         values={
                             "table_name": table,
@@ -274,7 +272,7 @@ class Porter(object):
 
             await self.postgres_store.execute(delete_all)
 
-            await self.postgres_store.db.simple_insert(
+            await self.postgres_store.db_pool.simple_insert(
                 table="port_from_sqlite3",
                 values={"table_name": table, "forward_rowid": 1, "backward_rowid": 0},
             )
@@ -318,7 +316,7 @@ class Porter(object):
         if table == "user_directory_stream_pos":
             # We need to make sure there is a single row, `(X, null), as that is
             # what synapse expects to be there.
-            await self.postgres_store.db.simple_insert(
+            await self.postgres_store.db_pool.simple_insert(
                 table=table, values={"stream_id": None}
             )
             self.progress.update(table, table_size)  # Mark table as done
@@ -359,7 +357,7 @@ class Porter(object):
 
                 return headers, forward_rows, backward_rows
 
-            headers, frows, brows = await self.sqlite_store.db.runInteraction(
+            headers, frows, brows = await self.sqlite_store.db_pool.runInteraction(
                 "select", r
             )
 
@@ -375,7 +373,7 @@ class Porter(object):
                 def insert(txn):
                     self.postgres_store.insert_many_txn(txn, table, headers[1:], rows)
 
-                    self.postgres_store.db.simple_update_one_txn(
+                    self.postgres_store.db_pool.simple_update_one_txn(
                         txn,
                         table="port_from_sqlite3",
                         keyvalues={"table_name": table},
@@ -413,7 +411,7 @@ class Porter(object):
 
                 return headers, rows
 
-            headers, rows = await self.sqlite_store.db.runInteraction("select", r)
+            headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
 
             if rows:
                 forward_chunk = rows[-1][0] + 1
@@ -451,7 +449,7 @@ class Porter(object):
                         ],
                     )
 
-                    self.postgres_store.db.simple_update_one_txn(
+                    self.postgres_store.db_pool.simple_update_one_txn(
                         txn,
                         table="port_from_sqlite3",
                         keyvalues={"table_name": "event_search"},
@@ -494,7 +492,7 @@ class Porter(object):
                 db_conn, allow_outdated_version=allow_outdated_version
             )
             prepare_database(db_conn, engine, config=self.hs_config)
-            store = Store(Database(hs, db_config, engine), db_conn, hs)
+            store = Store(DatabasePool(hs, db_config, engine), db_conn, hs)
             db_conn.commit()
 
         return store
@@ -502,7 +500,7 @@ class Porter(object):
     async def run_background_updates_on_postgres(self):
         # Manually apply all background updates on the PostgreSQL database.
         postgres_ready = (
-            await self.postgres_store.db.updates.has_completed_background_updates()
+            await self.postgres_store.db_pool.updates.has_completed_background_updates()
         )
 
         if not postgres_ready:
@@ -511,9 +509,9 @@ class Porter(object):
             self.progress.set_state("Running background updates on PostgreSQL")
 
         while not postgres_ready:
-            await self.postgres_store.db.updates.do_next_background_update(100)
+            await self.postgres_store.db_pool.updates.do_next_background_update(100)
             postgres_ready = await (
-                self.postgres_store.db.updates.has_completed_background_updates()
+                self.postgres_store.db_pool.updates.has_completed_background_updates()
             )
 
     async def run(self):
@@ -534,7 +532,7 @@ class Porter(object):
 
             # Check if all background updates are done, abort if not.
             updates_complete = (
-                await self.sqlite_store.db.updates.has_completed_background_updates()
+                await self.sqlite_store.db_pool.updates.has_completed_background_updates()
             )
             if not updates_complete:
                 end_error = (
@@ -576,22 +574,24 @@ class Porter(object):
                 )
 
             try:
-                await self.postgres_store.db.runInteraction("alter_table", alter_table)
+                await self.postgres_store.db_pool.runInteraction(
+                    "alter_table", alter_table
+                )
             except Exception:
                 # On Error Resume Next
                 pass
 
-            await self.postgres_store.db.runInteraction(
+            await self.postgres_store.db_pool.runInteraction(
                 "create_port_table", create_port_table
             )
 
             # Step 2. Get tables.
             self.progress.set_state("Fetching tables")
-            sqlite_tables = await self.sqlite_store.db.simple_select_onecol(
+            sqlite_tables = await self.sqlite_store.db_pool.simple_select_onecol(
                 table="sqlite_master", keyvalues={"type": "table"}, retcol="name"
             )
 
-            postgres_tables = await self.postgres_store.db.simple_select_onecol(
+            postgres_tables = await self.postgres_store.db_pool.simple_select_onecol(
                 table="information_schema.tables",
                 keyvalues={},
                 retcol="distinct table_name",
@@ -692,7 +692,7 @@ class Porter(object):
 
             return headers, [r for r in rows if r[ts_ind] < yesterday]
 
-        headers, rows = await self.sqlite_store.db.runInteraction("select", r)
+        headers, rows = await self.sqlite_store.db_pool.runInteraction("select", r)
 
         rows = self._convert_rows("sent_transactions", headers, rows)
 
@@ -725,7 +725,7 @@ class Porter(object):
         next_chunk = await self.sqlite_store.execute(get_start_id)
         next_chunk = max(max_inserted_rowid + 1, next_chunk)
 
-        await self.postgres_store.db.simple_insert(
+        await self.postgres_store.db_pool.simple_insert(
             table="port_from_sqlite3",
             values={
                 "table_name": "sent_transactions",
@@ -794,14 +794,14 @@ class Porter(object):
             next_id = curr_id + 1
             txn.execute("ALTER SEQUENCE state_group_id_seq RESTART WITH %s", (next_id,))
 
-        return self.postgres_store.db.runInteraction("setup_state_group_id_seq", r)
+        return self.postgres_store.db_pool.runInteraction("setup_state_group_id_seq", r)
 
     def _setup_user_id_seq(self):
         def r(txn):
             next_id = find_max_generated_user_id_localpart(txn) + 1
             txn.execute("ALTER SEQUENCE user_id_seq RESTART WITH %s", (next_id,))
 
-        return self.postgres_store.db.runInteraction("setup_user_id_seq", r)
+        return self.postgres_store.db_pool.runInteraction("setup_user_id_seq", r)
 
 
 ##############################################