diff options
Diffstat (limited to 'tests/storage')
-rw-r--r-- | tests/storage/databases/main/test_events_worker.py | 139 | ||||
-rw-r--r-- | tests/storage/test_appservice.py | 439 | ||||
-rw-r--r-- | tests/storage/test_background_update.py | 119 | ||||
-rw-r--r-- | tests/storage/test_event_chain.py | 4 | ||||
-rw-r--r-- | tests/storage/test_main.py | 27 | ||||
-rw-r--r-- | tests/storage/test_user_directory.py | 5 |
6 files changed, 236 insertions, 497 deletions
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py index 5ae491ff5a..a649e8c618 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py @@ -12,24 +12,11 @@ # See the License for the specific language governing permissions and # limitations under the License. import json -from contextlib import contextmanager -from typing import Generator -from twisted.enterprise.adbapi import ConnectionPool -from twisted.internet.defer import ensureDeferred -from twisted.test.proto_helpers import MemoryReactor - -from synapse.api.room_versions import EventFormatVersions, RoomVersions from synapse.logging.context import LoggingContext from synapse.rest import admin from synapse.rest.client import login, room -from synapse.server import HomeServer -from synapse.storage.databases.main.events_worker import ( - EVENT_QUEUE_THREADS, - EventsWorkerStore, -) -from synapse.storage.types import Connection -from synapse.util import Clock +from synapse.storage.databases.main.events_worker import EventsWorkerStore from synapse.util.async_helpers import yieldable_gather_results from tests import unittest @@ -157,127 +144,3 @@ class EventCacheTestCase(unittest.HomeserverTestCase): # We should have fetched the event from the DB self.assertEqual(ctx.get_resource_usage().evt_db_fetch_count, 1) - - -class DatabaseOutageTestCase(unittest.HomeserverTestCase): - """Test event fetching during a database outage.""" - - def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): - self.store: EventsWorkerStore = hs.get_datastore() - - self.room_id = f"!room:{hs.hostname}" - self.event_ids = [f"event{i}" for i in range(20)] - - self._populate_events() - - def _populate_events(self) -> None: - """Ensure that there are test events in the database. - - When testing with the in-memory SQLite database, all the events are lost during - the simulated outage. - - To ensure consistency between `room_id`s and `event_id`s before and after the - outage, rows are built and inserted manually. - - Upserts are used to handle the non-SQLite case where events are not lost. - """ - self.get_success( - self.store.db_pool.simple_upsert( - "rooms", - {"room_id": self.room_id}, - {"room_version": RoomVersions.V4.identifier}, - ) - ) - - self.event_ids = [f"event{i}" for i in range(20)] - for idx, event_id in enumerate(self.event_ids): - self.get_success( - self.store.db_pool.simple_upsert( - "events", - {"event_id": event_id}, - { - "event_id": event_id, - "room_id": self.room_id, - "topological_ordering": idx, - "stream_ordering": idx, - "type": "test", - "processed": True, - "outlier": False, - }, - ) - ) - self.get_success( - self.store.db_pool.simple_upsert( - "event_json", - {"event_id": event_id}, - { - "room_id": self.room_id, - "json": json.dumps({"type": "test", "room_id": self.room_id}), - "internal_metadata": "{}", - "format_version": EventFormatVersions.V3, - }, - ) - ) - - @contextmanager - def _outage(self) -> Generator[None, None, None]: - """Simulate a database outage. - - Returns: - A context manager. While the context is active, any attempts to connect to - the database will fail. - """ - connection_pool = self.store.db_pool._db_pool - - # Close all connections and shut down the database `ThreadPool`. - connection_pool.close() - - # Restart the database `ThreadPool`. - connection_pool.start() - - original_connection_factory = connection_pool.connectionFactory - - def connection_factory(_pool: ConnectionPool) -> Connection: - raise Exception("Could not connect to the database.") - - connection_pool.connectionFactory = connection_factory # type: ignore[assignment] - try: - yield - finally: - connection_pool.connectionFactory = original_connection_factory - - # If the in-memory SQLite database is being used, all the events are gone. - # Restore the test data. - self._populate_events() - - def test_failure(self) -> None: - """Test that event fetches do not get stuck during a database outage.""" - with self._outage(): - failure = self.get_failure( - self.store.get_event(self.event_ids[0]), Exception - ) - self.assertEqual(str(failure.value), "Could not connect to the database.") - - def test_recovery(self) -> None: - """Test that event fetchers recover after a database outage.""" - with self._outage(): - # Kick off a bunch of event fetches but do not pump the reactor - event_deferreds = [] - for event_id in self.event_ids: - event_deferreds.append(ensureDeferred(self.store.get_event(event_id))) - - # We should have maxed out on event fetcher threads - self.assertEqual(self.store._event_fetch_ongoing, EVENT_QUEUE_THREADS) - - # All the event fetchers will fail - self.pump() - self.assertEqual(self.store._event_fetch_ongoing, 0) - - for event_deferred in event_deferreds: - failure = self.get_failure(event_deferred, Exception) - self.assertEqual( - str(failure.value), "Could not connect to the database." - ) - - # This next event fetch should succeed - self.get_success(self.store.get_event(self.event_ids[0])) diff --git a/tests/storage/test_appservice.py b/tests/storage/test_appservice.py index 329490caad..f26d5acf9c 100644 --- a/tests/storage/test_appservice.py +++ b/tests/storage/test_appservice.py @@ -14,37 +14,35 @@ import json import os import tempfile -from typing import List, Optional, cast from unittest.mock import Mock import yaml from twisted.internet import defer -from twisted.test.proto_helpers import MemoryReactor from synapse.appservice import ApplicationService, ApplicationServiceState from synapse.config._base import ConfigError -from synapse.events import EventBase -from synapse.server import HomeServer from synapse.storage.database import DatabasePool, make_conn from synapse.storage.databases.main.appservice import ( ApplicationServiceStore, ApplicationServiceTransactionStore, ) -from synapse.util import Clock from tests import unittest from tests.test_utils import make_awaitable +from tests.utils import setup_test_homeserver -class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase): +class ApplicationServiceStoreTestCase(unittest.TestCase): + @defer.inlineCallbacks def setUp(self): - super(ApplicationServiceStoreTestCase, self).setUp() - - self.as_yaml_files: List[str] = [] + self.as_yaml_files = [] + hs = yield setup_test_homeserver( + self.addCleanup, federation_sender=Mock(), federation_client=Mock() + ) - self.hs.config.appservice.app_service_config_files = self.as_yaml_files - self.hs.config.caches.event_cache_size = 1 + hs.config.appservice.app_service_config_files = self.as_yaml_files + hs.config.caches.event_cache_size = 1 self.as_token = "token1" self.as_url = "some_url" @@ -55,14 +53,12 @@ class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase): self._add_appservice("token2", "as2", "some_url", "some_hs_token", "bob") self._add_appservice("token3", "as3", "some_url", "some_hs_token", "bob") # must be done after inserts - database = self.hs.get_datastores().databases[0] + database = hs.get_datastores().databases[0] self.store = ApplicationServiceStore( - database, - make_conn(database._database_config, database.engine, "test"), - self.hs, + database, make_conn(database._database_config, database.engine, "test"), hs ) - def tearDown(self) -> None: + def tearDown(self): # TODO: suboptimal that we need to create files for tests! for f in self.as_yaml_files: try: @@ -70,9 +66,7 @@ class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase): except Exception: pass - super(ApplicationServiceStoreTestCase, self).tearDown() - - def _add_appservice(self, as_token, id, url, hs_token, sender) -> None: + def _add_appservice(self, as_token, id, url, hs_token, sender): as_yaml = { "url": url, "as_token": as_token, @@ -86,13 +80,12 @@ class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - def test_retrieve_unknown_service_token(self) -> None: + def test_retrieve_unknown_service_token(self): service = self.store.get_app_service_by_token("invalid_token") self.assertEquals(service, None) - def test_retrieval_of_service(self) -> None: + def test_retrieval_of_service(self): stored_service = self.store.get_app_service_by_token(self.as_token) - assert stored_service is not None self.assertEquals(stored_service.token, self.as_token) self.assertEquals(stored_service.id, self.as_id) self.assertEquals(stored_service.url, self.as_url) @@ -100,18 +93,22 @@ class ApplicationServiceStoreTestCase(unittest.HomeserverTestCase): self.assertEquals(stored_service.namespaces[ApplicationService.NS_ROOMS], []) self.assertEquals(stored_service.namespaces[ApplicationService.NS_USERS], []) - def test_retrieval_of_all_services(self) -> None: + def test_retrieval_of_all_services(self): services = self.store.get_app_services() self.assertEquals(len(services), 3) -class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): - def setUp(self) -> None: - super(ApplicationServiceTransactionStoreTestCase, self).setUp() - self.as_yaml_files: List[str] = [] +class ApplicationServiceTransactionStoreTestCase(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + self.as_yaml_files = [] + + hs = yield setup_test_homeserver( + self.addCleanup, federation_sender=Mock(), federation_client=Mock() + ) - self.hs.config.appservice.app_service_config_files = self.as_yaml_files - self.hs.config.caches.event_cache_size = 1 + hs.config.appservice.app_service_config_files = self.as_yaml_files + hs.config.caches.event_cache_size = 1 self.as_list = [ {"token": "token1", "url": "https://matrix-as.org", "id": "id_1"}, @@ -120,21 +117,21 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): {"token": "gamma_tok", "url": "https://gamma.com", "id": "id_gamma"}, ] for s in self.as_list: - self._add_service(s["url"], s["token"], s["id"]) + yield self._add_service(s["url"], s["token"], s["id"]) self.as_yaml_files = [] # We assume there is only one database in these tests - database = self.hs.get_datastores().databases[0] + database = hs.get_datastores().databases[0] self.db_pool = database._db_pool self.engine = database.engine - db_config = self.hs.config.database.get_single_database() + db_config = hs.config.database.get_single_database() self.store = TestTransactionStore( - database, make_conn(db_config, self.engine, "test"), self.hs + database, make_conn(db_config, self.engine, "test"), hs ) - def _add_service(self, url, as_token, id) -> None: + def _add_service(self, url, as_token, id): as_yaml = { "url": url, "as_token": as_token, @@ -148,15 +145,13 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): outfile.write(yaml.dump(as_yaml)) self.as_yaml_files.append(as_token) - def _set_state( - self, id: str, state: ApplicationServiceState, txn: Optional[int] = None - ): + def _set_state(self, id, state, txn=None): return self.db_pool.runOperation( self.engine.convert_param_style( "INSERT INTO application_services_state(as_id, state, last_txn) " "VALUES(?,?,?)" ), - (id, state.value, txn), + (id, state, txn), ) def _insert_txn(self, as_id, txn_id, events): @@ -174,277 +169,234 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): "INSERT INTO application_services_state(as_id, last_txn, state) " "VALUES(?,?,?)" ), - (as_id, txn_id, ApplicationServiceState.UP.value), + (as_id, txn_id, ApplicationServiceState.UP), ) - def test_get_appservice_state_none( - self, - ) -> None: + @defer.inlineCallbacks + def test_get_appservice_state_none(self): service = Mock(id="999") - state = self.get_success(self.store.get_appservice_state(service)) + state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) self.assertEquals(None, state) - def test_get_appservice_state_up( - self, - ) -> None: - self.get_success( - self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) - ) + @defer.inlineCallbacks + def test_get_appservice_state_up(self): + yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) service = Mock(id=self.as_list[0]["id"]) - state = self.get_success( - defer.ensureDeferred(self.store.get_appservice_state(service)) - ) + state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) self.assertEquals(ApplicationServiceState.UP, state) - def test_get_appservice_state_down( - self, - ) -> None: - self.get_success( - self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) - ) - self.get_success( - self._set_state(self.as_list[1]["id"], ApplicationServiceState.DOWN) - ) - self.get_success( - self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) - ) + @defer.inlineCallbacks + def test_get_appservice_state_down(self): + yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.UP) + yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.DOWN) + yield self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) service = Mock(id=self.as_list[1]["id"]) - state = self.get_success(self.store.get_appservice_state(service)) + state = yield defer.ensureDeferred(self.store.get_appservice_state(service)) self.assertEquals(ApplicationServiceState.DOWN, state) - def test_get_appservices_by_state_none( - self, - ) -> None: - services = self.get_success( + @defer.inlineCallbacks + def test_get_appservices_by_state_none(self): + services = yield defer.ensureDeferred( self.store.get_appservices_by_state(ApplicationServiceState.DOWN) ) self.assertEquals(0, len(services)) - def test_set_appservices_state_down( - self, - ) -> None: + @defer.inlineCallbacks + def test_set_appservices_state_down(self): service = Mock(id=self.as_list[1]["id"]) - self.get_success( + yield defer.ensureDeferred( self.store.set_appservice_state(service, ApplicationServiceState.DOWN) ) - rows = self.get_success( - self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT as_id FROM application_services_state WHERE state=?" - ), - (ApplicationServiceState.DOWN.value,), - ) + rows = yield self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT as_id FROM application_services_state WHERE state=?" + ), + (ApplicationServiceState.DOWN,), ) self.assertEquals(service.id, rows[0][0]) - def test_set_appservices_state_multiple_up( - self, - ) -> None: + @defer.inlineCallbacks + def test_set_appservices_state_multiple_up(self): service = Mock(id=self.as_list[1]["id"]) - self.get_success( + yield defer.ensureDeferred( self.store.set_appservice_state(service, ApplicationServiceState.UP) ) - self.get_success( + yield defer.ensureDeferred( self.store.set_appservice_state(service, ApplicationServiceState.DOWN) ) - self.get_success( + yield defer.ensureDeferred( self.store.set_appservice_state(service, ApplicationServiceState.UP) ) - rows = self.get_success( - self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT as_id FROM application_services_state WHERE state=?" - ), - (ApplicationServiceState.UP.value,), - ) + rows = yield self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT as_id FROM application_services_state WHERE state=?" + ), + (ApplicationServiceState.UP,), ) self.assertEquals(service.id, rows[0][0]) - def test_create_appservice_txn_first( - self, - ) -> None: + @defer.inlineCallbacks + def test_create_appservice_txn_first(self): service = Mock(id=self.as_list[0]["id"]) - events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - txn = self.get_success( - defer.ensureDeferred(self.store.create_appservice_txn(service, events, [])) + events = [Mock(event_id="e1"), Mock(event_id="e2")] + txn = yield defer.ensureDeferred( + self.store.create_appservice_txn(service, events, []) ) self.assertEquals(txn.id, 1) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - def test_create_appservice_txn_older_last_txn( - self, - ) -> None: + @defer.inlineCallbacks + def test_create_appservice_txn_older_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - self.get_success(self._set_last_txn(service.id, 9643)) # AS is falling behind - self.get_success(self._insert_txn(service.id, 9644, events)) - self.get_success(self._insert_txn(service.id, 9645, events)) - txn = self.get_success(self.store.create_appservice_txn(service, events, [])) + events = [Mock(event_id="e1"), Mock(event_id="e2")] + yield self._set_last_txn(service.id, 9643) # AS is falling behind + yield self._insert_txn(service.id, 9644, events) + yield self._insert_txn(service.id, 9645, events) + txn = yield defer.ensureDeferred( + self.store.create_appservice_txn(service, events, []) + ) self.assertEquals(txn.id, 9646) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - def test_create_appservice_txn_up_to_date_last_txn( - self, - ) -> None: + @defer.inlineCallbacks + def test_create_appservice_txn_up_to_date_last_txn(self): service = Mock(id=self.as_list[0]["id"]) - events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - self.get_success(self._set_last_txn(service.id, 9643)) - txn = self.get_success(self.store.create_appservice_txn(service, events, [])) + events = [Mock(event_id="e1"), Mock(event_id="e2")] + yield self._set_last_txn(service.id, 9643) + txn = yield defer.ensureDeferred( + self.store.create_appservice_txn(service, events, []) + ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - def test_create_appservice_txn_up_fuzzing( - self, - ) -> None: + @defer.inlineCallbacks + def test_create_appservice_txn_up_fuzzing(self): service = Mock(id=self.as_list[0]["id"]) - events = cast(List[EventBase], [Mock(event_id="e1"), Mock(event_id="e2")]) - self.get_success(self._set_last_txn(service.id, 9643)) + events = [Mock(event_id="e1"), Mock(event_id="e2")] + yield self._set_last_txn(service.id, 9643) # dump in rows with higher IDs to make sure the queries aren't wrong. - self.get_success(self._set_last_txn(self.as_list[1]["id"], 119643)) - self.get_success(self._set_last_txn(self.as_list[2]["id"], 9)) - self.get_success(self._set_last_txn(self.as_list[3]["id"], 9643)) - self.get_success(self._insert_txn(self.as_list[1]["id"], 119644, events)) - self.get_success(self._insert_txn(self.as_list[1]["id"], 119645, events)) - self.get_success(self._insert_txn(self.as_list[1]["id"], 119646, events)) - self.get_success(self._insert_txn(self.as_list[2]["id"], 10, events)) - self.get_success(self._insert_txn(self.as_list[3]["id"], 9643, events)) - - txn = self.get_success(self.store.create_appservice_txn(service, events, [])) + yield self._set_last_txn(self.as_list[1]["id"], 119643) + yield self._set_last_txn(self.as_list[2]["id"], 9) + yield self._set_last_txn(self.as_list[3]["id"], 9643) + yield self._insert_txn(self.as_list[1]["id"], 119644, events) + yield self._insert_txn(self.as_list[1]["id"], 119645, events) + yield self._insert_txn(self.as_list[1]["id"], 119646, events) + yield self._insert_txn(self.as_list[2]["id"], 10, events) + yield self._insert_txn(self.as_list[3]["id"], 9643, events) + + txn = yield defer.ensureDeferred( + self.store.create_appservice_txn(service, events, []) + ) self.assertEquals(txn.id, 9644) self.assertEquals(txn.events, events) self.assertEquals(txn.service, service) - def test_complete_appservice_txn_first_txn( - self, - ) -> None: + @defer.inlineCallbacks + def test_complete_appservice_txn_first_txn(self): service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 1 - self.get_success(self._insert_txn(service.id, txn_id, events)) - self.get_success( + yield self._insert_txn(service.id, txn_id, events) + yield defer.ensureDeferred( self.store.complete_appservice_txn(txn_id=txn_id, service=service) ) - res = self.get_success( - self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT last_txn FROM application_services_state WHERE as_id=?" - ), - (service.id,), - ) + res = yield self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT last_txn FROM application_services_state WHERE as_id=?" + ), + (service.id,), ) self.assertEquals(1, len(res)) self.assertEquals(txn_id, res[0][0]) - res = self.get_success( - self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT * FROM application_services_txns WHERE txn_id=?" - ), - (txn_id,), - ) + res = yield self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT * FROM application_services_txns WHERE txn_id=?" + ), + (txn_id,), ) self.assertEquals(0, len(res)) - def test_complete_appservice_txn_existing_in_state_table( - self, - ) -> None: + @defer.inlineCallbacks + def test_complete_appservice_txn_existing_in_state_table(self): service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] txn_id = 5 - self.get_success(self._set_last_txn(service.id, 4)) - self.get_success(self._insert_txn(service.id, txn_id, events)) - self.get_success( + yield self._set_last_txn(service.id, 4) + yield self._insert_txn(service.id, txn_id, events) + yield defer.ensureDeferred( self.store.complete_appservice_txn(txn_id=txn_id, service=service) ) - res = self.get_success( - self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT last_txn, state FROM application_services_state WHERE as_id=?" - ), - (service.id,), - ) + res = yield self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT last_txn, state FROM application_services_state WHERE as_id=?" + ), + (service.id,), ) self.assertEquals(1, len(res)) self.assertEquals(txn_id, res[0][0]) - self.assertEquals(ApplicationServiceState.UP.value, res[0][1]) - - res = self.get_success( - self.db_pool.runQuery( - self.engine.convert_param_style( - "SELECT * FROM application_services_txns WHERE txn_id=?" - ), - (txn_id,), - ) + self.assertEquals(ApplicationServiceState.UP, res[0][1]) + + res = yield self.db_pool.runQuery( + self.engine.convert_param_style( + "SELECT * FROM application_services_txns WHERE txn_id=?" + ), + (txn_id,), ) self.assertEquals(0, len(res)) - def test_get_oldest_unsent_txn_none( - self, - ) -> None: + @defer.inlineCallbacks + def test_get_oldest_unsent_txn_none(self): service = Mock(id=self.as_list[0]["id"]) - txn = self.get_success(self.store.get_oldest_unsent_txn(service)) + txn = yield defer.ensureDeferred(self.store.get_oldest_unsent_txn(service)) self.assertEquals(None, txn) - def test_get_oldest_unsent_txn(self) -> None: + @defer.inlineCallbacks + def test_get_oldest_unsent_txn(self): service = Mock(id=self.as_list[0]["id"]) events = [Mock(event_id="e1"), Mock(event_id="e2")] other_events = [Mock(event_id="e5"), Mock(event_id="e6")] # we aren't testing store._base stuff here, so mock this out - # (ignore needed because Mypy won't allow us to assign to a method otherwise) - self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) # type: ignore[assignment] + self.store.get_events_as_list = Mock(return_value=make_awaitable(events)) - self.get_success(self._insert_txn(self.as_list[1]["id"], 9, other_events)) - self.get_success(self._insert_txn(service.id, 10, events)) - self.get_success(self._insert_txn(service.id, 11, other_events)) - self.get_success(self._insert_txn(service.id, 12, other_events)) + yield self._insert_txn(self.as_list[1]["id"], 9, other_events) + yield self._insert_txn(service.id, 10, events) + yield self._insert_txn(service.id, 11, other_events) + yield self._insert_txn(service.id, 12, other_events) - txn = self.get_success(self.store.get_oldest_unsent_txn(service)) + txn = yield defer.ensureDeferred(self.store.get_oldest_unsent_txn(service)) self.assertEquals(service, txn.service) self.assertEquals(10, txn.id) self.assertEquals(events, txn.events) - def test_get_appservices_by_state_single( - self, - ) -> None: - self.get_success( - self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) - ) - self.get_success( - self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) - ) + @defer.inlineCallbacks + def test_get_appservices_by_state_single(self): + yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) + yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) - services = self.get_success( + services = yield defer.ensureDeferred( self.store.get_appservices_by_state(ApplicationServiceState.DOWN) ) self.assertEquals(1, len(services)) self.assertEquals(self.as_list[0]["id"], services[0].id) - def test_get_appservices_by_state_multiple( - self, - ) -> None: - self.get_success( - self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) - ) - self.get_success( - self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) - ) - self.get_success( - self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) - ) - self.get_success( - self._set_state(self.as_list[3]["id"], ApplicationServiceState.UP) - ) + @defer.inlineCallbacks + def test_get_appservices_by_state_multiple(self): + yield self._set_state(self.as_list[0]["id"], ApplicationServiceState.DOWN) + yield self._set_state(self.as_list[1]["id"], ApplicationServiceState.UP) + yield self._set_state(self.as_list[2]["id"], ApplicationServiceState.DOWN) + yield self._set_state(self.as_list[3]["id"], ApplicationServiceState.UP) - services = self.get_success( + services = yield defer.ensureDeferred( self.store.get_appservices_by_state(ApplicationServiceState.DOWN) ) self.assertEquals(2, len(services)) @@ -455,16 +407,16 @@ class ApplicationServiceTransactionStoreTestCase(unittest.HomeserverTestCase): class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): - def prepare( - self, reactor: MemoryReactor, clock: Clock, homeserver: HomeServer - ) -> None: + def make_homeserver(self, reactor, clock): + hs = self.setup_test_homeserver() + return hs + + def prepare(self, hs, reactor, clock): self.service = Mock(id="foo") self.store = self.hs.get_datastore() - self.get_success( - self.store.set_appservice_state(self.service, ApplicationServiceState.UP) - ) + self.get_success(self.store.set_appservice_state(self.service, "up")) - def test_get_type_stream_id_for_appservice_no_value(self) -> None: + def test_get_type_stream_id_for_appservice_no_value(self): value = self.get_success( self.store.get_type_stream_id_for_appservice(self.service, "read_receipt") ) @@ -475,13 +427,13 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): ) self.assertEquals(value, 0) - def test_get_type_stream_id_for_appservice_invalid_type(self) -> None: + def test_get_type_stream_id_for_appservice_invalid_type(self): self.get_failure( self.store.get_type_stream_id_for_appservice(self.service, "foobar"), ValueError, ) - def test_set_type_stream_id_for_appservice(self) -> None: + def test_set_type_stream_id_for_appservice(self): read_receipt_value = 1024 self.get_success( self.store.set_type_stream_id_for_appservice( @@ -503,7 +455,7 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): ) self.assertEqual(result, read_receipt_value) - def test_set_type_stream_id_for_appservice_invalid_type(self) -> None: + def test_set_type_stream_id_for_appservice_invalid_type(self): self.get_failure( self.store.set_type_stream_id_for_appservice(self.service, "foobar", 1024), ValueError, @@ -512,12 +464,12 @@ class ApplicationServiceStoreTypeStreamIds(unittest.HomeserverTestCase): # required for ApplicationServiceTransactionStoreTestCase tests class TestTransactionStore(ApplicationServiceTransactionStore, ApplicationServiceStore): - def __init__(self, database: DatabasePool, db_conn, hs) -> None: + def __init__(self, database: DatabasePool, db_conn, hs): super().__init__(database, db_conn, hs) -class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase): - def _write_config(self, suffix, **kwargs) -> str: +class ApplicationServiceStoreConfigTestCase(unittest.TestCase): + def _write_config(self, suffix, **kwargs): vals = { "id": "id" + suffix, "url": "url" + suffix, @@ -533,33 +485,41 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase): f.write(yaml.dump(vals)) return path - def test_unique_works(self) -> None: + @defer.inlineCallbacks + def test_unique_works(self): f1 = self._write_config(suffix="1") f2 = self._write_config(suffix="2") - self.hs.config.appservice.app_service_config_files = [f1, f2] - self.hs.config.caches.event_cache_size = 1 + hs = yield setup_test_homeserver( + self.addCleanup, federation_sender=Mock(), federation_client=Mock() + ) + + hs.config.appservice.app_service_config_files = [f1, f2] + hs.config.caches.event_cache_size = 1 - database = self.hs.get_datastores().databases[0] + database = hs.get_datastores().databases[0] ApplicationServiceStore( - database, - make_conn(database._database_config, database.engine, "test"), - self.hs, + database, make_conn(database._database_config, database.engine, "test"), hs ) - def test_duplicate_ids(self) -> None: + @defer.inlineCallbacks + def test_duplicate_ids(self): f1 = self._write_config(id="id", suffix="1") f2 = self._write_config(id="id", suffix="2") - self.hs.config.appservice.app_service_config_files = [f1, f2] - self.hs.config.caches.event_cache_size = 1 + hs = yield setup_test_homeserver( + self.addCleanup, federation_sender=Mock(), federation_client=Mock() + ) + + hs.config.appservice.app_service_config_files = [f1, f2] + hs.config.caches.event_cache_size = 1 with self.assertRaises(ConfigError) as cm: - database = self.hs.get_datastores().databases[0] + database = hs.get_datastores().databases[0] ApplicationServiceStore( database, make_conn(database._database_config, database.engine, "test"), - self.hs, + hs, ) e = cm.exception @@ -567,19 +527,24 @@ class ApplicationServiceStoreConfigTestCase(unittest.HomeserverTestCase): self.assertIn(f2, str(e)) self.assertIn("id", str(e)) - def test_duplicate_as_tokens(self) -> None: + @defer.inlineCallbacks + def test_duplicate_as_tokens(self): f1 = self._write_config(as_token="as_token", suffix="1") f2 = self._write_config(as_token="as_token", suffix="2") - self.hs.config.appservice.app_service_config_files = [f1, f2] - self.hs.config.caches.event_cache_size = 1 + hs = yield setup_test_homeserver( + self.addCleanup, federation_sender=Mock(), federation_client=Mock() + ) + + hs.config.appservice.app_service_config_files = [f1, f2] + hs.config.caches.event_cache_size = 1 with self.assertRaises(ConfigError) as cm: - database = self.hs.get_datastores().databases[0] + database = hs.get_datastores().databases[0] ApplicationServiceStore( database, make_conn(database._database_config, database.engine, "test"), - self.hs, + hs, ) e = cm.exception diff --git a/tests/storage/test_background_update.py b/tests/storage/test_background_update.py index d77c001506..a5f5ebad41 100644 --- a/tests/storage/test_background_update.py +++ b/tests/storage/test_background_update.py @@ -1,26 +1,8 @@ -# Copyright 2021 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# Use backported mock for AsyncMock support on Python 3.6. -from mock import Mock - -from twisted.internet.defer import Deferred, ensureDeferred +from unittest.mock import Mock from synapse.storage.background_updates import BackgroundUpdater from tests import unittest -from tests.test_utils import make_awaitable class BackgroundUpdateTestCase(unittest.HomeserverTestCase): @@ -38,10 +20,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): def test_do_background_update(self): # the time we claim it takes to update one item when running the update - duration_ms = 10 + duration_ms = 4200 # the target runtime for each bg update - target_background_update_duration_ms = 100 + target_background_update_duration_ms = 5000000 store = self.hs.get_datastore() self.get_success( @@ -66,8 +48,10 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): self.update_handler.side_effect = update self.update_handler.reset_mock() res = self.get_success( - self.updates.do_next_background_update(False), - by=0.01, + self.updates.do_next_background_update( + target_background_update_duration_ms + ), + by=0.1, ) self.assertFalse(res) @@ -90,93 +74,16 @@ class BackgroundUpdateTestCase(unittest.HomeserverTestCase): self.update_handler.side_effect = update self.update_handler.reset_mock() - result = self.get_success(self.updates.do_next_background_update(False)) + result = self.get_success( + self.updates.do_next_background_update(target_background_update_duration_ms) + ) self.assertFalse(result) self.update_handler.assert_called_once() # third step: we don't expect to be called any more self.update_handler.reset_mock() - result = self.get_success(self.updates.do_next_background_update(False)) + result = self.get_success( + self.updates.do_next_background_update(target_background_update_duration_ms) + ) self.assertTrue(result) self.assertFalse(self.update_handler.called) - - -class BackgroundUpdateControllerTestCase(unittest.HomeserverTestCase): - def prepare(self, reactor, clock, homeserver): - self.updates: BackgroundUpdater = self.hs.get_datastore().db_pool.updates - # the base test class should have run the real bg updates for us - self.assertTrue( - self.get_success(self.updates.has_completed_background_updates()) - ) - - self.update_deferred = Deferred() - self.update_handler = Mock(return_value=self.update_deferred) - self.updates.register_background_update_handler( - "test_update", self.update_handler - ) - - # Mock out the AsyncContextManager - self._update_ctx_manager = Mock(spec=["__aenter__", "__aexit__"]) - self._update_ctx_manager.__aenter__ = Mock( - return_value=make_awaitable(None), - ) - self._update_ctx_manager.__aexit__ = Mock(return_value=make_awaitable(None)) - - # Mock out the `update_handler` callback - self._on_update = Mock(return_value=self._update_ctx_manager) - - # Define a default batch size value that's not the same as the internal default - # value (100). - self._default_batch_size = 500 - - # Register the callbacks with more mocks - self.hs.get_module_api().register_background_update_controller_callbacks( - on_update=self._on_update, - min_batch_size=Mock(return_value=make_awaitable(self._default_batch_size)), - default_batch_size=Mock( - return_value=make_awaitable(self._default_batch_size), - ), - ) - - def test_controller(self): - store = self.hs.get_datastore() - self.get_success( - store.db_pool.simple_insert( - "background_updates", - values={"update_name": "test_update", "progress_json": "{}"}, - ) - ) - - # Set the return value for the context manager. - enter_defer = Deferred() - self._update_ctx_manager.__aenter__ = Mock(return_value=enter_defer) - - # Start the background update. - do_update_d = ensureDeferred(self.updates.do_next_background_update(True)) - - self.pump() - - # `run_update` should have been called, but the update handler won't be - # called until the `enter_defer` (returned by `__aenter__`) is resolved. - self._on_update.assert_called_once_with( - "test_update", - "master", - False, - ) - self.assertFalse(do_update_d.called) - self.assertFalse(self.update_deferred.called) - - # Resolving the `enter_defer` should call the update handler, which then - # blocks. - enter_defer.callback(100) - self.pump() - self.update_handler.assert_called_once_with({}, self._default_batch_size) - self.assertFalse(self.update_deferred.called) - self._update_ctx_manager.__aexit__.assert_not_called() - - # Resolving the update handler deferred should cause the - # `do_next_background_update` to finish and return - self.update_deferred.callback(100) - self.pump() - self._update_ctx_manager.__aexit__.assert_called() - self.get_success(do_update_d) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 7b7f6c349e..b31c5eb5ec 100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -664,7 +664,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(False), by=0.1 + self.store.db_pool.updates.do_next_background_update(100), by=0.1 ) # Ensure that we did actually take multiple iterations to process the @@ -723,7 +723,7 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase): ): iterations += 1 self.get_success( - self.store.db_pool.updates.do_next_background_update(False), by=0.1 + self.store.db_pool.updates.do_next_background_update(100), by=0.1 ) # Ensure that we did actually take multiple iterations to process the diff --git a/tests/storage/test_main.py b/tests/storage/test_main.py index f8d11bac4e..d2b7b89952 100644 --- a/tests/storage/test_main.py +++ b/tests/storage/test_main.py @@ -13,35 +13,42 @@ # limitations under the License. +from twisted.internet import defer + from synapse.types import UserID from tests import unittest +from tests.utils import setup_test_homeserver -class DataStoreTestCase(unittest.HomeserverTestCase): - def setUp(self) -> None: - super(DataStoreTestCase, self).setUp() +class DataStoreTestCase(unittest.TestCase): + @defer.inlineCallbacks + def setUp(self): + hs = yield setup_test_homeserver(self.addCleanup) - self.store = self.hs.get_datastore() + self.store = hs.get_datastore() self.user = UserID.from_string("@abcde:test") self.displayname = "Frank" - def test_get_users_paginate(self) -> None: - self.get_success(self.store.register_user(self.user.to_string(), "pass")) - self.get_success(self.store.create_profile(self.user.localpart)) - self.get_success( + @defer.inlineCallbacks + def test_get_users_paginate(self): + yield defer.ensureDeferred( + self.store.register_user(self.user.to_string(), "pass") + ) + yield defer.ensureDeferred(self.store.create_profile(self.user.localpart)) + yield defer.ensureDeferred( self.store.set_profile_displayname(self.user.localpart, self.displayname) ) - users, total = self.get_success( + users, total = yield defer.ensureDeferred( self.store.get_users_paginate(0, 10, name="bc", guests=False) ) self.assertEquals(1, total) self.assertEquals(self.displayname, users.pop()["displayname"]) - users, total = self.get_success( + users, total = yield defer.ensureDeferred( self.store.get_users_paginate(0, 10, name="BC", guests=False) ) diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py index 7f5b28aed8..37cf7bb232 100644 --- a/tests/storage/test_user_directory.py +++ b/tests/storage/test_user_directory.py @@ -23,7 +23,6 @@ from synapse.rest import admin from synapse.rest.client import login, register, room from synapse.server import HomeServer from synapse.storage import DataStore -from synapse.storage.background_updates import _BackgroundUpdateHandler from synapse.storage.roommember import ProfileInfo from synapse.util import Clock @@ -392,9 +391,7 @@ class UserDirectoryInitialPopulationTestcase(HomeserverTestCase): with mock.patch.dict( self.store.db_pool.updates._background_update_handlers, - populate_user_directory_process_users=_BackgroundUpdateHandler( - mocked_process_users, - ), + populate_user_directory_process_users=mocked_process_users, ): self._purge_and_rebuild_user_dir() |