diff options
Diffstat (limited to 'tests/storage')
-rw-r--r-- | tests/storage/test_cleanup_extrems.py | 254 | ||||
-rw-r--r-- | tests/storage/test_devices.py | 69 | ||||
-rw-r--r-- | tests/storage/test_keys.py | 70 |
3 files changed, 374 insertions, 19 deletions
diff --git a/tests/storage/test_cleanup_extrems.py b/tests/storage/test_cleanup_extrems.py new file mode 100644 index 0000000000..6aa8b8b3c6 --- /dev/null +++ b/tests/storage/test_cleanup_extrems.py @@ -0,0 +1,254 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import os.path + +from synapse.api.constants import EventTypes +from synapse.storage import prepare_database +from synapse.types import Requester, UserID + +from tests.unittest import HomeserverTestCase + + +class CleanupExtremBackgroundUpdateStoreTestCase(HomeserverTestCase): + """Test the background update to clean forward extremities table. + """ + def make_homeserver(self, reactor, clock): + # Hack until we understand why test_forked_graph_cleanup fails with v4 + config = self.default_config() + config['default_room_version'] = '1' + return self.setup_test_homeserver(config=config) + + def prepare(self, reactor, clock, homeserver): + self.store = homeserver.get_datastore() + self.event_creator = homeserver.get_event_creation_handler() + self.room_creator = homeserver.get_room_creation_handler() + + # Create a test user and room + self.user = UserID("alice", "test") + self.requester = Requester(self.user, None, False, None, None) + info = self.get_success(self.room_creator.create_room(self.requester, {})) + self.room_id = info["room_id"] + + def create_and_send_event(self, soft_failed=False, prev_event_ids=None): + """Create and send an event. + + Args: + soft_failed (bool): Whether to create a soft failed event or not + prev_event_ids (list[str]|None): Explicitly set the prev events, + or if None just use the default + + Returns: + str: The new event's ID. + """ + prev_events_and_hashes = None + if prev_event_ids: + prev_events_and_hashes = [[p, {}, 0] for p in prev_event_ids] + + event, context = self.get_success( + self.event_creator.create_event( + self.requester, + { + "type": EventTypes.Message, + "room_id": self.room_id, + "sender": self.user.to_string(), + "content": {"body": "", "msgtype": "m.text"}, + }, + prev_events_and_hashes=prev_events_and_hashes, + ) + ) + + if soft_failed: + event.internal_metadata.soft_failed = True + + self.get_success( + self.event_creator.send_nonmember_event(self.requester, event, context) + ) + + return event.event_id + + def add_extremity(self, event_id): + """Add the given event as an extremity to the room. + """ + self.get_success( + self.store._simple_insert( + table="event_forward_extremities", + values={"room_id": self.room_id, "event_id": event_id}, + desc="test_add_extremity", + ) + ) + + self.store.get_latest_event_ids_in_room.invalidate((self.room_id,)) + + def run_background_update(self): + """Re run the background update to clean up the extremities. + """ + # Make sure we don't clash with in progress updates. + self.assertTrue(self.store._all_done, "Background updates are still ongoing") + + schema_path = os.path.join( + prepare_database.dir_path, + "schema", + "delta", + "54", + "delete_forward_extremities.sql", + ) + + def run_delta_file(txn): + prepare_database.executescript(txn, schema_path) + + self.get_success( + self.store.runInteraction("test_delete_forward_extremities", run_delta_file) + ) + + # Ugh, have to reset this flag + self.store._all_done = False + + while not self.get_success(self.store.has_completed_background_updates()): + self.get_success(self.store.do_next_background_update(100), by=0.1) + + def test_soft_failed_extremities_handled_correctly(self): + """Test that extremities are correctly calculated in the presence of + soft failed events. + + Tests a graph like: + + A <- SF1 <- SF2 <- B + + Where SF* are soft failed. + """ + + # Create the room graph + event_id_1 = self.create_and_send_event() + event_id_2 = self.create_and_send_event(True, [event_id_1]) + event_id_3 = self.create_and_send_event(True, [event_id_2]) + event_id_4 = self.create_and_send_event(False, [event_id_3]) + + # Check the latest events are as expected + latest_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(self.room_id) + ) + + self.assertEqual(latest_event_ids, [event_id_4]) + + def test_basic_cleanup(self): + """Test that extremities are correctly calculated in the presence of + soft failed events. + + Tests a graph like: + + A <- SF1 <- B + + Where SF* are soft failed, and with extremities of A and B + """ + # Create the room graph + event_id_a = self.create_and_send_event() + event_id_sf1 = self.create_and_send_event(True, [event_id_a]) + event_id_b = self.create_and_send_event(False, [event_id_sf1]) + + # Add the new extremity and check the latest events are as expected + self.add_extremity(event_id_a) + + latest_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(self.room_id) + ) + self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b))) + + # Run the background update and check it did the right thing + self.run_background_update() + + latest_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(self.room_id) + ) + self.assertEqual(latest_event_ids, [event_id_b]) + + def test_chain_of_fail_cleanup(self): + """Test that extremities are correctly calculated in the presence of + soft failed events. + + Tests a graph like: + + A <- SF1 <- SF2 <- B + + Where SF* are soft failed, and with extremities of A and B + """ + # Create the room graph + event_id_a = self.create_and_send_event() + event_id_sf1 = self.create_and_send_event(True, [event_id_a]) + event_id_sf2 = self.create_and_send_event(True, [event_id_sf1]) + event_id_b = self.create_and_send_event(False, [event_id_sf2]) + + # Add the new extremity and check the latest events are as expected + self.add_extremity(event_id_a) + + latest_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(self.room_id) + ) + self.assertEqual(set(latest_event_ids), set((event_id_a, event_id_b))) + + # Run the background update and check it did the right thing + self.run_background_update() + + latest_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(self.room_id) + ) + self.assertEqual(latest_event_ids, [event_id_b]) + + def test_forked_graph_cleanup(self): + r"""Test that extremities are correctly calculated in the presence of + soft failed events. + + Tests a graph like, where time flows down the page: + + A B + / \ / + / \ / + SF1 SF2 + | | + SF3 | + / \ | + | \ | + C SF4 + + Where SF* are soft failed, and with them A, B and C marked as + extremities. This should resolve to B and C being marked as extremity. + """ + + # Create the room graph + event_id_a = self.create_and_send_event() + event_id_b = self.create_and_send_event() + event_id_sf1 = self.create_and_send_event(True, [event_id_a]) + event_id_sf2 = self.create_and_send_event(True, [event_id_a, event_id_b]) + event_id_sf3 = self.create_and_send_event(True, [event_id_sf1]) + self.create_and_send_event(True, [event_id_sf2, event_id_sf3]) # SF4 + event_id_c = self.create_and_send_event(False, [event_id_sf3]) + + # Add the new extremity and check the latest events are as expected + self.add_extremity(event_id_a) + + latest_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(self.room_id) + ) + self.assertEqual( + set(latest_event_ids), set((event_id_a, event_id_b, event_id_c)) + ) + + # Run the background update and check it did the right thing + self.run_background_update() + + latest_event_ids = self.get_success( + self.store.get_latest_event_ids_in_room(self.room_id) + ) + self.assertEqual(set(latest_event_ids), set([event_id_b, event_id_c])) diff --git a/tests/storage/test_devices.py b/tests/storage/test_devices.py index aef4dfaf57..6396ccddb5 100644 --- a/tests/storage/test_devices.py +++ b/tests/storage/test_devices.py @@ -72,6 +72,75 @@ class DeviceStoreTestCase(tests.unittest.TestCase): ) @defer.inlineCallbacks + def test_get_devices_by_remote(self): + device_ids = ["device_id1", "device_id2"] + + # Add two device updates with a single stream_id + yield self.store.add_device_change_to_streams( + "user_id", device_ids, ["somehost"], + ) + + # Get all device updates ever meant for this remote + now_stream_id, device_updates = yield self.store.get_devices_by_remote( + "somehost", -1, limit=100, + ) + + # Check original device_ids are contained within these updates + self._check_devices_in_updates(device_ids, device_updates) + + @defer.inlineCallbacks + def test_get_devices_by_remote_limited(self): + # Test breaking the update limit in 1, 101, and 1 device_id segments + + # first add one device + device_ids1 = ["device_id0"] + yield self.store.add_device_change_to_streams( + "user_id", device_ids1, ["someotherhost"], + ) + + # then add 101 + device_ids2 = ["device_id" + str(i + 1) for i in range(101)] + yield self.store.add_device_change_to_streams( + "user_id", device_ids2, ["someotherhost"], + ) + + # then one more + device_ids3 = ["newdevice"] + yield self.store.add_device_change_to_streams( + "user_id", device_ids3, ["someotherhost"], + ) + + # + # now read them back. + # + + # first we should get a single update + now_stream_id, device_updates = yield self.store.get_devices_by_remote( + "someotherhost", -1, limit=100, + ) + self._check_devices_in_updates(device_ids1, device_updates) + + # Then we should get an empty list back as the 101 devices broke the limit + now_stream_id, device_updates = yield self.store.get_devices_by_remote( + "someotherhost", now_stream_id, limit=100, + ) + self.assertEqual(len(device_updates), 0) + + # The 101 devices should've been cleared, so we should now just get one device + # update + now_stream_id, device_updates = yield self.store.get_devices_by_remote( + "someotherhost", now_stream_id, limit=100, + ) + self._check_devices_in_updates(device_ids3, device_updates) + + def _check_devices_in_updates(self, expected_device_ids, device_updates): + """Check that an specific device ids exist in a list of device update EDUs""" + self.assertEqual(len(device_updates), len(expected_device_ids)) + + received_device_ids = {update["device_id"] for update in device_updates} + self.assertEqual(received_device_ids, set(expected_device_ids)) + + @defer.inlineCallbacks def test_update_device(self): yield self.store.store_device("user_id", "device_id", "display_name 1") diff --git a/tests/storage/test_keys.py b/tests/storage/test_keys.py index 6bfaa00fe9..e07ff01201 100644 --- a/tests/storage/test_keys.py +++ b/tests/storage/test_keys.py @@ -17,6 +17,8 @@ import signedjson.key from twisted.internet.defer import Deferred +from synapse.storage.keys import FetchKeyResult + import tests.unittest KEY_1 = signedjson.key.decode_verify_key_base64( @@ -31,23 +33,34 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase): def test_get_server_verify_keys(self): store = self.hs.get_datastore() - d = store.store_server_verify_key("server1", "from_server", 0, KEY_1) - self.get_success(d) - d = store.store_server_verify_key("server1", "from_server", 0, KEY_2) + key_id_1 = "ed25519:key1" + key_id_2 = "ed25519:KEY_ID_2" + d = store.store_server_verify_keys( + "from_server", + 10, + [ + ("server1", key_id_1, FetchKeyResult(KEY_1, 100)), + ("server1", key_id_2, FetchKeyResult(KEY_2, 200)), + ], + ) self.get_success(d) d = store.get_server_verify_keys( - [ - ("server1", "ed25519:key1"), - ("server1", "ed25519:key2"), - ("server1", "ed25519:key3"), - ] + [("server1", key_id_1), ("server1", key_id_2), ("server1", "ed25519:key3")] ) res = self.get_success(d) self.assertEqual(len(res.keys()), 3) - self.assertEqual(res[("server1", "ed25519:key1")].version, "key1") - self.assertEqual(res[("server1", "ed25519:key2")].version, "key2") + res1 = res[("server1", key_id_1)] + self.assertEqual(res1.verify_key, KEY_1) + self.assertEqual(res1.verify_key.version, "key1") + self.assertEqual(res1.valid_until_ts, 100) + + res2 = res[("server1", key_id_2)] + self.assertEqual(res2.verify_key, KEY_2) + # version comes from the ID it was stored with + self.assertEqual(res2.verify_key.version, "KEY_ID_2") + self.assertEqual(res2.valid_until_ts, 200) # non-existent result gives None self.assertIsNone(res[("server1", "ed25519:key3")]) @@ -60,32 +73,51 @@ class KeyStoreTestCase(tests.unittest.HomeserverTestCase): key_id_1 = "ed25519:key1" key_id_2 = "ed25519:key2" - d = store.store_server_verify_key("srv1", "from_server", 0, KEY_1) - self.get_success(d) - d = store.store_server_verify_key("srv1", "from_server", 0, KEY_2) + d = store.store_server_verify_keys( + "from_server", + 0, + [ + ("srv1", key_id_1, FetchKeyResult(KEY_1, 100)), + ("srv1", key_id_2, FetchKeyResult(KEY_2, 200)), + ], + ) self.get_success(d) d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)]) res = self.get_success(d) self.assertEqual(len(res.keys()), 2) - self.assertEqual(res[("srv1", key_id_1)], KEY_1) - self.assertEqual(res[("srv1", key_id_2)], KEY_2) + + res1 = res[("srv1", key_id_1)] + self.assertEqual(res1.verify_key, KEY_1) + self.assertEqual(res1.valid_until_ts, 100) + + res2 = res[("srv1", key_id_2)] + self.assertEqual(res2.verify_key, KEY_2) + self.assertEqual(res2.valid_until_ts, 200) # we should be able to look up the same thing again without a db hit res = store.get_server_verify_keys([("srv1", key_id_1)]) if isinstance(res, Deferred): res = self.successResultOf(res) self.assertEqual(len(res.keys()), 1) - self.assertEqual(res[("srv1", key_id_1)], KEY_1) + self.assertEqual(res[("srv1", key_id_1)].verify_key, KEY_1) new_key_2 = signedjson.key.get_verify_key( signedjson.key.generate_signing_key("key2") ) - d = store.store_server_verify_key("srv1", "from_server", 10, new_key_2) + d = store.store_server_verify_keys( + "from_server", 10, [("srv1", key_id_2, FetchKeyResult(new_key_2, 300))] + ) self.get_success(d) d = store.get_server_verify_keys([("srv1", key_id_1), ("srv1", key_id_2)]) res = self.get_success(d) self.assertEqual(len(res.keys()), 2) - self.assertEqual(res[("srv1", key_id_1)], KEY_1) - self.assertEqual(res[("srv1", key_id_2)], new_key_2) + + res1 = res[("srv1", key_id_1)] + self.assertEqual(res1.verify_key, KEY_1) + self.assertEqual(res1.valid_until_ts, 100) + + res2 = res[("srv1", key_id_2)] + self.assertEqual(res2.verify_key, new_key_2) + self.assertEqual(res2.valid_until_ts, 300) |