summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorAndrew Morgan <1342360+anoadragon453@users.noreply.github.com>2021-11-15 17:34:15 +0000
committerGitHub <noreply@github.com>2021-11-15 17:34:15 +0000
commit9c59e117db6b448a1e930365014b043fa7ef26b6 (patch)
treef00186741bb0ae33426e18f9fc3e2a0b65db0750 /synapse
parentMove sql file for `remove_deleted_devices_from_device_inbox` into v65 (#11303) (diff)
downloadsynapse-9c59e117db6b448a1e930365014b043fa7ef26b6.tar.xz
Run _upgrade_existing_database on workers if at current schema_version (#11346)
Co-authored-by: Richard van der Hoff <1389908+richvdh@users.noreply.github.com>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/prepare_database.py40
1 files changed, 22 insertions, 18 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 8b9c6adae2..e45adfcb55 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -131,24 +131,16 @@ def prepare_database(
                     "config==None in prepare_database, but database is not empty"
                 )
 
-            # if it's a worker app, refuse to upgrade the database, to avoid multiple
-            # workers doing it at once.
-            if config.worker.worker_app is None:
-                _upgrade_existing_database(
-                    cur,
-                    version_info,
-                    database_engine,
-                    config,
-                    databases=databases,
-                )
-            elif version_info.current_version < SCHEMA_VERSION:
-                # If the DB is on an older version than we expect then we refuse
-                # to start the worker (as the main process needs to run first to
-                # update the schema).
-                raise UpgradeDatabaseException(
-                    OUTDATED_SCHEMA_ON_WORKER_ERROR
-                    % (SCHEMA_VERSION, version_info.current_version)
-                )
+            # This should be run on all processes, master or worker. The master will
+            # apply the deltas, while workers will check if any outstanding deltas
+            # exist and raise an PrepareDatabaseException if they do.
+            _upgrade_existing_database(
+                cur,
+                version_info,
+                database_engine,
+                config,
+                databases=databases,
+            )
 
         else:
             logger.info("%r: Initialising new database", databases)
@@ -358,6 +350,18 @@ def _upgrade_existing_database(
 
     is_worker = config and config.worker.worker_app is not None
 
+    # If the schema version needs to be updated, and we are on a worker, we immediately
+    # know to bail out as workers cannot update the database schema. Only one process
+    # must update the database at the time, therefore we delegate this task to the master.
+    if is_worker and current_schema_state.current_version < SCHEMA_VERSION:
+        # If the DB is on an older version than we expect then we refuse
+        # to start the worker (as the main process needs to run first to
+        # update the schema).
+        raise UpgradeDatabaseException(
+            OUTDATED_SCHEMA_ON_WORKER_ERROR
+            % (SCHEMA_VERSION, current_schema_state.current_version)
+        )
+
     if (
         current_schema_state.compat_version is not None
         and current_schema_state.compat_version > SCHEMA_VERSION