diff --git a/changelog.d/6102.bugfix b/changelog.d/6102.bugfix
new file mode 100644
index 0000000000..cd288c2a44
--- /dev/null
+++ b/changelog.d/6102.bugfix
@@ -0,0 +1 @@
+Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 3f942abdb6..5a34d6f2f5 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -2,6 +2,7 @@
# -*- coding: utf-8 -*-
# Copyright 2015, 2016 OpenMarket Ltd
# Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -29,9 +30,23 @@ import yaml
from twisted.enterprise import adbapi
from twisted.internet import defer, reactor
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.config.homeserver import HomeServerConfig
+from synapse.logging.context import PreserveLoggingContext
+from synapse.storage._base import LoggingTransaction
+from synapse.storage.client_ips import ClientIpBackgroundUpdateStore
+from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore
+from synapse.storage.devices import DeviceBackgroundUpdateStore
from synapse.storage.engines import create_engine
+from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore
+from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore
from synapse.storage.prepare_database import prepare_database
+from synapse.storage.registration import RegistrationBackgroundUpdateStore
+from synapse.storage.roommember import RoomMemberBackgroundUpdateStore
+from synapse.storage.search import SearchBackgroundUpdateStore
+from synapse.storage.state import StateBackgroundUpdateStore
+from synapse.storage.stats import StatsStore
+from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore
+from synapse.util import Clock
logger = logging.getLogger("synapse_port_db")
@@ -98,33 +113,24 @@ APPEND_ONLY_TABLES = [
end_error_exec_info = None
-class Store(object):
- """This object is used to pull out some of the convenience API from the
- Storage layer.
-
- *All* database interactions should go through this object.
- """
-
- def __init__(self, db_pool, engine):
- self.db_pool = db_pool
- self.database_engine = engine
-
- _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
- _simple_insert = SQLBaseStore.__dict__["_simple_insert"]
-
- _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
- _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
- _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
- _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
- _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
- _simple_select_one_onecol_txn = SQLBaseStore.__dict__[
- "_simple_select_one_onecol_txn"
- ]
-
- _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
- _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
- _simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"]
+class Store(
+ ClientIpBackgroundUpdateStore,
+ DeviceInboxBackgroundUpdateStore,
+ DeviceBackgroundUpdateStore,
+ EventsBackgroundUpdatesStore,
+ MediaRepositoryBackgroundUpdateStore,
+ RegistrationBackgroundUpdateStore,
+ RoomMemberBackgroundUpdateStore,
+ SearchBackgroundUpdateStore,
+ StateBackgroundUpdateStore,
+ UserDirectoryBackgroundUpdateStore,
+ StatsStore,
+):
+ def __init__(self, db_conn, hs):
+ super().__init__(db_conn, hs)
+ self.db_pool = hs.get_db_pool()
+ @defer.inlineCallbacks
def runInteraction(self, desc, func, *args, **kwargs):
def r(conn):
try:
@@ -150,7 +156,8 @@ class Store(object):
logger.debug("[TXN FAIL] {%s} %s", desc, e)
raise
- return self.db_pool.runWithConnection(r)
+ with PreserveLoggingContext():
+ return (yield self.db_pool.runWithConnection(r))
def execute(self, f, *args, **kwargs):
return self.runInteraction(f.__name__, f, *args, **kwargs)
@@ -176,6 +183,25 @@ class Store(object):
raise
+class MockHomeserver:
+ def __init__(self, config, database_engine, db_conn, db_pool):
+ self.database_engine = database_engine
+ self.db_conn = db_conn
+ self.db_pool = db_pool
+ self.clock = Clock(reactor)
+ self.config = config
+ self.hostname = config.server_name
+
+ def get_db_conn(self):
+ return self.db_conn
+
+ def get_db_pool(self):
+ return self.db_pool
+
+ def get_clock(self):
+ return self.clock
+
+
class Porter(object):
def __init__(self, **kwargs):
self.__dict__.update(kwargs)
@@ -447,31 +473,75 @@ class Porter(object):
db_conn.commit()
+ return db_conn
+
@defer.inlineCallbacks
- def run(self):
- try:
- sqlite_db_pool = adbapi.ConnectionPool(
- self.sqlite_config["name"], **self.sqlite_config["args"]
- )
+ def build_db_store(self, config):
+ """Builds and returns a database store using the provided configuration.
- postgres_db_pool = adbapi.ConnectionPool(
- self.postgres_config["name"], **self.postgres_config["args"]
- )
+ Args:
+ config: The database configuration, i.e. a dict following the structure of
+ the "database" section of Synapse's configuration file.
+
+ Returns:
+ The built Store object.
+ """
+ engine = create_engine(config)
+
+ self.progress.set_state("Preparing %s" % config["name"])
+ conn = self.setup_db(config, engine)
+
+ db_pool = adbapi.ConnectionPool(
+ config["name"], **config["args"]
+ )
+
+ hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
+
+ store = Store(conn, hs)
+
+ yield store.runInteraction(
+ "%s_engine.check_database" % config["name"],
+ engine.check_database,
+ )
- sqlite_engine = create_engine(sqlite_config)
- postgres_engine = create_engine(postgres_config)
+ return store
- self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
- self.postgres_store = Store(postgres_db_pool, postgres_engine)
+ @defer.inlineCallbacks
+ def run_background_updates_on_postgres(self):
+ # Manually apply all background updates on the PostgreSQL database.
+ postgres_ready = yield self.postgres_store.has_completed_background_updates()
+
+ if not postgres_ready:
+ # Only say that we're running background updates when there are background
+ # updates to run.
+ self.progress.set_state("Running background updates on PostgreSQL")
+
+ while not postgres_ready:
+ yield self.postgres_store.do_next_background_update(100)
+ postgres_ready = yield (
+ self.postgres_store.has_completed_background_updates()
+ )
- yield self.postgres_store.execute(postgres_engine.check_database)
+ @defer.inlineCallbacks
+ def run(self):
+ try:
+ self.sqlite_store = yield self.build_db_store(self.sqlite_config)
+
+ # Check if all background updates are done, abort if not.
+ updates_complete = yield self.sqlite_store.has_completed_background_updates()
+ if not updates_complete:
+ sys.stderr.write(
+ "Pending background updates exist in the SQLite3 database."
+ " Please start Synapse again and wait until every update has finished"
+ " before running this script.\n"
+ )
+ defer.returnValue(None)
- # Step 1. Set up databases.
- self.progress.set_state("Preparing SQLite3")
- self.setup_db(sqlite_config, sqlite_engine)
+ self.postgres_store = yield self.build_db_store(
+ self.hs_config.database_config
+ )
- self.progress.set_state("Preparing PostgreSQL")
- self.setup_db(postgres_config, postgres_engine)
+ yield self.run_background_updates_on_postgres()
self.progress.set_state("Creating port tables")
@@ -563,6 +633,8 @@ class Porter(object):
def conv(j, col):
if j in bool_cols:
return bool(col)
+ if isinstance(col, bytes):
+ return bytearray(col)
elif isinstance(col, string_types) and "\0" in col:
logger.warn(
"DROPPING ROW: NUL value in table %s col %s: %r",
@@ -926,18 +998,24 @@ if __name__ == "__main__":
},
}
- postgres_config = yaml.safe_load(args.postgres_config)
+ hs_config = yaml.safe_load(args.postgres_config)
- if "database" in postgres_config:
- postgres_config = postgres_config["database"]
+ if "database" not in hs_config:
+ sys.stderr.write("The configuration file must have a 'database' section.\n")
+ sys.exit(4)
+
+ postgres_config = hs_config["database"]
if "name" not in postgres_config:
- sys.stderr.write("Malformed database config: no 'name'")
+ sys.stderr.write("Malformed database config: no 'name'\n")
sys.exit(2)
if postgres_config["name"] != "psycopg2":
- sys.stderr.write("Database must use 'psycopg2' connector.")
+ sys.stderr.write("Database must use the 'psycopg2' connector.\n")
sys.exit(3)
+ config = HomeServerConfig()
+ config.parse_config_dict(hs_config, "", "")
+
def start(stdscr=None):
if stdscr:
progress = CursesProgress(stdscr)
@@ -946,9 +1024,9 @@ if __name__ == "__main__":
porter = Porter(
sqlite_config=sqlite_config,
- postgres_config=postgres_config,
progress=progress,
batch_size=args.batch_size,
+ hs_config=config,
)
reactor.callWhenRunning(porter.run)
|