summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/17489.feature1
-rw-r--r--synapse/handlers/receipts.py4
-rw-r--r--synapse/handlers/sliding_sync.py329
-rw-r--r--synapse/rest/client/sync.py9
-rw-r--r--synapse/storage/databases/main/cache.py3
-rw-r--r--synapse/storage/databases/main/events.py24
-rw-r--r--synapse/storage/databases/main/stream.py145
-rw-r--r--synapse/storage/schema/main/delta/85/07_sliding_sync.sql24
-rw-r--r--synapse/types/handlers/__init__.py18
-rw-r--r--synapse/types/rest/client/__init__.py18
-rw-r--r--tests/rest/client/test_sync.py888
-rw-r--r--tests/storage/test_event_chain.py1
12 files changed, 1157 insertions, 307 deletions
diff --git a/changelog.d/17489.feature b/changelog.d/17489.feature
new file mode 100644
index 0000000000..5ace1e675e
--- /dev/null
+++ b/changelog.d/17489.feature
@@ -0,0 +1 @@
+Add receipts extension support to experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint.
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 8674a8fcdd..d04c76be2a 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -286,8 +286,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
+        to_key: Optional[MultiWriterStreamToken] = None,
     ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
-        to_key = self.get_current_key()
+        if to_key is None:
+            to_key = self.get_current_key()
 
         if from_key == to_key:
             return [], to_key
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 73414dbf69..ebb15a8451 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -49,6 +49,7 @@ from synapse.types import (
     DeviceListUpdates,
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
@@ -505,9 +506,14 @@ class SlidingSyncHandler:
             for list_key, list_config in sync_config.lists.items():
                 # Apply filters
                 filtered_sync_room_map = sync_room_map
-                if list_config.filters is not None:
+
+                if list_config.filters:
+
                     filtered_sync_room_map = await self.filter_rooms(
-                        sync_config.user, sync_room_map, list_config.filters, to_token
+                        sync_config.user,
+                        filtered_sync_room_map,
+                        list_config.filters,
+                        to_token,
                     )
 
                 # Sort the list
@@ -619,6 +625,7 @@ class SlidingSyncHandler:
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
+        all_rooms = set(relevant_room_map)
 
         # Filter out rooms that haven't received updates and we've sent down
         # previously.
@@ -689,7 +696,8 @@ class SlidingSyncHandler:
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
-            lists=lists,
+            actual_lists=lists,
+            actual_room_ids=all_rooms,
             from_token=from_token,
             to_token=to_token,
         )
@@ -697,6 +705,7 @@ class SlidingSyncHandler:
         if has_lists or has_room_subscriptions:
             connection_position = await self.connection_store.record_rooms(
                 sync_config=sync_config,
+                relevant_room_map=relevant_room_map,
                 from_token=from_token,
                 sent_room_ids=relevant_room_map.keys(),
                 # TODO: We need to calculate which rooms have had updates since the `from_token` but were not included in the `sent_room_ids`
@@ -1496,6 +1505,12 @@ class SlidingSyncHandler:
             else:
                 assert_never(room_status.status)
 
+            if (
+                room_status.timeline_limit is not None
+                and room_status.timeline_limit < room_sync_config.timeline_limit
+            ):
+                from_bound = None
+
             log_kv({"sliding_sync.room_status": room_status})
 
         log_kv({"sliding_sync.from_bound": from_bound, "sliding_sync.initial": initial})
@@ -1535,6 +1550,10 @@ class SlidingSyncHandler:
                     room_membership_for_user_at_to_token.event_pos.to_room_stream_token()
                 )
 
+            fiddled_timeline_limit = room_sync_config.timeline_limit
+            # if to_bound:
+            #     fiddled_timeline_limit = max(fiddled_timeline_limit, 10)
+
             timeline_events, new_room_key = await self.store.paginate_room_events(
                 room_id=room_id,
                 # The bounds are reversed so we can paginate backwards
@@ -1545,7 +1564,7 @@ class SlidingSyncHandler:
                 direction=Direction.BACKWARDS,
                 # We add one so we can determine if there are enough events to saturate
                 # the limit or not (see `limited`)
-                limit=room_sync_config.timeline_limit + 1,
+                limit=fiddled_timeline_limit + 1,
                 event_filter=None,
             )
 
@@ -1556,11 +1575,11 @@ class SlidingSyncHandler:
             # Determine our `limited` status based on the timeline. We do this before
             # filtering the events so we can accurately determine if there is more to
             # paginate even if we filter out some/all events.
-            if len(timeline_events) > room_sync_config.timeline_limit:
+            if len(timeline_events) > fiddled_timeline_limit:
                 limited = True
                 # Get rid of that extra "+ 1" event because we only used it to determine
                 # if we hit the limit or not
-                timeline_events = timeline_events[-room_sync_config.timeline_limit :]
+                timeline_events = timeline_events[-fiddled_timeline_limit:]
                 assert timeline_events[0].internal_metadata.stream_ordering
                 new_room_key = RoomStreamToken(
                     stream=timeline_events[0].internal_metadata.stream_ordering - 1
@@ -1852,24 +1871,37 @@ class SlidingSyncHandler:
                 )
 
         # Figure out the last bump event in the room
-        last_bump_event_result = (
-            await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+        bump_stamp = None
+        if timeline_events:
+            for e in reversed(timeline_events):
+                assert e.internal_metadata.stream_ordering is not None
+                if (
+                    e.type in DEFAULT_BUMP_EVENT_TYPES
+                    and e.internal_metadata.stream_ordering > 0
+                ):
+                    bump_stamp = e.internal_metadata.stream_ordering
+                    break
+
+        if bump_stamp is None:
+            # By default, just choose the membership event position
+            bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
+
+            last_bump_event_result = (
+                await self.store.get_last_event_pos_in_room_before_stream_ordering(
+                    room_id, to_token.room_key, event_types=DEFAULT_BUMP_EVENT_TYPES
+                )
             )
-        )
 
