summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-23 17:35:28 +0100
committerErik Johnston <erik@matrix.org>2024-07-23 17:35:28 +0100
commit588dfb8e13441e70ed3d5b7c61c73eb0b0b081cd (patch)
tree4da6804cf8e6382747137fab266c03c8ae80a000 /synapse
parentMerge remote-tracking branch 'origin/develop' into erikj/ss_tokens (diff)
parentAddress changelog review comments (diff)
downloadsynapse-588dfb8e13441e70ed3d5b7c61c73eb0b0b081cd.tar.xz
Merge remote-tracking branch 'origin/develop' into erikj/ss_tokens
Diffstat (limited to 'synapse')
-rwxr-xr-xsynapse/_scripts/synapse_port_db.py5
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/config/repository.py4
-rw-r--r--synapse/config/server.py5
-rw-r--r--synapse/handlers/device.py17
-rw-r--r--synapse/handlers/e2e_room_keys.py18
-rw-r--r--synapse/handlers/sliding_sync.py156
-rw-r--r--synapse/handlers/sync.py6
-rw-r--r--synapse/media/media_repository.py39
-rw-r--r--synapse/media/thumbnailer.py48
-rw-r--r--synapse/notifier.py13
-rw-r--r--synapse/rest/client/keys.py10
-rw-r--r--synapse/rest/client/sync.py32
-rw-r--r--synapse/rest/media/download_resource.py3
-rw-r--r--synapse/rest/media/thumbnail_resource.py5
-rw-r--r--synapse/storage/_base.py6
-rw-r--r--synapse/storage/databases/main/cache.py6
-rw-r--r--synapse/storage/databases/main/event_federation.py5
-rw-r--r--synapse/storage/databases/main/media_repository.py28
-rw-r--r--synapse/storage/databases/main/roommember.py26
-rw-r--r--synapse/storage/databases/main/stream.py123
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--synapse/storage/schema/main/delta/86/01_authenticate_media.sql15
-rw-r--r--synapse/types/__init__.py30
-rw-r--r--synapse/types/handlers/__init__.py47
-rw-r--r--synapse/types/rest/client/__init__.py10
-rw-r--r--synapse/util/caches/stream_change_cache.py12
27 files changed, 588 insertions, 88 deletions
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 3bb4a34938..5c6db8118f 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -119,18 +119,19 @@ BOOLEAN_COLUMNS = {
     "e2e_room_keys": ["is_verified"],
     "event_edges": ["is_state"],
     "events": ["processed", "outlier", "contains_url"],
-    "local_media_repository": ["safe_from_quarantine"],
+    "local_media_repository": ["safe_from_quarantine", "authenticated"],
+    "per_user_experimental_features": ["enabled"],
     "presence_list": ["accepted"],
     "presence_stream": ["currently_active"],
     "public_room_list_stream": ["visibility"],
     "pushers": ["enabled"],
     "redactions": ["have_censored"],
+    "remote_media_cache": ["authenticated"],
     "room_stats_state": ["is_federatable"],
     "rooms": ["is_public", "has_auth_chain_index"],
     "users": ["shadow_banned", "approved", "locked", "suspended"],
     "un_partial_stated_event_stream": ["rejection_status_changed"],
     "users_who_share_rooms": ["share_private"],
-    "per_user_experimental_features": ["enabled"],
 }
 
 
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 12d18137e0..85001d9676 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -50,7 +50,7 @@ class Membership:
     KNOCK: Final = "knock"
     LEAVE: Final = "leave"
     BAN: Final = "ban"
-    LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}
+    LIST: Final = frozenset((INVITE, JOIN, KNOCK, LEAVE, BAN))
 
 
 class PresenceState:
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index dc0e93ffa1..97ce6de528 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -272,6 +272,10 @@ class ContentRepositoryConfig(Config):
                 remote_media_lifetime
             )
 
+        self.enable_authenticated_media = config.get(
+            "enable_authenticated_media", False
+        )
+
     def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
         assert data_dir_path is not None
         media_store = os.path.join(data_dir_path, "media_store")
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 8bb97df175..fd52c0475c 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -384,6 +384,11 @@ class ServerConfig(Config):
         # Whether to internally track presence, requires that presence is enabled,
         self.track_presence = self.presence_enabled and presence_enabled != "untracked"
 
+        # Determines if presence results for offline users are included on initial/full sync
+        self.presence_include_offline_users_on_sync = presence_config.get(
+            "include_offline_users_on_sync", False
+        )
+
         # Custom presence router module
         # This is the legacy way of configuring it (the config should now be put in the modules section)
         self.presence_router_module_class = None
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 0432d97109..4fc6fcd7ae 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -39,6 +39,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.storage.databases.main.client_ips import DeviceLastConnectionInfo
 from synapse.types import (
+    DeviceListUpdates,
     JsonDict,
     JsonMapping,
     ScheduledTask,
@@ -214,7 +215,7 @@ class DeviceWorkerHandler:
     @cancellable
     async def get_user_ids_changed(
         self, user_id: str, from_token: StreamToken
-    ) -> JsonDict:
+    ) -> DeviceListUpdates:
         """Get list of users that have had the devices updated, or have newly
         joined a room, that `user_id` may be interested in.
         """
@@ -341,11 +342,19 @@ class DeviceWorkerHandler:
             possibly_joined = set()
             possibly_left = set()
 
-        result = {"changed": list(possibly_joined), "left": list(possibly_left)}
+        device_list_updates = DeviceListUpdates(
+            changed=possibly_joined,
+            left=possibly_left,
+        )
 
-        log_kv(result)
+        log_kv(
+            {
+                "changed": device_list_updates.changed,
+                "left": device_list_updates.left,
+            }
+        )
 
