diff options
Diffstat (limited to 'tests/storage')
-rw-r--r-- | tests/storage/databases/main/test_events_worker.py | 139 | ||||
-rw-r--r-- | tests/storage/test_appservice.py | 439 | ||||
-rw-r--r-- | tests/storage/test_background_update.py | 119 | ||||
-rw-r--r-- | tests/storage/test_event_chain.py | 4 | ||||
-rw-r--r-- | tests/storage/test_main.py | 27 | ||||
-rw-r--r-- | tests/storage/test_user_directory.py | 5 |
6 files changed, 497 insertions, 236 deletions
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index a649e8c618..5ae491ff5a 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -12,11 +12,24 @@ # See the License for the specific language governing permissions and # limitations under the License. import json +from contextlib import contextmanager +from typing import Generator +from twisted.enterprise.adbapi import ConnectionPool +from twisted.internet.defer import ensureDeferred +from twisted.test.proto_helpers import MemoryReactor + +from synapse.api.room_versions import EventFormatVersions, RoomVersions from synapse.logging.context import LoggingContext from synapse.rest import admin from synapse.rest.client import login, room -from synapse.storage.databases.main.events_worker import EventsWorkerStore +from synapse.server import HomeServer +from synapse.storage.databases.main.events_worker import ( + EVENT_QUEUE_THREADS, + EventsWorkerStore, +) +from synapse.storage.types import Connection +from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results from tests import unittest @@ -144,3 +157,127 @@ class EventCacheTestCase(unittest.HomeserverTestCase): # We should have fetched the event from the DB self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) + + +class DatabaseOutageTestCase(unittest.HomeserverTestCase): + """Test event fetching during a database outage.""" + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): + self.store: EventsWorkerStore = hs.get_datastore() + + self.room_id = f"!room:{hs.hostname}" + self.event_ids = [f"event{i}" for i in range(20)] + + self._populate_events() + + def _populate_events(self) -> None: + """Ensure that there are test events in the database. + + When testing with the in-memory SQLite database, all the events are lost during + the simulated outage. + + To ensure consistency between `room_id`s and `event_id`s before and after the + outage, rows are built and inserted manually. + + Upserts are used to handle the non-SQLite case where events are not lost. + """ + self.get_success( + self.store.db_pool.simple_upsert( + "rooms", + {"room_id": self.room_id}, + {"room_version": RoomVersions.V4.identifier}, + ) + ) + + self.event_ids = [f"event{i}" for i in range(20)] + for idx, event_id in enumerate(self.event_ids): + self.get_success( + self.store.db_pool.simple_upsert( + "events", + {"event_id": event_id}, + { + "event_id": event_id, + "room_id": self.room_id, + "topological_ordering": idx, + "stream_ordering": idx, + "type": "test", + "processed": True, + "outlier": False, + }, + ) + ) + self.get_success( + self.store.db_pool.simple_upsert( + "event_json", + {"event_id": event_id}, + { + "room_id": self.room_id, + "json": json.dumps({"type": "test", "room_id": self.room_id}), + "internal_metadata": "{}", + "format_version": EventFormatVersions.V3, + }, + ) + ) + + @contextmanager + def _outage(self) -> Generator[None, None, None]: + """Simulate a database outage. + + Returns: + A context manager. While the context is active, any attempts to connect to + the database will fail. + """ + connection_pool = self.store.db_pool._db_pool + + # Close all connections and shut down the database `ThreadPool`. + connection_pool.close() + + # Restart the database `ThreadPool`. + connection_pool.start() + + original_connection_factory = connection_pool.connectionFactory + + def connection_factory(_pool: ConnectionPool) -> Connection: + raise Exception("Could not connect to the database.") + + connection_pool.connectionFactory = connection_factory # type: ignore[assignment] + try: + yield + finally: + connection_pool.connectionFactory = original_connection_factory + + # If the in-memory SQLite database is being used, all the events are gone. + # Restore the test data. + self._populate_events() + + def test_failure(self) -> None: + """Test that event fetches do not get stuck during a database outage.""" + with self._outage(): + failure = self.get_failure( + self.store.get_event(self.event_ids[0]), Exception + ) + self.assertEqual(str(failure.value), "Could not connect to the database.") + + def test_recovery(self) -> None: + """Test that event fetchers recover after a database outage.""" + with self._outage(): + # Kick off a bunch of event fetches but do not pump the reactor + event_deferreds = [] + for event_id in self.event_ids: + event_deferreds.append(ensureDeferred(self.store.get_event(event_id))) + + # We should have maxed out on event fetcher threads + self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS) + + # All the event fetchers will fail + self.pump() + self.assertEqual(self.store._event_fetch_ongoing, 0) + + for event_deferred in event_deferreds: + failure = self.get_failure(event_deferred, Exception) + self.assertEqual( + str(failure.value), "Could not connect to the database." + ) + + # This next event fetch should succeed + self.get_success(self.store.get_event(self.event_ids[0])) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index f26d5acf9c..329490caad 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -14,35 +14,37 @@ import json import os import tempfile +from typing import List, Optional, cast from unittest.mock import Mock import yaml from twisted.internet import defer +from twisted.test.proto_helpers import MemoryReactor from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.config._base import ConfigError +from synapse.events import EventBase +from synapse.server import HomeServer from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore, ) +from synapse.util import Clock from tests import unittest from tests.test_utils import make_awaitable -from tests.utils import setup_test_homeserver -class ApplicationServiceStoreTestCase(unittest.TestCase): - @defer.inlineCallbacks +class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase): def setUp(self): - self.as_yaml_files = [] - hs = yield setup_test_homeserver( - self.addCleanup, federation_sender=Mock(), federation_client=Mock() - ) + super(ApplicationServiceStoreTestCase, self).setUp() + + self.as_yaml_files: List[str] = [] - hs.config.appservice.app_service_config_files = self.as_yaml_files - hs.config.caches.event_cache_size = 1 + self.hs.config.appservice.app_service_config_files = self.as_yaml_files + self.hs.config.caches.event_cache_size = 1 self.as_token = "token1" self.as_url = "some_url" @@ -53,12 +55,14 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self._add_appservice("token2", "as2", "some_url", "some_hs_token", "bob") self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob") # must be done after inserts - database = hs.get_datastores().databases[0] + database = self.hs.get_datastores().databases[0] self.store = ApplicationServiceStore( - database, make_conn(database._database_config, database.engine, "test"), hs + database, + make_conn(database._database_config, database.engine, "test"), + self.hs, ) - def tearDown(self): + def tearDown(self) -> None: # TODO: suboptimal that we need to create files for tests! for f in self.as_yaml_files: try: @@ -66,7 +70,9 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): except Exception: pass - def _add_appservice(self, as_token, id, url, hs_token, sender): + super(ApplicationServiceStoreTestCase, self).tearDown() + + def _add_appservice(self, as_token, id, url, hs_token, sender) -> None: as_yaml = { "url": url, "as_token": as_token, @@ -80,12 +86,13 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - def test_retrieve_unknown_service_token(self): + def test_retrieve_unknown_service_token(self) -> None: service = self.store.get_app_service_by_token("invalid_token") self.assertEquals(service, None) - def test_retrieval_of_service(self): + def test_retrieval_of_service(self) -> None: stored_service = self.store.get_app_service_by_token(self.as_token) + assert stored_service is not None self.assertEquals(stored_service.token, self.as_token) self.assertEquals(stored_service.id, self.as_id) self.assertEquals(stored_service.url, self.as_url) @@ -93,22 +100,18 @@ class ApplicationServiceStoreTestCase(unittest.TestCase): self.assertEquals(stored_service.namespaces[ApplicationService.NS_ROOMS], []) self.assertEquals(stored_service.namespaces[ApplicationService.NS_USERS], []) - def test_retrieval_of_all_services(self): + def test_retrieval_of_all_services(self) -> None: services = self.store.get_app_services() self.assertEquals(len(services), 3) -class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): - @defer.inlineCallbacks - def setUp(self): - self.as_yaml_files = [] - - hs = yield setup_test_homeserver( - self.addCleanup, federation_sender=Mock(), federation_client=Mock() - ) +class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): + def setUp(self) -> None: + super(ApplicationServiceTransactionStoreTestCase, self).setUp() + self.as_yaml_files: List[str] = [] - hs.config.appservice.app_service_config_files = self.as_yaml_files - hs.config.caches.event_cache_size = 1 + self.hs.config.appservice.app_service_config_files = self.as_yaml_files + self.hs.config.caches.event_cache_size = 1 self.as_list = [ {"token": "token1", "url": "https://matrix-as.org", "id": "id_1"}, @@ -117,21 +120,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): {"token": "gamma_tok", "url": "https://gamma.com", "id": "id_gamma"}, ] for s in self.as_list: - yield self._add_service(s["url"], s["token"], s["id"]) + self._add_service(s["url"], s["token"], s["id"]) self.as_yaml_files = [] # We assume there is only one database in these tests - database = hs.get_datastores().databases[0] + database = self.hs.get_datastores().databases[0] self.db_pool = database._db_pool self.engine = database.engine - db_config = hs.config.database.get_single_database() + db_config = self.hs.config.database.get_single_database() self.store = TestTransactionStore( - database, make_conn(db_config, self.engine, "test"), hs + database, make_conn(db_config, self.engine, "test"), self.hs ) - def _add_service(self, url, as_token, id): + def _add_service(self, url, as_token, id) -> None: as_yaml = { "url": url, "as_token": as_token, @@ -145,13 +148,15 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - def _set_state(self, id, state, txn=None): + def _set_state( + self, id: str, state: ApplicationServiceState, txn: Optional[int] = None + ): return self.db_pool.runOperation( self.engine.convert_param_style( "INSERT INTO application_services_state(as_id, state, last_txn) " "VALUES(?,?,?)" ), - (id, state, txn), + (id, state.value, txn), ) def _insert_txn(self, as_id, txn_id, events): @@ -169,234 +174,277 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): "INSERT INTO application_services_state(as_id, last_txn, state) " "VALUES(?,?,?)" ), - (as_id, txn_id, ApplicationServiceState.UP), + (as_id, txn_id, ApplicationServiceState.UP.value), ) - @defer.inlineCallbacks - def test_get_appservice_state_none(self): + def test_get_appservice_state_none( + self, + ) -> None: service = Mock(id="999") - state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) + state = self.get_success(self.store.get_appservice_state(service)) self.assertEquals(None, state) - @defer.inlineCallbacks - def test_get_appservice_state_up(self): - yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) + def test_get_appservice_state_up( + self, + ) -> None: + self.get_success( + self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) + ) service = Mock(id=self.as_list[0]["id"]) - state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) + state = self.get_success( + defer.ensureDeferred(self.store.get_appservice_state(service)) + ) self.assertEquals(ApplicationServiceState.UP, state) - @defer.inlineCallbacks - def test_get_appservice_state_down(self): - yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) - yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.DOWN) - yield self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) + def test_get_appservice_state_down( + self, + ) -> None: + self.get_success( + self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) + ) + self.get_success( + self._set_state(self.as_list[1]["id"], ApplicationServiceState.DOWN) + ) + self.get_success( + self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) + ) service = Mock(id=self.as_list[1]["id"]) - state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) + state = self.get_success(self.store.get_appservice_state(service)) self.assertEquals(ApplicationServiceState.DOWN, state) - @defer.inlineCallbacks - def test_get_appservices_by_state_none(self): - services = yield defer.ensureDeferred( + def test_get_appservices_by_state_none( + self, + ) -> None: + services = self.get_success( self.store.get_appservices_by_state(ApplicationServiceState.DOWN) ) self.assertEquals(0, len(services)) - @defer.inlineCallbacks - def test_set_appservices_state_down(self): + def test_set_appservices_state_down( + self, + ) -> None: service = Mock(id=self.as_list[1]["id"]) - yield defer.ensureDeferred( + self.get_success( self.store.set_appservice_state(service, ApplicationServiceState.DOWN) ) - rows = yield self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT as_id FROM application_services_state WHERE state=?" - ), - (ApplicationServiceState.DOWN,), + rows = self.get_success( + self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT as_id FROM application_services_state WHERE state=?" + ), + (ApplicationServiceState.DOWN.value,), + ) ) self.assertEquals(service.id, rows[0][0]) - @defer.inlineCallbacks - def test_set_appservices_state_multiple_up(self): + def test_set_appservices_state_multiple_up( + self, + ) -> None: service = Mock(id=self.as_list[1]["id"]) - yield defer.ensureDeferred( + self.get_success( self.store.set_appservice_state(service, ApplicationServiceState.UP) ) - yield defer.ensureDeferred( + self.get_success( self.store.set_appservice_state(service, ApplicationServiceState.DOWN) ) - yield defer.ensureDeferred( + self.get_success( self.store.set_appservice_state(service, ApplicationServiceState.UP) ) - rows = yield self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT as_id FROM application_services_state WHERE state=?" - ), - (ApplicationServiceState.UP,), + rows = self.get_success( + self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT as_id FROM application_services_state WHERE state=?" + ), + (ApplicationServiceState.UP.value,), + ) ) self.assertEquals(service.id, rows[0][0]) - @defer.inlineCallbacks - def test_create_appservice_txn_first(self): + def test_create_appservice_txn_first( + self, + ) -> None: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] - txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events, []) + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) + txn = self.get_success( + defer.ensureDeferred(self.store.create_appservice_txn(service, events, [])) ) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - @defer.inlineCallbacks - def test_create_appservice_txn_older_last_txn(self): + def test_create_appservice_txn_older_last_txn( + self, + ) -> None: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] - yield self._set_last_txn(service.id, 9643) # AS is falling behind - yield self._insert_txn(service.id, 9644, events) - yield self._insert_txn(service.id, 9645, events) - txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events, []) - ) + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) + self.get_success(self._set_last_txn(service.id, 9643)) # AS is falling behind + self.get_success(self._insert_txn(service.id, 9644, events)) + self.get_success(self._insert_txn(service.id, 9645, events)) + txn = self.get_success(self.store.create_appservice_txn(service, events, [])) self.assertEquals(txn.id, 9646) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - @defer.inlineCallbacks - def test_create_appservice_txn_up_to_date_last_txn(self): + def test_create_appservice_txn_up_to_date_last_txn( + self, + ) -> None: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] - yield self._set_last_txn(service.id, 9643) - txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events, []) - ) + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) + self.get_success(self._set_last_txn(service.id, 9643)) + txn = self.get_success(self.store.create_appservice_txn(service, events, [])) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - @defer.inlineCallbacks - def test_create_appservice_txn_up_fuzzing(self): + def test_create_appservice_txn_up_fuzzing( + self, + ) -> None: service = Mock(id=self.as_list[0]["id"]) - events = [Mock(event_id="e1"), Mock(event_id="e2")] - yield self._set_last_txn(service.id, 9643) + events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) + self.get_success(self._set_last_txn(service.id, 9643)) # dump in rows with higher IDs to make sure the queries aren't wrong. - yield self._set_last_txn(self.as_list[1]["id"], 119643) - yield self._set_last_txn(self.as_list[2]["id"], 9) - yield self._set_last_txn(self.as_list[3]["id"], 9643) - yield self._insert_txn(self.as_list[1]["id"], 119644, events) - yield self._insert_txn(self.as_list[1]["id"], 119645, events) - yield self._insert_txn(self.as_list[1]["id"], 119646, events) - yield self._insert_txn(self.as_list[2]["id"], 10, events) - yield self._insert_txn(self.as_list[3]["id"], 9643, events) - - txn = yield defer.ensureDeferred( - self.store.create_appservice_txn(service, events, []) - ) + self.get_success(self._set_last_txn(self.as_list[1]["id"], 119643)) + self.get_success(self._set_last_txn(self.as_list[2]["id"], 9)) + self.get_success(self._set_last_txn(self.as_list[3]["id"], 9643)) + self.get_success(self._insert_txn(self.as_list[1]["id"], 119644, events)) + self.get_success(self._insert_txn(self.as_list[1]["id"], 119645, events)) + self.get_success(self._insert_txn(self.as_list[1]["id"], 119646, events)) + self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events)) + self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events)) + + txn = self.get_success(self.store.create_appservice_txn(service, events, [])) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - @defer.inlineCallbacks - def test_complete_appservice_txn_first_txn(self): + def test_complete_appservice_txn_first_txn( + self, + ) -> None: service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 1 - yield self._insert_txn(service.id, txn_id, events) - yield defer.ensureDeferred( + self.get_success(self._insert_txn(service.id, txn_id, events)) + self.get_success( self.store.complete_appservice_txn(txn_id=txn_id, service=service) ) - res = yield self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT last_txn FROM application_services_state WHERE as_id=?" - ), - (service.id,), + res = self.get_success( + self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT last_txn FROM application_services_state WHERE as_id=?" + ), + (service.id,), + ) ) self.assertEquals(1, len(res)) self.assertEquals(txn_id, res[0][0]) - res = yield self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT * FROM application_services_txns WHERE txn_id=?" - ), - (txn_id,), + res = self.get_success( + self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT * FROM application_services_txns WHERE txn_id=?" + ), + (txn_id,), + ) ) self.assertEquals(0, len(res)) - @defer.inlineCallbacks - def test_complete_appservice_txn_existing_in_state_table(self): + def test_complete_appservice_txn_existing_in_state_table( + self, + ) -> None: service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 5 - yield self._set_last_txn(service.id, 4) - yield self._insert_txn(service.id, txn_id, events) - yield defer.ensureDeferred( + self.get_success(self._set_last_txn(service.id, 4)) + self.get_success(self._insert_txn(service.id, txn_id, events)) + self.get_success( self.store.complete_appservice_txn(txn_id=txn_id, service=service) ) - res = yield self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT last_txn, state FROM application_services_state WHERE as_id=?" - ), - (service.id,), + res = self.get_success( + self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT last_txn, state FROM application_services_state WHERE as_id=?" + ), + (service.id,), + ) ) self.assertEquals(1, len(res)) self.assertEquals(txn_id, res[0][0]) - self.assertEquals(ApplicationServiceState.UP, res[0][1]) - - res = yield self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT * FROM application_services_txns WHERE txn_id=?" - ), - (txn_id,), + self.assertEquals(ApplicationServiceState.UP.value, res[0][1]) + + res = self.get_success( + self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT * FROM application_services_txns WHERE txn_id=?" + ), + (txn_id,), + ) ) self.assertEquals(0, len(res)) - @defer.inlineCallbacks - def test_get_oldest_unsent_txn_none(self): + def test_get_oldest_unsent_txn_none( + self, + ) -> None: service = Mock(id=self.as_list[0]["id"]) - txn = yield defer.ensureDeferred(self.store.get_oldest_unsent_txn(service)) + txn = self.get_success(self.store.get_oldest_unsent_txn(service)) self.assertEquals(None, txn) - @defer.inlineCallbacks - def test_get_oldest_unsent_txn(self): + def test_get_oldest_unsent_txn(self) -> None: service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) + # (ignore needed because Mypy won't allow us to assign to a method otherwise) + self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) # type: ignore[assignment] - yield self._insert_txn(self.as_list[1]["id"], 9, other_events) - yield self._insert_txn(service.id, 10, events) - yield self._insert_txn(service.id, 11, other_events) - yield self._insert_txn(service.id, 12, other_events) + self.get_success(self._insert_txn(self.as_list[1]["id"], 9, other_events)) + self.get_success(self._insert_txn(service.id, 10, events)) + self.get_success(self._insert_txn(service.id, 11, other_events)) + self.get_success(self._insert_txn(service.id, 12, other_events)) - txn = yield defer.ensureDeferred(self.store.get_oldest_unsent_txn(service)) + txn = self.get_success(self.store.get_oldest_unsent_txn(service)) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) self.assertEquals(events, txn.events) - @defer.inlineCallbacks - def test_get_appservices_by_state_single(self): - yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) - yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) + def test_get_appservices_by_state_single( + self, + ) -> None: + self.get_success( + self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) + ) + self.get_success( + self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) + ) - services = yield defer.ensureDeferred( + services = self.get_success( self.store.get_appservices_by_state(ApplicationServiceState.DOWN) ) self.assertEquals(1, len(services)) self.assertEquals(self.as_list[0]["id"], services[0].id) - @defer.inlineCallbacks - def test_get_appservices_by_state_multiple(self): - yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) - yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) - yield self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) - yield self._set_state(self.as_list[3]["id"], ApplicationServiceState.UP) + def test_get_appservices_by_state_multiple( + self, + ) -> None: + self.get_success( + self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) + ) + self.get_success( + self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) + ) + self.get_success( + self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) + ) + self.get_success( + self._set_state(self.as_list[3]["id"], ApplicationServiceState.UP) + ) - services = yield defer.ensureDeferred( + services = self.get_success( self.store.get_appservices_by_state(ApplicationServiceState.DOWN) ) self.assertEquals(2, len(services)) @@ -407,16 +455,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): - def make_homeserver(self, reactor, clock): - hs = self.setup_test_homeserver() - return hs - - def prepare(self, hs, reactor, clock): + def prepare( + self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer + ) -> None: self.service = Mock(id="foo") self.store = self.hs.get_datastore() - self.get_success(self.store.set_appservice_state(self.service, "up")) + self.get_success( + self.store.set_appservice_state(self.service, ApplicationServiceState.UP) + ) - def test_get_type_stream_id_for_appservice_no_value(self): + def test_get_type_stream_id_for_appservice_no_value(self) -> None: value = self.get_success( self.store.get_type_stream_id_for_appservice(self.service, "read_receipt") ) @@ -427,13 +475,13 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): ) self.assertEquals(value, 0) - def test_get_type_stream_id_for_appservice_invalid_type(self): + def test_get_type_stream_id_for_appservice_invalid_type(self) -> None: self.get_failure( self.store.get_type_stream_id_for_appservice(self.service, "foobar"), ValueError, ) - def test_set_type_stream_id_for_appservice(self): + def test_set_type_stream_id_for_appservice(self) -> None: read_receipt_value = 1024 self.get_success( self.store.set_type_stream_id_for_appservice( @@ -455,7 +503,7 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): ) self.assertEqual(result, read_receipt_value) - def test_set_type_stream_id_for_appservice_invalid_type(self): + def test_set_type_stream_id_for_appservice_invalid_type(self) -> None: self.get_failure( self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024), ValueError, @@ -464,12 +512,12 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): # required for ApplicationServiceTransactionStoreTestCase tests class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore): - def __init__(self, database: DatabasePool, db_conn, hs): + def __init__(self, database: DatabasePool, db_conn, hs) -> None: super().__init__(database, db_conn, hs) -class ApplicationServiceStoreConfigTestCase(unittest.TestCase): - def _write_config(self, suffix, **kwargs): +class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase): + def _write_config(self, suffix, **kwargs) -> str: vals = { "id": "id" + suffix, "url": "url" + suffix, @@ -485,41 +533,33 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): f.write(yaml.dump(vals)) return path - @defer.inlineCallbacks - def test_unique_works(self): + def test_unique_works(self) -> None: f1 = self._write_config(suffix="1") f2 = self._write_config(suffix="2") - hs = yield setup_test_homeserver( - self.addCleanup, federation_sender=Mock(), federation_client=Mock() - ) - - hs.config.appservice.app_service_config_files = [f1, f2] - hs.config.caches.event_cache_size = 1 + self.hs.config.appservice.app_service_config_files = [f1, f2] + self.hs.config.caches.event_cache_size = 1 - database = hs.get_datastores().databases[0] + database = self.hs.get_datastores().databases[0] ApplicationServiceStore( - database, make_conn(database._database_config, database.engine, "test"), hs + database, + make_conn(database._database_config, database.engine, "test"), + self.hs, ) - @defer.inlineCallbacks - def test_duplicate_ids(self): + def test_duplicate_ids(self) -> None: f1 = self._write_config(id="id", suffix="1") f2 = self._write_config(id="id", suffix="2") - hs = yield setup_test_homeserver( - self.addCleanup, federation_sender=Mock(), federation_client=Mock() - ) - - hs.config.appservice.app_service_config_files = [f1, f2] - hs.config.caches.event_cache_size = 1 + self.hs.config.appservice.app_service_config_files = [f1, f2] + self.hs.config.caches.event_cache_size = 1 with self.assertRaises(ConfigError) as cm: - database = hs.get_datastores().databases[0] + database = self.hs.get_datastores().databases[0] ApplicationServiceStore( database, make_conn(database._database_config, database.engine, "test"), - hs, + self.hs, ) e = cm.exception @@ -527,24 +567,19 @@ class ApplicationServiceStoreConfigTestCase(unittest.TestCase): self.assertIn(f2, str(e)) self.assertIn("id", str(e)) - @defer.inlineCallbacks - def test_duplicate_as_tokens(self): + def test_duplicate_as_tokens(self) -> None: f1 = self._write_config(as_token="as_token", suffix="1") f2 = self._write_config(as_token="as_token", suffix="2") - hs = yield setup_test_homeserver( - self.addCleanup, federation_sender=Mock(), federation_client=Mock() - ) - - hs.config.appservice.app_service_config_files = [f1, f2] - hs.config.caches.event_cache_size = 1 + self.hs.config.appservice.app_service_config_files = [f1, f2] + self.hs.config.caches.event_cache_size = 1 with self.assertRaises(ConfigError) as cm: - database = hs.get_datastores().databases[0] + database = self.hs.get_datastores().databases[0] ApplicationServiceStore( database, make_conn(database._database_config, database.engine, "test"), - hs, + self.hs, ) e = cm.exception diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index a5f5ebad41..d77c001506 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -1,8 +1,26 @@ -from unittest.mock import Mock +# Copyright 2021 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# Use backported mock for AsyncMock support on Python 3.6. +from mock import Mock + +from twisted.internet.defer import Deferred, ensureDeferred from synapse.storage.background_updates import BackgroundUpdater from tests import unittest +from tests.test_utils import make_awaitable class BackgroundUpdateTestCase(unittest.HomeserverTestCase): @@ -20,10 +38,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): def test_do_background_update(self): # the time we claim it takes to update one item when running the update - duration_ms = 4200 + duration_ms = 10 # the target runtime for each bg update - target_background_update_duration_ms = 5000000 + target_background_update_duration_ms = 100 store = self.hs.get_datastore() self.get_success( @@ -48,10 +66,8 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): self.update_handler.side_effect = update self.update_handler.reset_mock() res = self.get_success( - self.updates.do_next_background_update( - target_background_update_duration_ms - ), - by=0.1, + self.updates.do_next_background_update(False), + by=0.01, ) self.assertFalse(res) @@ -74,16 +90,93 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): self.update_handler.side_effect = update self.update_handler.reset_mock() - result = self.get_success( - self.updates.do_next_background_update(target_background_update_duration_ms) - ) + result = self.get_success(self.updates.do_next_background_update(False)) self.assertFalse(result) self.update_handler.assert_called_once() # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = self.get_success( - self.updates.do_next_background_update(target_background_update_duration_ms) - ) + result = self.get_success(self.updates.do_next_background_update(False)) self.assertTrue(result) self.assertFalse(self.update_handler.called) + + +class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase): + def prepare(self, reactor, clock, homeserver): + self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates + # the base test class should have run the real bg updates for us + self.assertTrue( + self.get_success(self.updates.has_completed_background_updates()) + ) + + self.update_deferred = Deferred() + self.update_handler = Mock(return_value=self.update_deferred) + self.updates.register_background_update_handler( + "test_update", self.update_handler + ) + + # Mock out the AsyncContextManager + self._update_ctx_manager = Mock(spec=["__aenter__", "__aexit__"]) + self._update_ctx_manager.__aenter__ = Mock( + return_value=make_awaitable(None), + ) + self._update_ctx_manager.__aexit__ = Mock(return_value=make_awaitable(None)) + + # Mock out the `update_handler` callback + self._on_update = Mock(return_value=self._update_ctx_manager) + + # Define a default batch size value that's not the same as the internal default + # value (100). + self._default_batch_size = 500 + + # Register the callbacks with more mocks + self.hs.get_module_api().register_background_update_controller_callbacks( + on_update=self._on_update, + min_batch_size=Mock(return_value=make_awaitable(self._default_batch_size)), + default_batch_size=Mock( + return_value=make_awaitable(self._default_batch_size), + ), + ) + + def test_controller(self): + store = self.hs.get_datastore() + self.get_success( + store.db_pool.simple_insert( + "background_updates", + values={"update_name": "test_update", "progress_json": "{}"}, + ) + ) + + # Set the return value for the context manager. + enter_defer = Deferred() + self._update_ctx_manager.__aenter__ = Mock(return_value=enter_defer) + + # Start the background update. + do_update_d = ensureDeferred(self.updates.do_next_background_update(True)) + + self.pump() + + # `run_update` should have been called, but the update handler won't be + # called until the `enter_defer` (returned by `__aenter__`) is resolved. + self._on_update.assert_called_once_with( + "test_update", + "master", + False, + ) + self.assertFalse(do_update_d.called) + self.assertFalse(self.update_deferred.called) + + # Resolving the `enter_defer` should call the update handler, which then + # blocks. + enter_defer.callback(100) + self.pump() + self.update_handler.assert_called_once_with({}, self._default_batch_size) + self.assertFalse(self.update_deferred.called) + self._update_ctx_manager.__aexit__.assert_not_called() + + # Resolving the update handler deferred should cause the + # `do_next_background_update` to finish and return + self.update_deferred.callback(100) + self.pump() + self._update_ctx_manager.__aexit__.assert_called() + self.get_success(do_update_d) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index b31c5eb5ec..7b7f6c349e 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 + self.store.db_pool.updates.do_next_background_update(False), by=0.1 ) # Ensure that we did actually take multiple iterations to process the @@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(100), by=0.1 + self.store.db_pool.updates.do_next_background_update(False), by=0.1 ) # Ensure that we did actually take multiple iterations to process the diff --git a/tests/storage/test_main.py b/tests/storage/test_main.py index d2b7b89952..f8d11bac4e 100644 --- a/tests/storage/test_main.py +++ b/tests/storage/test_main.py @@ -13,42 +13,35 @@ # limitations under the License. -from twisted.internet import defer - from synapse.types import UserID from tests import unittest -from tests.utils import setup_test_homeserver -class DataStoreTestCase(unittest.TestCase): - @defer.inlineCallbacks - def setUp(self): - hs = yield setup_test_homeserver(self.addCleanup) +class DataStoreTestCase(unittest.HomeserverTestCase): + def setUp(self) -> None: + super(DataStoreTestCase, self).setUp() - self.store = hs.get_datastore() + self.store = self.hs.get_datastore() self.user = UserID.from_string("@abcde:test") self.displayname = "Frank" - @defer.inlineCallbacks - def test_get_users_paginate(self): - yield defer.ensureDeferred( - self.store.register_user(self.user.to_string(), "pass") - ) - yield defer.ensureDeferred(self.store.create_profile(self.user.localpart)) - yield defer.ensureDeferred( + def test_get_users_paginate(self) -> None: + self.get_success(self.store.register_user(self.user.to_string(), "pass")) + self.get_success(self.store.create_profile(self.user.localpart)) + self.get_success( self.store.set_profile_displayname(self.user.localpart, self.displayname) ) - users, total = yield defer.ensureDeferred( + users, total = self.get_success( self.store.get_users_paginate(0, 10, name="bc", guests=False) ) self.assertEquals(1, total) self.assertEquals(self.displayname, users.pop()["displayname"]) - users, total = yield defer.ensureDeferred( + users, total = self.get_success( self.store.get_users_paginate(0, 10, name="BC", guests=False) ) diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py index 37cf7bb232..7f5b28aed8 100644 --- a/tests/storage/test_user_directory.py +++ b/tests/storage/test_user_directory.py @@ -23,6 +23,7 @@ from synapse.rest import admin from synapse.rest.client import login, register, room from synapse.server import HomeServer from synapse.storage import DataStore +from synapse.storage.background_updates import _BackgroundUpdateHandler from synapse.storage.roommember import ProfileInfo from synapse.util import Clock @@ -391,7 +392,9 @@ class UserDirectoryInitialPopulationTestcase(HomeserverTestCase): with mock.patch.dict( self.store.db_pool.updates._background_update_handlers, - populate_user_directory_process_users=mocked_process_users, + populate_user_directory_process_users=_BackgroundUpdateHandler( + mocked_process_users, + ), ): self._purge_and_rebuild_user_dir() |