diff --git a/changelog.d/13760.removal b/changelog.d/13760.removal
new file mode 100644
index 0000000000..624e7c3678
--- /dev/null
+++ b/changelog.d/13760.removal
@@ -0,0 +1 @@
+Synapse will now refuse to start if configured to use SQLite < 3.27.
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..e881bff7fb 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -533,15 +533,14 @@ class DatabasePool:
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")
- if self.engine.can_native_upsert:
- # Check ASAP (and then later, every 1s) to see if we have finished
- # background updates of tables that aren't safe to update.
- self._clock.call_later(
- 0.0,
- run_as_background_process,
- "upsert_safety_check",
- self._check_safe_to_upsert,
- )
+ # Check ASAP (and then later, every 1s) to see if we have finished
+ # background updates of tables that aren't safe to update.
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert,
+ )
def name(self) -> str:
"Return the name of this database"
@@ -1160,11 +1159,8 @@ class DatabasePool:
attempts = 0
while True:
try:
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert
- and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it is safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
return await self.runInteraction(
desc,
@@ -1199,7 +1195,7 @@ class DatabasePool:
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
- native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+ native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
Args:
txn: The transaction to use.
@@ -1207,14 +1203,15 @@ class DatabasePool:
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
- lock: True to lock the table when doing the upsert.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_txn_native_upsert(
txn, table, keyvalues, values, insertion_values=insertion_values
)
@@ -1365,14 +1362,12 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
await self.runInteraction(
desc,
@@ -1406,10 +1401,10 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
diff --git a/synapse/storage/databases/main/lock.py b/synapse/storage/databases/main/lock.py
index 2d7633fbd5..7270ef09da 100644
--- a/synapse/storage/databases/main/lock.py
+++ b/synapse/storage/databases/main/lock.py
@@ -129,91 +129,48 @@ class LockStore(SQLBaseStore):
now = self._clock.time_msec()
token = random_string(6)
- if self.db_pool.engine.can_native_upsert:
-
- def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
- # We take out the lock if either a) there is no row for the lock
- # already, b) the existing row has timed out, or c) the row is
- # for this instance (which means the process got killed and
- # restarted)
- sql = """
- INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
- VALUES (?, ?, ?, ?, ?)
- ON CONFLICT (lock_name, lock_key)
- DO UPDATE
- SET
- token = EXCLUDED.token,
- instance_name = EXCLUDED.instance_name,
- last_renewed_ts = EXCLUDED.last_renewed_ts
- WHERE
- worker_locks.last_renewed_ts < ?
- OR worker_locks.instance_name = EXCLUDED.instance_name
- """
- txn.execute(
- sql,
- (
- lock_name,
- lock_key,
- self._instance_name,
- token,
- now,
- now - _LOCK_TIMEOUT_MS,
- ),
- )
-
- # We only acquired the lock if we inserted or updated the table.
- return bool(txn.rowcount)
-
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock",
- _try_acquire_lock_txn,
- # We can autocommit here as we're executing a single query, this
- # will avoid serialization errors.
- db_autocommit=True,
+ def _try_acquire_lock_txn(txn: LoggingTransaction) -> bool:
+ # We take out the lock if either a) there is no row for the lock
+ # already, b) the existing row has timed out, or c) the row is
+ # for this instance (which means the process got killed and
+ # restarted)
+ sql = """
+ INSERT INTO worker_locks (lock_name, lock_key, instance_name, token, last_renewed_ts)
+ VALUES (?, ?, ?, ?, ?)
+ ON CONFLICT (lock_name, lock_key)
+ DO UPDATE
+ SET
+ token = EXCLUDED.token,
+ instance_name = EXCLUDED.instance_name,
+ last_renewed_ts = EXCLUDED.last_renewed_ts
+ WHERE
+ worker_locks.last_renewed_ts < ?
+ OR worker_locks.instance_name = EXCLUDED.instance_name
+ """
+ txn.execute(
+ sql,
+ (
+ lock_name,
+ lock_key,
+ self._instance_name,
+ token,
+ now,
+ now - _LOCK_TIMEOUT_MS,
+ ),
)
- if not did_lock:
- return None
-
- else:
- # If we're on an old SQLite we emulate the above logic by first
- # clearing out any existing stale locks and then upserting.
-
- def _try_acquire_lock_emulated_txn(txn: LoggingTransaction) -> bool:
- sql = """
- DELETE FROM worker_locks
- WHERE
- lock_name = ?
- AND lock_key = ?
- AND (last_renewed_ts < ? OR instance_name = ?)
- """
- txn.execute(
- sql,
- (lock_name, lock_key, now - _LOCK_TIMEOUT_MS, self._instance_name),
- )
-
- inserted = self.db_pool.simple_upsert_txn_emulated(
- txn,
- table="worker_locks",
- keyvalues={
- "lock_name": lock_name,
- "lock_key": lock_key,
- },
- values={},
- insertion_values={
- "token": token,
- "last_renewed_ts": self._clock.time_msec(),
- "instance_name": self._instance_name,
- },
- )
-
- return inserted
- did_lock = await self.db_pool.runInteraction(
- "try_acquire_lock_emulated", _try_acquire_lock_emulated_txn
- )
+ # We only acquired the lock if we inserted or updated the table.
+ return bool(txn.rowcount)
- if not did_lock:
- return None
+ did_lock = await self.db_pool.runInteraction(
+ "try_acquire_lock",
+ _try_acquire_lock_txn,
+ # We can autocommit here as we're executing a single query, this
+ # will avoid serialization errors.
+ db_autocommit=True,
+ )
+ if not did_lock:
+ return None
lock = Lock(
self._reactor,
diff --git a/synapse/storage/databases/main/stats.py b/synapse/storage/databases/main/stats.py
index b4c652acf3..356d4ca788 100644
--- a/synapse/storage/databases/main/stats.py
+++ b/synapse/storage/databases/main/stats.py
@@ -446,59 +446,41 @@ class StatsStore(StateDeltasStore):
absolutes: Absolute (set) fields
additive_relatives: Fields that will be added onto if existing row present.
"""
- if self.database_engine.can_native_upsert:
- absolute_updates = [
- "%(field)s = EXCLUDED.%(field)s" % {"field": field}
- for field in absolutes.keys()
- ]
-
- relative_updates = [
- "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
- % {"table": table, "field": field}
- for field in additive_relatives.keys()
- ]
-
- insert_cols = []
- qargs = []
-
- for (key, val) in chain(
- keyvalues.items(), absolutes.items(), additive_relatives.items()
- ):
- insert_cols.append(key)
- qargs.append(val)
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + COALESCE(%(table)s.%(field)s, 0)"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+ qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "key_columns": ", ".join(keyvalues),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
- sql = """
- INSERT INTO %(table)s (%(insert_cols_cs)s)
- VALUES (%(insert_vals_qs)s)
- ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
- """ % {
- "table": table,
- "insert_cols_cs": ", ".join(insert_cols),
- "insert_vals_qs": ", ".join(
- ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
- ),
- "key_columns": ", ".join(keyvalues),
- "updates": ", ".join(chain(absolute_updates, relative_updates)),
- }
-
- txn.execute(sql, qargs)
- else:
- self.database_engine.lock_table(txn, table)
- retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
- current_row = self.db_pool.simple_select_one_txn(
- txn, table, keyvalues, retcols, allow_none=True
- )
- if current_row is None:
- merged_dict = {**keyvalues, **absolutes, **additive_relatives}
- self.db_pool.simple_insert_txn(txn, table, merged_dict)
- else:
- for (key, val) in additive_relatives.items():
- if current_row[key] is None:
- current_row[key] = val
- else:
- current_row[key] += val
- current_row.update(absolutes)
- self.db_pool.simple_update_one_txn(txn, table, keyvalues, current_row)
+ txn.execute(sql, qargs)
async def _calculate_and_set_initial_state_for_room(self, room_id: str) -> None:
"""Calculate and insert an entry into room_stats_current.
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index ba79e19f7f..f8c6877ee8 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -221,25 +221,15 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_interval: how long until next retry in ms
"""
- if self.database_engine.can_native_upsert:
- await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings_native,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- db_autocommit=True, # Safe as its a single upsert
- )
- else:
- await self.db_pool.runInteraction(
- "set_destination_retry_timings",
- self._set_destination_retry_timings_emulated,
- destination,
- failure_ts,
- retry_last_ts,
- retry_interval,
- )
+ await self.db_pool.runInteraction(
+ "set_destination_retry_timings",
+ self._set_destination_retry_timings_native,
+ destination,
+ failure_ts,
+ retry_last_ts,
+ retry_interval,
+ db_autocommit=True, # Safe as it's a single upsert
+ )
def _set_destination_retry_timings_native(
self,
@@ -249,8 +239,6 @@ class TransactionWorkerStore(CacheInvalidationWorkerStore):
retry_last_ts: int,
retry_interval: int,
) -> None:
- assert self.database_engine.can_native_upsert
-
# Upsert retry time interval if retry_interval is zero (i.e. we're
# resetting it) or greater than the existing retry interval.
#
diff --git a/synapse/storage/engines/_base.py b/synapse/storage/engines/_base.py
index 971ff82693..0d16a419a4 100644
--- a/synapse/storage/engines/_base.py
+++ b/synapse/storage/engines/_base.py
@@ -45,14 +45,6 @@ class BaseDatabaseEngine(Generic[ConnectionType], metaclass=abc.ABCMeta):
@property
@abc.abstractmethod
- def can_native_upsert(self) -> bool:
- """
- Do we support native UPSERTs?
- """
- ...
-
- @property
- @abc.abstractmethod
def supports_using_any_list(self) -> bool:
"""
Do we support using `a = ANY(?)` and passing a list
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index 517f9d5f98..7f7d006ac2 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -159,13 +159,6 @@ class PostgresEngine(BaseDatabaseEngine[psycopg2.extensions.connection]):
db_conn.commit()
@property
- def can_native_upsert(self) -> bool:
- """
- Can we use native UPSERTs?
- """
- return True
-
- @property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
return True
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index 621f2c5efe..095ae0a096 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -49,14 +49,6 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
return True
@property
- def can_native_upsert(self) -> bool:
- """
- Do we support native UPSERTs? This requires SQLite3 3.24+, plus some
- more work we haven't done yet to tell what was inserted vs updated.
- """
- return sqlite3.sqlite_version_info >= (3, 24, 0)
-
- @property
def supports_using_any_list(self) -> bool:
"""Do we support using `a = ANY(?)` and passing a list"""
return False
@@ -70,12 +62,11 @@ class Sqlite3Engine(BaseDatabaseEngine[sqlite3.Connection]):
self, db_conn: sqlite3.Connection, allow_outdated_version: bool = False
) -> None:
if not allow_outdated_version:
- version = sqlite3.sqlite_version_info
# Synapse is untested against older SQLite versions, and we don't want
# to let users upgrade to a version of Synapse with broken support for their
# sqlite version, because it risks leaving them with a half-upgraded db.
- if version < (3, 22, 0):
- raise RuntimeError("Synapse requires sqlite 3.22 or above.")
+ if sqlite3.sqlite_version_info < (3, 27, 0):
+ raise RuntimeError("Synapse requires sqlite 3.27 or above.")
def check_new_database(self, txn: Cursor) -> None:
"""Gets called when setting up a brand new database. This allows us to
diff --git a/tests/storage/test_base.py b/tests/storage/test_base.py
index cce8e75c74..40e58f8199 100644
--- a/tests/storage/test_base.py
+++ b/tests/storage/test_base.py
@@ -54,7 +54,6 @@ class SQLBaseStoreTestCase(unittest.TestCase):
sqlite_config = {"name": "sqlite3"}
engine = create_engine(sqlite_config)
fake_engine = Mock(wraps=engine)
- fake_engine.can_native_upsert = False
fake_engine.in_transaction.return_value = False
db = DatabasePool(Mock(), Mock(config=sqlite_config), fake_engine)
|