-        return result
+        return device_list_updates
 
     async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
         if not self.hs.is_mine(UserID.from_string(user_id)):
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 99f9f6e64a..f397911f28 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -34,7 +34,7 @@ from synapse.api.errors import (
 from synapse.logging.opentracing import log_kv, trace
 from synapse.storage.databases.main.e2e_room_keys import RoomKey
 from synapse.types import JsonDict
-from synapse.util.async_helpers import Linearizer
+from synapse.util.async_helpers import ReadWriteLock
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -58,7 +58,7 @@ class E2eRoomKeysHandler:
         # clients belonging to a user will receive and try to upload a new session at
         # roughly the same time.  Also used to lock out uploads when the key is being
         # changed.
-        self._upload_linearizer = Linearizer("upload_room_keys_lock")
+        self._upload_lock = ReadWriteLock()
 
     @trace
     async def get_room_keys(
@@ -89,7 +89,7 @@ class E2eRoomKeysHandler:
 
         # we deliberately take the lock to get keys so that changing the version
         # works atomically
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.read(user_id):
             # make sure the backup version exists
             try:
                 await self.store.get_e2e_room_keys_version_info(user_id, version)
@@ -132,7 +132,7 @@ class E2eRoomKeysHandler:
         """
 
         # lock for consistency with uploading
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             # make sure the backup version exists
             try:
                 version_info = await self.store.get_e2e_room_keys_version_info(
@@ -193,7 +193,7 @@ class E2eRoomKeysHandler:
         # TODO: Validate the JSON to make sure it has the right keys.
 
         # XXX: perhaps we should use a finer grained lock here?
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             # Check that the version we're trying to upload is the current version
             try:
                 version_info = await self.store.get_e2e_room_keys_version_info(user_id)
@@ -355,7 +355,7 @@ class E2eRoomKeysHandler:
         # TODO: Validate the JSON to make sure it has the right keys.
 
         # lock everyone out until we've switched version
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             new_version = await self.store.create_e2e_room_keys_version(
                 user_id, version_info
             )
@@ -382,7 +382,7 @@ class E2eRoomKeysHandler:
         }
         """
 
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.read(user_id):
             try:
                 res = await self.store.get_e2e_room_keys_version_info(user_id, version)
             except StoreError as e:
@@ -407,7 +407,7 @@ class E2eRoomKeysHandler:
             NotFoundError: if this backup version doesn't exist
         """
 
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             try:
                 await self.store.delete_e2e_room_keys_version(user_id, version)
             except StoreError as e:
@@ -437,7 +437,7 @@ class E2eRoomKeysHandler:
             raise SynapseError(
                 400, "Version in body does not match", Codes.INVALID_PARAM
             )
-        async with self._upload_linearizer.queue(user_id):
+        async with self._upload_lock.write(user_id):
             try:
                 old_info = await self.store.get_e2e_room_keys_version_info(
                     user_id, version
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 824902f7a7..1173f1a144 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -19,7 +19,18 @@
 #
 import logging
 from itertools import chain
-from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Dict,
+    Final,
+    List,
+    Mapping,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+)
 
 import attr
 from immutabledict import immutabledict
@@ -33,6 +44,7 @@ from synapse.storage.databases.main.roommember import extract_heroes_from_room_s
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.storage.roommember import MemberSummary
 from synapse.types import (
+    DeviceListUpdates,
     JsonDict,
     PersistedEventPosition,
     Requester,
@@ -330,6 +342,9 @@ class StateValues:
     # `sender` in the timeline). We only give special meaning to this value when it's a
     # `state_key`.
     LAZY: Final = "$LAZY"
+    # Subsitute with the requester's user ID. Typically used by clients to get
+    # the user's membership.
+    ME: Final = "$ME"
 
 
 class SlidingSyncHandler:
@@ -341,6 +356,7 @@ class SlidingSyncHandler:
         self.notifier = hs.get_notifier()
         self.event_sources = hs.get_event_sources()
         self.relations_handler = hs.get_relations_handler()
+        self.device_handler = hs.get_device_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
@@ -369,10 +385,6 @@ class SlidingSyncHandler:
         # auth_blocking will occur)
         await self.auth_blocking.check_auth_blocking(requester=requester)
 
-        # TODO: If the To-Device extension is enabled and we have a `from_token`, delete
-        # any to-device messages before that token (since we now know that the device
-        # has received them). (see sync v2 for how to do this)
-
         # If we're working with a user-provided token, we need to make sure to wait for
         # this worker to catch up with the token so we don't skip past any incoming
         # events or future events if the user is nefariously, manually modifying the
@@ -505,7 +517,6 @@ class SlidingSyncHandler:
                 # Also see `StateFilter.must_await_full_state(...)` for comparison
                 lazy_loading = (
                     membership_state_keys is not None
-                    and len(membership_state_keys) == 1
                     and StateValues.LAZY in membership_state_keys
                 )
 
@@ -616,7 +627,9 @@ class SlidingSyncHandler:
             await concurrently_execute(handle_room, relevant_room_map, 10)
 
         extensions = await self.get_extensions_response(
-            sync_config=sync_config, to_token=to_token
+            sync_config=sync_config,
+            from_token=from_token,
+            to_token=to_token,
         )
 
         # TODO: Update this when we implement per-connection state
@@ -1221,34 +1234,33 @@ class SlidingSyncHandler:
         # Assemble a map of room ID to the `stream_ordering` of the last activity that the
         # user should see in the room (<= `to_token`)
         last_activity_in_room_map: Dict[str, int] = {}
-        for room_id, room_for_user in sync_room_map.items():
-            # If they are fully-joined to the room, let's find the latest activity
-            # at/before the `to_token`.
-            if room_for_user.membership == Membership.JOIN:
-                last_event_result = (
-                    await self.store.get_last_event_pos_in_room_before_stream_ordering(
-                        room_id, to_token.room_key
-                    )
-                )
-
-                # If the room has no events at/before the `to_token`, this is probably a
-                # mistake in the code that generates the `sync_room_map` since that should
-                # only give us rooms that the user had membership in during the token range.
-                assert last_event_result is not None
 
-                _, event_pos = last_event_result
-
-                last_activity_in_room_map[room_id] = event_pos.stream
-            else:
-                # Otherwise, if the user has left/been invited/knocked/been banned from
-                # a room, they shouldn't see anything past that point.
+        for room_id, room_for_user in sync_room_map.items():
+            if room_for_user.membership != Membership.JOIN:
+                # If the user has left/been invited/knocked/been banned from a
+                # room, they shouldn't see anything past that point.
                 #
-                # FIXME: It's possible that people should see beyond this point in
-                # invited/knocked cases if for example the room has
+                # FIXME: It's possible that people should see beyond this point
+                # in invited/knocked cases if for example the room has
                 # `invite`/`world_readable` history visibility, see
                 # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
                 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
 
+        # For fully-joined rooms, we find the latest activity at/before the
+        # `to_token`.
+        joined_room_positions = (
+            await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
+                [
+                    room_id
+                    for room_id, room_for_user in sync_room_map.items()
+                    if room_for_user.membership == Membership.JOIN
+                ],
+                to_token.room_key,
+            )
+        )
+
+        last_activity_in_room_map.update(joined_room_positions)
+
         return sorted(
             sync_room_map.values(),
             # Sort by the last activity (stream_ordering) in the room
@@ -1666,6 +1678,8 @@ class SlidingSyncHandler:
 
                             # FIXME: We probably also care about invite, ban, kick, targets, etc
                             # but the spec only mentions "senders".
+                        elif state_key == StateValues.ME:
+                            required_state_types.append((state_type, user.to_string()))
                         else:
                             required_state_types.append((state_type, state_key))
 
@@ -1776,33 +1790,47 @@ class SlidingSyncHandler:
         self,
         sync_config: SlidingSyncConfig,
         to_token: StreamToken,
+        from_token: Optional[SlidingSyncStreamToken],
     ) -> SlidingSyncResult.Extensions:
         """Handle extension requests.
 
         Args:
             sync_config: Sync configuration
             to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
         """
 
         if sync_config.extensions is None:
             return SlidingSyncResult.Extensions()
 
         to_device_response = None
-        if sync_config.extensions.to_device:
-            to_device_response = await self.get_to_device_extensions_response(
+        if sync_config.extensions.to_device is not None:
+            to_device_response = await self.get_to_device_extension_response(
                 sync_config=sync_config,
                 to_device_request=sync_config.extensions.to_device,
                 to_token=to_token,
             )
 
-        return SlidingSyncResult.Extensions(to_device=to_device_response)
+        e2ee_response = None
+        if sync_config.extensions.e2ee is not None:
+            e2ee_response = await self.get_e2ee_extension_response(
+                sync_config=sync_config,
+                e2ee_request=sync_config.extensions.e2ee,
+                to_token=to_token,
+                from_token=from_token,
+            )
+
+        return SlidingSyncResult.Extensions(
+            to_device=to_device_response,
+            e2ee=e2ee_response,
+        )
 
-    async def get_to_device_extensions_response(
+    async def get_to_device_extension_response(
         self,
         sync_config: SlidingSyncConfig,
         to_device_request: SlidingSyncConfig.Extensions.ToDeviceExtension,
         to_token: StreamToken,
-    ) -> SlidingSyncResult.Extensions.ToDeviceExtension:
+    ) -> Optional[SlidingSyncResult.Extensions.ToDeviceExtension]:
         """Handle to-device extension (MSC3885)
 
         Args:
@@ -1810,14 +1838,16 @@ class SlidingSyncHandler:
             to_device_request: The to-device extension from the request
             to_token: The point in the stream to sync up to.
         """
-
         user_id = sync_config.user.to_string()
         device_id = sync_config.device_id
 
+        # Skip if the extension is not enabled
+        if not to_device_request.enabled:
+            return None
+
         # Check that this request has a valid device ID (not all requests have
-        # to belong to a device, and so device_id is None), and that the
-        # extension is enabled.
-        if device_id is None or not to_device_request.enabled:
+        # to belong to a device, and so device_id is None)
+        if device_id is None:
             return SlidingSyncResult.Extensions.ToDeviceExtension(
                 next_batch=f"{to_token.to_device_key}",
                 events=[],
@@ -1868,3 +1898,53 @@ class SlidingSyncHandler:
             next_batch=f"{stream_id}",
             events=messages,
         )
+
+    async def get_e2ee_extension_response(
+        self,
+        sync_config: SlidingSyncConfig,
+        e2ee_request: SlidingSyncConfig.Extensions.E2eeExtension,
+        to_token: StreamToken,
+        from_token: Optional[SlidingSyncStreamToken],
+    ) -> Optional[SlidingSyncResult.Extensions.E2eeExtension]:
+        """Handle E2EE device extension (MSC3884)
+
+        Args:
+            sync_config: Sync configuration
+            e2ee_request: The e2ee extension from the request
+            to_token: The point in the stream to sync up to.
+            from_token: The point in the stream to sync from.
+        """
+        user_id = sync_config.user.to_string()
+        device_id = sync_config.device_id
+
+        # Skip if the extension is not enabled
+        if not e2ee_request.enabled:
+            return None
+
+        device_list_updates: Optional[DeviceListUpdates] = None
+        if from_token is not None:
+            # TODO: This should take into account the `from_token` and `to_token`
+            device_list_updates = await self.device_handler.get_user_ids_changed(
+                user_id=user_id,
+                from_token=from_token.stream,
+            )
+
+        device_one_time_keys_count: Mapping[str, int] = {}
+        device_unused_fallback_key_types: Sequence[str] = []
+        if device_id:
+            # TODO: We should have a way to let clients differentiate between the states of:
+            #   * no change in OTK count since the provided since token
+            #   * the server has zero OTKs left for this device
+            #  Spec issue: https://github.com/matrix-org/matrix-doc/issues/3298
+            device_one_time_keys_count = await self.store.count_e2e_one_time_keys(
+                user_id, device_id
+            )
+            device_unused_fallback_key_types = (
+                await self.store.get_e2e_unused_fallback_key_types(user_id, device_id)
+            )
+
+        return SlidingSyncResult.Extensions.E2eeExtension(
+            device_list_updates=device_list_updates,
+            device_one_time_keys_count=device_one_time_keys_count,
+            device_unused_fallback_key_types=device_unused_fallback_key_types,
+        )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index de227faec3..ede014180c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2270,7 +2270,11 @@ class SyncHandler:
             user=user,
             from_key=presence_key,
             is_guest=sync_config.is_guest,
-            include_offline=include_offline,
+            include_offline=(
+                True
+                if self.hs_config.server.presence_include_offline_users_on_sync
+                else include_offline
+            ),
         )
         assert presence_key
         sync_result_builder.now_token = now_token.copy_and_replace(
diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py
index 87c929eb20..8bc92305fe 100644
--- a/synapse/media/media_repository.py
+++ b/synapse/media/media_repository.py
@@ -430,6 +430,7 @@ class MediaRepository:
         media_id: str,
         name: Optional[str],
         max_timeout_ms: int,
+        allow_authenticated: bool = True,
         federation: bool = False,
     ) -> None:
         """Responds to requests for local media, if exists, or returns 404.
@@ -442,6 +443,7 @@ class MediaRepository:
                 the filename in the Content-Disposition header of the response.
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            allow_authenticated: whether media marked as authenticated may be served to this request
             federation: whether the local media being fetched is for a federation request
 
         Returns:
@@ -451,6 +453,10 @@ class MediaRepository:
         if not media_info:
             return
 
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         self.mark_recently_accessed(None, media_id)
 
         media_type = media_info.media_type
@@ -481,6 +487,7 @@ class MediaRepository:
         max_timeout_ms: int,
         ip_address: str,
         use_federation_endpoint: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         """Respond to requests for remote media.
 
@@ -495,6 +502,8 @@ class MediaRepository:
             ip_address: the IP address of the requester
             use_federation_endpoint: whether to request the remote media over the new
                 federation `/download` endpoint
+            allow_authenticated: whether media marked as authenticated may be served to this
+                request
 
         Returns:
             Resolves once a response has successfully been written to request
@@ -526,6 +535,7 @@ class MediaRepository:
                 self.download_ratelimiter,
                 ip_address,
                 use_federation_endpoint,
+                allow_authenticated,
             )
 
         # We deliberately stream the file outside the lock
@@ -548,6 +558,7 @@ class MediaRepository:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool,
     ) -> RemoteMedia:
         """Gets the media info associated with the remote file, downloading
         if necessary.
@@ -560,6 +571,8 @@ class MediaRepository:
             ip_address: IP address of the requester
             use_federation: if a download is necessary, whether to request the remote file
                 over the federation `/download` endpoint
+            allow_authenticated: whether media marked as authenticated may be served to this
+                request
 
         Returns:
             The media info of the file
@@ -581,6 +594,7 @@ class MediaRepository:
                 self.download_ratelimiter,
                 ip_address,
                 use_federation,
+                allow_authenticated,
             )
 
         # Ensure we actually use the responder so that it releases resources
@@ -598,6 +612,7 @@ class MediaRepository:
         download_ratelimiter: Ratelimiter,
         ip_address: str,
         use_federation_endpoint: bool,
+        allow_authenticated: bool,
     ) -> Tuple[Optional[Responder], RemoteMedia]:
         """Looks for media in local cache, if not there then attempt to
         download from remote server.
@@ -619,6 +634,11 @@ class MediaRepository:
         """
         media_info = await self.store.get_cached_remote_media(server_name, media_id)
 
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            # if it isn't cached then don't fetch it or if it's authenticated then don't serve it
+            if not media_info or media_info.authenticated:
+                raise NotFoundError()
+
         # file_id is the ID we use to track the file locally. If we've already
         # seen the file then reuse the existing ID, otherwise generate a new
         # one.
