summary refs log tree commit diff
diff options
context:
space:
mode:
authorAndrew Morgan <andrew@amorgan.xyz>2021-11-16 13:55:44 +0000
committerAndrew Morgan <andrew@amorgan.xyz>2021-11-16 13:55:44 +0000
commitce06cb59c313ce705c5e7e14642e46b2ec1b8918 (patch)
tree1d36d5259c6339f181d8d6c2df17be0d3ca9ee4f
parentMerge branch 'release-v1.47' into matrix-org-hotfixes (diff)
parentRename `remove_deleted_devices_from_device_inbox` to ensure it is always run ... (diff)
downloadsynapse-ce06cb59c313ce705c5e7e14642e46b2ec1b8918.tar.xz
Merge branch 'release-v1.47' of github.com:matrix-org/synapse into matrix-org-hotfixes
-rw-r--r--changelog.d/11346.bugfix1
-rw-r--r--changelog.d/11353.misc1
-rw-r--r--synapse/storage/prepare_database.py40
-rw-r--r--synapse/storage/schema/main/delta/65/06remove_deleted_devices_from_device_inbox.sql (renamed from synapse/storage/schema/main/delta/65/05remove_deleted_devices_from_device_inbox.sql)14
-rw-r--r--tests/storage/test_rollback_worker.py52
5 files changed, 88 insertions, 20 deletions
diff --git a/changelog.d/11346.bugfix b/changelog.d/11346.bugfix
new file mode 100644
index 0000000000..1fe8020eab
--- /dev/null
+++ b/changelog.d/11346.bugfix
@@ -0,0 +1 @@
+Fix a bug introduced in v1.47.0rc1 which caused worker processes to not halt startup in the presence of outstanding database migrations.
\ No newline at end of file
diff --git a/changelog.d/11353.misc b/changelog.d/11353.misc
new file mode 100644
index 0000000000..fa96dae919
--- /dev/null
+++ b/changelog.d/11353.misc
@@ -0,0 +1 @@
+Fix an issue which prevented the 'remove deleted devices from `device_inbox` column' background process from running when updating from a recent Synapse version.
\ No newline at end of file
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 8b9c6adae2..e45adfcb55 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -131,24 +131,16 @@ def prepare_database(
                     "config==None in prepare_database, but database is not empty"
                 )
 
-            # if it's a worker app, refuse to upgrade the database, to avoid multiple
-            # workers doing it at once.
-            if config.worker.worker_app is None:
-                _upgrade_existing_database(
-                    cur,
-                    version_info,
-                    database_engine,
-                    config,
-                    databases=databases,
-                )
-            elif version_info.current_version < SCHEMA_VERSION:
-                # If the DB is on an older version than we expect then we refuse
-                # to start the worker (as the main process needs to run first to
-                # update the schema).
-                raise UpgradeDatabaseException(
-                    OUTDATED_SCHEMA_ON_WORKER_ERROR
-                    % (SCHEMA_VERSION, version_info.current_version)
-                )
+            # This should be run on all processes, master or worker. The master will
+            # apply the deltas, while workers will check if any outstanding deltas
+            # exist and raise an PrepareDatabaseException if they do.
+            _upgrade_existing_database(
+                cur,
+                version_info,
+                database_engine,
+                config,
+                databases=databases,
+            )
 
         else:
             logger.info("%r: Initialising new database", databases)