-        # By default, just choose the membership event position
-        bump_stamp = room_membership_for_user_at_to_token.event_pos.stream
-        # But if we found a bump event, use that instead
-        if last_bump_event_result is not None:
-            _, new_bump_event_pos = last_bump_event_result
+            # But if we found a bump event, use that instead
+            if last_bump_event_result is not None:
+                _, new_bump_event_pos = last_bump_event_result
 
-            # If we've just joined a remote room, then the last bump event may
-            # have been backfilled (and so have a negative stream ordering).
-            # These negative stream orderings can't sensibly be compared, so
-            # instead we use the membership event position.
-            if new_bump_event_pos.stream > 0:
-                bump_stamp = new_bump_event_pos.stream
+                # If we've just joined a remote room, then the last bump event may
+                # have been backfilled (and so have a negative stream ordering).
+                # These negative stream orderings can't sensibly be compared, so
+                # instead we use the membership event position.
+                if new_bump_event_pos.stream > 0:
+                    bump_stamp = new_bump_event_pos.stream
 
         return SlidingSyncResult.RoomResult(
             name=room_name,
@@ -1902,7 +1934,8 @@ class SlidingSyncHandler:
     async def get_extensions_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
@@ -1910,7 +1943,9 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
         """
@@ -1939,18 +1974,102 @@ class SlidingSyncHandler:
         if sync_config.extensions.account_data is not None:
             account_data_response = await self.get_account_data_extension_response(
                 sync_config=sync_config,
-                lists=lists,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
                 account_data_request=sync_config.extensions.account_data,
                 to_token=to_token,
                 from_token=from_token,
             )
 
+        receipts_response = None
+        if sync_config.extensions.receipts is not None:
+            receipts_response = await self.get_receipts_extension_response(
+                sync_config=sync_config,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
+                receipts_request=sync_config.extensions.receipts,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
         return SlidingSyncResult.Extensions(
             to_device=to_device_response,
             e2ee=e2ee_response,
             account_data=account_data_response,
+            receipts=receipts_response,
         )
 
+    def find_relevant_room_ids_for_extension(
+        self,
+        requested_lists: Optional[List[str]],
+        requested_room_ids: Optional[List[str]],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+    ) -> Set[str]:
+        """
+        Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only
+        return results for rooms in the Sliding Sync response. This matches up the
+        requested rooms/lists with the actual lists/rooms in the Sliding Sync response.
+
+        {"lists": []}                    // Do not process any lists.
+        {"lists": ["rooms", "dms"]}      // Process only a subset of lists.
+        {"lists": ["*"]}                 // Process all lists defined in the Sliding Window API. (This is the default.)
+
+        {"rooms": []}                    // Do not process any specific rooms.
+        {"rooms": ["!a:b", "!c:d"]}      // Process only a subset of room subscriptions.
+        {"rooms": ["*"]}                 // Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+
+        Args:
+            requested_lists: The `lists` from the extension request.
+            requested_room_ids: The `rooms` from the extension request.
+            actual_lists: The actual lists from the Sliding Sync response.
+            actual_room_ids: The actual room subscriptions from the Sliding Sync request.
+        """
+
+        # We only want to include account data for rooms that are already in the sliding
+        # sync response AND that were requested in the account data request.
+        relevant_room_ids: Set[str] = set()
+
+        # See what rooms from the room subscriptions we should get account data for
+        if requested_room_ids is not None:
+            for room_id in requested_room_ids:
+                # A wildcard means we process all rooms from the room subscriptions
+                if room_id == "*":
+                    relevant_room_ids.update(actual_room_ids)
+                    break
+
+                if room_id in actual_room_ids:
+                    relevant_room_ids.add(room_id)
+
+        # See what rooms from the sliding window lists we should get account data for
+        if requested_lists is not None:
+            for list_key in requested_lists:
+                # Just some typing because we share the variable name in multiple places
+                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+                # A wildcard means we process rooms from all lists
+                if list_key == "*":
+                    for actual_list in actual_lists.values():
+                        # We only expect a single SYNC operation for any list
+                        assert len(actual_list.ops) == 1
+                        sync_op = actual_list.ops[0]
+                        assert sync_op.op == OperationType.SYNC
+
+                        relevant_room_ids.update(sync_op.room_ids)
+
+                    break
+
+                actual_list = actual_lists.get(list_key)
+                if actual_list is not None:
+                    # We only expect a single SYNC operation for any list
+                    assert len(actual_list.ops) == 1
+                    sync_op = actual_list.ops[0]
+                    assert sync_op.op == OperationType.SYNC
+
+                    relevant_room_ids.update(sync_op.room_ids)
+
+        return relevant_room_ids
+
     @trace
     async def get_to_device_extension_response(
         self,
@@ -2006,12 +2125,13 @@ class SlidingSyncHandler:
                 up_to_stream_id=since_stream_id,
             )
 
-            logger.debug(
-                "Deleted %d to-device messages up to %d for %s",
-                deleted,
-                since_stream_id,
-                user_id,
-            )
+            if deleted:
+                logger.debug(
+                    "Deleted %d to-device messages up to %d for %s",
+                    deleted,
+                    since_stream_id,
+                    user_id,
+                )
 
         messages, stream_id = await self.store.get_messages_for_device(
             user_id=user_id,
@@ -2081,7 +2201,8 @@ class SlidingSyncHandler:
     async def get_account_data_extension_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
         account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
@@ -2090,7 +2211,9 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
             account_data_request: The account_data extension from the request
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
@@ -2103,6 +2226,7 @@ class SlidingSyncHandler:
 
         global_account_data_map: Mapping[str, JsonMapping] = {}
         if from_token is not None:
+            # TODO: This should take into account the `from_token` and `to_token`
             global_account_data_map = (
                 await self.store.get_updated_global_account_data_for_user(
                     user_id, from_token.stream_token.account_data_key
@@ -2114,76 +2238,40 @@ class SlidingSyncHandler:
             )
             if have_push_rules_changed:
                 global_account_data_map = dict(global_account_data_map)
+                # TODO: This should take into account the `from_token` and `to_token`
                 global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                     await self.push_rules_handler.push_rules_for_user(sync_config.user)
                 )
         else:
+            # TODO: This should take into account the `to_token`
             all_global_account_data = await self.store.get_global_account_data_for_user(
                 user_id
             )
 
             global_account_data_map = dict(all_global_account_data)
+            # TODO: This should take into account the  `to_token`
             global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                 await self.push_rules_handler.push_rules_for_user(sync_config.user)
             )
 
-        # We only want to include account data for rooms that are already in the sliding
-        # sync response AND that were requested in the account data request.
-        relevant_room_ids: Set[str] = set()
-
-        # See what rooms from the room subscriptions we should get account data for
-        if (
-            account_data_request.rooms is not None
-            and sync_config.room_subscriptions is not None
-        ):
-            actual_room_ids = sync_config.room_subscriptions.keys()
-
-            for room_id in account_data_request.rooms:
-                # A wildcard means we process all rooms from the room subscriptions
-                if room_id == "*":
-                    relevant_room_ids.update(sync_config.room_subscriptions.keys())
-                    break
-
-                if room_id in actual_room_ids:
-                    relevant_room_ids.add(room_id)
-
-        # See what rooms from the sliding window lists we should get account data for
-        if account_data_request.lists is not None:
-            for list_key in account_data_request.lists:
-                # Just some typing because we share the variable name in multiple places
-                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
-
-                # A wildcard means we process rooms from all lists
-                if list_key == "*":
-                    for actual_list in lists.values():
-                        # We only expect a single SYNC operation for any list
-                        assert len(actual_list.ops) == 1
-                        sync_op = actual_list.ops[0]
-                        assert sync_op.op == OperationType.SYNC
-
-                        relevant_room_ids.update(sync_op.room_ids)
-
-                    break
-
-                actual_list = lists.get(list_key)
-                if actual_list is not None:
-                    # We only expect a single SYNC operation for any list
-                    assert len(actual_list.ops) == 1
-                    sync_op = actual_list.ops[0]
-                    assert sync_op.op == OperationType.SYNC
-
-                    relevant_room_ids.update(sync_op.room_ids)
-
         # Fetch room account data
         account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=account_data_request.lists,
+            requested_room_ids=account_data_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
         if len(relevant_room_ids) > 0:
             if from_token is not None:
+                # TODO: This should take into account the `from_token` and `to_token`
                 account_data_by_room_map = (
                     await self.store.get_updated_room_account_data_for_user(
                         user_id, from_token.stream_token.account_data_key
                     )
                 )
             else:
+                # TODO: This should take into account the `to_token`
                 account_data_by_room_map = (
                     await self.store.get_room_account_data_for_user(user_id)
                 )
@@ -2200,6 +2288,66 @@ class SlidingSyncHandler:
             account_data_by_room_map=account_data_by_room_map,
         )
 
+    @trace
+    async def get_receipts_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+        receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
+        to_token: StreamToken,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
+        """Handle Receipts extension (MSC3960)
+
+        Args:
+            sync_config: Sync configuration
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            account_data_request: The account_data extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        # Skip if the extension is not enabled
+        if not receipts_request.enabled:
+            return None
+
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=receipts_request.lists,
+            requested_room_ids=receipts_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
+
+        room_id_to_receipt_map: Dict[str, JsonMapping] = {}
+        if len(relevant_room_ids) > 0:
+            receipt_source = self.event_sources.sources.receipt
+            receipts, _ = await receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=(
+                    from_token.stream_token.receipt_key
+                    if from_token
+                    else MultiWriterStreamToken(stream=0)
+                ),
+                to_key=to_token.receipt_key,
+                # This is a dummy value and isn't used in the function
+                limit=0,
+                room_ids=relevant_room_ids,
+                is_guest=False,
+            )
+
+            for receipt in receipts:
+                # These fields should exist for every receipt
+                room_id = receipt["room_id"]
+                type = receipt["type"]
+                content = receipt["content"]
+                room_id_to_receipt_map[room_id] = {"type": type, "content": content}
+
+        return SlidingSyncResult.Extensions.ReceiptsExtension(
+            room_id_to_receipt_map=room_id_to_receipt_map,
+        )
+
 
 class HaveSentRoomFlag(Enum):
     """Flag for whether we have sent the room down a sliding sync connection.
@@ -2236,15 +2384,16 @@ class HaveSentRoom:
 
     status: HaveSentRoomFlag
     last_token: Optional[RoomStreamToken]
+    timeline_limit: Optional[int]
 
     @staticmethod
-    def previously(last_token: RoomStreamToken) -> "HaveSentRoom":
+    def previously(last_token: RoomStreamToken, timeline_limit: int) -> "HaveSentRoom":
         """Constructor for `PREVIOUSLY` flag."""
-        return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token)
+        return HaveSentRoom(HaveSentRoomFlag.PREVIOUSLY, last_token, timeline_limit)
 
 
-HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None)
-HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None)
+HAVE_SENT_ROOM_NEVER = HaveSentRoom(HaveSentRoomFlag.NEVER, None, None)
+HAVE_SENT_ROOM_LIVE = HaveSentRoom(HaveSentRoomFlag.LIVE, None, None)
 
 
 @attr.s(auto_attribs=True)
@@ -2299,6 +2448,7 @@ class SlidingSyncConnectionStore:
     async def record_rooms(
         self,
         sync_config: SlidingSyncConfig,
+        relevant_room_map: Dict[str, RoomSyncConfig],
         from_token: Optional[SlidingSyncStreamToken],
         *,
         sent_room_ids: StrCollection,
@@ -2353,17 +2503,20 @@ class SlidingSyncConnectionStore:
         #     sent anything down this time either so we leave it as NEVER.
 
         # Work out the new state for unsent rooms that were `LIVE`.
-        if from_token:
-            new_unsent_state = HaveSentRoom.previously(from_token.stream_token.room_key)
-        else:
-            new_unsent_state = HAVE_SENT_ROOM_NEVER
 
         for room_id in unsent_room_ids:
             prev_state = new_room_statuses.get(room_id)
             if prev_state is not None and prev_state.status == HaveSentRoomFlag.LIVE:
-                new_room_statuses[room_id] = new_unsent_state
                 have_updated = True
 
+                if from_token:
+                    new_room_statuses[room_id] = HaveSentRoom.previously(
+                        from_token.stream_token.room_key,
+                        relevant_room_map[room_id].timeline_limit,
+                    )
+                else:
+                    new_room_statuses[room_id] = HAVE_SENT_ROOM_NEVER
+
         if not have_updated:
             return prev_connection_token
 
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index ccfce6bd53..9cd39a3df9 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -941,6 +941,7 @@ class SlidingSyncRestServlet(RestServlet):
             logger.info("Client has disconnected; not serializing response.")
             return 200, {}
 
+        # logger.info("Sliding sync response: %r", sliding_sync_results)
         response_content = await self.encode_response(requester, sliding_sync_results)
 
         return 200, response_content
@@ -1001,7 +1002,7 @@ class SlidingSyncRestServlet(RestServlet):
         serialized_rooms: Dict[str, JsonDict] = {}
         for room_id, room_result in rooms.items():
             serialized_rooms[room_id] = {
-                "bump_stamp": room_result.bump_stamp,
+                "bump_stamp": abs(room_result.bump_stamp),
                 "joined_count": room_result.joined_count,
                 "invited_count": room_result.invited_count,
                 "notification_count": room_result.notification_count,
@@ -1150,6 +1151,12 @@ class SlidingSyncRestServlet(RestServlet):
                 },
             }
 
+        if extensions.receipts is not None:
+            serialized_extensions["receipts"] = {
+                # Same as the the top-level `account_data.events` field in Sync v2.
+                "rooms": extensions.receipts.room_id_to_receipt_map,
+            }
+
         return serialized_extensions
 
 
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 26b8e1a172..8c2c0c5ab0 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -309,6 +309,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
 
         if not backfilled:
             self._events_stream_cache.entity_has_changed(room_id, stream_ordering)  # type: ignore[attr-defined]
+            self._attempt_to_invalidate_cache(
+                "get_max_stream_ordering_in_room", (room_id,)
+            )
 
         if redacts:
             self._invalidate_local_get_event_cache(redacts)  # type: ignore[attr-defined]
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index 1f7acdb859..0c7c2f9306 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -551,7 +551,7 @@ class PersistEventsStore:
         # From this point onwards the events are only events that we haven't
         # seen before.
 
-        self._store_event_txn(txn, events_and_contexts=events_and_contexts)
+        self._store_event_txn(txn, room_id, events_and_contexts=events_and_contexts)
 
         if new_forward_extremities:
             self._update_forward_extremities_txn(
@@ -1555,6 +1555,7 @@ class PersistEventsStore:
     def _store_event_txn(
         self,
         txn: LoggingTransaction,
+        room_id: str,
         events_and_contexts: Collection[Tuple[EventBase, EventContext]],
     ) -> None:
         """Insert new events into the event, event_json, redaction and
