diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b394a6658b..a14b13aec8 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -94,7 +94,9 @@ UNIQUE_INDEX_BACKGROUND_UPDATES = {
"event_search": "event_search_event_id_idx",
"local_media_repository_thumbnails": "local_media_repository_thumbnails_method_idx",
"remote_media_cache_thumbnails": "remote_media_repository_thumbnails_method_idx",
- "event_push_summary": "event_push_summary_unique_index",
+ "event_push_summary": "event_push_summary_unique_index2",
+ "receipts_linearized": "receipts_linearized_unique_index",
+ "receipts_graph": "receipts_graph_unique_index",
}
@@ -288,8 +290,7 @@ class LoggingTransaction:
# LoggingTransaction isn't expecting there to be any callbacks; assert that
# is not the case.
assert self.after_callbacks is not None
- # type-ignore: need mypy containing https://github.com/python/mypy/pull/12668
- self.after_callbacks.append((callback, args, kwargs)) # type: ignore[arg-type]
+ self.after_callbacks.append((callback, args, kwargs))
def async_call_after(
self, callback: Callable[P, Awaitable], *args: P.args, **kwargs: P.kwargs
@@ -310,8 +311,7 @@ class LoggingTransaction:
# LoggingTransaction isn't expecting there to be any callbacks; assert that
# is not the case.
assert self.async_after_callbacks is not None
- # type-ignore: need mypy containing https://github.com/python/mypy/pull/12668
- self.async_after_callbacks.append((callback, args, kwargs)) # type: ignore[arg-type]
+ self.async_after_callbacks.append((callback, args, kwargs))
def call_on_exception(
self, callback: Callable[P, object], *args: P.args, **kwargs: P.kwargs
@@ -329,8 +329,7 @@ class LoggingTransaction:
# LoggingTransaction isn't expecting there to be any callbacks; assert that
# is not the case.
assert self.exception_callbacks is not None
- # type-ignore: need mypy containing https://github.com/python/mypy/pull/12668
- self.exception_callbacks.append((callback, args, kwargs)) # type: ignore[arg-type]
+ self.exception_callbacks.append((callback, args, kwargs))
def fetchone(self) -> Optional[Tuple]:
return self.txn.fetchone()
@@ -391,6 +390,14 @@ class LoggingTransaction:
def executemany(self, sql: str, *args: Any) -> None:
self._do_execute(self.txn.executemany, sql, *args)
+ def executescript(self, sql: str) -> None:
+ if isinstance(self.database_engine, Sqlite3Engine):
+ self._do_execute(self.txn.executescript, sql) # type: ignore[attr-defined]
+ else:
+ raise NotImplementedError(
+ f"executescript only exists for sqlite driver, not {type(self.database_engine)}"
+ )
+
def _make_sql_one_line(self, sql: str) -> str:
"Strip newlines out of SQL so that the loggers in the DB are on one line"
return " ".join(line.strip() for line in sql.splitlines() if line.strip())
@@ -411,10 +418,7 @@ class LoggingTransaction:
sql = self.database_engine.convert_param_style(sql)
if args:
try:
- # The type-ignore should be redundant once mypy releases a version with
- # https://github.com/python/mypy/pull/12668. (`args` might be empty,
- # (but we'll catch the index error if so.)
- sql_logger.debug("[SQL values] {%s} %r", self.name, args[0]) # type: ignore[index]
+ sql_logger.debug("[SQL values] {%s} %r", self.name, args[0])
except Exception:
# Don't let logging failures stop SQL from working
pass
@@ -533,15 +537,14 @@ class DatabasePool:
if isinstance(self.engine, Sqlite3Engine):
self._unsafe_to_upsert_tables.add("user_directory_search")
- if self.engine.can_native_upsert:
- # Check ASAP (and then later, every 1s) to see if we have finished
- # background updates of tables that aren't safe to update.
- self._clock.call_later(
- 0.0,
- run_as_background_process,
- "upsert_safety_check",
- self._check_safe_to_upsert,
- )
+ # Check ASAP (and then later, every 1s) to see if we have finished
+ # background updates of tables that aren't safe to update.
+ self._clock.call_later(
+ 0.0,
+ run_as_background_process,
+ "upsert_safety_check",
+ self._check_safe_to_upsert,
+ )
def name(self) -> str:
"Return the name of this database"
@@ -566,15 +569,15 @@ class DatabasePool:
retcols=["update_name"],
desc="check_background_updates",
)
- updates = [x["update_name"] for x in updates]
+ background_update_names = [x["update_name"] for x in updates]
for table, update_name in UNIQUE_INDEX_BACKGROUND_UPDATES.items():
- if update_name not in updates:
+ if update_name not in background_update_names:
logger.debug("Now safe to upsert in %s", table)
self._unsafe_to_upsert_tables.discard(table)
# If there's any updates still running, reschedule to run.
- if updates:
+ if background_update_names:
self._clock.call_later(
15.0,
run_as_background_process,
@@ -646,9 +649,7 @@ class DatabasePool:
# For now, we just log an error, and hope that it works on the first attempt.
# TODO: raise an exception.
- # Type-ignore Mypy doesn't yet consider ParamSpec.args to be iterable; see
- # https://github.com/python/mypy/pull/12668
- for i, arg in enumerate(args): # type: ignore[arg-type, var-annotated]
+ for i, arg in enumerate(args):
if inspect.isgenerator(arg):
logger.error(
"Programming error: generator passed to new_transaction as "
@@ -656,9 +657,7 @@ class DatabasePool:
i,
func,
)
- # Type-ignore Mypy doesn't yet consider ParamSpec.args to be a mapping; see
- # https://github.com/python/mypy/pull/12668
- for name, val in kwargs.items(): # type: ignore[attr-defined]
+ for name, val in kwargs.items():
if inspect.isgenerator(val):
logger.error(
"Programming error: generator passed to new_transaction as "
@@ -1132,17 +1131,57 @@ class DatabasePool:
desc: str = "simple_upsert",
lock: bool = True,
) -> bool:
- """
+ """Insert a row with values + insertion_values; on conflict, update with values.
+
+ All of our supported databases accept the nonstandard "upsert" statement in
+ their dialect of SQL. We call this a "native upsert". The syntax looks roughly
+ like:
+
+ INSERT INTO table VALUES (values + insertion_values)
+ ON CONFLICT (keyvalues)
+ DO UPDATE SET (values); -- overwrite `values` columns only
+
+ If (values) is empty, the resulting query is slightly simpler:
+
+ INSERT INTO table VALUES (insertion_values)
+ ON CONFLICT (keyvalues)
+ DO NOTHING; -- do not overwrite any columns
+
+ This function is a helper to build such queries.
- `lock` should generally be set to True (the default), but can be set
- to False if either of the following are true:
- 1. there is a UNIQUE INDEX on the key columns. In this case a conflict
- will cause an IntegrityError in which case this function will retry
- the update.
- 2. we somehow know that we are the only thread which will be updating
- this table.
- As an additional note, this parameter only matters for old SQLite versions
- because we will use native upserts otherwise.
+ In order for upserts to make sense, the database must be able to determine when
+ an upsert CONFLICTs with an existing row. Postgres and SQLite ensure this by
+ requiring that a unique index exist on the column names used to detect a
+ conflict (i.e. `keyvalues.keys()`).
+
+ If there is no such index, we can "emulate" an upsert with a SELECT followed
+ by either an INSERT or an UPDATE. This is unsafe: we cannot make the same
+ atomicity guarantees that a native upsert can and are very vulnerable to races
+ and crashes. Therefore if we wish to upsert without an appropriate unique index,
+ we must either:
+
+ 1. Acquire a table-level lock before the emulated upsert (`lock=True`), or
+ 2. VERY CAREFULLY ensure that we are the only thread and worker which will be
+ writing to this table, in which case we can proceed without a lock
+ (`lock=False`).
+
+ Generally speaking, you should use `lock=True`. If the table in question has a
+ unique index[*], this class will use a native upsert (which is atomic and so can
+ ignore the `lock` argument). Otherwise this class will use an emulated upsert,
+ in which case we want the safer option unless we have been VERY CAREFUL.
+
+ [*]: Some tables have unique indices added to them in the background. Those
+ tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES,
+ where `T` maps to the background update that adds a unique index to `T`.
+ This dictionary is maintained by hand.
+
+ At runtime, we constantly check to see if each of these background updates
+ has run. If so, we deem the corresponding table safe to upsert into, because
+ we can now use a native insert to do so. If not, we deem the table unsafe
+ to upsert into and require an emulated upsert.
+
+ Tables that do not appear in this dictionary are assumed to have an
+ appropriate unique index and therefore be safe to upsert into.
Args:
table: The table to upsert into
@@ -1160,11 +1199,8 @@ class DatabasePool:
attempts = 0
while True:
try:
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert
- and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it is safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
return await self.runInteraction(
desc,
@@ -1195,11 +1231,12 @@ class DatabasePool:
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
+ where_clause: Optional[str] = None,
lock: bool = True,
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
- native one (Pg9.5+, recent SQLites), or fall back to an emulated method.
+ native one (Pg9.5+, SQLite >= 3.24), or fall back to an emulated method.
Args:
txn: The transaction to use.
@@ -1207,16 +1244,23 @@ class DatabasePool:
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
- lock: True to lock the table when doing the upsert.
+ where_clause: An index predicate to apply to the upsert.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_txn_native_upsert(
- txn, table, keyvalues, values, insertion_values=insertion_values
+ txn,
+ table,
+ keyvalues,
+ values,
+ insertion_values=insertion_values,
+ where_clause=where_clause,
)
else:
return self.simple_upsert_txn_emulated(
@@ -1225,6 +1269,7 @@ class DatabasePool:
keyvalues,
values,
insertion_values=insertion_values,
+ where_clause=where_clause,
lock=lock,
)
@@ -1235,6 +1280,7 @@ class DatabasePool:
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
+ where_clause: Optional[str] = None,
lock: bool = True,
) -> bool:
"""
@@ -1243,6 +1289,7 @@ class DatabasePool:
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
+ where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1262,14 +1309,17 @@ class DatabasePool:
else:
return "%s = ?" % (key,)
+ # Generate a where clause of each keyvalue and optionally the provided
+ # index predicate.
+ where = [_getwhere(k) for k in keyvalues]
+ if where_clause:
+ where.append(where_clause)
+
if not values:
# If `values` is empty, then all of the values we care about are in
# the unique key, so there is nothing to UPDATE. We can just do a
# SELECT instead to see if it exists.
- sql = "SELECT 1 FROM %s WHERE %s" % (
- table,
- " AND ".join(_getwhere(k) for k in keyvalues),
- )
+ sql = "SELECT 1 FROM %s WHERE %s" % (table, " AND ".join(where))
sqlargs = list(keyvalues.values())
txn.execute(sql, sqlargs)
if txn.fetchall():
@@ -1280,7 +1330,7 @@ class DatabasePool:
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
- " AND ".join(_getwhere(k) for k in keyvalues),
+ " AND ".join(where),
)
sqlargs = list(values.values()) + list(keyvalues.values())
@@ -1310,6 +1360,7 @@ class DatabasePool:
keyvalues: Dict[str, Any],
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
+ where_clause: Optional[str] = None,
) -> bool:
"""
Use the native UPSERT functionality in PostgreSQL.
@@ -1319,6 +1370,7 @@ class DatabasePool:
keyvalues: The unique key tables and their new values
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
+ where_clause: An index predicate to apply to the upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1334,11 +1386,12 @@ class DatabasePool:
allvalues.update(values)
latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
- sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
+ sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) %s DO %s" % (
table,
", ".join(k for k in allvalues),
", ".join("?" for _ in allvalues),
", ".join(k for k in keyvalues),
+ f"WHERE {where_clause}" if where_clause else "",
latter,
)
txn.execute(sql, list(allvalues.values()))
@@ -1365,14 +1418,12 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- # We can autocommit if we are going to use native upserts
- autocommit = (
- self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables
- )
+ # We can autocommit if it is safe to upsert
+ autocommit = table not in self._unsafe_to_upsert_tables
await self.runInteraction(
desc,
@@ -1406,10 +1457,10 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused if the database engine
- supports native upserts.
+ lock: True to lock the table when doing the upsert. Unused when performing
+ a native upsert.
"""
- if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
+ if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
txn, table, key_names, key_values, value_names, value_values
)
@@ -1607,7 +1658,7 @@ class DatabasePool:
table: string giving the table name
keyvalues: dict of column names and values to select the row with
retcol: string giving the name of the column to return
- allow_none: If true, return None instead of failing if the SELECT
+ allow_none: If true, return None instead of raising StoreError if the SELECT
statement returns no rows
desc: description of the transaction, for logging and metrics
"""
@@ -2024,13 +2075,14 @@ class DatabasePool:
retcols: Collection[str],
allow_none: bool = False,
) -> Optional[Dict[str, Any]]:
- select_sql = "SELECT %s FROM %s WHERE %s" % (
- ", ".join(retcols),
- table,
- " AND ".join("%s = ?" % (k,) for k in keyvalues),
- )
+ select_sql = "SELECT %s FROM %s" % (", ".join(retcols), table)
+
+ if keyvalues:
+ select_sql += " WHERE %s" % (" AND ".join("%s = ?" % k for k in keyvalues),)
+ txn.execute(select_sql, list(keyvalues.values()))
+ else:
+ txn.execute(select_sql)
- txn.execute(select_sql, list(keyvalues.values()))
row = txn.fetchone()
if not row:
@@ -2410,6 +2462,66 @@ def make_in_list_sql_clause(
return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
+# These overloads ensure that `columns` and `iterable` values have the same length.
+# Suppress "Single overload definition, multiple required" complaint.
+@overload # type: ignore[misc]
+def make_tuple_in_list_sql_clause(
+ database_engine: BaseDatabaseEngine,
+ columns: Tuple[str, str],
+ iterable: Collection[Tuple[Any, Any]],
+) -> Tuple[str, list]:
+ ...
+
+
+def make_tuple_in_list_sql_clause(
+ database_engine: BaseDatabaseEngine,
+ columns: Tuple[str, ...],
+ iterable: Collection[Tuple[Any, ...]],
+) -> Tuple[str, list]:
+ """Returns an SQL clause that checks the given tuple of columns is in the iterable.
+
+ Args:
+ database_engine
+ columns: Names of the columns in the tuple.
+ iterable: The tuples to check the columns against.
+
+ Returns:
+ A tuple of SQL query and the args
+ """
+ if len(columns) == 0:
+ # Should be unreachable due to mypy, as long as the overloads are set up right.
+ if () in iterable:
+ return "TRUE", []
+ else:
+ return "FALSE", []
+
+ if len(columns) == 1:
+ # Use `= ANY(?)` on postgres.
+ return make_in_list_sql_clause(
+ database_engine, next(iter(columns)), [values[0] for values in iterable]
+ )
+
+ # There are multiple columns. Avoid using an `= ANY(?)` clause on postgres, as
+ # indices are not used when there are multiple columns. Instead, use an `IN`
+ # expression.
+ #
+ # `IN ((?, ...), ...)` with tuples is supported by postgres only, whereas
+ # `IN (VALUES (?, ...), ...)` is supported by both sqlite and postgres.
+ # Thus, the latter is chosen.
+
+ if len(iterable) == 0:
+ # A 0-length `VALUES` list is not allowed in sqlite or postgres.
+ # Also note that a 0-length `IN (...)` clause (not using `VALUES`) is not
+ # allowed in postgres.
+ return "FALSE", []
+
+ tuple_sql = "(%s)" % (",".join("?" for _ in columns),)
+ return "(%s) IN (VALUES %s)" % (
+ ",".join(column for column in columns),
+ ",".join(tuple_sql for _ in iterable),
+ ), [value for values in iterable for value in values]
+
+
KV = TypeVar("KV")
|