diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index e61595336c..b112ff3df2 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -17,7 +17,17 @@
import logging
import time
from time import monotonic as monotonic_time
-from typing import Any, Callable, Dict, Iterable, Iterator, List, Optional, Tuple
+from typing import (
+ Any,
+ Callable,
+ Dict,
+ Iterable,
+ Iterator,
+ List,
+ Optional,
+ Tuple,
+ TypeVar,
+)
from six import iteritems, iterkeys, itervalues
from six.moves import intern, range
@@ -32,13 +42,14 @@ from synapse.config.database import DatabaseConnectionConfig
from synapse.logging.context import (
LoggingContext,
LoggingContextOrSentinel,
+ current_context,
make_deferred_yieldable,
)
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.storage.background_updates import BackgroundUpdater
from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
from synapse.storage.types import Connection, Cursor
-from synapse.util.stringutils import exception_to_unicode
+from synapse.types import Collection
logger = logging.getLogger(__name__)
@@ -201,9 +212,9 @@ class LoggingTransaction:
def executemany(self, sql: str, *args: Any):
self._do_execute(self.txn.executemany, sql, *args)
- def _make_sql_one_line(self, sql):
+ def _make_sql_one_line(self, sql: str) -> str:
"Strip newlines out of SQL so that the loggers in the DB are on one line"
- return " ".join(l.strip() for l in sql.splitlines() if l.strip())
+ return " ".join(line.strip() for line in sql.splitlines() if line.strip())
def _do_execute(self, func, sql, *args):
sql = self._make_sql_one_line(sql)
@@ -411,20 +422,14 @@ class Database(object):
# This can happen if the database disappears mid
# transaction.
logger.warning(
- "[TXN OPERROR] {%s} %s %d/%d",
- name,
- exception_to_unicode(e),
- i,
- N,
+ "[TXN OPERROR] {%s} %s %d/%d", name, e, i, N,
)
if i < N:
i += 1
try:
conn.rollback()
except self.engine.module.Error as e1:
- logger.warning(
- "[TXN EROLL] {%s} %s", name, exception_to_unicode(e1)
- )
+ logger.warning("[TXN EROLL] {%s} %s", name, e1)
continue
raise
except self.engine.module.DatabaseError as e:
@@ -436,9 +441,7 @@ class Database(object):
conn.rollback()
except self.engine.module.Error as e1:
logger.warning(
- "[TXN EROLL] {%s} %s",
- name,
- exception_to_unicode(e1),
+ "[TXN EROLL] {%s} %s", name, e1,
)
continue
raise
@@ -483,7 +486,7 @@ class Database(object):
end = monotonic_time()
duration = end - start
- LoggingContext.current_context().add_database_transaction(duration)
+ current_context().add_database_transaction(duration)
transaction_logger.debug("[TXN END] {%s} %f sec", name, duration)
@@ -510,7 +513,7 @@ class Database(object):
after_callbacks = [] # type: List[_CallbackListEntry]
exception_callbacks = [] # type: List[_CallbackListEntry]
- if LoggingContext.current_context() == LoggingContext.sentinel:
+ if not current_context():
logger.warning("Starting db txn '%s' from sentinel context", desc)
try:
@@ -547,10 +550,8 @@ class Database(object):
Returns:
Deferred: The result of func
"""
- parent_context = (
- LoggingContext.current_context()
- ) # type: Optional[LoggingContextOrSentinel]
- if parent_context == LoggingContext.sentinel:
+ parent_context = current_context() # type: Optional[LoggingContextOrSentinel]
+ if not parent_context:
logger.warning(
"Starting db connection from sentinel context: metrics will be lost"
)
@@ -880,20 +881,24 @@ class Database(object):
txn.execute(sql, list(allvalues.values()))
def simple_upsert_many_txn(
- self, txn, table, key_names, key_values, value_names, value_values
- ):
+ self,
+ txn: LoggingTransaction,
+ table: str,
+ key_names: Collection[str],
+ key_values: Collection[Iterable[Any]],
+ value_names: Collection[str],
+ value_values: Iterable[Iterable[str]],
+ ) -> None:
"""
Upsert, many times.
Args:
- table (str): The table to upsert into
- key_names (list[str]): The key column names.
- key_values (list[list]): A list of each row's key column values.
- value_names (list[str]): The value column names. If empty, no
- values will be used, even if value_values is provided.
- value_values (list[list]): A list of each row's value column values.
- Returns:
- None
+ table: The table to upsert into
+ key_names: The key column names.
+ key_values: A list of each row's key column values.
+ value_names: The value column names
+ value_values: A list of each row's value column values.
+ Ignored if value_names is empty.
"""
if self.engine.can_native_upsert and table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
@@ -905,20 +910,24 @@ class Database(object):
)
def simple_upsert_many_txn_emulated(
- self, txn, table, key_names, key_values, value_names, value_values
- ):
+ self,
+ txn: LoggingTransaction,
+ table: str,
+ key_names: Iterable[str],
+ key_values: Collection[Iterable[Any]],
+ value_names: Collection[str],
+ value_values: Iterable[Iterable[str]],
+ ) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
Args:
- table (str): The table to upsert into
- key_names (list[str]): The key column names.
- key_values (list[list]): A list of each row's key column values.
- value_names (list[str]): The value column names. If empty, no
- values will be used, even if value_values is provided.
- value_values (list[list]): A list of each row's value column values.
- Returns:
- None
+ table: The table to upsert into
+ key_names: The key column names.
+ key_values: A list of each row's key column values.
+ value_names: The value column names
+ value_values: A list of each row's value column values.
+ Ignored if value_names is empty.
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
@@ -932,20 +941,24 @@ class Database(object):
self.simple_upsert_txn_emulated(txn, table, _keys, _vals)
def simple_upsert_many_txn_native_upsert(
- self, txn, table, key_names, key_values, value_names, value_values
- ):
+ self,
+ txn: LoggingTransaction,
+ table: str,
+ key_names: Collection[str],
+ key_values: Collection[Iterable[Any]],
+ value_names: Collection[str],
+ value_values: Iterable[Iterable[Any]],
+ ) -> None:
"""
Upsert, many times, using batching where possible.
Args:
- table (str): The table to upsert into
- key_names (list[str]): The key column names.
- key_values (list[list]): A list of each row's key column values.
- value_names (list[str]): The value column names. If empty, no
- values will be used, even if value_values is provided.
- value_values (list[list]): A list of each row's value column values.
- Returns:
- None
+ table: The table to upsert into
+ key_names: The key column names.
+ key_values: A list of each row's key column values.
+ value_names: The value column names
+ value_values: A list of each row's value column values.
+ Ignored if value_names is empty.
"""
allnames = [] # type: List[str]
allnames.extend(key_names)
@@ -1558,3 +1571,74 @@ def make_in_list_sql_clause(
return "%s = ANY(?)" % (column,), [list(iterable)]
else:
return "%s IN (%s)" % (column, ",".join("?" for _ in iterable)), list(iterable)
+
+
+KV = TypeVar("KV")
+
+
+def make_tuple_comparison_clause(
+ database_engine: BaseDatabaseEngine, keys: List[Tuple[str, KV]]
+) -> Tuple[str, List[KV]]:
+ """Returns a tuple comparison SQL clause
+
+ Depending what the SQL engine supports, builds a SQL clause that looks like either
+ "(a, b) > (?, ?)", or "(a > ?) OR (a == ? AND b > ?)".
+
+ Args:
+ database_engine
+ keys: A set of (column, value) pairs to be compared.
+
+ Returns:
+ A tuple of SQL query and the args
+ """
+ if database_engine.supports_tuple_comparison:
+ return (
+ "(%s) > (%s)" % (",".join(k[0] for k in keys), ",".join("?" for _ in keys)),
+ [k[1] for k in keys],
+ )
+
+ # we want to build a clause
+ # (a > ?) OR
+ # (a == ? AND b > ?) OR
+ # (a == ? AND b == ? AND c > ?)
+ # ...
+ # (a == ? AND b == ? AND ... AND z > ?)
+ #
+ # or, equivalently:
+ #
+ # (a > ? OR (a == ? AND
+ # (b > ? OR (b == ? AND
+ # ...
+ # (y > ? OR (y == ? AND
+ # z > ?
+ # ))
+ # ...
+ # ))
+ # ))
+ #
+ # which itself is equivalent to (and apparently easier for the query optimiser):
+ #
+ # (a >= ? AND (a > ? OR
+ # (b >= ? AND (b > ? OR
+ # ...
+ # (y >= ? AND (y > ? OR
+ # z > ?
+ # ))
+ # ...
+ # ))
+ # ))
+ #
+ #
+
+ clause = ""
+ args = [] # type: List[KV]
+ for k, v in keys[:-1]:
+ clause = clause + "(%s >= ? AND (%s > ? OR " % (k, k)
+ args.extend([v, v])
+
+ (k, v) = keys[-1]
+ clause += "%s > ?" % (k,)
+ args.append(v)
+
+ clause += "))" * (len(keys) - 1)
+ return clause, args
|