diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/__init__.py | 23 | ||||
-rw-r--r-- | synapse/storage/databases/main/__init__.py | 24 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_push_actions.py | 7 | ||||
-rw-r--r-- | synapse/storage/prepare_database.py | 87 | ||||
-rw-r--r-- | synapse/storage/util/id_generators.py | 5 |
5 files changed, 111 insertions, 35 deletions
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py index 7f08bd8285..985b12df91 100644 --- a/synapse/storage/databases/__init__.py +++ b/synapse/storage/databases/__init__.py @@ -47,9 +47,14 @@ class Databases: engine = create_engine(database_config.config) with make_conn(database_config, engine) as db_conn: - logger.info("Preparing database %r...", db_name) - + logger.info("[database config %r]: Checking database server", db_name) engine.check_database(db_conn) + + logger.info( + "[database config %r]: Preparing for databases %r", + db_name, + database_config.databases, + ) prepare_database( db_conn, engine, hs.config, databases=database_config.databases, ) @@ -57,7 +62,9 @@ class Databases: database = DatabasePool(hs, database_config, engine) if "main" in database_config.databases: - logger.info("Starting 'main' data store") + logger.info( + "[database config %r]: Starting 'main' database", db_name + ) # Sanity check we don't try and configure the main store on # multiple databases. @@ -72,7 +79,9 @@ class Databases: persist_events = PersistEventsStore(hs, database, main) if "state" in database_config.databases: - logger.info("Starting 'state' data store") + logger.info( + "[database config %r]: Starting 'state' database", db_name + ) # Sanity check we don't try and configure the state store on # multiple databases. @@ -85,7 +94,7 @@ class Databases: self.databases.append(database) - logger.info("Database %r prepared", db_name) + logger.info("[database config %r]: prepared", db_name) # Closing the context manager doesn't close the connection. # psycopg will close the connection when the object gets GCed, but *only* @@ -98,10 +107,10 @@ class Databases: # Sanity check that we have actually configured all the required stores. if not main: - raise Exception("No 'main' data store configured") + raise Exception("No 'main' database configured") if not state: - raise Exception("No 'state' data store configured") + raise Exception("No 'state' database configured") # We use local variables here to ensure that the databases do not have # optional types. diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py index 99890ffbf3..2ae2fbd5d7 100644 --- a/synapse/storage/databases/main/__init__.py +++ b/synapse/storage/databases/main/__init__.py @@ -29,6 +29,7 @@ from synapse.storage.util.id_generators import ( MultiWriterIdGenerator, StreamIdGenerator, ) +from synapse.types import get_domain_from_id from synapse.util.caches.stream_change_cache import StreamChangeCache from .account_data import AccountDataStore @@ -591,21 +592,24 @@ def check_database_before_upgrade(cur, database_engine, config: HomeServerConfig """Called before upgrading an existing database to check that it is broadly sane compared with the configuration. """ - domain = config.server_name + logger.info("Checking database for consistency with configuration...") - sql = database_engine.convert_param_style( - "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?" - ) - pat = "%:" + domain - cur.execute(sql, (pat,)) - num_not_matching = cur.fetchall()[0][0] - if num_not_matching == 0: + # if there are any users in the database, check that the username matches our + # configured server name. + + cur.execute("SELECT name FROM users LIMIT 1") + rows = cur.fetchall() + if not rows: + return + + user_domain = get_domain_from_id(rows[0][0]) + if user_domain == config.server_name: return raise Exception( "Found users in database not native to %s!\n" - "You cannot changed a synapse server_name after it's been configured" - % (domain,) + "You cannot change a synapse server_name after it's been configured" + % (config.server_name,) ) diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py index 001d06378d..50fac9e72e 100644 --- a/synapse/storage/databases/main/event_push_actions.py +++ b/synapse/storage/databases/main/event_push_actions.py @@ -177,7 +177,12 @@ class EventPushActionsWorkerStore(SQLBaseStore): if row: notif_count += row[0] - unread_count += row[1] + + if row[1] is not None: + # The unread_count column of event_push_summary is NULLable, so we need + # to make sure we don't try increasing the unread counts if it's NULL + # for this row. + unread_count += row[1] return { "notify_count": notif_count, diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 229acb2da7..a7f2dfb850 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -50,6 +50,22 @@ class UpgradeDatabaseException(PrepareDatabaseException): pass +OUTDATED_SCHEMA_ON_WORKER_ERROR = ( + "Expected database schema version %i but got %i: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) + +EMPTY_DATABASE_ON_WORKER_ERROR = ( + "Uninitialised database: run the main synapse process to prepare the database " + "schema before starting worker processes." +) + +UNAPPLIED_DELTA_ON_WORKER_ERROR = ( + "Database schema delta %s has not been applied: run the main synapse process to " + "upgrade the database schema before starting worker processes." +) + + def prepare_database( db_conn: Connection, database_engine: BaseDatabaseEngine, @@ -83,30 +99,49 @@ def prepare_database( # at all, so this is redundant but harmless there.) cur.execute("BEGIN TRANSACTION") + logger.info("%r: Checking existing schema version", databases) version_info = _get_or_create_schema_state(cur, database_engine) if version_info: user_version, delta_files, upgraded = version_info + logger.info( + "%r: Existing schema is %i (+%i deltas)", + databases, + user_version, + len(delta_files), + ) + # config should only be None when we are preparing an in-memory SQLite db, + # which should be empty. if config is None: - if user_version != SCHEMA_VERSION: - # If we don't pass in a config file then we are expecting to - # have already upgraded the DB. - raise UpgradeDatabaseException( - "Expected database schema version %i but got %i" - % (SCHEMA_VERSION, user_version) - ) - else: - _upgrade_existing_database( - cur, - user_version, - delta_files, - upgraded, - database_engine, - config, - databases=databases, + raise ValueError( + "config==None in prepare_database, but databse is not empty" ) + + # if it's a worker app, refuse to upgrade the database, to avoid multiple + # workers doing it at once. + if config.worker_app is not None and user_version != SCHEMA_VERSION: + raise UpgradeDatabaseException( + OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version) + ) + + _upgrade_existing_database( + cur, + user_version, + delta_files, + upgraded, + database_engine, + config, + databases=databases, + ) else: + logger.info("%r: Initialising new database", databases) + + # if it's a worker app, refuse to upgrade the database, to avoid multiple + # workers doing it at once. + if config and config.worker_app is not None: + raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR) + _setup_new_database(cur, database_engine, databases=databases) # check if any of our configured dynamic modules want a database @@ -312,6 +347,8 @@ def _upgrade_existing_database( else: assert config + is_worker = config and config.worker_app is not None + if current_version > SCHEMA_VERSION: raise ValueError( "Cannot use this database as it is too " @@ -339,7 +376,7 @@ def _upgrade_existing_database( specific_engine_extensions = (".sqlite", ".postgres") for v in range(start_ver, SCHEMA_VERSION + 1): - logger.info("Upgrading schema to v%d", v) + logger.info("Applying schema deltas for v%d", v) # We need to search both the global and per data store schema # directories for schema updates. @@ -399,9 +436,15 @@ def _upgrade_existing_database( continue root_name, ext = os.path.splitext(file_name) + if ext == ".py": # This is a python upgrade module. We need to import into some # package and then execute its `run_upgrade` function. + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) + module_name = "synapse.storage.v%d_%s" % (v, root_name) with open(absolute_path) as python_file: module = imp.load_source(module_name, absolute_path, python_file) @@ -416,10 +459,18 @@ def _upgrade_existing_database( continue elif ext == ".sql": # A plain old .sql file, just read and execute it + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) elif ext == specific_engine_extension and root_name.endswith(".sql"): # A .sql file specific to our engine; just read and execute it + if is_worker: + raise PrepareDatabaseException( + UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path + ) logger.info("Applying engine-specific schema %s", relative_path) executescript(cur, absolute_path) elif ext in specific_engine_extensions and root_name.endswith(".sql"): @@ -449,6 +500,8 @@ def _upgrade_existing_database( (v, True), ) + logger.info("Schema now up to date") + def _apply_module_schemas(txn, database_engine, config): """Apply the module schemas for the dynamic modules, if any diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py index 76bc3afdfa..b7eb4f8ac9 100644 --- a/synapse/storage/util/id_generators.py +++ b/synapse/storage/util/id_generators.py @@ -15,6 +15,7 @@ import contextlib import heapq +import logging import threading from collections import deque from typing import Dict, List, Set @@ -24,6 +25,8 @@ from typing_extensions import Deque from synapse.storage.database import DatabasePool, LoggingTransaction from synapse.storage.util.sequence import PostgresSequenceGenerator +logger = logging.getLogger(__name__) + class IdGenerator: def __init__(self, db_conn, table, column): @@ -48,6 +51,8 @@ def _load_current_id(db_conn, table, column, step=1): Returns: int """ + # debug logging for https://github.com/matrix-org/synapse/issues/7968 + logger.info("initialising stream generator for %s(%s)", table, column) cur = db_conn.cursor() if step == 1: cur.execute("SELECT MAX(%s) FROM %s" % (column, table)) |