summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py155
1 files changed, 151 insertions, 4 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py

index 07545e9496..c0e5192f00 100644 --- a/synapse/handlers/sliding_sync.py +++ b/synapse/handlers/sliding_sync.py
@@ -48,6 +48,7 @@ from synapse.storage.roommember import MemberSummary from synapse.types import ( DeviceListUpdates, JsonDict, + JsonMapping, PersistedEventPosition, Requester, RoomStreamToken, @@ -360,6 +361,7 @@ class SlidingSyncHandler: self.event_sources = hs.get_event_sources() self.relations_handler = hs.get_relations_handler() self.device_handler = hs.get_device_handler() + self.push_rules_handler = hs.get_push_rules_handler() self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync self.connection_store = SlidingSyncConnectionStore() @@ -667,6 +669,7 @@ class SlidingSyncHandler: extensions = await self.get_extensions_response( sync_config=sync_config, + lists=lists, from_token=from_token, to_token=to_token, ) @@ -1776,6 +1779,9 @@ class SlidingSyncHandler: else: assert from_bound is not None + # TODO: Limit the number of state events we're about to send down + # the room, if its too many we should change this to an + # `initial=True`? deltas = await self.store.get_current_state_deltas_for_room( room_id=room_id, from_token=from_bound, @@ -1830,8 +1836,14 @@ class SlidingSyncHandler: bump_stamp = room_membership_for_user_at_to_token.event_pos.stream # But if we found a bump event, use that instead if last_bump_event_result is not None: - _, bump_event_pos = last_bump_event_result - bump_stamp = bump_event_pos.stream + _, new_bump_event_pos = last_bump_event_result + + # If we've just joined a remote room, then the last bump event may + # have been backfilled (and so have a negative stream ordering). + # These negative stream orderings can't sensibly be compared, so + # instead we use the membership event position. + if new_bump_event_pos.stream > 0: + bump_stamp = new_bump_event_pos.stream return SlidingSyncResult.RoomResult( name=room_name, @@ -1863,6 +1875,7 @@ class SlidingSyncHandler: async def get_extensions_response( self, sync_config: SlidingSyncConfig, + lists: Dict[str, SlidingSyncResult.SlidingWindowList], to_token: StreamToken, from_token: Optional[SlidingSyncStreamToken], ) -> SlidingSyncResult.Extensions: @@ -1870,6 +1883,7 @@ class SlidingSyncHandler: Args: sync_config: Sync configuration + lists: Sliding window API. A map of list key to list results. to_token: The point in the stream to sync up to. from_token: The point in the stream to sync from. """ @@ -1894,9 +1908,20 @@ class SlidingSyncHandler: from_token=from_token, ) + account_data_response = None + if sync_config.extensions.account_data is not None: + account_data_response = await self.get_account_data_extension_response( + sync_config=sync_config, + lists=lists, + account_data_request=sync_config.extensions.account_data, + to_token=to_token, + from_token=from_token, + ) + return SlidingSyncResult.Extensions( to_device=to_device_response, e2ee=e2ee_response, + account_data=account_data_response, ) async def get_to_device_extension_response( @@ -2023,6 +2048,128 @@ class SlidingSyncHandler: device_unused_fallback_key_types=device_unused_fallback_key_types, ) + async def get_account_data_extension_response( + self, + sync_config: SlidingSyncConfig, + lists: Dict[str, SlidingSyncResult.SlidingWindowList], + account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension, + to_token: StreamToken, + from_token: Optional[SlidingSyncStreamToken], + ) -> Optional[SlidingSyncResult.Extensions.AccountDataExtension]: + """Handle Account Data extension (MSC3959) + + Args: + sync_config: Sync configuration + lists: Sliding window API. A map of list key to list results. + account_data_request: The account_data extension from the request + to_token: The point in the stream to sync up to. + from_token: The point in the stream to sync from. + """ + user_id = sync_config.user.to_string() + + # Skip if the extension is not enabled + if not account_data_request.enabled: + return None + + global_account_data_map: Mapping[str, JsonMapping] = {} + if from_token is not None: + global_account_data_map = ( + await self.store.get_updated_global_account_data_for_user( + user_id, from_token.stream_token.account_data_key + ) + ) + + have_push_rules_changed = await self.store.have_push_rules_changed_for_user( + user_id, from_token.stream_token.push_rules_key + ) + if have_push_rules_changed: + global_account_data_map = dict(global_account_data_map) + global_account_data_map[AccountDataTypes.PUSH_RULES] = ( + await self.push_rules_handler.push_rules_for_user(sync_config.user) + ) + else: + all_global_account_data = await self.store.get_global_account_data_for_user( + user_id + ) + + global_account_data_map = dict(all_global_account_data) + global_account_data_map[AccountDataTypes.PUSH_RULES] = ( + await self.push_rules_handler.push_rules_for_user(sync_config.user) + ) + + # We only want to include account data for rooms that are already in the sliding + # sync response AND that were requested in the account data request. + relevant_room_ids: Set[str] = set() + + # See what rooms from the room subscriptions we should get account data for + if ( + account_data_request.rooms is not None + and sync_config.room_subscriptions is not None + ): + actual_room_ids = sync_config.room_subscriptions.keys() + + for room_id in account_data_request.rooms: + # A wildcard means we process all rooms from the room subscriptions + if room_id == "*": + relevant_room_ids.update(sync_config.room_subscriptions.keys()) + break + + if room_id in actual_room_ids: + relevant_room_ids.add(room_id) + + # See what rooms from the sliding window lists we should get account data for + if account_data_request.lists is not None: + for list_key in account_data_request.lists: + # Just some typing because we share the variable name in multiple places + actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None + + # A wildcard means we process rooms from all lists + if list_key == "*": + for actual_list in lists.values(): + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + break + + actual_list = lists.get(list_key) + if actual_list is not None: + # We only expect a single SYNC operation for any list + assert len(actual_list.ops) == 1 + sync_op = actual_list.ops[0] + assert sync_op.op == OperationType.SYNC + + relevant_room_ids.update(sync_op.room_ids) + + # Fetch room account data + account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {} + if len(relevant_room_ids) > 0: + if from_token is not None: + account_data_by_room_map = ( + await self.store.get_updated_room_account_data_for_user( + user_id, from_token.stream_token.account_data_key + ) + ) + else: + account_data_by_room_map = ( + await self.store.get_room_account_data_for_user(user_id) + ) + + # Filter down to the relevant rooms + account_data_by_room_map = { + room_id: account_data_map + for room_id, account_data_map in account_data_by_room_map.items() + if room_id in relevant_room_ids + } + + return SlidingSyncResult.Extensions.AccountDataExtension( + global_account_data_map=global_account_data_map, + account_data_by_room_map=account_data_by_room_map, + ) + class HaveSentRoomFlag(Enum): """Flag for whether we have sent the room down a sliding sync connection. @@ -2236,8 +2383,8 @@ class SlidingSyncConnectionStore: user_id = sync_config.user.to_string() - # If this is missing, only one sliding sync connection is allowed per - # given conn_id. + # Only one sliding sync connection is allowed per given conn_id (empty + # or not). conn_id = sync_config.conn_id or "" if sync_config.requester.device_id: