summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/prepare_database.py8
-rw-r--r--synapse/storage/schema/main/delta/20/pushers.py13
-rw-r--r--synapse/storage/schema/main/delta/25/fts.py9
-rw-r--r--synapse/storage/schema/main/delta/27/ts.py8
-rw-r--r--synapse/storage/schema/main/delta/30/as_users.py16
-rw-r--r--synapse/storage/schema/main/delta/31/pushers_0.py (renamed from synapse/storage/schema/main/delta/31/pushers.py)15
-rw-r--r--synapse/storage/schema/main/delta/31/search_update.py9
-rw-r--r--synapse/storage/schema/main/delta/33/event_fields.py8
-rw-r--r--synapse/storage/schema/main/delta/33/remote_media_ts.py12
-rw-r--r--synapse/storage/schema/main/delta/34/cache_stream.py9
-rw-r--r--synapse/storage/schema/main/delta/34/received_txn_purge.py9
-rw-r--r--synapse/storage/schema/main/delta/37/remove_auth_idx.py9
-rw-r--r--synapse/storage/schema/main/delta/42/user_dir.py9
-rw-r--r--synapse/storage/schema/main/delta/48/group_unique_indexes.py10
-rw-r--r--synapse/storage/schema/main/delta/50/make_event_content_nullable.py17
-rw-r--r--synapse/storage/schema/main/delta/56/unique_user_filter_index.py9
-rw-r--r--synapse/storage/schema/main/delta/57/local_current_membership.py13
-rw-r--r--synapse/storage/schema/main/delta/58/06dlols_unique_idx.py8
-rw-r--r--synapse/storage/schema/main/delta/58/11user_id_seq.py9
-rw-r--r--synapse/storage/schema/main/delta/59/01ignored_user.py8
-rw-r--r--synapse/storage/schema/main/delta/61/03recreate_min_depth.py8
-rw-r--r--synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py4
-rw-r--r--synapse/storage/schema/main/delta/69/01as_txn_seq.py7
-rw-r--r--synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py9
-rw-r--r--synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py9
-rw-r--r--synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py6
-rw-r--r--synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py4
-rw-r--r--synapse/storage/schema/state/delta/47/state_group_seq.py10
28 files changed, 131 insertions, 134 deletions
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 2a1c6fa31b..38b7abd801 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -22,7 +22,7 @@ import attr
 from typing_extensions import Counter as CounterType
 
 from synapse.config.homeserver import HomeServerConfig
-from synapse.storage.database import LoggingDatabaseConnection
+from synapse.storage.database import LoggingDatabaseConnection, LoggingTransaction
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.schema import SCHEMA_COMPAT_VERSION, SCHEMA_VERSION
 from synapse.storage.types import Cursor
