summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-07-30 12:49:55 -0500
committerGitHub <noreply@github.com>2024-07-30 12:49:55 -0500
commitb221f0b84b984d236ea11383cc21f6d07ca3c2ec (patch)
treeaa1ddc9638a46f057f4b138a0483ae81385f1dd9
parentMerge branch 'master' into develop (diff)
downloadsynapse-b221f0b84b984d236ea11383cc21f6d07ca3c2ec.tar.xz
Sliding Sync: Add receipts extension (MSC3960) (#17489)
[MSC3960](https://github.com/matrix-org/matrix-spec-proposals/pull/3960): Receipts extension

Based on
[MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575):
Sliding Sync
-rw-r--r--changelog.d/17489.feature1
-rw-r--r--synapse/handlers/receipts.py4
-rw-r--r--synapse/handlers/sliding_sync.py268
-rw-r--r--synapse/rest/client/sync.py6
-rw-r--r--synapse/types/handlers/__init__.py20
-rw-r--r--synapse/types/rest/client/__init__.py18
-rw-r--r--tests/rest/client/test_sync.py1021
7 files changed, 1070 insertions, 268 deletions
diff --git a/changelog.d/17489.feature b/changelog.d/17489.feature
new file mode 100644
index 0000000000..5ace1e675e
--- /dev/null
+++ b/changelog.d/17489.feature
@@ -0,0 +1 @@
+Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 8674a8fcdd..d04c76be2a 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -286,8 +286,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
+        to_key: Optional[MultiWriterStreamToken] = None,
     ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
-        to_key = self.get_current_key()
+        if to_key is None:
+            to_key = self.get_current_key()
 
         if from_key == to_key:
             return [], to_key
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 73414dbf69..7a734f6712 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -49,6 +49,7 @@ from synapse.types import (
     DeviceListUpdates,
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
@@ -493,8 +494,7 @@ class SlidingSyncHandler:
 
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
-        # Keep track of the rooms that we're going to display and need to fetch more
-        # info about
+        # Keep track of the rooms that we can display and need to fetch more info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
         if has_lists and sync_config.lists is not None:
             sync_room_map = await self.filter_rooms_relevant_for_sync(
@@ -622,6 +622,8 @@ class SlidingSyncHandler:
 
         # Filter out rooms that haven't received updates and we've sent down
         # previously.
+        # Keep track of the rooms that we're going to display and need to fetch more info about
+        relevant_rooms_to_send_map = relevant_room_map
         if from_token:
             rooms_should_send = set()
 
@@ -659,7 +661,7 @@ class SlidingSyncHandler:
                 relevant_room_map.keys(), from_token.stream_token.room_key
             )
             rooms_should_send.update(rooms_that_have_updates)
-            relevant_room_map = {
+            relevant_rooms_to_send_map = {
                 room_id: room_sync_config
                 for room_id, room_sync_config in relevant_room_map.items()
                 if room_id in rooms_should_send
@@ -671,7 +673,7 @@ class SlidingSyncHandler:
             room_sync_result = await self.get_room_sync_data(
                 sync_config=sync_config,
                 room_id=room_id,
-                room_sync_config=relevant_room_map[room_id],
+                room_sync_config=relevant_rooms_to_send_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
                     room_id
                 ],
@@ -683,13 +685,20 @@ class SlidingSyncHandler:
             if room_sync_result or not from_token:
                 rooms[room_id] = room_sync_result
 
-        if relevant_room_map:
+        if relevant_rooms_to_send_map:
             with start_active_span("sliding_sync.generate_room_entries"):
-                await concurrently_execute(handle_room, relevant_room_map, 10)
+                await concurrently_execute(handle_room, relevant_rooms_to_send_map, 10)
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
-            lists=lists,
+            actual_lists=lists,
+            # We're purposely using `relevant_room_map` instead of
+            # `relevant_rooms_to_send_map` here. This needs to be all room_ids we could
+            # send regardless of whether they have an event update or not. The
+            # extensions care about more than just normal events in the rooms (like
+            # account data, read receipts, typing indicators, to-device messages, etc).
+            actual_room_ids=set(relevant_room_map.keys()),
+            actual_room_response_map=rooms,
             from_token=from_token,
             to_token=to_token,
         )
@@ -698,7 +707,7 @@ class SlidingSyncHandler:
             connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
                 from_token=from_token,
-                sent_room_ids=relevant_room_map.keys(),
+                sent_room_ids=relevant_rooms_to_send_map.keys(),
                 # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
                 unsent_room_ids=[],
             )
@@ -1902,7 +1911,9 @@ class SlidingSyncHandler:
     async def get_extensions_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
@@ -1910,7 +1921,11 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
         """
@@ -1939,18 +1954,103 @@ class SlidingSyncHandler:
         if sync_config.extensions.account_data is not None:
             account_data_response = await self.get_account_data_extension_response(
                 sync_config=sync_config,
-                lists=lists,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
                 account_data_request=sync_config.extensions.account_data,
                 to_token=to_token,
                 from_token=from_token,
             )
 
+        receipts_response = None
+        if sync_config.extensions.receipts is not None:
+            receipts_response = await self.get_receipts_extension_response(
+                sync_config=sync_config,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
+                actual_room_response_map=actual_room_response_map,
+                receipts_request=sync_config.extensions.receipts,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
         return SlidingSyncResult.Extensions(
             to_device=to_device_response,
             e2ee=e2ee_response,
             account_data=account_data_response,
+            receipts=receipts_response,
         )
 
+    def find_relevant_room_ids_for_extension(
+        self,
+        requested_lists: Optional[List[str]],
+        requested_room_ids: Optional[List[str]],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+    ) -> Set[str]:
+        """
+        Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only
+        return results for rooms in the Sliding Sync response. This matches up the
+        requested rooms/lists with the actual lists/rooms in the Sliding Sync response.
+
+        {"lists": []}                    // Do not process any lists.
+        {"lists": ["rooms", "dms"]}      // Process only a subset of lists.
+        {"lists": ["*"]}                 // Process all lists defined in the Sliding Window API. (This is the default.)
+
+        {"rooms": []}                    // Do not process any specific rooms.
+        {"rooms": ["!a:b", "!c:d"]}      // Process only a subset of room subscriptions.
+        {"rooms": ["*"]}                 // Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+
+        Args:
+            requested_lists: The `lists` from the extension request.
+            requested_room_ids: The `rooms` from the extension request.
+            actual_lists: The actual lists from the Sliding Sync response.
+            actual_room_ids: The actual room subscriptions from the Sliding Sync request.
+        """
+
+        # We only want to include account data for rooms that are already in the sliding
+        # sync response AND that were requested in the account data request.
+        relevant_room_ids: Set[str] = set()
+
+        # See what rooms from the room subscriptions we should get account data for
+        if requested_room_ids is not None:
+            for room_id in requested_room_ids:
+                # A wildcard means we process all rooms from the room subscriptions
+                if room_id == "*":
+                    relevant_room_ids.update(actual_room_ids)
+                    break
+
+                if room_id in actual_room_ids:
+                    relevant_room_ids.add(room_id)
+
+        # See what rooms from the sliding window lists we should get account data for
+        if requested_lists is not None:
+            for list_key in requested_lists:
+                # Just some typing because we share the variable name in multiple places
+                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+                # A wildcard means we process rooms from all lists
+                if list_key == "*":
+                    for actual_list in actual_lists.values():
+                        # We only expect a single SYNC operation for any list
+                        assert len(actual_list.ops) == 1
+                        sync_op = actual_list.ops[0]
+                        assert sync_op.op == OperationType.SYNC
+
+                        relevant_room_ids.update(sync_op.room_ids)
+
+                    break
+
+                actual_list = actual_lists.get(list_key)
+                if actual_list is not None:
+                    # We only expect a single SYNC operation for any list
+                    assert len(actual_list.ops) == 1
+                    sync_op = actual_list.ops[0]
+                    assert sync_op.op == OperationType.SYNC
+
+                    relevant_room_ids.update(sync_op.room_ids)
+
+        return relevant_room_ids
+
     @trace
     async def get_to_device_extension_response(
         self,
@@ -2081,7 +2181,8 @@ class SlidingSyncHandler:
     async def get_account_data_extension_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
         account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
@@ -2090,7 +2191,9 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
             account_data_request: The account_data extension from the request
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
@@ -2103,6 +2206,7 @@ class SlidingSyncHandler:
 
         global_account_data_map: Mapping[str, JsonMapping] = {}
         if from_token is not None:
+            # TODO: This should take into account the `from_token` and `to_token`
             global_account_data_map = (
                 await self.store.get_updated_global_account_data_for_user(
                     user_id, from_token.stream_token.account_data_key
@@ -2114,76 +2218,40 @@ class SlidingSyncHandler:
             )
             if have_push_rules_changed:
                 global_account_data_map = dict(global_account_data_map)
+                # TODO: This should take into account the `from_token` and `to_token`
                 global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                     await self.push_rules_handler.push_rules_for_user(sync_config.user)
                 )
         else:
+            # TODO: This should take into account the `to_token`
             all_global_account_data = await self.store.get_global_account_data_for_user(
                 user_id
             )
 
             global_account_data_map = dict(all_global_account_data)
+            # TODO: This should take into account the  `to_token`
             global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                 await self.push_rules_handler.push_rules_for_user(sync_config.user)
             )
 
-        # We only want to include account data for rooms that are already in the sliding
-        # sync response AND that were requested in the account data request.
-        relevant_room_ids: Set[str] = set()
-
-        # See what rooms from the room subscriptions we should get account data for
-        if (
-            account_data_request.rooms is not None
-            and sync_config.room_subscriptions is not None
-        ):
-            actual_room_ids = sync_config.room_subscriptions.keys()
-
-            for room_id in account_data_request.rooms:
-                # A wildcard means we process all rooms from the room subscriptions
-                if room_id == "*":
-                    relevant_room_ids.update(sync_config.room_subscriptions.keys())
-                    break
-
-                if room_id in actual_room_ids:
-                    relevant_room_ids.add(room_id)
-
-        # See what rooms from the sliding window lists we should get account data for
-        if account_data_request.lists is not None:
-            for list_key in account_data_request.lists:
-                # Just some typing because we share the variable name in multiple places
-                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
-
-                # A wildcard means we process rooms from all lists
-                if list_key == "*":
-                    for actual_list in lists.values():
-                        # We only expect a single SYNC operation for any list
-                        assert len(actual_list.ops) == 1
-                        sync_op = actual_list.ops[0]
-                        assert sync_op.op == OperationType.SYNC
-
-                        relevant_room_ids.update(sync_op.room_ids)
-
-                    break
-
-                actual_list = lists.get(list_key)
-                if actual_list is not None:
-                    # We only expect a single SYNC operation for any list
-                    assert len(actual_list.ops) == 1
-                    sync_op = actual_list.ops[0]
-                    assert sync_op.op == OperationType.SYNC
-
-                    relevant_room_ids.update(sync_op.room_ids)
-
         # Fetch room account data
         account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=account_data_request.lists,
+            requested_room_ids=account_data_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
         if len(relevant_room_ids) > 0:
             if from_token is not None:
+                # TODO: This should take into account the `from_token` and `to_token`
                 account_data_by_room_map = (
                     await self.store.get_updated_room_account_data_for_user(
                         user_id, from_token.stream_token.account_data_key
                     )
                 )
             else:
+                # TODO: This should take into account the `to_token`
                 account_data_by_room_map = (
                     await self.store.get_room_account_data_for_user(user_id)
                 )
@@ -2200,6 +2268,86 @@ class SlidingSyncHandler:
             account_data_by_room_map=account_data_by_room_map,
         )
 
+    async def get_receipts_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+        actual_room_response_map: Dict[str, SlidingSyncResult.RoomResult],
+        receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
+        to_token: StreamToken,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
+        """Handle Receipts extension (MSC3960)
+
+        Args:
+            sync_config: Sync configuration
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            actual_room_response_map: A map of room ID to room results in the the
+                Sliding Sync response.
+            account_data_request: The account_data extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        # Skip if the extension is not enabled
+        if not receipts_request.enabled:
+            return None
+
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=receipts_request.lists,
+            requested_room_ids=receipts_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
+
+        room_id_to_receipt_map: Dict[str, JsonMapping] = {}
+        if len(relevant_room_ids) > 0:
+            receipt_source = self.event_sources.sources.receipt
+            receipts, _ = await receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=(
+                    from_token.stream_token.receipt_key
+                    if from_token
+                    else MultiWriterStreamToken(stream=0)
+                ),
+                to_key=to_token.receipt_key,
+                # This is a dummy value and isn't used in the function
+                limit=0,
+                room_ids=relevant_room_ids,
+                is_guest=False,
+            )
+
+            for receipt in receipts:
+                # These fields should exist for every receipt
+                room_id = receipt["room_id"]
+                type = receipt["type"]
+                content = receipt["content"]
+
+                room_result = actual_room_response_map.get(room_id)
+                if room_result is not None:
+                    if room_result.initial:
+                        # TODO: In the future, it would be good to fetch less receipts
+                        # out of the database in the first place but we would need to
+                        # add a new `event_id` index to `receipts_linearized`.
+                        relevant_event_ids = [
+                            event.event_id for event in room_result.timeline_events
+                        ]
+
+                        assert isinstance(content, dict)
+                        content = {
+                            event_id: content_value
+                            for event_id, content_value in content.items()
+                            if event_id in relevant_event_ids
+                        }
+
+                room_id_to_receipt_map[room_id] = {"type": type, "content": content}
+
+        return SlidingSyncResult.Extensions.ReceiptsExtension(
+            room_id_to_receipt_map=room_id_to_receipt_map,
+        )
+
 
 class HaveSentRoomFlag(Enum):
     """Flag for whether we have sent the room down a sliding sync connection.
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index ccfce6bd53..c607d08de5 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -1150,6 +1150,12 @@ class SlidingSyncRestServlet(RestServlet):
                 },
             }
 
+        if extensions.receipts is not None:
+            serialized_extensions["receipts"] = {
+                # Same as the the top-level `account_data.events` field in Sync v2.
+                "rooms": extensions.receipts.room_id_to_receipt_map,
+            }
+
         return serialized_extensions
 
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index f26cc0e903..2f7e92665c 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -152,7 +152,7 @@ class SlidingSyncResult:
     Attributes:
         next_pos: The next position token in the sliding window to request (next_batch).
         lists: Sliding window API. A map of list key to list results.
-        rooms: Room subscription API. A map of room ID to room subscription to room results.
+        rooms: Room subscription API. A map of room ID to room results.
         extensions: Extensions API. A map of extension key to extension results.
     """
 
@@ -361,12 +361,28 @@ class SlidingSyncResult:
                     self.global_account_data_map or self.account_data_by_room_map
                 )
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class ReceiptsExtension:
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
+            """
+
+            room_id_to_receipt_map: Mapping[str, JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.room_id_to_receipt_map)
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
         def __bool__(self) -> bool:
-            return bool(self.to_device or self.e2ee or self.account_data)
+            return bool(
+                self.to_device or self.e2ee or self.account_data or self.receipts
+            )
 
     next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index dfe3b1e0f7..4e632e4492 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -342,9 +342,27 @@ class SlidingSyncBody(RequestBodyModel):
             # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
             rooms: Optional[List[StrictStr]] = ["*"]
 
+        class ReceiptsExtension(RequestBodyModel):
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                enabled
+                lists: List of list keys (from the Sliding Window API) to apply this
+                    extension to.
+                rooms: List of room IDs (from the Room Subscription API) to apply this
+                    extension to.
+            """
+
+            enabled: Optional[StrictBool] = False
+            # Process all lists defined in the Sliding Window API. (This is the default.)
+            lists: Optional[List[StrictStr]] = ["*"]
+            # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+            rooms: Optional[List[StrictStr]] = ["*"]
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
     conn_id: Optional[str]
 
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 5abf1041be..1184adde70 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -30,6 +30,7 @@ from twisted.test.proto_helpers import MemoryReactor
 import synapse.rest.admin
 from synapse.api.constants import (
     AccountDataTypes,
+    EduTypes,
     EventContentFields,
     EventTypes,
     HistoryVisibility,
@@ -1369,12 +1370,14 @@ class SlidingSyncTestCase(SlidingSyncBase):
         room.register_servlets,
         sync.register_servlets,
         devices.register_servlets,
+        receipts.register_servlets,
     ]
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
         self.event_sources = hs.get_event_sources()
         self.storage_controllers = hs.get_storage_controllers()
+        self.account_data_handler = hs.get_account_data_handler()
 
     def _assertRequiredStateIncludes(
         self,
@@ -4454,6 +4457,225 @@ class SlidingSyncTestCase(SlidingSyncBase):
         # `world_readable` but currently we don't support this.
         self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
 
+    # Any extensions that use `lists`/`rooms` should be tested here
+    @parameterized.expand([("account_data",), ("receipts",)])
+    def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None:
+        """
+        With various extensions, test out requesting different variations of
+        `lists`/`rooms`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create some rooms
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        room_id_to_human_name_map = {
+            room_id1: "room1",
+            room_id2: "room2",
+            room_id3: "room3",
+            room_id4: "room4",
+            room_id5: "room5",
+        }
+
+        for room_id in room_id_to_human_name_map.keys():
+            if extension_name == "account_data":
+                # Add some account data to each room
+                self.get_success(
+                    self.account_data_handler.add_account_data_to_room(
+                        user_id=user1_id,
+                        room_id=room_id,
+                        account_data_type="org.matrix.roorarraz",
+                        content={"roo": "rar"},
+                    )
+                )
+            elif extension_name == "receipts":
+                event_response = self.helper.send(
+                    room_id, body="new event", tok=user1_tok
+                )
+                # Read last event
+                channel = self.make_request(
+                    "POST",
+                    f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}",
+                    {},
+                    access_token=user1_tok,
+                )
+                self.assertEqual(channel.code, 200, channel.json_body)
+            else:
+                raise AssertionError(f"Unknown extension name: {extension_name}")
+
+        main_sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+        }
+
+        # Mix lists and rooms
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["foo-list", "non-existent-list"],
+                    "rooms": [room_id1, room_id2, "!non-existent-room"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Requested via `rooms` and a room subscription exists
+        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
+        # room3: ❌ Not requested
+        # room4: ✅ Shows up because requested via `lists` and list exists in the response
+        # room5: ✅ Shows up because requested via `lists` and list exists in the response
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try wildcards (this is the default)
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    # "lists": ["*"],
+                    # "rooms": ["*"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Empty list will return nothing
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ❌ Not requested
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            set(),
+            exact=True,
+        )
+
+        # Try wildcard and none
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["*"],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try requesting a room that is only in a list
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [room_id5],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ✅ Requested via `rooms` and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room5"},
+            exact=True,
+        )
+
     def test_rooms_required_state_incremental_sync_LIVE(self) -> None:
         """Test that we only get state updates in incremental sync for rooms
         we've already seen (LIVE).
@@ -5928,266 +6150,640 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
             exact=True,
         )
 
-    def test_room_account_data_relevant_rooms(self) -> None:
+    def test_wait_for_new_data(self) -> None:
         """
-        Test out different variations of `lists`/`rooms` we are requesting account data for.
+        Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+        (Only applies to incremental syncs with a `timeout` specified)
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
-        # Create a room and add some room account data
-        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id1,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
-        )
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
 
-        # Create another room with some room account data
-        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id2,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
-        )
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # Create another room with some room account data
-        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id3,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
+            await_result=False,
         )
-
-        # Create another room with some room account data
-        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Bump the global account data to trigger new results
         self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id4,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
+            self.account_data_handler.add_account_data_for_user(
+                user1_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
             )
         )
+        # Should respond before the 10 second timeout
+        channel.await_result(timeout_ms=3000)
+        self.assertEqual(channel.code, 200, channel.json_body)
 
-        # Create another room with some room account data
-        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id5,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
+        # We should see the global account data update
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in channel.json_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            {"org.matrix.foobarbaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            channel.json_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
         )
 
-        room_id_to_human_name_map = {
-            room_id1: "room1",
-            room_id2: "room2",
-            room_id3: "room3",
-            room_id4: "room4",
-            room_id5: "room5",
-        }
+    def test_wait_for_new_data_timeout(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive but
+        no data ever arrives so we timeout. We're also making sure that the default data
+        from the account_data extension doesn't trigger a false-positive for new data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
 
-        # Mix lists and rooms
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
-            "room_subscriptions": {
-                room_id1: {
-                    "required_state": [],
-                    "timeline_limit": 0,
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
                 }
             },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Wake-up `notifier.wait_for_events(...)` that will cause us test
+        # `SlidingSyncResult.__bool__` for new results.
+        self._bump_notifier_wait_for_events(
+            user1_id,
+            # We choose `StreamKeyType.PRESENCE` because we're testing for account data
+            # and don't want to contaminate the account data results using
+            # `StreamKeyType.ACCOUNT_DATA`.
+            wake_stream_key=StreamKeyType.PRESENCE,
+        )
+        # Block for a little bit more to ensure we don't see any new results.
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("global")
+        )
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("rooms")
+        )
+
+
+class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
+    """Tests for the receipts sliding sync extension"""
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        sync.register_servlets,
+        receipts.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+
+    def test_no_data_initial_sync(self) -> None:
+        """
+        Test that enabling the receipts extension works during an intitial sync,
+        even if there is no-data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Make an initial Sliding Sync request with the receipts extension enabled
+        sync_body = {
+            "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": ["foo-list", "non-existent-list"],
-                    "rooms": [room_id1, room_id2, "!non-existent-room"],
                 }
             },
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ✅ Requested via `rooms` and a room subscription exists
-        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
-        # room3: ❌ Not requested
-        # room4: ✅ Shows up because requested via `lists` and list exists in the response
-        # room5: ✅ Shows up because requested via `lists` and list exists in the response
         self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
