summary refs log tree commit diff
path: root/tests/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'tests/storage/databases')
-rw-r--r--tests/storage/databases/main/test_events_worker.py154
-rw-r--r--tests/storage/databases/main/test_receipts.py209
2 files changed, 308 insertions, 55 deletions
diff --git a/tests/storage/databases/main/test_events_worker.py b/tests/storage/databases/main/test_events_worker.py

index 46d829b062..5773172ab8 100644 --- a/tests/storage/databases/main/test_events_worker.py +++ b/tests/storage/databases/main/test_events_worker.py
@@ -35,66 +35,45 @@ from synapse.util import Clock from synapse.util.async_helpers import yieldable_gather_results from tests import unittest +from tests.test_utils.event_injection import create_event, inject_event class HaveSeenEventsTestCase(unittest.HomeserverTestCase): + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + def prepare(self, reactor, clock, hs): + self.hs = hs self.store: EventsWorkerStore = hs.get_datastores().main - # insert some test data - for rid in ("room1", "room2"): - self.get_success( - self.store.db_pool.simple_insert( - "rooms", - {"room_id": rid, "room_version": 4}, - ) - ) + self.user = self.register_user("user", "pass") + self.token = self.login(self.user, "pass") + self.room_id = self.helper.create_room_as(self.user, tok=self.token) self.event_ids: List[str] = [] - for idx, rid in enumerate( - ( - "room1", - "room1", - "room1", - "room2", - ) - ): - event_json = {"type": f"test {idx}", "room_id": rid} - event = make_event_from_dict(event_json, room_version=RoomVersions.V4) - event_id = event.event_id - - self.get_success( - self.store.db_pool.simple_insert( - "events", - { - "event_id": event_id, - "room_id": rid, - "topological_ordering": idx, - "stream_ordering": idx, - "type": event.type, - "processed": True, - "outlier": False, - }, - ) - ) - self.get_success( - self.store.db_pool.simple_insert( - "event_json", - { - "event_id": event_id, - "room_id": rid, - "json": json.dumps(event_json), - "internal_metadata": "{}", - "format_version": 3, - }, + for i in range(3): + event = self.get_success( + inject_event( + hs, + room_version=RoomVersions.V7.identifier, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": f"foobarbaz{i}"}, ) ) - self.event_ids.append(event_id) + + self.event_ids.append(event.event_id) def test_simple(self): with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) @@ -104,22 +83,87 @@ class HaveSeenEventsTestCase(unittest.HomeserverTestCase): # a second lookup of the same events should cause no queries with LoggingContext(name="test") as ctx: res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0], "event19"]) + self.store.have_seen_events( + self.room_id, [self.event_ids[0], "eventdoesnotexist"] + ) ) self.assertEqual(res, {self.event_ids[0]}) self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) - def test_query_via_event_cache(self): - # fetch an event into the event cache - self.get_success(self.store.get_event(self.event_ids[0])) + def test_persisting_event_invalidates_cache(self): + """ + Test to make sure that the `have_seen_event` cache + is invalidated after we persist an event and returns + the updated value. + """ + event, event_context = self.get_success( + create_event( + self.hs, + room_id=self.room_id, + sender=self.user, + type="test_event_type", + content={"body": "garply"}, + ) + ) - # looking it up should now cause no db hits with LoggingContext(name="test") as ctx: + # First, check `have_seen_event` for an event we have not seen yet + # to prime the cache with a `false` value. res = self.get_success( - self.store.have_seen_events("room1", [self.event_ids[0]]) + self.store.have_seen_events(event.room_id, [event.event_id]) ) - self.assertEqual(res, {self.event_ids[0]}) - self.assertEqual(ctx.get_resource_usage().db_txn_count, 0) + self.assertEqual(res, set()) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + + # Persist the event which should invalidate or prefill the + # `have_seen_event` cache so we don't return stale values. + persistence = self.hs.get_storage_controllers().persistence + self.get_success( + persistence.persist_event( + event, + event_context, + ) + ) + + with LoggingContext(name="test") as ctx: + # Check `have_seen_event` again and we should see the updated fact + # that we have now seen the event after persisting it. + res = self.get_success( + self.store.have_seen_events(event.room_id, [event.event_id]) + ) + self.assertEqual(res, {event.event_id}) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + + def test_invalidate_cache_by_room_id(self): + """ + Test to make sure that all events associated with the given `(room_id,)` + are invalidated in the `have_seen_event` cache. + """ + with LoggingContext(name="test") as ctx: + # Prime the cache with some values + res = self.get_success( + self.store.have_seen_events(self.room_id, self.event_ids) + ) + self.assertEqual(res, set(self.event_ids)) + + # That should result in a single db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) + + # Clear the cache with any events associated with the `room_id` + self.store.have_seen_event.invalidate((self.room_id,)) + + with LoggingContext(name="test") as ctx: + res = self.get_success( + self.store.have_seen_events(self.room_id, self.event_ids) + ) + self.assertEqual(res, set(self.event_ids)) + + # Since we cleared the cache, it should result in another db query to lookup + self.assertEqual(ctx.get_resource_usage().db_txn_count, 1) class EventCacheTestCase(unittest.HomeserverTestCase): @@ -254,7 +298,7 @@ class DatabaseOutageTestCase(unittest.HomeserverTestCase): "room_id": self.room_id, "json": json.dumps(event_json), "internal_metadata": "{}", - "format_version": EventFormatVersions.V3, + "format_version": EventFormatVersions.ROOM_V4_PLUS, }, ) ) diff --git a/tests/storage/databases/main/test_receipts.py b/tests/storage/databases/main/test_receipts.py new file mode 100644
index 0000000000..c4f12d81d7 --- /dev/null +++ b/tests/storage/databases/main/test_receipts.py
@@ -0,0 +1,209 @@ +# Copyright 2022 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the 'License'); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an 'AS IS' BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from typing import Any, Dict, Optional, Sequence, Tuple + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.rest import admin +from synapse.rest.client import login, room +from synapse.server import HomeServer +from synapse.storage.database import LoggingTransaction +from synapse.util import Clock + +from tests.unittest import HomeserverTestCase + + +class ReceiptsBackgroundUpdateStoreTestCase(HomeserverTestCase): + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer): + self.store = hs.get_datastores().main + self.user_id = self.register_user("foo", "pass") + self.token = self.login("foo", "pass") + self.room_id = self.helper.create_room_as(self.user_id, tok=self.token) + self.other_room_id = self.helper.create_room_as(self.user_id, tok=self.token) + + def _test_background_receipts_unique_index( + self, + update_name: str, + index_name: str, + table: str, + receipts: Dict[Tuple[str, str, str], Sequence[Dict[str, Any]]], + expected_unique_receipts: Dict[Tuple[str, str, str], Optional[Dict[str, Any]]], + ): + """Test that the background update to uniqueify non-thread receipts in + the given receipts table works properly. + + Args: + update_name: The name of the background update to test. + index_name: The name of the index that the background update creates. + table: The table of receipts that the background update fixes. + receipts: The test data containing duplicate receipts. + A list of receipt rows to insert, grouped by + `(room_id, receipt_type, user_id)`. + expected_unique_receipts: A dictionary of `(room_id, receipt_type, user_id)` + keys and expected receipt key-values after duplicate receipts have been + removed. + """ + # First, undo the background update. + def drop_receipts_unique_index(txn: LoggingTransaction) -> None: + txn.execute(f"DROP INDEX IF EXISTS {index_name}") + + self.get_success( + self.store.db_pool.runInteraction( + "drop_receipts_unique_index", + drop_receipts_unique_index, + ) + ) + + # Populate the receipts table, including duplicates. + for (room_id, receipt_type, user_id), rows in receipts.items(): + for row in rows: + self.get_success( + self.store.db_pool.simple_insert( + table, + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + "thread_id": None, + "data": "{}", + **row, + }, + ) + ) + + # Insert and run the background update. + self.get_success( + self.store.db_pool.simple_insert( + "background_updates", + { + "update_name": update_name, + "progress_json": "{}", + }, + ) + ) + + self.store.db_pool.updates._all_done = False + + self.wait_for_background_updates() + + # Check that the remaining receipts match expectations. + for ( + room_id, + receipt_type, + user_id, + ), expected_row in expected_unique_receipts.items(): + # Include the receipt key in the returned columns, for more informative + # assertion messages. + columns = ["room_id", "receipt_type", "user_id"] + if expected_row is not None: + columns += expected_row.keys() + + rows = self.get_success( + self.store.db_pool.simple_select_list( + table=table, + keyvalues={ + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + # `simple_select_onecol` does not support NULL filters, + # so skip the filter on `thread_id`. + }, + retcols=columns, + desc="get_receipt", + ) + ) + + if expected_row is not None: + self.assertEqual( + len(rows), + 1, + f"Background update did not leave behind latest receipt in {table}", + ) + self.assertEqual( + rows[0], + { + "room_id": room_id, + "receipt_type": receipt_type, + "user_id": user_id, + **expected_row, + }, + ) + else: + self.assertEqual( + len(rows), + 0, + f"Background update did not remove all duplicate receipts from {table}", + ) + + def test_background_receipts_linearized_unique_index(self): + """Test that the background update to uniqueify non-thread receipts in + `receipts_linearized` works properly. + """ + self._test_background_receipts_unique_index( + "receipts_linearized_unique_index", + "receipts_linearized_unique_index", + "receipts_linearized", + receipts={ + (self.room_id, "m.read", self.user_id): [ + {"stream_id": 5, "event_id": "$some_event"}, + {"stream_id": 6, "event_id": "$some_event"}, + ], + (self.other_room_id, "m.read", self.user_id): [ + {"stream_id": 7, "event_id": "$some_event"} + ], + }, + expected_unique_receipts={ + (self.room_id, "m.read", self.user_id): {"stream_id": 6}, + (self.other_room_id, "m.read", self.user_id): {"stream_id": 7}, + }, + ) + + def test_background_receipts_graph_unique_index(self): + """Test that the background update to uniqueify non-thread receipts in + `receipts_graph` works properly. + """ + self._test_background_receipts_unique_index( + "receipts_graph_unique_index", + "receipts_graph_unique_index", + "receipts_graph", + receipts={ + (self.room_id, "m.read", self.user_id): [ + { + "event_ids": '["$some_event"]', + }, + { + "event_ids": '["$some_event"]', + }, + ], + (self.other_room_id, "m.read", self.user_id): [ + { + "event_ids": '["$some_event"]', + } + ], + }, + expected_unique_receipts={ + (self.room_id, "m.read", self.user_id): None, + (self.other_room_id, "m.read", self.user_id): { + "event_ids": '["$some_event"]' + }, + }, + )