@@ -1629,6 +1630,27 @@ class PersistEventsStore:
             ],
         )
 
+        # Update the `sliding_sync_room_metadata` with the latest
+        # (non-backfilled, ie positive) stream ordering.
+        #
+        # We know this list is sorted and non-empty, so we just take the last
+        # one event.
+        max_stream_ordering: int
+        for e, _ in events_and_contexts:
+            assert e.internal_metadata.stream_ordering is not None
+            max_stream_ordering = e.internal_metadata.stream_ordering
+
+        if max_stream_ordering > 0:
+            self.db_pool.simple_upsert_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                keyvalues={"room_id": room_id},
+                values={
+                    "instance_name": self._instance_name,
+                    "last_stream_ordering": max_stream_ordering,
+                },
+            )
+
         # If we're persisting an unredacted event we go and ensure
         # that we mark any redactions that reference this event as
         # requiring censoring.
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index 4207e73c7f..430c837828 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -50,6 +50,7 @@ from typing import (
     Dict,
     Iterable,
     List,
+    Mapping,
     Optional,
     Set,
     Tuple,
@@ -78,8 +79,13 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
-from synapse.util.caches.descriptors import cached
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StrCollection,
+)
+from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
 from synapse.util.iterutils import batch_iter
@@ -611,6 +617,10 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
         self._stream_order_on_start = self.get_room_max_stream_ordering()
         self._min_stream_order_on_start = self.get_room_min_stream_ordering()
 