@@ -792,6 +812,11 @@ class MediaRepository:
 
         logger.info("Stored remote media in file %r", fname)
 
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         return RemoteMedia(
             media_origin=server_name,
             media_id=media_id,
@@ -802,6 +827,7 @@ class MediaRepository:
             filesystem_id=file_id,
             last_access_ts=time_now_ms,
             quarantined_by=None,
+            authenticated=authenticated,
         )
 
     async def _federation_download_remote_file(
@@ -915,6 +941,11 @@ class MediaRepository:
 
         logger.debug("Stored remote media in file %r", fname)
 
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         return RemoteMedia(
             media_origin=server_name,
             media_id=media_id,
@@ -925,6 +956,7 @@ class MediaRepository:
             filesystem_id=file_id,
             last_access_ts=time_now_ms,
             quarantined_by=None,
+            authenticated=authenticated,
         )
 
     def _get_thumbnail_requirements(
@@ -1030,7 +1062,12 @@ class MediaRepository:
             t_len = os.path.getsize(output_path)
 
             await self.store.store_local_thumbnail(
-                media_id, t_width, t_height, t_type, t_method, t_len
+                media_id,
+                t_width,
+                t_height,
+                t_type,
+                t_method,
+                t_len,
             )
 
             return output_path
diff --git a/synapse/media/thumbnailer.py b/synapse/media/thumbnailer.py
index 413a720e40..ef6aa8ccf5 100644
--- a/synapse/media/thumbnailer.py
+++ b/synapse/media/thumbnailer.py
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple, Type
 
 from PIL import Image
 
-from synapse.api.errors import Codes, SynapseError, cs_error
+from synapse.api.errors import Codes, NotFoundError, SynapseError, cs_error
 from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP
 from synapse.http.server import respond_with_json
 from synapse.http.site import SynapseRequest
@@ -274,6 +274,7 @@ class ThumbnailProvider:
         m_type: str,
         max_timeout_ms: int,
         for_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_local_media_info(
             request, media_id, max_timeout_ms
@@ -281,6 +282,12 @@ class ThumbnailProvider:
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
         await self._select_and_respond_with_thumbnail(
             request,
@@ -307,14 +314,20 @@ class ThumbnailProvider:
         desired_type: str,
         max_timeout_ms: int,
         for_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_local_media_info(
             request, media_id, max_timeout_ms
         )
-
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
         for info in thumbnail_infos:
             t_w = info.width == desired_width
@@ -381,14 +394,27 @@ class ThumbnailProvider:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms, ip_address, use_federation
+            server_name,
+            media_id,
+            max_timeout_ms,
+            ip_address,
+            use_federation,
+            allow_authenticated,
         )
         if not media_info:
             respond_404(request)
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                respond_404(request)
+                return
+
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
         )
@@ -446,16 +472,28 @@ class ThumbnailProvider:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         # TODO: Don't download the whole remote file
         # We should proxy the thumbnail from the remote server instead of
         # downloading the remote file and generating our own thumbnails.
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms, ip_address, use_federation
+            server_name,
+            media_id,
+            max_timeout_ms,
+            ip_address,
+            use_federation,
+            allow_authenticated,
         )
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
         )
