summary refs log tree commit diff
path: root/tests/rest/client/test_sync.py
diff options
context:
space:
mode:
Diffstat (limited to 'tests/rest/client/test_sync.py')
-rw-r--r--tests/rest/client/test_sync.py1964
1 files changed, 1890 insertions, 74 deletions
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 6ff1f03c9a..2628869de6 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -20,7 +20,8 @@
 #
 import json
 import logging
-from typing import AbstractSet, Any, Dict, Iterable, List, Optional
+from http import HTTPStatus
+from typing import Any, Dict, Iterable, List
 
 from parameterized import parameterized, parameterized_class
 
@@ -38,7 +39,16 @@ from synapse.api.constants import (
 )
 from synapse.events import EventBase
 from synapse.handlers.sliding_sync import StateValues
-from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
+from synapse.rest.client import (
+    devices,
+    knock,
+    login,
+    read_marker,
+    receipts,
+    room,
+    sendtodevice,
+    sync,
+)
 from synapse.server import HomeServer
 from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
 from synapse.util import Clock
@@ -47,8 +57,9 @@ from tests import unittest
 from tests.federation.transport.test_knocking import (
     KnockingStrippedStateEventHelperMixin,
 )
-from tests.server import TimedOutException
+from tests.server import FakeChannel, TimedOutException
 from tests.test_utils.event_injection import mark_event_as_partial_state
+from tests.unittest import skip_unless
 
 logger = logging.getLogger(__name__)
 
@@ -1103,12 +1114,11 @@ class DeviceUnusedFallbackKeySyncTestCase(unittest.HomeserverTestCase):
         self.assertEqual(res, [])
 
         # Upload a fallback key for the user/device
-        fallback_key = {"alg1:k1": "fallback_key1"}
         self.get_success(
             self.e2e_keys_handler.upload_keys_for_user(
                 alice_user_id,
                 test_device_id,
-                {"fallback_keys": fallback_key},
+                {"fallback_keys": {"alg1:k1": "fallback_key1"}},
             )
         )
         # We should now have an unused alg1 key
@@ -1242,6 +1252,8 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.store = hs.get_datastores().main
         self.event_sources = hs.get_event_sources()
         self.storage_controllers = hs.get_storage_controllers()
+        self.account_data_handler = hs.get_account_data_handler()
+        self.notifier = hs.get_notifier()
 
     def _assertRequiredStateIncludes(
         self,
@@ -1250,7 +1262,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         exact: bool = False,
     ) -> None:
         """
-        Wrapper around `_assertIncludes` to give slightly better looking diff error
+        Wrapper around `assertIncludes` to give slightly better looking diff error
         messages that include some context "$event_id (type, state_key)".
 
         Args:
@@ -1266,7 +1278,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         for event in actual_required_state:
             assert isinstance(event, dict)
 
-        self._assertIncludes(
+        self.assertIncludes(
             {
                 f'{event["event_id"]} ("{event["type"]}", "{event["state_key"]}")'
                 for event in actual_required_state
@@ -1280,56 +1292,6 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             message=str(actual_required_state),
         )
 
-    def _assertIncludes(
-        self,
-        actual_items: AbstractSet[str],
-        expected_items: AbstractSet[str],
-        exact: bool = False,
-        message: Optional[str] = None,
-    ) -> None:
-        """
-        Assert that all of the `expected_items` are included in the `actual_items`.
-
-        This assert could also be called `assertContains`, `assertItemsInSet`
-
-        Args:
-            actual_items: The container
-            expected_items: The items to check for in the container
-            exact: Whether the actual state should be exactly equal to the expected
-                state (no extras).
-            message: Optional message to include in the failure message.
-        """
-        # Check that each set has the same items
-        if exact and actual_items == expected_items:
-            return
-        # Check for a superset
-        elif not exact and actual_items >= expected_items:
-            return
-
-        expected_lines: List[str] = []
-        for expected_item in expected_items:
-            is_expected_in_actual = expected_item in actual_items
-            expected_lines.append(
-                "{}  {}".format(" " if is_expected_in_actual else "?", expected_item)
-            )
-
-        actual_lines: List[str] = []
-        for actual_item in actual_items:
-            is_actual_in_expected = actual_item in expected_items
-            actual_lines.append(
-                "{}  {}".format("+" if is_actual_in_expected else " ", actual_item)
-            )
-
-        newline = "\n"
-        expected_string = f"Expected items to be in actual ('?' = missing expected items):\n {{\n{newline.join(expected_lines)}\n }}"
-        actual_string = f"Actual ('+' = found expected items):\n {{\n{newline.join(actual_lines)}\n }}"
-        first_message = (
-            "Items must match exactly" if exact else "Some expected items are missing."
-        )
-        diff_message = f"{first_message}\n{expected_string}\n{actual_string}"
-
-        self.fail(f"{diff_message}\n{message}")
-
     def _add_new_dm_to_global_account_data(
         self, source_user_id: str, target_user_id: str, target_room_id: str
     ) -> None:
@@ -1417,6 +1379,52 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         return room_id
 
+    def _bump_notifier_wait_for_events(self, user_id: str) -> None:
+        """
+        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
+        Sync results.
+        """
+        # We're expecting some new activity from this point onwards
+        from_token = self.event_sources.get_current_token()
+
+        triggered_notifier_wait_for_events = False
+
+        async def _on_new_acivity(
+            before_token: StreamToken, after_token: StreamToken
+        ) -> bool:
+            nonlocal triggered_notifier_wait_for_events
+            triggered_notifier_wait_for_events = True
+            return True
+
+        # Listen for some new activity for the user. We're just trying to confirm that
+        # our bump below actually does what we think it does (triggers new activity for
+        # the user).
+        result_awaitable = self.notifier.wait_for_events(
+            user_id,
+            1000,
+            _on_new_acivity,
+            from_token=from_token,
+        )
+
+        # Update the account data so that `notifier.wait_for_events(...)` wakes up.
+        # We're bumping account data because it won't show up in the Sliding Sync
+        # response so it won't affect whether we have results.
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
+            )
+        )
+
+        # Wait for our notifier result
+        self.get_success(result_awaitable)
+
+        if not triggered_notifier_wait_for_events:
+            raise AssertionError(
+                "Expected `notifier.wait_for_events(...)` to be triggered"
+            )
+
     def test_sync_list(self) -> None:
         """
         Test that room IDs show up in the Sliding Sync `lists`
