summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--synapse/handlers/account_data.py1
-rw-r--r--synapse/handlers/receipts.py4
-rw-r--r--synapse/handlers/sliding_sync.py222
-rw-r--r--synapse/rest/client/sync.py6
-rw-r--r--synapse/types/handlers/__init__.py18
-rw-r--r--synapse/types/rest/client/__init__.py18
-rw-r--r--tests/rest/client/test_sync.py835
7 files changed, 870 insertions, 234 deletions
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 97a463d8d0..8041326cd5 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -327,6 +327,7 @@ class AccountDataEventSource(EventSource[int, JsonDict]):
         explicit_room_id: Optional[str] = None,
     ) -> Tuple[List[JsonDict], int]:
         user_id = user.to_string()
+        # TODO: Take `to_key` into account
         last_stream_id = from_key
 
         current_stream_id = self.store.get_max_account_data_stream_id()
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index 8674a8fcdd..d04c76be2a 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -286,8 +286,10 @@ class ReceiptEventSource(EventSource[MultiWriterStreamToken, JsonMapping]):
         room_ids: Iterable[str],
         is_guest: bool,
         explicit_room_id: Optional[str] = None,
+        to_key: Optional[MultiWriterStreamToken] = None,
     ) -> Tuple[List[JsonMapping], MultiWriterStreamToken]:
