summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/16929.misc2
-rw-r--r--synapse/handlers/sync.py381
2 files changed, 238 insertions, 145 deletions
diff --git a/changelog.d/16929.misc b/changelog.d/16929.misc
new file mode 100644
index 0000000000..9489784e4a
--- /dev/null
+++ b/changelog.d/16929.misc
@@ -0,0 +1,2 @@
+Refactor state delta calculation in `/sync` handler.
+
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 5bb8a1439d..08fe4eb3b3 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1014,30 +1014,6 @@ class SyncHandler:
                     if event.is_state():
                         timeline_state[(event.type, event.state_key)] = event.event_id
 
-                if full_state:
-                    # always make sure we LL ourselves so we know we're in the room
-                    # (if we are) to fix https://github.com/vector-im/riot-web/issues/7209
-                    # We only need apply this on full state syncs given we disabled
-                    # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
-                    # We don't insert ourselves into `members_to_fetch`, because in some
-                    # rare cases (an empty event batch with a now_token after the user's
-                    # leave in a partial state room which another local user has
-                    # joined), the room state will be missing our membership and there
-                    # is no guarantee that our membership will be in the auth events of
-                    # timeline events when the room is partial stated.
-                    state_filter = StateFilter.from_lazy_load_member_list(
-                        members_to_fetch.union((sync_config.user.to_string(),))
-                    )
-                else:
-                    state_filter = StateFilter.from_lazy_load_member_list(
-                        members_to_fetch
-                    )
-
-                # We are happy to use partial state to compute the `/sync` response.
-                # Since partial state may not include the lazy-loaded memberships we
-                # require, we fix up the state response afterwards with memberships from
-                # auth events.
-                await_full_state = False
             else:
                 timeline_state = {
                     (event.type, event.state_key): event.event_id
@@ -1045,9 +1021,6 @@ class SyncHandler:
                     if event.is_state()
                 }
 
-                state_filter = StateFilter.all()
-                await_full_state = True
-
             # Now calculate the state to return in the sync response for the room.
             # This is more or less the change in state between the end of the previous
             # sync's timeline and the start of the current sync's timeline.
@@ -1057,131 +1030,28 @@ class SyncHandler:
             # whether the room is partial stated *before* fetching it.
             is_partial_state_room = await self.store.is_partial_state_room(room_id)
             if full_state:
-                if batch:
-                    state_at_timeline_end = (
-                        await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[-1].event_id,
-                            state_filter=state_filter,
-                            await_full_state=await_full_state,
-                        )
-                    )
-
-                    state_at_timeline_start = (
-                        await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[0].event_id,
-                            state_filter=state_filter,
-                            await_full_state=await_full_state,
-                        )
-                    )
-
-                else:
-                    state_at_timeline_end = await self.get_state_at(
-                        room_id,
-                        stream_position=now_token,
-                        state_filter=state_filter,
-                        await_full_state=await_full_state,
-                    )
-
-                    state_at_timeline_start = state_at_timeline_end
-
-                state_ids = _calculate_state(
-                    timeline_contains=timeline_state,
-                    timeline_start=state_at_timeline_start,
-                    timeline_end=state_at_timeline_end,
-                    previous_timeline_end={},
-                    lazy_load_members=lazy_load_members,
+                state_ids = await self._compute_state_delta_for_full_sync(
+                    room_id,
+                    sync_config.user,
+                    batch,
+                    now_token,
+                    members_to_fetch,
+                    timeline_state,
                 )
-            elif batch.limited:
-                if batch:
-                    state_at_timeline_start = (
-                        await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[0].event_id,
-                            state_filter=state_filter,
-                            await_full_state=await_full_state,
-                        )
-                    )
-                else:
-                    # We can get here if the user has ignored the senders of all
-                    # the recent events.
-                    state_at_timeline_start = await self.get_state_at(
-                        room_id,
-                        stream_position=now_token,
-                        state_filter=state_filter,
-                        await_full_state=await_full_state,
-                    )
-
-                # for now, we disable LL for gappy syncs - see
-                # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
-                # N.B. this slows down incr syncs as we are now processing way
-                # more state in the server than if we were LLing.
-                #
-                # We still have to filter timeline_start to LL entries (above) in order
-                # for _calculate_state's LL logic to work, as we have to include LL
-                # members for timeline senders in case they weren't loaded in the initial
-                # sync.  We do this by (counterintuitively) by filtering timeline_start
-                # members to just be ones which were timeline senders, which then ensures
-                # all of the rest get included in the state block (if we need to know
-                # about them).
-                state_filter = StateFilter.all()
-
+            else:
                 # If this is an initial sync then full_state should be set, and
                 # that case is handled above. We assert here to ensure that this
                 # is indeed the case.
                 assert since_token is not None
