summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/__init__.py23
-rw-r--r--synapse/storage/databases/main/__init__.py24
-rw-r--r--synapse/storage/databases/main/event_push_actions.py7
-rw-r--r--synapse/storage/prepare_database.py87
-rw-r--r--synapse/storage/util/id_generators.py5
5 files changed, 111 insertions, 35 deletions
diff --git a/synapse/storage/databases/__init__.py b/synapse/storage/databases/__init__.py
index 7f08bd8285..985b12df91 100644
--- a/synapse/storage/databases/__init__.py
+++ b/synapse/storage/databases/__init__.py
@@ -47,9 +47,14 @@ class Databases:
             engine = create_engine(database_config.config)
 
             with make_conn(database_config, engine) as db_conn:
-                logger.info("Preparing database %r...", db_name)
-
+                logger.info("[database config %r]: Checking database server", db_name)
                 engine.check_database(db_conn)
+
+                logger.info(
+                    "[database config %r]: Preparing for databases %r",
+                    db_name,
+                    database_config.databases,
+                )
                 prepare_database(
                     db_conn, engine, hs.config, databases=database_config.databases,
                 )
@@ -57,7 +62,9 @@ class Databases:
                 database = DatabasePool(hs, database_config, engine)
 
                 if "main" in database_config.databases:
-                    logger.info("Starting 'main' data store")
+                    logger.info(
+                        "[database config %r]: Starting 'main' database", db_name
+                    )
 
                     # Sanity check we don't try and configure the main store on
                     # multiple databases.
@@ -72,7 +79,9 @@ class Databases:
                         persist_events = PersistEventsStore(hs, database, main)
 
                 if "state" in database_config.databases:
-                    logger.info("Starting 'state' data store")
+                    logger.info(
+                        "[database config %r]: Starting 'state' database", db_name
+                    )
 
                     # Sanity check we don't try and configure the state store on
                     # multiple databases.
@@ -85,7 +94,7 @@ class Databases:
 
                 self.databases.append(database)
 
-                logger.info("Database %r prepared", db_name)
+                logger.info("[database config %r]: prepared", db_name)
 
             # Closing the context manager doesn't close the connection.
             # psycopg will close the connection when the object gets GCed, but *only*
@@ -98,10 +107,10 @@ class Databases:
 
         # Sanity check that we have actually configured all the required stores.
         if not main:
-            raise Exception("No 'main' data store configured")
+            raise Exception("No 'main' database configured")
 
         if not state:
-            raise Exception("No 'state' data store configured")
+            raise Exception("No 'state' database configured")
 
         # We use local variables here to ensure that the databases do not have
         # optional types.
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 99890ffbf3..2ae2fbd5d7 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -29,6 +29,7 @@ from synapse.storage.util.id_generators import (
     MultiWriterIdGenerator,
     StreamIdGenerator,
 )
+from synapse.types import get_domain_from_id
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
 from .account_data import AccountDataStore