-        to_key = self.get_current_key()
+        if to_key is None:
+            to_key = self.get_current_key()
 
         if from_key == to_key:
             return [], to_key
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 3231574402..0c7299137d 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -47,6 +47,7 @@ from synapse.types import (
     DeviceListUpdates,
     JsonDict,
     JsonMapping,
+    MultiWriterStreamToken,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
@@ -630,7 +631,8 @@ class SlidingSyncHandler:
 
         extensions = await self.get_extensions_response(
             sync_config=sync_config,
-            lists=lists,
+            actual_lists=lists,
+            actual_room_ids=set(rooms.keys()),
             from_token=from_token,
             to_token=to_token,
         )
@@ -1800,7 +1802,8 @@ class SlidingSyncHandler:
     async def get_extensions_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
@@ -1808,7 +1811,9 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
         """
@@ -1837,18 +1842,102 @@ class SlidingSyncHandler:
         if sync_config.extensions.account_data is not None:
             account_data_response = await self.get_account_data_extension_response(
                 sync_config=sync_config,
-                lists=lists,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
                 account_data_request=sync_config.extensions.account_data,
                 to_token=to_token,
                 from_token=from_token,
             )
 
+        receipts_response = None
+        if sync_config.extensions.receipts is not None:
+            receipts_response = await self.get_receipts_extension_response(
+                sync_config=sync_config,
+                actual_lists=actual_lists,
+                actual_room_ids=actual_room_ids,
+                receipts_request=sync_config.extensions.receipts,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
         return SlidingSyncResult.Extensions(
             to_device=to_device_response,
             e2ee=e2ee_response,
             account_data=account_data_response,
+            receipts=receipts_response,
         )
 
+    def find_relevant_room_ids_for_extension(
+        self,
+        requested_lists: Optional[List[str]],
+        requested_room_ids: Optional[List[str]],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+    ) -> Set[str]:
+        """
+        Handle the reserved `lists`/`rooms` keys for extensions. Extensions should only
+        return results for rooms in the Sliding Sync response. This matches up the
+        requested rooms/lists with the actual lists/rooms in the Sliding Sync response.
+
+        {"lists": []}                    // Do not process any lists.
+        {"lists": ["rooms", "dms"]}      // Process only a subset of lists.
+        {"lists": ["*"]}                 // Process all lists defined in the Sliding Window API. (This is the default.)
+
+        {"rooms": []}                    // Do not process any specific rooms.
+        {"rooms": ["!a:b", "!c:d"]}      // Process only a subset of room subscriptions.
+        {"rooms": ["*"]}                 // Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+
+        Args:
+            requested_lists: The `lists` from the extension request.
+            requested_room_ids: The `rooms` from the extension request.
+            actual_lists: The actual lists from the Sliding Sync response.
+            actual_room_ids: The actual room subscriptions from the Sliding Sync request.
+        """
+
+        # We only want to include account data for rooms that are already in the sliding
+        # sync response AND that were requested in the account data request.
+        relevant_room_ids: Set[str] = set()
+
+        # See what rooms from the room subscriptions we should get account data for
+        if requested_room_ids is not None:
+            for room_id in requested_room_ids:
+                # A wildcard means we process all rooms from the room subscriptions
+                if room_id == "*":
+                    relevant_room_ids.update(actual_room_ids)
+                    break
+
+                if room_id in actual_room_ids:
+                    relevant_room_ids.add(room_id)
+
+        # See what rooms from the sliding window lists we should get account data for
+        if requested_lists is not None:
+            for list_key in requested_lists:
+                # Just some typing because we share the variable name in multiple places
+                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
+
+                # A wildcard means we process rooms from all lists
+                if list_key == "*":
+                    for actual_list in actual_lists.values():
+                        # We only expect a single SYNC operation for any list
+                        assert len(actual_list.ops) == 1
+                        sync_op = actual_list.ops[0]
+                        assert sync_op.op == OperationType.SYNC
+
+                        relevant_room_ids.update(sync_op.room_ids)
+
+                    break
+
+                actual_list = actual_lists.get(list_key)
+                if actual_list is not None:
+                    # We only expect a single SYNC operation for any list
+                    assert len(actual_list.ops) == 1
+                    sync_op = actual_list.ops[0]
+                    assert sync_op.op == OperationType.SYNC
+
+                    relevant_room_ids.update(sync_op.room_ids)
+
+        return relevant_room_ids
+
     async def get_to_device_extension_response(
         self,
         sync_config: SlidingSyncConfig,
@@ -1976,7 +2065,8 @@ class SlidingSyncHandler:
     async def get_account_data_extension_response(
         self,
         sync_config: SlidingSyncConfig,
-        lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
         account_data_request: SlidingSyncConfig.Extensions.AccountDataExtension,
         to_token: StreamToken,
         from_token: Optional[SlidingSyncStreamToken],
@@ -1985,7 +2075,9 @@ class SlidingSyncHandler:
 
         Args:
             sync_config: Sync configuration
-            lists: Sliding window API. A map of list key to list results.
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
             account_data_request: The account_data extension from the request
             to_token: The point in the stream to sync up to.
             from_token: The point in the stream to sync from.
@@ -1998,6 +2090,7 @@ class SlidingSyncHandler:
 
         global_account_data_map: Mapping[str, JsonMapping] = {}
         if from_token is not None:
+            # TODO: This should take into account the `from_token` and `to_token`
             global_account_data_map = (
                 await self.store.get_updated_global_account_data_for_user(
                     user_id, from_token.stream_token.account_data_key
@@ -2009,76 +2102,40 @@ class SlidingSyncHandler:
             )
             if have_push_rules_changed:
                 global_account_data_map = dict(global_account_data_map)
+                # TODO: This should take into account the `from_token` and `to_token`
                 global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                     await self.push_rules_handler.push_rules_for_user(sync_config.user)
                 )
         else:
+            # TODO: This should take into account the `to_token`
             all_global_account_data = await self.store.get_global_account_data_for_user(
                 user_id
             )
 
             global_account_data_map = dict(all_global_account_data)
+            # TODO: This should take into account the  `to_token`
             global_account_data_map[AccountDataTypes.PUSH_RULES] = (
                 await self.push_rules_handler.push_rules_for_user(sync_config.user)
             )
 
-        # We only want to include account data for rooms that are already in the sliding
-        # sync response AND that were requested in the account data request.
-        relevant_room_ids: Set[str] = set()
-
-        # See what rooms from the room subscriptions we should get account data for
-        if (
-            account_data_request.rooms is not None
-            and sync_config.room_subscriptions is not None
-        ):
-            actual_room_ids = sync_config.room_subscriptions.keys()
-
-            for room_id in account_data_request.rooms:
-                # A wildcard means we process all rooms from the room subscriptions
-                if room_id == "*":
-                    relevant_room_ids.update(sync_config.room_subscriptions.keys())
-                    break
-
-                if room_id in actual_room_ids:
-                    relevant_room_ids.add(room_id)
-
-        # See what rooms from the sliding window lists we should get account data for
-        if account_data_request.lists is not None:
-            for list_key in account_data_request.lists:
-                # Just some typing because we share the variable name in multiple places
-                actual_list: Optional[SlidingSyncResult.SlidingWindowList] = None
-
-                # A wildcard means we process rooms from all lists
-                if list_key == "*":
-                    for actual_list in lists.values():
-                        # We only expect a single SYNC operation for any list
-                        assert len(actual_list.ops) == 1
-                        sync_op = actual_list.ops[0]
-                        assert sync_op.op == OperationType.SYNC
-
-                        relevant_room_ids.update(sync_op.room_ids)
-
-                    break
-
-                actual_list = lists.get(list_key)
-                if actual_list is not None:
-                    # We only expect a single SYNC operation for any list
-                    assert len(actual_list.ops) == 1
-                    sync_op = actual_list.ops[0]
-                    assert sync_op.op == OperationType.SYNC
-
-                    relevant_room_ids.update(sync_op.room_ids)
-
         # Fetch room account data
         account_data_by_room_map: Mapping[str, Mapping[str, JsonMapping]] = {}
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=account_data_request.lists,
+            requested_room_ids=account_data_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
         if len(relevant_room_ids) > 0:
             if from_token is not None:
+                # TODO: This should take into account the `from_token` and `to_token`
                 account_data_by_room_map = (
                     await self.store.get_updated_room_account_data_for_user(
                         user_id, from_token.stream_token.account_data_key
                     )
                 )
             else:
+                # TODO: This should take into account the `to_token`
                 account_data_by_room_map = (
                     await self.store.get_room_account_data_for_user(user_id)
                 )
@@ -2094,3 +2151,62 @@ class SlidingSyncHandler:
             global_account_data_map=global_account_data_map,
             account_data_by_room_map=account_data_by_room_map,
         )
+
+    async def get_receipts_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        actual_lists: Dict[str, SlidingSyncResult.SlidingWindowList],
+        actual_room_ids: Set[str],
+        receipts_request: SlidingSyncConfig.Extensions.ReceiptsExtension,
+        to_token: StreamToken,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.ReceiptsExtension]:
+        """Handle Receipts extension (MSC3960)
+
+        Args:
+            sync_config: Sync configuration
+            actual_lists: Sliding window API. A map of list key to list results in the
+                Sliding Sync response.
+            actual_room_ids: The actual room IDs in the the Sliding Sync response.
+            account_data_request: The account_data extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        # Skip if the extension is not enabled
+        if not receipts_request.enabled:
+            return None
+
+        relevant_room_ids = self.find_relevant_room_ids_for_extension(
+            requested_lists=receipts_request.lists,
+            requested_room_ids=receipts_request.rooms,
+            actual_lists=actual_lists,
+            actual_room_ids=actual_room_ids,
+        )
+
+        room_id_to_receipt_map: Mapping[str, JsonMapping] = {}
+        if len(relevant_room_ids) > 0:
+            receipt_source = self.event_sources.sources.receipt
+            receipts, _ = await receipt_source.get_new_events(
+                user=sync_config.user,
+                from_key=(
+                    from_token.stream_token.receipt_key
+                    if from_token
+                    else MultiWriterStreamToken(stream=0)
+                ),
+                to_key=to_token.receipt_key,
+                # This is a dummy value and isn't used in the function
+                limit=0,
+                room_ids=relevant_room_ids,
+                is_guest=False,
+            )
+
+            for receipt in receipts:
+                # These fields should exist for every receipt
+                room_id = receipt["room_id"]
+                type = receipt["type"]
+                content = receipt["content"]
+                room_id_to_receipt_map[room_id] = {"type": type, "content": content}
+
+        return SlidingSyncResult.Extensions.ReceiptsExtension(
+            room_id_to_receipt_map=room_id_to_receipt_map,
+        )
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 7cf1f56435..268c6521e0 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -1134,6 +1134,12 @@ class SlidingSyncRestServlet(RestServlet):
                 },
             }
 