@@ -485,8 +523,8 @@ class ThumbnailProvider:
         file_id: str,
         url_cache: bool,
         for_federation: bool,
-        server_name: Optional[str] = None,
         media_info: Optional[LocalMedia] = None,
+        server_name: Optional[str] = None,
     ) -> None:
         """
         Respond to a request with an appropriate thumbnail from the previously generated thumbnails.
diff --git a/synapse/notifier.py b/synapse/notifier.py
index c3ecf86ec4..7a2b54036c 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -773,6 +773,7 @@ class Notifier:
         stream_token = await self.event_sources.bound_future_token(stream_token)
 
         start = self.clock.time_msec()
+        logged = False
         while True:
             current_token = self.event_sources.get_current_token()
             if stream_token.is_before_or_eq(current_token):
@@ -783,11 +784,13 @@ class Notifier:
             if now - start > 10_000:
                 return False
 
-            logger.info(
-                "Waiting for current token to reach %s; currently at %s",
-                stream_token,
-                current_token,
-            )
+            if not logged:
+                logger.info(
+                    "Waiting for current token to reach %s; currently at %s",
+                    stream_token,
+                    current_token,
+                )
+                logged = True
 
             # TODO: be better
             await self.clock.sleep(0.5)
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index 67de634eab..eddad7d5b8 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -256,9 +256,15 @@ class KeyChangesServlet(RestServlet):
 
         user_id = requester.user.to_string()
 
-        results = await self.device_handler.get_user_ids_changed(user_id, from_token)
+        device_list_updates = await self.device_handler.get_user_ids_changed(
+            user_id, from_token
+        )
+
+        response: JsonDict = {}
+        response["changed"] = list(device_list_updates.changed)
+        response["left"] = list(device_list_updates.left)
 
-        return 200, results
+        return 200, response
 
 
 class OneTimeKeyServlet(RestServlet):
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index b3967db18d..d72dfa2b10 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -1083,15 +1083,41 @@ class SlidingSyncRestServlet(RestServlet):
     async def encode_extensions(
         self, requester: Requester, extensions: SlidingSyncResult.Extensions
     ) -> JsonDict:
-        result = {}
+        serialized_extensions: JsonDict = {}
 
         if extensions.to_device is not None:
-            result["to_device"] = {
+            serialized_extensions["to_device"] = {
                 "next_batch": extensions.to_device.next_batch,
                 "events": extensions.to_device.events,
             }
 
-        return result
+        if extensions.e2ee is not None:
+            serialized_extensions["e2ee"] = {
+                # We always include this because
+                # https://github.com/vector-im/element-android/issues/3725. The spec
+                # isn't terribly clear on when this can be omitted and how a client
+                # would tell the difference between "no keys present" and "nothing
+                # changed" in terms of whole field absent / individual key type entry
+                # absent Corresponding synapse issue:
+                # https://github.com/matrix-org/synapse/issues/10456
+                "device_one_time_keys_count": extensions.e2ee.device_one_time_keys_count,
+                # https://github.com/matrix-org/matrix-doc/blob/54255851f642f84a4f1aaf7bc063eebe3d76752b/proposals/2732-olm-fallback-keys.md
+                # states that this field should always be included, as long as the
+                # server supports the feature.
+                "device_unused_fallback_key_types": extensions.e2ee.device_unused_fallback_key_types,
+            }
+
+            if extensions.e2ee.device_list_updates is not None:
+                serialized_extensions["e2ee"]["device_lists"] = {}
+
+                serialized_extensions["e2ee"]["device_lists"]["changed"] = list(
+                    extensions.e2ee.device_list_updates.changed
+                )
+                serialized_extensions["e2ee"]["device_lists"]["left"] = list(
+                    extensions.e2ee.device_list_updates.left
+                )
+
+        return serialized_extensions
 
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py
index c32c626905..3c3f703667 100644
--- a/synapse/rest/media/download_resource.py
+++ b/synapse/rest/media/download_resource.py
@@ -84,7 +84,7 @@ class DownloadResource(RestServlet):
 
         if self._is_mine_server_name(server_name):
             await self.media_repo.get_local_media(
-                request, media_id, file_name, max_timeout_ms
+                request, media_id, file_name, max_timeout_ms, allow_authenticated=False
             )
         else:
             allow_remote = parse_boolean(request, "allow_remote", default=True)
@@ -106,4 +106,5 @@ class DownloadResource(RestServlet):
                 max_timeout_ms,
                 ip_address,
                 False,
+                allow_authenticated=False,
             )
diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py
index 70354aa439..536fea4c32 100644
--- a/synapse/rest/media/thumbnail_resource.py
+++ b/synapse/rest/media/thumbnail_resource.py
@@ -96,6 +96,7 @@ class ThumbnailResource(RestServlet):
                     m_type,
                     max_timeout_ms,
                     False,
+                    allow_authenticated=False,
                 )
             else:
                 await self.thumbnail_provider.respond_local_thumbnail(
@@ -107,6 +108,7 @@ class ThumbnailResource(RestServlet):
                     m_type,
                     max_timeout_ms,
                     False,
+                    allow_authenticated=False,
                 )
             self.media_repo.mark_recently_accessed(None, media_id)
         else:
@@ -134,6 +136,7 @@ class ThumbnailResource(RestServlet):
                 m_type,
                 max_timeout_ms,
                 ip_address,
-                False,
+                use_federation=False,
+                allow_authenticated=False,
             )
             self.media_repo.mark_recently_accessed(server_name, media_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 881888fa93..066f3d08ae 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -120,6 +120,9 @@ class SQLBaseStore(metaclass=ABCMeta):
                 "get_user_in_room_with_profile", (room_id, user_id)
             )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
+            self._attempt_to_invalidate_cache(
+                "_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
+            )
 
         # Purge other caches based on room state.
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
@@ -146,6 +149,9 @@ class SQLBaseStore(metaclass=ABCMeta):
         self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
         self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache(
+            "_get_rooms_for_local_user_where_membership_is_inner", None
+        )
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
 
     def _attempt_to_invalidate_cache(
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2d6b75e47e..26b8e1a172 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -331,6 +331,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 "get_invited_rooms_for_local_user", (state_key,)
             )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
+            self._attempt_to_invalidate_cache(
+                "_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
+            )
 
             self._attempt_to_invalidate_cache(
                 "did_forget",
@@ -393,6 +396,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
         self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache(
+            "_get_rooms_for_local_user_where_membership_is_inner", None
+        )
         self._attempt_to_invalidate_cache("did_forget", None)
         self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
         self._attempt_to_invalidate_cache("get_references_for_event", None)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 24abab4a23..715846865b 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1313,6 +1313,11 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
         # We want to make the cache more effective, so we clamp to the last
         # change before the given ordering.
         last_change = self._events_stream_cache.get_max_pos_of_last_change(room_id)  # type: ignore[attr-defined]
+        if last_change is None:
+            # If the room isn't in the cache we know that the last change was
+            # somewhere before the earliest known position of the cache, so we
+            # can clamp to that.
+            last_change = self._events_stream_cache.get_earliest_known_position()  # type: ignore[attr-defined]
 
         # We don't always have a full stream_to_exterm_id table, e.g. after
         # the upgrade that introduced it, so we make sure we never ask for a
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 6128332af8..7617fd3ad4 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -64,6 +64,7 @@ class LocalMedia:
     quarantined_by: Optional[str]
     safe_from_quarantine: bool
     user_id: Optional[str]
+    authenticated: Optional[bool]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -77,6 +78,7 @@ class RemoteMedia:
     created_ts: int
     last_access_ts: int
     quarantined_by: Optional[str]
+    authenticated: Optional[bool]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -218,6 +220,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "last_access_ts",
                 "safe_from_quarantine",
                 "user_id",
+                "authenticated",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -235,6 +238,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             last_access_ts=row[6],
             safe_from_quarantine=row[7],
             user_id=row[8],
+            authenticated=row[9],
         )
 
     async def get_local_media_by_user_paginate(
@@ -290,7 +294,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     last_access_ts,
                     quarantined_by,
                     safe_from_quarantine,
-                    user_id
+                    user_id,
+                    authenticated
                 FROM local_media_repository
                 WHERE user_id = ?
                 ORDER BY {order_by_column} {order}, media_id ASC
@@ -314,6 +319,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     quarantined_by=row[7],
                     safe_from_quarantine=bool(row[8]),
                     user_id=row[9],
+                    authenticated=row[10],
                 )
                 for row in txn
             ]
@@ -417,12 +423,18 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         time_now_ms: int,
         user_id: UserID,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "local_media_repository",
             {
                 "media_id": media_id,
                 "created_ts": time_now_ms,
                 "user_id": user_id.to_string(),
+                "authenticated": authenticated,
             },
             desc="store_local_media_id",
         )
@@ -438,6 +450,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         user_id: UserID,
         url_cache: Optional[str] = None,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "local_media_repository",
             {
@@ -448,6 +465,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "media_length": media_length,
                 "user_id": user_id.to_string(),
                 "url_cache": url_cache,
+                "authenticated": authenticated,
             },
             desc="store_local_media",
         )
@@ -638,6 +656,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "filesystem_id",
                 "last_access_ts",
                 "quarantined_by",
+                "authenticated",
             ),
             allow_none=True,
             desc="get_cached_remote_media",
@@ -654,6 +673,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             filesystem_id=row[4],
             last_access_ts=row[5],
             quarantined_by=row[6],
+            authenticated=row[7],
         )
 
     async def store_cached_remote_media(
@@ -666,6 +686,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         upload_name: Optional[str],
         filesystem_id: str,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "remote_media_cache",
             {
@@ -677,6 +702,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "upload_name": upload_name,
                 "filesystem_id": filesystem_id,
                 "last_access_ts": time_now_ms,
+                "authenticated": authenticated,
             },
             desc="store_cached_remote_media",
         )
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index f62d9f705d..640ab123f0 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -445,9 +445,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         if not membership_list:
             return []
 
-        rooms = await self.db_pool.runInteraction(
-            "get_rooms_for_local_user_where_membership_is",
-            self._get_rooms_for_local_user_where_membership_is_txn,
+        # Convert membership list to frozen set as a) it needs to be hashable,
+        # and b) we don't care about the order.
+        membership_list = frozenset(membership_list)
+
+        rooms = await self._get_rooms_for_local_user_where_membership_is_inner(
             user_id,
             membership_list,
         )
@@ -466,6 +468,24 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
         return [room for room in rooms if room.room_id not in rooms_to_exclude]
 
+    @cached(max_entries=1000, tree=True)
+    async def _get_rooms_for_local_user_where_membership_is_inner(
+        self,
+        user_id: str,
+        membership_list: Collection[str],
+    ) -> Sequence[RoomsForUser]:
+        if not membership_list:
+            return []
+
+        rooms = await self.db_pool.runInteraction(
+            "get_rooms_for_local_user_where_membership_is",
+            self._get_rooms_for_local_user_where_membership_is_txn,
+            user_id,
+            membership_list,
+        )
+
+        return rooms
+
     def _get_rooms_for_local_user_where_membership_is_txn(
         self,
         txn: LoggingTransaction,
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index e74e0d2e91..b034361aec 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -78,10 +78,11 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import PersistedEventPosition, RoomStreamToken
+from synapse.types import PersistedEventPosition, RoomStreamToken, StrCollection
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
+from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -1293,6 +1294,126 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             get_last_event_pos_in_room_before_stream_ordering_txn,
         )
 
+    async def bulk_get_last_event_pos_in_room_before_stream_ordering(
+        self,
+        room_ids: StrCollection,
+        end_token: RoomStreamToken,
+    ) -> Dict[str, int]:
+        """Bulk fetch the stream position of the latest events in the given
+        rooms
+        """
+
+        min_token = end_token.stream
+        max_token = end_token.get_max_stream_pos()
+        results: Dict[str, int] = {}
+
+        # First, we check for the rooms in the stream change cache to see if we
+        # can just use the latest position from it.
+        missing_room_ids: Set[str] = set()
+        for room_id in room_ids:
+            stream_pos = self._events_stream_cache.get_max_pos_of_last_change(room_id)
+            if stream_pos and stream_pos <= min_token:
+                results[room_id] = stream_pos
+            else:
+                missing_room_ids.add(room_id)
+
+        # Next, we query the stream position from the DB. At first we fetch all
+        # positions less than the *max* stream pos in the token, then filter
+        # them down. We do this as a) this is a cheaper query, and b) the vast
+        # majority of rooms will have a latest token from before the min stream
+        # pos.
+
+        def bulk_get_last_event_pos_txn(
+            txn: LoggingTransaction, batch_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            # This query fetches the latest stream position in the rooms before
+            # the given max position.
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batch_room_ids
+            )
+            sql = f"""
+                SELECT room_id, (
+                    SELECT stream_ordering FROM events AS e
+                    LEFT JOIN rejections USING (event_id)
+                    WHERE e.room_id = r.room_id
+                        AND stream_ordering <= ?
+                        AND NOT outlier
+                        AND rejection_reason IS NULL
+                    ORDER BY stream_ordering DESC
+                    LIMIT 1
+                )
+                FROM rooms AS r
+                WHERE {clause}
+            """
+            txn.execute(sql, [max_token] + args)
+            return {row[0]: row[1] for row in txn}
+
+        recheck_rooms: Set[str] = set()
+        for batched in batch_iter(missing_room_ids, 1000):
+            result = await self.db_pool.runInteraction(
+                "bulk_get_last_event_pos_in_room_before_stream_ordering",
+                bulk_get_last_event_pos_txn,
+                batched,
+            )
+
+            # Check that the stream position for the rooms are from before the
+            # minimum position of the token. If not then we need to fetch more
+            # rows.
+            for room_id, stream in result.items():
+                if stream <= min_token:
+                    results[room_id] = stream
+                else:
+                    recheck_rooms.add(room_id)
+
+        if not recheck_rooms:
+            return results
+
+        # For the remaining rooms we need to fetch all rows between the min and
+        # max stream positions in the end token, and filter out the rows that
+        # are after the end token.
+        #
+        # This query should be fast as the range between the min and max should
+        # be small.
+
+        def bulk_get_last_event_pos_recheck_txn(
+            txn: LoggingTransaction, batch_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batch_room_ids
+            )
+            sql = f"""
+                SELECT room_id, instance_name, stream_ordering
+                FROM events
+                WHERE ? < stream_ordering AND stream_ordering <= ?
+                    AND NOT outlier
+                    AND rejection_reason IS NULL
+                    AND {clause}
+                ORDER BY stream_ordering ASC
+            """
+            txn.execute(sql, [min_token, max_token] + args)
+
+            # We take the max stream ordering that is less than the token. Since
+            # we ordered by stream ordering we just need to iterate through and
+            # take the last matching stream ordering.
+            txn_results: Dict[str, int] = {}
+            for row in txn:
+                room_id = row[0]
+                event_pos = PersistedEventPosition(row[1], row[2])
+                if not event_pos.persisted_after(end_token):
+                    txn_results[room_id] = event_pos.stream
+
+            return txn_results
+
+        for batched in batch_iter(recheck_rooms, 1000):
+            recheck_result = await self.db_pool.runInteraction(
+                "bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
+                bulk_get_last_event_pos_recheck_txn,
+                batched,
+            )
+            results.update(recheck_result)
+
+        return results
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: str
     ) -> RoomStreamToken:
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 0dc5d24249..581d00346b 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -19,7 +19,7 @@
 #
 #
 
-SCHEMA_VERSION = 85  # remember to update the list below when updating
+SCHEMA_VERSION = 86  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -139,6 +139,9 @@ Changes in SCHEMA_VERSION = 84
 
 Changes in SCHEMA_VERSION = 85
     - Add a column `suspended` to the `users` table
+
+Changes in SCHEMA_VERSION = 86
+    - Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
 """
 
 