+        database.updates.register_background_update_handler(
+            "sliding_sync_room_metadata", self._sliding_sync_room_metadata_bg_update
+        )
+
     def get_room_max_stream_ordering(self) -> int:
         """Get the stream_ordering of regular events that we have committed up to
 
@@ -1186,6 +1196,52 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
+    @cachedList(
+        cached_method_name="get_max_stream_ordering_in_room",
+        list_name="room_ids",
+    )
+    async def get_max_stream_ordering_in_rooms(
+        self, room_ids: StrCollection
+    ) -> Mapping[str, Optional[PersistedEventPosition]]:
+        """Get the positions for the latest event in a room.
+
+        A batched version of `get_max_stream_ordering_in_room`.
+        """
+        rows = await self.db_pool.simple_select_many_batch(
+            table="sliding_sync_room_metadata",
+            column="room_id",
+            iterable=room_ids,
+            retcols=("room_id", "instance_name", "last_stream_ordering"),
+            desc="get_max_stream_ordering_in_rooms",
+        )
+
+        return {
+            room_id: PersistedEventPosition(instance_name, stream)
+            for room_id, instance_name, stream in rows
+        }
+
+    @cached(max_entries=10000)
+    async def get_max_stream_ordering_in_room(
+        self,
+        room_id: str,
+    ) -> Optional[PersistedEventPosition]:
+        """Get the position for the latest event in a room.
+
+        Note: this may be after the current token for the room stream on this
+        process (e.g. due to replication lag)
+        """
+        row = await self.db_pool.simple_select_one(
+            table="sliding_sync_room_metadata",
+            retcols=("instance_name", "last_stream_ordering"),
+            keyvalues={"room_id": room_id},
+            allow_none=True,
+            desc="get_max_stream_ordering_in_room",
+        )
+        if not row:
+            return None
+
+        return PersistedEventPosition(instance_name=row[0], stream=row[1])
+
     async def get_last_event_pos_in_room_before_stream_ordering(
         self,
         room_id: str,
@@ -2105,6 +2161,91 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return None
 
+    async def _sliding_sync_room_metadata_bg_update(
+        self, progress: JsonDict, batch_size: int
+    ) -> int:
+        """Background update to fill out 'sliding_sync_room_metadata' table"""
+        previous_room = progress.get("previous_room", "")
+
+        def _sliding_sync_room_metadata_bg_update_txn(txn: LoggingTransaction) -> int:
+            # Both these queries are just getting the most recent
+            # instance_name/stream ordering for the next N rooms.
+            if isinstance(self.database_engine, PostgresEngine):
+                sql = """
+                    SELECT room_id, instance_name, stream_ordering FROM rooms AS r,
+                    LATERAL (
+                        SELECT instance_name, stream_ordering
+                        FROM events WHERE events.room_id = r.room_id
+                        ORDER BY stream_ordering DESC
+                        LIMIT 1
+                    ) e
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+            else:
+                sql = """
+                    SELECT
+                        room_id,
+                        (
+                            SELECT instance_name
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        ),
+                        (
+                            SELECT stream_ordering
+                            FROM events WHERE events.room_id = r.room_id
+                            ORDER BY stream_ordering DESC
+                            LIMIT 1
+                        )
+                    FROM rooms AS r
+                    WHERE r.room_id > ?
+                    ORDER BY r.room_id ASC
+                    LIMIT ?
+                """
+
+            txn.execute(sql, (previous_room, batch_size))
+            rows = txn.fetchall()
+            if not rows:
+                return 0
+
+            self.db_pool.simple_upsert_many_txn(
+                txn,
+                table="sliding_sync_room_metadata",
+                key_names=("room_id",),
+                key_values=[(room_id,) for room_id, _, _ in rows],
+                value_names=(
+                    "instance_name",
+                    "last_stream_ordering",
+                ),
+                value_values=[
+                    (
+                        instance_name or "master",
+                        stream,
+                    )
+                    for _, instance_name, stream in rows
+                ],
+            )
+
+            self.db_pool.updates._background_update_progress_txn(
+                txn, "sliding_sync_room_metadata", {"previous_room": rows[-1][0]}
+            )
+
+            return len(rows)
+
+        rows = await self.db_pool.runInteraction(
+            "_sliding_sync_room_metadata_bg_update",
+            _sliding_sync_room_metadata_bg_update_txn,
+        )
+
+        if rows == 0:
+            await self.db_pool.updates._end_background_update(
+                "sliding_sync_room_metadata"
+            )
+
+        return rows
+
     def get_rooms_that_might_have_updates(
         self, room_ids: StrCollection, from_token: RoomStreamToken
     ) -> StrCollection:
