diff options
-rw-r--r-- | changelog.d/16929.misc | 2 | ||||
-rw-r--r-- | synapse/handlers/sync.py | 381 |
2 files changed, 238 insertions, 145 deletions
diff --git a/changelog.d/16929.misc b/changelog.d/16929.misc new file mode 100644 index 0000000000..9489784e4a --- /dev/null +++ b/changelog.d/16929.misc @@ -0,0 +1,2 @@ +Refactor state delta calculation in `/sync` handler. + diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py index 5bb8a1439d..08fe4eb3b3 100644 --- a/synapse/handlers/sync.py +++ b/synapse/handlers/sync.py @@ -1014,30 +1014,6 @@ class SyncHandler: if event.is_state(): timeline_state[(event.type, event.state_key)] = event.event_id - if full_state: - # always make sure we LL ourselves so we know we're in the room - # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209 - # We only need apply this on full state syncs given we disabled - # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840. - # We don't insert ourselves into `members_to_fetch`, because in some - # rare cases (an empty event batch with a now_token after the user's - # leave in a partial state room which another local user has - # joined), the room state will be missing our membership and there - # is no guarantee that our membership will be in the auth events of - # timeline events when the room is partial stated. - state_filter = StateFilter.from_lazy_load_member_list( - members_to_fetch.union((sync_config.user.to_string(),)) - ) - else: - state_filter = StateFilter.from_lazy_load_member_list( - members_to_fetch - ) - - # We are happy to use partial state to compute the `/sync` response. - # Since partial state may not include the lazy-loaded memberships we - # require, we fix up the state response afterwards with memberships from - # auth events. - await_full_state = False else: timeline_state = { (event.type, event.state_key): event.event_id @@ -1045,9 +1021,6 @@ class SyncHandler: if event.is_state() } - state_filter = StateFilter.all() - await_full_state = True - # Now calculate the state to return in the sync response for the room. # This is more or less the change in state between the end of the previous # sync's timeline and the start of the current sync's timeline. @@ -1057,131 +1030,28 @@ class SyncHandler: # whether the room is partial stated *before* fetching it. is_partial_state_room = await self.store.is_partial_state_room(room_id) if full_state: - if batch: - state_at_timeline_end = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) - - state_at_timeline_start = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) - - else: - state_at_timeline_end = await self.get_state_at( - room_id, - stream_position=now_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) - - state_at_timeline_start = state_at_timeline_end - - state_ids = _calculate_state( - timeline_contains=timeline_state, - timeline_start=state_at_timeline_start, - timeline_end=state_at_timeline_end, - previous_timeline_end={}, - lazy_load_members=lazy_load_members, + state_ids = await self._compute_state_delta_for_full_sync( + room_id, + sync_config.user, + batch, + now_token, + members_to_fetch, + timeline_state, ) - elif batch.limited: - if batch: - state_at_timeline_start = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) - else: - # We can get here if the user has ignored the senders of all - # the recent events. - state_at_timeline_start = await self.get_state_at( - room_id, - stream_position=now_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) - - # for now, we disable LL for gappy syncs - see - # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346 - # N.B. this slows down incr syncs as we are now processing way - # more state in the server than if we were LLing. - # - # We still have to filter timeline_start to LL entries (above) in order - # for _calculate_state's LL logic to work, as we have to include LL - # members for timeline senders in case they weren't loaded in the initial - # sync. We do this by (counterintuitively) by filtering timeline_start - # members to just be ones which were timeline senders, which then ensures - # all of the rest get included in the state block (if we need to know - # about them). - state_filter = StateFilter.all() - + else: # If this is an initial sync then full_state should be set, and # that case is handled above. We assert here to ensure that this # is indeed the case. assert since_token is not None - state_at_previous_sync = await self.get_state_at( - room_id, - stream_position=since_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) - if batch: - state_at_timeline_end = ( - await self._state_storage_controller.get_state_ids_for_event( - batch.events[-1].event_id, - state_filter=state_filter, - await_full_state=await_full_state, - ) - ) - else: - # We can get here if the user has ignored the senders of all - # the recent events. - state_at_timeline_end = await self.get_state_at( - room_id, - stream_position=now_token, - state_filter=state_filter, - await_full_state=await_full_state, - ) - - state_ids = _calculate_state( - timeline_contains=timeline_state, - timeline_start=state_at_timeline_start, - timeline_end=state_at_timeline_end, - previous_timeline_end=state_at_previous_sync, - # we have to include LL members in case LL initial sync missed them - lazy_load_members=lazy_load_members, + state_ids = await self._compute_state_delta_for_incremental_sync( + room_id, + batch, + since_token, + now_token, + members_to_fetch, + timeline_state, ) - else: - state_ids = {} - if lazy_load_members: - if members_to_fetch and batch.events: - # We're returning an incremental sync, with no - # "gap" since the previous sync, so normally there would be - # no state to return. - # But we're lazy-loading, so the client might need some more - # member events to understand the events in this timeline. - # So we fish out all the member events corresponding to the - # timeline here, and then dedupe any redundant ones below. - - state_ids = await self._state_storage_controller.get_state_ids_for_event( - batch.events[0].event_id, - # we only want members! - state_filter=StateFilter.from_types( - (EventTypes.Member, member) - for member in members_to_fetch - ), - await_full_state=False, - ) # If we only have partial state for the room, `state_ids` may be missing the # memberships we wanted. We attempt to find some by digging through the auth @@ -1245,6 +1115,227 @@ class SyncHandler: if e.type != EventTypes.Aliases # until MSC2261 or alternative solution } + async def _compute_state_delta_for_full_sync( + self, + room_id: str, + syncing_user: UserID, + batch: TimelineBatch, + now_token: StreamToken, + members_to_fetch: Optional[Set[str]], + timeline_state: StateMap[str], + ) -> StateMap[str]: + """Calculate the state events to be included in a full sync response. + + As with `_compute_state_delta_for_incremental_sync`, the result will include + the membership events for the senders of each event in `members_to_fetch`. + + Args: + room_id: The room we are calculating for. + syncing_user: The user that is calling `/sync`. + batch: The timeline batch for the room that will be sent to the user. + now_token: Token of the end of the current batch. + members_to_fetch: If lazy-loading is enabled, the memberships needed for + events in the timeline. + timeline_state: The contribution to the room state from state events in + `batch`. Only contains the last event for any given state key. + + Returns: + A map from (type, state_key) to event_id, for each event that we believe + should be included in the `state` part of the sync response. + """ + if members_to_fetch is not None: + # Lazy-loading of membership events is enabled. + # + # Always make sure we load our own membership event so we know if + # we're in the room, to fix https://github.com/vector-im/riot-web/issues/7209. + # + # We only need apply this on full state syncs given we disabled + # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840. + # + # We don't insert ourselves into `members_to_fetch`, because in some + # rare cases (an empty event batch with a now_token after the user's + # leave in a partial state room which another local user has + # joined), the room state will be missing our membership and there + # is no guarantee that our membership will be in the auth events of + # timeline events when the room is partial stated. + state_filter = StateFilter.from_lazy_load_member_list( + members_to_fetch.union((syncing_user.to_string(),)) + ) + + # We are happy to use partial state to compute the `/sync` response. + # Since partial state may not include the lazy-loaded memberships we + # require, we fix up the state response afterwards with memberships from + # auth events. + await_full_state = False + lazy_load_members = True + else: + state_filter = StateFilter.all() + await_full_state = True + lazy_load_members = False + + if batch: + state_at_timeline_end = ( + await self._state_storage_controller.get_state_ids_for_event( + batch.events[-1].event_id, + state_filter=state_filter, + await_full_state=await_full_state, + ) + ) + + state_at_timeline_start = ( + await self._state_storage_controller.get_state_ids_for_event( + batch.events[0].event_id, + state_filter=state_filter, + await_full_state=await_full_state, + ) + ) + else: + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + + state_at_timeline_start = state_at_timeline_end + + state_ids = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state_at_timeline_start, + timeline_end=state_at_timeline_end, + previous_timeline_end={}, + lazy_load_members=lazy_load_members, + ) + return state_ids + + async def _compute_state_delta_for_incremental_sync( + self, + room_id: str, + batch: TimelineBatch, + since_token: StreamToken, + now_token: StreamToken, + members_to_fetch: Optional[Set[str]], + timeline_state: StateMap[str], + ) -> StateMap[str]: + """Calculate the state events to be included in an incremental sync response. + + If lazy-loading of membership events is enabled (as indicated by + `members_to_fetch` being not-`None`), the result will include the membership + events for each member in `members_to_fetch`. The caller + (`compute_state_delta`) is responsible for keeping track of which membership + events we have already sent to the client, and hence ripping them out. + + Args: + room_id: The room we are calculating for. + batch: The timeline batch for the room that will be sent to the user. + since_token: Token of the end of the previous batch. + now_token: Token of the end of the current batch. + members_to_fetch: If lazy-loading is enabled, the memberships needed for + events in the timeline. Otherwise, `None`. + timeline_state: The contribution to the room state from state events in + `batch`. Only contains the last event for any given state key. + + Returns: + A map from (type, state_key) to event_id, for each event that we believe + should be included in the `state` part of the sync response. + """ + if members_to_fetch is not None: + # Lazy-loading is enabled. Only return the state that is needed. + state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch) + await_full_state = False + lazy_load_members = True + else: + state_filter = StateFilter.all() + await_full_state = True + lazy_load_members = False + + if batch.limited: + if batch: + state_at_timeline_start = ( + await self._state_storage_controller.get_state_ids_for_event( + batch.events[0].event_id, + state_filter=state_filter, + await_full_state=await_full_state, + ) + ) + else: + # We can get here if the user has ignored the senders of all + # the recent events. + state_at_timeline_start = await self.get_state_at( + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + + # for now, we disable LL for gappy syncs - see + # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346 + # N.B. this slows down incr syncs as we are now processing way + # more state in the server than if we were LLing. + # + # We still have to filter timeline_start to LL entries (above) in order + # for _calculate_state's LL logic to work, as we have to include LL + # members for timeline senders in case they weren't loaded in the initial + # sync. We do this by (counterintuitively) by filtering timeline_start + # members to just be ones which were timeline senders, which then ensures + # all of the rest get included in the state block (if we need to know + # about them). + state_filter = StateFilter.all() + + state_at_previous_sync = await self.get_state_at( + room_id, + stream_position=since_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + + if batch: + state_at_timeline_end = ( + await self._state_storage_controller.get_state_ids_for_event( + batch.events[-1].event_id, + state_filter=state_filter, + await_full_state=await_full_state, + ) + ) + else: + # We can get here if the user has ignored the senders of all + # the recent events. + state_at_timeline_end = await self.get_state_at( + room_id, + stream_position=now_token, + state_filter=state_filter, + await_full_state=await_full_state, + ) + + state_ids = _calculate_state( + timeline_contains=timeline_state, + timeline_start=state_at_timeline_start, + timeline_end=state_at_timeline_end, + previous_timeline_end=state_at_previous_sync, + lazy_load_members=lazy_load_members, + ) + else: + state_ids = {} + if lazy_load_members: + if members_to_fetch and batch.events: + # We're returning an incremental sync, with no + # "gap" since the previous sync, so normally there would be + # no state to return. + # But we're lazy-loading, so the client might need some more + # member events to understand the events in this timeline. + # So we fish out all the member events corresponding to the + # timeline here. The caller will then dedupe any redundant ones. + + state_ids = await self._state_storage_controller.get_state_ids_for_event( + batch.events[0].event_id, + # we only want members! + state_filter=StateFilter.from_types( + (EventTypes.Member, member) for member in members_to_fetch + ), + await_full_state=False, + ) + return state_ids + async def _find_missing_partial_state_memberships( self, room_id: str, |