+        if extensions.receipts is not None:
+            serialized_extensions["receipts"] = {
+                # Same as the the top-level `account_data.events` field in Sync v2.
+                "rooms": extensions.receipts.room_id_to_receipt_map,
+            }
+
         return serialized_extensions
 
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 479222a18d..488ebd8365 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -349,12 +349,28 @@ class SlidingSyncResult:
                     self.global_account_data_map or self.account_data_by_room_map
                 )
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class ReceiptsExtension:
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                room_id_to_receipt_map: Mapping from room_id to `m.receipt` event (type, content)
+            """
+
+            room_id_to_receipt_map: Mapping[str, JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.room_id_to_receipt_map)
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
         def __bool__(self) -> bool:
-            return bool(self.to_device or self.e2ee or self.account_data)
+            return bool(
+                self.to_device or self.e2ee or self.account_data or self.receipts
+            )
 
     next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index 34e07ddac5..ae8b7d8144 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -339,9 +339,27 @@ class SlidingSyncBody(RequestBodyModel):
             # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
             rooms: Optional[List[StrictStr]] = ["*"]
 
+        class ReceiptsExtension(RequestBodyModel):
+            """The Receipts extension (MSC3960)
+
+            Attributes:
+                enabled
+                lists: List of list keys (from the Sliding Window API) to apply this
+                    extension to.
+                rooms: List of room IDs (from the Room Subscription API) to apply this
+                    extension to.
+            """
+
+            enabled: Optional[StrictBool] = False
+            # Process all lists defined in the Sliding Window API. (This is the default.)
+            lists: Optional[List[StrictStr]] = ["*"]
+            # Process all room subscriptions defined in the Room Subscription API. (This is the default.)
+            rooms: Optional[List[StrictStr]] = ["*"]
+
         to_device: Optional[ToDeviceExtension] = None
         e2ee: Optional[E2eeExtension] = None
         account_data: Optional[AccountDataExtension] = None
+        receipts: Optional[ReceiptsExtension] = None
 
     # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
     if TYPE_CHECKING:
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 2bbbd95a76..4422deb884 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -1369,6 +1369,7 @@ class SlidingSyncTestCase(SlidingSyncBase):
         room.register_servlets,
         sync.register_servlets,
         devices.register_servlets,
+        receipts.register_servlets,
     ]
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -4434,6 +4435,225 @@ class SlidingSyncTestCase(SlidingSyncBase):
         # `world_readable` but currently we don't support this.
         self.assertIsNone(response_body["rooms"].get(room_id1), response_body["rooms"])
 
+    # Any extensions that use `lists`/`rooms` should be tested here
+    @parameterized.expand([("account_data",), ("receipts",)])
+    def test_extensions_lists_rooms_relevant_rooms(self, extension_name: str) -> None:
+        """
+        With various extensions, test out requesting different variations of
+        `lists`/`rooms`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create some rooms
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        room_id_to_human_name_map = {
+            room_id1: "room1",
+            room_id2: "room2",
+            room_id3: "room3",
+            room_id4: "room4",
+            room_id5: "room5",
+        }
+
+        for room_id in room_id_to_human_name_map.keys():
+            if extension_name == "account_data":
+                # Add some account data to each room
+                self.get_success(
+                    self.account_data_handler.add_account_data_to_room(
+                        user_id=user1_id,
+                        room_id=room_id,
+                        account_data_type="org.matrix.roorarraz",
+                        content={"roo": "rar"},
+                    )
+                )
+            elif extension_name == "receipts":
+                event_response = self.helper.send(
+                    room_id, body="new event", tok=user1_tok
+                )
+                # Read last event
+                channel = self.make_request(
+                    "POST",
+                    f"/rooms/{room_id}/receipt/{ReceiptTypes.READ}/{event_response["event_id"]}",
+                    {},
+                    access_token=user1_tok,
+                )
+                self.assertEqual(channel.code, 200, channel.json_body)
+            else:
+                raise AssertionError(f"Unknown extension name: {extension_name}")
+
+        main_sync_body = {
+            "lists": {
+                # We expect this list range to include room5 and room4
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+                # We expect this list range to include room5, room4, room3
+                "bar-list": {
+                    "ranges": [[0, 2]],
+                    "required_state": [],
+                    "timeline_limit": 0,
+                },
+            },
+            "room_subscriptions": {
+                room_id1: {
+                    "required_state": [],
+                    "timeline_limit": 0,
+                }
+            },
+        }
+
+        # Mix lists and rooms
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["foo-list", "non-existent-list"],
+                    "rooms": [room_id1, room_id2, "!non-existent-room"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Requested via `rooms` and a room subscription exists
+        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
+        # room3: ❌ Not requested
+        # room4: ✅ Shows up because requested via `lists` and list exists in the response
+        # room5: ✅ Shows up because requested via `lists` and list exists in the response
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try wildcards (this is the default)
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    # "lists": ["*"],
+                    # "rooms": ["*"],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room1", "room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Empty list will return nothing
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ❌ Not requested
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            set(),
+            exact=True,
+        )
+
+        # Try wildcard and none
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": ["*"],
+                    "rooms": [],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
+        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room3", "room4", "room5"},
+            exact=True,
+        )
+
+        # Try requesting a room that is only in a list
+        sync_body = {
+            **main_sync_body,
+            "extensions": {
+                extension_name: {
+                    "enabled": True,
+                    "lists": [],
+                    "rooms": [room_id5],
+                }
+            },
+        }
+        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+
+        # room1: ❌ Not requested
+        # room2: ❌ Not requested
+        # room3: ❌ Not requested
+        # room4: ❌ Not requested
+        # room5: ✅ Requested via `rooms` and is in a list
+        self.assertIncludes(
+            {
+                room_id_to_human_name_map[room_id]
+                for room_id in response_body["extensions"][extension_name]
+                .get("rooms")
+                .keys()
+            },
+            {"room5"},
+            exact=True,
+        )
+
 
 class SlidingSyncToDeviceExtensionTestCase(SlidingSyncBase):
     """Tests for the to-device sliding sync extension"""
