author     Erik Johnston <erik@matrix.org>    2019-10-30 13:37:04 +0000
committer  Erik Johnston <erik@matrix.org>    2019-10-30 13:37:04 +0000
commit     ec6de1cc7d915abf6907b1d6a93336f8cd435cdd (patch)
tree       5e0beaa14d2e9abc82116a7a07740abf2508177b /scripts/synapse_port_db
parent     Review comments (diff)
parent     Merge pull request #6291 from matrix-org/erikj/fix_cache_descriptor (diff)
download   synapse-ec6de1cc7d915abf6907b1d6a93336f8cd435cdd.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into erikj/split_out_persistence_store
Diffstat (limited to 'scripts/synapse_port_db')
-rwxr-xr-x  scripts/synapse_port_db | 193
1 file changed, 141 insertions, 52 deletions
diff --git a/scripts/synapse_port_db b/scripts/synapse_port_db
index 3f942abdb6..54faed1e83 100755
--- a/scripts/synapse_port_db
+++ b/scripts/synapse_port_db
@@ -2,6 +2,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2018 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -29,9 +30,33 @@ import yaml
 from twisted.enterprise import adbapi
 from twisted.internet import defer, reactor
 
-from synapse.storage._base import LoggingTransaction, SQLBaseStore
+from synapse.config.homeserver import HomeServerConfig
+from synapse.logging.context import PreserveLoggingContext
+from synapse.storage._base import LoggingTransaction
+from synapse.storage.data_stores.main.client_ips import ClientIpBackgroundUpdateStore
+from synapse.storage.data_stores.main.deviceinbox import (
+    DeviceInboxBackgroundUpdateStore,
+)
+from synapse.storage.data_stores.main.devices import DeviceBackgroundUpdateStore
+from synapse.storage.data_stores.main.events_bg_updates import (
+    EventsBackgroundUpdatesStore,
+)
+from synapse.storage.data_stores.main.media_repository import (
+    MediaRepositoryBackgroundUpdateStore,
+)
+from synapse.storage.data_stores.main.registration import (
+    RegistrationBackgroundUpdateStore,
+)
+from synapse.storage.data_stores.main.roommember import RoomMemberBackgroundUpdateStore
+from synapse.storage.data_stores.main.search import SearchBackgroundUpdateStore
+from synapse.storage.data_stores.main.state import StateBackgroundUpdateStore
+from synapse.storage.data_stores.main.stats import StatsStore
+from synapse.storage.data_stores.main.user_directory import (
+    UserDirectoryBackgroundUpdateStore,
+)
 from synapse.storage.engines import create_engine
 from synapse.storage.prepare_database import prepare_database
+from synapse.util import Clock
 
 logger = logging.getLogger("synapse_port_db")
 
@@ -43,6 +68,7 @@ BOOLEAN_COLUMNS = {
     "presence_list": ["accepted"],
     "presence_stream": ["currently_active"],
     "public_room_list_stream": ["visibility"],
+    "devices": ["hidden"],
     "device_lists_outbound_pokes": ["sent"],
     "users_who_share_rooms": ["share_private"],
     "groups": ["is_public"],
@@ -98,33 +124,24 @@ APPEND_ONLY_TABLES = [
 end_error_exec_info = None
 
 
-class Store(object):
-    """This object is used to pull out some of the convenience API from the
-    Storage layer.
-
-    *All* database interactions should go through this object.
-    """
-
-    def __init__(self, db_pool, engine):
-        self.db_pool = db_pool
-        self.database_engine = engine
-
-    _simple_insert_txn = SQLBaseStore.__dict__["_simple_insert_txn"]
-    _simple_insert = SQLBaseStore.__dict__["_simple_insert"]
-
-    _simple_select_onecol_txn = SQLBaseStore.__dict__["_simple_select_onecol_txn"]
-    _simple_select_onecol = SQLBaseStore.__dict__["_simple_select_onecol"]
-    _simple_select_one = SQLBaseStore.__dict__["_simple_select_one"]
-    _simple_select_one_txn = SQLBaseStore.__dict__["_simple_select_one_txn"]
-    _simple_select_one_onecol = SQLBaseStore.__dict__["_simple_select_one_onecol"]
-    _simple_select_one_onecol_txn = SQLBaseStore.__dict__[
-        "_simple_select_one_onecol_txn"
-    ]
-
-    _simple_update_one = SQLBaseStore.__dict__["_simple_update_one"]
-    _simple_update_one_txn = SQLBaseStore.__dict__["_simple_update_one_txn"]
-    _simple_update_txn = SQLBaseStore.__dict__["_simple_update_txn"]
+class Store(
+    ClientIpBackgroundUpdateStore,
+    DeviceInboxBackgroundUpdateStore,
+    DeviceBackgroundUpdateStore,
+    EventsBackgroundUpdatesStore,
+    MediaRepositoryBackgroundUpdateStore,
+    RegistrationBackgroundUpdateStore,
+    RoomMemberBackgroundUpdateStore,
+    SearchBackgroundUpdateStore,
+    StateBackgroundUpdateStore,
+    UserDirectoryBackgroundUpdateStore,
+    StatsStore,
+):
+    def __init__(self, db_conn, hs):
+        super().__init__(db_conn, hs)
+        self.db_pool = hs.get_db_pool()
 
+    @defer.inlineCallbacks
     def runInteraction(self, desc, func, *args, **kwargs):
         def r(conn):
             try:
@@ -150,7 +167,8 @@ class Store(object):
                 logger.debug("[TXN FAIL] {%s} %s", desc, e)
                 raise
 
-        return self.db_pool.runWithConnection(r)
+        with PreserveLoggingContext():
+            return (yield self.db_pool.runWithConnection(r))
 
     def execute(self, f, *args, **kwargs):
         return self.runInteraction(f.__name__, f, *args, **kwargs)
@@ -176,6 +194,25 @@ class Store(object):
             raise
 
 
+class MockHomeserver:
+    def __init__(self, config, database_engine, db_conn, db_pool):
+        self.database_engine = database_engine
+        self.db_conn = db_conn
+        self.db_pool = db_pool
+        self.clock = Clock(reactor)
+        self.config = config
+        self.hostname = config.server_name
+
+    def get_db_conn(self):
+        return self.db_conn
+
+    def get_db_pool(self):
+        return self.db_pool
+
+    def get_clock(self):
+        return self.clock
+
+
 class Porter(object):
     def __init__(self, **kwargs):
         self.__dict__.update(kwargs)
@@ -447,31 +484,75 @@ class Porter(object):
 
         db_conn.commit()
 
+        return db_conn
+
     @defer.inlineCallbacks
-    def run(self):
-        try:
-            sqlite_db_pool = adbapi.ConnectionPool(
-                self.sqlite_config["name"], **self.sqlite_config["args"]
-            )
+    def build_db_store(self, config):
+        """Builds and returns a database store using the provided configuration.
 
-            postgres_db_pool = adbapi.ConnectionPool(
-                self.postgres_config["name"], **self.postgres_config["args"]
-            )
+        Args:
+            config: The database configuration, i.e. a dict following the structure of
+                the "database" section of Synapse's configuration file.
+
+        Returns:
+            The built Store object.
+        """
+        engine = create_engine(config)
+
+        self.progress.set_state("Preparing %s" % config["name"])
+        conn = self.setup_db(config, engine)
+
+        db_pool = adbapi.ConnectionPool(
+            config["name"], **config["args"]
+        )
+
+        hs = MockHomeserver(self.hs_config, engine, conn, db_pool)
+
+        store = Store(conn, hs)
+
+        yield store.runInteraction(
+            "%s_engine.check_database" % config["name"],
+            engine.check_database,
+        )
 
-            sqlite_engine = create_engine(sqlite_config)
-            postgres_engine = create_engine(postgres_config)
+        return store
 
-            self.sqlite_store = Store(sqlite_db_pool, sqlite_engine)
-            self.postgres_store = Store(postgres_db_pool, postgres_engine)
+    @defer.inlineCallbacks
+    def run_background_updates_on_postgres(self):
+        # Manually apply all background updates on the PostgreSQL database.
+        postgres_ready = yield self.postgres_store.has_completed_background_updates()
+
+        if not postgres_ready:
+            # Only say that we're running background updates when there are background
+            # updates to run.
+            self.progress.set_state("Running background updates on PostgreSQL")
+
+        while not postgres_ready:
+            yield self.postgres_store.do_next_background_update(100)
+            postgres_ready = yield (
+                self.postgres_store.has_completed_background_updates()
+            )
 
-            yield self.postgres_store.execute(postgres_engine.check_database)
+    @defer.inlineCallbacks
+    def run(self):
+        try:
+            self.sqlite_store = yield self.build_db_store(self.sqlite_config)
+
+            # Check if all background updates are done, abort if not.
+            updates_complete = yield self.sqlite_store.has_completed_background_updates()
+            if not updates_complete:
+                sys.stderr.write(
+                    "Pending background updates exist in the SQLite3 database."
+                    " Please start Synapse again and wait until every update has finished"
+                    " before running this script.\n"
+                )
+                defer.returnValue(None)
 
-            # Step 1. Set up databases.
-            self.progress.set_state("Preparing SQLite3")
-            self.setup_db(sqlite_config, sqlite_engine)
+            self.postgres_store = yield self.build_db_store(
+                self.hs_config.database_config
+            )
 
-            self.progress.set_state("Preparing PostgreSQL")
-            self.setup_db(postgres_config, postgres_engine)
+            yield self.run_background_updates_on_postgres()
 
             self.progress.set_state("Creating port tables")
 
@@ -563,6 +644,8 @@ class Porter(object):
         def conv(j, col):
             if j in bool_cols:
                 return bool(col)
+            if isinstance(col, bytes):
+                return bytearray(col)
             elif isinstance(col, string_types) and "\0" in col:
                 logger.warn(
                     "DROPPING ROW: NUL value in table %s col %s: %r",
@@ -926,18 +1009,24 @@ if __name__ == "__main__":
         },
     }
 
-    postgres_config = yaml.safe_load(args.postgres_config)
+    hs_config = yaml.safe_load(args.postgres_config)
 
-    if "database" in postgres_config:
-        postgres_config = postgres_config["database"]
+    if "database" not in hs_config:
+        sys.stderr.write("The configuration file must have a 'database' section.\n")
+        sys.exit(4)
+
+    postgres_config = hs_config["database"]
 
     if "name" not in postgres_config:
-        sys.stderr.write("Malformed database config: no 'name'")
+        sys.stderr.write("Malformed database config: no 'name'\n")
         sys.exit(2)
     if postgres_config["name"] != "psycopg2":
-        sys.stderr.write("Database must use 'psycopg2' connector.")
+        sys.stderr.write("Database must use the 'psycopg2' connector.\n")
         sys.exit(3)
 
+    config = HomeServerConfig()
+    config.parse_config_dict(hs_config, "", "")
+
     def start(stdscr=None):
         if stdscr:
             progress = CursesProgress(stdscr)
@@ -946,9 +1035,9 @@ if __name__ == "__main__":
 
         porter = Porter(
             sqlite_config=sqlite_config,
-            postgres_config=postgres_config,
             progress=progress,
             batch_size=args.batch_size,
+            hs_config=config,
         )
 
         reactor.callWhenRunning(porter.run)
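
For readers following the refactor, the sketch below shows the flow that Porter.run() follows after this commit, using only the pieces added in the diff above (build_db_store, has_completed_background_updates, do_next_background_update). The run_port() wrapper and its abbreviated error message are illustrative and not part of the script.

# Minimal sketch of the new porting flow. `porter` is a Porter instance
# configured as in the script above; run_port() is an illustrative wrapper.
import sys

from twisted.internet import defer


@defer.inlineCallbacks
def run_port(porter):
    # build_db_store() prepares the schema, creates an adbapi connection pool
    # and wraps both in a Store backed by a MockHomeserver.
    sqlite_store = yield porter.build_db_store(porter.sqlite_config)

    # The SQLite database must be fully migrated before porting: pending
    # background updates are a hard error.
    if not (yield sqlite_store.has_completed_background_updates()):
        sys.stderr.write("Pending background updates exist in the SQLite3 database.\n")
        defer.returnValue(None)

    # The PostgreSQL store is built from the homeserver config's "database"
    # section; its background updates are then driven to completion manually,
    # 100 items at a time, before any rows are copied.
    postgres_store = yield porter.build_db_store(porter.hs_config.database_config)
    while not (yield postgres_store.has_completed_background_updates()):
        yield postgres_store.do_next_background_update(100)

    defer.returnValue((sqlite_store, postgres_store))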