author     Brendan Abolivier <babolivier@matrix.org>  2019-10-23 15:31:59 +0100
committer  GitHub <noreply@github.com>                2019-10-23 15:31:59 +0100
commit     c97ed64db3d99680819ec4dcd88ea76f3d0c7537
tree       e57cfb8336c64aa23eb5be01d4d753974fcfcf72
parent     Add config linting script that checks for bool casing (#6203)
download   synapse-c97ed64db3d99680819ec4dcd88ea76f3d0c7537.tar.xz
Make synapse_port_db correctly create indexes (#6102)
Make `synapse_port_db` correctly create indexes in the PostgreSQL database, by having it run the background updates on the database before migrating the data.

To ensure we're migrating the right data, also block the port if the SQLite3 database still has pending or ongoing background updates.

Fixes #4877 
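
A minimal sketch of the flow this patch introduces. The two methods it calls,
`has_completed_background_updates` and `do_next_background_update`, are the ones
used in the diff below; the stub store, its internals, and the update names are
illustrative stand-ins for Synapse's real background-update machinery, not its
actual implementation:

    import sys

    class StubStore:
        """Illustrative stand-in for a Store backed by Synapse's
        background-update stores; only the two methods the port flow needs."""

        def __init__(self, pending_updates):
            self.pending_updates = list(pending_updates)

        def has_completed_background_updates(self):
            return not self.pending_updates

        def do_next_background_update(self, duration_ms):
            # The real method runs one batch of the next pending update;
            # here we simply mark the whole update as done.
            self.pending_updates.pop(0)

    def port(sqlite_store, postgres_store):
        # Refuse to port while SQLite still has pending background updates,
        # otherwise we might copy data those updates would have rewritten.
        if not sqlite_store.has_completed_background_updates():
            sys.stderr.write(
                "Pending background updates exist in the SQLite3 database.\n"
            )
            return

        # Run every background update on PostgreSQL first, so the indexes
        # they create exist before any data is copied across.
        while not postgres_store.has_completed_background_updates():
            postgres_store.do_next_background_update(100)

        # ... copy table data from SQLite to PostgreSQL here ...

    # Illustrative update names only.
    port(StubStore([]), StubStore(["index_event_search", "dedupe_user_ips"]))
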
-rw-r--r--  changelog.d/6102.bugfix    1
-rwxr-xr-x  scripts/synapse_port_db  182
2 files changed, 131 insertions, 52 deletions
diff --git a/changelog.d/6102.bugfix b/changelog.d/6102.bugfix
new file mode 100644
index 0000000000..cd288c2a44
--- /dev/null
+++ b/changelog.d/6102.bugfix
@@ -0,0 +1 @@
+Make the `synapse_port_db` script create the right indexes on a new PostgreSQL database.
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 3f942abdb6..5a34d6f2f5 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,9 +30,23 @@ import yaml
 from twisted.enterprise import adbapi
 from twisted.internet import defer, reactor
 
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.config.homeserver import HomeServerConfig
+from synapse.logging.context import PreserveLoggingContext
+from synapse.storage._base import LoggingTransaction
+from synapse.storage.client_ips import ClientIpBackgroundUpdateStore
+from synapse.storage.deviceinbox import DeviceInboxBackgroundUpdateStore
+from synapse.storage.devices import DeviceBackgroundUpdateStore
 from synapse.storage.engines import create_engine
+from synapse.storage.events_bg_updates import EventsBackgroundUpdatesStore
+from synapse.storage.media_repository import MediaRepositoryBackgroundUpdateStore
 from synapse.storage.prepare_database import prepare_database
+from synapse.storage.registration import RegistrationBackgroundUpdateStore
+from synapse.storage.roommember import RoomMemberBackgroundUpdateStore
+from synapse.storage.search import SearchBackgroundUpdateStore
+from synapse.storage.state import StateBackgroundUpdateStore
+from synapse.storage.stats import StatsStore
+from synapse.storage.user_directory import UserDirectoryBackgroundUpdateStore
+from synapse.util import Clock
 
 logger = logging.getLogger("synapse_port_db")
 
@@ -98,33 +113,24 @@ APPEND_ONLY_TABLES = [
 end_error_exec_info = None
 
 
-class Store(object):
-    """This object is used to pull out some of the convenience API from the
-    Storage layer.
-
-    *All* database interactions should go through this object.
-    """
-
-    def __init__(self, db_pool, engine):
-        self.db_pool = db_pool
-        self.database_engine = engine
-
-    _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
-    _simple_insert = SQLBaseStore.__dict__["_simple_insert"]
-
-    _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
-    _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
-    _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
-    _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
-    _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
-    _simple_select_one_onecol_txn = SQLBaseStore.__dict__[
-        "_simple_select_one_onecol_txn"
-    ]
-
-    _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
-    _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
-    _simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"]
+class Store(
+    ClientIpBackgroundUpdateStore,
+    DeviceInboxBackgroundUpdateStore,
+    DeviceBackgroundUpdateStore,
+    EventsBackgroundUpdatesStore,
+    MediaRepositoryBackgroundUpdateStore,
+    RegistrationBackgroundUpdateStore,
+    RoomMemberBackgroundUpdateStore,
+    SearchBackgroundUpdateStore,
+    StateBackgroundUpdateStore,
+    UserDirectoryBackgroundUpdateStore,
+    StatsStore,
+):
+    def __init__(self, db_conn, hs):
+        super().__init__(db_conn, hs)
+        self.db_pool = hs.get_db_pool()
 
+    @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
         def r(conn):
             try:
@@ -150,7 +156,8 @@ class Store(object):
                 logger.debug("[TXN FAIL] {%s} %s", desc, e)
                 raise
 
-        return self.db_pool.runWithConnection(r)
+        with PreserveLoggingContext():
+            return (yield self.db_pool.runWithConnection(r))
 
     def execute(self, f, *args, **kwargs):
         return self.runInteraction(f.__name__, f, *args, **kwargs)
@@ -176,6 +183,25 @@ class Store(object):
             raise
 
 
+class MockHomeserver:
+    def __init__(self, config, database_engine, db_conn, db_pool):
+        self.database_engine = database_engine
+        self.db_conn = db_conn
+        self.db_pool = db_pool
+        self.clock = Clock(reactor)
+        self.config = config
+        self.hostname = config.server_name
+
+    def get_db_conn(self):
+        return self.db_conn
+
+    def get_db_pool(self):
+        return self.db_pool
+
+    def get_clock(self):
+        return self.clock
+
+
 class Porter(object):
     def __init__(self, **kwargs):
         self.__dict__.update(kwargs)
@@ -447,31 +473,75 @@ class Porter(object):
 
         db_conn.commit()
 
+        return db_conn
+
     @defer.inlineCallbacks
-    def run(self):
-        try:
-            sqlite_db_pool = adbapi.ConnectionPool(
-                self.sqlite_config["name"], **self.sqlite_config["args"]
-            )
+    def build_db_store(self, config):
+        """Builds and returns a database store using the provided configuration.
 
-            postgres_db_pool = adbapi.ConnectionPool(
-                self.postgres_config["name"], **self.postgres_config["args"]
-            )
+        Args:
+            config: The database configuration, i.e. a dict following the structure of
+                the "database" section of Synapse's configuration file.
+
+        Returns:
+            The built Store object.
+        """
+        engine = create_engine(config)
+
+        self.progress.set_state("Preparing %s" % config["name"])
+        conn = self.setup_db(config, engine)
+
+        db_pool = adbapi.ConnectionPool(
+            config["name"], **config["args"]
+        )
+
+        hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
+
+        store = Store(conn, hs)
+
+        yield store.runInteraction(
+            "%s_engine.check_database" % config["name"],
+            engine.check_database,
+        )
 
-            sqlite_engine = create_engine(sqlite_config)
-            postgres_engine = create_engine(postgres_config)
+        return store
 
-            self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
-            self.postgres_store = Store(postgres_db_pool, postgres_engine)
+    @defer.inlineCallbacks
+    def run_background_updates_on_postgres(self):
+        # Manually apply all background updates on the PostgreSQL database.
+        postgres_ready = yield self.postgres_store.has_completed_background_updates()
+
+        if not postgres_ready:
+            # Only say that we're running background updates when there are background
+            # updates to run.
+            self.progress.set_state("Running background updates on PostgreSQL")
+
+        while not postgres_ready:
+            yield self.postgres_store.do_next_background_update(100)
+            postgres_ready = yield (
+                self.postgres_store.has_completed_background_updates()
+            )
 
-            yield self.postgres_store.execute(postgres_engine.check_database)
+    @defer.inlineCallbacks
+    def run(self):
+        try:
+            self.sqlite_store = yield self.build_db_store(self.sqlite_config)
+
+            # Check if all background updates are done, abort if not.
+            updates_complete = yield self.sqlite_store.has_completed_background_updates()
+            if not updates_complete:
+                sys.stderr.write(
+                    "Pending background updates exist in the SQLite3 database."
+                    " Please start Synapse again and wait until every update has finished"
+                    " before running this script.\n"
+                )
+                defer.returnValue(None)
 
-            # Step 1. Set up databases.
-            self.progress.set_state("Preparing SQLite3")
-            self.setup_db(sqlite_config, sqlite_engine)
+            self.postgres_store = yield self.build_db_store(
+                self.hs_config.database_config
+            )
 
-            self.progress.set_state("Preparing PostgreSQL")
-            self.setup_db(postgres_config, postgres_engine)
+            yield self.run_background_updates_on_postgres()
 
             self.progress.set_state("Creating port tables")
 
@@ -563,6 +633,8 @@ class Porter(object):
         def conv(j, col):
             if j in bool_cols:
                 return bool(col)
+            if isinstance(col, bytes):
+                return bytearray(col)
             elif isinstance(col, string_types) and "\0" in col:
                 logger.warn(
                     "DROPPING ROW: NUL value in table %s col %s: %r",
@@ -926,18 +998,24 @@ if __name__ == "__main__":
         },
     }
 
-    postgres_config = yaml.safe_load(args.postgres_config)
+    hs_config = yaml.safe_load(args.postgres_config)
 
-    if "database" in postgres_config:
-        postgres_config = postgres_config["database"]
+    if "database" not in hs_config:
+        sys.stderr.write("The configuration file must have a 'database' section.\n")
+        sys.exit(4)
+
+    postgres_config = hs_config["database"]
 
     if "name" not in postgres_config:
-        sys.stderr.write("Malformed database config: no 'name'")
+        sys.stderr.write("Malformed database config: no 'name'\n")
         sys.exit(2)
     if postgres_config["name"] != "psycopg2":
-        sys.stderr.write("Database must use 'psycopg2' connector.")
+        sys.stderr.write("Database must use the 'psycopg2' connector.\n")
         sys.exit(3)
 
+    config = HomeServerConfig()
+    config.parse_config_dict(hs_config, "", "")
+
     def start(stdscr=None):
         if stdscr:
             progress = CursesProgress(stdscr)
@@ -946,9 +1024,9 @@ if __name__ == "__main__":
 
         porter = Porter(
             sqlite_config=sqlite_config,
-            postgres_config=postgres_config,
             progress=progress,
             batch_size=args.batch_size,
+            hs_config=config,
         )
 
         reactor.callWhenRunning(porter.run)
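
The new Store in the diff above leans on cooperative multiple inheritance: each
*BackgroundUpdateStore registers its own background updates in __init__, so
composing them gives the port script every update handler without re-listing
them, and MockHomeserver supplies only the pieces (clock, connection, pool,
config) those store classes expect from a homeserver. A generic sketch of that
composition pattern, with class and update names that are illustrative rather
than Synapse's own:

    class BackgroundUpdater:
        def __init__(self):
            self.updates = {}

        def register_background_update(self, name, handler):
            self.updates[name] = handler

    class SearchUpdates(BackgroundUpdater):
        def __init__(self):
            super().__init__()
            self.register_background_update("search_index", lambda: None)

    class DeviceUpdates(BackgroundUpdater):
        def __init__(self):
            super().__init__()
            self.register_background_update("device_lists", lambda: None)

    # Composing the mixins collects every registered update in one place,
    # which is what the port script needs in order to run them all.
    class Store(SearchUpdates, DeviceUpdates):
        pass

    print(sorted(Store().updates))  # ['device_lists', 'search_index']
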