@@ -5447,188 +5667,407 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
             exact=True,
         )
 
-    def test_room_account_data_relevant_rooms(self) -> None:
+    def test_wait_for_new_data(self) -> None:
         """
-        Test out different variations of `lists`/`rooms` we are requesting account data for.
+        Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+        (Only applies to incremental syncs with a `timeout` specified)
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
-        # Create a room and add some room account data
-        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Bump the global account data to trigger new results
         self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id1,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
+            self.account_data_handler.add_account_data_for_user(
+                user1_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
             )
         )
+        # Should respond before the 10 second timeout
+        channel.await_result(timeout_ms=3000)
+        self.assertEqual(channel.code, 200, channel.json_body)
 
-        # Create another room with some room account data
-        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id2,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
+        # We should see the global account data update
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in channel.json_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            {"org.matrix.foobarbaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            channel.json_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
         )
 
-        # Create another room with some room account data
-        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id3,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
+    def test_wait_for_new_data_timeout(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive but
+        no data ever arrives so we timeout. We're also making sure that the default data
+        from the account_data extension doesn't trigger a false-positive for new data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        sync_body = {
+            "lists": {},
+            "extensions": {
+                "account_data": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?timeout=10000&pos={from_token}",
+            content=sync_body,
+            access_token=user1_tok,
+            await_result=False,
         )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Wake-up `notifier.wait_for_events(...)` that will cause us test
+        # `SlidingSyncResult.__bool__` for new results.
+        self._bump_notifier_wait_for_events(
+            user1_id,
+            # We choose `StreamKeyType.PRESENCE` because we're testing for account data
+            # and don't want to contaminate the account data results using
+            # `StreamKeyType.ACCOUNT_DATA`.
+            wake_stream_key=StreamKeyType.PRESENCE,
+        )
+        # Block for a little bit more to ensure we don't see any new results.
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
+        self.assertEqual(channel.code, 200, channel.json_body)
 
-        # Create another room with some room account data
-        room_id4 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id4,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
-            )
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("global")
+        )
+        self.assertIsNotNone(
+            channel.json_body["extensions"]["account_data"].get("rooms")
         )
 
-        # Create another room with some room account data
-        room_id5 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+class SlidingSyncReceiptsExtensionTestCase(unittest.HomeserverTestCase):
+    """Tests for the receipts sliding sync extension"""
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        sync.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
+        self.account_data_handler = hs.get_account_data_handler()
+        self.notifier = hs.get_notifier()
+        self.sync_endpoint = (
+            "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+        )
+
+    # TODO: Remove once https://github.com/element-hq/synapse/pull/17481 lands
+    def _bump_notifier_wait_for_events(self, user_id: str) -> None:
+        """
+        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
+        Sync results.
+        """
+        # We're expecting some new activity from this point onwards
+        from_token = self.event_sources.get_current_token()
+
+        triggered_notifier_wait_for_events = False
+
+        async def _on_new_acivity(
+            before_token: StreamToken, after_token: StreamToken
+        ) -> bool:
+            nonlocal triggered_notifier_wait_for_events
+            triggered_notifier_wait_for_events = True
+            return True
+
+        # Listen for some new activity for the user. We're just trying to confirm that
+        # our bump below actually does what we think it does (triggers new activity for
+        # the user).
+        result_awaitable = self.notifier.wait_for_events(
+            user_id,
+            1000,
+            _on_new_acivity,
+            from_token=from_token,
+        )
+
+        # Update the account data so that `notifier.wait_for_events(...)` wakes up.
+        # We're bumping account data because it won't show up in the Sliding Sync
+        # response so it won't affect whether we have results.
         self.get_success(
-            self.account_data_handler.add_account_data_to_room(
-                user_id=user1_id,
-                room_id=room_id5,
-                account_data_type="org.matrix.roorarraz",
-                content={"roo": "rar"},
+            self.account_data_handler.add_account_data_for_user(
+                user_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
             )
         )
 
-        room_id_to_human_name_map = {
-            room_id1: "room1",
-            room_id2: "room2",
-            room_id3: "room3",
-            room_id4: "room4",
-            room_id5: "room5",
-        }
+        # Wait for our notifier result
+        self.get_success(result_awaitable)
 
-        # Mix lists and rooms
+        if not triggered_notifier_wait_for_events:
+            raise AssertionError(
+                "Expected `notifier.wait_for_events(...)` to be triggered"
+            )
+
+    def test_no_data_initial_sync(self) -> None:
+        """
+        Test that enabling the account_data extension works during an intitial sync,
+        even if there is no-data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Make an initial Sliding Sync request with the account_data extension enabled
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
-            "room_subscriptions": {
-                room_id1: {
-                    "required_state": [],
-                    "timeline_limit": 0,
-                }
-            },
+            "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": ["foo-list", "non-existent-list"],
-                    "rooms": [room_id1, room_id2, "!non-existent-room"],
                 }
             },
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ✅ Requested via `rooms` and a room subscription exists
-        # room2: ❌ Requested via `rooms` but not in the response (from lists or room subscriptions)
-        # room3: ❌ Not requested
-        # room4: ✅ Shows up because requested via `lists` and list exists in the response
-        # room5: ✅ Shows up because requested via `lists` and list exists in the response
         self.assertIncludes(
             {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
             },
-            {"room1", "room4", "room5"},
+            # Even though we don't have any global account data set, Synapse saves some
+            # default push rules for us.
+            {AccountDataTypes.PUSH_RULES},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
             exact=True,
         )
 
