From ca909013c8c8ed99c92f055b2da56c68b49c4df0 Mon Sep 17 00:00:00 2001 From: Eric Eastwood Date: Thu, 8 Aug 2024 17:49:15 -0500 Subject: Fill in `stream_ordering`/`bump_stamp` for any event being persisted --- synapse/storage/databases/main/events.py | 67 +++++- .../main/delta/87/01_sliding_sync_memberships.sql | 13 +- tests/storage/test_events.py | 238 +++++++++++++++++---- 3 files changed, 266 insertions(+), 52 deletions(-) diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py index 1dd114c601..2b91f1f67c 100644 --- a/synapse/storage/databases/main/events.py +++ b/synapse/storage/databases/main/events.py @@ -1530,12 +1530,13 @@ class PersistEventsStore: room_id, # Even though `Mapping`/`Dict` have no guaranteed order, some # implementations may preserve insertion order so we're just going to - # choose the best possible answer by using the "last" event ID which we + # choose the best possible answer by using the "first" event ID which we # will assume will have the greatest `stream_ordering`. We really just # need *some* answer in case we are the first ones inserting into the - # table and this will resolve itself when we update this field in the - # persist events loop. - list(to_insert.values())[-1], + # table and in reality, `_store_event_txn()` is run before this function + # so it will already have the correct value. This is just to account for + # things changing in the future. + next(iter(to_insert.values())), ] # If we have a `bump_event_id`, let's update the `bump_stamp` column bump_stamp_column = "" @@ -1977,6 +1978,64 @@ class PersistEventsStore: ], ) + # Handle updating `sliding_sync_joined_rooms` + room_id_to_stream_ordering_map: Dict[str, int] = {} + room_id_to_bump_stamp_map: Dict[str, int] = {} + for event, _ in events_and_contexts: + existing_stream_ordering = room_id_to_stream_ordering_map.get(event.room_id) + # This should exist at this point because we're inserting events here which require it + assert event.internal_metadata.stream_ordering is not None + if ( + existing_stream_ordering is None + or existing_stream_ordering < event.internal_metadata.stream_ordering + ): + room_id_to_stream_ordering_map[event.room_id] = ( + event.internal_metadata.stream_ordering + ) + + if event.type in SLIDING_SYNC_DEFAULT_BUMP_EVENT_TYPES: + existing_bump_stamp = room_id_to_bump_stamp_map.get(event.room_id) + # This should exist at this point because we're inserting events here which require it + assert event.internal_metadata.stream_ordering is not None + if ( + existing_bump_stamp is None + or existing_bump_stamp < event.internal_metadata.stream_ordering + ): + room_id_to_bump_stamp_map[event.room_id] = ( + event.internal_metadata.stream_ordering + ) + + # `_store_event_txn` is run before `_update_current_state_txn` which handles + # deleting the rows if we are no longer in the room so we don't need to worry + # about inserting something that will be orphaned. + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_joined_rooms", + key_names=("room_id",), + key_values=[ + (room_id,) for room_id in room_id_to_stream_ordering_map.keys() + ], + value_names=("event_stream_ordering",), + value_values=[ + (room_id_to_stream_ordering_map[room_id],) + for room_id in room_id_to_stream_ordering_map.keys() + ], + ) + # This has to be separate from the upsert above because we won't have a + # `bump_stamp` for every event and we don't want to overwrite the existing value + # with `None`. + self.db_pool.simple_upsert_many_txn( + txn, + table="sliding_sync_joined_rooms", + key_names=("room_id",), + key_values=[(room_id,) for room_id in room_id_to_bump_stamp_map.keys()], + value_names=("bump_stamp",), + value_values=[ + (room_id_to_bump_stamp_map[room_id],) + for room_id in room_id_to_bump_stamp_map.keys() + ], + ) + def _store_rejected_events_txn( self, txn: LoggingTransaction, diff --git a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql index 61ea65aba2..a60fabb586 100644 --- a/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql +++ b/synapse/storage/schema/main/delta/87/01_sliding_sync_memberships.sql @@ -31,7 +31,16 @@ CREATE TABLE IF NOT EXISTS sliding_sync_joined_rooms( PRIMARY KEY (room_id) ); - +-- Store the user's non-join room memberships. Only stores the latest membership event +-- for a given user which matches `local_current_membership` (except we don't store +-- joins). +-- +-- FIXME: It might be easier to just store any membership here but just indicate that +-- the state is a snapshot of the current state at the time of the membership event. +-- That way we don't have to worry about clearing out +-- `sliding_sync_non_join_memberships` when the the user joins a room. And we always +-- have the full picture. Perhaps we can call it `sliding_sync_membership_snapshots`. +-- -- We don't include `bump_stamp` here because we can just use the `stream_ordering` from -- the membership event itself as the `bump_stamp`. CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships( @@ -50,6 +59,8 @@ CREATE TABLE IF NOT EXISTS sliding_sync_non_join_memberships( -- `m.room.encryption` -> `content.algorithm` (according to the current state at the -- time of the membership) is_encrypted BOOLEAN DEFAULT 0 NOT NULL, + -- FIXME: Maybe we want to add `tombstone_successor_room_id` here to help with `include_old_rooms` + -- (tracked by https://github.com/element-hq/synapse/issues/17540) PRIMARY KEY (room_id, user_id) ); diff --git a/tests/storage/test_events.py b/tests/storage/test_events.py index 9543123473..b8302a7928 100644 --- a/tests/storage/test_events.py +++ b/tests/storage/test_events.py @@ -617,8 +617,9 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) - # User1 joins the room - self.helper.join(room_id1, user1_id, tok=user1_tok) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() self.assertIncludes( @@ -630,15 +631,20 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): sliding_sync_joined_rooms_results[room_id1], _SlidingSyncJoinedRoomResult( room_id=room_id1, - # TODO - event_stream_ordering=None, - bump_stamp=None, + # History visibility just happens to be the last event sent in the room + event_stream_ordering=state_map[ + (EventTypes.RoomHistoryVisibility, "") + ].internal_metadata.stream_ordering, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, room_type=None, room_name=None, is_encrypted=False, ), ) + # No one is non-joined to this room so we shouldn't see anything sliding_sync_non_join_memberships_results = ( self._get_sliding_sync_non_join_memberships() ) @@ -674,7 +680,14 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): ) # User1 joins the room - self.helper.join(room_id1, user1_id, tok=user1_tok) + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + user1_join_event_pos = self.get_success( + self.store.get_position_for_event(user1_join_response["event_id"]) + ) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() self.assertIncludes( @@ -686,9 +699,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): sliding_sync_joined_rooms_results[room_id1], _SlidingSyncJoinedRoomResult( room_id=room_id1, - # TODO - event_stream_ordering=None, - bump_stamp=None, + event_stream_ordering=user1_join_event_pos.stream, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, room_type=None, room_name="my super duper room", is_encrypted=True, @@ -704,10 +718,73 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): exact=True, ) - def test_joined_room_with_info_updated(self) -> None: + def test_joined_space_room_with_info(self) -> None: """ - Test info in `sliding_sync_joined_rooms` is updated when the current state is - updated. + Test joined space room with name shows up in `sliding_sync_joined_rooms`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + space_room_id = self.helper.create_room_as( + user2_id, + tok=user2_tok, + extra_content={ + "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE} + }, + ) + # Add a room name + self.helper.send_state( + space_room_id, + EventTypes.Name, + {"name": "my super duper space"}, + tok=user2_tok, + ) + + # User1 joins the room + user1_join_response = self.helper.join(space_room_id, user1_id, tok=user1_tok) + user1_join_event_pos = self.get_success( + self.store.get_position_for_event(user1_join_response["event_id"]) + ) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(space_room_id) + ) + + sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() + self.assertIncludes( + set(sliding_sync_joined_rooms_results.keys()), + {space_room_id}, + exact=True, + ) + self.assertEqual( + sliding_sync_joined_rooms_results[space_room_id], + _SlidingSyncJoinedRoomResult( + room_id=space_room_id, + event_stream_ordering=user1_join_event_pos.stream, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, + room_type=RoomTypes.SPACE, + room_name="my super duper space", + is_encrypted=False, + ), + ) + + sliding_sync_non_join_memberships_results = ( + self._get_sliding_sync_non_join_memberships() + ) + self.assertIncludes( + set(sliding_sync_non_join_memberships_results.keys()), + set(), + exact=True, + ) + + def test_joined_room_with_state_updated(self) -> None: + """ + Test state derived info in `sliding_sync_joined_rooms` is updated when the + current state is updated. """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") @@ -724,7 +801,14 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): ) # User1 joins the room - self.helper.join(room_id1, user1_id, tok=user1_tok) + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + user1_join_event_pos = self.get_success( + self.store.get_position_for_event(user1_join_response["event_id"]) + ) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() self.assertIncludes( @@ -736,9 +820,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): sliding_sync_joined_rooms_results[room_id1], _SlidingSyncJoinedRoomResult( room_id=room_id1, - # TODO - event_stream_ordering=None, - bump_stamp=None, + event_stream_ordering=user1_join_event_pos.stream, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, room_type=None, room_name="my super duper room", is_encrypted=False, @@ -762,12 +847,15 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): tok=user2_tok, ) # Encrypt the room - self.helper.send_state( + encrypt_room_response = self.helper.send_state( room_id1, EventTypes.RoomEncryption, {EventContentFields.ENCRYPTION_ALGORITHM: "m.megolm.v1.aes-sha2"}, tok=user2_tok, ) + encrypt_room_event_pos = self.get_success( + self.store.get_position_for_event(encrypt_room_response["event_id"]) + ) sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() self.assertIncludes( @@ -780,9 +868,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): sliding_sync_joined_rooms_results[room_id1], _SlidingSyncJoinedRoomResult( room_id=room_id1, - # TODO - event_stream_ordering=None, - bump_stamp=None, + event_stream_ordering=encrypt_room_event_pos.stream, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, room_type=None, room_name="my super duper room was renamed", is_encrypted=True, @@ -798,48 +887,88 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): exact=True, ) - def test_joined_space_room_with_info(self) -> None: + def test_joined_room_is_bumped(self) -> None: """ - Test joined space room with name shows up in `sliding_sync_joined_rooms`. + Test that `event_stream_ordering` and `bump_stamp` is updated when a new bump + event is sent (`sliding_sync_joined_rooms`). """ user1_id = self.register_user("user1", "pass") user1_tok = self.login(user1_id, "pass") user2_id = self.register_user("user2", "pass") user2_tok = self.login(user2_id, "pass") - space_room_id = self.helper.create_room_as( - user2_id, - tok=user2_tok, - extra_content={ - "creation_content": {EventContentFields.ROOM_TYPE: RoomTypes.SPACE} - }, - ) + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok) # Add a room name self.helper.send_state( - space_room_id, + room_id1, EventTypes.Name, - {"name": "my super duper space"}, + {"name": "my super duper room"}, tok=user2_tok, ) # User1 joins the room - self.helper.join(space_room_id, user1_id, tok=user1_tok) + user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok) + user1_join_event_pos = self.get_success( + self.store.get_position_for_event(user1_join_response["event_id"]) + ) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() self.assertIncludes( set(sliding_sync_joined_rooms_results.keys()), - {space_room_id}, + {room_id1}, exact=True, ) self.assertEqual( - sliding_sync_joined_rooms_results[space_room_id], + sliding_sync_joined_rooms_results[room_id1], _SlidingSyncJoinedRoomResult( - room_id=space_room_id, - # TODO - event_stream_ordering=None, - bump_stamp=None, - room_type=RoomTypes.SPACE, - room_name="my super duper space", + room_id=room_id1, + event_stream_ordering=user1_join_event_pos.stream, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, + room_type=None, + room_name="my super duper room", + is_encrypted=False, + ), + ) + + sliding_sync_non_join_memberships_results = ( + self._get_sliding_sync_non_join_memberships() + ) + self.assertIncludes( + set(sliding_sync_non_join_memberships_results.keys()), + set(), + exact=True, + ) + + # Send a new message to bump the room + event_response = self.helper.send(room_id1, "some message", tok=user1_tok) + event_pos = self.get_success( + self.store.get_position_for_event(event_response["event_id"]) + ) + + sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() + self.assertIncludes( + set(sliding_sync_joined_rooms_results.keys()), + {room_id1}, + exact=True, + ) + # Make sure we see the new room name + self.assertEqual( + sliding_sync_joined_rooms_results[room_id1], + _SlidingSyncJoinedRoomResult( + room_id=room_id1, + # Updated `event_stream_ordering` + event_stream_ordering=event_pos.stream, + # And since the event was a bump event, the `bump_stamp` should be updated + bump_stamp=event_pos.stream, + # The state is still the same (it didn't change) + room_type=None, + room_name="my super duper room", is_encrypted=False, ), ) @@ -896,12 +1025,19 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): # Update the room name after we are invited just to make sure # we don't update non-join memberships when the room name changes. - self.helper.send_state( + rename_response = self.helper.send_state( space_room_id, EventTypes.Name, {"name": "my super duper space was renamed"}, tok=user2_tok, ) + rename_event_pos = self.get_success( + self.store.get_position_for_event(rename_response["event_id"]) + ) + + state_map = self.get_success( + self.storage_controllers.state.get_current_state(space_room_id) + ) # User2 is still joined to the room so we should still have an entry in the # `sliding_sync_joined_rooms` table. @@ -915,9 +1051,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): sliding_sync_joined_rooms_results[space_room_id], _SlidingSyncJoinedRoomResult( room_id=space_room_id, - # TODO - event_stream_ordering=None, - bump_stamp=None, + event_stream_ordering=rename_event_pos.stream, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, room_type=RoomTypes.SPACE, room_name="my super duper space was renamed", is_encrypted=True, @@ -981,6 +1118,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): self.store.get_position_for_event(user3_ban_response["event_id"]) ) + state_map = self.get_success( + self.storage_controllers.state.get_current_state(room_id1) + ) + # User2 is still joined to the room so we should still have an entry # in the `sliding_sync_joined_rooms` table. sliding_sync_joined_rooms_results = self._get_sliding_sync_joined_rooms() @@ -993,9 +1134,10 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): sliding_sync_joined_rooms_results[room_id1], _SlidingSyncJoinedRoomResult( room_id=room_id1, - # TODO - event_stream_ordering=None, - bump_stamp=None, + event_stream_ordering=user3_ban_event_pos.stream, + bump_stamp=state_map[ + (EventTypes.Create, "") + ].internal_metadata.stream_ordering, room_type=None, room_name=None, is_encrypted=False, @@ -1042,6 +1184,8 @@ class SlidingSyncPrePopulatedTablesTestCase(HomeserverTestCase): # TODO: Test remote invite + # TODO Test for non-join membership changing + def test_non_join_server_left_room(self) -> None: """ Test everyone local leaves the room but their leave membership still shows up in -- cgit 1.5.1