@@ -1522,6 +1530,124 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         # with because we weren't able to find anything new yet.
         self.assertEqual(channel.json_body["pos"], future_position_token_serialized)
 
+    def test_wait_for_new_data(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+        (Only applies to incremental syncs with a `timeout` specified)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + "?timeout=10000"
+            + f"&pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 0]],
+                        "required_state": [],
+                        "timeline_limit": 1,
+                    }
+                }
+            },
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Bump the room with new events to trigger new results
+        event_response1 = self.helper.send(
+            room_id, "new activity in room", tok=user1_tok
+        )
+        # Should respond before the 10 second timeout
+        channel.await_result(timeout_ms=3000)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Check to make sure the new event is returned
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id]["timeline"]
+            ],
+            [
+                event_response1["event_id"],
+            ],
+            channel.json_body["rooms"][room_id]["timeline"],
+        )
+
+    # TODO: Once we remove `ops`, we should be able to add a `RoomResult.__bool__` to
+    # check if there are any updates since the `from_token`.
+    @skip_unless(
+        False,
+        "Once we remove ops from the Sliding Sync response, this test should pass",
+    )
+    def test_wait_for_new_data_timeout(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive but
+        no data ever arrives so we timeout. We're also making sure that the default data
+        doesn't trigger a false-positive for new data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + "?timeout=10000"
+            + f"&pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 0]],
+                        "required_state": [],
+                        "timeline_limit": 1,
+                    }
+                }
+            },
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Wake-up `notifier.wait_for_events(...)` that will cause us test
+        # `SlidingSyncResult.__bool__` for new results.
+        self._bump_notifier_wait_for_events(user1_id)
+        # Block for a little bit more to ensure we don't see any new results.
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We still see rooms because that's how Sliding Sync lists work but we reached
+        # the timeout before seeing them
+        self.assertEqual(
+            [event["event_id"] for event in channel.json_body["rooms"].keys()],
+            [room_id],
+        )
+
     def test_filter_list(self) -> None:
         """
         Test that filters apply to `lists`
@@ -1548,11 +1674,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         )
 
         # Create a normal room
-        room_id = self.helper.create_room_as(user1_id, tok=user2_tok)
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.join(room_id, user1_id, tok=user1_tok)
 
         # Create a room that user1 is invited to
-        invite_room_id = self.helper.create_room_as(user1_id, tok=user2_tok)
+        invite_room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
         self.helper.invite(invite_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
 
         # Make the Sliding Sync request
@@ -1653,6 +1779,20 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             list(channel.json_body["lists"]["room-invites"]),
         )
 
+        # Ensure DM's are correctly marked
+        self.assertDictEqual(
+            {
+                room_id: room.get("is_dm")
+                for room_id, room in channel.json_body["rooms"].items()
+            },
+            {
+                invite_room_id: None,
+                room_id: None,
+                invited_dm_room_id: True,
+                joined_dm_room_id: True,
+            },
+        )
+
     def test_sort_list(self) -> None:
         """
         Test that the `lists` are sorted by `stream_ordering`
@@ -1802,6 +1942,496 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             channel.json_body["lists"]["foo-list"],
         )
 