diff --git a/synapse/storage/schema/main/delta/86/01_authenticate_media.sql b/synapse/storage/schema/main/delta/86/01_authenticate_media.sql
new file mode 100644
index 0000000000..c1ac01ae95
--- /dev/null
+++ b/synapse/storage/schema/main/delta/86/01_authenticate_media.sql
@@ -0,0 +1,15 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+ALTER TABLE remote_media_cache ADD COLUMN authenticated BOOLEAN DEFAULT FALSE NOT NULL;
+ALTER TABLE local_media_repository ADD COLUMN authenticated BOOLEAN DEFAULT FALSE NOT NULL;
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index a0956febc0..2294bc22e0 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -777,6 +777,13 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
 
         return super().bound_stream_token(max_stream)
 
+    def __str__(self) -> str:
+        instances = ", ".join(f"{k}: {v}" for k, v in sorted(self.instance_map.items()))
+        return (
+            f"RoomStreamToken(stream: {self.stream}, topological: {self.topological}, "
+            f"instances: {{{instances}}})"
+        )
+
 
 @attr.s(frozen=True, slots=True, order=False)
 class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
@@ -873,6 +880,13 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
 
         return True
 
+    def __str__(self) -> str:
+        instances = ", ".join(f"{k}: {v}" for k, v in sorted(self.instance_map.items()))
+        return (
+            f"MultiWriterStreamToken(stream: {self.stream}, "
+            f"instances: {{{instances}}})"
+        )
+
 
 class StreamKeyType(Enum):
     """Known stream types.
@@ -1131,6 +1145,15 @@ class StreamToken:
 
         return True
 
+    def __str__(self) -> str:
+        return (
+            f"StreamToken(room: {self.room_key}, presence: {self.presence_key}, "
+            f"typing: {self.typing_key}, receipt: {self.receipt_key}, "
+            f"account_data: {self.account_data_key}, push_rules: {self.push_rules_key}, "
+            f"to_device: {self.to_device_key}, device_list: {self.device_list_key}, "
+            f"groups: {self.groups_key}, un_partial_stated_rooms: {self.un_partial_stated_rooms_key})"
+        )
+
 
 StreamToken.START = StreamToken(
     RoomStreamToken(stream=0), 0, 0, MultiWriterStreamToken(stream=0), 0, 0, 0, 0, 0, 0
@@ -1262,11 +1285,12 @@ class ReadReceipt:
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class DeviceListUpdates:
     """