-                state_at_previous_sync = await self.get_state_at(
-                    room_id,
-                    stream_position=since_token,
-                    state_filter=state_filter,
-                    await_full_state=await_full_state,
-                )
 
-                if batch:
-                    state_at_timeline_end = (
-                        await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[-1].event_id,
-                            state_filter=state_filter,
-                            await_full_state=await_full_state,
-                        )
-                    )
-                else:
-                    # We can get here if the user has ignored the senders of all
-                    # the recent events.
-                    state_at_timeline_end = await self.get_state_at(
-                        room_id,
-                        stream_position=now_token,
-                        state_filter=state_filter,
-                        await_full_state=await_full_state,
-                    )
-
-                state_ids = _calculate_state(
-                    timeline_contains=timeline_state,
-                    timeline_start=state_at_timeline_start,
-                    timeline_end=state_at_timeline_end,
-                    previous_timeline_end=state_at_previous_sync,
-                    # we have to include LL members in case LL initial sync missed them
-                    lazy_load_members=lazy_load_members,
+                state_ids = await self._compute_state_delta_for_incremental_sync(
+                    room_id,
+                    batch,
+                    since_token,
+                    now_token,
+                    members_to_fetch,
+                    timeline_state,
                 )
-            else:
-                state_ids = {}
-                if lazy_load_members:
-                    if members_to_fetch and batch.events:
-                        # We're returning an incremental sync, with no
-                        # "gap" since the previous sync, so normally there would be
-                        # no state to return.
-                        # But we're lazy-loading, so the client might need some more
-                        # member events to understand the events in this timeline.
-                        # So we fish out all the member events corresponding to the
-                        # timeline here, and then dedupe any redundant ones below.
-
-                        state_ids = await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[0].event_id,
-                            # we only want members!
-                            state_filter=StateFilter.from_types(
-                                (EventTypes.Member, member)
-                                for member in members_to_fetch
-                            ),
-                            await_full_state=False,
-                        )
 
             # If we only have partial state for the room, `state_ids` may be missing the
             # memberships we wanted. We attempt to find some by digging through the auth
@@ -1245,6 +1115,227 @@ class SyncHandler:
             if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
         }
 