-            },
-            {"room1", "room4", "room5"},
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            set(),
             exact=True,
         )
 
-        # Try wildcards (this is the default)
+    def test_no_data_incremental_sync(self) -> None:
+        """
+        Test that enabling receipts extension works during an incremental sync, even
+        if there is no-data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
+            "lists": {},
+            "extensions": {
+                "receipts": {
+                    "enabled": True,
+                }
             },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make an incremental Sliding Sync request with the receipts extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertIncludes(
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
+
+    def test_receipts_initial_sync_with_timeline(self) -> None:
+        """
+        On initial sync, we only return receipts for events in a given room's timeline.
+
+        We also make sure that we only return receipts for rooms that we request and are
+        already being returned in the Sliding Sync response.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+
+        # Create a room
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user3_id, tok=user3_tok)
+        self.helper.join(room_id1, user4_id, tok=user4_tok)
+        room1_event_response1 = self.helper.send(
+            room_id1, body="new event1", tok=user2_tok
+        )
+        room1_event_response2 = self.helper.send(
+            room_id1, body="new event2", tok=user2_tok
+        )
+        # User1 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User2 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User3 reads the first event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
+            {},
+            access_token=user3_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User4 privately reads the last event (make sure this doesn't leak to the other users)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ_PRIVATE}/{room1_event_response2['event_id']}",
+            {},
+            access_token=user4_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Create another room
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        self.helper.join(room_id2, user3_id, tok=user3_tok)
+        self.helper.join(room_id2, user4_id, tok=user4_tok)
+        room2_event_response1 = self.helper.send(
+            room_id2, body="new event2", tok=user2_tok
+        )
+        # User1 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User2 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User4 privately reads the last event (make sure this doesn't leak to the other users)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{room2_event_response1['event_id']}",
+            {},
+            access_token=user4_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make an initial Sliding Sync request with the receipts extension enabled
+        sync_body = {
+            "lists": {},
             "room_subscriptions": {
                 room_id1: {
                     "required_state": [],
-                    "timeline_limit": 0,
+                    # On initial sync, we only have receipts for events in the timeline
+                    "timeline_limit": 1,
                 }
             },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    # "lists": ["*"],
-                    # "rooms": ["*"],
+                    "rooms": [room_id1, room_id2],
                 }
             },
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        # Only the latest event in the room is in the timelie because the `timeline_limit` is 1
         self.assertIncludes(
             {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
+                event["event_id"]
+                for event in response_body["rooms"][room_id1].get("timeline", [])
             },
-            {"room1", "room3", "room4", "room5"},
+            {room1_event_response2["event_id"]},
             exact=True,
+            message=str(response_body["rooms"][room_id1]),
         )
 