-    An object containing a diff of information regarding other users' device lists, intended for
-    a recipient to carry out device list tracking.
+    An object containing a diff of information regarding other users' device lists,
+    intended for a recipient to carry out device list tracking.
 
     Attributes:
-        changed: A set of users whose device lists have changed recently.
+        changed: A set of users who have updated their device identity or
+            cross-signing keys, or who now share an encrypted room with.
         left: A set of users who the recipient no longer needs to track the device lists of.
             Typically when those users no longer share any end-to-end encryption enabled rooms.
     """
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 642e5483a9..59eb0963ee 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -18,7 +18,7 @@
 #
 #
 from enum import Enum
-from typing import TYPE_CHECKING, Dict, Final, List, Optional, Sequence, Tuple
+from typing import TYPE_CHECKING, Dict, Final, List, Mapping, Optional, Sequence, Tuple
 
 import attr
 from typing_extensions import TypedDict
@@ -32,6 +32,7 @@ else:
 
 from synapse.events import EventBase
 from synapse.types import (
+    DeviceListUpdates,
     JsonDict,
     JsonMapping,
     SlidingSyncStreamToken,
@@ -270,6 +271,7 @@ class SlidingSyncResult:
 
         Attributes:
             to_device: The to-device extension (MSC3885)
+            e2ee: The E2EE device extension (MSC3884)
         """
 
         @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -288,10 +290,51 @@ class SlidingSyncResult:
             def __bool__(self) -> bool:
                 return bool(self.events)
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class E2eeExtension:
+            """The E2EE device extension (MSC3884)
+
+            Attributes:
+                device_list_updates: List of user_ids whose devices have changed or left (only
+                    present on incremental syncs).
+                device_one_time_keys_count: Map from key algorithm to the number of
+                    unclaimed one-time keys currently held on the server for this device. If
+                    an algorithm is unlisted, the count for that algorithm is assumed to be
+                    zero. If this entire parameter is missing, the count for all algorithms
+                    is assumed to be zero.
+                device_unused_fallback_key_types: List of unused fallback key algorithms
+                    for this device.
+            """
+
+            # Only present on incremental syncs
+            device_list_updates: Optional[DeviceListUpdates]
+            device_one_time_keys_count: Mapping[str, int]
+            device_unused_fallback_key_types: Sequence[str]
+
+            def __bool__(self) -> bool:
+                # Note that "signed_curve25519" is always returned in key count responses
+                # regardless of whether we uploaded any keys for it. This is necessary until
+                # https://github.com/matrix-org/matrix-doc/issues/3298 is fixed.
+                #
+                # Also related:
+                # https://github.com/element-hq/element-android/issues/3725 and
+                # https://github.com/matrix-org/synapse/issues/10456
+                default_otk = self.device_one_time_keys_count.get("signed_curve25519")
+                more_than_default_otk = len(self.device_one_time_keys_count) > 1 or (
+                    default_otk is not None and default_otk > 0
+                )
+
+                return bool(
+                    more_than_default_otk
+                    or self.device_list_updates
+                    or self.device_unused_fallback_key_types
+                )
+
         to_device: Optional[ToDeviceExtension] = None
