summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync/extensions.py
diff options
context:
space:
mode:
author: Erik Johnston <erikj@element.io> 2024-09-19 21:51:51 +0300
committer: GitHub <noreply@github.com> 2024-09-19 19:51:51 +0100
commit: a851f6b237ad7b4488d2d80c0b0777436d24da6a (patch)
tree: 23dbfbd70fa08566a7de2a7b1146ccf080019809 /synapse/handlers/sliding_sync/extensions.py
parent: Sliding Sync: Avoid fetching left rooms and add back `newly_left` rooms (#17725) (diff)
download: synapse-a851f6b237ad7b4488d2d80c0b0777436d24da6a.tar.xz
Sliding sync: Add connection tracking to the `account_data` extension (#17695)
This is essentially the same logic as for receipts: we track, per
connection, which rooms' account data we have and haven't sent down to
the client, and use that to decide what to fetch when building the response.

I think this just needs a couple of extra tests written.

---------

Co-authored-by: Eric Eastwood <eric.eastwood@beta.gouv.fr>
Diffstat (limited to 'synapse/handlers/sliding_sync/extensions.py')
-rw-r--r-- synapse/handlers/sliding_sync/extensions.py 183
1 file changed, 131 insertions, 52 deletions
diff --git a/synapse/handlers/sliding_sync/extensions.py b/synapse/handlers/sliding_sync/extensions.py

index 287f4b04ad..56e1d9329e 100644 --- a/synapse/handlers/sliding_sync/extensions.py +++ b/synapse/handlers/sliding_sync/extensions.py
@@ -19,7 +19,6 @@ from typing import ( AbstractSet, ChainMap, Dict, - List, Mapping, MutableMapping, Optional, @@ -119,6 +118,8 @@ class SlidingSyncExtensionHandler: if sync_config.extensions.account_data is not None: account_data_response = await self.get_account_data_extension_response( sync_config=sync_config, + previous_connection_state=previous_connection_state, + new_connection_state=new_connection_state, actual_lists=actual_lists, actual_room_ids=actual_room_ids, account_data_request=sync_config.extensions.account_data, @@ -361,6 +362,8 @@ class SlidingSyncExtensionHandler: async def get_account_data_extension_response( self, sync_config: SlidingSyncConfig, + previous_connection_state: "PerConnectionState", + new_connection_state: "MutablePerConnectionState", actual_lists: Mapping[str, SlidingSyncResult.SlidingWindowList], actual_room_ids: Set[str], account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, @@ -425,15 +428,7 @@ class SlidingSyncExtensionHandler: # Fetch room account data # - # List of -> Mapping from room_id to mapping of `type` to `content` of room - # account data events. - # - # This is is a list so we can avoid making copies of immutable data and instead - # just provide multiple maps that need to be combined. Normally, we could - # reach for `ChainMap` in this scenario, but this is a nested map and accessing - # the ChainMap by room_id won't combine the two maps for that room (we would - # need a new `NestedChainMap` type class). 
- account_data_by_room_maps: List[Mapping[str, Mapping[str, JsonMapping]]] = [] + account_data_by_room_map: MutableMapping[str, Mapping[str, JsonMapping]] = {} relevant_room_ids = self.find_relevant_room_ids_for_extension( requested_lists=account_data_request.lists, requested_room_ids=account_data_request.rooms, @@ -441,9 +436,43 @@ class SlidingSyncExtensionHandler: actual_room_ids=actual_room_ids, ) if len(relevant_room_ids) > 0: + # We need to handle the different cases depending on if we have sent + # down account data previously or not, so we split the relevant + # rooms up into different collections based on status. + live_rooms = set() + previously_rooms: Dict[str, int] = {} + initial_rooms = set() + + for room_id in relevant_room_ids: + if not from_token: + initial_rooms.add(room_id) + continue + + room_status = previous_connection_state.account_data.have_sent_room( + room_id + ) + if room_status.status == HaveSentRoomFlag.LIVE: + live_rooms.add(room_id) + elif room_status.status == HaveSentRoomFlag.PREVIOUSLY: + assert room_status.last_token is not None + previously_rooms[room_id] = room_status.last_token + elif room_status.status == HaveSentRoomFlag.NEVER: + initial_rooms.add(room_id) + else: + assert_never(room_status.status) + + # We fetch all room account data since the from_token. This is so + # that we can record which rooms have updates that haven't been sent + # down. + # + # Mapping from room_id to mapping of `type` to `content` of room account + # data events. 
+ all_updates_since_the_from_token: Mapping[ + str, Mapping[str, JsonMapping] + ] = {} if from_token is not None: # TODO: This should take into account the `from_token` and `to_token` - account_data_by_room_map = ( + all_updates_since_the_from_token = ( await self.store.get_updated_room_account_data_for_user( user_id, from_token.stream_token.account_data_key ) @@ -456,58 +485,108 @@ class SlidingSyncExtensionHandler: user_id, from_token.stream_token.account_data_key ) for room_id, tags in tags_by_room.items(): - account_data_by_room_map.setdefault(room_id, {})[ + all_updates_since_the_from_token.setdefault(room_id, {})[ AccountDataTypes.TAG ] = {"tags": tags} - account_data_by_room_maps.append(account_data_by_room_map) - else: - # TODO: This should take into account the `to_token` - immutable_account_data_by_room_map = ( - await self.store.get_room_account_data_for_user(user_id) - ) - account_data_by_room_maps.append(immutable_account_data_by_room_map) + # For live rooms we just get the updates from `all_updates_since_the_from_token` + if live_rooms: + for room_id in all_updates_since_the_from_token.keys() & live_rooms: + account_data_by_room_map[room_id] = ( + all_updates_since_the_from_token[room_id] + ) - # Add room tags - # - # TODO: This should take into account the `to_token` - tags_by_room = await self.store.get_tags_for_user(user_id) - account_data_by_room_maps.append( - { - room_id: {AccountDataTypes.TAG: {"tags": tags}} - for room_id, tags in tags_by_room.items() - } + # For previously and initial rooms we query each room individually. + if previously_rooms or initial_rooms: + + async def handle_previously(room_id: str) -> None: + # Either get updates or all account data in the room + # depending on if the room state is PREVIOUSLY or NEVER. 
+ previous_token = previously_rooms.get(room_id) + if previous_token is not None: + room_account_data = await ( + self.store.get_updated_room_account_data_for_user_for_room( + user_id=user_id, + room_id=room_id, + from_stream_id=previous_token, + to_stream_id=to_token.account_data_key, + ) + ) + + # Add room tags + changed = await self.store.has_tags_changed_for_room( + user_id=user_id, + room_id=room_id, + from_stream_id=previous_token, + to_stream_id=to_token.account_data_key, + ) + if changed: + # XXX: Ideally, this should take into account the `to_token` + # and return the set of tags at that time but we don't track + # changes to tags so we just have to return all tags for the + # room. + immutable_tag_map = await self.store.get_tags_for_room( + user_id, room_id + ) + room_account_data[AccountDataTypes.TAG] = { + "tags": immutable_tag_map + } + + # Only add an entry if there were any updates. + if room_account_data: + account_data_by_room_map[room_id] = room_account_data + else: + # TODO: This should take into account the `to_token` + immutable_room_account_data = ( + await self.store.get_account_data_for_room(user_id, room_id) + ) + + # Add room tags + # + # XXX: Ideally, this should take into account the `to_token` + # and return the set of tags at that time but we don't track + # changes to tags so we just have to return all tags for the + # room. + immutable_tag_map = await self.store.get_tags_for_room( + user_id, room_id + ) + + account_data_by_room_map[room_id] = ChainMap( + {AccountDataTypes.TAG: {"tags": immutable_tag_map}} + if immutable_tag_map + else {}, + # Cast is safe because `ChainMap` only mutates the top-most map, + # see https://github.com/python/typeshed/issues/8430 + cast( + MutableMapping[str, JsonMapping], + immutable_room_account_data, + ), + ) + + # We handle these rooms concurrently to speed it up. 
+ await concurrently_execute( + handle_previously, + previously_rooms.keys() | initial_rooms, + limit=20, ) - # Filter down to the relevant rooms ... and combine the maps - relevant_account_data_by_room_map: MutableMapping[ - str, Mapping[str, JsonMapping] - ] = {} - for room_id in relevant_room_ids: - # We want to avoid adding empty maps for relevant rooms that have no room - # account data so do a quick check to see if it's in any of the maps. - is_room_in_maps = False - for room_map in account_data_by_room_maps: - if room_id in room_map: - is_room_in_maps = True - break + # Now record which rooms are now up to data, and which rooms have + # pending updates to send. + new_connection_state.account_data.record_sent_rooms(relevant_room_ids) + missing_updates = ( + all_updates_since_the_from_token.keys() - relevant_room_ids + ) + if missing_updates: + # If we have missing updates then we must have had a from_token. + assert from_token is not None - # If we found the room in any of the maps, combine the maps for that room - if is_room_in_maps: - relevant_account_data_by_room_map[room_id] = ChainMap( - {}, - *( - # Cast is safe because `ChainMap` only mutates the top-most map, - # see https://github.com/python/typeshed/issues/8430 - cast(MutableMapping[str, JsonMapping], room_map[room_id]) - for room_map in account_data_by_room_maps - if room_map.get(room_id) - ), + new_connection_state.account_data.record_unsent_rooms( + missing_updates, from_token.stream_token.account_data_key ) return SlidingSyncResult.Extensions.AccountDataExtension( global_account_data_map=global_account_data_map, - account_data_by_room_map=relevant_account_data_by_room_map, + account_data_by_room_map=account_data_by_room_map, ) @trace