+    def test_rooms_meta_when_joined(self) -> None:
+        """
+        Test that the `rooms` `name` and `avatar` are included in the response and
+        reflect the current state of the room when the user is joined to the room.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "name": "my super room",
+            },
+        )
+        # Set the room avatar URL
+        self.helper.send_state(
+            room_id1,
+            EventTypes.RoomAvatar,
+            {"url": "mxc://DUMMY_MEDIA_ID"},
+            tok=user2_tok,
+        )
+
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Reflect the current state of the room
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["name"],
+            "my super room",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["avatar"],
+            "mxc://DUMMY_MEDIA_ID",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            2,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            0,
+        )
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("is_dm"),
+        )
+
+    def test_rooms_meta_when_invited(self) -> None:
+        """
+        Test that the `rooms` `name` and `avatar` are included in the response and
+        reflect the current state of the room when the user is invited to the room.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "name": "my super room",
+            },
+        )
+        # Set the room avatar URL
+        self.helper.send_state(
+            room_id1,
+            EventTypes.RoomAvatar,
+            {"url": "mxc://DUMMY_MEDIA_ID"},
+            tok=user2_tok,
+        )
+
+        # User1 is invited to the room
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        # Update the room name after user1 has left
+        self.helper.send_state(
+            room_id1,
+            EventTypes.Name,
+            {"name": "my super duper room"},
+            tok=user2_tok,
+        )
+        # Update the room avatar URL after user1 has left
+        self.helper.send_state(
+            room_id1,
+            EventTypes.RoomAvatar,
+            {"url": "mxc://UPDATED_DUMMY_MEDIA_ID"},
+            tok=user2_tok,
+        )
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # This should still reflect the current state of the room even when the user is
+        # invited.
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["name"],
+            "my super duper room",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["avatar"],
+            "mxc://UPDATED_DUMMY_MEDIA_ID",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            1,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            1,
+        )
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("is_dm"),
+        )
+
+    def test_rooms_meta_when_banned(self) -> None:
+        """
+        Test that the `rooms` `name` and `avatar` reflect the state of the room when the
+        user was banned (do not leak current state).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "name": "my super room",
+            },
+        )
+        # Set the room avatar URL
+        self.helper.send_state(
+            room_id1,
+            EventTypes.RoomAvatar,
+            {"url": "mxc://DUMMY_MEDIA_ID"},
+            tok=user2_tok,
+        )
+
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        # Update the room name after user1 has left
+        self.helper.send_state(
+            room_id1,
+            EventTypes.Name,
+            {"name": "my super duper room"},
+            tok=user2_tok,
+        )
+        # Update the room avatar URL after user1 has left
+        self.helper.send_state(
+            room_id1,
+            EventTypes.RoomAvatar,
+            {"url": "mxc://UPDATED_DUMMY_MEDIA_ID"},
+            tok=user2_tok,
+        )
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Reflect the state of the room at the time of leaving
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["name"],
+            "my super room",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["avatar"],
+            "mxc://DUMMY_MEDIA_ID",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            # FIXME: The actual number should be "1" (user2) but we currently don't
+            # support this for rooms where the user has left/been banned.
+            0,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            0,
+        )
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("is_dm"),
+        )
+
+    def test_rooms_meta_heroes(self) -> None:
+        """
+        Test that the `rooms` `heroes` are included in the response when the room
+        doesn't have a room name set.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "name": "my super room",
+            },
+        )
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # User3 is invited
+        self.helper.invite(room_id1, src=user2_id, targ=user3_id, tok=user2_tok)
+
+        room_id2 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                # No room name set so that `heroes` is populated
+                #
+                # "name": "my super room2",
+            },
+        )
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        # User3 is invited
+        self.helper.invite(room_id2, src=user2_id, targ=user3_id, tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Room1 has a name so we shouldn't see any `heroes` which the client would use
+        # the calculate the room name themselves.
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["name"],
+            "my super room",
+            channel.json_body["rooms"][room_id1],
+        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("heroes"))
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            2,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            1,
+        )
+
+        # Room2 doesn't have a name so we should see `heroes` populated
+        self.assertIsNone(channel.json_body["rooms"][room_id2].get("name"))
+        self.assertCountEqual(
+            [
+                hero["user_id"]
+                for hero in channel.json_body["rooms"][room_id2].get("heroes", [])
+            ],
+            # Heroes shouldn't include the user themselves (we shouldn't see user1)
+            [user2_id, user3_id],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id2]["joined_count"],
+            2,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id2]["invited_count"],
+            1,
+        )
+
+        # We didn't request any state so we shouldn't see any `required_state`
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
+        self.assertIsNone(channel.json_body["rooms"][room_id2].get("required_state"))
+
+    def test_rooms_meta_heroes_max(self) -> None:
+        """
+        Test that the `rooms` `heroes` only includes the first 5 users (not including
+        yourself).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+        user6_id = self.register_user("user6", "pass")
+        user6_tok = self.login(user6_id, "pass")
+        user7_id = self.register_user("user7", "pass")
+        user7_tok = self.login(user7_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                # No room name set so that `heroes` is populated
+                #
+                # "name": "my super room",
+            },
+        )
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user3_id, tok=user3_tok)
+        self.helper.join(room_id1, user4_id, tok=user4_tok)
+        self.helper.join(room_id1, user5_id, tok=user5_tok)
+        self.helper.join(room_id1, user6_id, tok=user6_tok)
+        self.helper.join(room_id1, user7_id, tok=user7_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Room2 doesn't have a name so we should see `heroes` populated
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+        self.assertCountEqual(
+            [
+                hero["user_id"]
+                for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+            ],
+            # Heroes should be the first 5 users in the room (excluding the user
+            # themselves, we shouldn't see `user1`)
+            [user2_id, user3_id, user4_id, user5_id, user6_id],
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            7,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            0,
+        )
+
+        # We didn't request any state so we shouldn't see any `required_state`
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("required_state"))
+
+    def test_rooms_meta_heroes_when_banned(self) -> None:
+        """
+        Test that the `rooms` `heroes` are included in the response when the room
+        doesn't have a room name set but doesn't leak information past their ban.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        _user5_tok = self.login(user5_id, "pass")
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                # No room name set so that `heroes` is populated
+                #
+                # "name": "my super room",
+            },
+        )
+        # User1 joins the room
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # User3 is invited
+        self.helper.invite(room_id1, src=user2_id, targ=user3_id, tok=user2_tok)
+
+        # User1 is banned from the room
+        self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        # User4 joins the room after user1 is banned
+        self.helper.join(room_id1, user4_id, tok=user4_tok)
+        # User5 is invited after user1 is banned
+        self.helper.invite(room_id1, src=user2_id, targ=user5_id, tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 0,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Room2 doesn't have a name so we should see `heroes` populated
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
+        self.assertCountEqual(
+            [
+                hero["user_id"]
+                for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+            ],
+            # Heroes shouldn't include the user themselves (we shouldn't see user1). We
+            # also shouldn't see user4 since they joined after user1 was banned.
+            #
+            # FIXME: The actual result should be `[user2_id, user3_id]` but we currently
+            # don't support this for rooms where the user has left/been banned.
+            [],
+        )
+
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["joined_count"],
+            # FIXME: The actual number should be "1" (user2) but we currently don't
+            # support this for rooms where the user has left/been banned.
+            0,
+        )
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["invited_count"],
+            # We shouldn't see user5 since they were invited after user1 was banned.
+            #
+            # FIXME: The actual number should be "1" (user3) but we currently don't
+            # support this for rooms where the user has left/been banned.
+            0,
+        )
+
     def test_rooms_limited_initial_sync(self) -> None:
         """
         Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
@@ -2872,11 +3502,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         self.assertEqual(channel.code, 200, channel.json_body)
 
         # Nothing to see for this banned user in the room in the token range
-        self.assertEqual(
-            channel.json_body["rooms"][room_id1]["timeline"],
-            [],
-            channel.json_body["rooms"][room_id1]["timeline"],
-        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("timeline"))
         # No events returned in the timeline so nothing is "live"
         self.assertEqual(
             channel.json_body["rooms"][room_id1]["num_live"],
@@ -2973,6 +3599,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_incremental_sync(self) -> None:
         """