@@ -591,21 +592,24 @@ def check_database_before_upgrade(cur, database_engine, config: HomeServerConfig
     """Called before upgrading an existing database to check that it is broadly sane
     compared with the configuration.
     """
-    domain = config.server_name
+    logger.info("Checking database for consistency with configuration...")
 
-    sql = database_engine.convert_param_style(
-        "SELECT COUNT(*) FROM users WHERE name NOT LIKE ?"
-    )
-    pat = "%:" + domain
-    cur.execute(sql, (pat,))
-    num_not_matching = cur.fetchall()[0][0]
-    if num_not_matching == 0:
+    # if there are any users in the database, check that the username matches our
+    # configured server name.
+
+    cur.execute("SELECT name FROM users LIMIT 1")
+    rows = cur.fetchall()
+    if not rows:
+        return
+
+    user_domain = get_domain_from_id(rows[0][0])
+    if user_domain == config.server_name:
         return
 
     raise Exception(
         "Found users in database not native to %s!\n"
-        "You cannot changed a synapse server_name after it's been configured"
-        % (domain,)
+        "You cannot change a synapse server_name after it's been configured"
+        % (config.server_name,)
     )
 
 
diff --git a/synapse/storage/databases/main/event_push_actions.py b/synapse/storage/databases/main/event_push_actions.py
index 001d06378d..50fac9e72e 100644
--- a/synapse/storage/databases/main/event_push_actions.py
+++ b/synapse/storage/databases/main/event_push_actions.py
@@ -177,7 +177,12 @@ class EventPushActionsWorkerStore(SQLBaseStore):
 
         if row:
             notif_count += row[0]
-            unread_count += row[1]
+
+            if row[1] is not None:
+                # The unread_count column of event_push_summary is NULLable, so we need
+                # to make sure we don't try increasing the unread counts if it's NULL
+                # for this row.
+                unread_count += row[1]
 
         return {
             "notify_count": notif_count,
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 229acb2da7..a7f2dfb850 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -50,6 +50,22 @@ class UpgradeDatabaseException(PrepareDatabaseException):
     pass
 
 
+OUTDATED_SCHEMA_ON_WORKER_ERROR = (
+    "Expected database schema version %i but got %i: run the main synapse process to "
+    "upgrade the database schema before starting worker processes."
+)
+
+EMPTY_DATABASE_ON_WORKER_ERROR = (
+    "Uninitialised database: run the main synapse process to prepare the database "
+    "schema before starting worker processes."
+)
+
+UNAPPLIED_DELTA_ON_WORKER_ERROR = (
+    "Database schema delta %s has not been applied: run the main synapse process to "
+    "upgrade the database schema before starting worker processes."
+)
+
+
 def prepare_database(
     db_conn: Connection,
     database_engine: BaseDatabaseEngine,
@@ -83,30 +99,49 @@ def prepare_database(
         # at all, so this is redundant but harmless there.)
         cur.execute("BEGIN TRANSACTION")
 
+        logger.info("%r: Checking existing schema version", databases)
         version_info = _get_or_create_schema_state(cur, database_engine)
 
         if version_info:
             user_version, delta_files, upgraded = version_info
+            logger.info(
+                "%r: Existing schema is %i (+%i deltas)",
+                databases,
+                user_version,
+                len(delta_files),
+            )
 
+            # config should only be None when we are preparing an in-memory SQLite db,
+            # which should be empty.
             if config is None:
-                if user_version != SCHEMA_VERSION:
-                    # If we don't pass in a config file then we are expecting to
-                    # have already upgraded the DB.
-                    raise UpgradeDatabaseException(
-                        "Expected database schema version %i but got %i"
-                        % (SCHEMA_VERSION, user_version)
-                    )
-            else:
-                _upgrade_existing_database(
-                    cur,
-                    user_version,
-                    delta_files,
-                    upgraded,
-                    database_engine,
-                    config,
-                    databases=databases,
+                raise ValueError(
+                    "config==None in prepare_database, but databse is not empty"
                 )
+
+            # if it's a worker app, refuse to upgrade the database, to avoid multiple
+            # workers doing it at once.
+            if config.worker_app is not None and user_version != SCHEMA_VERSION:
+                raise UpgradeDatabaseException(
+                    OUTDATED_SCHEMA_ON_WORKER_ERROR % (SCHEMA_VERSION, user_version)
+                )
+
+            _upgrade_existing_database(
+                cur,
+                user_version,
+                delta_files,
+                upgraded,
+                database_engine,
+                config,
+                databases=databases,
+            )
         else:
+            logger.info("%r: Initialising new database", databases)
+
+            # if it's a worker app, refuse to upgrade the database, to avoid multiple
+            # workers doing it at once.
+            if config and config.worker_app is not None:
+                raise UpgradeDatabaseException(EMPTY_DATABASE_ON_WORKER_ERROR)
+
             _setup_new_database(cur, database_engine, databases=databases)
 
         # check if any of our configured dynamic modules want a database
@@ -312,6 +347,8 @@ def _upgrade_existing_database(
     else:
         assert config
 
+    is_worker = config and config.worker_app is not None
+
     if current_version > SCHEMA_VERSION:
         raise ValueError(
             "Cannot use this database as it is too "
@@ -339,7 +376,7 @@ def _upgrade_existing_database(
     specific_engine_extensions = (".sqlite", ".postgres")
 
     for v in range(start_ver, SCHEMA_VERSION + 1):
-        logger.info("Upgrading schema to v%d", v)
+        logger.info("Applying schema deltas for v%d", v)
 
         # We need to search both the global and per data store schema
         # directories for schema updates.
@@ -399,9 +436,15 @@ def _upgrade_existing_database(
                 continue
 
             root_name, ext = os.path.splitext(file_name)
+
             if ext == ".py":
                 # This is a python upgrade module. We need to import into some
                 # package and then execute its `run_upgrade` function.
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
+
                 module_name = "synapse.storage.v%d_%s" % (v, root_name)
                 with open(absolute_path) as python_file:
                     module = imp.load_source(module_name, absolute_path, python_file)
@@ -416,10 +459,18 @@ def _upgrade_existing_database(
                 continue
             elif ext == ".sql":
                 # A plain old .sql file, just read and execute it
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
                 logger.info("Applying schema %s", relative_path)
                 executescript(cur, absolute_path)
             elif ext == specific_engine_extension and root_name.endswith(".sql"):
                 # A .sql file specific to our engine; just read and execute it
+                if is_worker:
+                    raise PrepareDatabaseException(
+                        UNAPPLIED_DELTA_ON_WORKER_ERROR % relative_path
+                    )
                 logger.info("Applying engine-specific schema %s", relative_path)
                 executescript(cur, absolute_path)
             elif ext in specific_engine_extensions and root_name.endswith(".sql"):
@@ -449,6 +500,8 @@ def _upgrade_existing_database(
                 (v, True),
             )
 
+    logger.info("Schema now up to date")
+
 
 def _apply_module_schemas(txn, database_engine, config):
     """Apply the module schemas for the dynamic modules, if any
diff --git a/synapse/storage/util/id_generators.py b/synapse/storage/util/id_generators.py
index 76bc3afdfa..b7eb4f8ac9 100644
--- a/synapse/storage/util/id_generators.py
+++ b/synapse/storage/util/id_generators.py
@@ -15,6 +15,7 @@
 
 import contextlib
 import heapq
+import logging
 import threading
 from collections import deque
 from typing import Dict, List, Set
@@ -24,6 +25,8 @@ from typing_extensions import Deque
 from synapse.storage.database import DatabasePool, LoggingTransaction
 from synapse.storage.util.sequence import PostgresSequenceGenerator
 
+logger = logging.getLogger(__name__)
+
 
 class IdGenerator:
     def __init__(self, db_conn, table, column):
@@ -48,6 +51,8 @@ def _load_current_id(db_conn, table, column, step=1):
     Returns:
         int
     """
+    # debug logging for https://github.com/matrix-org/synapse/issues/7968
+    logger.info("initialising stream generator for %s(%s)", table, column)
     cur = db_conn.cursor()
     if step == 1:
         cur.execute("SELECT MAX(%s) FROM %s" % (column, table))