diff --git a/synapse/storage/schema/main/delta/85/07_sliding_sync.sql b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
new file mode 100644
index 0000000000..e8bc33ff40
--- /dev/null
+++ b/synapse/storage/schema/main/delta/85/07_sliding_sync.sql
@@ -0,0 +1,24 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+-- A table that maps from room ID to metadata useful for sliding sync.
+CREATE TABLE sliding_sync_room_metadata (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- The instance_name / stream ordering of the last event in the room.
+    instance_name TEXT NOT NULL,
+    last_stream_ordering BIGINT NOT NULL
+);
+
+INSERT INTO background_updates (ordering, update_name, progress_json) VALUES
+    (8507, 'sliding_sync_room_metadata', '{}');
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index f26cc0e903..12bdb94d3a 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -361,12 +361,28 @@ class SlidingSyncResult:
                     self.global_account_data_map or self.account_data_by_room_map
                 )
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class ReceiptsExtension:
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
+            """
+
+            room_id_to_receipt_map: Mapping[str, JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.room_id_to_receipt_map)
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
         def __bool__(self) -> bool:
-            return bool(self.to_device or self.e2ee or self.account_data)
+            return bool(
+                self.to_device or self.e2ee or self.account_data or self.receipts
+            )
 
     next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index dfe3b1e0f7..4e632e4492 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -342,9 +342,27 @@ class SlidingSyncBody(RequestBodyModel):
             # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
             rooms: Optional[List[StrictStr]] = ["*"]
 
+        class ReceiptsExtension(RequestBodyModel):
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                enabled
+                lists: List of list keys (from the Sliding Window API) to apply this
+                    extension to.
+                rooms: List of room IDs (from the Room Subscription API) to apply this
+                    extension to.
+            """
+
+            enabled: Optional[StrictBool] = False
+            # Process all lists defined in the Sliding Window API. (This is the default.)
+            lists: Optional[List[StrictStr]] = ["*"]
+            # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+            rooms: Optional[List[StrictStr]] = ["*"]
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
     conn_id: Optional[str]
 
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 5abf1041be..a97660e2f2 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -30,6 +30,7 @@ from twisted.test.proto_helpers import MemoryReactor
 import synapse.rest.admin
 from synapse.api.constants import (
     AccountDataTypes,
+    EduTypes,
     EventContentFields,
     EventTypes,
     HistoryVisibility,
@@ -1369,12 +1370,14 @@ class SlidingSyncTestCase(SlidingSyncBase):
         room.register_servlets,
         sync.register_servlets,
         devices.register_servlets,
+        receipts.register_servlets,
     ]
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main
         self.event_sources = hs.get_event_sources()
         self.storage_controllers = hs.get_storage_controllers()
