# Copyright 2014-2016 OpenMarket Ltd # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. from collections import OrderedDict from typing import Generator from unittest.mock import Mock, call, patch from twisted.internet import defer from synapse.storage._base import SQLBaseStore from synapse.storage.database import DatabasePool from synapse.storage.engines import create_engine from tests import unittest from tests.server import TestHomeServer from tests.utils import USE_POSTGRES_FOR_TESTS, default_config class SQLBaseStoreTestCase(unittest.TestCase): """Test the "simple" SQL generating methods in SQLBaseStore.""" def setUp(self) -> None: # This is the Twisted connection pool. conn_pool = Mock(spec=["runInteraction", "runWithConnection"]) self.mock_txn = Mock() if USE_POSTGRES_FOR_TESTS: # To avoid testing psycopg2 itself, patch execute_batch/execute_values # to assert how it is called. from psycopg2 import extras self.mock_execute_batch = Mock() self.execute_batch_patcher = patch.object( extras, "execute_batch", new=self.mock_execute_batch ) self.execute_batch_patcher.start() self.mock_execute_values = Mock() self.execute_values_patcher = patch.object( extras, "execute_values", new=self.mock_execute_values ) self.execute_values_patcher.start() self.mock_conn = Mock( spec_set=[ "cursor", "rollback", "commit", "closed", "reconnect", "set_session", "encoding", ] ) self.mock_conn.encoding = "UNICODE" else: self.mock_conn = Mock(spec_set=["cursor", "rollback", "commit"]) self.mock_conn.cursor.return_value = self.mock_txn self.mock_txn.connection = self.mock_conn self.mock_conn.rollback.return_value = None # Our fake runInteraction just runs synchronously inline def runInteraction(func, *args, **kwargs) -> defer.Deferred: # type: ignore[no-untyped-def] return defer.succeed(func(self.mock_txn, *args, **kwargs)) conn_pool.runInteraction = runInteraction def runWithConnection(func, *args, **kwargs): # type: ignore[no-untyped-def] return defer.succeed(func(self.mock_conn, *args, **kwargs)) conn_pool.runWithConnection = runWithConnection config = default_config(name="test", parse=True) hs = TestHomeServer("test", config=config) if USE_POSTGRES_FOR_TESTS: db_config = {"name": "psycopg2", "args": {}} else: db_config = {"name": "sqlite3"} engine = create_engine(db_config) fake_engine = Mock(wraps=engine) fake_engine.in_transaction.return_value = False fake_engine.module.OperationalError = engine.module.OperationalError fake_engine.module.DatabaseError = engine.module.DatabaseError fake_engine.module.IntegrityError = engine.module.IntegrityError # Don't convert param style to make assertions easier. fake_engine.convert_param_style = lambda sql: sql # To fix isinstance(...) checks. fake_engine.__class__ = engine.__class__ # type: ignore[assignment] db = DatabasePool(Mock(), Mock(config=db_config), fake_engine) db._db_pool = conn_pool self.datastore = SQLBaseStore(db, None, hs) # type: ignore[arg-type] def tearDown(self) -> None: if USE_POSTGRES_FOR_TESTS: self.execute_batch_patcher.stop() self.execute_values_patcher.stop() @defer.inlineCallbacks def test_insert_1col(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 yield defer.ensureDeferred( self.datastore.db_pool.simple_insert( table="tablename", values={"columname": "Value"} ) ) self.mock_txn.execute.assert_called_once_with( "INSERT INTO tablename (columname) VALUES(?)", ("Value",) ) @defer.inlineCallbacks def test_insert_3cols(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 yield defer.ensureDeferred( self.datastore.db_pool.simple_insert( table="tablename", # Use OrderedDict() so we can assert on the SQL generated values=OrderedDict([("colA", 1), ("colB", 2), ("colC", 3)]), ) ) self.mock_txn.execute.assert_called_once_with( "INSERT INTO tablename (colA, colB, colC) VALUES(?, ?, ?)", (1, 2, 3) ) @defer.inlineCallbacks def test_insert_many(self) -> Generator["defer.Deferred[object]", object, None]: yield defer.ensureDeferred( self.datastore.db_pool.simple_insert_many( table="tablename", keys=( "col1", "col2", ), values=[ ( "val1", "val2", ), ("val3", "val4"), ], desc="", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_execute_values.assert_called_once_with( self.mock_txn, "INSERT INTO tablename (col1, col2) VALUES ?", [("val1", "val2"), ("val3", "val4")], template=None, fetch=False, ) else: self.mock_txn.executemany.assert_called_once_with( "INSERT INTO tablename (col1, col2) VALUES(?, ?)", [("val1", "val2"), ("val3", "val4")], ) @defer.inlineCallbacks def test_insert_many_no_iterable( self, ) -> Generator["defer.Deferred[object]", object, None]: yield defer.ensureDeferred( self.datastore.db_pool.simple_insert_many( table="tablename", keys=( "col1", "col2", ), values=[], desc="", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_execute_values.assert_called_once_with( self.mock_txn, "INSERT INTO tablename (col1, col2) VALUES ?", [], template=None, fetch=False, ) else: self.mock_txn.executemany.assert_called_once_with( "INSERT INTO tablename (col1, col2) VALUES(?, ?)", [] ) @defer.inlineCallbacks def test_select_one_1col(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 self.mock_txn.__iter__ = Mock(return_value=iter([("Value",)])) value = yield defer.ensureDeferred( self.datastore.db_pool.simple_select_one_onecol( table="tablename", keyvalues={"keycol": "TheKey"}, retcol="retcol" ) ) self.assertEqual("Value", value) self.mock_txn.execute.assert_called_once_with( "SELECT retcol FROM tablename WHERE keycol = ?", ["TheKey"] ) @defer.inlineCallbacks def test_select_one_3col(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 self.mock_txn.fetchone.return_value = (1, 2, 3) ret = yield defer.ensureDeferred( self.datastore.db_pool.simple_select_one( table="tablename", keyvalues={"keycol": "TheKey"}, retcols=["colA", "colB", "colC"], ) ) self.assertEqual({"colA": 1, "colB": 2, "colC": 3}, ret) self.mock_txn.execute.assert_called_once_with( "SELECT colA, colB, colC FROM tablename WHERE keycol = ?", ["TheKey"] ) @defer.inlineCallbacks def test_select_one_missing( self, ) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 0 self.mock_txn.fetchone.return_value = None ret = yield defer.ensureDeferred( self.datastore.db_pool.simple_select_one( table="tablename", keyvalues={"keycol": "Not here"}, retcols=["colA"], allow_none=True, ) ) self.assertFalse(ret) @defer.inlineCallbacks def test_select_list(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 3 self.mock_txn.fetchall.return_value = [(1,), (2,), (3,)] self.mock_txn.description = (("colA", None, None, None, None, None, None),) ret = yield defer.ensureDeferred( self.datastore.db_pool.simple_select_list( table="tablename", keyvalues={"keycol": "A set"}, retcols=["colA"] ) ) self.assertEqual([(1,), (2,), (3,)], ret) self.mock_txn.execute.assert_called_once_with( "SELECT colA FROM tablename WHERE keycol = ?", ["A set"] ) @defer.inlineCallbacks def test_select_many_batch( self, ) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 3 self.mock_txn.fetchall.side_effect = [[(1,), (2,)], [(3,)]] ret = yield defer.ensureDeferred( self.datastore.db_pool.simple_select_many_batch( table="tablename", column="col1", iterable=("val1", "val2", "val3"), retcols=("col2",), keyvalues={"col3": "val4"}, batch_size=2, ) ) self.mock_txn.execute.assert_has_calls( [ call( "SELECT col2 FROM tablename WHERE col1 = ANY(?) AND col3 = ?", [["val1", "val2"], "val4"], ), call( "SELECT col2 FROM tablename WHERE col1 = ANY(?) AND col3 = ?", [["val3"], "val4"], ), ], ) self.assertEqual([(1,), (2,), (3,)], ret) def test_select_many_no_iterable(self) -> None: self.mock_txn.rowcount = 3 self.mock_txn.fetchall.side_effect = [(1,), (2,)] ret = self.datastore.db_pool.simple_select_many_txn( self.mock_txn, table="tablename", column="col1", iterable=(), retcols=("col2",), keyvalues={"col3": "val4"}, ) self.mock_txn.execute.assert_not_called() self.assertEqual([], ret) @defer.inlineCallbacks def test_update_one_1col(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 yield defer.ensureDeferred( self.datastore.db_pool.simple_update_one( table="tablename", keyvalues={"keycol": "TheKey"}, updatevalues={"columnname": "New Value"}, ) ) self.mock_txn.execute.assert_called_once_with( "UPDATE tablename SET columnname = ? WHERE keycol = ?", ["New Value", "TheKey"], ) @defer.inlineCallbacks def test_update_one_4cols( self, ) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 yield defer.ensureDeferred( self.datastore.db_pool.simple_update_one( table="tablename", keyvalues=OrderedDict([("colA", 1), ("colB", 2)]), updatevalues=OrderedDict([("colC", 3), ("colD", 4)]), ) ) self.mock_txn.execute.assert_called_once_with( "UPDATE tablename SET colC = ?, colD = ? WHERE" " colA = ? AND colB = ?", [3, 4, 1, 2], ) @defer.inlineCallbacks def test_update_many(self) -> Generator["defer.Deferred[object]", object, None]: yield defer.ensureDeferred( self.datastore.db_pool.simple_update_many( table="tablename", key_names=("col1", "col2"), key_values=[("val1", "val2")], value_names=("col3",), value_values=[("val3",)], desc="", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_execute_batch.assert_called_once_with( self.mock_txn, "UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?", [("val3", "val1", "val2"), ("val3", "val1", "val2")], ) else: self.mock_txn.executemany.assert_called_once_with( "UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?", [("val3", "val1", "val2"), ("val3", "val1", "val2")], ) # key_values and value_values must be the same length. with self.assertRaises(ValueError): yield defer.ensureDeferred( self.datastore.db_pool.simple_update_many( table="tablename", key_names=("col1", "col2"), key_values=[("val1", "val2")], value_names=("col3",), value_values=[], desc="", ) ) @defer.inlineCallbacks def test_update_many_no_values( self, ) -> Generator["defer.Deferred[object]", object, None]: yield defer.ensureDeferred( self.datastore.db_pool.simple_update_many( table="tablename", key_names=("col1", "col2"), key_values=[], value_names=("col3",), value_values=[], desc="", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_execute_batch.assert_called_once_with( self.mock_txn, "UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?", [], ) else: self.mock_txn.executemany.assert_called_once_with( "UPDATE tablename SET col3 = ? WHERE col1 = ? AND col2 = ?", [], ) @defer.inlineCallbacks def test_delete_one(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 yield defer.ensureDeferred( self.datastore.db_pool.simple_delete_one( table="tablename", keyvalues={"keycol": "Go away"} ) ) self.mock_txn.execute.assert_called_once_with( "DELETE FROM tablename WHERE keycol = ?", ["Go away"] ) @defer.inlineCallbacks def test_delete_many(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 2 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_delete_many( table="tablename", column="col1", iterable=("val1", "val2"), keyvalues={"col2": "val3"}, desc="", ) ) self.mock_txn.execute.assert_called_once_with( "DELETE FROM tablename WHERE col1 = ANY(?) AND col2 = ?", [["val1", "val2"], "val3"], ) self.assertEqual(result, 2) @defer.inlineCallbacks def test_delete_many_no_iterable( self, ) -> Generator["defer.Deferred[object]", object, None]: result = yield defer.ensureDeferred( self.datastore.db_pool.simple_delete_many( table="tablename", column="col1", iterable=(), keyvalues={"col2": "val3"}, desc="", ) ) self.mock_txn.execute.assert_not_called() self.assertEqual(result, 0) @defer.inlineCallbacks def test_delete_many_no_keyvalues( self, ) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 2 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_delete_many( table="tablename", column="col1", iterable=("val1", "val2"), keyvalues={}, desc="", ) ) self.mock_txn.execute.assert_called_once_with( "DELETE FROM tablename WHERE col1 = ANY(?)", [["val1", "val2"]] ) self.assertEqual(result, 2) @defer.inlineCallbacks def test_upsert(self) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "oldvalue"}, values={"othercol": "newvalue"}, ) ) self.mock_txn.execute.assert_called_once_with( "INSERT INTO tablename (columnname, othercol) VALUES (?, ?) ON CONFLICT (columnname) DO UPDATE SET othercol=EXCLUDED.othercol", ["oldvalue", "newvalue"], ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_no_values( self, ) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "value"}, values={}, insertion_values={"columnname": "value"}, ) ) self.mock_txn.execute.assert_called_once_with( "INSERT INTO tablename (columnname) VALUES (?) ON CONFLICT (columnname) DO NOTHING", ["value"], ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_with_insertion( self, ) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "oldvalue"}, values={"othercol": "newvalue"}, insertion_values={"thirdcol": "insertionval"}, ) ) self.mock_txn.execute.assert_called_once_with( "INSERT INTO tablename (columnname, thirdcol, othercol) VALUES (?, ?, ?) ON CONFLICT (columnname) DO UPDATE SET othercol=EXCLUDED.othercol", ["oldvalue", "insertionval", "newvalue"], ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_with_where( self, ) -> Generator["defer.Deferred[object]", object, None]: self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "oldvalue"}, values={"othercol": "newvalue"}, where_clause="thirdcol IS NULL", ) ) self.mock_txn.execute.assert_called_once_with( "INSERT INTO tablename (columnname, othercol) VALUES (?, ?) ON CONFLICT (columnname) WHERE thirdcol IS NULL DO UPDATE SET othercol=EXCLUDED.othercol", ["oldvalue", "newvalue"], ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_many(self) -> Generator["defer.Deferred[object]", object, None]: yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert_many( table="tablename", key_names=["keycol1", "keycol2"], key_values=[["keyval1", "keyval2"], ["keyval3", "keyval4"]], value_names=["valuecol3"], value_values=[["val5"], ["val6"]], desc="", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_execute_values.assert_called_once_with( self.mock_txn, "INSERT INTO tablename (keycol1, keycol2, valuecol3) VALUES ? ON CONFLICT (keycol1, keycol2) DO UPDATE SET valuecol3=EXCLUDED.valuecol3", [("keyval1", "keyval2", "val5"), ("keyval3", "keyval4", "val6")], template=None, fetch=False, ) else: self.mock_txn.executemany.assert_called_once_with( "INSERT INTO tablename (keycol1, keycol2, valuecol3) VALUES (?, ?, ?) ON CONFLICT (keycol1, keycol2) DO UPDATE SET valuecol3=EXCLUDED.valuecol3", [("keyval1", "keyval2", "val5"), ("keyval3", "keyval4", "val6")], ) @defer.inlineCallbacks def test_upsert_many_no_values( self, ) -> Generator["defer.Deferred[object]", object, None]: yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert_many( table="tablename", key_names=["columnname"], key_values=[["oldvalue"]], value_names=[], value_values=[], desc="", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_execute_values.assert_called_once_with( self.mock_txn, "INSERT INTO tablename (columnname) VALUES ? ON CONFLICT (columnname) DO NOTHING", [("oldvalue",)], template=None, fetch=False, ) else: self.mock_txn.executemany.assert_called_once_with( "INSERT INTO tablename (columnname) VALUES (?) ON CONFLICT (columnname) DO NOTHING", [("oldvalue",)], ) @defer.inlineCallbacks def test_upsert_emulated_no_values_exists( self, ) -> Generator["defer.Deferred[object]", object, None]: self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename") self.mock_txn.fetchall.return_value = [(1,)] result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "value"}, values={}, insertion_values={"columnname": "value"}, ) ) if USE_POSTGRES_FOR_TESTS: self.mock_txn.execute.assert_has_calls( [ call("LOCK TABLE tablename in EXCLUSIVE MODE", ()), call("SELECT 1 FROM tablename WHERE columnname = ?", ["value"]), ] ) else: self.mock_txn.execute.assert_called_once_with( "SELECT 1 FROM tablename WHERE columnname = ?", ["value"] ) self.assertFalse(result) @defer.inlineCallbacks def test_upsert_emulated_no_values_not_exists( self, ) -> Generator["defer.Deferred[object]", object, None]: self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename") self.mock_txn.fetchall.return_value = [] self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "value"}, values={}, insertion_values={"columnname": "value"}, ) ) self.mock_txn.execute.assert_has_calls( [ call( "SELECT 1 FROM tablename WHERE columnname = ?", ["value"], ), call("INSERT INTO tablename (columnname) VALUES (?)", ["value"]), ], ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_emulated_with_insertion_exists( self, ) -> Generator["defer.Deferred[object]", object, None]: self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename") self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "oldvalue"}, values={"othercol": "newvalue"}, insertion_values={"thirdcol": "insertionval"}, ) ) if USE_POSTGRES_FOR_TESTS: self.mock_txn.execute.assert_has_calls( [ call("LOCK TABLE tablename in EXCLUSIVE MODE", ()), call( "UPDATE tablename SET othercol = ? WHERE columnname = ?", ["newvalue", "oldvalue"], ), ] ) else: self.mock_txn.execute.assert_called_once_with( "UPDATE tablename SET othercol = ? WHERE columnname = ?", ["newvalue", "oldvalue"], ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_emulated_with_insertion_not_exists( self, ) -> Generator["defer.Deferred[object]", object, None]: self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename") self.mock_txn.rowcount = 0 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "oldvalue"}, values={"othercol": "newvalue"}, insertion_values={"thirdcol": "insertionval"}, ) ) self.mock_txn.execute.assert_has_calls( [ call( "UPDATE tablename SET othercol = ? WHERE columnname = ?", ["newvalue", "oldvalue"], ), call( "INSERT INTO tablename (columnname, othercol, thirdcol) VALUES (?, ?, ?)", ["oldvalue", "newvalue", "insertionval"], ), ] ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_emulated_with_where( self, ) -> Generator["defer.Deferred[object]", object, None]: self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename") self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "oldvalue"}, values={"othercol": "newvalue"}, where_clause="thirdcol IS NULL", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_txn.execute.assert_has_calls( [ call("LOCK TABLE tablename in EXCLUSIVE MODE", ()), call( "UPDATE tablename SET othercol = ? WHERE columnname = ? AND thirdcol IS NULL", ["newvalue", "oldvalue"], ), ] ) else: self.mock_txn.execute.assert_called_once_with( "UPDATE tablename SET othercol = ? WHERE columnname = ? AND thirdcol IS NULL", ["newvalue", "oldvalue"], ) self.assertTrue(result) @defer.inlineCallbacks def test_upsert_emulated_with_where_no_values( self, ) -> Generator["defer.Deferred[object]", object, None]: self.datastore.db_pool._unsafe_to_upsert_tables.add("tablename") self.mock_txn.rowcount = 1 result = yield defer.ensureDeferred( self.datastore.db_pool.simple_upsert( table="tablename", keyvalues={"columnname": "oldvalue"}, values={}, where_clause="thirdcol IS NULL", ) ) if USE_POSTGRES_FOR_TESTS: self.mock_txn.execute.assert_has_calls( [ call("LOCK TABLE tablename in EXCLUSIVE MODE", ()), call( "SELECT 1 FROM tablename WHERE columnname = ? AND thirdcol IS NULL", ["oldvalue"], ), ] ) else: self.mock_txn.execute.assert_called_once_with( "SELECT 1 FROM tablename WHERE columnname = ? AND thirdcol IS NULL", ["oldvalue"], ) self.assertFalse(result)