diff --git a/changelog.d/16929.misc b/changelog.d/16929.misc
new file mode 100644
index 0000000000..9489784e4a
--- /dev/null
+++ b/changelog.d/16929.misc
@@ -0,0 +1,2 @@
+Refactor state delta calculation in `/sync` handler.
+
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5bb8a1439d..08fe4eb3b3 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1014,30 +1014,6 @@ class SyncHandler:
if event.is_state():
timeline_state[(event.type, event.state_key)] = event.event_id
- if full_state:
- # always make sure we LL ourselves so we know we're in the room
- # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
- # We only need apply this on full state syncs given we disabled
- # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
- # We don't insert ourselves into `members_to_fetch`, because in some
- # rare cases (an empty event batch with a now_token after the user's
- # leave in a partial state room which another local user has
- # joined), the room state will be missing our membership and there
- # is no guarantee that our membership will be in the auth events of
- # timeline events when the room is partial stated.
- state_filter = StateFilter.from_lazy_load_member_list(
- members_to_fetch.union((sync_config.user.to_string(),))
- )
- else:
- state_filter = StateFilter.from_lazy_load_member_list(
- members_to_fetch
- )
-
- # We are happy to use partial state to compute the `/sync` response.
- # Since partial state may not include the lazy-loaded memberships we
- # require, we fix up the state response afterwards with memberships from
- # auth events.
- await_full_state = False
else:
timeline_state = {
(event.type, event.state_key): event.event_id
@@ -1045,9 +1021,6 @@ class SyncHandler:
if event.is_state()
}
- state_filter = StateFilter.all()
- await_full_state = True
-
# Now calculate the state to return in the sync response for the room.
# This is more or less the change in state between the end of the previous
# sync's timeline and the start of the current sync's timeline.
@@ -1057,131 +1030,28 @@ class SyncHandler:
# whether the room is partial stated *before* fetching it.
is_partial_state_room = await self.store.is_partial_state_room(room_id)
if full_state:
- if batch:
- state_at_timeline_end = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
-
- state_at_timeline_start = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
-
- else:
- state_at_timeline_end = await self.get_state_at(
- room_id,
- stream_position=now_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
-
- state_at_timeline_start = state_at_timeline_end
-
- state_ids = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state_at_timeline_start,
- timeline_end=state_at_timeline_end,
- previous_timeline_end={},
- lazy_load_members=lazy_load_members,
+ state_ids = await self._compute_state_delta_for_full_sync(
+ room_id,
+ sync_config.user,
+ batch,
+ now_token,
+ members_to_fetch,
+ timeline_state,
)
- elif batch.limited:
- if batch:
- state_at_timeline_start = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
- else:
- # We can get here if the user has ignored the senders of all
- # the recent events.
- state_at_timeline_start = await self.get_state_at(
- room_id,
- stream_position=now_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
-
- # for now, we disable LL for gappy syncs - see
- # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
- # N.B. this slows down incr syncs as we are now processing way
- # more state in the server than if we were LLing.
- #
- # We still have to filter timeline_start to LL entries (above) in order
- # for _calculate_state's LL logic to work, as we have to include LL
- # members for timeline senders in case they weren't loaded in the initial
- # sync. We do this by (counterintuitively) by filtering timeline_start
- # members to just be ones which were timeline senders, which then ensures
- # all of the rest get included in the state block (if we need to know
- # about them).
- state_filter = StateFilter.all()
-
+ else:
# If this is an initial sync then full_state should be set, and
# that case is handled above. We assert here to ensure that this
# is indeed the case.
assert since_token is not None
- state_at_previous_sync = await self.get_state_at(
- room_id,
- stream_position=since_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- if batch:
- state_at_timeline_end = (
- await self._state_storage_controller.get_state_ids_for_event(
- batch.events[-1].event_id,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
- )
- else:
- # We can get here if the user has ignored the senders of all
- # the recent events.
- state_at_timeline_end = await self.get_state_at(
- room_id,
- stream_position=now_token,
- state_filter=state_filter,
- await_full_state=await_full_state,
- )
-
- state_ids = _calculate_state(
- timeline_contains=timeline_state,
- timeline_start=state_at_timeline_start,
- timeline_end=state_at_timeline_end,
- previous_timeline_end=state_at_previous_sync,
- # we have to include LL members in case LL initial sync missed them
- lazy_load_members=lazy_load_members,
+ state_ids = await self._compute_state_delta_for_incremental_sync(
+ room_id,
+ batch,
+ since_token,
+ now_token,
+ members_to_fetch,
+ timeline_state,
)
- else:
- state_ids = {}
- if lazy_load_members:
- if members_to_fetch and batch.events:
- # We're returning an incremental sync, with no
- # "gap" since the previous sync, so normally there would be
- # no state to return.
- # But we're lazy-loading, so the client might need some more
- # member events to understand the events in this timeline.
- # So we fish out all the member events corresponding to the
- # timeline here, and then dedupe any redundant ones below.
-
- state_ids = await self._state_storage_controller.get_state_ids_for_event(
- batch.events[0].event_id,
- # we only want members!
- state_filter=StateFilter.from_types(
- (EventTypes.Member, member)
- for member in members_to_fetch
- ),
- await_full_state=False,
- )
# If we only have partial state for the room, `state_ids` may be missing the
# memberships we wanted. We attempt to find some by digging through the auth
@@ -1245,6 +1115,227 @@ class SyncHandler:
if e.type != EventTypes.Aliases # until MSC2261 or alternative solution
}
+ async def _compute_state_delta_for_full_sync(
+ self,
+ room_id: str,
+ syncing_user: UserID,
+ batch: TimelineBatch,
+ now_token: StreamToken,
+ members_to_fetch: Optional[Set[str]],
+ timeline_state: StateMap[str],
+ ) -> StateMap[str]:
+ """Calculate the state events to be included in a full sync response.
+
+        As with `_compute_state_delta_for_incremental_sync`, the result will include
+        the membership events for each member in `members_to_fetch`.
+
+ Args:
+ room_id: The room we are calculating for.
+ syncing_user: The user that is calling `/sync`.
+ batch: The timeline batch for the room that will be sent to the user.
+ now_token: Token of the end of the current batch.
+            members_to_fetch: If lazy-loading is enabled, the memberships needed for
+                events in the timeline. Otherwise, `None`.
+ timeline_state: The contribution to the room state from state events in
+ `batch`. Only contains the last event for any given state key.
+
+ Returns:
+ A map from (type, state_key) to event_id, for each event that we believe
+ should be included in the `state` part of the sync response.
+ """
+ if members_to_fetch is not None:
+ # Lazy-loading of membership events is enabled.
+ #
+ # Always make sure we load our own membership event so we know if
+ # we're in the room, to fix https://github.com/vector-im/riot-web/issues/7209.
+ #
+            # We only need to apply this on full state syncs given we disabled
+            # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
+ #
+ # We don't insert ourselves into `members_to_fetch`, because in some
+ # rare cases (an empty event batch with a now_token after the user's
+ # leave in a partial state room which another local user has
+ # joined), the room state will be missing our membership and there
+ # is no guarantee that our membership will be in the auth events of
+ # timeline events when the room is partial stated.
+ state_filter = StateFilter.from_lazy_load_member_list(
+ members_to_fetch.union((syncing_user.to_string(),))
+ )
+
+ # We are happy to use partial state to compute the `/sync` response.
+ # Since partial state may not include the lazy-loaded memberships we
+ # require, we fix up the state response afterwards with memberships from
+ # auth events.
+ await_full_state = False
+ lazy_load_members = True
+ else:
+ state_filter = StateFilter.all()
+ await_full_state = True
+ lazy_load_members = False
+
+ if batch:
+ state_at_timeline_end = (
+ await self._state_storage_controller.get_state_ids_for_event(
+ batch.events[-1].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+ )
+
+ state_at_timeline_start = (
+ await self._state_storage_controller.get_state_ids_for_event(
+ batch.events[0].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+ )
+ else:
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+
+ state_at_timeline_start = state_at_timeline_end
+
+ state_ids = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end={},
+ lazy_load_members=lazy_load_members,
+ )
+ return state_ids
+
+ async def _compute_state_delta_for_incremental_sync(
+ self,
+ room_id: str,
+ batch: TimelineBatch,
+ since_token: StreamToken,
+ now_token: StreamToken,
+ members_to_fetch: Optional[Set[str]],
+ timeline_state: StateMap[str],
+ ) -> StateMap[str]:
+ """Calculate the state events to be included in an incremental sync response.
+
+ If lazy-loading of membership events is enabled (as indicated by
+ `members_to_fetch` being not-`None`), the result will include the membership
+ events for each member in `members_to_fetch`. The caller
+ (`compute_state_delta`) is responsible for keeping track of which membership
+ events we have already sent to the client, and hence ripping them out.
+
+ Args:
+ room_id: The room we are calculating for.
+ batch: The timeline batch for the room that will be sent to the user.
+ since_token: Token of the end of the previous batch.
+ now_token: Token of the end of the current batch.
+ members_to_fetch: If lazy-loading is enabled, the memberships needed for
+ events in the timeline. Otherwise, `None`.
+ timeline_state: The contribution to the room state from state events in
+ `batch`. Only contains the last event for any given state key.
+
+ Returns:
+ A map from (type, state_key) to event_id, for each event that we believe
+ should be included in the `state` part of the sync response.
+ """
+ if members_to_fetch is not None:
+ # Lazy-loading is enabled. Only return the state that is needed.
+ state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+ await_full_state = False
+ lazy_load_members = True
+ else:
+ state_filter = StateFilter.all()
+ await_full_state = True
+ lazy_load_members = False
+
+ if batch.limited:
+ if batch:
+ state_at_timeline_start = (
+ await self._state_storage_controller.get_state_ids_for_event(
+ batch.events[0].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+ )
+ else:
+ # We can get here if the user has ignored the senders of all
+ # the recent events.
+ state_at_timeline_start = await self.get_state_at(
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+
+ # for now, we disable LL for gappy syncs - see
+ # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
+ # N.B. this slows down incr syncs as we are now processing way
+ # more state in the server than if we were LLing.
+ #
+ # We still have to filter timeline_start to LL entries (above) in order
+ # for _calculate_state's LL logic to work, as we have to include LL
+ # members for timeline senders in case they weren't loaded in the initial
+                # sync. We do this (counterintuitively) by filtering timeline_start
+ # members to just be ones which were timeline senders, which then ensures
+ # all of the rest get included in the state block (if we need to know
+ # about them).
+ state_filter = StateFilter.all()
+
+ state_at_previous_sync = await self.get_state_at(
+ room_id,
+ stream_position=since_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+
+ if batch:
+ state_at_timeline_end = (
+ await self._state_storage_controller.get_state_ids_for_event(
+ batch.events[-1].event_id,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+ )
+ else:
+ # We can get here if the user has ignored the senders of all
+ # the recent events.
+ state_at_timeline_end = await self.get_state_at(
+ room_id,
+ stream_position=now_token,
+ state_filter=state_filter,
+ await_full_state=await_full_state,
+ )
+
+ state_ids = _calculate_state(
+ timeline_contains=timeline_state,
+ timeline_start=state_at_timeline_start,
+ timeline_end=state_at_timeline_end,
+ previous_timeline_end=state_at_previous_sync,
+ lazy_load_members=lazy_load_members,
+ )
+ else:
+ state_ids = {}
+ if lazy_load_members:
+ if members_to_fetch and batch.events:
+ # We're returning an incremental sync, with no
+ # "gap" since the previous sync, so normally there would be
+ # no state to return.
+ # But we're lazy-loading, so the client might need some more
+ # member events to understand the events in this timeline.
+ # So we fish out all the member events corresponding to the
+ # timeline here. The caller will then dedupe any redundant ones.
+
+ state_ids = await self._state_storage_controller.get_state_ids_for_event(
+ batch.events[0].event_id,
+ # we only want members!
+ state_filter=StateFilter.from_types(
+ (EventTypes.Member, member) for member in members_to_fetch
+ ),
+ await_full_state=False,
+ )
+ return state_ids
+
async def _find_missing_partial_state_memberships(
self,
room_id: str,
|