+        self.account_data_handler = hs.get_account_data_handler()
 
     def _assertRequiredStateIncludes(
         self,
@@ -4915,6 +4918,225 @@ class SlidingSyncTestCase(SlidingSyncBase):
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
         self.assertEqual(response_body["rooms"][room_id1]["initial"], True)
 
+    # Any extensions that use `lists`/`rooms` should be tested here
+    @parameterized.expand([("account_data",), ("receipts",)])
+    def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None:
+        """
+        With various extensions, test out requesting different variations of
+        `lists`/`rooms`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create some rooms
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        room_id_to_human_name_map = {
+            room_id1: "room1",
+            room_id2: "room2",
+            room_id3: "room3",
+            room_id4: "room4",
+            room_id5: "room5",
+        }
+
+        for room_id in room_id_to_human_name_map.keys():
+            if extension_name == "account_data":
+                # Add some account data to each room
+                self.get_success(
+                    self.account_data_handler.add_account_data_to_room(
+                        user_id=user1_id,
+                        room_id=room_id,
+                        account_data_type="org.matrix.roorarraz",
+                        content={"roo": "rar"},
+                    )
+                )
+            elif extension_name == "receipts":
+                event_response = self.helper.send(
+                    room_id, body="new event", tok=user1_tok
+                )
+                # Read last event
+                channel = self.make_request(
+                    "POST",
+                    f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}",
+                    {},
+                    access_token=user1_tok,
+                )
+                self.assertEqual(channel.code, 200, channel.json_body)
+            else:
+                raise AssertionError(f"Unknown extension name: {extension_name}")
+
+        main_sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+        }
+
+        # Mix lists and rooms
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["foo-list", "non-existent-list"],
+                    "rooms": [room_id1, room_id2, "!non-existent-room"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Requested via `rooms` and a room subscription exists
+        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
+        # room3: ❌ Not requested
+        # room4: ✅ Shows up because requested via `lists` and list exists in the response
+        # room5: ✅ Shows up because requested via `lists` and list exists in the response
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try wildcards (this is the default)
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    # "lists": ["*"],
+                    # "rooms": ["*"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Empty list will return nothing
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ❌ Not requested
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            set(),
+            exact=True,
+        )
+
+        # Try wildcard and none
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["*"],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try requesting a room that is only in a list
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [room_id5],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ✅ Requested via `rooms` and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room5"},
+            exact=True,
+        )
+
 
 class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
     """Tests for the to-device sliding sync extension"""
@@ -5928,188 +6150,266 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
             exact=True,
         )
 
-    def test_room_account_data_relevant_rooms(self) -> None:
+    def test_wait_for_new_data(self) -> None:
         """
-        Test out different variations of `lists`/`rooms` we are requesting account data for.
+        Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+        (Only applies to incremental syncs with a `timeout` specified)
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
-        # Create a room and add some room account data
-        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id1,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
-        )
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
 
-        # Create another room with some room account data
-        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id2,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
-        )
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # Create another room with some room account data
-        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id3,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
+            await_result=False,
         )
-
-        # Create another room with some room account data
-        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Bump the global account data to trigger new results
         self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id4,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
+            self.account_data_handler.add_account_data_for_user(
+                user1_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
             )
         )
+        # Should respond before the 10 second timeout
+        channel.await_result(timeout_ms=3000)
+        self.assertEqual(channel.code, 200, channel.json_body)
 
-        # Create another room with some room account data
-        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id5,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
+        # We should see the global account data update
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in channel.json_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            {"org.matrix.foobarbaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            channel.json_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
         )
 