-        # Empty list will return nothing
+        # Even though we requested room2, we only expect room1 to show up because that's
+        # the only room in the Sliding Sync response (room2 is not one of our room
+        # subscriptions or in a sliding window list).
+        self.assertIncludes(
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We can see user1 and user2 read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response2["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user1_id, user2_id},
+            exact=True,
+        )
+        # User1 did not have a private read receipt and we shouldn't leak others'
+        # private read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response2["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
+            set(),
+            exact=True,
+        )
+
+        # We shouldn't see receipts for event2 since it wasn't in the timeline and this is an initial sync
+        self.assertIsNone(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"].get(
+                room1_event_response1["event_id"]
+            )
+        )
+
+    def test_receipts_incremental_sync(self) -> None:
+        """
+        On incremental sync, we return all receipts in the token range for a given room
+        but only for rooms that we request and are being returned in the Sliding Sync
+        response.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+
+        # Create room1
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user3_id, tok=user3_tok)
+        room1_event_response1 = self.helper.send(
+            room_id1, body="new event2", tok=user2_tok
+        )
+        # User2 reads the last event (before the `from_token`)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Create room2
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        room2_event_response1 = self.helper.send(
+            room_id2, body="new event2", tok=user2_tok
+        )
+        # User1 reads the last event (before the `from_token`)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{room2_event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Create room3
+        room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id3, user1_id, tok=user1_tok)
+        self.helper.join(room_id3, user3_id, tok=user3_tok)
+        room3_event_response1 = self.helper.send(
+            room_id3, body="new event", tok=user2_tok
+        )
+
+        # Create room4
+        room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id4, user1_id, tok=user1_tok)
+        self.helper.join(room_id4, user3_id, tok=user3_tok)
+        event_response4 = self.helper.send(room_id4, body="new event", tok=user2_tok)
+        # User1 reads the last event (before the `from_token`)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id4}/receipt/{ReceiptTypes.READ}/{event_response4['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
+            "lists": {},
+            "room_subscriptions": {
+                room_id1: {
                     "required_state": [],
                     "timeline_limit": 0,
                 },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
+                room_id3: {
                     "required_state": [],
                     "timeline_limit": 0,
                 },
-            },
-            "room_subscriptions": {
-                room_id1: {
+                room_id4: {
                     "required_state": [],
                     "timeline_limit": 0,
-                }
+                },
             },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": [],
-                    "rooms": [],
+                    "rooms": [room_id1, room_id2, room_id3, room_id4],
                 }
             },
         }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ❌ Not requested
-        # room4: ❌ Not requested
-        # room5: ❌ Not requested
+        # Add some more read receipts after the `from_token`
+        #
+        # User1 reads room1
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User1 privately reads room2
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{room2_event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User3 reads room3
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id3}/receipt/{ReceiptTypes.READ}/{room3_event_response1['event_id']}",
+            {},
+            access_token=user3_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # No activity for room4 after the `from_token`
+
+        # Make an incremental Sliding Sync request with the receipts extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # Even though we requested room2, we only expect rooms to show up if they are
+        # already in the Sliding Sync response. room4 doesn't show up because there is
+        # no activity after the `from_token`.
         self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
-            },
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            {room_id1, room_id3},
+            exact=True,
+        )
+
+        # Check room1:
+        #
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We only see that user1 has read something in room1 since the `from_token`
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response1["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user1_id},
+            exact=True,
+        )
+        # User1 did not send a private read receipt in this room and we shouldn't leak
+        # others' private read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response1["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
             set(),
             exact=True,
         )
+        # No events in the timeline since they were sent before the `from_token`
+        self.assertNotIn(room_id1, response_body["rooms"])
+
+        # Check room3:
+        #
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id3]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We only see that user3 has read something in room1 since the `from_token`
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][
+                room3_event_response1["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user3_id},
+            exact=True,
+        )
+        # User1 did not send a private read receipt in this room and we shouldn't leak
+        # others' private read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][
+                room3_event_response1["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
+            set(),
+            exact=True,
+        )
+        # No events in the timeline since they were sent before the `from_token`
+        self.assertNotIn(room_id3, response_body["rooms"])
+
+    def test_receipts_incremental_sync_all_live_receipts(self) -> None:
+        """
+        On incremental sync, we return all receipts in the token range for a given room
+        even if they are not in the timeline.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Create room1
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
 