@@ -3027,6 +3654,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard(self) -> None:
         """
@@ -3084,6 +3712,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             state_map.values(),
             exact=True,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard_event_type(self) -> None:
         """
@@ -3147,6 +3776,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             # events when the `event_type` is a wildcard.
             exact=False,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_wildcard_state_key(self) -> None:
         """
@@ -3192,6 +3822,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_lazy_loading_room_members(self) -> None:
         """
@@ -3247,6 +3878,81 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+
+    def test_rooms_required_state_me(self) -> None:
+        """
+        Test `rooms.required_state` correctly handles $ME.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        self.helper.send(room_id1, "1", tok=user2_tok)
+
+        # Also send normal state events with state keys of the users, first
+        # change the power levels to allow this.
+        self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.PowerLevels,
+            body={"users": {user1_id: 50, user2_id: 100}},
+            tok=user2_tok,
+        )
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.foo",
+            state_key=user1_id,
+            body={},
+            tok=user1_tok,
+        )
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.foo",
+            state_key=user2_id,
+            body={},
+            tok=user2_tok,
+        )
+
+        # Make the Sliding Sync request with a request for '$ME'.
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [
+                            [EventTypes.Create, ""],
+                            [EventTypes.Member, StateValues.ME],
+                            ["org.matrix.foo", StateValues.ME],
+                        ],
+                        "timeline_limit": 3,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        # Only user2 and user3 sent events in the 3 events we see in the `timeline`
+        self._assertRequiredStateIncludes(
+            channel.json_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+                state_map[(EventTypes.Member, user1_id)],
+                state_map[("org.matrix.foo", user1_id)],
+            },
+            exact=True,
+        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     @parameterized.expand([(Membership.LEAVE,), (Membership.BAN,)])
     def test_rooms_required_state_leave_ban(self, stop_membership: str) -> None:
@@ -3329,6 +4035,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             },
             exact=True,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_combine_superset(self) -> None:
         """
@@ -3349,6 +4056,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             body={"foo": "bar"},
             tok=user2_tok,
         )
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.bar_state",
+            state_key="",
+            body={"bar": "qux"},
+            tok=user2_tok,
+        )
 
         # Make the Sliding Sync request with wildcards for the `state_key`
         channel = self.make_request(
@@ -3372,16 +4086,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                         ],
                         "timeline_limit": 0,
                     },
-                }
-                # TODO: Room subscription should also combine with the `required_state`
-                # "room_subscriptions": {
-                #     room_id1: {
-                #         "required_state": [
-                #             ["org.matrix.bar_state", ""]
-                #         ],
-                #         "timeline_limit": 0,
-                #     }
-                # }
+                },
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [["org.matrix.bar_state", ""]],
+                        "timeline_limit": 0,
+                    }
+                },
             },
             access_token=user1_tok,
         )