+    async def _compute_state_delta_for_full_sync(
+        self,
+        room_id: str,
+        syncing_user: UserID,
+        batch: TimelineBatch,
+        now_token: StreamToken,
+        members_to_fetch: Optional[Set[str]],
+        timeline_state: StateMap[str],
+    ) -> StateMap[str]:
+        """Calculate the state events to be included in a full sync response.
+
+        As with `_compute_state_delta_for_incremental_sync`, the result will include
+        the membership events for the senders of each event in `members_to_fetch`.
+
+        Args:
+            room_id: The room we are calculating for.
+            syncing_user: The user that is calling `/sync`.
+            batch: The timeline batch for the room that will be sent to the user.
+            now_token: Token of the end of the current batch.
+            members_to_fetch: If lazy-loading is enabled, the memberships needed for
+                events in the timeline.
+            timeline_state: The contribution to the room state from state events in
+                `batch`. Only contains the last event for any given state key.
+
+        Returns:
+            A map from (type, state_key) to event_id, for each event that we believe
+            should be included in the `state` part of the sync response.
+        """
+        if members_to_fetch is not None:
+            # Lazy-loading of membership events is enabled.
+            #
+            # Always make sure we load our own membership event so we know if
+            # we're in the room, to fix https://github.com/vector-im/riot-web/issues/7209.
+            #
+            # We only need apply this on full state syncs given we disabled
+            # LL for incr syncs in https://github.com/matrix-org/synapse/pull/3840.
+            #
+            # We don't insert ourselves into `members_to_fetch`, because in some
+            # rare cases (an empty event batch with a now_token after the user's
+            # leave in a partial state room which another local user has
+            # joined), the room state will be missing our membership and there
+            # is no guarantee that our membership will be in the auth events of
+            # timeline events when the room is partial stated.
+            state_filter = StateFilter.from_lazy_load_member_list(
+                members_to_fetch.union((syncing_user.to_string(),))
+            )
+
+            # We are happy to use partial state to compute the `/sync` response.
+            # Since partial state may not include the lazy-loaded memberships we
+            # require, we fix up the state response afterwards with memberships from
+            # auth events.
+            await_full_state = False
+            lazy_load_members = True
+        else:
+            state_filter = StateFilter.all()
+            await_full_state = True
+            lazy_load_members = False
+
+        if batch:
+            state_at_timeline_end = (
+                await self._state_storage_controller.get_state_ids_for_event(
+                    batch.events[-1].event_id,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
+                )
+            )
+
+            state_at_timeline_start = (
+                await self._state_storage_controller.get_state_ids_for_event(
+                    batch.events[0].event_id,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
+                )
+            )
+        else:
+            state_at_timeline_end = await self.get_state_at(
+                room_id,
+                stream_position=now_token,
+                state_filter=state_filter,
+                await_full_state=await_full_state,
+            )
+
+            state_at_timeline_start = state_at_timeline_end
+
+        state_ids = _calculate_state(
+            timeline_contains=timeline_state,
+            timeline_start=state_at_timeline_start,
+            timeline_end=state_at_timeline_end,
+            previous_timeline_end={},
+            lazy_load_members=lazy_load_members,
+        )
+        return state_ids
+
+    async def _compute_state_delta_for_incremental_sync(
+        self,
+        room_id: str,
+        batch: TimelineBatch,
+        since_token: StreamToken,
+        now_token: StreamToken,
+        members_to_fetch: Optional[Set[str]],
+        timeline_state: StateMap[str],
+    ) -> StateMap[str]:
+        """Calculate the state events to be included in an incremental sync response.
+
+        If lazy-loading of membership events is enabled (as indicated by
+        `members_to_fetch` being not-`None`), the result will include the membership
+        events for each member in `members_to_fetch`. The caller
+        (`compute_state_delta`) is responsible for keeping track of which membership
+        events we have already sent to the client, and hence ripping them out.
+
+        Args:
+            room_id: The room we are calculating for.
+            batch: The timeline batch for the room that will be sent to the user.
+            since_token: Token of the end of the previous batch.
+            now_token: Token of the end of the current batch.
+            members_to_fetch: If lazy-loading is enabled, the memberships needed for
+                events in the timeline. Otherwise, `None`.
+            timeline_state: The contribution to the room state from state events in
+                `batch`. Only contains the last event for any given state key.
+
+        Returns:
+            A map from (type, state_key) to event_id, for each event that we believe
+            should be included in the `state` part of the sync response.
+        """
+        if members_to_fetch is not None:
+            # Lazy-loading is enabled. Only return the state that is needed.
+            state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+            await_full_state = False
+            lazy_load_members = True
+        else:
+            state_filter = StateFilter.all()
+            await_full_state = True
+            lazy_load_members = False
+
+        if batch.limited:
+            if batch:
+                state_at_timeline_start = (
+                    await self._state_storage_controller.get_state_ids_for_event(
+                        batch.events[0].event_id,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
+                    )
+                )
+            else:
+                # We can get here if the user has ignored the senders of all
+                # the recent events.
+                state_at_timeline_start = await self.get_state_at(
+                    room_id,
+                    stream_position=now_token,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
+                )
+
+            # for now, we disable LL for gappy syncs - see
+            # https://github.com/vector-im/riot-web/issues/7211#issuecomment-419976346
+            # N.B. this slows down incr syncs as we are now processing way
+            # more state in the server than if we were LLing.
+            #
+            # We still have to filter timeline_start to LL entries (above) in order
+            # for _calculate_state's LL logic to work, as we have to include LL
+            # members for timeline senders in case they weren't loaded in the initial
+            # sync.  We do this by (counterintuitively) by filtering timeline_start
+            # members to just be ones which were timeline senders, which then ensures
+            # all of the rest get included in the state block (if we need to know
+            # about them).
+            state_filter = StateFilter.all()
+
+            state_at_previous_sync = await self.get_state_at(
+                room_id,
+                stream_position=since_token,
+                state_filter=state_filter,
+                await_full_state=await_full_state,
+            )
+
+            if batch:
+                state_at_timeline_end = (
+                    await self._state_storage_controller.get_state_ids_for_event(
+                        batch.events[-1].event_id,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
+                    )
+                )
+            else:
+                # We can get here if the user has ignored the senders of all
+                # the recent events.
+                state_at_timeline_end = await self.get_state_at(
+                    room_id,
+                    stream_position=now_token,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
+                )
+
+            state_ids = _calculate_state(
+                timeline_contains=timeline_state,
+                timeline_start=state_at_timeline_start,
+                timeline_end=state_at_timeline_end,
+                previous_timeline_end=state_at_previous_sync,
+                lazy_load_members=lazy_load_members,
+            )
+        else:
+            state_ids = {}
+            if lazy_load_members:
+                if members_to_fetch and batch.events:
+                    # We're returning an incremental sync, with no
+                    # "gap" since the previous sync, so normally there would be
+                    # no state to return.
+                    # But we're lazy-loading, so the client might need some more
+                    # member events to understand the events in this timeline.
+                    # So we fish out all the member events corresponding to the
+                    # timeline here. The caller will then dedupe any redundant ones.
+
+                    state_ids = await self._state_storage_controller.get_state_ids_for_event(
+                        batch.events[0].event_id,
+                        # we only want members!
+                        state_filter=StateFilter.from_types(
+                            (EventTypes.Member, member) for member in members_to_fetch
+                        ),
+                        await_full_state=False,
+                    )
+        return state_ids
+
     async def _find_missing_partial_state_memberships(
         self,
         room_id: str,