From 6efa6740044bc240691115135660d901db358ce9 Mon Sep 17 00:00:00 2001 From: Patrick Cloke Date: Thu, 27 Apr 2023 08:44:53 -0400 Subject: Add type hints to schema deltas (#15497) Cleans-up the schema delta files: * Removes no-op functions. * Adds missing type hints to function parameters. * Fixes any issues with type hints. This also renames one (very old) schema delta to avoid a conflict that mypy complains about. --- changelog.d/15497.misc | 1 + mypy.ini | 10 --- synapse/storage/prepare_database.py | 8 +- synapse/storage/schema/main/delta/20/pushers.py | 13 ++-- synapse/storage/schema/main/delta/25/fts.py | 9 +-- synapse/storage/schema/main/delta/27/ts.py | 8 +- synapse/storage/schema/main/delta/30/as_users.py | 16 +++- synapse/storage/schema/main/delta/31/pushers.py | 86 ---------------------- synapse/storage/schema/main/delta/31/pushers_0.py | 85 +++++++++++++++++++++ .../storage/schema/main/delta/31/search_update.py | 9 +-- .../storage/schema/main/delta/33/event_fields.py | 8 +- .../schema/main/delta/33/remote_media_ts.py | 12 ++- .../storage/schema/main/delta/34/cache_stream.py | 9 +-- .../schema/main/delta/34/received_txn_purge.py | 9 +-- .../schema/main/delta/37/remove_auth_idx.py | 9 +-- synapse/storage/schema/main/delta/42/user_dir.py | 9 +-- .../schema/main/delta/48/group_unique_indexes.py | 10 +-- .../main/delta/50/make_event_content_nullable.py | 17 +++-- .../main/delta/56/unique_user_filter_index.py | 9 +-- .../main/delta/57/local_current_membership.py | 13 +++- .../schema/main/delta/58/06dlols_unique_idx.py | 8 +- .../storage/schema/main/delta/58/11user_id_seq.py | 9 +-- .../storage/schema/main/delta/59/01ignored_user.py | 8 +- .../schema/main/delta/61/03recreate_min_depth.py | 8 +- .../delta/68/05partial_state_rooms_triggers.py | 4 +- .../storage/schema/main/delta/69/01as_txn_seq.py | 7 +- .../main/delta/72/03bg_populate_events_columns.py | 9 ++- ...force_update_current_state_events_membership.py | 9 ++- .../delta/73/10_update_sqlite_fts4_tokenizer.py | 6 +- ...ership_tables_event_stream_ordering_triggers.py | 4 +- .../schema/state/delta/47/state_group_seq.py | 10 +-- 31 files changed, 210 insertions(+), 222 deletions(-) create mode 100644 changelog.d/15497.misc delete mode 100644 synapse/storage/schema/main/delta/31/pushers.py create mode 100644 synapse/storage/schema/main/delta/31/pushers_0.py diff --git a/changelog.d/15497.misc b/changelog.d/15497.misc new file mode 100644 index 0000000000..93ceaeafc9 --- /dev/null +++ b/changelog.d/15497.misc @@ -0,0 +1 @@ +Improve type hints. diff --git a/mypy.ini b/mypy.ini index 3b17c59dfc..5e7057cfb7 100644 --- a/mypy.ini +++ b/mypy.ini @@ -21,16 +21,6 @@ files = tests/, build_rust.py -# Note: Better exclusion syntax coming in mypy > 0.910 -# https://github.com/python/mypy/pull/11329 -# -# For now, set the (?x) flag enable "verbose" regexes -# https://docs.python.org/3/library/re.html#re.X -exclude = (?x) - ^( - |synapse/storage/schema/ - )$ - [mypy-synapse.metrics._reactor_metrics] # This module imports select.epoll. That exists on Linux, but doesn't on macOS. # See https://github.com/matrix-org/synapse/pull/11771. diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index 2a1c6fa31b..38b7abd801 100644 --- a/synapse/storage/prepare_database.py +++ b/synapse/storage/prepare_database.py @@ -22,7 +22,7 @@ import attr from typing_extensions import Counter as CounterType from synapse.config.homeserver import HomeServerConfig -from synapse.storage.database import LoggingDatabaseConnection +from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION from synapse.storage.types import Cursor @@ -168,7 +168,9 @@ def prepare_database( def _setup_new_database( - cur: Cursor, database_engine: BaseDatabaseEngine, databases: Collection[str] + cur: LoggingTransaction, + database_engine: BaseDatabaseEngine, + databases: Collection[str], ) -> None: """Sets up the physical database by finding a base set of "full schemas" and then applying any necessary deltas, including schemas from the given data @@ -289,7 +291,7 @@ def _setup_new_database( def _upgrade_existing_database( - cur: Cursor, + cur: LoggingTransaction, current_schema_state: _SchemaState, database_engine: BaseDatabaseEngine, config: Optional[HomeServerConfig], diff --git a/synapse/storage/schema/main/delta/20/pushers.py b/synapse/storage/schema/main/delta/20/pushers.py index 45b846e6a7..08ae0efc21 100644 --- a/synapse/storage/schema/main/delta/20/pushers.py +++ b/synapse/storage/schema/main/delta/20/pushers.py @@ -24,10 +24,13 @@ UTF-8 bytes, so we have to do it in Python. import logging +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine + logger = logging.getLogger(__name__) -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: logger.info("Porting pushers table...") cur.execute( """ @@ -61,8 +64,8 @@ def run_create(cur, database_engine, *args, **kwargs): """ ) count = 0 - for row in cur.fetchall(): - row = list(row) + for tuple_row in cur.fetchall(): + row = list(tuple_row) row[8] = bytes(row[8]).decode("utf-8") row[11] = bytes(row[11]).decode("utf-8") cur.execute( @@ -81,7 +84,3 @@ def run_create(cur, database_engine, *args, **kwargs): cur.execute("DROP TABLE pushers") cur.execute("ALTER TABLE pushers2 RENAME TO pushers") logger.info("Moved %d pushers to new table", count) - - -def run_upgrade(*args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/25/fts.py b/synapse/storage/schema/main/delta/25/fts.py index 21f57825d4..831f8e914d 100644 --- a/synapse/storage/schema/main/delta/25/fts.py +++ b/synapse/storage/schema/main/delta/25/fts.py @@ -14,7 +14,8 @@ import json import logging -from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -41,7 +42,7 @@ SQLITE_TABLE = ( ) -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if isinstance(database_engine, PostgresEngine): for statement in get_statements(POSTGRES_TABLE.splitlines()): cur.execute(statement) @@ -72,7 +73,3 @@ def run_create(cur, database_engine, *args, **kwargs): ) cur.execute(sql, ("event_search", progress_json)) - - -def run_upgrade(*args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/27/ts.py b/synapse/storage/schema/main/delta/27/ts.py index 1c6058063f..8962afdeda 100644 --- a/synapse/storage/schema/main/delta/27/ts.py +++ b/synapse/storage/schema/main/delta/27/ts.py @@ -14,6 +14,8 @@ import json import logging +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -25,7 +27,7 @@ ALTER_TABLE = ( ) -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: for statement in get_statements(ALTER_TABLE.splitlines()): cur.execute(statement) @@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs): ) cur.execute(sql, ("event_origin_server_ts", progress_json)) - - -def run_upgrade(*args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/30/as_users.py b/synapse/storage/schema/main/delta/30/as_users.py index 4b4b166e37..b9d8df1231 100644 --- a/synapse/storage/schema/main/delta/30/as_users.py +++ b/synapse/storage/schema/main/delta/30/as_users.py @@ -12,13 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging +from typing import Dict, Iterable, List, Tuple, cast from synapse.config.appservice import load_appservices +from synapse.config.homeserver import HomeServerConfig +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine logger = logging.getLogger(__name__) -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: # NULL indicates user was not registered by an appservice. try: cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT") @@ -27,9 +31,13 @@ def run_create(cur, database_engine, *args, **kwargs): pass -def run_upgrade(cur, database_engine, config, *args, **kwargs): +def run_upgrade( + cur: LoggingTransaction, + database_engine: BaseDatabaseEngine, + config: HomeServerConfig, +) -> None: cur.execute("SELECT name FROM users") - rows = cur.fetchall() + rows = cast(Iterable[Tuple[str]], cur.fetchall()) config_files = [] try: @@ -39,7 +47,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): appservices = load_appservices(config.server.server_name, config_files) - owned = {} + owned: Dict[str, List[str]] = {} for row in rows: user_id = row[0] diff --git a/synapse/storage/schema/main/delta/31/pushers.py b/synapse/storage/schema/main/delta/31/pushers.py deleted file mode 100644 index 5be81c806a..0000000000 --- a/synapse/storage/schema/main/delta/31/pushers.py +++ /dev/null @@ -1,86 +0,0 @@ -# Copyright 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -# Change the last_token to last_stream_ordering now that pushers no longer -# listen on an event stream but instead select out of the event_push_actions -# table. - - -import logging - -logger = logging.getLogger(__name__) - - -def token_to_stream_ordering(token): - return int(token[1:].split("_")[0]) - - -def run_create(cur, database_engine, *args, **kwargs): - logger.info("Porting pushers table, delta 31...") - cur.execute( - """ - CREATE TABLE IF NOT EXISTS pushers2 ( - id BIGINT PRIMARY KEY, - user_name TEXT NOT NULL, - access_token BIGINT DEFAULT NULL, - profile_tag VARCHAR(32) NOT NULL, - kind VARCHAR(8) NOT NULL, - app_id VARCHAR(64) NOT NULL, - app_display_name VARCHAR(64) NOT NULL, - device_display_name VARCHAR(128) NOT NULL, - pushkey TEXT NOT NULL, - ts BIGINT NOT NULL, - lang VARCHAR(8), - data TEXT, - last_stream_ordering INTEGER, - last_success BIGINT, - failing_since BIGINT, - UNIQUE (app_id, pushkey, user_name) - ) - """ - ) - cur.execute( - """SELECT - id, user_name, access_token, profile_tag, kind, - app_id, app_display_name, device_display_name, - pushkey, ts, lang, data, last_token, last_success, - failing_since - FROM pushers - """ - ) - count = 0 - for row in cur.fetchall(): - row = list(row) - row[12] = token_to_stream_ordering(row[12]) - cur.execute( - """ - INSERT into pushers2 ( - id, user_name, access_token, profile_tag, kind, - app_id, app_display_name, device_display_name, - pushkey, ts, lang, data, last_stream_ordering, last_success, - failing_since - ) values (%s) - """ - % (",".join(["?" for _ in range(len(row))])), - row, - ) - count += 1 - cur.execute("DROP TABLE pushers") - cur.execute("ALTER TABLE pushers2 RENAME TO pushers") - logger.info("Moved %d pushers to new table", count) - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/31/pushers_0.py b/synapse/storage/schema/main/delta/31/pushers_0.py new file mode 100644 index 0000000000..e772e2dc65 --- /dev/null +++ b/synapse/storage/schema/main/delta/31/pushers_0.py @@ -0,0 +1,85 @@ +# Copyright 2016 OpenMarket Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + + +# Change the last_token to last_stream_ordering now that pushers no longer +# listen on an event stream but instead select out of the event_push_actions +# table. + + +import logging + +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine + +logger = logging.getLogger(__name__) + + +def token_to_stream_ordering(token: str) -> int: + return int(token[1:].split("_")[0]) + + +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: + logger.info("Porting pushers table, delta 31...") + cur.execute( + """ + CREATE TABLE IF NOT EXISTS pushers2 ( + id BIGINT PRIMARY KEY, + user_name TEXT NOT NULL, + access_token BIGINT DEFAULT NULL, + profile_tag VARCHAR(32) NOT NULL, + kind VARCHAR(8) NOT NULL, + app_id VARCHAR(64) NOT NULL, + app_display_name VARCHAR(64) NOT NULL, + device_display_name VARCHAR(128) NOT NULL, + pushkey TEXT NOT NULL, + ts BIGINT NOT NULL, + lang VARCHAR(8), + data TEXT, + last_stream_ordering INTEGER, + last_success BIGINT, + failing_since BIGINT, + UNIQUE (app_id, pushkey, user_name) + ) + """ + ) + cur.execute( + """SELECT + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_token, last_success, + failing_since + FROM pushers + """ + ) + count = 0 + for tuple_row in cur.fetchall(): + row = list(tuple_row) + row[12] = token_to_stream_ordering(row[12]) + cur.execute( + """ + INSERT into pushers2 ( + id, user_name, access_token, profile_tag, kind, + app_id, app_display_name, device_display_name, + pushkey, ts, lang, data, last_stream_ordering, last_success, + failing_since + ) values (%s) + """ + % (",".join(["?" for _ in range(len(row))])), + row, + ) + count += 1 + cur.execute("DROP TABLE pushers") + cur.execute("ALTER TABLE pushers2 RENAME TO pushers") + logger.info("Moved %d pushers to new table", count) diff --git a/synapse/storage/schema/main/delta/31/search_update.py b/synapse/storage/schema/main/delta/31/search_update.py index b84c844e3a..e20e92e454 100644 --- a/synapse/storage/schema/main/delta/31/search_update.py +++ b/synapse/storage/schema/main/delta/31/search_update.py @@ -14,7 +14,8 @@ import json import logging -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -26,7 +27,7 @@ ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT; """ -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if not isinstance(database_engine, PostgresEngine): return @@ -56,7 +57,3 @@ def run_create(cur, database_engine, *args, **kwargs): ) cur.execute(sql, ("event_search_order", progress_json)) - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/33/event_fields.py b/synapse/storage/schema/main/delta/33/event_fields.py index e928c66a8f..8d806f5b52 100644 --- a/synapse/storage/schema/main/delta/33/event_fields.py +++ b/synapse/storage/schema/main/delta/33/event_fields.py @@ -14,6 +14,8 @@ import json import logging +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -25,7 +27,7 @@ ALTER TABLE events ADD COLUMN contains_url BOOLEAN; """ -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: for statement in get_statements(ALTER_TABLE.splitlines()): cur.execute(statement) @@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs): ) cur.execute(sql, ("event_fields_sender_url", progress_json)) - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/33/remote_media_ts.py b/synapse/storage/schema/main/delta/33/remote_media_ts.py index 3907189e29..35499e43b5 100644 --- a/synapse/storage/schema/main/delta/33/remote_media_ts.py +++ b/synapse/storage/schema/main/delta/33/remote_media_ts.py @@ -14,14 +14,22 @@ import time +from synapse.config.homeserver import HomeServerConfig +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine + ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT" -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: cur.execute(ALTER_TABLE) -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_upgrade( + cur: LoggingTransaction, + database_engine: BaseDatabaseEngine, + config: HomeServerConfig, +) -> None: cur.execute( "UPDATE remote_media_cache SET last_access_ts = ?", (int(time.time() * 1000),), diff --git a/synapse/storage/schema/main/delta/34/cache_stream.py b/synapse/storage/schema/main/delta/34/cache_stream.py index cf09e43e2b..682c86da1a 100644 --- a/synapse/storage/schema/main/delta/34/cache_stream.py +++ b/synapse/storage/schema/main/delta/34/cache_stream.py @@ -14,7 +14,8 @@ import logging -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -34,13 +35,9 @@ CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id """ -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if not isinstance(database_engine, PostgresEngine): return for statement in get_statements(CREATE_TABLE.splitlines()): cur.execute(statement) - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/34/received_txn_purge.py b/synapse/storage/schema/main/delta/34/received_txn_purge.py index 67d505e68b..dcfe3bc45a 100644 --- a/synapse/storage/schema/main/delta/34/received_txn_purge.py +++ b/synapse/storage/schema/main/delta/34/received_txn_purge.py @@ -14,19 +14,16 @@ import logging -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine logger = logging.getLogger(__name__) -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if isinstance(database_engine, PostgresEngine): cur.execute("TRUNCATE received_transactions") else: cur.execute("DELETE FROM received_transactions") cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)") - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/37/remove_auth_idx.py b/synapse/storage/schema/main/delta/37/remove_auth_idx.py index a377884169..d672f9b43c 100644 --- a/synapse/storage/schema/main/delta/37/remove_auth_idx.py +++ b/synapse/storage/schema/main/delta/37/remove_auth_idx.py @@ -14,7 +14,8 @@ import logging -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -68,7 +69,7 @@ CREATE INDEX evauth_edges_id ON event_auth(event_id); """ -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: for statement in get_statements(DROP_INDICES.splitlines()): cur.execute(statement) @@ -79,7 +80,3 @@ def run_create(cur, database_engine, *args, **kwargs): for statement in get_statements(drop_constraint.splitlines()): cur.execute(statement) - - -def run_upgrade(cur, database_engine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/42/user_dir.py b/synapse/storage/schema/main/delta/42/user_dir.py index 506f326f4d..7e5c307c62 100644 --- a/synapse/storage/schema/main/delta/42/user_dir.py +++ b/synapse/storage/schema/main/delta/42/user_dir.py @@ -14,7 +14,8 @@ import logging -from synapse.storage.engines import PostgresEngine, Sqlite3Engine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine from synapse.storage.prepare_database import get_statements logger = logging.getLogger(__name__) @@ -66,7 +67,7 @@ CREATE VIRTUAL TABLE user_directory_search """ -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: for statement in get_statements(BOTH_TABLES.splitlines()): cur.execute(statement) @@ -78,7 +79,3 @@ def run_create(cur, database_engine, *args, **kwargs): cur.execute(statement) else: raise Exception("Unrecognized database engine") - - -def run_upgrade(*args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py index 49f5f2c003..ad2da4c8af 100644 --- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py +++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py @@ -12,7 +12,9 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage.engines import PostgresEngine + +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.prepare_database import get_statements FIX_INDEXES = """ @@ -34,7 +36,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id); """ -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid" # remove duplicates from group_users & group_invites tables @@ -57,7 +59,3 @@ def run_create(cur, database_engine, *args, **kwargs): for statement in get_statements(FIX_INDEXES.splitlines()): cur.execute(statement) - - -def run_upgrade(*args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/50/make_event_content_nullable.py b/synapse/storage/schema/main/delta/50/make_event_content_nullable.py index acd6ad1e1f..3e8a348b8a 100644 --- a/synapse/storage/schema/main/delta/50/make_event_content_nullable.py +++ b/synapse/storage/schema/main/delta/50/make_event_content_nullable.py @@ -53,16 +53,13 @@ SQLite: import logging -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine logger = logging.getLogger(__name__) -def run_create(cur, database_engine, *args, **kwargs): - pass - - -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if isinstance(database_engine, PostgresEngine): cur.execute( """ @@ -76,7 +73,9 @@ def run_upgrade(cur, database_engine, *args, **kwargs): cur.execute( "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'" ) - (oldsql,) = cur.fetchone() + row = cur.fetchone() + assert row is not None + (oldsql,) = row sql = oldsql.replace("content TEXT NOT NULL", "content TEXT") if sql == oldsql: @@ -85,7 +84,9 @@ def run_upgrade(cur, database_engine, *args, **kwargs): logger.info("Replacing definition of 'events' with: %s", sql) cur.execute("PRAGMA schema_version") - (oldver,) = cur.fetchone() + row = cur.fetchone() + assert row is not None + (oldver,) = row cur.execute("PRAGMA writable_schema=ON") cur.execute( "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'", diff --git a/synapse/storage/schema/main/delta/56/unique_user_filter_index.py b/synapse/storage/schema/main/delta/56/unique_user_filter_index.py index bb7296852a..2461f87d77 100644 --- a/synapse/storage/schema/main/delta/56/unique_user_filter_index.py +++ b/synapse/storage/schema/main/delta/56/unique_user_filter_index.py @@ -1,7 +1,8 @@ import logging from io import StringIO -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.prepare_database import execute_statements_from_stream logger = logging.getLogger(__name__) @@ -16,11 +17,7 @@ This migration updates the user_filters table as follows: """ -def run_upgrade(cur, database_engine, *args, **kwargs): - pass - - -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if isinstance(database_engine, PostgresEngine): select_clause = """ SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json diff --git a/synapse/storage/schema/main/delta/57/local_current_membership.py b/synapse/storage/schema/main/delta/57/local_current_membership.py index d25093c19f..cc0f2109bb 100644 --- a/synapse/storage/schema/main/delta/57/local_current_membership.py +++ b/synapse/storage/schema/main/delta/57/local_current_membership.py @@ -27,7 +27,16 @@ # equivalent behaviour as if the server had remained in the room). -def run_upgrade(cur, database_engine, config, *args, **kwargs): +from synapse.config.homeserver import HomeServerConfig +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine + + +def run_upgrade( + cur: LoggingTransaction, + database_engine: BaseDatabaseEngine, + config: HomeServerConfig, +) -> None: # We need to do the insert in `run_upgrade` section as we don't have access # to `config` in `run_create`. @@ -77,7 +86,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs): ) -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: cur.execute( """ CREATE TABLE local_current_membership ( diff --git a/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py b/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py index d353f2bcb3..4eaab9e086 100644 --- a/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py +++ b/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py @@ -20,18 +20,14 @@ entries, and with a UNIQUE index. import logging from io import StringIO +from synapse.storage.database import LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine from synapse.storage.prepare_database import execute_statements_from_stream -from synapse.storage.types import Cursor logger = logging.getLogger(__name__) -def run_upgrade(*args, **kwargs): - pass - - -def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: # some instances might already have this index, in which case we can skip this if isinstance(database_engine, PostgresEngine): cur.execute( diff --git a/synapse/storage/schema/main/delta/58/11user_id_seq.py b/synapse/storage/schema/main/delta/58/11user_id_seq.py index 4310ec12ce..32f7e0a252 100644 --- a/synapse/storage/schema/main/delta/58/11user_id_seq.py +++ b/synapse/storage/schema/main/delta/58/11user_id_seq.py @@ -16,19 +16,16 @@ Adds a postgres SEQUENCE for generating guest user IDs. """ +from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main.registration import ( find_max_generated_user_id_localpart, ) -from synapse.storage.engines import PostgresEngine +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if not isinstance(database_engine, PostgresEngine): return next_id = find_max_generated_user_id_localpart(cur) + 1 cur.execute("CREATE SEQUENCE user_id_seq START WITH %s", (next_id,)) - - -def run_upgrade(*args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/59/01ignored_user.py b/synapse/storage/schema/main/delta/59/01ignored_user.py index 9e8f35c1d2..c53e2bade2 100644 --- a/synapse/storage/schema/main/delta/59/01ignored_user.py +++ b/synapse/storage/schema/main/delta/59/01ignored_user.py @@ -20,18 +20,14 @@ import logging from io import StringIO from synapse.storage._base import db_to_json +from synapse.storage.database import LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine from synapse.storage.prepare_database import execute_statements_from_stream -from synapse.storage.types import Cursor logger = logging.getLogger(__name__) -def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): - pass - - -def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: logger.info("Creating ignored_users table") execute_statements_from_stream(cur, StringIO(_create_commands)) diff --git a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py index f8d7db9f2e..4a06b65888 100644 --- a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py +++ b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py @@ -16,11 +16,11 @@ This migration handles the process of changing the type of `room_depth.min_depth` to a BIGINT. """ +from synapse.storage.database import LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -from synapse.storage.types import Cursor -def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if not isinstance(database_engine, PostgresEngine): # this only applies to postgres - sqlite does not distinguish between big and # little ints. @@ -64,7 +64,3 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs (6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2') """ ) - - -def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): - pass diff --git a/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py b/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py index a2ec4fc26e..9210026dde 100644 --- a/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py +++ b/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py @@ -18,11 +18,11 @@ This migration adds triggers to the partial_state_events tables to enforce uniqu Triggers cannot be expressed in .sql files, so we have to use a separate file. """ +from synapse.storage.database import LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine -from synapse.storage.types import Cursor -def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: # complain if the room_id in partial_state_events doesn't match # that in `events`. We already have a fk constraint which ensures that the event # exists in `events`, so all we have to do is raise if there is a row with a diff --git a/synapse/storage/schema/main/delta/69/01as_txn_seq.py b/synapse/storage/schema/main/delta/69/01as_txn_seq.py index 24bd4b391e..6c112425f2 100644 --- a/synapse/storage/schema/main/delta/69/01as_txn_seq.py +++ b/synapse/storage/schema/main/delta/69/01as_txn_seq.py @@ -17,10 +17,11 @@ Adds a postgres SEQUENCE for generating application service transaction IDs. """ -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if isinstance(database_engine, PostgresEngine): # If we already have some AS TXNs we want to start from the current # maximum value. There are two potential places this is stored - the @@ -30,10 +31,12 @@ def run_create(cur, database_engine, *args, **kwargs): cur.execute("SELECT COALESCE(max(txn_id), 0) FROM application_services_txns") row = cur.fetchone() + assert row is not None txn_max = row[0] cur.execute("SELECT COALESCE(max(last_txn), 0) FROM application_services_state") row = cur.fetchone() + assert row is not None last_txn_max = row[0] start_val = max(last_txn_max, txn_max) + 1 diff --git a/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py index 55a5d092cc..2ec1830c6f 100644 --- a/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py +++ b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py @@ -14,10 +14,11 @@ import json -from synapse.storage.types import Cursor +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine -def run_create(cur: Cursor, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`""" # we know that any new events will have the columns populated (and that has been @@ -27,7 +28,9 @@ def run_create(cur: Cursor, database_engine, *args, **kwargs): # current min and max stream orderings, since that is guaranteed to include all # the events that were stored before the new columns were added. cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events") - (min_stream_ordering, max_stream_ordering) = cur.fetchone() + row = cur.fetchone() + assert row is not None + (min_stream_ordering, max_stream_ordering) = row if min_stream_ordering is None: # no rows, nothing to do. diff --git a/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py index b5853d125c..5c3e3584a2 100644 --- a/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py +++ b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py @@ -19,9 +19,16 @@ for its completion can be removed. Note the background job must still remain defined in the database class. """ +from synapse.config.homeserver import HomeServerConfig +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine -def run_upgrade(cur, database_engine, *args, **kwargs): +def run_upgrade( + cur: LoggingTransaction, + database_engine: BaseDatabaseEngine, + config: HomeServerConfig, +) -> None: cur.execute("SELECT update_name FROM background_updates") rows = cur.fetchall() for row in rows: diff --git a/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py b/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py index 3de0a709eb..c7ed258e9d 100644 --- a/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py +++ b/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py @@ -13,11 +13,11 @@ # limitations under the License. import json +from synapse.storage.database import LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine, Sqlite3Engine -from synapse.storage.types import Cursor -def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None: +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: """ Upgrade the event_search table to use the porter tokenizer if it isn't already @@ -38,6 +38,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None: # Re-run the background job to re-populate the event_search table. cur.execute("SELECT MIN(stream_ordering) FROM events") row = cur.fetchone() + assert row is not None min_stream_id = row[0] # If there are not any events, nothing to do. @@ -46,6 +47,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None: cur.execute("SELECT MAX(stream_ordering) FROM events") row = cur.fetchone() + assert row is not None max_stream_id = row[0] progress = { diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py index e32e9083b3..2ee2bc9422 100644 --- a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py +++ b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py @@ -17,11 +17,11 @@ This migration adds triggers to the room membership tables to enforce consistency. Triggers cannot be expressed in .sql files, so we have to use a separate file. """ +from synapse.storage.database import LoggingTransaction from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine -from synapse.storage.types import Cursor -def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: # Complain if the `event_stream_ordering` in membership tables doesn't match # the `stream_ordering` row with the same `event_id` in `events`. if isinstance(database_engine, Sqlite3Engine): diff --git a/synapse/storage/schema/state/delta/47/state_group_seq.py b/synapse/storage/schema/state/delta/47/state_group_seq.py index 9fd1ccf6f7..42aff50227 100644 --- a/synapse/storage/schema/state/delta/47/state_group_seq.py +++ b/synapse/storage/schema/state/delta/47/state_group_seq.py @@ -12,15 +12,17 @@ # See the License for the specific language governing permissions and # limitations under the License. -from synapse.storage.engines import PostgresEngine +from synapse.storage.database import LoggingTransaction +from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine -def run_create(cur, database_engine, *args, **kwargs): +def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None: if isinstance(database_engine, PostgresEngine): # if we already have some state groups, we want to start making new # ones with a higher id. cur.execute("SELECT max(id) FROM state_groups") row = cur.fetchone() + assert row is not None if row[0] is None: start_val = 1 @@ -28,7 +30,3 @@ def run_create(cur, database_engine, *args, **kwargs): start_val = row[0] + 1 cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,)) - - -def run_upgrade(*args, **kwargs): - pass -- cgit 1.4.1