@@ -358,6 +350,18 @@ def _upgrade_existing_database(
 
     is_worker = config and config.worker.worker_app is not None
 
+    # If the schema version needs to be updated, and we are on a worker, we immediately
+    # know to bail out as workers cannot update the database schema. Only one process
+    # must update the database at the time, therefore we delegate this task to the master.
+    if is_worker and current_schema_state.current_version < SCHEMA_VERSION:
+        # If the DB is on an older version than we expect then we refuse
+        # to start the worker (as the main process needs to run first to
+        # update the schema).
+        raise UpgradeDatabaseException(
+            OUTDATED_SCHEMA_ON_WORKER_ERROR
+            % (SCHEMA_VERSION, current_schema_state.current_version)
+        )
+
     if (
         current_schema_state.compat_version is not None
         and current_schema_state.compat_version > SCHEMA_VERSION
diff --git a/synapse/storage/schema/main/delta/65/05remove_deleted_devices_from_device_inbox.sql b/synapse/storage/schema/main/delta/65/06remove_deleted_devices_from_device_inbox.sql
index 076179123d..82f6408b36 100644
--- a/synapse/storage/schema/main/delta/65/05remove_deleted_devices_from_device_inbox.sql
+++ b/synapse/storage/schema/main/delta/65/06remove_deleted_devices_from_device_inbox.sql
@@ -18,5 +18,17 @@
 -- when a device was deleted using Synapse earlier than 1.47.0.
 -- This runs as background task, but may take a bit to finish.
 
+-- Remove any existing instances of this job running. It's OK to stop and restart this job,
+-- as it's just deleting entries from a table - no progress will be lost.
+--
+-- This is necessary due a similar migration running the job accidentally
+-- being included in schema version 64 during v1.47.0rc1,rc2. If a
+-- homeserver had updated from Synapse <=v1.45.0 (schema version <=64),
+-- then they would have started running this background update already.
+-- If that update was still running, then simply inserting it again would
+-- cause an SQL failure. So we effectively do an "upsert" here instead.
+
+DELETE FROM background_updates WHERE update_name = 'remove_deleted_devices_from_device_inbox';
+
 INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
-  (6505, 'remove_deleted_devices_from_device_inbox', '{}');
+  (6506, 'remove_deleted_devices_from_device_inbox', '{}');
diff --git a/tests/storage/test_rollback_worker.py b/tests/storage/test_rollback_worker.py
index a6be9a1bb1..0ce0892165 100644
--- a/tests/storage/test_rollback_worker.py
+++ b/tests/storage/test_rollback_worker.py
@@ -11,6 +11,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import List
+from unittest import mock
+
 from synapse.app.generic_worker import GenericWorkerServer
 from synapse.storage.database import LoggingDatabaseConnection
 from synapse.storage.prepare_database import PrepareDatabaseException, prepare_database
@@ -19,6 +22,22 @@ from synapse.storage.schema import SCHEMA_VERSION
 from tests.unittest import HomeserverTestCase
 
 
+def fake_listdir(filepath: str) -> List[str]:
+    """
+    A fake implementation of os.listdir which we can use to mock out the filesystem.
+
+    Args:
+        filepath: The directory to list files for.
+
+    Returns:
+        A list of files and folders in the directory.
+    """
+    if filepath.endswith("full_schemas"):
+        return [SCHEMA_VERSION]
+
+    return ["99_add_unicorn_to_database.sql"]
+
+
 class WorkerSchemaTests(HomeserverTestCase):
     def make_homeserver(self, reactor, clock):
         hs = self.setup_test_homeserver(
@@ -51,7 +70,7 @@ class WorkerSchemaTests(HomeserverTestCase):
 
         prepare_database(db_conn, db_pool.engine, self.hs.config)
 
-    def test_not_upgraded(self):
+    def test_not_upgraded_old_schema_version(self):
         """Test that workers don't start if the DB has an older schema version"""
         db_pool = self.hs.get_datastore().db_pool
         db_conn = LoggingDatabaseConnection(
@@ -67,3 +86,34 @@ class WorkerSchemaTests(HomeserverTestCase):
 
         with self.assertRaises(PrepareDatabaseException):
             prepare_database(db_conn, db_pool.engine, self.hs.config)
+
+    def test_not_upgraded_current_schema_version_with_outstanding_deltas(self):
+        """
+        Test that workers don't start if the DB is on the current schema version,
+        but there are still outstanding delta migrations to run.
+        """
+        db_pool = self.hs.get_datastore().db_pool
+        db_conn = LoggingDatabaseConnection(
+            db_pool._db_pool.connect(),
+            db_pool.engine,
+            "tests",
+        )
+
+        # Set the schema version of the database to the current version
+        cur = db_conn.cursor()
+        cur.execute("UPDATE schema_version SET version = ?", (SCHEMA_VERSION,))
+
+        db_conn.commit()
+
+        # Path `os.listdir` here to make synapse think that there is a migration
+        # file ready to be run.
+        # Note that we can't patch this function for the whole method, else Synapse
+        # will try to find the file when building the database initially.
+        with mock.patch("os.listdir", mock.Mock(side_effect=fake_listdir)):
+            with self.assertRaises(PrepareDatabaseException):
+                # Synapse should think that there is an outstanding migration file due to
+                # patching 'os.listdir' in the function decorator.
+                #
+                # We expect Synapse to raise an exception to indicate the master process
+                # needs to apply this migration file.
+                prepare_database(db_conn, db_pool.engine, self.hs.config)