summary refs log tree commit diff
path: root/synapse/handlers/sliding_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/sliding_sync.py')
-rw-r--r--synapse/handlers/sliding_sync.py435
1 files changed, 388 insertions, 47 deletions
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index a38ff32a71..af5359123c 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -17,6 +17,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+import enum
 import logging
 from enum import Enum
 from itertools import chain
@@ -26,23 +27,35 @@ from typing import (
     Dict,
     Final,
     List,
+    Literal,
     Mapping,
     Optional,
     Sequence,
     Set,
     Tuple,
+    Union,
 )
 
 import attr
 from immutabledict import immutabledict
 from typing_extensions import assert_never
 
-from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
-from synapse.events import EventBase
-from synapse.events.utils import strip_event
+from synapse.api.constants import (
+    AccountDataTypes,
+    Direction,
+    EventContentFields,
+    EventTypes,
+    Membership,
+)
+from synapse.events import EventBase, StrippedStateEvent
+from synapse.events.utils import parse_stripped_state_event, strip_event
 from synapse.handlers.relations import BundledAggregations
 from synapse.logging.opentracing import log_kv, start_active_span, tag_args, trace
 from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.databases.main.state import (
+    ROOM_UNKNOWN_SENTINEL,
+    Sentinel as StateSentinel,
+)
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
@@ -50,6 +63,7 @@ from synapse.types import (
     JsonDict,
     JsonMapping,
     MultiWriterStreamToken,
+    MutableStateMap,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
@@ -71,6 +85,12 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
+class Sentinel(enum.Enum):
+    # defining a sentinel in this way allows mypy to correctly handle the
+    # type of a dictionary lookup and subsequent type narrowing.
+    UNSET_SENTINEL = object()
+
+
 # The event types that clients should consider as new activity.
 DEFAULT_BUMP_EVENT_TYPES = {
     EventTypes.Create,
@@ -494,8 +514,7 @@ class SlidingSyncHandler:
 
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
-        # Keep track of the rooms that we're going to display and need to fetch more
-        # info about
+        # Keep track of the rooms that we can display and need to fetch more info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
         if has_lists and sync_config.lists is not None:
             sync_room_map = await self.filter_rooms_relevant_for_sync(
@@ -625,10 +644,11 @@ class SlidingSyncHandler:
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
-        all_rooms = set(relevant_room_map)
 
         # Filter out rooms that haven't received updates and we've sent down
         # previously.
+        # Keep track of the rooms that we're going to display and need to fetch more info about
+        relevant_rooms_to_send_map = relevant_room_map
         if from_token:
             rooms_should_send = set()
 
@@ -673,7 +693,7 @@ class SlidingSyncHandler:
                 relevant_room_map.keys(), from_token.stream_token.room_key
             )
             rooms_should_send.update(rooms_that_have_updates)
-            relevant_room_map = {
+            relevant_rooms_to_send_map = {
                 room_id: room_sync_config
                 for room_id, room_sync_config in relevant_room_map.items()
                 if room_id in rooms_should_send
@@ -685,7 +705,7 @@ class SlidingSyncHandler:
             room_sync_result = await self.get_room_sync_data(
                 sync_config=sync_config,
                 room_id=room_id,
-                room_sync_config=relevant_room_map[room_id],
+                room_sync_config=relevant_rooms_to_send_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
                     room_id
                 ],
@@ -697,14 +717,20 @@ class SlidingSyncHandler:
             if room_sync_result or not from_token:
                 rooms[room_id] = room_sync_result
 
-        if relevant_room_map:
+        if relevant_rooms_to_send_map:
             with start_active_span("sliding_sync.generate_room_entries"):
-                await concurrently_execute(handle_room, relevant_room_map, 10)
+                await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
             actual_lists=lists,
-            actual_room_ids=all_rooms,
+            # We're purposely using `relevant_room_map` instead of
+            # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could
+            # send regardless of whether they have an event update or not. The
+            # extensions care about more than just normal events in the rooms (like
+            # account data, read receipts, typing indicators, to-device messages, etc).
+            actual_room_ids=set(relevant_room_map.keys()),
+            actual_room_response_map=rooms,
             from_token=from_token,
             to_token=to_token,
         )
@@ -714,7 +740,7 @@ class SlidingSyncHandler:
                 sync_config=sync_config,
                 room_configs=relevant_room_map,
                 from_token=from_token,
-                sent_room_ids=relevant_room_map.keys(),
+                sent_room_ids=relevant_rooms_to_send_map.keys(),
                 # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
                 unsent_room_ids=[],
             )
@@ -1179,6 +1205,265 @@ class SlidingSyncHandler:
 
         # return None
 
+    async def _bulk_get_stripped_state_for_rooms_from_sync_room_map(
+        self,
+        room_ids: StrCollection,
+        sync_room_map: Dict[str, _RoomMembershipForUser],
+    ) -> Dict[str, Optional[StateMap[StrippedStateEvent]]]:
+        """
+        Fetch stripped state for a list of room IDs. Stripped state is only
+        applicable to invite/knock rooms. Other rooms will have `None` as their
+        stripped state.
+
+        For invite rooms, we pull from `unsigned.invite_room_state`.
+        For knock rooms, we pull from `unsigned.knock_room_state`.
+
+        Args:
+            room_ids: Room IDs to fetch stripped state for
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
+
+        Returns:
+            Mapping from room_id to mapping of (type, state_key) to stripped state
+            event.
+        """
+        room_id_to_stripped_state_map: Dict[
+            str, Optional[StateMap[StrippedStateEvent]]
+        ] = {}
+
+        # Fetch what we haven't before
+        room_ids_to_fetch = [
+            room_id
+            for room_id in room_ids
+            if room_id not in room_id_to_stripped_state_map
+        ]
+
+        # Gather a list of event IDs we can grab stripped state from
+        invite_or_knock_event_ids: List[str] = []
+        for room_id in room_ids_to_fetch:
+            if sync_room_map[room_id].membership in (
+                Membership.INVITE,
+                Membership.KNOCK,
+            ):
+                event_id = sync_room_map[room_id].event_id
+                # If this is an invite/knock then there should be an event_id
+                assert event_id is not None
+                invite_or_knock_event_ids.append(event_id)
+            else:
+                room_id_to_stripped_state_map[room_id] = None
+
+        invite_or_knock_events = await self.store.get_events(invite_or_knock_event_ids)
+        for invite_or_knock_event in invite_or_knock_events.values():
+            room_id = invite_or_knock_event.room_id
+            membership = invite_or_knock_event.membership
+
+            raw_stripped_state_events = None
+            if membership == Membership.INVITE:
+                invite_room_state = invite_or_knock_event.unsigned.get(
+                    "invite_room_state"
+                )
+                raw_stripped_state_events = invite_room_state
+            elif membership == Membership.KNOCK:
+                knock_room_state = invite_or_knock_event.unsigned.get(
+                    "knock_room_state"
+                )
+                raw_stripped_state_events = knock_room_state
+            else:
+                raise AssertionError(
+                    f"Unexpected membership {membership} (this is a problem with Synapse itself)"
+                )
+
+            stripped_state_map: Optional[MutableStateMap[StrippedStateEvent]] = None
+            # Scrutinize unsigned things. `raw_stripped_state_events` should be a list
+            # of stripped events
+            if raw_stripped_state_events is not None:
+                stripped_state_map = {}
+                if isinstance(raw_stripped_state_events, list):
+                    for raw_stripped_event in raw_stripped_state_events:
+                        stripped_state_event = parse_stripped_state_event(
+                            raw_stripped_event
+                        )
+                        if stripped_state_event is not None:
+                            stripped_state_map[
+                                (
+                                    stripped_state_event.type,
+                                    stripped_state_event.state_key,
+                                )
+                            ] = stripped_state_event
+
+            room_id_to_stripped_state_map[room_id] = stripped_state_map
+
+        return room_id_to_stripped_state_map
+
+    async def _bulk_get_partial_current_state_content_for_rooms(
+        self,
+        content_type: Literal[
+            # `content.type` from `EventTypes.Create``
+            "room_type",
+            # `content.algorithm` from `EventTypes.RoomEncryption`
+            "room_encryption",
+        ],
+        room_ids: Set[str],
+        sync_room_map: Dict[str, _RoomMembershipForUser],
+        to_token: StreamToken,
+        room_id_to_stripped_state_map: Dict[
+            str, Optional[StateMap[StrippedStateEvent]]
+        ],
+    ) -> Mapping[str, Union[Optional[str], StateSentinel]]:
+        """
+        Get the given state event content for a list of rooms. First we check the
+        current state of the room, then fallback to stripped state if available, then
+        historical state.
+
+        Args:
+            content_type: Which content to grab
+            room_ids: Room IDs to fetch the given content field for.
+            sync_room_map: Dictionary of room IDs to sort along with membership
+                information in the room at the time of `to_token`.
+            to_token: We filter based on the state of the room at this token
+            room_id_to_stripped_state_map: This does not need to be filled in before
+                calling this function. Mapping from room_id to mapping of (type, state_key)
+                to stripped state event. Modified in place when we fetch new rooms so we can
+                save work next time this function is called.
+
+        Returns:
+            A mapping from room ID to the state event content if the room has
+            the given state event (event_type, ""), otherwise `None`. Rooms unknown to
+            this server will return `ROOM_UNKNOWN_SENTINEL`.
+        """
+        room_id_to_content: Dict[str, Union[Optional[str], StateSentinel]] = {}
+
+        # As a bulk shortcut, use the current state if the server is particpating in the
+        # room (meaning we have current state). Ideally, for leave/ban rooms, we would
+        # want the state at the time of the membership instead of current state to not
+        # leak anything but we consider the create/encryption stripped state events to
+        # not be a secret given they are often set at the start of the room and they are
+        # normally handed out on invite/knock.
+        #
+        # Be mindful to only use this for non-sensitive details. For example, even
+        # though the room name/avatar/topic are also stripped state, they seem a lot
+        # more senstive to leak the current state value of.
+        #
+        # Since this function is cached, we need to make a mutable copy via
+        # `dict(...)`.
+        event_type = ""
+        event_content_field = ""
+        if content_type == "room_type":
+            event_type = EventTypes.Create
+            event_content_field = EventContentFields.ROOM_TYPE
+            room_id_to_content = dict(await self.store.bulk_get_room_type(room_ids))
+        elif content_type == "room_encryption":
+            event_type = EventTypes.RoomEncryption
+            event_content_field = EventContentFields.ENCRYPTION_ALGORITHM
+            room_id_to_content = dict(
+                await self.store.bulk_get_room_encryption(room_ids)
+            )
+        else:
+            assert_never(content_type)
+
+        room_ids_with_results = [
+            room_id
+            for room_id, content_field in room_id_to_content.items()
+            if content_field is not ROOM_UNKNOWN_SENTINEL
+        ]
+
+        # We might not have current room state for remote invite/knocks if we are
+        # the first person on our server to see the room. The best we can do is look
+        # in the optional stripped state from the invite/knock event.
+        room_ids_without_results = room_ids.difference(
+            chain(
+                room_ids_with_results,
+                [
+                    room_id
+                    for room_id, stripped_state_map in room_id_to_stripped_state_map.items()
+                    if stripped_state_map is not None
+                ],
+            )
+        )
+        room_id_to_stripped_state_map.update(
+            await self._bulk_get_stripped_state_for_rooms_from_sync_room_map(
+                room_ids_without_results, sync_room_map
+            )
+        )
+
+        # Update our `room_id_to_content` map based on the stripped state
+        # (applies to invite/knock rooms)
+        rooms_ids_without_stripped_state: Set[str] = set()
+        for room_id in room_ids_without_results:
+            stripped_state_map = room_id_to_stripped_state_map.get(
+                room_id, Sentinel.UNSET_SENTINEL
+            )
+            assert stripped_state_map is not Sentinel.UNSET_SENTINEL, (
+                f"Stripped state left unset for room {room_id}. "
+                + "Make sure you're calling `_bulk_get_stripped_state_for_rooms_from_sync_room_map(...)` "
+                + "with that room_id. (this is a problem with Synapse itself)"
+            )
+
+            # If there is some stripped state, we assume the remote server passed *all*
+            # of the potential stripped state events for the room.
+            if stripped_state_map is not None:
+                create_stripped_event = stripped_state_map.get((EventTypes.Create, ""))
+                stripped_event = stripped_state_map.get((event_type, ""))
+                # Sanity check that we at-least have the create event
+                if create_stripped_event is not None:
+                    if stripped_event is not None:
+                        room_id_to_content[room_id] = stripped_event.content.get(
+                            event_content_field
+                        )
+                    else:
+                        # Didn't see the state event we're looking for in the stripped
+                        # state so we can assume relevant content field is `None`.
+                        room_id_to_content[room_id] = None
+            else:
+                rooms_ids_without_stripped_state.add(room_id)
+
+        # Last resort, we might not have current room state for rooms that the
+        # server has left (no one local is in the room) but we can look at the
+        # historical state.
+        #
+        # Update our `room_id_to_content` map based on the state at the time of
+        # the membership event.
+        for room_id in rooms_ids_without_stripped_state:
+            # TODO: It would be nice to look this up in a bulk way (N+1 queries)
+            #
+            # TODO: `get_state_at(...)` doesn't take into account the "current state".
+            room_state = await self.storage_controllers.state.get_state_at(
+                room_id=room_id,
+                stream_position=to_token.copy_and_replace(
+                    StreamKeyType.ROOM,
+                    sync_room_map[room_id].event_pos.to_room_stream_token(),
+                ),
+                state_filter=StateFilter.from_types(
+                    [
+                        (EventTypes.Create, ""),
+                        (event_type, ""),
+                    ]
+                ),
+                # Partially-stated rooms should have all state events except for
+                # remote membership events so we don't need to wait at all because
+                # we only want the create event and some non-member event.
+                await_full_state=False,
+            )
+            # We can use the create event as a canary to tell whether the server has
+            # seen the room before
+            create_event = room_state.get((EventTypes.Create, ""))
+            state_event = room_state.get((event_type, ""))
+
+            if create_event is None:
+                # Skip for unknown rooms
+                continue
+
+            if state_event is not None:
+                room_id_to_content[room_id] = state_event.content.get(
+                    event_content_field
+                )
+            else:
+                # Didn't see the state event we're looking for in the stripped
+                # state so we can assume relevant content field is `None`.
+                room_id_to_content[room_id] = None
+
+        return room_id_to_content
+
     @trace
     async def filter_rooms(
         self,
@@ -1201,6 +1486,10 @@ class SlidingSyncHandler:
             A filtered dictionary of room IDs along with membership information in the
             room at the time of `to_token`.
         """
+        room_id_to_stripped_state_map: Dict[
+            str, Optional[StateMap[StrippedStateEvent]]
+        ] = {}
+
         filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
@@ -1220,31 +1509,34 @@ class SlidingSyncHandler:
                     if not sync_room_map[room_id].is_dm
                 }
 
-        if filters.spaces:
+        if filters.spaces is not None:
             raise NotImplementedError()
 
         # Filter for encrypted rooms
         if filters.is_encrypted is not None:
+            room_id_to_encryption = (
+                await self._bulk_get_partial_current_state_content_for_rooms(
+                    content_type="room_encryption",
+                    room_ids=filtered_room_id_set,
+                    to_token=to_token,
+                    sync_room_map=sync_room_map,
+                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                )
+            )
+
             # Make a copy so we don't run into an error: `Set changed size during
             # iteration`, when we filter out and remove items
             for room_id in filtered_room_id_set.copy():
-                state_at_to_token = await self.storage_controllers.state.get_state_at(
-                    room_id,
-                    to_token,
-                    state_filter=StateFilter.from_types(
-                        [(EventTypes.RoomEncryption, "")]
-                    ),
-                    # Partially-stated rooms should have all state events except for the
-                    # membership events so we don't need to wait because we only care
-                    # about retrieving the `EventTypes.RoomEncryption` state event here.
-                    # Plus we don't want to block the whole sync waiting for this one
-                    # room.
-                    await_full_state=False,
-                )
-                is_encrypted = state_at_to_token.get((EventTypes.RoomEncryption, ""))
+                encryption = room_id_to_encryption.get(room_id, ROOM_UNKNOWN_SENTINEL)
+
+                # Just remove rooms if we can't determine their encryption status
+                if encryption is ROOM_UNKNOWN_SENTINEL:
+                    filtered_room_id_set.remove(room_id)
+                    continue
 
                 # If we're looking for encrypted rooms, filter out rooms that are not
                 # encrypted and vice versa
+                is_encrypted = encryption is not None
                 if (filters.is_encrypted and not is_encrypted) or (
                     not filters.is_encrypted and is_encrypted
                 ):
@@ -1270,15 +1562,26 @@ class SlidingSyncHandler:
         # provided in the list. `None` is a valid type for rooms which do not have a
         # room type.
         if filters.room_types is not None or filters.not_room_types is not None:
-            room_to_type = await self.store.bulk_get_room_type(
-                {
-                    room_id
-                    for room_id in filtered_room_id_set
-                    # We only know the room types for joined rooms
-                    if sync_room_map[room_id].membership == Membership.JOIN
-                }
+            room_id_to_type = (
+                await self._bulk_get_partial_current_state_content_for_rooms(
+                    content_type="room_type",
+                    room_ids=filtered_room_id_set,
+                    to_token=to_token,
+                    sync_room_map=sync_room_map,
+                    room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+                )
             )
-            for room_id, room_type in room_to_type.items():
+
+            # Make a copy so we don't run into an error: `Set changed size during
+            # iteration`, when we filter out and remove items
+            for room_id in filtered_room_id_set.copy():
+                room_type = room_id_to_type.get(room_id, ROOM_UNKNOWN_SENTINEL)
+
+                # Just remove rooms if we can't determine their type
+                if room_type is ROOM_UNKNOWN_SENTINEL:
+                    filtered_room_id_set.remove(room_id)
+                    continue
+
                 if (
                     filters.room_types is not None
                     and room_type not in filters.room_types
@@ -1291,13 +1594,24 @@ class SlidingSyncHandler:
                 ):
                     filtered_room_id_set.remove(room_id)
 
-        if filters.room_name_like:
+        if filters.room_name_like is not None:
+            # TODO: The room name is a bit more sensitive to leak than the
+            # create/encryption event. Maybe we should consider a better way to fetch
+            # historical state before implementing this.
+            #
+            # room_id_to_create_content = await self._bulk_get_partial_current_state_content_for_rooms(
+            #     content_type="room_name",
+            #     room_ids=filtered_room_id_set,
+            #     to_token=to_token,
+            #     sync_room_map=sync_room_map,
+            #     room_id_to_stripped_state_map=room_id_to_stripped_state_map,
+            # )
             raise NotImplementedError()
 
-        if filters.tags:
+        if filters.tags is not None:
             raise NotImplementedError()
 
-        if filters.not_tags:
+        if filters.not_tags is not None:
             raise NotImplementedError()
 
         # Assemble a new sync room map but only with the `filtered_room_id_set`
@@ -1378,14 +1692,17 @@ class SlidingSyncHandler:
                 in the room at the time of `to_token`.
             to_token: The point in the stream to sync up to.
         """
-        room_state_ids: StateMap[str]
+        state_ids: StateMap[str]
         # People shouldn't see past their leave/ban event
         if room_membership_for_user_at_to_token.membership in (
             Membership.LEAVE,
             Membership.BAN,
         ):
-            # TODO: `get_state_ids_at(...)` doesn't take into account the "current state"
-            room_state_ids = await self.storage_controllers.state.get_state_ids_at(
+            # TODO: `get_state_ids_at(...)` doesn't take into account the "current
+            # state". Maybe we need to use
+            # `get_forward_extremities_for_room_at_stream_ordering(...)` to "Fetch the
+            # current state at the time."
+            state_ids = await self.storage_controllers.state.get_state_ids_at(
                 room_id,
                 stream_position=to_token.copy_and_replace(
                     StreamKeyType.ROOM,
@@ -1404,7 +1721,7 @@ class SlidingSyncHandler:
             )
         # Otherwise, we can get the latest current state in the room
         else:
-            room_state_ids = await self.storage_controllers.state.get_current_state_ids(
+            state_ids = await self.storage_controllers.state.get_current_state_ids(
                 room_id,
                 state_filter,
                 # Partially-stated rooms should have all state events except for
@@ -1419,7 +1736,7 @@ class SlidingSyncHandler:
             )
             # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
 
-        return room_state_ids
+        return state_ids
 
     async def get_current_state_at(
         self,
@@ -1439,17 +1756,17 @@ class SlidingSyncHandler:
                 in the room at the time of `to_token`.
             to_token: The point in the stream to sync up to.
         """
-        room_state_ids = await self.get_current_state_ids_at(
+        state_ids = await self.get_current_state_ids_at(
             room_id=room_id,
             room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
             state_filter=state_filter,
             to_token=to_token,
         )
 
-        event_map = await self.store.get_events(list(room_state_ids.values()))
+        event_map = await self.store.get_events(list(state_ids.values()))
 
         state_map = {}
-        for key, event_id in room_state_ids.items():
+        for key, event_id in state_ids.items():
             event = event_map.get(event_id)
             if event:
                 state_map[key] = event
@@ -1958,6 +2275,7 @@ class SlidingSyncHandler:
         sync_config: SlidingSyncConfig,
         actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
         actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
@@ -1968,6 +2286,8 @@ class SlidingSyncHandler:
             actual_lists: Sliding window API. A map of list key to list results in the
                 Sliding Sync response.
             actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
         """
@@ -2009,6 +2329,7 @@ class SlidingSyncHandler:
                 sync_config=sync_config,
                 actual_lists=actual_lists,
                 actual_room_ids=actual_room_ids,
+                actual_room_response_map=actual_room_response_map,
                 receipts_request=sync_config.extensions.receipts,
                 to_token=to_token,
                 from_token=from_token,
@@ -2310,12 +2631,12 @@ class SlidingSyncHandler:
             account_data_by_room_map=account_data_by_room_map,
         )
 
-    @trace
     async def get_receipts_extension_response(
         self,
         sync_config: SlidingSyncConfig,
         actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
         actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
         receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
@@ -2327,6 +2648,8 @@ class SlidingSyncHandler:
             actual_lists: Sliding window API. A map of list key to list results in the
                 Sliding Sync response.
             actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
             account_data_request: The account_data extension from the request
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
@@ -2364,6 +2687,24 @@ class SlidingSyncHandler:
                 room_id = receipt["room_id"]
                 type = receipt["type"]
                 content = receipt["content"]
+
+                room_result = actual_room_response_map.get(room_id)
+                if room_result is not None:
+                    if room_result.initial:
+                        # TODO: In the future, it would be good to fetch less receipts
+                        # out of the database in the first place but we would need to
+                        # add a new `event_id` index to `receipts_linearized`.
+                        relevant_event_ids = [
+                            event.event_id for event in room_result.timeline_events
+                        ]
+
+                        assert isinstance(content, dict)
+                        content = {
+                            event_id: content_value
+                            for event_id, content_value in content.items()
+                            if event_id in relevant_event_ids
+                        }
+
                 room_id_to_receipt_map[room_id] = {"type": type, "content": content}
 
         return SlidingSyncResult.Extensions.ReceiptsExtension(