Diffstat (limited to 'synapse')
-rwxr-xr-x  synapse/_scripts/synapse_port_db.py                             |    5
-rw-r--r--  synapse/api/constants.py                                        |    2
-rw-r--r--  synapse/api/ratelimiting.py                                     |    5
-rw-r--r--  synapse/app/homeserver.py                                       |    2
-rw-r--r--  synapse/config/repository.py                                    |    6
-rw-r--r--  synapse/config/server.py                                        |   11
-rw-r--r--  synapse/handlers/device.py                                      |   17
-rw-r--r--  synapse/handlers/e2e_room_keys.py                               |   18
-rw-r--r--  synapse/handlers/room.py                                        |    2
-rw-r--r--  synapse/handlers/sliding_sync.py                                | 1026
-rw-r--r--  synapse/handlers/sync.py                                        |    6
-rw-r--r--  synapse/http/matrixfederationclient.py                          |  126
-rw-r--r--  synapse/media/media_repository.py                               |   39
-rw-r--r--  synapse/media/thumbnailer.py                                    |   48
-rw-r--r--  synapse/notifier.py                                             |   13
-rw-r--r--  synapse/rest/client/keys.py                                     |   10
-rw-r--r--  synapse/rest/client/sync.py                                     |   75
-rw-r--r--  synapse/rest/media/download_resource.py                         |    3
-rw-r--r--  synapse/rest/media/thumbnail_resource.py                        |    5
-rw-r--r--  synapse/storage/_base.py                                        |    6
-rw-r--r--  synapse/storage/databases/main/cache.py                         |    6
-rw-r--r--  synapse/storage/databases/main/event_federation.py              |    5
-rw-r--r--  synapse/storage/databases/main/media_repository.py              |   28
-rw-r--r--  synapse/storage/databases/main/roommember.py                    |   83
-rw-r--r--  synapse/storage/databases/main/state.py                         |   52
-rw-r--r--  synapse/storage/databases/main/stream.py                        |  123
-rw-r--r--  synapse/storage/schema/__init__.py                              |    5
-rw-r--r--  synapse/storage/schema/main/delta/86/01_authenticate_media.sql  |   15
-rw-r--r--  synapse/types/__init__.py                                       |   95
-rw-r--r--  synapse/types/handlers/__init__.py                              |   97
-rw-r--r--  synapse/types/rest/client/__init__.py                           |   62
-rw-r--r--  synapse/util/caches/stream_change_cache.py                      |   12
32 files changed, 1617 insertions, 391 deletions
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 3bb4a34938..5c6db8118f 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -119,18 +119,19 @@ BOOLEAN_COLUMNS = {
     "e2e_room_keys": ["is_verified"],
     "event_edges": ["is_state"],
     "events": ["processed", "outlier", "contains_url"],
-    "local_media_repository": ["safe_from_quarantine"],
+    "local_media_repository": ["safe_from_quarantine", "authenticated"],
+    "per_user_experimental_features": ["enabled"],
     "presence_list": ["accepted"],
     "presence_stream": ["currently_active"],
     "public_room_list_stream": ["visibility"],
     "pushers": ["enabled"],
     "redactions": ["have_censored"],
+    "remote_media_cache": ["authenticated"],
     "room_stats_state": ["is_federatable"],
     "rooms": ["is_public", "has_auth_chain_index"],
     "users": ["shadow_banned", "approved", "locked", "suspended"],
     "un_partial_stated_event_stream": ["rejection_status_changed"],
     "users_who_share_rooms": ["share_private"],
-    "per_user_experimental_features": ["enabled"],
 }
 
 
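The `BOOLEAN_COLUMNS` map tells the port script which integer columns in the
SQLite source database must be cast to real booleans when rows are copied into
Postgres (SQLite stores booleans as 0/1). A minimal sketch of how such a map
can be applied per row; `convert_row` is an illustrative helper, not the
script's actual code:

    from typing import Any, List

    def convert_row(table: str, headers: List[str], row: List[Any]) -> List[Any]:
        # Cast the flagged columns for this table, leaving NULLs untouched.
        bool_cols = set(BOOLEAN_COLUMNS.get(table, ()))
        return [
            bool(value) if header in bool_cols and value is not None else value
            for header, value in zip(headers, row)
        ]
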
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 12d18137e0..85001d9676 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -50,7 +50,7 @@ class Membership:
     KNOCK: Final = "knock"
     LEAVE: Final = "leave"
     BAN: Final = "ban"
-    LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}
+    LIST: Final = frozenset((INVITE, JOIN, KNOCK, LEAVE, BAN))
 
 
 class PresenceState:
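Switching `Membership.LIST` from a set literal to `frozenset` makes the shared
constant immutable (and hashable), so no caller can mutate it by accident;
read-only set operations keep working. For example:

    Membership.LIST & {Membership.JOIN, Membership.LEAVE}  # set ops still work
    # Membership.LIST.add("zombie")  # AttributeError: frozenset has no
    #                                # mutation methods
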
diff --git a/synapse/api/ratelimiting.py b/synapse/api/ratelimiting.py
index 26b8711851..b80630c5d3 100644
--- a/synapse/api/ratelimiting.py
+++ b/synapse/api/ratelimiting.py
@@ -236,9 +236,8 @@ class Ratelimiter:
             requester: The requester that is doing the action, if any.
             key: An arbitrary key used to classify an action. Defaults to the
                 requester's user ID.
-            n_actions: The number of times the user wants to do this action. If the user
-                cannot do all of the actions, the user's action count is not incremented
-                at all.
+            n_actions: The number of times the user performed the action. May be negative
+                to "refund" the rate limit.
             _time_now_s: The current time. Optional, defaults to the current time according
                 to self.clock. Only used by tests.
         """
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2b111847b7..e114ab7ec4 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -217,7 +217,7 @@ class SynapseHomeServer(HomeServer):
             )
 
         if name in ["media", "federation", "client"]:
-            if self.config.server.enable_media_repo:
+            if self.config.media.can_load_media_repo:
                 media_repo = self.get_media_repository_resource()
                 resources.update(
                     {
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 1645470499..97ce6de528 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -126,7 +126,7 @@ class ContentRepositoryConfig(Config):
         # Only enable the media repo if either the media repo is enabled or the
         # current worker app is the media repo.
         if (
-            self.root.server.enable_media_repo is False
+            config.get("enable_media_repo", True) is False
             and config.get("worker_app") != "synapse.app.media_repository"
         ):
             self.can_load_media_repo = False
@@ -272,6 +272,10 @@ class ContentRepositoryConfig(Config):
                 remote_media_lifetime
             )
 
+        self.enable_authenticated_media = config.get(
+            "enable_authenticated_media", False
+        )
+
     def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
         assert data_dir_path is not None
         media_store = os.path.join(data_dir_path, "media_store")
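Together with the `synapse/config/server.py` hunk below (which removes the old
`ServerConfig.enable_media_repo`), this folds the flag into
`ContentRepositoryConfig.can_load_media_repo`, which `homeserver.py` now
consults. Condensed, the derived flag amounts to (illustrative, not the exact
code):

    # Loadable unless explicitly disabled AND this process is not the
    # dedicated media_repository worker.
    can_load_media_repo = not (
        config.get("enable_media_repo", True) is False
        and config.get("worker_app") != "synapse.app.media_repository"
    )
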
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a2b2305776..fd52c0475c 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -384,6 +384,11 @@ class ServerConfig(Config):
         # Whether to internally track presence, requires that presence is enabled,
         self.track_presence = self.presence_enabled and presence_enabled != "untracked"
 
+        # Determines if presence results for offline users are included on initial/full sync
+        self.presence_include_offline_users_on_sync = presence_config.get(
+            "include_offline_users_on_sync", False
+        )
+
         # Custom presence router module
         # This is the legacy way of configuring it (the config should now be put in the modules section)
         self.presence_router_module_class = None
@@ -395,12 +400,6 @@ class ServerConfig(Config):
                 self.presence_router_config,
             ) = load_module(presence_router_config, ("presence", "presence_router"))
 
-        # whether to enable the media repository endpoints. This should be set
-        # to false if the media repository is running as a separate endpoint;
-        # doing so ensures that we will not run cache cleanup jobs on the
-        # master, potentially causing inconsistency.
-        self.enable_media_repo = config.get("enable_media_repo", True)
-
         # Whether to require authentication to retrieve profile data (avatars,
         # display names) of other users through the client API.
         self.require_auth_for_profile_requests = config.get(
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 0432d97109..4fc6fcd7ae 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -39,6 +39,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
 from synapse.types import (
+    DeviceListUpdates,
     JsonDict,
     JsonMapping,
     ScheduledTask,
@@ -214,7 +215,7 @@ class DeviceWorkerHandler:
     @cancellable
     async def get_user_ids_changed(
         self, user_id: str, from_token: StreamToken
-    ) -> JsonDict:
+    ) -> DeviceListUpdates:
         """Get list of users that have had the devices updated, or have newly
         joined a room, that `user_id` may be interested in.
         """
@@ -341,11 +342,19 @@ class DeviceWorkerHandler:
             possibly_joined = set()
             possibly_left = set()
 
-        result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+        device_list_updates = DeviceListUpdates(
+            changed=possibly_joined,
+            left=possibly_left,
+        )
 
-        log_kv(result)
+        log_kv(
+            {
+                "changed": device_list_updates.changed,
+                "left": device_list_updates.left,
+            }
+        )
 
-        return result
+        return device_list_updates
 
     async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
         if not self.hs.is_mine(UserID.from_string(user_id)):
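`get_user_ids_changed` now returns a typed `DeviceListUpdates` instead of a
raw dict. A sketch of the relevant fields (the real class lives in
`synapse.types`):

    import attr
    from typing import Set

    @attr.s(slots=True, auto_attribs=True)
    class DeviceListUpdates:
        # Users whose device lists changed in the token range.
        changed: Set[str] = attr.Factory(set)
        # Users the syncing user no longer shares a room with.
        left: Set[str] = attr.Factory(set)
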
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 99f9f6e64a..f397911f28 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -34,7 +34,7 @@ from synapse.api.errors import (
 from synapse.logging.opentracing import log_kv, trace
 from synapse.storage.databases.main.e2e_room_keys import RoomKey
 from synapse.types import JsonDict
-from synapse.util.async_helpers import Linearizer
+from synapse.util.async_helpers import ReadWriteLock
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -58,7 +58,7 @@ class E2eRoomKeysHandler:
         # clients belonging to a user will receive and try to upload a new session at
         # roughly the same time.  Also used to lock out uploads when the key is being
         # changed.
-        self._upload_linearizer = Linearizer("upload_room_keys_lock")
+        self._upload_lock = ReadWriteLock()
 
     @trace
     async def get_room_keys(
@@ -89,7 +89,7 @@ class E2eRoomKeysHandler:
 
         # we deliberately take the lock to get keys so that changing the version
         # works atomically
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.read(user_id):
             # make sure the backup version exists
             try:
                 await self.store.get_e2e_room_keys_version_info(user_id, version)
@@ -132,7 +132,7 @@ class E2eRoomKeysHandler:
         """
 
         # lock for consistency with uploading
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             # make sure the backup version exists
             try:
                 version_info = await self.store.get_e2e_room_keys_version_info(
@@ -193,7 +193,7 @@ class E2eRoomKeysHandler:
         # TODO: Validate the JSON to make sure it has the right keys.
 
         # XXX: perhaps we should use a finer grained lock here?
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             # Check that the version we're trying to upload is the current version
             try:
                 version_info = await self.store.get_e2e_room_keys_version_info(user_id)
@@ -355,7 +355,7 @@ class E2eRoomKeysHandler:
         # TODO: Validate the JSON to make sure it has the right keys.
 
         # lock everyone out until we've switched version
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             new_version = await self.store.create_e2e_room_keys_version(
                 user_id, version_info
             )
@@ -382,7 +382,7 @@ class E2eRoomKeysHandler:
         }
         """
 
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.read(user_id):
             try:
                 res = await self.store.get_e2e_room_keys_version_info(user_id, version)
             except StoreError as e:
@@ -407,7 +407,7 @@ class E2eRoomKeysHandler:
             NotFoundError: if this backup version doesn't exist
         """
 
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             try:
                 await self.store.delete_e2e_room_keys_version(user_id, version)
             except StoreError as e:
@@ -437,7 +437,7 @@ class E2eRoomKeysHandler:
             raise SynapseError(
                 400, "Version in body does not match", Codes.INVALID_PARAM
             )
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             try:
                 old_info = await self.store.get_e2e_room_keys_version_info(
                     user_id, version
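The switch from `Linearizer` to `ReadWriteLock` lets concurrent readers
(`get_room_keys`, `get_version_info`) run in parallel for the same user while
writers (uploads, version changes, deletions) keep exclusive access. The
per-key pattern, as used in the hunks above:

    from synapse.util.async_helpers import ReadWriteLock

    lock = ReadWriteLock()

    async def read_path(user_id: str) -> None:
        async with lock.read(user_id):   # runs alongside other readers
            ...  # fetch backup version info or keys

    async def write_path(user_id: str) -> None:
        async with lock.write(user_id):  # excludes readers and other writers
            ...  # upload keys or switch backup versions
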
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2302d283a7..262d9f4044 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1188,6 +1188,8 @@ class RoomCreationHandler:
             )
             events_to_send.append((power_event, power_context))
         else:
+            # Please update the docs for `default_power_level_content_override` when
+            # updating the `events` dict below
             power_level_content: JsonDict = {
                 "users": {creator_id: 100},
                 "users_default": 0,
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8e2f751c02..554ab59bf3 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,23 +18,33 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
+from itertools import chain
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Final,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+)
 
 import attr
 from immutabledict import immutabledict
 
-from synapse.api.constants import (
-    AccountDataTypes,
-    Direction,
-    EventContentFields,
-    EventTypes,
-    Membership,
-)
+from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
+from synapse.logging.opentracing import start_active_span, tag_args, trace
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.roommember import MemberSummary
 from synapse.types import (
+    DeviceListUpdates,
     JsonDict,
     PersistedEventPosition,
     Requester,
@@ -46,6 +56,7 @@ from synapse.types import (
 )
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 from synapse.types.state import StateFilter
+from synapse.util.async_helpers import concurrently_execute
 from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
@@ -56,6 +67,7 @@ logger = logging.getLogger(__name__)
 
 # The event types that clients should consider as new activity.
 DEFAULT_BUMP_EVENT_TYPES = {
+    EventTypes.Create,
     EventTypes.Message,
     EventTypes.Encrypted,
     EventTypes.Sticker,
@@ -65,32 +77,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
 }
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+    """
+    Attributes:
+        room_id: The room ID of the membership event
+        event_id: The event ID of the membership event
+        event_pos: The stream position of the membership event
+        membership: The membership state of the user in the room
+        sender: The person who sent the membership event
+        newly_joined: Whether the user newly joined the room during the given token
+            range and is still joined to the room at the end of this range.
+        newly_left: Whether the user newly left (or was kicked from) the room during
+            the given token range and still has a "leave" membership at the end of
+            this range.
+        is_dm: Whether this user considers this room as a direct-message (DM) room
+    """
+
+    room_id: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    event_id: Optional[str]
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set because `current_state_delta_stream` will note the position that the reset
+    # happened.
+    event_pos: PersistedEventPosition
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set to `LEAVE` because we can make that assumption based on the situation (see
+    # `get_current_state_delta_membership_changes_for_user(...)`)
+    membership: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    sender: Optional[str]
+    newly_joined: bool
+    newly_left: bool
+    is_dm: bool
+
+    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+        return attr.evolve(self, **kwds)
+
+
 def filter_membership_for_sync(
-    *, membership: str, user_id: str, sender: Optional[str]
+    *, user_id: str, room_membership_for_user: _RoomMembershipForUser
 ) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
     otherwise False.
 
     Attributes:
-        membership: The membership state of the user in the room.
         user_id: The user ID that the membership applies to
-        sender: The person who sent the membership event
+        room_membership_for_user: Membership information for the user in the room
     """
 
-    # Everything except `Membership.LEAVE` because we want everything that's *still*
-    # relevant to the user. There are few more things to include in the sync response
-    # (newly_left) but those are handled separately.
+    membership = room_membership_for_user.membership
+    sender = room_membership_for_user.sender
+    newly_left = room_membership_for_user.newly_left
+
+    # We want to allow everything except rooms the user has left, unless the room is
+    # `newly_left`, because we want everything that's *still* relevant to the user.
+    # We include `newly_left` rooms because the last event that the user should see
+    # is their own leave event.
     #
-    # This logic includes kicks (leave events where the sender is not the same user) and
-    # can be read as "anything that isn't a leave or a leave with a different sender".
+    # Note that a kick is not a voluntary leave: this logic includes kicks (leave
+    # events where the sender is not the same user).
     #
-    # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
-    # happened that removed the user from the room, or the user was the last person
-    # locally to leave the room which caused the server to leave the room. In both
-    # cases, we can just remove the rooms since they are no longer relevant to the user.
-    # They could still be added back later if they are `newly_left`.
-    return membership != Membership.LEAVE or sender not in (user_id, None)
+    # When `sender=None`, it means that a state reset happened that removed the user
+    # from the room without a corresponding leave event. We can just remove the rooms
+    # since they are no longer relevant to the user but will still appear if they are
+    # `newly_left`.
+    return (
+        # Anything except leave events
+        membership != Membership.LEAVE
+        # Unless...
+        or newly_left
+        # Allow kicks
+        or (membership == Membership.LEAVE and sender not in (user_id, None))
+    )
 
 
 # We can't freeze this class because we want to update it in place with the
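Distilled, the new predicate admits everything except stale voluntary leaves.
A standalone sketch covering the four interesting cases (plain arguments
instead of `_RoomMembershipForUser`, for brevity):

    from typing import Optional

    def include_in_sync(
        *, membership: str, sender: Optional[str], user_id: str, newly_left: bool
    ) -> bool:
        return (
            membership != "leave"             # anything but a leave...
            or newly_left                     # ...unless the leave is new this range
            or sender not in (user_id, None)  # ...or it was a kick
        )

    me = "@alice:example.org"
    # Still joined: included.
    assert include_in_sync(membership="join", sender=me, user_id=me, newly_left=False)
    # Stale voluntary leave: excluded.
    assert not include_in_sync(membership="leave", sender=me, user_id=me, newly_left=False)
    # Kick (different sender): included.
    assert include_in_sync(membership="leave", sender="@mod:x", user_id=me, newly_left=False)
    # State reset (sender=None) that is newly_left: included.
    assert include_in_sync(membership="leave", sender=None, user_id=me, newly_left=True)
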
@@ -282,29 +341,9 @@ class StateValues:
     # `sender` in the timeline). We only give special meaning to this value when it's a
     # `state_key`.
     LAZY: Final = "$LAZY"
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _RoomMembershipForUser:
-    """
-    Attributes:
-        event_id: The event ID of the membership event
-        event_pos: The stream position of the membership event
-        membership: The membership state of the user in the room
-        sender: The person who sent the membership event
-        newly_joined: Whether the user newly joined the room during the given token
-            range
-    """
-
-    room_id: str
-    event_id: Optional[str]
-    event_pos: PersistedEventPosition
-    membership: str
-    sender: Optional[str]
-    newly_joined: bool
-
-    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
-        return attr.evolve(self, **kwds)
+    # Substituted with the requester's user ID. Typically used by clients to get
+    # the user's membership.
+    ME: Final = "$ME"
 
 
 class SlidingSyncHandler:
@@ -316,6 +355,7 @@ class SlidingSyncHandler:
         self.notifier = hs.get_notifier()
         self.event_sources = hs.get_event_sources()
         self.relations_handler = hs.get_relations_handler()
+        self.device_handler = hs.get_device_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
@@ -344,10 +384,6 @@ class SlidingSyncHandler:
         # auth_blocking will occur)
         await self.auth_blocking.check_auth_blocking(requester=requester)
 
-        # TODO: If the To-Device extension is enabled and we have a `from_token`, delete
-        # any to-device messages before that token (since we now know that the device
-        # has received them). (see sync v2 for how to do this)
-
         # If we're working with a user-provided token, we need to make sure to wait for
         # this worker to catch up with the token so we don't skip past any incoming
         # events or future events if the user is nefariously, manually modifying the
@@ -425,18 +461,31 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        # Get all of the room IDs that the user should be able to see in the sync
+        # response
+        has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
+        has_room_subscriptions = (
+            sync_config.room_subscriptions is not None
+            and len(sync_config.room_subscriptions) > 0
+        )
+        if has_lists or has_room_subscriptions:
+            room_membership_for_user_map = (
+                await self.get_room_membership_for_user_at_to_token(
+                    user=sync_config.user,
+                    to_token=to_token,
+                    from_token=from_token,
+                )
+            )
+
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we're going to display and need to fetch more
         # info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
-        if sync_config.lists:
-            # Get all of the room IDs that the user should be able to see in the sync
-            # response
-            sync_room_map = await self.get_sync_room_ids_for_user(
-                sync_config.user,
-                from_token=from_token,
-                to_token=to_token,
+        if has_lists and sync_config.lists is not None:
+            sync_room_map = await self.filter_rooms_relevant_for_sync(
+                user=sync_config.user,
+                room_membership_for_user_map=room_membership_for_user_map,
             )
 
             for list_key, list_config in sync_config.lists.items():
@@ -464,9 +513,9 @@ class SlidingSyncHandler:
                 membership_state_keys = room_sync_config.required_state_map.get(
                     EventTypes.Member
                 )
+                # Also see `StateFilter.must_await_full_state(...)` for comparison
                 lazy_loading = (
                     membership_state_keys is not None
-                    and len(membership_state_keys) == 1
                     and StateValues.LAZY in membership_state_keys
                 )
 
@@ -524,51 +573,88 @@ class SlidingSyncHandler:
                     ops=ops,
                 )
 
-        # TODO: if (sync_config.room_subscriptions):
+        # Handle room subscriptions
+        if has_room_subscriptions and sync_config.room_subscriptions is not None:
+            for room_id, room_subscription in sync_config.room_subscriptions.items():
+                room_membership_for_user_at_to_token = (
+                    await self.check_room_subscription_allowed_for_user(
+                        room_id=room_id,
+                        room_membership_for_user_map=room_membership_for_user_map,
+                        to_token=to_token,
+                    )
+                )
+
+                # Skip this room if the user isn't allowed to see it
+                if not room_membership_for_user_at_to_token:
+                    continue
+
+                room_membership_for_user_map[room_id] = (
+                    room_membership_for_user_at_to_token
+                )
+
+                # Take the superset of the `RoomSyncConfig` for each room.
+                #
+                # Update our `relevant_room_map` with the room we're going to display
+                # and need to fetch more info about.
+                room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
+                existing_room_sync_config = relevant_room_map.get(room_id)
+                if existing_room_sync_config is not None:
+                    existing_room_sync_config.combine_room_sync_config(room_sync_config)
+                else:
+                    relevant_room_map[room_id] = room_sync_config
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
-        for room_id, room_sync_config in relevant_room_map.items():
+
+        @trace
+        @tag_args
+        async def handle_room(room_id: str) -> None:
             room_sync_result = await self.get_room_sync_data(
                 user=sync_config.user,
                 room_id=room_id,
-                room_sync_config=room_sync_config,
-                room_membership_for_user_at_to_token=sync_room_map[room_id],
+                room_sync_config=relevant_room_map[room_id],
+                room_membership_for_user_at_to_token=room_membership_for_user_map[
+                    room_id
+                ],
                 from_token=from_token,
                 to_token=to_token,
             )
 
             rooms[room_id] = room_sync_result
 
+        with start_active_span("sliding_sync.generate_room_entries"):
+            await concurrently_execute(handle_room, relevant_room_map, 10)
+
+        extensions = await self.get_extensions_response(
+            sync_config=sync_config,
+            from_token=from_token,
+            to_token=to_token,
+        )
+
         return SlidingSyncResult(
             next_pos=to_token,
             lists=lists,
             rooms=rooms,
-            extensions={},
+            extensions=extensions,
         )
 
-    async def get_sync_room_ids_for_user(
+    async def get_room_membership_for_user_at_to_token(
         self,
         user: UserID,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[StreamToken],
     ) -> Dict[str, _RoomMembershipForUser]:
         """
-        Fetch room IDs that should be listed for this user in the sync response (the
-        full room list that will be filtered, sorted, and sliced).
+        Fetch room IDs that the user has had membership in (the full room list including
+        long-lost left rooms that will be filtered, sorted, and sliced).
 
-        We're looking for rooms where the user has the following state in the token
-        range (> `from_token` and <= `to_token`):
+        We're looking for rooms where the user has had any sort of membership in the
+        token range (> `from_token` and <= `to_token`).
 
-        - `invite`, `join`, `knock`, `ban` membership events
-        - Kicks (`leave` membership events where `sender` is different from the
-          `user_id`/`state_key`)
-        - `newly_left` (rooms that were left during the given token range)
-        - In order for bans/kicks to not show up in sync, you need to `/forget` those
-          rooms. This doesn't modify the event itself though and only adds the
-          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
-          to tell when a room was forgotten at the moment so we can't factor it into the
-          from/to range.
+        In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+        doesn't modify the event itself though and only adds the `forgotten` flag to the
+        `room_memberships` table in Synapse. There isn't a way to tell when a room was
+        forgotten at the moment so we can't factor it into the token range.
 
         Args:
             user: User to fetch rooms for
@@ -576,8 +662,8 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
 
         Returns:
-            A dictionary of room IDs that should be listed in the sync response along
-            with membership information in that room at the time of `to_token`.
+            A dictionary of room IDs that the user has had membership in along with
+            membership information in that room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -588,9 +674,6 @@ class SlidingSyncHandler:
             # We want to fetch any kind of membership (joined and left rooms) in order
             # to get the `event_pos` of the latest room membership event for the
             # user.
-            #
-            # We will filter out the rooms that don't belong below (see
-            # `filter_membership_for_sync`)
             membership_list=Membership.LIST,
             excluded_rooms=self.rooms_to_exclude_globally,
         )
@@ -610,7 +693,10 @@ class SlidingSyncHandler:
                 event_pos=room_for_user.event_pos,
                 membership=room_for_user.membership,
                 sender=room_for_user.sender,
+                # We will update these fields below to be accurate
                 newly_joined=False,
+                newly_left=False,
+                is_dm=False,
             )
             for room_for_user in room_for_user_list
         }
@@ -635,10 +721,17 @@ class SlidingSyncHandler:
                 instance_to_max_stream_ordering_map[instance_name] = stream_ordering
 
         # Then assemble the `RoomStreamToken`
+        min_stream_pos = min(instance_to_max_stream_ordering_map.values())
         membership_snapshot_token = RoomStreamToken(
             # Minimum position in the `instance_map`
-            stream=min(instance_to_max_stream_ordering_map.values()),
-            instance_map=immutabledict(instance_to_max_stream_ordering_map),
+            stream=min_stream_pos,
+            instance_map=immutabledict(
+                {
+                    instance_name: stream_pos
+                    for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
+                    if stream_pos > min_stream_pos
+                }
+            ),
         )
 
         # Since we fetched the users room list at some point in time after the from/to
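The `instance_map` slimming above works because `stream` already acts as the
floor position for any writer not named in the map, so entries equal to the
minimum carry no extra information. For example:

    positions = {"writer1": 100, "writer2": 150, "writer3": 100}
    min_stream_pos = min(positions.values())  # 100
    instance_map = {
        name: pos for name, pos in positions.items() if pos > min_stream_pos
    }
    # -> {"writer2": 150}; writer1/writer3 implicitly sit at stream=100
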
@@ -648,10 +741,9 @@ class SlidingSyncHandler:
         # - 1a) Remove rooms that the user joined after the `to_token`
         # - 1b) Add back rooms that the user left after the `to_token`
         # - 1c) Update room membership events to the point in time of the `to_token`
-        # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
-        # - 3) Figure out which rooms are `newly_joined`
-
-        # 1) -----------------------------------------------------
+        # - 2) Figure out which rooms are `newly_left` (> `from_token` and <= `to_token`)
+        # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
+        # - 4) Figure out which rooms are DMs
 
         # 1) Fetch membership changes that fall in the range from `to_token` up to
         # `membership_snapshot_token`
@@ -711,7 +803,10 @@ class SlidingSyncHandler:
                         event_pos=first_membership_change_after_to_token.prev_event_pos,
                         membership=first_membership_change_after_to_token.prev_membership,
                         sender=first_membership_change_after_to_token.prev_sender,
+                        # We will update these fields below to be accurate
                         newly_joined=False,
+                        newly_left=False,
+                        is_dm=False,
                     )
                 else:
                     # If we can't find the previous membership event, we shouldn't
@@ -719,22 +814,6 @@ class SlidingSyncHandler:
                     # exact membership state and shouldn't rely on the current snapshot.
                     sync_room_id_set.pop(room_id, None)
 
-        # Filter the rooms that that we have updated room membership events to the point
-        # in time of the `to_token` (from the "1)" fixups)
-        filtered_sync_room_id_set = {
-            room_id: room_membership_for_user
-            for room_id, room_membership_for_user in sync_room_id_set.items()
-            if filter_membership_for_sync(
-                membership=room_membership_for_user.membership,
-                user_id=user_id,
-                sender=room_membership_for_user.sender,
-            )
-        }
-
-        # 2) -----------------------------------------------------
-        # We fix-up newly_left rooms after the first fixup because it may have removed
-        # some left rooms that we can figure out are newly_left in the following code
-
         # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
         current_state_delta_membership_changes_in_from_to_range = []
         if from_token:
@@ -796,18 +875,40 @@ class SlidingSyncHandler:
             if last_membership_change_in_from_to_range.membership == Membership.JOIN:
                 possibly_newly_joined_room_ids.add(room_id)
 
-            # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
-            # include newly_left rooms because the last event that the user should see
-            # is their own leave event
+            # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
-                    room_id=room_id,
-                    event_id=last_membership_change_in_from_to_range.event_id,
-                    event_pos=last_membership_change_in_from_to_range.event_pos,
-                    membership=last_membership_change_in_from_to_range.membership,
-                    sender=last_membership_change_in_from_to_range.sender,
-                    newly_joined=False,
-                )
+                # 2) Mark this room as `newly_left`
+
+                # If we're seeing a membership change here, we should expect to already
+                # have it in our snapshot but if a state reset happens, it wouldn't have
+                # shown up in our snapshot but appear as a change here.
+                existing_sync_entry = sync_room_id_set.get(room_id)
+                if existing_sync_entry is not None:
+                    # Normal expected case
+                    sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
+                        newly_left=True
+                    )
+                else:
+                    # State reset!
+                    logger.warning(
+                        "State reset detected for room_id %s with %s who is no longer in the room",
+                        room_id,
+                        user_id,
+                    )
+                    # Even though a state reset happened which removed the person from
+                    # the room, we still add it to the list so the user knows they left the
+                    # room. Downstream code can check for a state reset by looking for
+                    # `event_id=None and membership is not None`.
+                    sync_room_id_set[room_id] = _RoomMembershipForUser(
+                        room_id=room_id,
+                        event_id=last_membership_change_in_from_to_range.event_id,
+                        event_pos=last_membership_change_in_from_to_range.event_pos,
+                        membership=last_membership_change_in_from_to_range.membership,
+                        sender=last_membership_change_in_from_to_range.sender,
+                        newly_joined=False,
+                        newly_left=True,
+                        is_dm=False,
+                    )
 
         # 3) Figure out `newly_joined`
         for room_id in possibly_newly_joined_room_ids:
@@ -818,9 +919,9 @@ class SlidingSyncHandler:
             # also some non-join in the range, we know they `newly_joined`.
             if has_non_join_in_from_to_range:
                 # We found a `newly_joined` room (we left and joined within the token range)
-                filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
-                    room_id
-                ].copy_and_replace(newly_joined=True)
+                sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                    newly_joined=True
+                )
             else:
                 prev_event_id = first_membership_change_by_room_id_in_from_to_range[
                     room_id
@@ -832,7 +933,7 @@ class SlidingSyncHandler:
                 if prev_event_id is None:
                     # We found a `newly_joined` room (we are joining the room for the
                     # first time within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
                 # Last resort, we need to step back to the previous membership event
@@ -840,11 +941,150 @@ class SlidingSyncHandler:
                 elif prev_membership != Membership.JOIN:
                     # We found a `newly_joined` room (we left before the token range
                     # and joined within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
 
-        return filtered_sync_room_id_set
+        # 4) Figure out which rooms the user considers to be direct-message (DM) rooms
+        #
+        # We're using global account data (`m.direct`) instead of checking for
+        # `is_direct` on membership events because that property only appears for
+        # the invitee membership event (doesn't show up for the inviter).
+        #
+        # We're unable to take `to_token` into account for global account data since
+        # we only keep track of the latest account data for the user.
+        dm_map = await self.store.get_global_account_data_by_type_for_user(
+            user_id, AccountDataTypes.DIRECT
+        )
+
+        # Flatten out the map. Account data is set by the client so it needs to be
+        # scrutinized.
+        dm_room_id_set = set()
+        if isinstance(dm_map, dict):
+            for room_ids in dm_map.values():
+                # Account data should be a list of room IDs. Ignore anything else
+                if isinstance(room_ids, list):
+                    for room_id in room_ids:
+                        if isinstance(room_id, str):
+                            dm_room_id_set.add(room_id)
+
+        # 4) Fixup
+        for room_id in sync_room_id_set:
+            sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                is_dm=room_id in dm_room_id_set
+            )
+
+        return sync_room_id_set
+
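The defensive flattening of `m.direct` above guards against malformed account
data, which is client-supplied. The expected shape maps each user ID to a list
of room IDs, but nothing enforces that:

    dm_map = {
        "@friend:example.org": ["!dm1:example.org", "!dm2:example.org"],
        "@other:example.org": "not-a-list",  # ignored by the checks above
    }
    dm_room_id_set = {
        room_id
        for room_ids in dm_map.values()
        if isinstance(room_ids, list)
        for room_id in room_ids
        if isinstance(room_id, str)
    }
    # -> {"!dm1:example.org", "!dm2:example.org"}
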
+    async def filter_rooms_relevant_for_sync(
+        self,
+        user: UserID,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Filter room IDs that should/can be listed for this user in the sync response (the
+        full room list that will be further filtered, sorted, and sliced).
+
+        We're looking for rooms where the user has the following state in the token
+        range (> `from_token` and <= `to_token`):
+
+        - `invite`, `join`, `knock`, `ban` membership events
+        - Kicks (`leave` membership events where `sender` is different from the
+          `user_id`/`state_key`)
+        - `newly_left` (rooms that were left during the given token range)
+        - In order for bans/kicks to not show up in sync, you need to `/forget` those
+          rooms. This doesn't modify the event itself though and only adds the
+          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
+          to tell when a room was forgotten at the moment so we can't factor it into the
+          from/to range.
+
+        Args:
+            user: User that is syncing
+            room_membership_for_user_map: Room membership for the user
+
+        Returns:
+            A dictionary of room IDs that should be listed in the sync response along
+            with membership information in that room at the time of `to_token`.
+        """
+        user_id = user.to_string()
+
+        # Filter rooms to only what we're interested to sync with
+        filtered_sync_room_map = {
+            room_id: room_membership_for_user
+            for room_id, room_membership_for_user in room_membership_for_user_map.items()
+            if filter_membership_for_sync(
+                user_id=user_id,
+                room_membership_for_user=room_membership_for_user,
+            )
+        }
+
+        return filtered_sync_room_map
+
+    async def check_room_subscription_allowed_for_user(
+        self,
+        room_id: str,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+        to_token: StreamToken,
+    ) -> Optional[_RoomMembershipForUser]:
+        """
+        Check whether the user is allowed to see the room based on whether they have
+        ever had membership in the room or if the room is `world_readable`.
+
+        Similar to `check_user_in_room_or_world_readable(...)`
+
+        Args:
+            room_id: Room to check
+            room_membership_for_user_map: Room membership for the user at the time of
+                the `to_token` (<= `to_token`).
+            to_token: The token to fetch rooms up to.
+
+        Returns:
+            The room membership for the user if they are allowed to subscribe to the
+            room else `None`.
+        """
+
+        # We can first check if they are already allowed to see the room based
+        # on our previous work to assemble the `room_membership_for_user_map`.
+        #
+        # If they have had any membership in the room over time (up to the `to_token`),
+        # let them subscribe and see what they can.
+        existing_membership_for_user = room_membership_for_user_map.get(room_id)
+        if existing_membership_for_user is not None:
+            return existing_membership_for_user
+
+        # TODO: Handle `world_readable` rooms
+        return None
+
+        # If the room is `world_readable`, it doesn't matter whether they can join,
+        # everyone can see the room.
+        # not_in_room_membership_for_user = _RoomMembershipForUser(
+        #     room_id=room_id,
+        #     event_id=None,
+        #     event_pos=None,
+        #     membership=None,
+        #     sender=None,
+        #     newly_joined=False,
+        #     newly_left=False,
+        #     is_dm=False,
+        # )
+        # room_state = await self.get_current_state_at(
+        #     room_id=room_id,
+        #     room_membership_for_user_at_to_token=not_in_room_membership_for_user,
+        #     state_filter=StateFilter.from_types(
+        #         [(EventTypes.RoomHistoryVisibility, "")]
+        #     ),
+        #     to_token=to_token,
+        # )
+
+        # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
+        # if (
+        #     visibility_event is not None
+        #     and visibility_event.content.get("history_visibility")
+        #     == HistoryVisibility.WORLD_READABLE
+        # ):
+        #     return not_in_room_membership_for_user
+
+        # return None
 
     async def filter_rooms(
         self,
@@ -867,41 +1107,24 @@ class SlidingSyncHandler:
             A filtered dictionary of room IDs along with membership information in the
             room at the time of `to_token`.
         """
-        user_id = user.to_string()
-
-        # TODO: Apply filters
-
         filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
-            # We're using global account data (`m.direct`) instead of checking for
-            # `is_direct` on membership events because that property only appears for
-            # the invitee membership event (doesn't show up for the inviter). Account
-            # data is set by the client so it needs to be scrutinized.
-            #
-            # We're unable to take `to_token` into account for global account data since
-            # we only keep track of the latest account data for the user.
-            dm_map = await self.store.get_global_account_data_by_type_for_user(
-                user_id, AccountDataTypes.DIRECT
-            )
-
-            # Flatten out the map
-            dm_room_id_set = set()
-            if isinstance(dm_map, dict):
-                for room_ids in dm_map.values():
-                    # Account data should be a list of room IDs. Ignore anything else
-                    if isinstance(room_ids, list):
-                        for room_id in room_ids:
-                            if isinstance(room_id, str):
-                                dm_room_id_set.add(room_id)
-
             if filters.is_dm:
                 # Only DM rooms please
-                filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if sync_room_map[room_id].is_dm
+                }
             else:
                 # Only non-DM rooms please
-                filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if not sync_room_map[room_id].is_dm
+                }
 
         if filters.spaces:
             raise NotImplementedError()
@@ -953,11 +1176,15 @@ class SlidingSyncHandler:
         # provided in the list. `None` is a valid type for rooms which do not have a
         # room type.
         if filters.room_types is not None or filters.not_room_types is not None:
-            # Make a copy so we don't run into an error: `Set changed size during
-            # iteration`, when we filter out and remove items
-            for room_id in filtered_room_id_set.copy():
-                create_event = await self.store.get_create_event_for_room(room_id)
-                room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
+            room_to_type = await self.store.bulk_get_room_type(
+                {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    # We only know the room types for joined rooms
+                    if sync_room_map[room_id].membership == Membership.JOIN
+                }
+            )
+            for room_id, room_type in room_to_type.items():
                 if (
                     filters.room_types is not None
                     and room_type not in filters.room_types
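Swapping the per-room `get_create_event_for_room` calls for a single
`bulk_get_room_type` query turns N state lookups into one batch. A sketch of
the expected result shape (illustrative values):

    room_to_type = await self.store.bulk_get_room_type({"!space:x", "!chat:x"})
    # e.g. {"!space:x": "m.space", "!chat:x": None}
    # (None means an ordinary room whose m.room.create has no `type`)
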
@@ -1003,34 +1230,33 @@ class SlidingSyncHandler:
         # Assemble a map of room ID to the `stream_ordering` of the last activity that the
         # user should see in the room (<= `to_token`)
         last_activity_in_room_map: Dict[str, int] = {}
-        for room_id, room_for_user in sync_room_map.items():
-            # If they are fully-joined to the room, let's find the latest activity
-            # at/before the `to_token`.
-            if room_for_user.membership == Membership.JOIN:
-                last_event_result = (
-                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                        room_id, to_token.room_key
-                    )
-                )
-
-                # If the room has no events at/before the `to_token`, this is probably a
-                # mistake in the code that generates the `sync_room_map` since that should
-                # only give us rooms that the user had membership in during the token range.
-                assert last_event_result is not None
 
-                _, event_pos = last_event_result
-
-                last_activity_in_room_map[room_id] = event_pos.stream
-            else:
-                # Otherwise, if the user has left/been invited/knocked/been banned from
-                # a room, they shouldn't see anything past that point.
+        for room_id, room_for_user in sync_room_map.items():
+            if room_for_user.membership != Membership.JOIN:
+                # If the user has left/been invited/knocked/been banned from a
+                # room, they shouldn't see anything past that point.
                 #
-                # FIXME: It's possible that people should see beyond this point in
-                # invited/knocked cases if for example the room has
+                # FIXME: It's possible that people should see beyond this point
+                # in invited/knocked cases if for example the room has
                 # `invite`/`world_readable` history visibility, see
                 # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
                 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
 
+        # For fully-joined rooms, we find the latest activity at/before the
+        # `to_token`.
+        joined_room_positions = (
+            await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
+                [
+                    room_id
+                    for room_id, room_for_user in sync_room_map.items()
+                    if room_for_user.membership == Membership.JOIN
+                ],
+                to_token.room_key,
+            )
+        )
+
+        last_activity_in_room_map.update(joined_room_positions)
+
         return sorted(
             sync_room_map.values(),
             # Sort by the last activity (stream_ordering) in the room
@@ -1039,6 +1265,102 @@ class SlidingSyncHandler:
             reverse=True,
         )
 
+    async def get_current_state_ids_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[str]:
+        """
+        Get the current state IDs for the user in the room according to their membership.
+        For a user who has left or been banned, this is the state at the time of their
+        leave/ban; otherwise it is the current state <= `to_token`.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_to_token: Membership information for the user
+                in the room at the time of `to_token`.
+            state_filter: The state filter used to fetch state from the database.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids: StateMap[str]
+        # People shouldn't see past their leave/ban event
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: `get_state_ids_at(...)` doesn't take into account the "current state"
+            room_state_ids = await self.storage_controllers.state.get_state_ids_at(
+                room_id,
+                stream_position=to_token.copy_and_replace(
+                    StreamKeyType.ROOM,
+                    room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
+                ),
+                state_filter=state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+        # Otherwise, we can get the latest current state in the room
+        else:
+            room_state_ids = await self.storage_controllers.state.get_current_state_ids(
+                room_id,
+                state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+            # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
+
+        return room_state_ids
+
+    async def get_current_state_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[EventBase]:
+        """
+        Get the current state for the user in the room according to their membership.
+        For a user who has left or been banned, this is the state at the time of their
+        leave/ban; otherwise it is the current state <= `to_token`.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_to_token: Membership information for the user
+                in the room at the time of `to_token`.
+            state_filter: The state filter used to fetch state from the database.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=state_filter,
+            to_token=to_token,
+        )
+
+        event_map = await self.store.get_events(list(room_state_ids.values()))
+
+        state_map = {}
+        for key, event_id in room_state_ids.items():
+            event = event_map.get(event_id)
+            if event:
+                state_map[key] = event
+
+        return state_map
+
     async def get_room_sync_data(
         self,
         user: UserID,
@@ -1070,7 +1392,7 @@ class SlidingSyncHandler:
         # membership. Currently, we have to make all of these optional because
         # `invite`/`knock` rooms only have `stripped_state`. See
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
-        timeline_events: Optional[List[EventBase]] = None
+        timeline_events: List[EventBase] = []
         bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
         limited: Optional[bool] = None
         prev_batch_token: Optional[StreamToken] = None
@@ -1228,10 +1550,10 @@ class SlidingSyncHandler:
             stripped_state.append(strip_event(invite_or_knock_event))
 
         # TODO: Handle state resets. For example, if we see
-        # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
-        # `required_state` doesn't include it, we should indicate to the client that a
-        # state reset happened. Perhaps we should indicate this by setting `initial:
-        # True` and empty `required_state`.
+        # `room_membership_for_user_at_to_token.event_id=None and
+        # room_membership_for_user_at_to_token.membership is not None`, we should
+        # indicate to the client that a state reset happened. Perhaps we should indicate
+        # this by setting `initial: True` and an empty `required_state`.
 
         # TODO: Since we can't determine whether we've already sent a room down this
         # Sliding Sync connection before (we plan to add this optimization in the
@@ -1239,7 +1561,45 @@ class SlidingSyncHandler:
         # updates.
         initial = True
 
-        # Fetch the required state for the room
+        # Check whether the room has a name set
+        name_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
+            to_token=to_token,
+        )
+        name_event_id = name_state_ids.get((EventTypes.Name, ""))
+
+        room_membership_summary: Mapping[str, MemberSummary]
+        empty_membership_summary = MemberSummary([], 0)
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: Figure out how to get the membership summary for left/banned rooms
+            room_membership_summary = {}
+        else:
+            room_membership_summary = await self.store.get_room_summary(room_id)
+            # TODO: Reverse/rewind back to the `to_token`
+
+        # `heroes` are required if the room name is not set.
+        #
+        # Note: When you're the first one on your server to be invited to a new room
+        # over federation, we only have access to some stripped state in
+        # `event.unsigned.invite_room_state` which currently doesn't include `heroes`,
+        # see https://github.com/matrix-org/matrix-spec/issues/380. This means that
+        # clients won't be able to calculate the room name when necessary; it's just a
+        # pitfall we have to deal with until that spec issue is resolved.
+        hero_user_ids: List[str] = []
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        if name_event_id is None:
+            hero_user_ids = extract_heroes_from_room_summary(
+                room_membership_summary, me=user.to_string()
+            )
+
+        # Fetch the `required_state` for the room
         #
         # No `required_state` for invite/knock rooms (just `stripped_state`)
         #
@@ -1247,13 +1607,13 @@ class SlidingSyncHandler:
         # of membership. Currently, we have to make this optional because
         # `invite`/`knock` rooms only have `stripped_state`. See
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
-        room_state: Optional[StateMap[EventBase]] = None
+        #
+        # Calculate the `StateFilter` based on the `required_state` for the room
+        required_state_filter = StateFilter.none()
         if room_membership_for_user_at_to_token.membership not in (
             Membership.INVITE,
             Membership.KNOCK,
         ):
-            # Calculate the `StateFilter` based on the `required_state` for the room
-            state_filter: Optional[StateFilter] = StateFilter.none()
             # If we have a double wildcard ("*", "*") in the `required_state`, we need
             # to fetch all state for the room
             #
@@ -1276,7 +1636,7 @@ class SlidingSyncHandler:
             if StateValues.WILDCARD in room_sync_config.required_state_map.get(
                 StateValues.WILDCARD, set()
             ):
-                state_filter = StateFilter.all()
+                required_state_filter = StateFilter.all()
             # TODO: `StateFilter` currently doesn't support wildcard event types. We're
             # currently working around this by returning all state to the client but it
             # would be nice to fetch less from the database and return just what the
@@ -1285,7 +1645,7 @@ class SlidingSyncHandler:
                 room_sync_config.required_state_map.get(StateValues.WILDCARD)
                 is not None
             ):
-                state_filter = StateFilter.all()
+                required_state_filter = StateFilter.all()
             else:
                 required_state_types: List[Tuple[str, Optional[str]]] = []
                 for (
@@ -1314,54 +1674,72 @@ class SlidingSyncHandler:
 
                             # FIXME: We probably also care about invite, ban, kick, targets, etc
                             # but the spec only mentions "senders".
+                        elif state_key == StateValues.ME:
+                            required_state_types.append((state_type, user.to_string()))
                         else:
                             required_state_types.append((state_type, state_key))
 
-                state_filter = StateFilter.from_types(required_state_types)
-
-            # We can skip fetching state if we don't need any
-            if state_filter != StateFilter.none():
-                # We can return all of the state that was requested if we're doing an
-                # initial sync
-                if initial:
-                    # People shouldn't see past their leave/ban event
-                    if room_membership_for_user_at_to_token.membership in (
-                        Membership.LEAVE,
-                        Membership.BAN,
-                    ):
-                        room_state = await self.storage_controllers.state.get_state_at(
-                            room_id,
-                            stream_position=to_token.copy_and_replace(
-                                StreamKeyType.ROOM,
-                                room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
-                            ),
-                            state_filter=state_filter,
-                            # Partially-stated rooms should have all state events except for
-                            # the membership events and since we've already excluded
-                            # partially-stated rooms unless `required_state` only has
-                            # `["m.room.member", "$LAZY"]` for membership, we should be able
-                            # to retrieve everything requested. Plus we don't want to block
-                            # the whole sync waiting for this one room.
-                            await_full_state=False,
-                        )
-                    # Otherwise, we can get the latest current state in the room
-                    else:
-                        room_state = await self.storage_controllers.state.get_current_state(
-                            room_id,
-                            state_filter,
-                            # Partially-stated rooms should have all state events except for
-                            # the membership events and since we've already excluded
-                            # partially-stated rooms unless `required_state` only has
-                            # `["m.room.member", "$LAZY"]` for membership, we should be able
-                            # to retrieve everything requested. Plus we don't want to block
-                            # the whole sync waiting for this one room.
-                            await_full_state=False,
-                        )
-                        # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
-                else:
-                    # TODO: Once we can figure out if we've sent a room down this connection before,
-                    # we can return updates instead of the full required state.
-                    raise NotImplementedError()
+                required_state_filter = StateFilter.from_types(required_state_types)
+
+        # We need this base set of info for the response so let's just fetch it along
+        # with the `required_state` for the room
+        meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
+            (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
+        ]
+        state_filter = StateFilter.all()
+        if required_state_filter != StateFilter.all():
+            state_filter = StateFilter(
+                types=StateFilter.from_types(
+                    chain(meta_room_state, required_state_filter.to_types())
+                ).types,
+                include_others=required_state_filter.include_others,
+            )
+
+        # We can return all of the state that was requested if this was the first
+        # time we've sent the room down this connection.
+        room_state: StateMap[EventBase] = {}
+        if initial:
+            room_state = await self.get_current_state_at(
+                room_id=room_id,
+                room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+                state_filter=state_filter,
+                to_token=to_token,
+            )
+        else:
+            # TODO: Once we can figure out if we've sent a room down this connection before,
+            # we can return updates instead of the full required state.
+            raise NotImplementedError()
+
+        required_room_state: StateMap[EventBase] = {}
+        if required_state_filter != StateFilter.none():
+            required_room_state = required_state_filter.filter_state(room_state)
+
+        # Find the room name and avatar from the state
+        room_name: Optional[str] = None
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        name_event = room_state.get((EventTypes.Name, ""))
+        if name_event is not None:
+            room_name = name_event.content.get("name")
+
+        room_avatar: Optional[str] = None
+        avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+        if avatar_event is not None:
+            room_avatar = avatar_event.content.get("url")
+
+        # Assemble heroes: extract the info from the state we just fetched
+        heroes: List[SlidingSyncResult.RoomResult.StrippedHero] = []
+        for hero_user_id in hero_user_ids:
+            member_event = room_state.get((EventTypes.Member, hero_user_id))
+            if member_event is not None:
+                heroes.append(
+                    SlidingSyncResult.RoomResult.StrippedHero(
+                        user_id=hero_user_id,
+                        display_name=member_event.content.get("displayname"),
+                        avatar_url=member_event.content.get("avatar_url"),
+                    )
+                )
 
         # Figure out the last bump event in the room
         last_bump_event_result = (
@@ -1378,16 +1756,12 @@ class SlidingSyncHandler:
             bump_stamp = bump_event_pos.stream
 
         return SlidingSyncResult.RoomResult(
-            # TODO: Dummy value
-            name=None,
-            # TODO: Dummy value
-            avatar=None,
-            # TODO: Dummy value
-            heroes=None,
-            # TODO: Dummy value
-            is_dm=False,
+            name=room_name,
+            avatar=room_avatar,
+            heroes=heroes,
+            is_dm=room_membership_for_user_at_to_token.is_dm,
             initial=initial,
-            required_state=list(room_state.values()) if room_state else None,
+            required_state=list(required_room_state.values()),
             timeline_events=timeline_events,
             bundled_aggregations=bundled_aggregations,
             stripped_state=stripped_state,
@@ -1395,12 +1769,178 @@ class SlidingSyncHandler:
             limited=limited,
             num_live=num_live,
             bump_stamp=bump_stamp,
-            # TODO: Dummy values
-            joined_count=0,
-            invited_count=0,
+            joined_count=room_membership_summary.get(
+                Membership.JOIN, empty_membership_summary
+            ).count,
+            invited_count=room_membership_summary.get(
+                Membership.INVITE, empty_membership_summary
+            ).count,
             # TODO: These are just dummy values. We could potentially just remove these
             # since notifications can only really be done correctly on the client anyway
             # (encrypted rooms).
             notification_count=0,
             highlight_count=0,
         )
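
The heroes, membership counts, name, and avatar returned above exist so that
clients can run the room display-name fallback when `m.room.name` is unset. A
rough sketch of that client-side calculation (simplified from the
client-server spec's room-summary algorithm; `hero_names` are the heroes'
resolved display names, and this helper is illustrative, not part of the diff):

    def calculate_room_name_fallback(
        hero_names: list[str], joined_count: int, invited_count: int
    ) -> str:
        # Exclude ourselves from the count of "other" members.
        other_members = joined_count + invited_count - 1
        if other_members <= 0 or not hero_names:
            return "Empty Room"
        if other_members > len(hero_names):
            # More members than heroes: summarise the remainder.
            return f"{', '.join(hero_names)} and {other_members - len(hero_names)} others"
        if len(hero_names) == 1:
            return hero_names[0]
        return f"{', '.join(hero_names[:-1])} and {hero_names[-1]}"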
+
+    async def get_extensions_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken],
+    ) -> SlidingSyncResult.Extensions:
+        """Handle extension requests.
+
+        Args:
+            sync_config: Sync configuration
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+
+        if sync_config.extensions is None:
+            return SlidingSyncResult.Extensions()
+
+        to_device_response = None
+        if sync_config.extensions.to_device is not None:
+            to_device_response = await self.get_to_device_extension_response(
+                sync_config=sync_config,
+                to_device_request=sync_config.extensions.to_device,
+                to_token=to_token,
+            )
+
+        e2ee_response = None
+        if sync_config.extensions.e2ee is not None:
+            e2ee_response = await self.get_e2ee_extension_response(
+                sync_config=sync_config,
+                e2ee_request=sync_config.extensions.e2ee,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
+        return SlidingSyncResult.Extensions(
+            to_device=to_device_response,
+            e2ee=e2ee_response,
+        )
+
+    async def get_to_device_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension,
+        to_token: StreamToken,
+    ) -> Optional[SlidingSyncResult.Extensions.ToDeviceExtension]:
+        """Handle to-device extension (MSC3885)
+
+        Args:
+            sync_config: Sync configuration
+            to_device_request: The to-device extension from the request
+            to_token: The point in the stream to sync up to.
+        """
+        user_id = sync_config.user.to_string()
+        device_id = sync_config.device_id
+
+        # Skip if the extension is not enabled
+        if not to_device_request.enabled:
+            return None
+
+        # Check that this request has a valid device ID (not all requests have
+        # to belong to a device, in which case device_id is None)
+        if device_id is None:
+            return SlidingSyncResult.Extensions.ToDeviceExtension(
+                next_batch=f"{to_token.to_device_key}",
+                events=[],
+            )
+
+        since_stream_id = 0
+        if to_device_request.since is not None:
+            # We've already validated this is an int.
+            since_stream_id = int(to_device_request.since)
+
+            if to_token.to_device_key < since_stream_id:
+                # The since token is ahead of our current token, so we return an
+                # empty response.
+                logger.warning(
+                    "Got to-device.since from the future. since token: %r is ahead of our current to_device stream position: %r",
+                    since_stream_id,
+                    to_token.to_device_key,
+                )
+                return SlidingSyncResult.Extensions.ToDeviceExtension(
+                    next_batch=to_device_request.since,
+                    events=[],
+                )
+
+            # Delete everything before the given since token, as we know the
+            # device must have received them.
+            deleted = await self.store.delete_messages_for_device(
+                user_id=user_id,
+                device_id=device_id,
+                up_to_stream_id=since_stream_id,
+            )
+
+            logger.debug(
+                "Deleted %d to-device messages up to %d for %s",
+                deleted,
+                since_stream_id,
+                user_id,
+            )
+
+        messages, stream_id = await self.store.get_messages_for_device(
+            user_id=user_id,
+            device_id=device_id,
+            from_stream_id=since_stream_id,
+            to_stream_id=to_token.to_device_key,
+            limit=min(to_device_request.limit, 100),  # Limit to at most 100 events
+        )
+
+        return SlidingSyncResult.Extensions.ToDeviceExtension(
+            next_batch=f"{stream_id}",
+            events=messages,
+        )
+
+    async def get_e2ee_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
+        """Handle E2EE device extension (MSC3884)
+
+        Args:
+            sync_config: Sync configuration
+            e2ee_request: The e2ee extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        user_id = sync_config.user.to_string()
+        device_id = sync_config.device_id
+
+        # Skip if the extension is not enabled
+        if not e2ee_request.enabled:
+            return None
+
+        device_list_updates: Optional[DeviceListUpdates] = None
+        if from_token is not None:
+            # TODO: This should take into account the `from_token` and `to_token`
+            device_list_updates = await self.device_handler.get_user_ids_changed(
+                user_id=user_id,
+                from_token=from_token,
+            )
+
+        device_one_time_keys_count: Mapping[str, int] = {}
+        device_unused_fallback_key_types: Sequence[str] = []
+        if device_id:
+            # TODO: We should have a way to let clients differentiate between the states of:
+            #   * no change in OTK count since the provided since token
+            #   * the server has zero OTKs left for this device
+            #  Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
+            device_one_time_keys_count = await self.store.count_e2e_one_time_keys(
+                user_id, device_id
+            )
+            device_unused_fallback_key_types = (
+                await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
+            )
+
+        return SlidingSyncResult.Extensions.E2eeExtension(
+            device_list_updates=device_list_updates,
+            device_one_time_keys_count=device_one_time_keys_count,
+            device_unused_fallback_key_types=device_unused_fallback_key_types,
+        )
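
To make the to-device extension handler above concrete, a minimal sketch of
the request/response shapes it implements (field names per MSC3885 as handled
by `get_to_device_extension_response`; the token values are invented for
illustration):

    # Request: the `extensions` portion of a sliding sync request body.
    # `since` is the `next_batch` from the previous response; on the first
    # request it is omitted and the server reads from stream ID 0.
    request_extensions = {
        "to_device": {
            "enabled": True,
            "limit": 100,     # the server clamps this to at most 100 events
            "since": "3417",  # acknowledges (and deletes) older messages
        },
    }

    # Response: `next_batch` is the new to-device stream position, to be
    # echoed back as `since` on the next request.
    response_extensions = {
        "to_device": {
            "next_batch": "3532",
            "events": [],  # up to `limit` pending to-device messages
        },
    }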
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index de227faec3..ede014180c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2270,7 +2270,11 @@ class SyncHandler:
             user=user,
             from_key=presence_key,
             is_guest=sync_config.is_guest,
-            include_offline=include_offline,
+            include_offline=(
+                True
+                if self.hs_config.server.presence_include_offline_users_on_sync
+                else include_offline
+            ),
         )
         assert presence_key
         sync_result_builder.now_token = now_token.copy_and_replace(
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 749b01dd0e..6fd75fd381 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -90,7 +90,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import JsonDict
 from synapse.util import json_decoder
-from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
+from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.util.stringutils import parse_and_validate_server_name
 
@@ -475,6 +475,8 @@ class MatrixFederationHttpClient:
             use_proxy=True,
         )
 
+        self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)
+
     def wake_destination(self, destination: str) -> None:
         """Called when the remote server may have come back online."""
 
@@ -1486,35 +1488,44 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         try:
-            # add a byte of headroom to max size as function errs at >=
-            d = read_body_with_max_size(response, output_stream, expected_size + 1)
-            d.addTimeout(self.default_timeout_seconds, self.reactor)
-            length = await make_deferred_yieldable(d)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as function errs at >=
+                d = read_body_with_max_size(response, output_stream, expected_size + 1)
+                d.addTimeout(self.default_timeout_seconds, self.reactor)
+                length = await make_deferred_yieldable(d)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1560,6 +1571,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, record the actual number of
+        # bytes read against the ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers
 
     async def federation_get_file(
@@ -1630,29 +1648,37 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         # this should be a multipart/mixed response with the boundary string in the header
         try:
@@ -1672,11 +1698,12 @@ class MatrixFederationHttpClient:
             raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)
 
         try:
-            # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
-            deferred = read_multipart_response(
-                response, output_stream, boundary, expected_size + 1
-            )
-            deferred.addTimeout(self.default_timeout_seconds, self.reactor)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
+                deferred = read_multipart_response(
+                    response, output_stream, boundary, expected_size + 1
+                )
+                deferred.addTimeout(self.default_timeout_seconds, self.reactor)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1743,6 +1770,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, record the actual number of
+        # bytes read against the ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers, multipart_response.json
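
The new `remote_download_linearizer` caps concurrent remote-media body reads
at 6 per requesting IP. Conceptually it behaves like a keyed semaphore; a
simplified sketch using asyncio primitives rather than Synapse's `Linearizer`
(names here are illustrative):

    import asyncio
    from collections import defaultdict
    from typing import Awaitable, Callable, DefaultDict

    # One semaphore per key (the requester's IP), each allowing up to 6
    # concurrent holders -- roughly Linearizer("remote_download_linearizer", 6).
    download_slots: DefaultDict[str, asyncio.Semaphore] = defaultdict(
        lambda: asyncio.Semaphore(6)
    )

    async def read_with_download_slot(
        ip_address: str, read_body: Callable[[], Awaitable[int]]
    ) -> int:
        # Only the body read is gated; the request itself has already been sent.
        async with download_slots[ip_address]:
            return await read_body()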
 
 
diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py
index 87c929eb20..8bc92305fe 100644
--- a/synapse/media/media_repository.py
+++ b/synapse/media/media_repository.py
@@ -430,6 +430,7 @@ class MediaRepository:
         media_id: str,
         name: Optional[str],
         max_timeout_ms: int,
+        allow_authenticated: bool = True,
         federation: bool = False,
     ) -> None:
         """Responds to requests for local media, if exists, or returns 404.
@@ -442,6 +443,7 @@ class MediaRepository:
                 the filename in the Content-Disposition header of the response.
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            allow_authenticated: whether media marked as authenticated may be served to this request
             federation: whether the local media being fetched is for a federation request
 
         Returns:
@@ -451,6 +453,10 @@ class MediaRepository:
         if not media_info:
             return
 
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         self.mark_recently_accessed(None, media_id)
 
         media_type = media_info.media_type
@@ -481,6 +487,7 @@ class MediaRepository:
         max_timeout_ms: int,
         ip_address: str,
         use_federation_endpoint: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         """Respond to requests for remote media.
 
@@ -495,6 +502,8 @@ class MediaRepository:
             ip_address: the IP address of the requester
             use_federation_endpoint: whether to request the remote media over the new
                 federation `/download` endpoint
+            allow_authenticated: whether media marked as authenticated may be served to this
+                request
 
         Returns:
             Resolves once a response has successfully been written to request
@@ -526,6 +535,7 @@ class MediaRepository:
                 self.download_ratelimiter,
                 ip_address,
                 use_federation_endpoint,
+                allow_authenticated,
             )
 
         # We deliberately stream the file outside the lock
@@ -548,6 +558,7 @@ class MediaRepository:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool,
     ) -> RemoteMedia:
         """Gets the media info associated with the remote file, downloading
         if necessary.
@@ -560,6 +571,8 @@ class MediaRepository:
             ip_address: IP address of the requester
             use_federation: if a download is necessary, whether to request the remote file
                 over the federation `/download` endpoint
+            allow_authenticated: whether media marked as authenticated may be served to this
+                request
 
         Returns:
             The media info of the file
@@ -581,6 +594,7 @@ class MediaRepository:
                 self.download_ratelimiter,
                 ip_address,
                 use_federation,
+                allow_authenticated,
             )
 
         # Ensure we actually use the responder so that it releases resources
@@ -598,6 +612,7 @@ class MediaRepository:
         download_ratelimiter: Ratelimiter,
         ip_address: str,
         use_federation_endpoint: bool,
+        allow_authenticated: bool,
     ) -> Tuple[Optional[Responder], RemoteMedia]:
         """Looks for media in local cache, if not there then attempt to
         download from remote server.
@@ -619,6 +634,11 @@ class MediaRepository:
         """
         media_info = await self.store.get_cached_remote_media(server_name, media_id)
 
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            # if the media isn't cached, don't fetch it; if it is cached but
+            # authenticated, don't serve it
+            if not media_info or media_info.authenticated:
+                raise NotFoundError()
+
         # file_id is the ID we use to track the file locally. If we've already
         # seen the file then reuse the existing ID, otherwise generate a new
         # one.
@@ -792,6 +812,11 @@ class MediaRepository:
 
         logger.info("Stored remote media in file %r", fname)
 
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         return RemoteMedia(
             media_origin=server_name,
             media_id=media_id,
@@ -802,6 +827,7 @@ class MediaRepository:
             filesystem_id=file_id,
             last_access_ts=time_now_ms,
             quarantined_by=None,
+            authenticated=authenticated,
         )
 
     async def _federation_download_remote_file(
@@ -915,6 +941,11 @@ class MediaRepository:
 
         logger.debug("Stored remote media in file %r", fname)
 
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         return RemoteMedia(
             media_origin=server_name,
             media_id=media_id,
@@ -925,6 +956,7 @@ class MediaRepository:
             filesystem_id=file_id,
             last_access_ts=time_now_ms,
             quarantined_by=None,
+            authenticated=authenticated,
         )
 
     def _get_thumbnail_requirements(
@@ -1030,7 +1062,12 @@ class MediaRepository:
             t_len = os.path.getsize(output_path)
 
             await self.store.store_local_thumbnail(
-                media_id, t_width, t_height, t_type, t_method, t_len
+                media_id,
+                t_width,
+                t_height,
+                t_type,
+                t_method,
+                t_len,
             )
 
             return output_path
diff --git a/synapse/media/thumbnailer.py b/synapse/media/thumbnailer.py
index 413a720e40..ef6aa8ccf5 100644
--- a/synapse/media/thumbnailer.py
+++ b/synapse/media/thumbnailer.py
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple, Type
 
 from PIL import Image
 
-from synapse.api.errors import Codes, SynapseError, cs_error
+from synapse.api.errors import Codes, NotFoundError, SynapseError, cs_error
 from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP
 from synapse.http.server import respond_with_json
 from synapse.http.site import SynapseRequest
@@ -274,6 +274,7 @@ class ThumbnailProvider:
         m_type: str,
         max_timeout_ms: int,
         for_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_local_media_info(
             request, media_id, max_timeout_ms
@@ -281,6 +282,12 @@ class ThumbnailProvider:
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
         await self._select_and_respond_with_thumbnail(
             request,
@@ -307,14 +314,20 @@ class ThumbnailProvider:
         desired_type: str,
         max_timeout_ms: int,
         for_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_local_media_info(
             request, media_id, max_timeout_ms
         )
-
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
         for info in thumbnail_infos:
             t_w = info.width == desired_width
@@ -381,14 +394,27 @@ class ThumbnailProvider:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms, ip_address, use_federation
+            server_name,
+            media_id,
+            max_timeout_ms,
+            ip_address,
+            use_federation,
+            allow_authenticated,
         )
         if not media_info:
             respond_404(request)
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                respond_404(request)
+                return
+
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
         )
@@ -446,16 +472,28 @@ class ThumbnailProvider:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         # TODO: Don't download the whole remote file
         # We should proxy the thumbnail from the remote server instead of
         # downloading the remote file and generating our own thumbnails.
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms, ip_address, use_federation
+            server_name,
+            media_id,
+            max_timeout_ms,
+            ip_address,
+            use_federation,
+            allow_authenticated,
         )
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
         )
@@ -485,8 +523,8 @@ class ThumbnailProvider:
         file_id: str,
         url_cache: bool,
         for_federation: bool,
-        server_name: Optional[str] = None,
         media_info: Optional[LocalMedia] = None,
+        server_name: Optional[str] = None,
     ) -> None:
         """
         Respond to a request with an appropriate thumbnail from the previously generated thumbnails.
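
The same authenticated-media gate recurs across the download and thumbnail
paths above; distilled into one helper for clarity (a sketch only, with
`media_info` standing in for either LocalMedia or RemoteMedia):

    from synapse.api.errors import NotFoundError

    def gate_unauthenticated_access(
        enable_authenticated_media: bool,
        allow_authenticated: bool,
        media_info,
    ) -> None:
        # Media stored while authenticated media is enabled is flagged
        # `authenticated=True` and must not be served via the legacy
        # (unauthenticated) endpoints, which pass allow_authenticated=False.
        if enable_authenticated_media and not allow_authenticated:
            if media_info.authenticated:
                raise NotFoundError()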
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c3ecf86ec4..7a2b54036c 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -773,6 +773,7 @@ class Notifier:
         stream_token = await self.event_sources.bound_future_token(stream_token)
 
         start = self.clock.time_msec()
+        logged = False
         while True:
             current_token = self.event_sources.get_current_token()
             if stream_token.is_before_or_eq(current_token):
@@ -783,11 +784,13 @@ class Notifier:
             if now - start > 10_000:
                 return False
 
-            logger.info(
-                "Waiting for current token to reach %s; currently at %s",
-                stream_token,
-                current_token,
-            )
+            if not logged:
+                logger.info(
+                    "Waiting for current token to reach %s; currently at %s",
+                    stream_token,
+                    current_token,
+                )
+                logged = True
 
             # TODO: be better
             await self.clock.sleep(0.5)
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index 67de634eab..eddad7d5b8 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -256,9 +256,15 @@ class KeyChangesServlet(RestServlet):
 
         user_id = requester.user.to_string()
 
-        results = await self.device_handler.get_user_ids_changed(user_id, from_token)
+        device_list_updates = await self.device_handler.get_user_ids_changed(
+            user_id, from_token
+        )
+
+        response: JsonDict = {}
+        response["changed"] = list(device_list_updates.changed)
+        response["left"] = list(device_list_updates.left)
 
-        return 200, results
+        return 200, response
 
 
 class OneTimeKeyServlet(RestServlet):
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 13aed1dc85..93fe1d439e 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -942,7 +942,9 @@ class SlidingSyncRestServlet(RestServlet):
         response["rooms"] = await self.encode_rooms(
             requester, sliding_sync_result.rooms
         )
-        response["extensions"] = {}  # TODO: sliding_sync_result.extensions
+        response["extensions"] = await self.encode_extensions(
+            requester, sliding_sync_result.extensions
+        )
 
         return response
 
@@ -995,8 +997,21 @@ class SlidingSyncRestServlet(RestServlet):
             if room_result.avatar:
                 serialized_rooms[room_id]["avatar"] = room_result.avatar
 
-            if room_result.heroes:
-                serialized_rooms[room_id]["heroes"] = room_result.heroes
+            if room_result.heroes is not None and len(room_result.heroes) > 0:
+                serialized_heroes = []
+                for hero in room_result.heroes:
+                    serialized_hero = {
+                        "user_id": hero.user_id,
+                    }
+                    if hero.display_name is not None:
+                        # Not a typo, just how "displayname" is spelled in the spec
+                        serialized_hero["displayname"] = hero.display_name
+
+                    if hero.avatar_url is not None:
+                        serialized_hero["avatar_url"] = hero.avatar_url
+
+                    serialized_heroes.append(serialized_hero)
+                serialized_rooms[room_id]["heroes"] = serialized_heroes
 
             # We should only include the `initial` key if it's `True` to save bandwidth.
             # The absence of this flag means `False`.
@@ -1004,7 +1019,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["initial"] = room_result.initial
 
             # This will be omitted for invite/knock rooms with `stripped_state`
-            if room_result.required_state is not None:
+            if (
+                room_result.required_state is not None
+                and len(room_result.required_state) > 0
+            ):
                 serialized_required_state = (
                     await self.event_serializer.serialize_events(
                         room_result.required_state,
@@ -1015,7 +1033,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["required_state"] = serialized_required_state
 
             # This will be omitted for invite/knock rooms with `stripped_state`
-            if room_result.timeline_events is not None:
+            if (
+                room_result.timeline_events is not None
+                and len(room_result.timeline_events) > 0
+            ):
                 serialized_timeline = await self.event_serializer.serialize_events(
                     room_result.timeline_events,
                     time_now,
@@ -1043,7 +1064,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["is_dm"] = room_result.is_dm
 
             # Stripped state only applies to invite/knock rooms
-            if room_result.stripped_state is not None:
+            if (
+                room_result.stripped_state is not None
+                and len(room_result.stripped_state) > 0
+            ):
                 # TODO: `knocked_state` but that isn't specced yet.
                 #
                 # TODO: Instead of adding `knocked_state`, it would be good to rename
@@ -1054,6 +1078,45 @@ class SlidingSyncRestServlet(RestServlet):
 
         return serialized_rooms
 
+    async def encode_extensions(
+        self, requester: Requester, extensions: SlidingSyncResult.Extensions
+    ) -> JsonDict:
+        serialized_extensions: JsonDict = {}
+
+        if extensions.to_device is not None:
+            serialized_extensions["to_device"] = {
+                "next_batch": extensions.to_device.next_batch,
+                "events": extensions.to_device.events,
+            }
+
+        if extensions.e2ee is not None:
+            serialized_extensions["e2ee"] = {
+                # We always include this because
+                # https://github.com/vector-im/element-android/issues/3725. The spec
+                # isn't terribly clear on when this can be omitted and how a client
+                # would tell the difference between "no keys present" and "nothing
+                # changed" in terms of whole field absent / individual key type entry
+                # absent. Corresponding synapse issue:
+                # https://github.com/matrix-org/synapse/issues/10456
+                "device_one_time_keys_count": extensions.e2ee.device_one_time_keys_count,
+                # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
+                # states that this field should always be included, as long as the
+                # server supports the feature.
+                "device_unused_fallback_key_types": extensions.e2ee.device_unused_fallback_key_types,
+            }
+
+            if extensions.e2ee.device_list_updates is not None:
+                serialized_extensions["e2ee"]["device_lists"] = {}
+
+                serialized_extensions["e2ee"]["device_lists"]["changed"] = list(
+                    extensions.e2ee.device_list_updates.changed
+                )
+                serialized_extensions["e2ee"]["device_lists"]["left"] = list(
+                    extensions.e2ee.device_list_updates.left
+                )
+
+        return serialized_extensions
+
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     SyncRestServlet(hs).register(http_server)
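
For reference, the e2ee extension serialized by `encode_extensions` above
comes out shaped roughly like this (values invented for illustration):

    example_e2ee_extension = {
        # Always included; see the element-android and synapse issues cited
        # in the comments above.
        "device_one_time_keys_count": {"signed_curve25519": 50},
        # Always included while the server supports fallback keys (MSC2732).
        "device_unused_fallback_key_types": ["signed_curve25519"],
        # Only present when the request carried a `from_token`.
        "device_lists": {
            "changed": ["@alice:example.org"],
            "left": [],
        },
    }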
diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py
index c32c626905..3c3f703667 100644
--- a/synapse/rest/media/download_resource.py
+++ b/synapse/rest/media/download_resource.py
@@ -84,7 +84,7 @@ class DownloadResource(RestServlet):
 
         if self._is_mine_server_name(server_name):
             await self.media_repo.get_local_media(
-                request, media_id, file_name, max_timeout_ms
+                request, media_id, file_name, max_timeout_ms, allow_authenticated=False
             )
         else:
             allow_remote = parse_boolean(request, "allow_remote", default=True)
@@ -106,4 +106,5 @@ class DownloadResource(RestServlet):
                 max_timeout_ms,
                 ip_address,
                 False,
+                allow_authenticated=False,
             )
diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py
index 70354aa439..536fea4c32 100644
--- a/synapse/rest/media/thumbnail_resource.py
+++ b/synapse/rest/media/thumbnail_resource.py
@@ -96,6 +96,7 @@ class ThumbnailResource(RestServlet):
                     m_type,
                     max_timeout_ms,
                     False,
+                    allow_authenticated=False,
                 )
             else:
                 await self.thumbnail_provider.respond_local_thumbnail(
@@ -107,6 +108,7 @@ class ThumbnailResource(RestServlet):
                     m_type,
                     max_timeout_ms,
                     False,
+                    allow_authenticated=False,
                 )
             self.media_repo.mark_recently_accessed(None, media_id)
         else:
@@ -134,6 +136,7 @@ class ThumbnailResource(RestServlet):
                 m_type,
                 max_timeout_ms,
                 ip_address,
-                False,
+                use_federation=False,
+                allow_authenticated=False,
             )
             self.media_repo.mark_recently_accessed(server_name, media_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 881888fa93..066f3d08ae 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -120,6 +120,9 @@ class SQLBaseStore(metaclass=ABCMeta):
                 "get_user_in_room_with_profile", (room_id, user_id)
             )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
+            self._attempt_to_invalidate_cache(
+                "_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
+            )
 
         # Purge other caches based on room state.
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
@@ -146,6 +149,9 @@ class SQLBaseStore(metaclass=ABCMeta):
         self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
         self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache(
+            "_get_rooms_for_local_user_where_membership_is_inner", None
+        )
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
 
     def _attempt_to_invalidate_cache(
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2d6b75e47e..26b8e1a172 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -331,6 +331,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 "get_invited_rooms_for_local_user", (state_key,)
             )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
+            self._attempt_to_invalidate_cache(
+                "_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
+            )
 
             self._attempt_to_invalidate_cache(
                 "did_forget",
@@ -393,6 +396,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
         self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache(
+            "_get_rooms_for_local_user_where_membership_is_inner", None
+        )
         self._attempt_to_invalidate_cache("did_forget", None)
         self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
         self._attempt_to_invalidate_cache("get_references_for_event", None)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 24abab4a23..715846865b 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1313,6 +1313,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
         last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)  # type: ignore[attr-defined]
+        if last_change is None:
+            # If the room isn't in the cache we know that the last change was
+            # somewhere before the earliest known position of the cache, so we
+            # can clamp to that.
+            last_change = self._events_stream_cache.get_earliest_known_position()  # type: ignore[attr-defined]
 
         # We don't always have a full stream_to_exterm_id table, e.g. after
         # the upgrade that introduced it, so we make sure we never ask for a
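
A minimal sketch of the clamping this hunk completes (method names as in the
diff; the `min` clamp is assumed from the surrounding comment about clamping
to the last change before the given ordering):

    def clamp_to_last_change(events_stream_cache, room_id: str, stream_ordering: int) -> int:
        last_change = events_stream_cache.get_max_pos_of_last_change(room_id)
        if last_change is None:
            # Room not in the cache: its last change is at or before the
            # earliest position the cache knows about, so clamp to that.
            last_change = events_stream_cache.get_earliest_known_position()
        return min(stream_ordering, last_change)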
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 6128332af8..7617fd3ad4 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -64,6 +64,7 @@ class LocalMedia:
     quarantined_by: Optional[str]
     safe_from_quarantine: bool
     user_id: Optional[str]
+    authenticated: Optional[bool]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -77,6 +78,7 @@ class RemoteMedia:
     created_ts: int
     last_access_ts: int
     quarantined_by: Optional[str]
+    authenticated: Optional[bool]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -218,6 +220,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "last_access_ts",
                 "safe_from_quarantine",
                 "user_id",
+                "authenticated",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -235,6 +238,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             last_access_ts=row[6],
             safe_from_quarantine=row[7],
             user_id=row[8],
+            authenticated=row[9],
         )
 
     async def get_local_media_by_user_paginate(
@@ -290,7 +294,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     last_access_ts,
                     quarantined_by,
                     safe_from_quarantine,
-                    user_id
+                    user_id,
+                    authenticated
                 FROM local_media_repository
                 WHERE user_id = ?
                 ORDER BY {order_by_column} {order}, media_id ASC
@@ -314,6 +319,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     quarantined_by=row[7],
                     safe_from_quarantine=bool(row[8]),
                     user_id=row[9],
+                    authenticated=row[10],
                 )
                 for row in txn
             ]
@@ -417,12 +423,18 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         time_now_ms: int,
         user_id: UserID,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "local_media_repository",
             {
                 "media_id": media_id,
                 "created_ts": time_now_ms,
                 "user_id": user_id.to_string(),
+                "authenticated": authenticated,
             },
             desc="store_local_media_id",
         )
@@ -438,6 +450,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         user_id: UserID,
         url_cache: Optional[str] = None,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "local_media_repository",
             {
@@ -448,6 +465,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "media_length": media_length,
                 "user_id": user_id.to_string(),
                 "url_cache": url_cache,
+                "authenticated": authenticated,
             },
             desc="store_local_media",
         )
@@ -638,6 +656,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "filesystem_id",
                 "last_access_ts",
                 "quarantined_by",
+                "authenticated",
             ),
             allow_none=True,
             desc="get_cached_remote_media",
@@ -654,6 +673,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             filesystem_id=row[4],
             last_access_ts=row[5],
             quarantined_by=row[6],
+            authenticated=row[7],
         )
 
     async def store_cached_remote_media(
@@ -666,6 +686,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         upload_name: Optional[str],
         filesystem_id: str,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "remote_media_cache",
             {
@@ -677,6 +702,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "upload_name": upload_name,
                 "filesystem_id": filesystem_id,
                 "last_access_ts": time_now_ms,
+                "authenticated": authenticated,
             },
             desc="store_cached_remote_media",
         )
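
All three insert paths above stamp the row from the live config, which condenses to the same one-liner (a sketch, not the committed code):

    # The flag is fixed at insert time: flipping `enable_authenticated_media`
    # later does not retroactively change rows already stored; they keep the
    # FALSE default from the schema delta below.
    authenticated = bool(self.hs.config.media.enable_authenticated_media)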
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 5d2fd08495..640ab123f0 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -279,8 +279,19 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
     @cached(max_entries=100000)  # type: ignore[synapse-@cached-mutable]
     async def get_room_summary(self, room_id: str) -> Mapping[str, MemberSummary]:
-        """Get the details of a room roughly suitable for use by the room
+        """
+        Get the details of a room roughly suitable for use by the room
         summary extension to /sync. Useful when lazy loading room members.
+
+        Returns the total count of members in the room by membership type, and a
+        truncated list of members (the heroes). This will be the first 6 members of the
+        room:
+        - We want 5 heroes plus 1, in case one of them is the calling user.
+        - They are ordered by membership (joined, then invited, then left), then
+        by `stream_ordering`. When no joined or invited members are available,
+        this also includes banned and left users.
+
         Args:
             room_id: The room ID to query
         Returns:
@@ -308,23 +319,36 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             for count, membership in txn:
                 res.setdefault(membership, MemberSummary([], count))
 
-            # we order by membership and then fairly arbitrarily by event_id so
-            # heroes are consistent
-            # Note, rejected events will have a null membership field, so
-            # we we manually filter them out.
+            # Order by membership (joins -> invites -> leaves (former insiders) ->
+            # everything else (outsiders like bans/knocks)), then by `stream_ordering`
+            # so the earliest members of the room show up first and the sort is
+            # stable (consistent heroes).
+            #
+            # Note: rejected events will have a null membership field, so we manually
+            # filter them out.
             sql = """
                 SELECT state_key, membership, event_id
                 FROM current_state_events
                 WHERE type = 'm.room.member' AND room_id = ?
                     AND membership IS NOT NULL
                 ORDER BY
-                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                    event_id ASC
+                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 WHEN ? THEN 3 ELSE 4 END ASC,
+                    event_stream_ordering ASC
                 LIMIT ?
             """
 
-            # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
-            txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    # Sort order
+                    Membership.JOIN,
+                    Membership.INVITE,
+                    Membership.LEAVE,
+                    # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
+                    6,
+                ),
+            )
             for user_id, membership, event_id in txn:
                 summary = res[membership]
                 # we will always have a summary for this membership type at this
@@ -421,9 +445,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         if not membership_list:
             return []
 
-        rooms = await self.db_pool.runInteraction(
-            "get_rooms_for_local_user_where_membership_is",
-            self._get_rooms_for_local_user_where_membership_is_txn,
+        # Convert the membership list to a frozen set: a) it needs to be hashable
+        # so the inner method can be cached, and b) we don't care about the order.
+        membership_list = frozenset(membership_list)
+
+        rooms = await self._get_rooms_for_local_user_where_membership_is_inner(
             user_id,
             membership_list,
         )
@@ -442,6 +468,24 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
         return [room for room in rooms if room.room_id not in rooms_to_exclude]
 
+    @cached(max_entries=1000, tree=True)
+    async def _get_rooms_for_local_user_where_membership_is_inner(
+        self,
+        user_id: str,
+        membership_list: Collection[str],
+    ) -> Sequence[RoomsForUser]:
+        if not membership_list:
+            return []
+
+        rooms = await self.db_pool.runInteraction(
+            "get_rooms_for_local_user_where_membership_is",
+            self._get_rooms_for_local_user_where_membership_is_txn,
+            user_id,
+            membership_list,
+        )
+
+        return rooms
+
     def _get_rooms_for_local_user_where_membership_is_txn(
         self,
         txn: LoggingTransaction,
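
The split above exists because `@cached` keys on its arguments, which must be hashable; a minimal sketch of the wrapper/inner pattern, with simplified types:

    from typing import Collection, FrozenSet, Sequence

    from synapse.util.caches.descriptors import cached

    class RoomFetcher:
        async def get_rooms(
            self, user_id: str, membership_list: Collection[str]
        ) -> Sequence[str]:
            # Normalise to a hashable, order-insensitive key so the cache
            # treats ["join", "leave"] and ["leave", "join"] identically.
            return await self._get_rooms_inner(user_id, frozenset(membership_list))

        @cached(max_entries=1000, tree=True)
        async def _get_rooms_inner(
            self, user_id: str, membership_list: FrozenSet[str]
        ) -> Sequence[str]:
            ...  # the real method runs the DB transaction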
@@ -1509,10 +1553,19 @@ def extract_heroes_from_room_summary(
 ) -> List[str]:
     """Determine the users that represent a room, from the perspective of the `me` user.
 
+    This function expects `MemberSummary.members` to already be sorted by
+    `stream_ordering` like the results from `get_room_summary(...)`.
+
     The rules which say which users we select are specified in the "Room Summary"
     section of
     https://spec.matrix.org/v1.4/client-server-api/#get_matrixclientv3sync
 
+    Args:
+        details: Mapping from membership type to member summary. We expect
+            `MemberSummary.members` to already be sorted by `stream_ordering`.
+        me: The user for whom we are determining the heroes.
+
     Returns a list (possibly empty) of heroes' mxids.
     """
     empty_ms = MemberSummary([], 0)
@@ -1527,11 +1580,11 @@ def extract_heroes_from_room_summary(
         r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
     ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
 
-    # FIXME: order by stream ordering rather than as returned by SQL
+    # We expect `MemberSummary.members` to already be sorted by `stream_ordering`
     if joined_user_ids or invited_user_ids:
-        return sorted(joined_user_ids + invited_user_ids)[0:5]
+        return (joined_user_ids + invited_user_ids)[0:5]
     else:
-        return sorted(gone_user_ids)[0:5]
+        return gone_user_ids[0:5]
 
 
 @attr.s(slots=True, auto_attribs=True)
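
Taken together, the two changes above are consumed roughly like this (a sketch; the room and user IDs are illustrative):

    summary = await store.get_room_summary("!room:example.com")
    heroes = extract_heroes_from_room_summary(summary, me="@me:example.com")
    # `heroes` is up to five joined/invited members (excluding `me`) in the
    # stream_ordering-based order the SQL produced; left/banned members are
    # used only when nobody is joined or invited.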
diff --git a/synapse/storage/databases/main/state.py b/synapse/storage/databases/main/state.py
index b2a67aff89..5188b2f7a4 100644
--- a/synapse/storage/databases/main/state.py
+++ b/synapse/storage/databases/main/state.py
@@ -41,7 +41,7 @@ from typing import (
 
 import attr
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
 from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
@@ -298,6 +298,56 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         create_event = await self.get_event(create_id)
         return create_event
 
+    @cached(max_entries=10000)
+    async def get_room_type(self, room_id: str) -> Optional[str]:
+        """Get the room type for a given room. The server must be joined to the
+        given room.
+        """
+
+        row = await self.db_pool.simple_select_one(
+            table="room_stats_state",
+            keyvalues={"room_id": room_id},
+            retcols=("room_type",),
+            allow_none=True,
+            desc="get_room_type",
+        )
+
+        if row is not None:
+            return row[0]
+
+        # If we haven't updated `room_stats_state` with the room yet, query the
+        # create event directly.
+        create_event = await self.get_create_event_for_room(room_id)
+        room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
+        return room_type
+
+    @cachedList(cached_method_name="get_room_type", list_name="room_ids")
+    async def bulk_get_room_type(
+        self, room_ids: Set[str]
+    ) -> Mapping[str, Optional[str]]:
+        """Bulk fetch room types for the given rooms, the server must be in all
+        the rooms given.
+        """
+
+        rows = await self.db_pool.simple_select_many_batch(
+            table="room_stats_state",
+            column="room_id",
+            iterable=room_ids,
+            retcols=("room_id", "room_type"),
+            desc="bulk_get_room_type",
+        )
+
+        # If we haven't updated `room_stats_state` with the room yet, query the
+        # create events directly. This should happen only rarely so we don't
+        # mind if we do this in a loop.
+        results = dict(rows)
+        for room_id in room_ids - results.keys():
+            create_event = await self.get_create_event_for_room(room_id)
+            room_type = create_event.content.get(EventContentFields.ROOM_TYPE)
+            results[room_id] = room_type
+
+        return results
+
     @cached(max_entries=100000, iterable=True)
     async def get_partial_current_state_ids(self, room_id: str) -> StateMap[str]:
         """Get the current state event ids for a room based on the
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e74e0d2e91..b034361aec 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -78,10 +78,11 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken
+from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
+from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -1293,6 +1294,126 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             get_last_event_pos_in_room_before_stream_ordering_txn,
         )
 
+    async def bulk_get_last_event_pos_in_room_before_stream_ordering(
+        self,
+        room_ids: StrCollection,
+        end_token: RoomStreamToken,
+    ) -> Dict[str, int]:
+        """Bulk fetch the stream position of the latest events in the given
+        rooms
+        """
+
+        min_token = end_token.stream
+        max_token = end_token.get_max_stream_pos()
+        results: Dict[str, int] = {}
+
+        # First, we check the stream change cache to see if we can just use the
+        # latest position it has for each room.
+        missing_room_ids: Set[str] = set()
+        for room_id in room_ids:
+            stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
+            if stream_pos and stream_pos <= min_token:
+                results[room_id] = stream_pos
+            else:
+                missing_room_ids.add(room_id)
+
+        # Next, we query the stream positions from the DB. First we fetch all
+        # positions less than the *max* stream pos in the token, then filter
+        # them down. We do this because a) it is a cheaper query, and b) the
+        # vast majority of rooms will have a latest position from before the
+        # min stream pos.
+
+        def bulk_get_last_event_pos_txn(
+            txn: LoggingTransaction, batch_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            # This query fetches the latest stream position in the rooms before
+            # the given max position.
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batch_room_ids
+            )
+            sql = f"""
+                SELECT room_id, (
+                    SELECT stream_ordering FROM events AS e
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE e.room_id = r.room_id
+                        AND stream_ordering <= ?
+                        AND NOT outlier
+                        AND rejection_reason IS NULL
+                    ORDER BY stream_ordering DESC
+                    LIMIT 1
+                )
+                FROM rooms AS r
+                WHERE {clause}
+            """
+            txn.execute(sql, [max_token] + args)
+            return {row[0]: row[1] for row in txn}
+
+        recheck_rooms: Set[str] = set()
+        for batched in batch_iter(missing_room_ids, 1000):
+            result = await self.db_pool.runInteraction(
+                "bulk_get_last_event_pos_in_room_before_stream_ordering",
+                bulk_get_last_event_pos_txn,
+                batched,
+            )
+
+            # Check that the stream positions for the rooms are before the
+            # minimum position of the token. If not, we need to fetch more
+            # rows.
+            for room_id, stream in result.items():
+                if stream <= min_token:
+                    results[room_id] = stream
+                else:
+                    recheck_rooms.add(room_id)
+
+        if not recheck_rooms:
+            return results
+
+        # For the remaining rooms we need to fetch all rows between the min and
+        # max stream positions in the end token, and filter out the rows that
+        # are after the end token.
+        #
+        # This query should be fast as the range between the min and max should
+        # be small.
+
+        def bulk_get_last_event_pos_recheck_txn(
+            txn: LoggingTransaction, batch_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batch_room_ids
+            )
+            sql = f"""
+                SELECT room_id, instance_name, stream_ordering
+                FROM events
+                WHERE ? < stream_ordering AND stream_ordering <= ?
+                    AND NOT outlier
+                    AND rejection_reason IS NULL
+                    AND {clause}
+                ORDER BY stream_ordering ASC
+            """
+            txn.execute(sql, [min_token, max_token] + args)
+
+            # We take the max stream ordering that is not after the token. Since
+            # we ordered by stream ordering, we just need to iterate through and
+            # take the last matching row for each room.
+            txn_results: Dict[str, int] = {}
+            for row in txn:
+                room_id = row[0]
+                event_pos = PersistedEventPosition(row[1], row[2])
+                if not event_pos.persisted_after(end_token):
+                    txn_results[room_id] = event_pos.stream
+
+            return txn_results
+
+        for batched in batch_iter(recheck_rooms, 1000):
+            recheck_result = await self.db_pool.runInteraction(
+                "bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
+                bulk_get_last_event_pos_recheck_txn,
+                batched,
+            )
+            results.update(recheck_result)
+
+        return results
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: str
     ) -> RoomStreamToken:
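
A condensed sketch of the two-phase lookup above; `fetch_latest_before` and `scan_token_gap` are hypothetical stand-ins for the two transactions:

    async def latest_pos_per_room(room_ids, end_token):
        min_token = end_token.stream
        max_token = end_token.get_max_stream_pos()
        results = {}
        recheck = set()

        # Phase 1 (cheap): latest stream_ordering <= max_token per room. Any
        # position at or before min_token is unambiguously before the token.
        for room_id, pos in (await fetch_latest_before(room_ids, max_token)).items():
            if pos <= min_token:
                results[room_id] = pos
            else:
                recheck.add(room_id)

        # Phase 2 (precise): scan the small (min_token, max_token] window and
        # keep the last position not persisted after the token.
        if recheck:
            results.update(
                await scan_token_gap(recheck, min_token, max_token, end_token)
            )
        return results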
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 0dc5d24249..581d00346b 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -19,7 +19,7 @@
 #
 #
 
-SCHEMA_VERSION = 85  # remember to update the list below when updating
+SCHEMA_VERSION = 86  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -139,6 +139,9 @@ Changes in SCHEMA_VERSION = 84
 
 Changes in SCHEMA_VERSION = 85
     - Add a column `suspended` to the `users` table
+
+Changes in SCHEMA_VERSION = 86
+    - Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
 """
 
 
diff --git a/synapse/storage/schema/main/delta/86/01_authenticate_media.sql b/synapse/storage/schema/main/delta/86/01_authenticate_media.sql
new file mode 100644
index 0000000000..c1ac01ae95
--- /dev/null
+++ b/synapse/storage/schema/main/delta/86/01_authenticate_media.sql
@@ -0,0 +1,15 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+ALTER TABLE remote_media_cache ADD COLUMN authenticated BOOLEAN DEFAULT FALSE NOT NULL;
+ALTER TABLE local_media_repository ADD COLUMN authenticated BOOLEAN DEFAULT FALSE NOT NULL;
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index b22a13ef01..c0d30ac2a3 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -20,6 +20,7 @@
 #
 #
 import abc
+import logging
 import re
 import string
 from enum import Enum
@@ -74,6 +75,9 @@ if TYPE_CHECKING:
     from synapse.storage.databases.main import DataStore, PurgeEventsStore
     from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
 
+
+logger = logging.getLogger(__name__)
+
 # Define a state map type from type/state_key to T (usually an event ID or
 # event)
 T = TypeVar("T")
@@ -454,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     represented by a default `stream` attribute and a map of instance name to
     stream position of any writers that are ahead of the default stream
     position.
+
+    The values in `instance_map` must be greater than the `stream` attribute.
     """
 
     stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True)
@@ -468,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
         kw_only=True,
     )
 
+    def __attrs_post_init__(self) -> None:
+        # Enforce that all instances have a value greater than the min stream
+        # position.
+        for i, v in self.instance_map.items():
+            if v <= self.stream:
+                raise ValueError(
+                    f"'instance_map' includes a stream position before the main 'stream' attribute. Instance: {i}"
+                )
+
     @classmethod
     @abc.abstractmethod
     async def parse(cls, store: "DataStore", string: str) -> "Self":
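
With the new invariant, entries at or behind the main stream position are rejected at construction time (a sketch; instance names illustrative):

    from immutabledict import immutabledict

    from synapse.types import RoomStreamToken

    RoomStreamToken(stream=5, instance_map=immutabledict({"writer1": 6}))  # ok
    RoomStreamToken(stream=5, instance_map=immutabledict({"writer1": 5}))  # ValueError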
@@ -494,6 +509,9 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
             for instance in set(self.instance_map).union(other.instance_map)
         }
 
+        # Filter out any redundant entries.
+        instance_map = {i: s for i, s in instance_map.items() if s > max_stream}
+
         return attr.evolve(
             self, stream=max_stream, instance_map=immutabledict(instance_map)
         )
@@ -539,10 +557,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     def bound_stream_token(self, max_stream: int) -> "Self":
         """Bound the stream positions to a maximum value"""
 
+        min_pos = min(self.stream, max_stream)
         return type(self)(
-            stream=min(self.stream, max_stream),
+            stream=min_pos,
             instance_map=immutabledict(
-                {k: min(s, max_stream) for k, s in self.instance_map.items()}
+                {
+                    k: min(s, max_stream)
+                    for k, s in self.instance_map.items()
+                    if min(s, max_stream) > min_pos
+                }
             ),
         )
 
@@ -637,6 +660,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
             )
 
+        super().__attrs_post_init__()
+
     @classmethod
     async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken":
         try:
@@ -651,6 +676,11 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -666,7 +696,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here because, even though this *might* be a
+            # client handing us a bad token, it's more likely that Synapse
+            # returned a bad token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid room stream token %r" % (string,))
 
     @classmethod
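
For reference, the serialised forms `parse` now accepts (the numeric values are illustrative):

    # "s2633508"            -> live token: stream position only
    # "t426-2633508"        -> historic token: topological and stream parts
    # "m3456~1.3457~2.3458" -> multi-writer token: min stream 3456, with
    #                          instance 1 at 3457 and instance 2 at 3458
    # "m5~"                 -> output of the serialisation bug fixed below in
    #                          `to_string`; the empty part is skipped, so it
    #                          parses the same as "s5"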
@@ -713,6 +746,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         return self.instance_map.get(instance_name, self.stream)
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         elif self.instance_map:
@@ -727,8 +762,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return f"s{self.stream}"
         else:
             return "s%d" % (self.stream,)
 
@@ -740,6 +777,13 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
 
         return super().bound_stream_token(max_stream)
 
+    def __str__(self) -> str:
+        instances = ", ".join(f"{k}: {v}" for k, v in sorted(self.instance_map.items()))
+        return (
+            f"RoomStreamToken(stream: {self.stream}, topological: {self.topological}, "
+            f"instances: {{{instances}}})"
+        )
+
 
 @attr.s(frozen=True, slots=True, order=False)
 class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
@@ -756,6 +800,11 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -770,10 +819,15 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here because, even though this *might* be a
+            # client handing us a bad token, it's more likely that Synapse
+            # returned a bad token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid stream token %r" % (string,))
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.instance_map:
             entries = []
             for name, pos in self.instance_map.items():
@@ -786,8 +840,10 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return str(self.stream)
         else:
             return str(self.stream)
 
@@ -824,6 +880,13 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
 
         return True
 
+    def __str__(self) -> str:
+        instances = ", ".join(f"{k}: {v}" for k, v in sorted(self.instance_map.items()))
+        return (
+            f"MultiWriterStreamToken(stream: {self.stream}, "
+            f"instances: {{{instances}}})"
+        )
+
 
 class StreamKeyType(Enum):
     """Known stream types.
@@ -1082,6 +1145,15 @@ class StreamToken:
 
         return True
 
+    def __str__(self) -> str:
+        return (
+            f"StreamToken(room: {self.room_key}, presence: {self.presence_key}, "
+            f"typing: {self.typing_key}, receipt: {self.receipt_key}, "
+            f"account_data: {self.account_data_key}, push_rules: {self.push_rules_key}, "
+            f"to_device: {self.to_device_key}, device_list: {self.device_list_key}, "
+            f"groups: {self.groups_key}, un_partial_stated_rooms: {self.un_partial_stated_rooms_key})"
+        )
+
 
 StreamToken.START = StreamToken(
     RoomStreamToken(stream=0), 0, 0, MultiWriterStreamToken(stream=0), 0, 0, 0, 0, 0, 0
@@ -1170,11 +1242,12 @@ class ReadReceipt:
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class DeviceListUpdates:
     """
-    An object containing a diff of information regarding other users' device lists, intended for
-    a recipient to carry out device list tracking.
+    An object containing a diff of information regarding other users' device lists,
+    intended for a recipient to carry out device list tracking.
 
     Attributes:
-        changed: A set of users whose device lists have changed recently.
+        changed: A set of users who have updated their device identity or
+            cross-signing keys, or with whom the recipient now shares an
+            encrypted room.
         left: A set of users who the recipient no longer needs to track the device lists of.
             Typically when those users no longer share any end-to-end encryption enabled rooms.
     """
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 43dcdf20dd..4c6c42db04 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -18,7 +18,7 @@
 #
 #
 from enum import Enum
-from typing import TYPE_CHECKING, Dict, Final, List, Optional, Tuple
+from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple
 
 import attr
 from typing_extensions import TypedDict
@@ -31,7 +31,7 @@ else:
     from pydantic import Extra
 
 from synapse.events import EventBase
-from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
+from synapse.types import DeviceListUpdates, JsonDict, JsonMapping, StreamToken, UserID
 from synapse.types.rest.client import SlidingSyncBody
 
 if TYPE_CHECKING:
@@ -200,18 +200,24 @@ class SlidingSyncResult:
                 flag set. (same as sync v2)
         """
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class StrippedHero:
+            user_id: str
+            display_name: Optional[str]
+            avatar_url: Optional[str]
+
         name: Optional[str]
         avatar: Optional[str]
-        heroes: Optional[List[EventBase]]
+        heroes: Optional[List[StrippedHero]]
         is_dm: bool
         initial: bool
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        required_state: Optional[List[EventBase]]
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        timeline_events: Optional[List[EventBase]]
+        # Should be empty for invite/knock rooms with `stripped_state`
+        required_state: List[EventBase]
+        # Should be empty for invite/knock rooms with `stripped_state`
+        timeline_events: List[EventBase]
         bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
-        # Optional because it's only relevant to invite/knock rooms
+        # Only relevant to invite/knock rooms
-        stripped_state: Optional[List[JsonDict]]
+        stripped_state: List[JsonDict]
         # Only optional because it won't be included for invite/knock rooms with `stripped_state`
         prev_batch: Optional[StreamToken]
         # Only optional because it won't be included for invite/knock rooms with `stripped_state`
@@ -252,10 +258,81 @@ class SlidingSyncResult:
         count: int
         ops: List[Operation]
 
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class Extensions:
+        """Responses for extensions
+
+        Attributes:
+            to_device: The to-device extension (MSC3885)
+            e2ee: The E2EE device extension (MSC3884)
+        """
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class ToDeviceExtension:
+            """The to-device extension (MSC3885)
+
+            Attributes:
+                next_batch: The to-device stream token the client should use
+                    to get more results
+                events: A list of to-device messages for the client
+            """
+
+            next_batch: str
+            events: Sequence[JsonMapping]
+
+            def __bool__(self) -> bool:
+                return bool(self.events)
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class E2eeExtension:
+            """The E2EE device extension (MSC3884)
+
+            Attributes:
+                device_list_updates: The user IDs whose device lists have changed
+                    (`changed`) and those the client no longer needs to track
+                    (`left`); only present on incremental syncs.
+                device_one_time_keys_count: Map from key algorithm to the number of
+                    unclaimed one-time keys currently held on the server for this device. If
+                    an algorithm is unlisted, the count for that algorithm is assumed to be
+                    zero. If this entire parameter is missing, the count for all algorithms
+                    is assumed to be zero.
+                device_unused_fallback_key_types: List of unused fallback key algorithms
+                    for this device.
+            """
+
+            # Only present on incremental syncs
+            device_list_updates: Optional[DeviceListUpdates]
+            device_one_time_keys_count: Mapping[str, int]
+            device_unused_fallback_key_types: Sequence[str]
+
+            def __bool__(self) -> bool:
+                # Note that "signed_curve25519" is always returned in key count responses
+                # regardless of whether we uploaded any keys for it. This is necessary until
+                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
+                #
+                # Also related:
+                # https://github.com/element-hq/element-android/issues/3725 and
+                # https://github.com/matrix-org/synapse/issues/10456
+                default_otk = self.device_one_time_keys_count.get("signed_curve25519")
+                more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
+                    default_otk is not None and default_otk > 0
+                )
+
+                return bool(
+                    more_than_default_otk
+                    or self.device_list_updates
+                    or self.device_unused_fallback_key_types
+                )
+
+        to_device: Optional[ToDeviceExtension] = None
+        e2ee: Optional[E2eeExtension] = None
+
+        def __bool__(self) -> bool:
+            return bool(self.to_device or self.e2ee)
+
     next_pos: StreamToken
     lists: Dict[str, SlidingWindowList]
     rooms: Dict[str, RoomResult]
-    extensions: JsonMapping
+    extensions: Extensions
 
     def __bool__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -271,5 +348,5 @@ class SlidingSyncResult:
             next_pos=next_pos,
             lists={},
             rooms={},
-            extensions={},
+            extensions=SlidingSyncResult.Extensions(),
         )
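
An illustrative check of the emptiness rule above: a result that carries only the always-present default algorithm with zero keys stays falsy, so the extension is omitted from the serialised response.

    ext = SlidingSyncResult.Extensions.E2eeExtension(
        device_list_updates=None,
        device_one_time_keys_count={"signed_curve25519": 0},
        device_unused_fallback_key_types=[],
    )
    assert not ext
    # A non-zero count, a second algorithm, any device list update, or an
    # unused fallback key type makes the extension truthy.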
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index 55f6b44053..f3c45a0d6a 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -200,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel):
                     }
 
             timeline_limit: The maximum number of timeline events to return per response.
-            include_heroes: Return a stripped variant of membership events (containing
-                `user_id` and optionally `avatar_url` and `displayname`) for the users used
-                to calculate the room name.
             filters: Filters to apply to the list before sorting.
         """
 
@@ -270,16 +267,63 @@ class SlidingSyncBody(RequestBodyModel):
         else:
             ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None  # type: ignore[valid-type]
         slow_get_all_rooms: Optional[StrictBool] = False
-        include_heroes: Optional[StrictBool] = False
         filters: Optional[Filters] = None
 
     class RoomSubscription(CommonRoomParameters):
         pass
 
-    class Extension(RequestBodyModel):
-        enabled: Optional[StrictBool] = False
-        lists: Optional[List[StrictStr]] = None
-        rooms: Optional[List[StrictStr]] = None
+    class Extensions(RequestBodyModel):
+        """The extensions section of the request.
+
+        Extensions MUST have an `enabled` flag which defaults to `false`. If a client
+        sends an unknown extension name, the server MUST ignore it (or else backwards
+        compatibility between clients and servers is broken when a newer client tries to
+        communicate with an older server).
+        """
+
+        class ToDeviceExtension(RequestBodyModel):
+            """The to-device extension (MSC3885)
+
+            Attributes:
+                enabled
+                limit: Maximum number of to-device messages to return
+                since: The `next_batch` from the previous sync response
+            """
+
+            enabled: Optional[StrictBool] = False
+            limit: StrictInt = 100
+            since: Optional[StrictStr] = None
+
+            @validator("since")
+            def since_token_check(
+                cls, value: Optional[StrictStr]
+            ) -> Optional[StrictStr]:
+                # `since` comes in as an opaque string token but we know that it's just
+                # an integer representing the position in the device inbox stream. We
+                # want to pre-validate it to make sure it works fine in downstream code.
+                if value is None:
+                    return value
+
+                try:
+                    int(value)
+                except ValueError:
+                    raise ValueError(
+                        "'extensions.to_device.since' is invalid (should look like an int)"
+                    )
+
+                return value
+
+        class E2eeExtension(RequestBodyModel):
+            """The E2EE device extension (MSC3884)
+
+            Attributes:
+                enabled
+            """
+
+            enabled: Optional[StrictBool] = False
+
+        to_device: Optional[ToDeviceExtension] = None
+        e2ee: Optional[E2eeExtension] = None
 
     # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
     if TYPE_CHECKING:
@@ -287,7 +331,7 @@ class SlidingSyncBody(RequestBodyModel):
     else:
         lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]] = None  # type: ignore[valid-type]
     room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]] = None
-    extensions: Optional[Dict[StrictStr, Extension]] = None
+    extensions: Optional[Extensions] = None
 
     @validator("lists")
     def lists_length_check(
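
With the typed model, a client enables extensions with a body along these lines (values are examples only):

    body = {
        "lists": {...},
        "extensions": {
            "to_device": {"enabled": True, "limit": 100, "since": "3457"},
            "e2ee": {"enabled": True},
        },
    }
    # `since` must look like an int (see the validator above); a non-numeric
    # value is rejected with a 400 before reaching the handler.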
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 91c335f85b..16fcb00206 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -327,7 +327,7 @@ class StreamChangeCache:
             for entity in r:
                 self._entity_to_key.pop(entity, None)
 
-    def get_max_pos_of_last_change(self, entity: EntityType) -> int:
+    def get_max_pos_of_last_change(self, entity: EntityType) -> Optional[int]:
         """Returns an upper bound of the stream id of the last change to an
         entity.
 
@@ -335,7 +335,11 @@ class StreamChangeCache:
             entity: The entity to check.
 
         Return:
-            The stream position of the latest change for the given entity or
-            the earliest known stream position if the entitiy is unknown.
+            The stream position of the latest change for the given entity, or
+            None if the entity is unknown to the cache.
         """
-        return self._entity_to_key.get(entity, self._earliest_known_stream_pos)
+        return self._entity_to_key.get(entity)
+
+    def get_earliest_known_position(self) -> int:
+        """Returns the earliest position in the cache."""
+        return self._earliest_known_stream_pos
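
Callers must now handle the unknown-entity case themselves; the bulk stream-position lookup added to stream.py above does so like this (sketch):

    pos = cache.get_max_pos_of_last_change(room_id)
    if pos is not None and pos <= min_token:
        results[room_id] = pos
    else:
        # Unknown to the cache (or changed too recently): fall back to the DB.
        missing_room_ids.add(room_id)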