+        e2ee: Optional[E2eeExtension] = None
 
         def __bool__(self) -> bool:
-            return bool(self.to_device)
+            return bool(self.to_device or self.e2ee)
 
     next_pos: SlidingSyncStreamToken
     lists: Dict[str, SlidingWindowList]
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index dbe37bc712..f3c45a0d6a 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -313,7 +313,17 @@ class SlidingSyncBody(RequestBodyModel):
 
                 return value
 
+        class E2eeExtension(RequestBodyModel):
+            """The E2EE device extension (MSC3884)
+
+            Attributes:
+                enabled
+            """
+
+            enabled: Optional[StrictBool] = False
+
         to_device: Optional[ToDeviceExtension] = None
+        e2ee: Optional[E2eeExtension] = None
 
     # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
     if TYPE_CHECKING:
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 91c335f85b..16fcb00206 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -327,7 +327,7 @@ class StreamChangeCache:
             for entity in r:
                 self._entity_to_key.pop(entity, None)
 
-    def get_max_pos_of_last_change(self, entity: EntityType) -> int:
+    def get_max_pos_of_last_change(self, entity: EntityType) -> Optional[int]:
         """Returns an upper bound of the stream id of the last change to an
         entity.
 
@@ -335,7 +335,11 @@ class StreamChangeCache:
             entity: The entity to check.
 
         Return:
-            The stream position of the latest change for the given entity or
-            the earliest known stream position if the entitiy is unknown.
+            The stream position of the latest change for the given entity, if
+            known
         """
-        return self._entity_to_key.get(entity, self._earliest_known_stream_pos)
+        return self._entity_to_key.get(entity)
+
+    def get_earliest_known_position(self) -> int:
+        """Returns the earliest position in the cache."""
+        return self._earliest_known_stream_pos