@@ -168,7 +168,9 @@ def prepare_database(
 
 
 def _setup_new_database(
-    cur: Cursor, database_engine: BaseDatabaseEngine, databases: Collection[str]
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+    databases: Collection[str],
 ) -> None:
     """Sets up the physical database by finding a base set of "full schemas" and
     then applying any necessary deltas, including schemas from the given data
@@ -289,7 +291,7 @@ def _setup_new_database(
 
 
 def _upgrade_existing_database(
-    cur: Cursor,
+    cur: LoggingTransaction,
     current_schema_state: _SchemaState,
     database_engine: BaseDatabaseEngine,
     config: Optional[HomeServerConfig],
diff --git a/synapse/storage/schema/main/delta/20/pushers.py b/synapse/storage/schema/main/delta/20/pushers.py
index 45b846e6a7..08ae0efc21 100644
--- a/synapse/storage/schema/main/delta/20/pushers.py
+++ b/synapse/storage/schema/main/delta/20/pushers.py
@@ -24,10 +24,13 @@ UTF-8 bytes, so we have to do it in Python.
 
 import logging
 
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
+
 logger = logging.getLogger(__name__)
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     logger.info("Porting pushers table...")
     cur.execute(
         """
@@ -61,8 +64,8 @@ def run_create(cur, database_engine, *args, **kwargs):
     """
     )
     count = 0
-    for row in cur.fetchall():
-        row = list(row)
+    for tuple_row in cur.fetchall():
+        row = list(tuple_row)
         row[8] = bytes(row[8]).decode("utf-8")
         row[11] = bytes(row[11]).decode("utf-8")
         cur.execute(
@@ -81,7 +84,3 @@ def run_create(cur, database_engine, *args, **kwargs):
     cur.execute("DROP TABLE pushers")
     cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
     logger.info("Moved %d pushers to new table", count)
-
-
-def run_upgrade(*args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/25/fts.py b/synapse/storage/schema/main/delta/25/fts.py
index 21f57825d4..831f8e914d 100644
--- a/synapse/storage/schema/main/delta/25/fts.py
+++ b/synapse/storage/schema/main/delta/25/fts.py
@@ -14,7 +14,8 @@
 import json
 import logging
 
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -41,7 +42,7 @@ SQLITE_TABLE = (
 )
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if isinstance(database_engine, PostgresEngine):
         for statement in get_statements(POSTGRES_TABLE.splitlines()):
             cur.execute(statement)
@@ -72,7 +73,3 @@ def run_create(cur, database_engine, *args, **kwargs):
         )
 
         cur.execute(sql, ("event_search", progress_json))
-
-
-def run_upgrade(*args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/27/ts.py b/synapse/storage/schema/main/delta/27/ts.py
index 1c6058063f..8962afdeda 100644
--- a/synapse/storage/schema/main/delta/27/ts.py
+++ b/synapse/storage/schema/main/delta/27/ts.py
@@ -14,6 +14,8 @@
 import json
 import logging
 
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -25,7 +27,7 @@ ALTER_TABLE = (
 )
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     for statement in get_statements(ALTER_TABLE.splitlines()):
         cur.execute(statement)
 
@@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs):
         )
 
         cur.execute(sql, ("event_origin_server_ts", progress_json))
-
-
-def run_upgrade(*args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/30/as_users.py b/synapse/storage/schema/main/delta/30/as_users.py
index 4b4b166e37..b9d8df1231 100644
--- a/synapse/storage/schema/main/delta/30/as_users.py
+++ b/synapse/storage/schema/main/delta/30/as_users.py
@@ -12,13 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
+from typing import Dict, Iterable, List, Tuple, cast
 
 from synapse.config.appservice import load_appservices
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
 
 logger = logging.getLogger(__name__)
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     # NULL indicates user was not registered by an appservice.
     try:
         cur.execute("ALTER TABLE users ADD COLUMN appservice_id TEXT")
@@ -27,9 +31,13 @@ def run_create(cur, database_engine, *args, **kwargs):
         pass
 
 
-def run_upgrade(cur, database_engine, config, *args, **kwargs):
+def run_upgrade(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+    config: HomeServerConfig,
+) -> None:
     cur.execute("SELECT name FROM users")
-    rows = cur.fetchall()
+    rows = cast(Iterable[Tuple[str]], cur.fetchall())
 
     config_files = []
     try:
@@ -39,7 +47,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
 
     appservices = load_appservices(config.server.server_name, config_files)
 
-    owned = {}
+    owned: Dict[str, List[str]] = {}
 
     for row in rows:
         user_id = row[0]
diff --git a/synapse/storage/schema/main/delta/31/pushers.py b/synapse/storage/schema/main/delta/31/pushers_0.py
index 5be81c806a..e772e2dc65 100644
--- a/synapse/storage/schema/main/delta/31/pushers.py
+++ b/synapse/storage/schema/main/delta/31/pushers_0.py
@@ -20,14 +20,17 @@
 
 import logging
 
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
+
 logger = logging.getLogger(__name__)
 
 
-def token_to_stream_ordering(token):
+def token_to_stream_ordering(token: str) -> int:
     return int(token[1:].split("_")[0])
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     logger.info("Porting pushers table, delta 31...")
     cur.execute(
         """
@@ -61,8 +64,8 @@ def run_create(cur, database_engine, *args, **kwargs):
     """
     )
     count = 0
-    for row in cur.fetchall():
-        row = list(row)
+    for tuple_row in cur.fetchall():
+        row = list(tuple_row)
         row[12] = token_to_stream_ordering(row[12])
         cur.execute(
             """
@@ -80,7 +83,3 @@ def run_create(cur, database_engine, *args, **kwargs):
     cur.execute("DROP TABLE pushers")
     cur.execute("ALTER TABLE pushers2 RENAME TO pushers")
     logger.info("Moved %d pushers to new table", count)
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/31/search_update.py b/synapse/storage/schema/main/delta/31/search_update.py
index b84c844e3a..e20e92e454 100644
--- a/synapse/storage/schema/main/delta/31/search_update.py
+++ b/synapse/storage/schema/main/delta/31/search_update.py
@@ -14,7 +14,8 @@
 import json
 import logging
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -26,7 +27,7 @@ ALTER TABLE event_search ADD COLUMN stream_ordering BIGINT;
 """
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if not isinstance(database_engine, PostgresEngine):
         return
 
@@ -56,7 +57,3 @@ def run_create(cur, database_engine, *args, **kwargs):
         )
 
         cur.execute(sql, ("event_search_order", progress_json))
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/33/event_fields.py b/synapse/storage/schema/main/delta/33/event_fields.py
index e928c66a8f..8d806f5b52 100644
--- a/synapse/storage/schema/main/delta/33/event_fields.py
+++ b/synapse/storage/schema/main/delta/33/event_fields.py
@@ -14,6 +14,8 @@
 import json
 import logging
 
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -25,7 +27,7 @@ ALTER TABLE events ADD COLUMN contains_url BOOLEAN;
 """
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     for statement in get_statements(ALTER_TABLE.splitlines()):
         cur.execute(statement)
 
@@ -51,7 +53,3 @@ def run_create(cur, database_engine, *args, **kwargs):
         )
 
         cur.execute(sql, ("event_fields_sender_url", progress_json))
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/33/remote_media_ts.py b/synapse/storage/schema/main/delta/33/remote_media_ts.py
index 3907189e29..35499e43b5 100644
--- a/synapse/storage/schema/main/delta/33/remote_media_ts.py
+++ b/synapse/storage/schema/main/delta/33/remote_media_ts.py
@@ -14,14 +14,22 @@
 
 import time
 
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
+
 ALTER_TABLE = "ALTER TABLE remote_media_cache ADD COLUMN last_access_ts BIGINT"
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     cur.execute(ALTER_TABLE)
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_upgrade(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+    config: HomeServerConfig,
+) -> None:
     cur.execute(
         "UPDATE remote_media_cache SET last_access_ts = ?",
         (int(time.time() * 1000),),
diff --git a/synapse/storage/schema/main/delta/34/cache_stream.py b/synapse/storage/schema/main/delta/34/cache_stream.py
index cf09e43e2b..682c86da1a 100644
--- a/synapse/storage/schema/main/delta/34/cache_stream.py
+++ b/synapse/storage/schema/main/delta/34/cache_stream.py
@@ -14,7 +14,8 @@
 
 import logging
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -34,13 +35,9 @@ CREATE INDEX cache_invalidation_stream_id ON cache_invalidation_stream(stream_id
 """
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if not isinstance(database_engine, PostgresEngine):
         return
 
     for statement in get_statements(CREATE_TABLE.splitlines()):
         cur.execute(statement)
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/34/received_txn_purge.py b/synapse/storage/schema/main/delta/34/received_txn_purge.py
index 67d505e68b..dcfe3bc45a 100644
--- a/synapse/storage/schema/main/delta/34/received_txn_purge.py
+++ b/synapse/storage/schema/main/delta/34/received_txn_purge.py
@@ -14,19 +14,16 @@
 
 import logging
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 
 logger = logging.getLogger(__name__)
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if isinstance(database_engine, PostgresEngine):
         cur.execute("TRUNCATE received_transactions")
     else:
         cur.execute("DELETE FROM received_transactions")
 
     cur.execute("CREATE INDEX received_transactions_ts ON received_transactions(ts)")
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/37/remove_auth_idx.py b/synapse/storage/schema/main/delta/37/remove_auth_idx.py
index a377884169..d672f9b43c 100644
--- a/synapse/storage/schema/main/delta/37/remove_auth_idx.py
+++ b/synapse/storage/schema/main/delta/37/remove_auth_idx.py
@@ -14,7 +14,8 @@
 
 import logging
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -68,7 +69,7 @@ CREATE INDEX evauth_edges_id ON event_auth(event_id);
 """
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     for statement in get_statements(DROP_INDICES.splitlines()):
         cur.execute(statement)
 
@@ -79,7 +80,3 @@ def run_create(cur, database_engine, *args, **kwargs):
 
     for statement in get_statements(drop_constraint.splitlines()):
         cur.execute(statement)
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/42/user_dir.py b/synapse/storage/schema/main/delta/42/user_dir.py
index 506f326f4d..7e5c307c62 100644
--- a/synapse/storage/schema/main/delta/42/user_dir.py
+++ b/synapse/storage/schema/main/delta/42/user_dir.py
@@ -14,7 +14,8 @@
 
 import logging
 
-from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.prepare_database import get_statements
 
 logger = logging.getLogger(__name__)
@@ -66,7 +67,7 @@ CREATE VIRTUAL TABLE user_directory_search
 """
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     for statement in get_statements(BOTH_TABLES.splitlines()):
         cur.execute(statement)
 
@@ -78,7 +79,3 @@ def run_create(cur, database_engine, *args, **kwargs):
             cur.execute(statement)
     else:
         raise Exception("Unrecognized database engine")
-
-
-def run_upgrade(*args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/48/group_unique_indexes.py b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
index 49f5f2c003..ad2da4c8af 100644
--- a/synapse/storage/schema/main/delta/48/group_unique_indexes.py
+++ b/synapse/storage/schema/main/delta/48/group_unique_indexes.py
@@ -12,7 +12,9 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.engines import PostgresEngine
+
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.prepare_database import get_statements
 
 FIX_INDEXES = """
@@ -34,7 +36,7 @@ CREATE INDEX group_rooms_r_idx ON group_rooms(room_id);
 """
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     rowid = "ctid" if isinstance(database_engine, PostgresEngine) else "rowid"
 
     # remove duplicates from group_users & group_invites tables
@@ -57,7 +59,3 @@ def run_create(cur, database_engine, *args, **kwargs):
 
     for statement in get_statements(FIX_INDEXES.splitlines()):
         cur.execute(statement)
-
-
-def run_upgrade(*args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/50/make_event_content_nullable.py b/synapse/storage/schema/main/delta/50/make_event_content_nullable.py
index acd6ad1e1f..3e8a348b8a 100644
--- a/synapse/storage/schema/main/delta/50/make_event_content_nullable.py
+++ b/synapse/storage/schema/main/delta/50/make_event_content_nullable.py
@@ -53,16 +53,13 @@ SQLite:
 
 import logging
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 
 logger = logging.getLogger(__name__)
 
 
-def run_create(cur, database_engine, *args, **kwargs):
-    pass
-
-
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if isinstance(database_engine, PostgresEngine):
         cur.execute(
             """
@@ -76,7 +73,9 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
     cur.execute(
         "SELECT sql FROM sqlite_master WHERE tbl_name='events' AND type='table'"
     )
-    (oldsql,) = cur.fetchone()
+    row = cur.fetchone()
+    assert row is not None
+    (oldsql,) = row
 
     sql = oldsql.replace("content TEXT NOT NULL", "content TEXT")
     if sql == oldsql:
@@ -85,7 +84,9 @@ def run_upgrade(cur, database_engine, *args, **kwargs):
     logger.info("Replacing definition of 'events' with: %s", sql)
 
     cur.execute("PRAGMA schema_version")
-    (oldver,) = cur.fetchone()
+    row = cur.fetchone()
+    assert row is not None
+    (oldver,) = row
     cur.execute("PRAGMA writable_schema=ON")
     cur.execute(
         "UPDATE sqlite_master SET sql=? WHERE tbl_name='events' AND type='table'",
diff --git a/synapse/storage/schema/main/delta/56/unique_user_filter_index.py b/synapse/storage/schema/main/delta/56/unique_user_filter_index.py
index bb7296852a..2461f87d77 100644
--- a/synapse/storage/schema/main/delta/56/unique_user_filter_index.py
+++ b/synapse/storage/schema/main/delta/56/unique_user_filter_index.py
@@ -1,7 +1,8 @@
 import logging
 from io import StringIO
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.prepare_database import execute_statements_from_stream
 
 logger = logging.getLogger(__name__)
@@ -16,11 +17,7 @@ This migration updates the user_filters table as follows:
 """
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
-    pass
-
-
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if isinstance(database_engine, PostgresEngine):
         select_clause = """
             SELECT DISTINCT ON (user_id, filter_id) user_id, filter_id, filter_json
diff --git a/synapse/storage/schema/main/delta/57/local_current_membership.py b/synapse/storage/schema/main/delta/57/local_current_membership.py
index d25093c19f..cc0f2109bb 100644
--- a/synapse/storage/schema/main/delta/57/local_current_membership.py
+++ b/synapse/storage/schema/main/delta/57/local_current_membership.py
@@ -27,7 +27,16 @@
 # equivalent behaviour as if the server had remained in the room).
 
 
-def run_upgrade(cur, database_engine, config, *args, **kwargs):
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
+
+
+def run_upgrade(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+    config: HomeServerConfig,
+) -> None:
     # We need to do the insert in `run_upgrade` section as we don't have access
     # to `config` in `run_create`.
 
@@ -77,7 +86,7 @@ def run_upgrade(cur, database_engine, config, *args, **kwargs):
     )
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     cur.execute(
         """
         CREATE TABLE local_current_membership (
diff --git a/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py b/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py
index d353f2bcb3..4eaab9e086 100644
--- a/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py
+++ b/synapse/storage/schema/main/delta/58/06dlols_unique_idx.py
@@ -20,18 +20,14 @@ entries, and with a UNIQUE index.
 import logging
 from io import StringIO
 
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.prepare_database import execute_statements_from_stream
-from synapse.storage.types import Cursor
 
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(*args, **kwargs):
-    pass
-
-
-def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     # some instances might already have this index, in which case we can skip this
     if isinstance(database_engine, PostgresEngine):
         cur.execute(
diff --git a/synapse/storage/schema/main/delta/58/11user_id_seq.py b/synapse/storage/schema/main/delta/58/11user_id_seq.py
index 4310ec12ce..32f7e0a252 100644
--- a/synapse/storage/schema/main/delta/58/11user_id_seq.py
+++ b/synapse/storage/schema/main/delta/58/11user_id_seq.py
@@ -16,19 +16,16 @@
 Adds a postgres SEQUENCE for generating guest user IDs.
 """
 
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases.main.registration import (
     find_max_generated_user_id_localpart,
 )
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if not isinstance(database_engine, PostgresEngine):
         return
 
     next_id = find_max_generated_user_id_localpart(cur) + 1
     cur.execute("CREATE SEQUENCE user_id_seq START WITH %s", (next_id,))
-
-
-def run_upgrade(*args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/59/01ignored_user.py b/synapse/storage/schema/main/delta/59/01ignored_user.py
index 9e8f35c1d2..c53e2bade2 100644
--- a/synapse/storage/schema/main/delta/59/01ignored_user.py
+++ b/synapse/storage/schema/main/delta/59/01ignored_user.py
@@ -20,18 +20,14 @@ import logging
 from io import StringIO
 
 from synapse.storage._base import db_to_json
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.prepare_database import execute_statements_from_stream
-from synapse.storage.types import Cursor
 
 logger = logging.getLogger(__name__)
 
 
-def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
-    pass
-
-
-def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     logger.info("Creating ignored_users table")
     execute_statements_from_stream(cur, StringIO(_create_commands))
 
diff --git a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
index f8d7db9f2e..4a06b65888 100644
--- a/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
+++ b/synapse/storage/schema/main/delta/61/03recreate_min_depth.py
@@ -16,11 +16,11 @@
 This migration handles the process of changing the type of `room_depth.min_depth` to
 a BIGINT.
 """
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
-from synapse.storage.types import Cursor
 
 
-def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if not isinstance(database_engine, PostgresEngine):
         # this only applies to postgres - sqlite does not distinguish between big and
         # little ints.
@@ -64,7 +64,3 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs
             (6103, 'replace_room_depth_min_depth', '{}', 'populate_room_depth2')
         """
     )
-
-
-def run_upgrade(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
-    pass
diff --git a/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py b/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py
index a2ec4fc26e..9210026dde 100644
--- a/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py
+++ b/synapse/storage/schema/main/delta/68/05partial_state_rooms_triggers.py
@@ -18,11 +18,11 @@ This migration adds triggers to the partial_state_events tables to enforce uniqu
 
 Triggers cannot be expressed in .sql files, so we have to use a separate file.
 """
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
-from synapse.storage.types import Cursor
 
 
-def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     # complain if the room_id in partial_state_events doesn't match
     # that in `events`. We already have a fk constraint which ensures that the event
     # exists in `events`, so all we have to do is raise if there is a row with a
diff --git a/synapse/storage/schema/main/delta/69/01as_txn_seq.py b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
index 24bd4b391e..6c112425f2 100644
--- a/synapse/storage/schema/main/delta/69/01as_txn_seq.py
+++ b/synapse/storage/schema/main/delta/69/01as_txn_seq.py
@@ -17,10 +17,11 @@
 Adds a postgres SEQUENCE for generating application service transaction IDs.
 """
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if isinstance(database_engine, PostgresEngine):
         # If we already have some AS TXNs we want to start from the current
         # maximum value. There are two potential places this is stored - the
@@ -30,10 +31,12 @@ def run_create(cur, database_engine, *args, **kwargs):
 
         cur.execute("SELECT COALESCE(max(txn_id), 0) FROM application_services_txns")
         row = cur.fetchone()
+        assert row is not None
         txn_max = row[0]
 
         cur.execute("SELECT COALESCE(max(last_txn), 0) FROM application_services_state")
         row = cur.fetchone()
+        assert row is not None
         last_txn_max = row[0]
 
         start_val = max(last_txn_max, txn_max) + 1
diff --git a/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py
index 55a5d092cc..2ec1830c6f 100644
--- a/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py
+++ b/synapse/storage/schema/main/delta/72/03bg_populate_events_columns.py
@@ -14,10 +14,11 @@
 
 import json
 
-from synapse.storage.types import Cursor
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
 
 
-def run_create(cur: Cursor, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     """Add a bg update to populate the `state_key` and `rejection_reason` columns of `events`"""
 
     # we know that any new events will have the columns populated (and that has been
@@ -27,7 +28,9 @@ def run_create(cur: Cursor, database_engine, *args, **kwargs):
     # current min and max stream orderings, since that is guaranteed to include all
     # the events that were stored before the new columns were added.
     cur.execute("SELECT MIN(stream_ordering), MAX(stream_ordering) FROM events")
-    (min_stream_ordering, max_stream_ordering) = cur.fetchone()
+    row = cur.fetchone()
+    assert row is not None
+    (min_stream_ordering, max_stream_ordering) = row
 
     if min_stream_ordering is None:
         # no rows, nothing to do.
diff --git a/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
index b5853d125c..5c3e3584a2 100644
--- a/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
+++ b/synapse/storage/schema/main/delta/72/07force_update_current_state_events_membership.py
@@ -19,9 +19,16 @@ for its completion can be removed.
 
 Note the background job must still remain defined in the database class.
 """
+from synapse.config.homeserver import HomeServerConfig
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine
 
 
-def run_upgrade(cur, database_engine, *args, **kwargs):
+def run_upgrade(
+    cur: LoggingTransaction,
+    database_engine: BaseDatabaseEngine,
+    config: HomeServerConfig,
+) -> None:
     cur.execute("SELECT update_name FROM background_updates")
     rows = cur.fetchall()
     for row in rows:
diff --git a/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py b/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py
index 3de0a709eb..c7ed258e9d 100644
--- a/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py
+++ b/synapse/storage/schema/main/delta/73/10_update_sqlite_fts4_tokenizer.py
@@ -13,11 +13,11 @@
 # limitations under the License.
 import json
 
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.engines import BaseDatabaseEngine, Sqlite3Engine
-from synapse.storage.types import Cursor
 
 
-def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None:
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     """
     Upgrade the event_search table to use the porter tokenizer if it isn't already
 
@@ -38,6 +38,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None:
     # Re-run the background job to re-populate the event_search table.
     cur.execute("SELECT MIN(stream_ordering) FROM events")
     row = cur.fetchone()
+    assert row is not None
     min_stream_id = row[0]
 
     # If there are not any events, nothing to do.
@@ -46,6 +47,7 @@ def run_create(cur: Cursor, database_engine: BaseDatabaseEngine) -> None:
 
     cur.execute("SELECT MAX(stream_ordering) FROM events")
     row = cur.fetchone()
+    assert row is not None
     max_stream_id = row[0]
 
     progress = {
diff --git a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py
index e32e9083b3..2ee2bc9422 100644
--- a/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py
+++ b/synapse/storage/schema/main/delta/74/04_membership_tables_event_stream_ordering_triggers.py
@@ -17,11 +17,11 @@
 This migration adds triggers to the room membership tables to enforce consistency.
 Triggers cannot be expressed in .sql files, so we have to use a separate file.
 """
+from synapse.storage.database import LoggingTransaction
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
-from synapse.storage.types import Cursor
 
 
-def run_create(cur: Cursor, database_engine: BaseDatabaseEngine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     # Complain if the `event_stream_ordering` in membership tables doesn't match
     # the `stream_ordering` row with the same `event_id` in `events`.
     if isinstance(database_engine, Sqlite3Engine):
diff --git a/synapse/storage/schema/state/delta/47/state_group_seq.py b/synapse/storage/schema/state/delta/47/state_group_seq.py
index 9fd1ccf6f7..42aff50227 100644
--- a/synapse/storage/schema/state/delta/47/state_group_seq.py
+++ b/synapse/storage/schema/state/delta/47/state_group_seq.py
@@ -12,15 +12,17 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from synapse.storage.engines import PostgresEngine
+from synapse.storage.database import LoggingTransaction
+from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 
 
-def run_create(cur, database_engine, *args, **kwargs):
+def run_create(cur: LoggingTransaction, database_engine: BaseDatabaseEngine) -> None:
     if isinstance(database_engine, PostgresEngine):
         # if we already have some state groups, we want to start making new
         # ones with a higher id.
         cur.execute("SELECT max(id) FROM state_groups")
         row = cur.fetchone()
+        assert row is not None
 
         if row[0] is None:
             start_val = 1
@@ -28,7 +30,3 @@ def run_create(cur, database_engine, *args, **kwargs):
             start_val = row[0] + 1
 
         cur.execute("CREATE SEQUENCE state_group_id_seq START WITH %s", (start_val,))
-
-
-def run_upgrade(*args, **kwargs):
-    pass