-        # Try wildcards (this is the default)
+    def test_no_data_incremental_sync(self) -> None:
+        """
+        Test that enabling account_data extension works during an incremental sync, even
+        if there is no-data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
-            "room_subscriptions": {
-                room_id1: {
-                    "required_state": [],
-                    "timeline_limit": 0,
+            "lists": {},
+            "extensions": {
+                "receipts": {
+                    "enabled": True,
                 }
             },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        # There has been no account data changes since the `from_token` so we shouldn't
+        # see any account data here.
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
+            },
+            set(),
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
+
+    def test_global_account_data_initial_sync(self) -> None:
+        """
+        On initial sync, we should return all global account data on initial sync.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Update the global account data
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id=user1_id,
+                account_data_type="org.matrix.foobarbaz",
+                content={"foo": "bar"},
+            )
+        )
+
+        # Make an initial Sliding Sync request with the account_data extension enabled
+        sync_body = {
+            "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    # "lists": ["*"],
-                    # "rooms": ["*"],
                 }
             },
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ✅ Shows up because of default `rooms` wildcard and is in one of the room subscriptions
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        # It should show us all of the global account data
         self.assertIncludes(
             {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
-                .get("rooms")
-                .keys()
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
             },
-            {"room1", "room3", "room4", "room5"},
+            {AccountDataTypes.PUSH_RULES, "org.matrix.foobarbaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
             exact=True,
         )
 
-        # Empty list will return nothing
+    def test_global_account_data_incremental_sync(self) -> None:
+        """
+        On incremental sync, we should only account data that has changed since the
+        `from_token`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Add some global account data
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id=user1_id,
+                account_data_type="org.matrix.foobarbaz",
+                content={"foo": "bar"},
+            )
+        )
+
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
+            "lists": {},
+            "extensions": {
+                "receipts": {
+                    "enabled": True,
+                }
+            },
+        }
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
+
+        # Add some other global account data
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id=user1_id,
+                account_data_type="org.matrix.doodardaz",
+                content={"doo": "dar"},
+            )
+        )
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertIncludes(
+            {
+                global_event["type"]
+                for global_event in response_body["extensions"]["account_data"].get(
+                    "global"
+                )
             },
+            # We should only see the new global account data that happened after the `from_token`
+            {"org.matrix.doodardaz"},
+            exact=True,
+        )
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            set(),
+            exact=True,
+        )
+
+    def test_room_account_data_initial_sync(self) -> None:
+        """
+        On initial sync, we return all account data for a given room but only for
+        rooms that we request and are being returned in the Sliding Sync response.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a room and add some room account data
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id1,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id2,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Make an initial Sliding Sync request with the account_data extension enabled
+        sync_body = {
+            "lists": {},
             "room_subscriptions": {
                 room_id1: {
                     "required_state": [],
@@ -5636,47 +6075,66 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
                 }
             },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": [],
-                    "rooms": [],
+                    "rooms": [room_id1, room_id2],
                 }
             },
         }
         response_body, _ = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ❌ Not requested