-        # Try wildcard and none
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
+            "lists": {},
             "room_subscriptions": {
                 room_id1: {
                     "required_state": [],
-                    "timeline_limit": 0,
-                }
+                    # The timeline will only include event2
+                    "timeline_limit": 1,
+                },
             },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": ["*"],
-                    "rooms": [],
+                    "rooms": [room_id1],
                 }
             },
         }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        room1_event_response1 = self.helper.send(
+            room_id1, body="new event1", tok=user2_tok
+        )
+        room1_event_response2 = self.helper.send(
+            room_id1, body="new event2", tok=user2_tok
+        )
+
+        # User1 reads event1
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User2 reads event2
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{room1_event_response2['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make an incremental Sliding Sync request with the receipts extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # We should see room1 because it has receipts in the token range
+        self.assertIncludes(
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We should see all receipts in the token range regardless of whether the events
+        # are in the timeline
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response1["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user1_id},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                room1_event_response2["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user2_id},
+            exact=True,
+        )
+        # Only the latest event in the timeline because the `timeline_limit` is 1
         self.assertIncludes(
             {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
+                event["event_id"]
+                for event in response_body["rooms"][room_id1].get("timeline", [])
             },
-            {"room3", "room4", "room5"},
+            {room1_event_response2["event_id"]},
             exact=True,
+            message=str(response_body["rooms"][room_id1]),
         )
 
     def test_wait_for_new_data(self) -> None:
@@ -6203,18 +6799,26 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
 
         room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id, user1_id, tok=user1_tok)
+        event_response = self.helper.send(room_id, body="new event", tok=user2_tok)
 
         sync_body = {
             "lists": {},
+            "room_subscriptions": {
+                room_id: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
+                    "rooms": [room_id],
                 }
             },
         }
         _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # Make an incremental Sliding Sync request with the account_data extension enabled