-        room_id_to_human_name_map = {
-            room_id1: "room1",
-            room_id2: "room2",
-            room_id3: "room3",
-            room_id4: "room4",
-            room_id5: "room5",
-        }
+    def test_wait_for_new_data_timeout(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive but
+        no data ever arrives so we timeout. We're also making sure that the default data
+        from the account_data extension doesn't trigger a false-positive for new data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
 
-        # Mix lists and rooms
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
-            "room_subscriptions": {
-                room_id1: {
-                    "required_state": [],
-                    "timeline_limit": 0,
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
                 }
             },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Wake-up `notifier.wait_for_events(...)` that will cause us test
+        # `SlidingSyncResult.__bool__` for new results.
+        self._bump_notifier_wait_for_events(
+            user1_id,
+            # We choose `StreamKeyType.PRESENCE` because we're testing for account data
+            # and don't want to contaminate the account data results using
+            # `StreamKeyType.ACCOUNT_DATA`.
+            wake_stream_key=StreamKeyType.PRESENCE,
+        )
+        # Block for a little bit more to ensure we don't see any new results.
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("global")
+        )
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("rooms")
+        )
+
+
+class SlidingSyncReceiptsExtensionTestCase(SlidingSyncBase):
+    """Tests for the receipts sliding sync extension"""
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        sync.register_servlets,
+        receipts.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+
+    def test_no_data_initial_sync(self) -> None:
+        """
+        Test that enabling the receipts extension works during an intitial sync,
+        even if there is no-data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Make an initial Sliding Sync request with the receipts extension enabled
+        sync_body = {
+            "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": ["foo-list", "non-existent-list"],
-                    "rooms": [room_id1, room_id2, "!non-existent-room"],
                 }
             },
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ✅ Requested via `rooms` and a room subscription exists
-        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
-        # room3: ❌ Not requested
-        # room4: ✅ Shows up because requested via `lists` and list exists in the response
-        # room5: ✅ Shows up because requested via `lists` and list exists in the response
         self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
-            },
-            {"room1", "room4", "room5"},
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            set(),
             exact=True,
         )
 
-        # Try wildcards (this is the default)
+    def test_no_data_incremental_sync(self) -> None:
+        """
+        Test that enabling receipts extension works during an incremental sync, even
+        if there is no-data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
-            "room_subscriptions": {
-                room_id1: {
-                    "required_state": [],
-                    "timeline_limit": 0,
-                }
-            },
+            "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    # "lists": ["*"],
-                    # "rooms": ["*"],
                 }
             },
         }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make an incremental Sliding Sync request with the receipts extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
 
-        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
         self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
-            },
-            {"room1", "room3", "room4", "room5"},
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            set(),
             exact=True,
         )
 