-        # room4: ❌ Not requested
-        # room5: ❌ Not requested
+        self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+        # Even though we requested room2, we only expect room1 to show up because that's
+        # the only room in the Sliding Sync response (room2 is not one of our room
+        # subscriptions or in a sliding window list).
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
         self.assertIncludes(
             {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
+                event["type"]
+                for event in response_body["extensions"]["account_data"]
                 .get("rooms")
-                .keys()
+                .get(room_id1)
             },
-            set(),
+            {"org.matrix.roorarraz"},
             exact=True,
         )
 
-        # Try wildcard and none
+    def test_room_account_data_incremental_sync(self) -> None:
+        """
+        On incremental sync, we return all account data for a given room but only for
+        rooms that we request and are being returned in the Sliding Sync response.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Create a room and add some room account data
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id1,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Create another room with some room account data
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id2,
+                account_data_type="org.matrix.roorarraz",
+                content={"roo": "rar"},
+            )
+        )
+
         sync_body = {
-            "lists": {
-                # We expect this list range to include room5 and room4
-                "foo-list": {
-                    "ranges": [[0, 1]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-                # We expect this list range to include room5, room4, room3
-                "bar-list": {
-                    "ranges": [[0, 2]],
-                    "required_state": [],
-                    "timeline_limit": 0,
-                },
-            },
+            "lists": {},
             "room_subscriptions": {
                 room_id1: {
                     "required_state": [],
@@ -5684,28 +6142,53 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
                 }
             },
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
-                    "lists": ["*"],
-                    "rooms": [],
+                    "rooms": [room_id1, room_id2],
                 }
             },
         }
-        response_body, _ = self.do_sync(sync_body, tok=user1_tok)
+        _, from_token = self.do_sync(sync_body, tok=user1_tok)
 
-        # room1: ❌ Not requested
-        # room2: ❌ Not requested
-        # room3: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room4: ✅ Shows up because of default `lists` wildcard and is in a list
-        # room5: ✅ Shows up because of default `lists` wildcard and is in a list
+        # Add some other room account data
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id1,
+                account_data_type="org.matrix.roorarraz2",
+                content={"roo": "rar"},
+            )
+        )
+        self.get_success(
+            self.account_data_handler.add_account_data_to_room(
+                user_id=user1_id,
+                room_id=room_id2,
+                account_data_type="org.matrix.roorarraz2",
+                content={"roo": "rar"},
+            )
+        )
+
+        # Make an incremental Sliding Sync request with the account_data extension enabled
+        response_body, _ = self.do_sync(sync_body, since=from_token, tok=user1_tok)
+
+        self.assertIsNotNone(response_body["extensions"]["account_data"].get("global"))
+        # Even though we requested room2, we only expect room1 to show up because that's
+        # the only room in the Sliding Sync response (room2 is not one of our room
+        # subscriptions or in a sliding window list).
+        self.assertIncludes(
+            response_body["extensions"]["account_data"].get("rooms").keys(),
+            {room_id1},
+            exact=True,
+        )
+        # We should only see the new room account data that happened after the `from_token`
         self.assertIncludes(
             {
-                room_id_to_human_name_map[room_id]
-                for room_id in response_body["extensions"]["account_data"]
+                event["type"]
+                for event in response_body["extensions"]["account_data"]
                 .get("rooms")
-                .keys()
+                .get(room_id1)
             },
-            {"room3", "room4", "room5"},
+            {"org.matrix.roorarraz2"},
             exact=True,
         )
 
@@ -5726,7 +6209,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         sync_body = {
             "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
                 }
             },
@@ -5785,7 +6268,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
         sync_body = {
             "lists": {},
             "extensions": {
-                "account_data": {
+                "receipts": {
                     "enabled": True,
                 }
             },
@@ -5805,13 +6288,7 @@ class SlidingSyncAccountDataExtensionTestCase(SlidingSyncBase):
             channel.await_result(timeout_ms=5000)
         # Wake-up `notifier.wait_for_events(...)` that will cause us test
         # `SlidingSyncResult.__bool__` for new results.
-        self._bump_notifier_wait_for_events(
-            user1_id,
-            # We choose `StreamKeyType.PRESENCE` because we're testing for account data
-            # and don't want to contaminate the account data results using
-            # `StreamKeyType.ACCOUNT_DATA`.
-            wake_stream_key=StreamKeyType.PRESENCE,
-        )
+        self._bump_notifier_wait_for_events(user1_id)
         # Block for a little bit more to ensure we don't see any new results.
         with self.assertRaises(TimedOutException):
             channel.await_result(timeout_ms=4000)