+        # Make an incremental Sliding Sync request with the receipts extension enabled
         channel = self.make_request(
             "POST",
             self.sync_endpoint + f"?timeout=10000&pos={from_token}",
@@ -6225,31 +6829,43 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
         with self.assertRaises(TimedOutException):
             channel.await_result(timeout_ms=5000)
-        # Bump the global account data to trigger new results
-        self.get_success(
-            self.account_data_handler.add_account_data_for_user(
-                user1_id,
-                "org.matrix.foobarbaz",
-                {"foo": "bar"},
-            )
+        # Bump the receipts to trigger new results
+        receipt_channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}",
+            {},
+            access_token=user2_tok,
         )
+        self.assertEqual(receipt_channel.code, 200, receipt_channel.json_body)
         # Should respond before the 10 second timeout
         channel.await_result(timeout_ms=3000)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        # We should see the global account data update
+        # We should see the new receipt
         self.assertIncludes(
-            {
-                global_event["type"]
-                for global_event in channel.json_body["extensions"]["account_data"].get(
-                    "global"
-                )
-            },
-            {"org.matrix.foobarbaz"},
+            channel.json_body.get("extensions", {})
+            .get("receipts", {})
+            .get("rooms", {})
+            .keys(),
+            {room_id},
             exact=True,
+            message=str(channel.json_body),
         )
         self.assertIncludes(
-            channel.json_body["extensions"]["account_data"].get("rooms").keys(),
+            channel.json_body["extensions"]["receipts"]["rooms"][room_id]["content"][
+                event_response["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user2_id},
+            exact=True,
+        )
+        # User1 did not send a private read receipt in this room and we shouldn't leak
+        # others' private read receipts
+        self.assertIncludes(
+            channel.json_body["extensions"]["receipts"]["rooms"][room_id]["content"][
+                event_response["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
             set(),
             exact=True,
         )
@@ -6258,7 +6874,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         """
         Test to make sure that the Sliding Sync request waits for new data to arrive but
         no data ever arrives so we timeout. We're also making sure that the default data
-        from the account_data extension doesn't trigger a false-positive for new data.
+        from the receipts extension doesn't trigger a false-positive for new data.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -6266,7 +6882,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         sync_body = {
             "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
                 }
             },
@@ -6287,11 +6903,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         # Wake-up `notifier.wait_for_events(...)` that will cause us test
         # `SlidingSyncResult.__bool__` for new results.
         self._bump_notifier_wait_for_events(
-            user1_id,
-            # We choose `StreamKeyType.PRESENCE` because we're testing for account data
-            # and don't want to contaminate the account data results using
-            # `StreamKeyType.ACCOUNT_DATA`.
-            wake_stream_key=StreamKeyType.PRESENCE,
+            user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
         )
         # Block for a little bit more to ensure we don't see any new results.
         with self.assertRaises(TimedOutException):
@@ -6301,9 +6913,8 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         channel.await_result(timeout_ms=1200)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        self.assertIsNotNone(
-            channel.json_body["extensions"]["account_data"].get("global")
-        )
-        self.assertIsNotNone(
-            channel.json_body["extensions"]["account_data"].get("rooms")
+        self.assertIncludes(
+            channel.json_body["extensions"]["receipts"].get("rooms").keys(),
+            set(),
+            exact=True,
         )