-        # Empty list will return nothing
+    def test_receipts_initial_sync(self) -> None:
+        """
+        On initial sync, we return all receipts for a given room but only for
+        rooms that we request and are being returned in the Sliding Sync response.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+
+        # Create a room
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user3_id, tok=user3_tok)
+        event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok)
+        # User1 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User2 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User3 privately reads the last event (make sure this doesn't leak to the other users)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response1['event_id']}",
+            {},
+            access_token=user3_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Create another room
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        self.helper.join(room_id2, user3_id, tok=user3_tok)
+        event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok)
+        # User1 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User2 reads the last event
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User3 privately reads the last event (make sure this doesn't leak to the other users)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}",
+            {},
+            access_token=user3_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make an initial Sliding Sync request with the receipts extension enabled
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
+            "lists": {},
             "room_subscriptions": {
                 room_id1: {
                     "required_state": [],
@@ -6117,76 +6417,223 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
                 }
             },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": [],
-                    "rooms": [],
+                    "rooms": [room_id1, room_id2],
                 }
             },
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ❌ Not requested
-        # room4: ❌ Not requested
-        # room5: ❌ Not requested
+        # Even though we requested room2, we only expect room1 to show up because that's
+        # the only room in the Sliding Sync response (room2 is not one of our room
+        # subscriptions or in a sliding window list).
         self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
-            },
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We can see user1 and user2 read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                event_response1["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user1_id, user2_id},
+            exact=True,
+        )
+        # User1 did not have a private read receipt and we shouldn't leak others'
+        # private read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                event_response1["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
             set(),
             exact=True,
         )
 
-        # Try wildcard and none
+    def test_receipts_incremental_sync(self) -> None:
+        """
+        On incremental sync, we return all receipts for a given room but only for
+        rooms that we request and are being returned in the Sliding Sync response.
+        """
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+
+        # Create room1
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user3_id, tok=user3_tok)
+        event_response1 = self.helper.send(room_id1, body="new event", tok=user2_tok)
+        # User2 reads the last event (before the `from_token`)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            {},
+            access_token=user2_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Create room2
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        event_response2 = self.helper.send(room_id2, body="new event", tok=user2_tok)
+        # User1 reads the last event (before the `from_token`)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ}/{event_response2['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Create room3
+        room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id3, user1_id, tok=user1_tok)
+        self.helper.join(room_id3, user3_id, tok=user3_tok)
+        event_response3 = self.helper.send(room_id3, body="new event", tok=user2_tok)
+
+        # Create room4
+        room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id4, user1_id, tok=user1_tok)
+        self.helper.join(room_id4, user3_id, tok=user3_tok)
+        event_response4 = self.helper.send(room_id4, body="new event", tok=user2_tok)
+        # User1 reads the last event (before the `from_token`)
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id4}/receipt/{ReceiptTypes.READ}/{event_response4['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
+            "lists": {},
+            "room_subscriptions": {
+                room_id1: {
                     "required_state": [],
                     "timeline_limit": 0,
                 },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
+                room_id3: {
                     "required_state": [],
                     "timeline_limit": 0,
                 },
-            },
-            "room_subscriptions": {
-                room_id1: {
+                room_id4: {
                     "required_state": [],
                     "timeline_limit": 0,
-                }
+                },
             },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": ["*"],
-                    "rooms": [],
+                    "rooms": [room_id1, room_id2, room_id3, room_id4],
                 }
             },
         }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        # Add some more read receipts after the `from_token`
+        #
+        # User1 reads room1
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id1}/receipt/{ReceiptTypes.READ}/{event_response1['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User1 privately reads room2
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id2}/receipt/{ReceiptTypes.READ_PRIVATE}/{event_response2['event_id']}",
+            {},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # User3 reads room3
+        channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id3}/receipt/{ReceiptTypes.READ}/{event_response3['event_id']}",
+            {},
+            access_token=user3_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        # No activity for room4 after the `from_token`
+
+        # Make an incremental Sliding Sync request with the receipts extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # Even though we requested room2, we only expect rooms to show up if they are
+        # already in the Sliding Sync response. room4 doesn't show up because there is
+        # no activity after the `from_token`.
         self.assertIncludes(
-            {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
-            },
-            {"room3", "room4", "room5"},
+            response_body["extensions"]["receipts"].get("rooms").keys(),
+            {room_id1, room_id3},
+            exact=True,
+        )
+
+        # Check room1:
+        #
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We only see that user1 has read something in room1 since the `from_token`
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                event_response1["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user1_id},
+            exact=True,
+        )
+        # User1 did not send a private read receipt in this room and we shouldn't leak
+        # others' private read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id1]["content"][
+                event_response1["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
+            set(),
+            exact=True,
+        )
+
+        # Check room3:
+        #
+        # Sanity check that it's the correct ephemeral event type
+        self.assertEqual(
+            response_body["extensions"]["receipts"]["rooms"][room_id3]["type"],
+            EduTypes.RECEIPT,
+        )
+        # We only see that user3 has read something in room1 since the `from_token`
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][
+                event_response3["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user3_id},
+            exact=True,
+        )
+        # User1 did not send a private read receipt in this room and we shouldn't leak
+        # others' private read receipts
+        self.assertIncludes(
+            response_body["extensions"]["receipts"]["rooms"][room_id3]["content"][
+                event_response3["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
+            set(),
             exact=True,
         )
 
@@ -6203,18 +6650,26 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
 
         room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id, user1_id, tok=user1_tok)
+        event_response = self.helper.send(room_id, body="new event", tok=user2_tok)
 
         sync_body = {
             "lists": {},
+            "room_subscriptions": {
+                room_id: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
+                    "rooms": [room_id],
                 }
             },
         }
         _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # Make an incremental Sliding Sync request with the account_data extension enabled
+        # Make an incremental Sliding Sync request with the receipts extension enabled
         channel = self.make_request(
             "POST",
             self.sync_endpoint + f"?timeout=10000&pos={from_token}",
@@ -6225,31 +6680,43 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
         with self.assertRaises(TimedOutException):
             channel.await_result(timeout_ms=5000)
-        # Bump the global account data to trigger new results
-        self.get_success(
-            self.account_data_handler.add_account_data_for_user(
-                user1_id,
-                "org.matrix.foobarbaz",
-                {"foo": "bar"},
-            )
+        # Bump the receipts to trigger new results
+        receipt_channel = self.make_request(
+            "POST",
+            f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response['event_id']}",
+            {},
+            access_token=user2_tok,
         )
+        self.assertEqual(receipt_channel.code, 200, receipt_channel.json_body)
         # Should respond before the 10 second timeout
         channel.await_result(timeout_ms=3000)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        # We should see the global account data update
+        # We should see the new receipt
         self.assertIncludes(
-            {
-                global_event["type"]
-                for global_event in channel.json_body["extensions"]["account_data"].get(
-                    "global"
-                )
-            },
-            {"org.matrix.foobarbaz"},
+            channel.json_body.get("extensions", {})
+            .get("receipts", {})
+            .get("rooms", {})
+            .keys(),
+            {room_id},
             exact=True,
+            message=str(channel.json_body),
         )
         self.assertIncludes(
-            channel.json_body["extensions"]["account_data"].get("rooms").keys(),
+            channel.json_body["extensions"]["receipts"]["rooms"][room_id]["content"][
+                event_response["event_id"]
+            ][ReceiptTypes.READ].keys(),
+            {user2_id},
+            exact=True,
+        )
+        # User1 did not send a private read receipt in this room and we shouldn't leak
+        # others' private read receipts
+        self.assertIncludes(
+            channel.json_body["extensions"]["receipts"]["rooms"][room_id]["content"][
+                event_response["event_id"]
+            ]
+            .get(ReceiptTypes.READ_PRIVATE, {})
+            .keys(),
             set(),
             exact=True,
         )
@@ -6258,7 +6725,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         """
         Test to make sure that the Sliding Sync request waits for new data to arrive but
         no data ever arrives so we timeout. We're also making sure that the default data
-        from the account_data extension doesn't trigger a false-positive for new data.
+        from the receipts extension doesn't trigger a false-positive for new data.
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -6266,7 +6733,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         sync_body = {
             "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
                 }
             },
@@ -6287,11 +6754,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         # Wake-up `notifier.wait_for_events(...)` that will cause us test
         # `SlidingSyncResult.__bool__` for new results.
         self._bump_notifier_wait_for_events(
-            user1_id,
-            # We choose `StreamKeyType.PRESENCE` because we're testing for account data
-            # and don't want to contaminate the account data results using
-            # `StreamKeyType.ACCOUNT_DATA`.
-            wake_stream_key=StreamKeyType.PRESENCE,
+            user1_id, wake_stream_key=StreamKeyType.ACCOUNT_DATA
         )
         # Block for a little bit more to ensure we don't see any new results.
         with self.assertRaises(TimedOutException):
@@ -6301,9 +6764,8 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         channel.await_result(timeout_ms=1200)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        self.assertIsNotNone(
-            channel.json_body["extensions"]["account_data"].get("global")
-        )
-        self.assertIsNotNone(
-            channel.json_body["extensions"]["account_data"].get("rooms")
+        self.assertIncludes(
+            channel.json_body["extensions"]["receipts"].get("rooms").keys(),
+            set(),
+            exact=True,
         )
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index c4e216c308..037bbca1ba 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -440,6 +440,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
             assert persist_events_store is not None
             persist_events_store._store_event_txn(
                 txn,
+                events[0].room_id,
                 [
                     (e, EventContext(self.hs.get_storage_controllers(), {}))
                     for e in events