@@ -3398,9 +4109,11 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
                 state_map[(EventTypes.Member, user1_id)],
                 state_map[(EventTypes.Member, user2_id)],
                 state_map[("org.matrix.foo_state", "")],
+                state_map[("org.matrix.bar_state", "")],
             },
             exact=True,
         )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
 
     def test_rooms_required_state_partial_state(self) -> None:
         """
@@ -3488,3 +4201,1106 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             ],
             channel.json_body["lists"]["foo-list"],
         )
+
+    def test_room_subscriptions_with_join_membership(self) -> None:
+        """
+        Test `room_subscriptions` with a joined room should give us timeline and current
+        state events.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            [EventTypes.Create, ""],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        # We should see some state
+        self._assertRequiredStateIncludes(
+            channel.json_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[(EventTypes.Create, "")],
+            },
+            exact=True,
+        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+
+        # We should see some events
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                join_response["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        # No "live" events in an initial sync (no `from_token` to define the "live"
+        # range)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+        # There are more events to paginate to
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            True,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_room_subscriptions_with_leave_membership(self) -> None:
+        """
+        Test `room_subscriptions` with a leave room should give us timeline and state
+        events up to the leave event.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.foo_state",
+            state_key="",
+            body={"foo": "bar"},
+            tok=user2_tok,
+        )
+
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        state_map = self.get_success(
+            self.storage_controllers.state.get_current_state(room_id1)
+        )
+
+        # Send some events after user1 leaves
+        self.helper.send(room_id1, "activity after leave", tok=user2_tok)
+        # Update state after user1 leaves
+        self.helper.send_state(
+            room_id1,
+            event_type="org.matrix.foo_state",
+            state_key="",
+            body={"foo": "qux"},
+            tok=user2_tok,
+        )
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            ["org.matrix.foo_state", ""],
+                        ],
+                        "timeline_limit": 2,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should see the state at the time of the leave
+        self._assertRequiredStateIncludes(
+            channel.json_body["rooms"][room_id1]["required_state"],
+            {
+                state_map[("org.matrix.foo_state", "")],
+            },
+            exact=True,
+        )
+        self.assertIsNone(channel.json_body["rooms"][room_id1].get("invite_state"))
+
+        # We should see some before we left (nothing after)
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                join_response["event_id"],
+                leave_response["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        # No "live" events in an initial sync (no `from_token` to define the "live"
+        # range)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+        # There are more events to paginate to
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            True,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_room_subscriptions_no_leak_private_room(self) -> None:
+        """
+        Test `room_subscriptions` with a private room we have never been in should not
+        leak any data to the user.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=False)
+
+        # We should not be able to join the private room
+        self.helper.join(
+            room_id1, user1_id, tok=user1_tok, expect_code=HTTPStatus.FORBIDDEN
+        )
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            [EventTypes.Create, ""],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should not see the room at all (we're not in it)
+        self.assertIsNone(
+            channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
+        )
+
+    def test_room_subscriptions_world_readable(self) -> None:
+        """
+        Test `room_subscriptions` with a room that has `world_readable` history visibility
+
+        FIXME: We should be able to see the room timeline and state
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Create a room with `world_readable` history visibility
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "preset": "public_chat",
+                "initial_state": [
+                    {
+                        "content": {
+                            "history_visibility": HistoryVisibility.WORLD_READABLE
+                        },
+                        "state_key": "",
+                        "type": EventTypes.RoomHistoryVisibility,
+                    }
+                ],
+            },
+        )
+        # Ensure we're testing with a room with `world_readable` history visibility
+        # which means events are visible to anyone even without membership.
+        history_visibility_response = self.helper.get_state(
+            room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+        )
+        self.assertEqual(
+            history_visibility_response.get("history_visibility"),
+            HistoryVisibility.WORLD_READABLE,
+        )
+
+        # Note: We never join the room
+
+        # Make the Sliding Sync request with just the room subscription
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "room_subscriptions": {
+                    room_id1: {
+                        "required_state": [
+                            [EventTypes.Create, ""],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # FIXME: In the future, we should be able to see the room because it's
+        # `world_readable` but currently we don't support this.
+        self.assertIsNone(
+            channel.json_body["rooms"].get(room_id1), channel.json_body["rooms"]
+        )
+
+
+class SlidingSyncToDeviceExtensionTestCase(unittest.HomeserverTestCase):
+    """Tests for the to-device sliding sync extension"""
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        sync.register_servlets,
+        sendtodevice.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+        self.account_data_handler = hs.get_account_data_handler()
+        self.notifier = hs.get_notifier()
+        self.sync_endpoint = (
+            "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+        )
+
+    def _bump_notifier_wait_for_events(self, user_id: str) -> None:
+        """
+        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
+        Sync results.
+        """
+        # We're expecting some new activity from this point onwards
+        from_token = self.event_sources.get_current_token()
+
+        triggered_notifier_wait_for_events = False
+
+        async def _on_new_acivity(
+            before_token: StreamToken, after_token: StreamToken
+        ) -> bool:
+            nonlocal triggered_notifier_wait_for_events
+            triggered_notifier_wait_for_events = True
+            return True
+
+        # Listen for some new activity for the user. We're just trying to confirm that
+        # our bump below actually does what we think it does (triggers new activity for
+        # the user).
+        result_awaitable = self.notifier.wait_for_events(
+            user_id,
+            1000,
+            _on_new_acivity,
+            from_token=from_token,
+        )
+
+        # Update the account data so that `notifier.wait_for_events(...)` wakes up.
+        # We're bumping account data because it won't show up in the Sliding Sync
+        # response so it won't affect whether we have results.
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
+            )
+        )
+
+        # Wait for our notifier result
+        self.get_success(result_awaitable)
+
+        if not triggered_notifier_wait_for_events:
+            raise AssertionError(
+                "Expected `notifier.wait_for_events(...)` to be triggered"
+            )
+
+    def _assert_to_device_response(
+        self, channel: FakeChannel, expected_messages: List[JsonDict]
+    ) -> str:
+        """Assert the sliding sync response was successful and has the expected
+        to-device messages.
+
+        Returns the next_batch token from the to-device section.
+        """
+        self.assertEqual(channel.code, 200, channel.json_body)
+        extensions = channel.json_body["extensions"]
+        to_device = extensions["to_device"]
+        self.assertIsInstance(to_device["next_batch"], str)
+        self.assertEqual(to_device["events"], expected_messages)
+
+        return to_device["next_batch"]
+
+    def test_no_data(self) -> None:
+        """Test that enabling to-device extension works, even if there is
+        no-data
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+
+        # We expect no to-device messages
+        self._assert_to_device_response(channel, [])
+
+    def test_data_initial_sync(self) -> None:
+        """Test that we get to-device messages when we don't specify a since
+        token"""
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass", "d1")
+        user2_id = self.register_user("u2", "pass")
+        user2_tok = self.login(user2_id, "pass", "d2")
+
+        # Send the to-device message
+        test_msg = {"foo": "bar"}
+        chan = self.make_request(
+            "PUT",
+            "/_matrix/client/r0/sendToDevice/m.test/1234",
+            content={"messages": {user1_id: {"d1": test_msg}}},
+            access_token=user2_tok,
+        )
+        self.assertEqual(chan.code, 200, chan.result)
+
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self._assert_to_device_response(
+            channel,
+            [{"content": test_msg, "sender": user2_id, "type": "m.test"}],
+        )
+
+    def test_data_incremental_sync(self) -> None:
+        """Test that we get to-device messages over incremental syncs"""
+
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass", "d1")
+        user2_id = self.register_user("u2", "pass")
+        user2_tok = self.login(user2_id, "pass", "d2")
+
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        # No to-device messages yet.
+        next_batch = self._assert_to_device_response(channel, [])
+
+        test_msg = {"foo": "bar"}
+        chan = self.make_request(
+            "PUT",
+            "/_matrix/client/r0/sendToDevice/m.test/1234",
+            content={"messages": {user1_id: {"d1": test_msg}}},
+            access_token=user2_tok,
+        )
+        self.assertEqual(chan.code, 200, chan.result)
+
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                        "since": next_batch,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        next_batch = self._assert_to_device_response(
+            channel,
+            [{"content": test_msg, "sender": user2_id, "type": "m.test"}],
+        )
+
+        # The next sliding sync request should not include the to-device
+        # message.
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                        "since": next_batch,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self._assert_to_device_response(channel, [])
+
+        # An initial sliding sync request should not include the to-device
+        # message, as it should have been deleted
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self._assert_to_device_response(channel, [])
+
+    def test_wait_for_new_data(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+        (Only applies to incremental syncs with a `timeout` specified)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass", "d1")
+        user2_id = self.register_user("u2", "pass")
+        user2_tok = self.login(user2_id, "pass", "d2")
+
+        from_token = self.event_sources.get_current_token()
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + "?timeout=10000"
+            + f"&pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Bump the to-device messages to trigger new results
+        test_msg = {"foo": "bar"}
+        send_to_device_channel = self.make_request(
+            "PUT",
+            "/_matrix/client/r0/sendToDevice/m.test/1234",
+            content={"messages": {user1_id: {"d1": test_msg}}},
+            access_token=user2_tok,
+        )
+        self.assertEqual(
+            send_to_device_channel.code, 200, send_to_device_channel.result
+        )
+        # Should respond before the 10 second timeout
+        channel.await_result(timeout_ms=3000)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        self._assert_to_device_response(
+            channel,
+            [{"content": test_msg, "sender": user2_id, "type": "m.test"}],
+        )
+
+    def test_wait_for_new_data_timeout(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive but
+        no data ever arrives so we timeout. We're also making sure that the default data
+        from the To-Device extension doesn't trigger a false-positive for new data.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        from_token = self.event_sources.get_current_token()
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + "?timeout=10000"
+            + f"&pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {},
+                "extensions": {
+                    "to_device": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Wake-up `notifier.wait_for_events(...)` that will cause us test
+        # `SlidingSyncResult.__bool__` for new results.
+        self._bump_notifier_wait_for_events(user1_id)
+        # Block for a little bit more to ensure we don't see any new results.
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        self._assert_to_device_response(channel, [])
+
+
+class SlidingSyncE2eeExtensionTestCase(unittest.HomeserverTestCase):
+    """Tests for the e2ee sliding sync extension"""
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        sync.register_servlets,
+        devices.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+        self.e2e_keys_handler = hs.get_e2e_keys_handler()
+        self.account_data_handler = hs.get_account_data_handler()
+        self.notifier = hs.get_notifier()
+        self.sync_endpoint = (
+            "/_matrix/client/unstable/org.matrix.simplified_msc3575/sync"
+        )
+
+    def _bump_notifier_wait_for_events(self, user_id: str) -> None:
+        """
+        Wake-up a `notifier.wait_for_events(user_id)` call without affecting the Sliding
+        Sync results.
+        """
+        # We're expecting some new activity from this point onwards
+        from_token = self.event_sources.get_current_token()
+
+        triggered_notifier_wait_for_events = False
+
+        async def _on_new_acivity(
+            before_token: StreamToken, after_token: StreamToken
+        ) -> bool:
+            nonlocal triggered_notifier_wait_for_events
+            triggered_notifier_wait_for_events = True
+            return True
+
+        # Listen for some new activity for the user. We're just trying to confirm that
+        # our bump below actually does what we think it does (triggers new activity for
+        # the user).
+        result_awaitable = self.notifier.wait_for_events(
+            user_id,
+            1000,
+            _on_new_acivity,
+            from_token=from_token,
+        )
+
+        # Update the account data so that `notifier.wait_for_events(...)` wakes up.
+        # We're bumping account data because it won't show up in the Sliding Sync
+        # response so it won't affect whether we have results.
+        self.get_success(
+            self.account_data_handler.add_account_data_for_user(
+                user_id,
+                "org.matrix.foobarbaz",
+                {"foo": "bar"},
+            )
+        )
+
+        # Wait for our notifier result
+        self.get_success(result_awaitable)
+
+        if not triggered_notifier_wait_for_events:
+            raise AssertionError(
+                "Expected `notifier.wait_for_events(...)` to be triggered"
+            )
+
+    def test_no_data_initial_sync(self) -> None:
+        """
+        Test that enabling e2ee extension works during an intitial sync, even if there
+        is no-data
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Make an initial Sliding Sync request with the e2ee extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "e2ee": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Device list updates are only present for incremental syncs
+        self.assertIsNone(channel.json_body["extensions"]["e2ee"].get("device_lists"))
+
+        # Both of these should be present even when empty
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"],
+            {
+                # This is always present because of
+                # https://github.com/element-hq/element-android/issues/3725 and
+                # https://github.com/matrix-org/synapse/issues/10456
+                "signed_curve25519": 0
+            },
+        )
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
+            [],
+        )
+
+    def test_no_data_incremental_sync(self) -> None:
+        """
+        Test that enabling e2ee extension works during an incremental sync, even if
+        there is no-data
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        from_token = self.event_sources.get_current_token()
+
+        # Make an incremental Sliding Sync request with the e2ee extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {},
+                "extensions": {
+                    "e2ee": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Device list shows up for incremental syncs
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]
+            .get("device_lists", {})
+            .get("changed"),
+            [],
+        )
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+            [],
+        )
+
+        # Both of these should be present even when empty
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"],
+            {
+                # Note that "signed_curve25519" is always returned in key count responses
+                # regardless of whether we uploaded any keys for it. This is necessary until
+                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
+                #
+                # Also related:
+                # https://github.com/element-hq/element-android/issues/3725 and
+                # https://github.com/matrix-org/synapse/issues/10456
+                "signed_curve25519": 0
+            },
+        )
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
+            [],
+        )
+
+    def test_wait_for_new_data(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive.
+
+        (Only applies to incremental syncs with a `timeout` specified)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        test_device_id = "TESTDEVICE"
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass", device_id=test_device_id)
+
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+        self.helper.join(room_id, user3_id, tok=user3_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + "?timeout=10000"
+            + f"&pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {},
+                "extensions": {
+                    "e2ee": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Bump the device lists to trigger new results
+        # Have user3 update their device list
+        device_update_channel = self.make_request(
+            "PUT",
+            f"/devices/{test_device_id}",
+            {
+                "display_name": "New Device Name",
+            },
+            access_token=user3_tok,
+        )
+        self.assertEqual(
+            device_update_channel.code, 200, device_update_channel.json_body
+        )
+        # Should respond before the 10 second timeout
+        channel.await_result(timeout_ms=3000)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should see the device list update
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]
+            .get("device_lists", {})
+            .get("changed"),
+            [user3_id],
+        )
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+            [],
+        )
+
+    def test_wait_for_new_data_timeout(self) -> None:
+        """
+        Test to make sure that the Sliding Sync request waits for new data to arrive but
+        no data ever arrives so we timeout. We're also making sure that the default data
+        from the E2EE extension doesn't trigger a false-positive for new data (see
+        `device_one_time_keys_count.signed_curve25519`).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        from_token = self.event_sources.get_current_token()
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + "?timeout=10000"
+            + f"&pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {},
+                "extensions": {
+                    "e2ee": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+            await_result=False,
+        )
+        # Block for 5 seconds to make sure we are `notifier.wait_for_events(...)`
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=5000)
+        # Wake-up `notifier.wait_for_events(...)` that will cause us test
+        # `SlidingSyncResult.__bool__` for new results.
+        self._bump_notifier_wait_for_events(user1_id)
+        # Block for a little bit more to ensure we don't see any new results.
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=4000)
+        # Wait for the sync to complete (wait for the rest of the 10 second timeout,
+        # 5000 + 4000 + 1200 > 10000)
+        channel.await_result(timeout_ms=1200)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Device lists are present for incremental syncs but empty because no device changes
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]
+            .get("device_lists", {})
+            .get("changed"),
+            [],
+        )
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+            [],
+        )
+
+        # Both of these should be present even when empty
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]["device_one_time_keys_count"],
+            {
+                # Note that "signed_curve25519" is always returned in key count responses
+                # regardless of whether we uploaded any keys for it. This is necessary until
+                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
+                #
+                # Also related:
+                # https://github.com/element-hq/element-android/issues/3725 and
+                # https://github.com/matrix-org/synapse/issues/10456
+                "signed_curve25519": 0
+            },
+        )
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]["device_unused_fallback_key_types"],
+            [],
+        )
+
+    def test_device_lists(self) -> None:
+        """
+        Test that device list updates are included in the response
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        test_device_id = "TESTDEVICE"
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass", device_id=test_device_id)
+
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id, user1_id, tok=user1_tok)
+        self.helper.join(room_id, user3_id, tok=user3_tok)
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        # Have user3 update their device list
+        channel = self.make_request(
+            "PUT",
+            f"/devices/{test_device_id}",
+            {
+                "display_name": "New Device Name",
+            },
+            access_token=user3_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # User4 leaves the room
+        self.helper.leave(room_id, user4_id, tok=user4_tok)
+
+        # Make an incremental Sliding Sync request with the e2ee extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {},
+                "extensions": {
+                    "e2ee": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Device list updates show up
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"]
+            .get("device_lists", {})
+            .get("changed"),
+            [user3_id],
+        )
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"].get("device_lists", {}).get("left"),
+            [user4_id],
+        )
+
+    def test_device_one_time_keys_count(self) -> None:
+        """
+        Test that `device_one_time_keys_count` are included in the response
+        """
+        test_device_id = "TESTDEVICE"
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass", device_id=test_device_id)
+
+        # Upload one time keys for the user/device
+        keys: JsonDict = {
+            "alg1:k1": "key1",
+            "alg2:k2": {"key": "key2", "signatures": {"k1": "sig1"}},
+            "alg2:k3": {"key": "key3"},
+        }
+        upload_keys_response = self.get_success(
+            self.e2e_keys_handler.upload_keys_for_user(
+                user1_id, test_device_id, {"one_time_keys": keys}
+            )
+        )
+        self.assertDictEqual(
+            upload_keys_response,
+            {
+                "one_time_key_counts": {
+                    "alg1": 1,
+                    "alg2": 2,
+                    # Note that "signed_curve25519" is always returned in key count responses
+                    # regardless of whether we uploaded any keys for it. This is necessary until
+                    # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
+                    #
+                    # Also related:
+                    # https://github.com/element-hq/element-android/issues/3725 and
+                    # https://github.com/matrix-org/synapse/issues/10456
+                    "signed_curve25519": 0,
+                }
+            },
+        )
+
+        # Make a Sliding Sync request with the e2ee extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "e2ee": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Check for those one time key counts
+        self.assertEqual(
+            channel.json_body["extensions"]["e2ee"].get("device_one_time_keys_count"),
+            {
+                "alg1": 1,
+                "alg2": 2,
+                # Note that "signed_curve25519" is always returned in key count responses
+                # regardless of whether we uploaded any keys for it. This is necessary until
+                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
+                #
+                # Also related:
+                # https://github.com/element-hq/element-android/issues/3725 and
+                # https://github.com/matrix-org/synapse/issues/10456
+                "signed_curve25519": 0,
+            },
+        )
+
+    def test_device_unused_fallback_key_types(self) -> None:
+        """
+        Test that `device_unused_fallback_key_types` are included in the response
+        """
+        test_device_id = "TESTDEVICE"
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass", device_id=test_device_id)
+
+        # We shouldn't have any unused fallback keys yet
+        res = self.get_success(
+            self.store.get_e2e_unused_fallback_key_types(user1_id, test_device_id)
+        )
+        self.assertEqual(res, [])
+
+        # Upload a fallback key for the user/device
+        self.get_success(
+            self.e2e_keys_handler.upload_keys_for_user(
+                user1_id,
+                test_device_id,
+                {"fallback_keys": {"alg1:k1": "fallback_key1"}},
+            )
+        )
+        # We should now have an unused alg1 key
+        fallback_res = self.get_success(
+            self.store.get_e2e_unused_fallback_key_types(user1_id, test_device_id)
+        )
+        self.assertEqual(fallback_res, ["alg1"], fallback_res)
+
+        # Make a Sliding Sync request with the e2ee extension enabled
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {},
+                "extensions": {
+                    "e2ee": {
+                        "enabled": True,
+                    }
+                },
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Check for the unused fallback key types
+        self.assertListEqual(
+            channel.json_body["extensions"]["e2ee"].get(
+                "device_unused_fallback_key_types"
+            ),
+            ["alg1"],
+        )