Diffstat (limited to 'synapse')
-rw-r--r--  synapse/api/constants.py | 2
-rw-r--r--  synapse/config/ratelimiting.py | 10
-rw-r--r--  synapse/events/validator.py | 2
-rw-r--r--  synapse/federation/federation_client.py | 7
-rw-r--r--  synapse/federation/federation_server.py | 4
-rw-r--r--  synapse/federation/transport/client.py | 9
-rw-r--r--  synapse/federation/transport/server/__init__.py | 24
-rw-r--r--  synapse/federation/transport/server/_base.py | 24
-rw-r--r--  synapse/federation/transport/server/federation.py | 41
-rw-r--r--  synapse/handlers/admin.py | 10
-rw-r--r--  synapse/handlers/e2e_keys.py | 108
-rw-r--r--  synapse/handlers/e2e_room_keys.py | 18
-rw-r--r--  synapse/handlers/initial_sync.py | 2
-rw-r--r--  synapse/handlers/pagination.py | 2
-rw-r--r--  synapse/handlers/room.py | 59
-rw-r--r--  synapse/handlers/sliding_sync.py | 441
-rw-r--r--  synapse/handlers/sync.py | 21
-rw-r--r--  synapse/http/matrixfederationclient.py | 55
-rw-r--r--  synapse/media/_base.py | 63
-rw-r--r--  synapse/media/media_repository.py | 61
-rw-r--r--  synapse/media/media_storage.py | 223
-rw-r--r--  synapse/media/storage_provider.py | 40
-rw-r--r--  synapse/media/thumbnailer.py | 6
-rw-r--r--  synapse/rest/client/account.py | 6
-rw-r--r--  synapse/rest/client/devices.py | 4
-rw-r--r--  synapse/rest/client/directory.py | 2
-rw-r--r--  synapse/rest/client/keys.py | 13
-rw-r--r--  synapse/rest/client/media.py | 2
-rw-r--r--  synapse/rest/client/models.py | 99
-rw-r--r--  synapse/rest/client/room.py | 3
-rw-r--r--  synapse/rest/client/sync.py | 230
-rw-r--r--  synapse/rest/key/v2/remote_key_resource.py | 2
-rw-r--r--  synapse/rest/media/download_resource.py | 8
-rw-r--r--  synapse/rest/media/thumbnail_resource.py | 2
-rw-r--r--  synapse/server.py | 4
-rw-r--r--  synapse/storage/databases/main/roommember.py | 14
-rw-r--r--  synapse/storage/databases/main/user_directory.py | 66
-rw-r--r--  synapse/storage/roommember.py | 2
-rw-r--r--  synapse/types/handlers/__init__.py | 252
-rw-r--r--  synapse/types/rest/__init__.py (renamed from synapse/rest/models.py) | 0
-rw-r--r--  synapse/types/rest/client/__init__.py | 284
-rw-r--r--  synapse/visibility.py | 2
42 files changed, 1951 insertions, 276 deletions
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 0a9123c56b..542e4faaa1 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -50,7 +50,7 @@ class Membership:
     KNOCK: Final = "knock"
     LEAVE: Final = "leave"
     BAN: Final = "ban"
-    LIST: Final = (INVITE, JOIN, KNOCK, LEAVE, BAN)
+    LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}
 
 
 class PresenceState:
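Switching `Membership.LIST` from a tuple to a set gives O(1) lookups for the frequent `membership in Membership.LIST` checks and drops the implied ordering. A minimal standalone sketch (not the full Synapse class):

```python
# Minimal sketch of the constant after this change: a set supports O(1)
# membership tests, which suits how Membership.LIST is consumed.
from typing import Final

class Membership:
    INVITE: Final = "invite"
    JOIN: Final = "join"
    KNOCK: Final = "knock"
    LEAVE: Final = "leave"
    BAN: Final = "ban"
    LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}

assert "join" in Membership.LIST
assert "not-a-membership" not in Membership.LIST
```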
diff --git a/synapse/config/ratelimiting.py b/synapse/config/ratelimiting.py
index d2cb4576df..3fa33f5373 100644
--- a/synapse/config/ratelimiting.py
+++ b/synapse/config/ratelimiting.py
@@ -218,3 +218,13 @@ class RatelimitConfig(Config):
             "rc_media_create",
             defaults={"per_second": 10, "burst_count": 50},
         )
+
+        self.remote_media_downloads = RatelimitSettings(
+            key="rc_remote_media_downloads",
+            per_second=self.parse_size(
+                config.get("remote_media_download_per_second", "87K")
+            ),
+            burst_count=self.parse_size(
+                config.get("remote_media_download_burst_count", "500M")
+            ),
+        )
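The new limiter is denominated in bytes rather than requests: `parse_size` converts suffixed strings such as `"87K"` and `"500M"` into byte counts for `per_second` and `burst_count`. A hedged sketch of the parsing semantics — illustrative only, not Synapse's actual `Config.parse_size`:

```python
# Illustrative helper showing how suffixed size strings become byte counts,
# so the rate limit meters download volume instead of request counts.
def parse_size(value: str) -> int:
    sizes = {"K": 1024, "M": 1024 * 1024, "G": 1024 * 1024 * 1024}
    suffix = value[-1].upper()
    if suffix in sizes:
        return int(float(value[:-1]) * sizes[suffix])
    return int(value)

assert parse_size("87K") == 89088        # ~87 KiB/s steady rate
assert parse_size("500M") == 524288000   # ~500 MiB burst allowance
```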
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index 62f0b67dbd..73b63b77f2 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -47,9 +47,9 @@ from synapse.events.utils import (
     validate_canonicaljson,
 )
 from synapse.http.servlet import validate_json_object
-from synapse.rest.models import RequestBodyModel
 from synapse.storage.controllers.state import server_acl_evaluator_from_event
 from synapse.types import EventID, JsonDict, RoomID, StrCollection, UserID
+from synapse.types.rest import RequestBodyModel
 
 
 class EventValidator:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index e613eb87a6..f0f5a37a57 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -56,6 +56,7 @@ from synapse.api.errors import (
     SynapseError,
     UnsupportedRoomVersionError,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.api.room_versions import (
     KNOWN_ROOM_VERSIONS,
     EventFormatVersions,
@@ -1877,6 +1878,8 @@ class FederationClient(FederationBase):
         output_stream: BinaryIO,
         max_size: int,
         max_timeout_ms: int,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
     ) -> Tuple[int, Dict[bytes, List[bytes]]]:
         try:
             return await self.transport_layer.download_media_v3(
@@ -1885,6 +1888,8 @@ class FederationClient(FederationBase):
                 output_stream=output_stream,
                 max_size=max_size,
                 max_timeout_ms=max_timeout_ms,
+                download_ratelimiter=download_ratelimiter,
+                ip_address=ip_address,
             )
         except HttpResponseException as e:
             # If an error is received that is due to an unrecognised endpoint,
@@ -1905,6 +1910,8 @@ class FederationClient(FederationBase):
             output_stream=output_stream,
             max_size=max_size,
             max_timeout_ms=max_timeout_ms,
+            download_ratelimiter=download_ratelimiter,
+            ip_address=ip_address,
         )
 
 
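This hunk threads `download_ratelimiter` and `ip_address` through both legs of the existing v3-then-r0 fallback. A hedged sketch of that pattern — `is_unknown_endpoint` stands in for Synapse's real check, and the transport methods are assumed to exist as named in the diff:

```python
# Sketch of the fallback: try the modern /_matrix/media/v3 endpoint first,
# retry on the deprecated r0 path only for "unrecognised endpoint" errors.
class HttpResponseException(Exception):  # placeholder for synapse.api.errors
    pass

async def download_with_fallback(transport, is_unknown_endpoint, **kwargs):
    try:
        return await transport.download_media_v3(**kwargs)
    except HttpResponseException as e:
        if not is_unknown_endpoint(e):
            raise
    # The remote didn't recognise v3: retry on the deprecated r0 path.
    return await transport.download_media_r0(**kwargs)
```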
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 7ffc650aa1..1932fa82a4 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -674,7 +674,7 @@ class FederationServer(FederationBase):
         # This is in addition to the HS-level rate limiting applied by
         # BaseFederationServlet.
         # type-ignore: mypy doesn't seem able to deduce the type of the limiter(!?)
-        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
+        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
             requester=None,
             key=room_id,
             update=False,
@@ -717,7 +717,7 @@ class FederationServer(FederationBase):
             SynapseTags.SEND_JOIN_RESPONSE_IS_PARTIAL_STATE,
             caller_supports_partial_state,
         )
-        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(  # type: ignore[has-type]
+        await self._room_member_handler._join_rate_per_room_limiter.ratelimit(
             requester=None,
             key=room_id,
             update=False,
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index de408f7f8d..af1336fe5f 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -43,6 +43,7 @@ import ijson
 
 from synapse.api.constants import Direction, Membership
 from synapse.api.errors import Codes, HttpResponseException, SynapseError
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.api.room_versions import RoomVersion
 from synapse.api.urls import (
     FEDERATION_UNSTABLE_PREFIX,
@@ -819,6 +820,8 @@ class TransportLayerClient:
         output_stream: BinaryIO,
         max_size: int,
         max_timeout_ms: int,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
     ) -> Tuple[int, Dict[bytes, List[bytes]]]:
         path = f"/_matrix/media/r0/download/{destination}/{media_id}"
 
@@ -834,6 +837,8 @@ class TransportLayerClient:
                 "allow_remote": "false",
                 "timeout_ms": str(max_timeout_ms),
             },
+            download_ratelimiter=download_ratelimiter,
+            ip_address=ip_address,
         )
 
     async def download_media_v3(
@@ -843,6 +848,8 @@ class TransportLayerClient:
         output_stream: BinaryIO,
         max_size: int,
         max_timeout_ms: int,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
     ) -> Tuple[int, Dict[bytes, List[bytes]]]:
         path = f"/_matrix/media/v3/download/{destination}/{media_id}"
 
@@ -862,6 +869,8 @@ class TransportLayerClient:
                 "allow_redirect": "true",
             },
             follow_redirects=True,
+            download_ratelimiter=download_ratelimiter,
+            ip_address=ip_address,
         )
 
 
diff --git a/synapse/federation/transport/server/__init__.py b/synapse/federation/transport/server/__init__.py
index bac569e977..266675c9b8 100644
--- a/synapse/federation/transport/server/__init__.py
+++ b/synapse/federation/transport/server/__init__.py
@@ -19,6 +19,7 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+import inspect
 import logging
 from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Type
 
@@ -33,6 +34,7 @@ from synapse.federation.transport.server.federation import (
     FEDERATION_SERVLET_CLASSES,
     FederationAccountStatusServlet,
     FederationUnstableClientKeysClaimServlet,
+    FederationUnstableMediaDownloadServlet,
 )
 from synapse.http.server import HttpServer, JsonResource
 from synapse.http.servlet import (
@@ -315,6 +317,28 @@ def register_servlets(
             ):
                 continue
 
+            if servletclass == FederationUnstableMediaDownloadServlet:
+                if (
+                    not hs.config.server.enable_media_repo
+                    or not hs.config.experimental.msc3916_authenticated_media_enabled
+                ):
+                    continue
+
+                # don't load the endpoint if the storage provider is incompatible
+                media_repo = hs.get_media_repository()
+                load_download_endpoint = True
+                for provider in media_repo.media_storage.storage_providers:
+                    signature = inspect.signature(provider.backend.fetch)
+                    if "federation" not in signature.parameters:
+                        logger.warning(
+                            f"Federation media `/download` endpoint will not be enabled as storage provider {provider.backend} is not compatible with this endpoint."
+                        )
+                        load_download_endpoint = False
+                        break
+
+                if not load_download_endpoint:
+                    continue
+
             servletclass(
                 hs=hs,
                 authenticator=authenticator,
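The registration logic above refuses to expose the new endpoint when any configured storage provider predates the `federation` parameter. A minimal sketch of that `inspect.signature` probe (classes invented for illustration):

```python
# The probe checks the provider's fetch() signature rather than calling it:
# only providers whose fetch accepts a `federation` keyword are compatible.
import inspect

class LegacyBackend:
    def fetch(self, path, file_info):  # no `federation` parameter
        ...

class UpdatedBackend:
    def fetch(self, path, file_info, federation=False):
        ...

def supports_federation_fetch(backend) -> bool:
    return "federation" in inspect.signature(backend.fetch).parameters

assert not supports_federation_fetch(LegacyBackend())
assert supports_federation_fetch(UpdatedBackend())
```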
diff --git a/synapse/federation/transport/server/_base.py b/synapse/federation/transport/server/_base.py
index db0f5076a9..4e2717b565 100644
--- a/synapse/federation/transport/server/_base.py
+++ b/synapse/federation/transport/server/_base.py
@@ -360,13 +360,29 @@ class BaseFederationServlet:
                                     "request"
                                 )
                                 return None
+                            if (
+                                func.__self__.__class__.__name__  # type: ignore
+                                == "FederationUnstableMediaDownloadServlet"
+                            ):
+                                response = await func(
+                                    origin, content, request, *args, **kwargs
+                                )
+                            else:
+                                response = await func(
+                                    origin, content, request.args, *args, **kwargs
+                                )
+                    else:
+                        if (
+                            func.__self__.__class__.__name__  # type: ignore
+                            == "FederationUnstableMediaDownloadServlet"
+                        ):
+                            response = await func(
+                                origin, content, request, *args, **kwargs
+                            )
+                        else:
                             response = await func(
                                 origin, content, request.args, *args, **kwargs
                             )
-                    else:
-                        response = await func(
-                            origin, content, request.args, *args, **kwargs
-                        )
             finally:
                 # if we used the origin's context as the parent, add a new span using
                 # the servlet span as a parent, so that we have a link
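The base-servlet change above special-cases the media servlet so it receives the live `request` object (needed to stream a multipart response back), while every other servlet keeps receiving `request.args`. A condensed sketch of the dispatch rule (names illustrative):

```python
# Sketch of the new dispatch: the media download servlet writes directly to
# the request (streaming multipart body); everyone else gets just the
# parsed query arguments, as before.
async def dispatch(func, origin, content, request, *args, **kwargs):
    if func.__self__.__class__.__name__ == "FederationUnstableMediaDownloadServlet":
        return await func(origin, content, request, *args, **kwargs)
    return await func(origin, content, request.args, *args, **kwargs)
```

Matching on the class *name* rather than doing an isinstance check presumably avoids importing the servlet class into this base module; the trade-off is that the string must be kept in sync with the class name by hand.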
diff --git a/synapse/federation/transport/server/federation.py b/synapse/federation/transport/server/federation.py
index a59734785f..1f02451efa 100644
--- a/synapse/federation/transport/server/federation.py
+++ b/synapse/federation/transport/server/federation.py
@@ -44,10 +44,13 @@ from synapse.federation.transport.server._base import (
 )
 from synapse.http.servlet import (
     parse_boolean_from_args,
+    parse_integer,
     parse_integer_from_args,
     parse_string_from_args,
     parse_strings_from_args,
 )
+from synapse.http.site import SynapseRequest
+from synapse.media._base import DEFAULT_MAX_TIMEOUT_MS, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS
 from synapse.types import JsonDict
 from synapse.util import SYNAPSE_VERSION
 from synapse.util.ratelimitutils import FederationRateLimiter
@@ -787,6 +790,43 @@ class FederationAccountStatusServlet(BaseFederationServerServlet):
         return 200, {"account_statuses": statuses, "failures": failures}
 
 
+class FederationUnstableMediaDownloadServlet(BaseFederationServerServlet):
+    """
+    Implementation of the new federation media `/download` endpoint outlined in MSC3916. Returns
+    a multipart/form-data response consisting of a JSON object and the requested media
+    item. This endpoint only returns local media.
+    """
+
+    PATH = "/media/download/(?P<media_id>[^/]*)"
+    PREFIX = FEDERATION_UNSTABLE_PREFIX + "/org.matrix.msc3916"
+    RATELIMIT = True
+
+    def __init__(
+        self,
+        hs: "HomeServer",
+        ratelimiter: FederationRateLimiter,
+        authenticator: Authenticator,
+        server_name: str,
+    ):
+        super().__init__(hs, authenticator, ratelimiter, server_name)
+        self.media_repo = self.hs.get_media_repository()
+
+    async def on_GET(
+        self,
+        origin: Optional[str],
+        content: Literal[None],
+        request: SynapseRequest,
+        media_id: str,
+    ) -> None:
+        max_timeout_ms = parse_integer(
+            request, "timeout_ms", default=DEFAULT_MAX_TIMEOUT_MS
+        )
+        max_timeout_ms = min(max_timeout_ms, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)
+        await self.media_repo.get_local_media(
+            request, media_id, None, max_timeout_ms, federation=True
+        )
+
+
 FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationSendServlet,
     FederationEventServlet,
@@ -818,4 +858,5 @@ FEDERATION_SERVLET_CLASSES: Tuple[Type[BaseFederationServlet], ...] = (
     FederationV1SendKnockServlet,
     FederationMakeKnockServlet,
     FederationAccountStatusServlet,
+    FederationUnstableMediaDownloadServlet,
 )
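The `on_GET` above clamps the client-supplied `timeout_ms` against a server-side ceiling. A worked sketch with illustrative constant values (the real ones live in `synapse.media._base`):

```python
# Worked sketch of the timeout clamp: honour the client's timeout_ms only up
# to a server-side maximum. Constant values here are illustrative.
from typing import Optional

DEFAULT_MAX_TIMEOUT_MS = 20_000
MAXIMUM_ALLOWED_MAX_TIMEOUT_MS = 60_000

def clamp_timeout(requested_ms: Optional[int]) -> int:
    requested = DEFAULT_MAX_TIMEOUT_MS if requested_ms is None else requested_ms
    return min(requested, MAXIMUM_ALLOWED_MAX_TIMEOUT_MS)

assert clamp_timeout(None) == 20_000      # default applied
assert clamp_timeout(5_000) == 5_000      # modest requests pass through
assert clamp_timeout(600_000) == 60_000   # capped at the ceiling
```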
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 702d40332c..21d3bb37f3 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -126,13 +126,7 @@ class AdminHandler:
         # Get all rooms the user is in or has been in
         rooms = await self._store.get_rooms_for_local_user_where_membership_is(
             user_id,
-            membership_list=(
-                Membership.JOIN,
-                Membership.LEAVE,
-                Membership.BAN,
-                Membership.INVITE,
-                Membership.KNOCK,
-            ),
+            membership_list=Membership.LIST,
         )
 
         # We only try and fetch events for rooms the user has been in. If
@@ -179,7 +173,7 @@ class AdminHandler:
             if room.membership == Membership.JOIN:
                 stream_ordering = self._store.get_room_max_stream_ordering()
             else:
-                stream_ordering = room.stream_ordering
+                stream_ordering = room.event_pos.stream
 
             from_key = RoomStreamToken(topological=0, stream=0)
             to_key = RoomStreamToken(stream=stream_ordering)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 560530a7b3..668cec513b 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -35,6 +35,7 @@ from synapse.api.errors import CodeMessageException, Codes, NotFoundError, Synap
 from synapse.handlers.device import DeviceHandler
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import log_kv, set_tag, tag_args, trace
+from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
 from synapse.types import (
     JsonDict,
     JsonMapping,
@@ -45,7 +46,10 @@ from synapse.types import (
 from synapse.util import json_decoder
 from synapse.util.async_helpers import Linearizer, concurrently_execute
 from synapse.util.cancellation import cancellable
-from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.retryutils import (
+    NotRetryingDestination,
+    filter_destinations_by_retry_limiter,
+)
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -86,6 +90,12 @@ class E2eKeysHandler:
                 edu_updater.incoming_signing_key_update,
             )
 
+            self.device_key_uploader = self.upload_device_keys_for_user
+        else:
+            self.device_key_uploader = (
+                ReplicationUploadKeysForUserRestServlet.make_client(hs)
+            )
+
         # doesn't really work as part of the generic query API, because the
         # query request requires an object POST, but we abuse the
         # "query handler" interface.
@@ -268,10 +278,8 @@ class E2eKeysHandler:
                 "%d destinations to query devices for", len(remote_queries_not_in_cache)
             )
 
-            async def _query(
-                destination_queries: Tuple[str, Dict[str, Iterable[str]]]
-            ) -> None:
-                destination, queries = destination_queries
+            async def _query(destination: str) -> None:
+                queries = remote_queries_not_in_cache[destination]
                 return await self._query_devices_for_destination(
                     results,
                     cross_signing_keys,
@@ -281,9 +289,20 @@ class E2eKeysHandler:
                     timeout,
                 )
 
+            # Only try and fetch keys for destinations that are not marked as
+            # down.
+            filtered_destinations = await filter_destinations_by_retry_limiter(
+                remote_queries_not_in_cache.keys(),
+                self.clock,
+                self.store,
+                # Let's give an arbitrary grace period for those hosts that are
+                # only recently down
+                retry_due_within_ms=60 * 1000,
+            )
+
             await concurrently_execute(
                 _query,
-                remote_queries_not_in_cache.items(),
+                filtered_destinations,
                 10,
                 delay_cancellation=True,
             )
@@ -784,36 +803,17 @@ class E2eKeysHandler:
             "one_time_keys": A mapping from algorithm to number of keys for that
                 algorithm, including those previously persisted.
         """
-        # This can only be called from the main process.
-        assert isinstance(self.device_handler, DeviceHandler)
-
         time_now = self.clock.time_msec()
 
         # TODO: Validate the JSON to make sure it has the right keys.
         device_keys = keys.get("device_keys", None)
         if device_keys:
-            logger.info(
-                "Updating device_keys for device %r for user %s at %d",
-                device_id,
-                user_id,
-                time_now,
+            await self.device_key_uploader(
+                user_id=user_id,
+                device_id=device_id,
+                keys={"device_keys": device_keys},
             )
-            log_kv(
-                {
-                    "message": "Updating device_keys for user.",
-                    "user_id": user_id,
-                    "device_id": device_id,
-                }
-            )
-            # TODO: Sign the JSON with the server key
-            changed = await self.store.set_e2e_device_keys(
-                user_id, device_id, time_now, device_keys
-            )
-            if changed:
-                # Only notify about device updates *if* the keys actually changed
-                await self.device_handler.notify_device_update(user_id, [device_id])
-        else:
-            log_kv({"message": "Not updating device_keys for user", "user_id": user_id})
+
         one_time_keys = keys.get("one_time_keys", None)
         if one_time_keys:
             log_kv(
@@ -849,6 +849,49 @@ class E2eKeysHandler:
                 {"message": "Did not update fallback_keys", "reason": "no keys given"}
             )
 
+        result = await self.store.count_e2e_one_time_keys(user_id, device_id)
+
+        set_tag("one_time_key_counts", str(result))
+        return {"one_time_key_counts": result}
+
+    @tag_args
+    async def upload_device_keys_for_user(
+        self, user_id: str, device_id: str, keys: JsonDict
+    ) -> None:
+        """
+        Args:
+            user_id: user whose keys are being uploaded.
+            device_id: device whose keys are being uploaded.
+            keys: a dict containing the `device_keys` of a /keys/upload request.
+
+        """
+        # This can only be called from the main process.
+        assert isinstance(self.device_handler, DeviceHandler)
+
+        time_now = self.clock.time_msec()
+
+        device_keys = keys["device_keys"]
+        logger.info(
+            "Updating device_keys for device %r for user %s at %d",
+            device_id,
+            user_id,
+            time_now,
+        )
+        log_kv(
+            {
+                "message": "Updating device_keys for user.",
+                "user_id": user_id,
+                "device_id": device_id,
+            }
+        )
+        # TODO: Sign the JSON with the server key
+        changed = await self.store.set_e2e_device_keys(
+            user_id, device_id, time_now, device_keys
+        )
+        if changed:
+            # Only notify about device updates *if* the keys actually changed
+            await self.device_handler.notify_device_update(user_id, [device_id])
+
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
         # old access_token without an associated device_id. Either way, we
@@ -856,11 +899,6 @@ class E2eKeysHandler:
         # keys without a corresponding device.
         await self.device_handler.check_device_registered(user_id, device_id)
 
-        result = await self.store.count_e2e_one_time_keys(user_id, device_id)
-
-        set_tag("one_time_key_counts", str(result))
-        return {"one_time_key_counts": result}
-
     async def _upload_one_time_keys_for_user(
         self, user_id: str, device_id: str, time_now: int, one_time_keys: JsonDict
     ) -> None:
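The net effect of the e2e_keys changes is an uploader indirection: on the main process `device_key_uploader` is the local `upload_device_keys_for_user`, while on workers it is a replication client that forwards the call to the main process. A hedged structural sketch (stand-in types, not the real handler):

```python
# Both branches expose the same awaitable attribute, so /keys/upload handling
# is identical on main process and workers. The replication stand-in models
# ReplicationUploadKeysForUserRestServlet.make_client(hs).
from typing import Any, Awaitable, Callable, Dict

JsonDict = Dict[str, Any]

class KeyUploaderSketch:
    def __init__(self, is_main_process: bool) -> None:
        self.device_key_uploader: Callable[..., Awaitable[None]]
        if is_main_process:
            self.device_key_uploader = self.upload_device_keys_for_user
        else:
            self.device_key_uploader = self._upload_via_replication

    async def upload_device_keys_for_user(
        self, user_id: str, device_id: str, keys: JsonDict
    ) -> None:
        ...  # persist keys, then notify about the device update

    async def _upload_via_replication(
        self, user_id: str, device_id: str, keys: JsonDict
    ) -> None:
        ...  # POST to the main process's replication endpoint
```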
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index e76a51ba30..99f9f6e64a 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -247,6 +247,12 @@ class E2eRoomKeysHandler:
                     if current_room_key:
                         if self._should_replace_room_key(current_room_key, room_key):
                             log_kv({"message": "Replacing room key."})
+                            logger.debug(
+                                "Replacing room key. room=%s session=%s user=%s",
+                                room_id,
+                                session_id,
+                                user_id,
+                            )
                             # updates are done one at a time in the DB, so send
                             # updates right away rather than batching them up,
                             # like we do with the inserts
@@ -256,6 +262,12 @@ class E2eRoomKeysHandler:
                             changed = True
                         else:
                             log_kv({"message": "Not replacing room_key."})
+                            logger.debug(
+                                "Not replacing room key. room=%s session=%s user=%s",
+                                room_id,
+                                session_id,
+                                user_id,
+                            )
                     else:
                         log_kv(
                             {
@@ -265,6 +277,12 @@ class E2eRoomKeysHandler:
                             }
                         )
                         log_kv({"message": "Replacing room key."})
+                        logger.debug(
+                            "Inserting new room key. room=%s session=%s user=%s",
+                            room_id,
+                            session_id,
+                            user_id,
+                        )
                         to_insert.append((room_id, session_id, room_key))
                         changed = True
 
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index d99fc4bec0..84d6fecf31 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -199,7 +199,7 @@ class InitialSyncHandler:
                     )
                 elif event.membership == Membership.LEAVE:
                     room_end_token = RoomStreamToken(
-                        stream=event.stream_ordering,
+                        stream=event.event_pos.stream,
                     )
                     deferred_room_state = run_in_background(
                         self._state_storage_controller.get_state_for_events,
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 6617105cdb..dab3f90e74 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -27,7 +27,6 @@ from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
 from synapse.events.utils import SerializeEventConfig
-from synapse.handlers.room import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.handlers.worker_lock import NEW_EVENT_DURING_PURGE_LOCK_NAME
 from synapse.logging.opentracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -41,6 +40,7 @@ from synapse.types import (
     StreamKeyType,
     TaskStatus,
 )
+from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.types.state import StateFilter
 from synapse.util.async_helpers import ReadWriteLock
 from synapse.visibility import filter_events_for_client
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 51739a2653..203209427b 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -40,7 +40,6 @@ from typing import (
 )
 
 import attr
-from typing_extensions import TypedDict
 
 import synapse.events.snapshot
 from synapse.api.constants import (
@@ -88,6 +87,7 @@ from synapse.types import (
     UserID,
     create_requester,
 )
+from synapse.types.handlers import ShutdownRoomParams, ShutdownRoomResponse
 from synapse.types.state import StateFilter
 from synapse.util import stringutils
 from synapse.util.caches.response_cache import ResponseCache
@@ -1780,63 +1780,6 @@ class RoomEventSource(EventSource[RoomStreamToken, EventBase]):
         return self.store.get_current_room_stream_token_for_room_id(room_id)
 
 
-class ShutdownRoomParams(TypedDict):
-    """
-    Attributes:
-        requester_user_id:
-            User who requested the action. Will be recorded as putting the room on the
-            blocking list.
-        new_room_user_id:
-            If set, a new room will be created with this user ID
-            as the creator and admin, and all users in the old room will be
-            moved into that room. If not set, no new room will be created
-            and the users will just be removed from the old room.
-        new_room_name:
-            A string representing the name of the room that new users will
-            be invited to. Defaults to `Content Violation Notification`
-        message:
-            A string containing the first message that will be sent as
-            `new_room_user_id` in the new room. Ideally this will clearly
-            convey why the original room was shut down.
-            Defaults to `Sharing illegal content on this server is not
-            permitted and rooms in violation will be blocked.`
-        block:
-            If set to `true`, this room will be added to a blocking list,
-            preventing future attempts to join the room. Defaults to `false`.
-        purge:
-            If set to `true`, purge the given room from the database.
-        force_purge:
-            If set to `true`, the room will be purged from database
-            even if there are still users joined to the room.
-    """
-
-    requester_user_id: Optional[str]
-    new_room_user_id: Optional[str]
-    new_room_name: Optional[str]
-    message: Optional[str]
-    block: bool
-    purge: bool
-    force_purge: bool
-
-
-class ShutdownRoomResponse(TypedDict):
-    """
-    Attributes:
-        kicked_users: An array of users (`user_id`) that were kicked.
-        failed_to_kick_users:
-            An array of users (`user_id`) that that were not kicked.
-        local_aliases:
-            An array of strings representing the local aliases that were
-            migrated from the old room to the new.
-        new_room_id: A string representing the room ID of the new room.
-    """
-
-    kicked_users: List[str]
-    failed_to_kick_users: List[str]
-    local_aliases: List[str]
-    new_room_id: Optional[str]
-
-
 class RoomShutdownHandler:
     DEFAULT_MESSAGE = (
         "Sharing illegal content on this server is not permitted and rooms in"
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
new file mode 100644
index 0000000000..1c37f83a2b
--- /dev/null
+++ b/synapse/handlers/sliding_sync.py
@@ -0,0 +1,441 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+import logging
+from typing import TYPE_CHECKING, AbstractSet, Dict, List, Optional
+
+from immutabledict import immutabledict
+
+from synapse.api.constants import Membership
+from synapse.events import EventBase
+from synapse.types import Requester, RoomStreamToken, StreamToken, UserID
+from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+logger = logging.getLogger(__name__)
+
+
+def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
+    """
+    Returns True if the membership event should be included in the sync response,
+    otherwise False.
+
+    Args:
+        membership: The membership state of the user in the room.
+        user_id: The user ID that the membership applies to.
+        sender: The person who sent the membership event.
+    """
+
+    # Everything except `Membership.LEAVE` because we want everything that's *still*
+    # relevant to the user. There are a few more things to include in the sync
+    # response (newly_left) but those are handled separately.
+    #
+    # This logic includes kicks (leave events where the sender differs from the
+    # affected user) and can be read as "anything that isn't a self-leave".
+    return membership != Membership.LEAVE or sender != user_id
+
+
+class SlidingSyncHandler:
+    def __init__(self, hs: "HomeServer"):
+        self.clock = hs.get_clock()
+        self.store = hs.get_datastores().main
+        self.auth_blocking = hs.get_auth_blocking()
+        self.notifier = hs.get_notifier()
+        self.event_sources = hs.get_event_sources()
+        self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
+
+    async def wait_for_sync_for_user(
+        self,
+        requester: Requester,
+        sync_config: SlidingSyncConfig,
+        from_token: Optional[StreamToken] = None,
+        timeout_ms: int = 0,
+    ) -> SlidingSyncResult:
+        """Get the sync for a client if we have new data for it now. Otherwise
+        wait for new data to arrive on the server. If the timeout expires, then
+        return an empty sync result.
+        """
+        # If the user is not part of the mau group, then check that limits have
+        # not been exceeded (if not part of the group by this point, almost certain
+        # auth_blocking will occur)
+        await self.auth_blocking.check_auth_blocking(requester=requester)
+
+        # TODO: If the To-Device extension is enabled and we have a `from_token`, delete
+        # any to-device messages before that token (since we now know that the device
+        # has received them). (see sync v2 for how to do this)
+
+        # If we're working with a user-provided token, we need to make sure to wait for
+        # this worker to catch up with the token so we don't skip past any incoming
+        # events or future events if the user is nefariously, manually modifying the
+        # token.
+        if from_token is not None:
+            # We need to make sure this worker has caught up with the token. If
+            # this returns false, it means we timed out waiting, and we should
+            # just return an empty response.
+            before_wait_ts = self.clock.time_msec()
+            if not await self.notifier.wait_for_stream_token(from_token):
+                logger.warning(
+                    "Timed out waiting for worker to catch up. Returning empty response"
+                )
+                return SlidingSyncResult.empty(from_token)
+
+            # If we've spent significant time waiting to catch up, take it off
+            # the timeout.
+            after_wait_ts = self.clock.time_msec()
+            if after_wait_ts - before_wait_ts > 1_000:
+                timeout_ms -= after_wait_ts - before_wait_ts
+                timeout_ms = max(timeout_ms, 0)
+
+        # We're going to respond immediately if the timeout is 0 or if this is an
+        # initial sync (without a `from_token`) so we can avoid calling
+        # `notifier.wait_for_events()`.
+        if timeout_ms == 0 or from_token is None:
+            now_token = self.event_sources.get_current_token()
+            result = await self.current_sync_for_user(
+                sync_config,
+                from_token=from_token,
+                to_token=now_token,
+            )
+        else:
+            # Otherwise, we wait for something to happen and report it to the user.
+            async def current_sync_callback(
+                before_token: StreamToken, after_token: StreamToken
+            ) -> SlidingSyncResult:
+                return await self.current_sync_for_user(
+                    sync_config,
+                    from_token=from_token,
+                    to_token=after_token,
+                )
+
+            result = await self.notifier.wait_for_events(
+                sync_config.user.to_string(),
+                timeout_ms,
+                current_sync_callback,
+                from_token=from_token,
+            )
+
+        return result
+
+    async def current_sync_for_user(
+        self,
+        sync_config: SlidingSyncConfig,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken] = None,
+    ) -> SlidingSyncResult:
+        """
+        Generates the response body of a Sliding Sync result, represented as a
+        `SlidingSyncResult`.
+        """
+        user_id = sync_config.user.to_string()
+        app_service = self.store.get_app_service_by_user_id(user_id)
+        if app_service:
+            # We no longer support AS users using /sync directly.
+            # See https://github.com/matrix-org/matrix-doc/issues/1144
+            raise NotImplementedError()
+
+        # Get all of the room IDs that the user should be able to see in the sync
+        # response
+        room_id_set = await self.get_sync_room_ids_for_user(
+            sync_config.user,
+            from_token=from_token,
+            to_token=to_token,
+        )
+
+        # Assemble sliding window lists
+        lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+        if sync_config.lists:
+            for list_key, list_config in sync_config.lists.items():
+                # TODO: Apply filters
+                #
+                # TODO: Exclude partially stated rooms unless the `required_state` has
+                # `["m.room.member", "$LAZY"]`
+                filtered_room_ids = room_id_set
+                # TODO: Apply sorts
+                sorted_room_ids = sorted(filtered_room_ids)
+
+                ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
+                if list_config.ranges:
+                    for range in list_config.ranges:
+                        ops.append(
+                            SlidingSyncResult.SlidingWindowList.Operation(
+                                op=OperationType.SYNC,
+                                range=range,
+                                room_ids=sorted_room_ids[range[0] : range[1]],
+                            )
+                        )
+
+                lists[list_key] = SlidingSyncResult.SlidingWindowList(
+                    count=len(sorted_room_ids),
+                    ops=ops,
+                )
+
+        return SlidingSyncResult(
+            next_pos=to_token,
+            lists=lists,
+            # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
+            rooms={},
+            extensions={},
+        )
+
+    async def get_sync_room_ids_for_user(
+        self,
+        user: UserID,
+        to_token: StreamToken,
+        from_token: Optional[StreamToken] = None,
+    ) -> AbstractSet[str]:
+        """
+        Fetch room IDs that should be listed for this user in the sync response (the
+        full room list that will be filtered, sorted, and sliced).
+
+        We're looking for rooms where the user has the following state in the token
+        range (> `from_token` and <= `to_token`):
+
+        - `invite`, `join`, `knock`, `ban` membership events
+        - Kicks (`leave` membership events where `sender` is different from the
+          `user_id`/`state_key`)
+        - `newly_left` (rooms that were left during the given token range)
+        - In order for bans/kicks to not show up in sync, you need to `/forget` those
+          rooms. This doesn't modify the event itself though and only adds the
+          `forgotten` flag to the `room_memberships` table in Synapse. There isn't
+          currently a way to tell when a room was forgotten, so we can't factor it
+          into the from/to range.
+        """
+        user_id = user.to_string()
+
+        # First grab a current snapshot of the rooms for the user
+        # (also handles forgotten rooms)
+        room_for_user_list = await self.store.get_rooms_for_local_user_where_membership_is(
+            user_id=user_id,
+            # We want to fetch any kind of membership (joined and left rooms) in order
+            # to get the `event_pos` of the latest room membership event for the
+            # user.
+            #
+            # We will filter out the rooms that don't belong below (see
+            # `filter_membership_for_sync`)
+            membership_list=Membership.LIST,
+            excluded_rooms=self.rooms_to_exclude_globally,
+        )
+
+        # If the user has never joined any rooms before, we can just return an empty set
+        if not room_for_user_list:
+            return set()
+
+        # Our working list of rooms that can show up in the sync response
+        sync_room_id_set = {
+            room_for_user.room_id
+            for room_for_user in room_for_user_list
+            if filter_membership_for_sync(
+                membership=room_for_user.membership,
+                user_id=user_id,
+                sender=room_for_user.sender,
+            )
+        }
+
+        # Get the `RoomStreamToken` that represents the spot we queried up to when we got
+        # our membership snapshot from `get_rooms_for_local_user_where_membership_is()`.
+        #
+        # First, we need to get the max stream_ordering of each event persister instance
+        # that we queried events from.
+        instance_to_max_stream_ordering_map: Dict[str, int] = {}
+        for room_for_user in room_for_user_list:
+            instance_name = room_for_user.event_pos.instance_name
+            stream_ordering = room_for_user.event_pos.stream
+
+            current_instance_max_stream_ordering = (
+                instance_to_max_stream_ordering_map.get(instance_name)
+            )
+            if (
+                current_instance_max_stream_ordering is None
+                or stream_ordering > current_instance_max_stream_ordering
+            ):
+                instance_to_max_stream_ordering_map[instance_name] = stream_ordering
+
+        # Then assemble the `RoomStreamToken`
+        membership_snapshot_token = RoomStreamToken(
+            # Minimum position in the `instance_map`
+            stream=min(instance_to_max_stream_ordering_map.values()),
+            instance_map=immutabledict(instance_to_max_stream_ordering_map),
+        )
+
+        # If our `to_token` is already the same or ahead of the latest room membership
+        # for the user, we can just straight-up return the room list (nothing has
+        # changed)
+        if membership_snapshot_token.is_before_or_eq(to_token.room_key):
+            return sync_room_id_set
+
+        # Since we fetched the user's room list at some point in time after the from/to
+        # tokens, we need to revert/rewind some membership changes to match the point in
+        # time of the `to_token`. In particular, we need to make these fixups:
+        #
+        # - 1a) Remove rooms that the user joined after the `to_token`
+        # - 1b) Add back rooms that the user left after the `to_token`
+        # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
+        #
+        # Below, we're doing two separate lookups for membership changes. We could
+        # request everything for both fixups in one range, [`from_token.room_key`,
+        # `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
+        # comparison without `instance_name` (which is flawed). We could refactor
+        # `event.internal_metadata` to include `instance_name` but it might turn out a
+        # little difficult and a bigger, broader Synapse change than we want to make.
+
+        # 1) -----------------------------------------------------
+
+        # 1) Fetch membership changes that fall in the range from `to_token` up to
+        # `membership_snapshot_token`
+        membership_change_events_after_to_token = (
+            await self.store.get_membership_changes_for_user(
+                user_id,
+                from_key=to_token.room_key,
+                to_key=membership_snapshot_token,
+                excluded_rooms=self.rooms_to_exclude_globally,
+            )
+        )
+
+        # 1) Assemble a list of the last membership events in some given ranges. Someone
+        # could have left and joined multiple times during the given range but we only
+        # care about end-result so we grab the last one.
+        last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
+        # We also need the first membership event after the `to_token` so we can step
+        # backward to the previous membership that would apply to the from/to range.
+        first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
+        for event in membership_change_events_after_to_token:
+            last_membership_change_by_room_id_after_to_token[event.room_id] = event
+            # Only set if we haven't already set it
+            first_membership_change_by_room_id_after_to_token.setdefault(
+                event.room_id, event
+            )
+
+        # 1) Fixup
+        for (
+            last_membership_change_after_to_token
+        ) in last_membership_change_by_room_id_after_to_token.values():
+            room_id = last_membership_change_after_to_token.room_id
+
+            # We want to find the first membership change after the `to_token` then step
+            # backward to know the membership in the from/to range.
+            first_membership_change_after_to_token = (
+                first_membership_change_by_room_id_after_to_token.get(room_id)
+            )
+            assert first_membership_change_after_to_token is not None, (
+                "If there was a `last_membership_change_after_to_token` that we're iterating over, "
+                + "then there should be corresponding a first change. For example, even if there "
+                + "is only one event after the `to_token`, the first and last event will be same event. "
+                + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
+                + "/`first_membership_change_by_room_id_after_to_token` dicts above."
+            )
+            # TODO: Instead of reading from `unsigned`, refactor this to use the
+            # `current_state_delta_stream` table in the future. Probably a new
+            # `get_membership_changes_for_user()` function that uses
+            # `current_state_delta_stream` with a join to `room_memberships`. This would
+            # help in state reset scenarios since `prev_content` is looking at the
+            # current branch vs the current room state. This is all just data given to
+            # the client so no real harm to data integrity, but we'd like to be nice to
+            # the client. Since the `current_state_delta_stream` table is new, it
+            # doesn't have all events in it. Since this is Sliding Sync, if we ever need
+            # to, we can signal the client to throw all of their state away by sending
+            # "operation: RESET".
+            prev_content = first_membership_change_after_to_token.unsigned.get(
+                "prev_content", {}
+            )
+            prev_membership = prev_content.get("membership", None)
+            prev_sender = first_membership_change_after_to_token.unsigned.get(
+                "prev_sender", None
+            )
+
+            # Check if the previous membership (membership that applies to the from/to
+            # range) should be included in our `sync_room_id_set`
+            should_prev_membership_be_included = (
+                prev_membership is not None
+                and prev_sender is not None
+                and filter_membership_for_sync(
+                    membership=prev_membership,
+                    user_id=user_id,
+                    sender=prev_sender,
+                )
+            )
+
+            # Check if the last membership (membership that applies to our snapshot) was
+            # already included in our `sync_room_id_set`
+            was_last_membership_already_included = filter_membership_for_sync(
+                membership=last_membership_change_after_to_token.membership,
+                user_id=user_id,
+                sender=last_membership_change_after_to_token.sender,
+            )
+
+            # 1a) Add back rooms that the user left after the `to_token`
+            #
+            # For example, if the last membership event after the `to_token` is a leave
+            # event, then the room was excluded from `sync_room_id_set` when we first
+            # crafted it above. We should add these rooms back as long as the user also
+            # was part of the room before the `to_token`.
+            if (
+                not was_last_membership_already_included
+                and should_prev_membership_be_included
+            ):
+                sync_room_id_set.add(room_id)
+            # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
+            #
+            # For example, if the last membership event after the `to_token` is a "join"
+            # event, then the room was included in `sync_room_id_set` when we first crafted
+            # it above. We should remove these rooms as long as the user also wasn't
+            # part of the room before the `to_token`.
+            elif (
+                was_last_membership_already_included
+                and not should_prev_membership_be_included
+            ):
+                sync_room_id_set.discard(room_id)
+
+        # 2) -----------------------------------------------------
+        # We fix up newly_left rooms after the first fixup because it may have removed
+        # some left rooms that we use to figure out newly_left in the following code
+
+        # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
+        membership_change_events_in_from_to_range = []
+        if from_token:
+            membership_change_events_in_from_to_range = (
+                await self.store.get_membership_changes_for_user(
+                    user_id,
+                    from_key=from_token.room_key,
+                    to_key=to_token.room_key,
+                    excluded_rooms=self.rooms_to_exclude_globally,
+                )
+            )
+
+        # 2) Assemble a list of the last membership events in some given ranges. Someone
+        # could have left and joined multiple times during the given range but we only
+        # care about end-result so we grab the last one.
+        last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
+        for event in membership_change_events_in_from_to_range:
+            last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
+
+        # 2) Fixup
+        for (
+            last_membership_change_in_from_to_range
+        ) in last_membership_change_by_room_id_in_from_to_range.values():
+            room_id = last_membership_change_in_from_to_range.room_id
+
+            # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
+            # include newly_left rooms because the last event that the user should see
+            # is their own leave event
+            if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
+                sync_room_id_set.add(room_id)
+
+        return sync_room_id_set
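The two-phase fixup in `get_sync_room_ids_for_user` is the subtlest part of the new handler. A self-contained miniature with invented data — plain integer stream positions stand in for the real `RoomStreamToken`/instance-map machinery:

```python
# Miniature of the fixups: the membership snapshot is taken at stream
# position 30, but the client's to_token is position 20, so post-to_token
# changes are reverted (fixups 1a/1b), then rooms left inside
# (from_token=10, to_token=20] are added back as newly_left (fixup 2).
from typing import Dict, List, Tuple

# One user's membership history as (room_id, membership, stream_pos),
# already in stream order.
history: List[Tuple[str, str, int]] = [
    ("!kept", "join", 5),          # joined long ago: simply stays
    ("!late_join", "join", 25),    # joined after to_token: 1b removes it
    ("!late_leave", "join", 5),
    ("!late_leave", "leave", 25),  # left after to_token: 1a adds it back
    ("!newly_left", "join", 5),
    ("!newly_left", "leave", 15),  # left inside (10, 20]: 2 adds it back
]
from_pos, to_pos, snapshot_pos = 10, 20, 30

# Snapshot: last membership per room as of snapshot_pos, keeping non-leaves.
last: Dict[str, str] = {}
for room, membership, pos in history:
    if pos <= snapshot_pos:
        last[room] = membership
room_ids = {room for room, membership in last.items() if membership != "leave"}

# Fixup 1: rewind any change made after to_pos by restoring whatever
# membership applied at to_pos.
for room, _membership, pos in history:
    if to_pos < pos <= snapshot_pos:
        at_to_pos = [m for r, m, p in history if r == room and p <= to_pos]
        was_member = bool(at_to_pos) and at_to_pos[-1] != "leave"
        (room_ids.add if was_member else room_ids.discard)(room)

# Fixup 2: add back rooms left within (from_pos, to_pos] (newly_left), since
# the user should still see their own leave event.
for room, membership, pos in history:
    if from_pos < pos <= to_pos and membership == "leave":
        room_ids.add(room)

assert room_ids == {"!kept", "!late_leave", "!newly_left"}
```

The real implementation additionally avoids raw `stream_ordering` comparisons across writer instances, per the comment in the diff.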
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6389c51b1c..39964726c5 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -2002,7 +2002,7 @@ class SyncHandler:
         """
         user_id = sync_config.user.to_string()
 
-        # Note: we get the users room list *before* we get the current token, this
+        # Note: we get the user's room list *before* we get the `now_token`; this
         # avoids checking back in history if rooms are joined after the token is fetched.
         token_before_rooms = self.event_sources.get_current_token()
         mutable_joined_room_ids = set(await self.store.get_rooms_for_user(user_id))
@@ -2014,10 +2014,10 @@ class SyncHandler:
         now_token = self.event_sources.get_current_token()
         log_kv({"now_token": now_token})
 
-        # Since we fetched the users room list before the token, there's a small window
-        # during which membership events may have been persisted, so we fetch these now
-        # and modify the joined room list for any changes between the get_rooms_for_user
-        # call and the get_current_token call.
+        # Since we fetched the user's room list before calculating the `now_token` (see
+        # above), there's a small window during which membership events may have been
+        # persisted, so we fetch these now and modify the joined room list for any
+        # changes between the get_rooms_for_user call and the get_current_token call.
         membership_change_events = []
         if since_token:
             membership_change_events = await self.store.get_membership_changes_for_user(
@@ -2027,16 +2027,19 @@ class SyncHandler:
                 self.rooms_to_exclude_globally,
             )
 
-            mem_last_change_by_room_id: Dict[str, EventBase] = {}
+            last_membership_change_by_room_id: Dict[str, EventBase] = {}
             for event in membership_change_events:
-                mem_last_change_by_room_id[event.room_id] = event
+                last_membership_change_by_room_id[event.room_id] = event
 
             # For the latest membership event in each room found, add/remove the room ID
             # from the joined room list accordingly. In this case we only care if the
             # latest change is JOIN.
 
-            for room_id, event in mem_last_change_by_room_id.items():
+            for room_id, event in last_membership_change_by_room_id.items():
                 assert event.internal_metadata.stream_ordering
+                # As a shortcut, skip any events that happened before we got our
+                # `get_rooms_for_user()` snapshot (any changes are already represented
+                # in that list).
                 if (
                     event.internal_metadata.stream_ordering
                     < token_before_rooms.room_key.stream
@@ -2830,7 +2833,7 @@ class SyncHandler:
                             continue
 
                 leave_token = now_token.copy_and_replace(
-                    StreamKeyType.ROOM, RoomStreamToken(stream=event.stream_ordering)
+                    StreamKeyType.ROOM, RoomStreamToken(stream=event.event_pos.stream)
                 )
                 room_entries.append(
                     RoomSyncResultBuilder(
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c73a589e6c..104b803b0f 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -57,7 +57,7 @@ from twisted.internet.interfaces import IReactorTime
 from twisted.internet.task import Cooperator
 from twisted.web.client import ResponseFailed
 from twisted.web.http_headers import Headers
-from twisted.web.iweb import IAgent, IBodyProducer, IResponse
+from twisted.web.iweb import UNKNOWN_LENGTH, IAgent, IBodyProducer, IResponse
 
 import synapse.metrics
 import synapse.util.retryutils
@@ -68,6 +68,7 @@ from synapse.api.errors import (
     RequestSendFailed,
     SynapseError,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.crypto.context_factory import FederationPolicyForHTTPS
 from synapse.http import QuieterFileBodyProducer
 from synapse.http.client import (
@@ -1411,9 +1412,11 @@ class MatrixFederationHttpClient:
         destination: str,
         path: str,
         output_stream: BinaryIO,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
+        max_size: int,
         args: Optional[QueryParams] = None,
         retry_on_dns_fail: bool = True,
-        max_size: Optional[int] = None,
         ignore_backoff: bool = False,
         follow_redirects: bool = False,
     ) -> Tuple[int, Dict[bytes, List[bytes]]]:
@@ -1422,6 +1425,10 @@ class MatrixFederationHttpClient:
             destination: The remote server to send the HTTP request to.
             path: The HTTP path to GET.
             output_stream: File to write the response body to.
+            download_ratelimiter: a ratelimiter to limit remote media downloads, keyed to
+                requester IP
+            ip_address: IP address of the requester
+            max_size: maximum allowable size in bytes of the file
             args: Optional dictionary used to create the query string.
             ignore_backoff: true to ignore the historical backoff data
                 and try the request anyway.
@@ -1441,11 +1448,27 @@ class MatrixFederationHttpClient:
                 federation whitelist
             RequestSendFailed: If there were problems connecting to the
                 remote, due to e.g. DNS failures, connection timeouts etc.
+            SynapseError: If the requested file exceeds ratelimits
         """
         request = MatrixFederationRequest(
             method="GET", destination=destination, path=path, query=args
         )
 
+        # check for a minimum balance of 1MiB in the ratelimiter before initiating
+        # the request
+        send_req, _ = await download_ratelimiter.can_do_action(
+            requester=None, key=ip_address, n_actions=1048576, update=False
+        )
+
+        if not send_req:
+            msg = "Requested file size exceeds ratelimits"
+            logger.warning(
+                "{%s} [%s] %s",
+                request.txn_id,
+                request.destination,
+                msg,
+            )
+            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+
         response = await self._send_request(
             request,
             retry_on_dns_fail=retry_on_dns_fail,
@@ -1455,12 +1478,36 @@ class MatrixFederationHttpClient:
 
         headers = dict(response.headers.getAllRawHeaders())
 
+        expected_size = response.length
+        # if we don't get an expected length then use the max length
+        if expected_size == UNKNOWN_LENGTH:
+            expected_size = max_size
+            logger.debug(
+                "File size unknown, assuming file is max allowable size: %d", max_size
+            )
+
+        read_body, _ = await download_ratelimiter.can_do_action(
+            requester=None,
+            key=ip_address,
+            n_actions=expected_size,
+        )
+        if not read_body:
+            msg = "Requested file size exceeds ratelimits"
+            logger.warning(
+                "{%s} [%s] %s",
+                request.txn_id,
+                request.destination,
+                msg,
+            )
+            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+
         try:
-            d = read_body_with_max_size(response, output_stream, max_size)
+            # add a byte of headroom to the max size, since read_body_with_max_size
+            # raises once the limit is reached (>=)
+            d = read_body_with_max_size(response, output_stream, expected_size + 1)
             d.addTimeout(self.default_timeout_seconds, self.reactor)
             length = await make_deferred_yieldable(d)
         except BodyExceededMaxSize:
-            msg = "Requested file is too large > %r bytes" % (max_size,)
+            msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
                 "{%s} [%s] %s",
                 request.txn_id,
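                 request.destination,

The hunk above ratelimits downloads in two phases: a pre-flight `can_do_action(..., update=False)` check that the requester's IP has at least 1MiB of balance before the request is sent, then a real charge for the expected body size once the response headers arrive (falling back to `max_size` when the length is unknown). A minimal standalone sketch of the same pattern, using a toy byte bucket rather than Synapse's `Ratelimiter` (all names and numbers below are illustrative):

    import time

    class ByteBucket:
        """Toy token bucket denominated in bytes."""

        def __init__(self, rate_bytes_per_sec: float, burst_bytes: float) -> None:
            self.rate = rate_bytes_per_sec
            self.burst = burst_bytes
            self.level = burst_bytes  # start full
            self.last = time.monotonic()

        def can_do_action(self, n_bytes: float, update: bool = True) -> bool:
            now = time.monotonic()
            # Refill proportionally to elapsed time, capped at the burst size.
            self.level = min(self.burst, self.level + (now - self.last) * self.rate)
            self.last = now
            if self.level < n_bytes:
                return False
            if update:
                self.level -= n_bytes
            return True

    bucket = ByteBucket(rate_bytes_per_sec=87 * 1024, burst_bytes=500 * 1024 * 1024)
    assert bucket.can_do_action(1024 * 1024, update=False)  # pre-flight, no spend
    assert bucket.can_do_action(10 * 1024 * 1024)  # charge the expected size

The pre-flight check is deliberately cheap and non-spending, so a rejected request costs the bucket nothing.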
diff --git a/synapse/media/_base.py b/synapse/media/_base.py
index 3fbed6062f..19bca94170 100644
--- a/synapse/media/_base.py
+++ b/synapse/media/_base.py
@@ -25,7 +25,16 @@ import os
 import urllib
 from abc import ABC, abstractmethod
 from types import TracebackType
-from typing import Awaitable, Dict, Generator, List, Optional, Tuple, Type
+from typing import (
+    TYPE_CHECKING,
+    Awaitable,
+    Dict,
+    Generator,
+    List,
+    Optional,
+    Tuple,
+    Type,
+)
 
 import attr
 
@@ -39,6 +48,11 @@ from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable
 from synapse.util.stringutils import is_ascii
 
+if TYPE_CHECKING:
+    from synapse.media.media_storage import MultipartResponder
+    from synapse.storage.databases.main.media_repository import LocalMedia
+
+
 logger = logging.getLogger(__name__)
 
 # list all text content types that will have the charset default to UTF-8 when
@@ -260,6 +274,53 @@ def _can_encode_filename_as_token(x: str) -> bool:
     return True
 
 
+async def respond_with_multipart_responder(
+    request: SynapseRequest,
+    responder: "Optional[MultipartResponder]",
+    media_info: "LocalMedia",
+) -> None:
+    """
+    Responds to federation media `/download` requests via a multipart responder
+
+    Args:
+        request: the federation request to respond to
+        responder: the Multipart responder which will send the response
+        media_info: metadata about the media item
+    """
+    if not responder:
+        respond_404(request)
+        return
+
+    # If we have a responder we *must* use it as a context manager.
+    with responder:
+        if request._disconnected:
+            logger.warning(
+                "Not sending response to request %s, already disconnected.", request
+            )
+            return
+
+        logger.debug("Responding to media request with responder %s", responder)
+        if media_info.media_length is not None:
+            request.setHeader(b"Content-Length", b"%d" % (media_info.media_length,))
+        request.setHeader(
+            b"Content-Type", b"multipart/mixed; boundary=%s" % responder.boundary
+        )
+
+        try:
+            await responder.write_to_consumer(request)
+        except Exception as e:
+            # The majority of the time this will be due to the client having gone
+            # away. Unfortunately, Twisted simply throws a generic exception at us
+            # in that case.
+            logger.warning("Failed to write to consumer: %s %s", type(e), e)
+
+            # Unregister the producer, if it has one, so Twisted doesn't complain
+            if request.producer:
+                request.unregisterProducer()
+
+    finish_request(request)
+
+
 async def respond_with_responder(
     request: SynapseRequest,
     responder: "Optional[Responder]",
diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py
index 9c29e09653..c335e518a0 100644
--- a/synapse/media/media_repository.py
+++ b/synapse/media/media_repository.py
@@ -42,6 +42,7 @@ from synapse.api.errors import (
     SynapseError,
     cs_error,
 )
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.config.repository import ThumbnailRequirement
 from synapse.http.server import respond_with_json
 from synapse.http.site import SynapseRequest
@@ -53,10 +54,11 @@ from synapse.media._base import (
     ThumbnailInfo,
     get_filename_from_headers,
     respond_404,
+    respond_with_multipart_responder,
     respond_with_responder,
 )
 from synapse.media.filepath import MediaFilePaths
-from synapse.media.media_storage import MediaStorage
+from synapse.media.media_storage import MediaStorage, MultipartResponder
 from synapse.media.storage_provider import StorageProviderWrapper
 from synapse.media.thumbnailer import Thumbnailer, ThumbnailError
 from synapse.media.url_previewer import UrlPreviewer
@@ -111,6 +113,12 @@ class MediaRepository:
         )
         self.prevent_media_downloads_from = hs.config.media.prevent_media_downloads_from
 
+        self.download_ratelimiter = Ratelimiter(
+            store=hs.get_storage_controllers().main,
+            clock=hs.get_clock(),
+            cfg=hs.config.ratelimiting.remote_media_downloads,
+        )
+
         # List of StorageProviders where we should search for media and
         # potentially upload to.
         storage_providers = []
@@ -422,6 +430,7 @@ class MediaRepository:
         media_id: str,
         name: Optional[str],
         max_timeout_ms: int,
+        federation: bool = False,
     ) -> None:
         """Responds to requests for local media, if exists, or returns 404.
 
@@ -433,6 +442,7 @@ class MediaRepository:
                 the filename in the Content-Disposition header of the response.
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            federation: whether the local media being fetched is for a federation request
 
         Returns:
             Resolves once a response has successfully been written to request
@@ -452,10 +462,17 @@ class MediaRepository:
 
         file_info = FileInfo(None, media_id, url_cache=bool(url_cache))
 
-        responder = await self.media_storage.fetch_media(file_info)
-        await respond_with_responder(
-            request, responder, media_type, media_length, upload_name
+        responder = await self.media_storage.fetch_media(
+            file_info, media_info, federation
         )
+        if federation:
+            # this should always be a MultipartResponder, but assert just in case
+            assert isinstance(responder, MultipartResponder)
+            await respond_with_multipart_responder(request, responder, media_info)
+        else:
+            await respond_with_responder(
+                request, responder, media_type, media_length, upload_name
+            )
 
     async def get_remote_media(
         self,
@@ -464,6 +481,7 @@ class MediaRepository:
         media_id: str,
         name: Optional[str],
         max_timeout_ms: int,
+        ip_address: str,
     ) -> None:
         """Respond to requests for remote media.
 
@@ -475,6 +493,7 @@ class MediaRepository:
                 the filename in the Content-Disposition header of the response.
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            ip_address: the IP address of the requester
 
         Returns:
             Resolves once a response has successfully been written to request
@@ -500,7 +519,11 @@ class MediaRepository:
         key = (server_name, media_id)
         async with self.remote_media_linearizer.queue(key):
             responder, media_info = await self._get_remote_media_impl(
-                server_name, media_id, max_timeout_ms
+                server_name,
+                media_id,
+                max_timeout_ms,
+                self.download_ratelimiter,
+                ip_address,
             )
 
         # We deliberately stream the file outside the lock
@@ -517,7 +540,7 @@ class MediaRepository:
             respond_404(request)
 
     async def get_remote_media_info(
-        self, server_name: str, media_id: str, max_timeout_ms: int
+        self, server_name: str, media_id: str, max_timeout_ms: int, ip_address: str
     ) -> RemoteMedia:
         """Gets the media info associated with the remote file, downloading
         if necessary.
@@ -527,6 +550,7 @@ class MediaRepository:
             media_id: The media ID of the content (as defined by the remote server).
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            ip_address: IP address of the requester
 
         Returns:
             The media info of the file
@@ -542,7 +566,11 @@ class MediaRepository:
         key = (server_name, media_id)
         async with self.remote_media_linearizer.queue(key):
             responder, media_info = await self._get_remote_media_impl(
-                server_name, media_id, max_timeout_ms
+                server_name,
+                media_id,
+                max_timeout_ms,
+                self.download_ratelimiter,
+                ip_address,
             )
 
         # Ensure we actually use the responder so that it releases resources
@@ -553,7 +581,12 @@ class MediaRepository:
         return media_info
 
     async def _get_remote_media_impl(
-        self, server_name: str, media_id: str, max_timeout_ms: int
+        self,
+        server_name: str,
+        media_id: str,
+        max_timeout_ms: int,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
     ) -> Tuple[Optional[Responder], RemoteMedia]:
         """Looks for media in local cache, if not there then attempt to
         download from remote server.
@@ -564,6 +597,9 @@ class MediaRepository:
                 remote server).
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            download_ratelimiter: a ratelimiter limiting remote media downloads, keyed to
+                requester IP.
+            ip_address: the IP address of the requester
 
         Returns:
             A tuple of responder and the media info of the file.
@@ -596,7 +632,7 @@ class MediaRepository:
 
         try:
             media_info = await self._download_remote_file(
-                server_name, media_id, max_timeout_ms
+                server_name, media_id, max_timeout_ms, download_ratelimiter, ip_address
             )
         except SynapseError:
             raise
@@ -630,6 +666,8 @@ class MediaRepository:
         server_name: str,
         media_id: str,
         max_timeout_ms: int,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
     ) -> RemoteMedia:
         """Attempt to download the remote file from the given server name,
         using the given file_id as the local id.
@@ -641,6 +679,9 @@ class MediaRepository:
                 locally generated.
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            download_ratelimiter: a ratelimiter limiting remote media downloads, keyed to
+                requester IP
+            ip_address: the IP address of the requester
 
         Returns:
             The media info of the file.
@@ -658,6 +699,8 @@ class MediaRepository:
                     output_stream=f,
                     max_size=self.max_upload_size,
                     max_timeout_ms=max_timeout_ms,
+                    download_ratelimiter=download_ratelimiter,
+                    ip_address=ip_address,
                 )
             except RequestSendFailed as e:
                 logger.warning(
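Every caller reaches `_get_remote_media_impl` through `remote_media_linearizer.queue((server_name, media_id))`, so concurrent requests for the same remote media are serialised and the file is only fetched over federation once; later waiters find it in the local cache. A rough sketch of that per-key queueing pattern in plain asyncio (Synapse's `Linearizer` is richer; this only illustrates the idea):

    import asyncio
    from contextlib import asynccontextmanager
    from typing import AsyncIterator, Dict, Tuple

    class KeyedLinearizer:
        def __init__(self) -> None:
            self._locks: Dict[Tuple[str, str], asyncio.Lock] = {}

        @asynccontextmanager
        async def queue(self, key: Tuple[str, str]) -> AsyncIterator[None]:
            # One lock per key: entries for different media proceed in parallel.
            lock = self._locks.setdefault(key, asyncio.Lock())
            async with lock:
                yield

    linearizer = KeyedLinearizer()

    async def fetch(server_name: str, media_id: str) -> None:
        async with linearizer.queue((server_name, media_id)):
            # Only one download per (server_name, media_id) runs at a time.
            await asyncio.sleep(0)

    async def main() -> None:
        await asyncio.gather(*(fetch("example.org", "abc") for _ in range(3)))

    asyncio.run(main())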
diff --git a/synapse/media/media_storage.py b/synapse/media/media_storage.py
index b3cd3fd8f4..2f55d12b6b 100644
--- a/synapse/media/media_storage.py
+++ b/synapse/media/media_storage.py
@@ -19,9 +19,12 @@
 #
 #
 import contextlib
+import json
 import logging
 import os
 import shutil
+from contextlib import closing
+from io import BytesIO
 from types import TracebackType
 from typing import (
     IO,
@@ -30,14 +33,19 @@ from typing import (
     AsyncIterator,
     BinaryIO,
     Callable,
+    List,
     Optional,
     Sequence,
     Tuple,
     Type,
+    Union,
 )
+from uuid import uuid4
 
 import attr
+from zope.interface import implementer
 
+from twisted.internet import defer, interfaces
 from twisted.internet.defer import Deferred
 from twisted.internet.interfaces import IConsumer
 from twisted.protocols.basic import FileSender
@@ -48,15 +56,19 @@ from synapse.logging.opentracing import start_active_span, trace, trace_with_opn
 from synapse.util import Clock
 from synapse.util.file_consumer import BackgroundFileConsumer
 
+from ..storage.databases.main.media_repository import LocalMedia
+from ..types import JsonDict
 from ._base import FileInfo, Responder
 from .filepath import MediaFilePaths
 
 if TYPE_CHECKING:
-    from synapse.media.storage_provider import StorageProvider
+    from synapse.media.storage_provider import StorageProviderWrapper
     from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
 
+CRLF = b"\r\n"
+
 
 class MediaStorage:
     """Responsible for storing/fetching files from local sources.
@@ -73,7 +85,7 @@ class MediaStorage:
         hs: "HomeServer",
         local_media_directory: str,
         filepaths: MediaFilePaths,
-        storage_providers: Sequence["StorageProvider"],
+        storage_providers: Sequence["StorageProviderWrapper"],
     ):
         self.hs = hs
         self.reactor = hs.get_reactor()
@@ -169,15 +181,23 @@ class MediaStorage:
 
             raise e from None
 
-    async def fetch_media(self, file_info: FileInfo) -> Optional[Responder]:
+    async def fetch_media(
+        self,
+        file_info: FileInfo,
+        media_info: Optional[LocalMedia] = None,
+        federation: bool = False,
+    ) -> Optional[Responder]:
         """Attempts to fetch media described by file_info from the local cache
         and configured storage providers.
 
         Args:
-            file_info
+            file_info: Metadata about the media file
+            media_info: Metadata about the media item
+            federation: Whether this file is being fetched for a federation request
 
         Returns:
-            Returns a Responder if the file was found, otherwise None.
+            A Responder if the file was found (a MultipartResponder if the file was
+            requested for the federation /download endpoint), otherwise None.
         """
         paths = [self._file_info_to_path(file_info)]
 
@@ -197,12 +217,19 @@ class MediaStorage:
             local_path = os.path.join(self.local_media_directory, path)
             if os.path.exists(local_path):
                 logger.debug("responding with local file %s", local_path)
-                return FileResponder(open(local_path, "rb"))
+                if federation:
+                    assert media_info is not None
+                    boundary = uuid4().hex.encode("ascii")
+                    return MultipartResponder(
+                        open(local_path, "rb"), media_info, boundary
+                    )
+                else:
+                    return FileResponder(open(local_path, "rb"))
             logger.debug("local file %s did not exist", local_path)
 
         for provider in self.storage_providers:
             for path in paths:
-                res: Any = await provider.fetch(path, file_info)
+                res: Any = await provider.fetch(path, file_info, media_info, federation)
                 if res:
                     logger.debug("Streaming %s from %s", path, provider)
                     return res
@@ -316,7 +343,7 @@ class FileResponder(Responder):
     """Wraps an open file that can be sent to a request.
 
     Args:
-        open_file: A file like object to be streamed ot the client,
+        open_file: A file like object to be streamed to the client,
             is closed when finished streaming.
     """
 
@@ -337,6 +364,38 @@ class FileResponder(Responder):
         self.open_file.close()
 
 
+class MultipartResponder(Responder):
+    """Wraps an open file, formats the response according to MSC3916 and sends it to a
+    federation request.
+
+    Args:
+        open_file: A file like object to be streamed to the client,
+            is closed when finished streaming.
+        media_info: metadata about the media item
+        boundary: bytes to use for the multipart response boundary
+    """
+
+    def __init__(self, open_file: IO, media_info: LocalMedia, boundary: bytes) -> None:
+        self.open_file = open_file
+        self.media_info = media_info
+        self.boundary = boundary
+
+    def write_to_consumer(self, consumer: IConsumer) -> Deferred:
+        return make_deferred_yieldable(
+            MultipartFileSender().beginFileTransfer(
+                self.open_file, consumer, self.media_info.media_type, {}, self.boundary
+            )
+        )
+
+    def __exit__(
+        self,
+        exc_type: Optional[Type[BaseException]],
+        exc_val: Optional[BaseException],
+        exc_tb: Optional[TracebackType],
+    ) -> None:
+        self.open_file.close()
+
+
 class SpamMediaException(NotFoundError):
     """The media was blocked by a spam checker, so we simply 404 the request (in
     the same way as if it was quarantined).
@@ -370,3 +429,151 @@ class ReadableFileWrapper:
 
                 # We yield to the reactor by sleeping for 0 seconds.
                 await self.clock.sleep(0)
+
+
+@implementer(interfaces.IProducer)
+class MultipartFileSender:
+    """
+    A producer that sends the contents of a file to a federation request in the format
+    outlined in MSC3916 - a multipart (`multipart/mixed`) response where the first
+    field is a JSON object and the second is the requested file.
+
+    This is a slight rewriting of twisted.protocols.basic.FileSender to achieve the
+    format outlined above.
+    """
+
+    CHUNK_SIZE = 2**14
+
+    lastSent = ""
+    deferred: Optional[defer.Deferred] = None
+
+    def beginFileTransfer(
+        self,
+        file: IO,
+        consumer: IConsumer,
+        file_content_type: str,
+        json_object: JsonDict,
+        boundary: bytes,
+    ) -> Deferred:
+        """
+        Begin transferring a file
+
+        Args:
+            file: The file object to read data from
+            consumer: The synapse request to write the data to
+            file_content_type: The content-type of the file
+            json_object: The JSON object to write to the first field of the response
+            boundary: bytes to be used as the multipart boundary
+
+        Returns:  A deferred whose callback will be invoked when the file has
+        been completely written to the consumer. The last byte written to the
+        consumer is passed to the callback.
+        """
+        self.file: Optional[IO] = file
+        self.consumer = consumer
+        self.json_field = json_object
+        self.json_field_written = False
+        self.content_type_written = False
+        self.file_content_type = file_content_type
+        self.boundary = boundary
+        self.deferred: Deferred = defer.Deferred()
+        self.consumer.registerProducer(self, False)
+        # while it's not entirely clear why this assignment is necessary, it mirrors
+        # the behavior in FileSender.beginFileTransfer and thus is preserved here
+        deferred = self.deferred
+        return deferred
+
+    def resumeProducing(self) -> None:
+        # write the first field, which will always be a json field
+        if not self.json_field_written:
+            self.consumer.write(CRLF + b"--" + self.boundary + CRLF)
+
+            content_type = Header(b"Content-Type", b"application/json")
+            self.consumer.write(bytes(content_type) + CRLF)
+
+            json_field = json.dumps(self.json_field)
+            json_bytes = json_field.encode("utf-8")
+            self.consumer.write(json_bytes)
+            self.consumer.write(CRLF + b"--" + self.boundary + CRLF)
+
+            self.json_field_written = True
+
+        chunk: Any = ""
+        if self.file:
+            # if we haven't written the content type yet, do so
+            if not self.content_type_written:
+                mime_type = self.file_content_type.encode("utf-8")
+                content_type = Header(b"Content-Type", mime_type)
+                self.consumer.write(bytes(content_type) + CRLF)
+                self.content_type_written = True
+
+            chunk = self.file.read(self.CHUNK_SIZE)
+
+        if not chunk:
+            # we've reached the end of the file
+            self.consumer.write(CRLF + b"--" + self.boundary + b"--" + CRLF)
+            self.file = None
+            self.consumer.unregisterProducer()
+
+            if self.deferred:
+                self.deferred.callback(self.lastSent)
+                self.deferred = None
+            return
+
+        self.consumer.write(chunk)
+        self.lastSent = chunk[-1:]
+
+    def pauseProducing(self) -> None:
+        pass
+
+    def stopProducing(self) -> None:
+        if self.deferred:
+            self.deferred.errback(Exception("Consumer asked us to stop producing"))
+            self.deferred = None
+
+
+class Header:
+    """
+    A tiny wrapper that produces request headers. We can't use the standard
+    Python header classes because they encode unicode fields using `=?...?=`
+    encoding, which is correct, but nobody in the HTTP world expects that;
+    everyone wants raw utf-8 bytes. (Adapted from treq.multipart.)
+    """
+
+    def __init__(
+        self,
+        name: bytes,
+        value: Any,
+        params: Optional[List[Tuple[Any, Any]]] = None,
+    ):
+        self.name = name
+        self.value = value
+        self.params = params or []
+
+    def add_param(self, name: Any, value: Any) -> None:
+        self.params.append((name, value))
+
+    def __bytes__(self) -> bytes:
+        with closing(BytesIO()) as h:
+            h.write(self.name + b": " + escape(self.value).encode("us-ascii"))
+            if self.params:
+                for name, val in self.params:
+                    h.write(b"; ")
+                    h.write(escape(name).encode("us-ascii"))
+                    h.write(b"=")
+                    h.write(b'"' + escape(val).encode("utf-8") + b'"')
+            h.seek(0)
+            return h.read()
+
+
+def escape(value: Union[str, bytes]) -> str:
+    """
+    This function prevents header values from corrupting the request: a newline
+    in the file name parameter makes a form-data request unreadable for the
+    majority of parsers. (Adapted from treq.multipart.)
+    """
+    if isinstance(value, bytes):
+        value = value.decode("utf-8")
+    return value.replace("\r", "").replace("\n", "").replace('"', '\\"')
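Putting `MultipartFileSender`, `Header` and `escape` together, the bytes written to the consumer have the shape below. This is a hand-assembled sketch (the boundary, JSON body and file content type are arbitrary); note the single CRLF between each part header and its body, mirroring `resumeProducing` above:

    import json

    CRLF = b"\r\n"
    boundary = b"a0b1c2d3"
    json_field = json.dumps({}).encode("utf-8")
    file_bytes = b"<file contents>"

    body = (
        CRLF + b"--" + boundary + CRLF
        + b"Content-Type: application/json" + CRLF
        + json_field
        + CRLF + b"--" + boundary + CRLF
        + b"Content-Type: image/png" + CRLF
        + file_bytes
        + CRLF + b"--" + boundary + b"--" + CRLF  # closing boundary
    )
    print(body.decode("utf-8", errors="replace"))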
diff --git a/synapse/media/storage_provider.py b/synapse/media/storage_provider.py
index 06e5d27a53..a2d50adf65 100644
--- a/synapse/media/storage_provider.py
+++ b/synapse/media/storage_provider.py
@@ -24,14 +24,16 @@ import logging
 import os
 import shutil
 from typing import TYPE_CHECKING, Callable, Optional
+from uuid import uuid4
 
 from synapse.config._base import Config
 from synapse.logging.context import defer_to_thread, run_in_background
 from synapse.logging.opentracing import start_active_span, trace_with_opname
 from synapse.util.async_helpers import maybe_awaitable
 
+from ..storage.databases.main.media_repository import LocalMedia
 from ._base import FileInfo, Responder
-from .media_storage import FileResponder
+from .media_storage import FileResponder, MultipartResponder
 
 logger = logging.getLogger(__name__)
 
@@ -55,13 +57,21 @@ class StorageProvider(metaclass=abc.ABCMeta):
         """
 
     @abc.abstractmethod
-    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
+    async def fetch(
+        self,
+        path: str,
+        file_info: FileInfo,
+        media_info: Optional[LocalMedia] = None,
+        federation: bool = False,
+    ) -> Optional[Responder]:
         """Attempt to fetch the file described by file_info and stream it
         into writer.
 
         Args:
             path: Relative path of file in local cache
             file_info: The metadata of the file.
+            media_info: metadata of the media item
+            federation: Whether the requested media is for a federation request
 
         Returns:
             Returns a Responder if the provider has the file, otherwise returns None.
@@ -124,7 +134,13 @@ class StorageProviderWrapper(StorageProvider):
             run_in_background(store)
 
     @trace_with_opname("StorageProviderWrapper.fetch")
-    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
+    async def fetch(
+        self,
+        path: str,
+        file_info: FileInfo,
+        media_info: Optional[LocalMedia] = None,
+        federation: bool = False,
+    ) -> Optional[Responder]:
         if file_info.url_cache:
             # Files in the URL preview cache definitely aren't stored here,
             # so avoid any potentially slow I/O or network access.
@@ -132,7 +148,9 @@ class StorageProviderWrapper(StorageProvider):
 
         # store_file is supposed to return an Awaitable, but guard
         # against improper implementations.
-        return await maybe_awaitable(self.backend.fetch(path, file_info))
+        return await maybe_awaitable(
+            self.backend.fetch(path, file_info, media_info, federation)
+        )
 
 
 class FileStorageProviderBackend(StorageProvider):
@@ -172,11 +190,23 @@ class FileStorageProviderBackend(StorageProvider):
             )
 
     @trace_with_opname("FileStorageProviderBackend.fetch")
-    async def fetch(self, path: str, file_info: FileInfo) -> Optional[Responder]:
+    async def fetch(
+        self,
+        path: str,
+        file_info: FileInfo,
+        media_info: Optional[LocalMedia] = None,
+        federation: bool = False,
+    ) -> Optional[Responder]:
         """See StorageProvider.fetch"""
 
         backup_fname = os.path.join(self.base_directory, path)
         if os.path.isfile(backup_fname):
+            if federation:
+                assert media_info is not None
+                boundary = uuid4().hex.encode("ascii")
+                return MultipartResponder(
+                    open(backup_fname, "rb"), media_info, boundary
+                )
             return FileResponder(open(backup_fname, "rb"))
 
         return None
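Third-party storage providers must adopt the widened `fetch` signature. A minimal sketch of a backend doing so (the `_open` helper is hypothetical; the federation branch mirrors `FileStorageProviderBackend.fetch` above):

    from typing import IO, Optional
    from uuid import uuid4

    from synapse.media._base import FileInfo, Responder
    from synapse.media.media_storage import FileResponder, MultipartResponder
    from synapse.media.storage_provider import StorageProvider
    from synapse.storage.databases.main.media_repository import LocalMedia

    class MyStorageProviderBackend(StorageProvider):
        async def store_file(self, path: str, file_info: FileInfo) -> None:
            ...  # upload `path` to the backing store

        async def fetch(
            self,
            path: str,
            file_info: FileInfo,
            media_info: Optional[LocalMedia] = None,
            federation: bool = False,
        ) -> Optional[Responder]:
            stream: Optional[IO] = await self._open(path)  # hypothetical helper
            if stream is None:
                return None
            if federation:
                # Federation /download requests get the MSC3916 multipart framing.
                assert media_info is not None
                boundary = uuid4().hex.encode("ascii")
                return MultipartResponder(stream, media_info, boundary)
            return FileResponder(stream)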
diff --git a/synapse/media/thumbnailer.py b/synapse/media/thumbnailer.py
index cc3acf51e1..f8a9560784 100644
--- a/synapse/media/thumbnailer.py
+++ b/synapse/media/thumbnailer.py
@@ -359,9 +359,10 @@ class ThumbnailProvider:
         desired_method: str,
         desired_type: str,
         max_timeout_ms: int,
+        ip_address: str,
     ) -> None:
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms
+            server_name, media_id, max_timeout_ms, ip_address
         )
         if not media_info:
             respond_404(request)
@@ -422,12 +423,13 @@ class ThumbnailProvider:
         method: str,
         m_type: str,
         max_timeout_ms: int,
+        ip_address: str,
     ) -> None:
         # TODO: Don't download the whole remote file
         # We should proxy the thumbnail from the remote server instead of
         # downloading the remote file and generating our own thumbnails.
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms
+            server_name, media_id, max_timeout_ms, ip_address
         )
         if not media_info:
             return
diff --git a/synapse/rest/client/account.py b/synapse/rest/client/account.py
index 6ac07d354c..8daa449f9e 100644
--- a/synapse/rest/client/account.py
+++ b/synapse/rest/client/account.py
@@ -56,14 +56,14 @@ from synapse.http.servlet import (
 from synapse.http.site import SynapseRequest
 from synapse.metrics import threepid_send_requests
 from synapse.push.mailer import Mailer
-from synapse.rest.client.models import (
+from synapse.types import JsonDict
+from synapse.types.rest import RequestBodyModel
+from synapse.types.rest.client import (
     AuthenticationData,
     ClientSecretStr,
     EmailRequestTokenBody,
     MsisdnRequestTokenBody,
 )
-from synapse.rest.models import RequestBodyModel
-from synapse.types import JsonDict
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.stringutils import assert_valid_client_secret, random_string
 from synapse.util.threepids import check_3pid_allowed, validate_email
diff --git a/synapse/rest/client/devices.py b/synapse/rest/client/devices.py
index b1b803549e..8313d687b7 100644
--- a/synapse/rest/client/devices.py
+++ b/synapse/rest/client/devices.py
@@ -42,9 +42,9 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.rest.client._base import client_patterns, interactive_auth_handler
-from synapse.rest.client.models import AuthenticationData
-from synapse.rest.models import RequestBodyModel
 from synapse.types import JsonDict
+from synapse.types.rest import RequestBodyModel
+from synapse.types.rest.client import AuthenticationData
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
diff --git a/synapse/rest/client/directory.py b/synapse/rest/client/directory.py
index 8099fdf3e4..11fdd0f7c6 100644
--- a/synapse/rest/client/directory.py
+++ b/synapse/rest/client/directory.py
@@ -41,8 +41,8 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.rest.client._base import client_patterns
-from synapse.rest.models import RequestBodyModel
 from synapse.types import JsonDict, RoomAlias
+from synapse.types.rest import RequestBodyModel
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
diff --git a/synapse/rest/client/keys.py b/synapse/rest/client/keys.py
index a0017257ce..306db07b86 100644
--- a/synapse/rest/client/keys.py
+++ b/synapse/rest/client/keys.py
@@ -36,7 +36,6 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import log_kv, set_tag
-from synapse.replication.http.devices import ReplicationUploadKeysForUserRestServlet
 from synapse.rest.client._base import client_patterns, interactive_auth_handler
 from synapse.types import JsonDict, StreamToken
 from synapse.util.cancellation import cancellable
@@ -105,13 +104,8 @@ class KeyUploadServlet(RestServlet):
         self.auth = hs.get_auth()
         self.e2e_keys_handler = hs.get_e2e_keys_handler()
         self.device_handler = hs.get_device_handler()
-
-        if hs.config.worker.worker_app is None:
-            # if main process
-            self.key_uploader = self.e2e_keys_handler.upload_keys_for_user
-        else:
-            # then a worker
-            self.key_uploader = ReplicationUploadKeysForUserRestServlet.make_client(hs)
+        self._clock = hs.get_clock()
+        self._store = hs.get_datastores().main
 
     async def on_POST(
         self, request: SynapseRequest, device_id: Optional[str]
@@ -151,9 +145,10 @@ class KeyUploadServlet(RestServlet):
                 400, "To upload keys, you must pass device_id when authenticating"
             )
 
-        result = await self.key_uploader(
+        result = await self.e2e_keys_handler.upload_keys_for_user(
             user_id=user_id, device_id=device_id, keys=body
         )
+
         return 200, result
 
 
diff --git a/synapse/rest/client/media.py b/synapse/rest/client/media.py
index 172d240783..0c089163c1 100644
--- a/synapse/rest/client/media.py
+++ b/synapse/rest/client/media.py
@@ -174,6 +174,7 @@ class UnstableThumbnailResource(RestServlet):
                 respond_404(request)
                 return
 
+            ip_address = request.getClientAddress().host
             remote_resp_function = (
                 self.thumbnailer.select_or_generate_remote_thumbnail
                 if self.dynamic_thumbnails
@@ -188,6 +189,7 @@ class UnstableThumbnailResource(RestServlet):
                 method,
                 m_type,
                 max_timeout_ms,
+                ip_address,
             )
             self.media_repo.mark_recently_accessed(server_name, media_id)
 
diff --git a/synapse/rest/client/models.py b/synapse/rest/client/models.py
deleted file mode 100644
index fc1aed2889..0000000000
--- a/synapse/rest/client/models.py
+++ /dev/null
@@ -1,99 +0,0 @@
-#
-# This file is licensed under the Affero General Public License (AGPL) version 3.
-#
-# Copyright 2022 The Matrix.org Foundation C.I.C.
-# Copyright (C) 2023 New Vector, Ltd
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the GNU Affero General Public License as
-# published by the Free Software Foundation, either version 3 of the
-# License, or (at your option) any later version.
-#
-# See the GNU Affero General Public License for more details:
-# <https://www.gnu.org/licenses/agpl-3.0.html>.
-#
-# Originally licensed under the Apache License, Version 2.0:
-# <http://www.apache.org/licenses/LICENSE-2.0>.
-#
-# [This file includes modifications made by New Vector Limited]
-#
-#
-from typing import TYPE_CHECKING, Dict, Optional
-
-from synapse._pydantic_compat import HAS_PYDANTIC_V2
-
-if TYPE_CHECKING or HAS_PYDANTIC_V2:
-    from pydantic.v1 import Extra, StrictInt, StrictStr, constr, validator
-else:
-    from pydantic import Extra, StrictInt, StrictStr, constr, validator
-
-from synapse.rest.models import RequestBodyModel
-from synapse.util.threepids import validate_email
-
-
-class AuthenticationData(RequestBodyModel):
-    """
-    Data used during user-interactive authentication.
-
-    (The name "Authentication Data" is taken directly from the spec.)
-
-    Additional keys will be present, depending on the `type` field. Use
-    `.dict(exclude_unset=True)` to access them.
-    """
-
-    class Config:
-        extra = Extra.allow
-
-    session: Optional[StrictStr] = None
-    type: Optional[StrictStr] = None
-
-
-if TYPE_CHECKING:
-    ClientSecretStr = StrictStr
-else:
-    # See also assert_valid_client_secret()
-    ClientSecretStr = constr(
-        regex="[0-9a-zA-Z.=_-]",  # noqa: F722
-        min_length=1,
-        max_length=255,
-        strict=True,
-    )
-
-
-class ThreepidRequestTokenBody(RequestBodyModel):
-    client_secret: ClientSecretStr
-    id_server: Optional[StrictStr]
-    id_access_token: Optional[StrictStr]
-    next_link: Optional[StrictStr]
-    send_attempt: StrictInt
-
-    @validator("id_access_token", always=True)
-    def token_required_for_identity_server(
-        cls, token: Optional[str], values: Dict[str, object]
-    ) -> Optional[str]:
-        if values.get("id_server") is not None and token is None:
-            raise ValueError("id_access_token is required if an id_server is supplied.")
-        return token
-
-
-class EmailRequestTokenBody(ThreepidRequestTokenBody):
-    email: StrictStr
-
-    # Canonicalise the email address. The addresses are all stored canonicalised
-    # in the database. This allows the user to reset his password without having to
-    # know the exact spelling (eg. upper and lower case) of address in the database.
-    # Without this, an email stored in the database as "foo@bar.com" would cause
-    # user requests for "FOO@bar.com" to raise a Not Found error.
-    _email_validator = validator("email", allow_reuse=True)(validate_email)
-
-
-if TYPE_CHECKING:
-    ISO3116_1_Alpha_2 = StrictStr
-else:
-    # Per spec: two-letter uppercase ISO-3166-1-alpha-2
-    ISO3116_1_Alpha_2 = constr(regex="[A-Z]{2}", strict=True)
-
-
-class MsisdnRequestTokenBody(ThreepidRequestTokenBody):
-    country: ISO3116_1_Alpha_2
-    phone_number: StrictStr
diff --git a/synapse/rest/client/room.py b/synapse/rest/client/room.py
index fb4d44211e..61fdf71a27 100644
--- a/synapse/rest/client/room.py
+++ b/synapse/rest/client/room.py
@@ -292,6 +292,9 @@ class RoomStateEventRestServlet(RestServlet):
         try:
             if event_type == EventTypes.Member:
                 membership = content.get("membership", None)
+                if not isinstance(membership, str):
+                    raise SynapseError(400, "Invalid membership (must be a string)")
+
                 event_id, _ = await self.room_member_handler.update_membership(
                     requester,
                     target=UserID.from_string(state_key),
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 27ea943e31..1b0ac20d94 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -33,6 +33,7 @@ from synapse.events.utils import (
     format_event_raw,
 )
 from synapse.handlers.presence import format_user_presence_state
+from synapse.handlers.sliding_sync import SlidingSyncConfig, SlidingSyncResult
 from synapse.handlers.sync import (
     ArchivedSyncResult,
     InvitedSyncResult,
@@ -43,10 +44,17 @@ from synapse.handlers.sync import (
     SyncVersion,
 )
 from synapse.http.server import HttpServer
-from synapse.http.servlet import RestServlet, parse_boolean, parse_integer, parse_string
+from synapse.http.servlet import (
+    RestServlet,
+    parse_and_validate_json_object_from_request,
+    parse_boolean,
+    parse_integer,
+    parse_string,
+)
 from synapse.http.site import SynapseRequest
 from synapse.logging.opentracing import trace_with_opname
 from synapse.types import JsonDict, Requester, StreamToken
+from synapse.types.rest.client import SlidingSyncBody
 from synapse.util import json_decoder
 from synapse.util.caches.lrucache import LruCache
 
@@ -735,8 +743,228 @@ class SlidingSyncE2eeRestServlet(RestServlet):
         return 200, response
 
 
+class SlidingSyncRestServlet(RestServlet):
+    """
+    API endpoint for MSC3575 Sliding Sync `/sync`. Allows clients to request a
+    subset (sliding window) of rooms, state, and timeline events (just what they need)
+    in order to bootstrap quickly and subscribe to only what the client cares about.
+    Because the client can specify what it cares about, we can respond quickly and skip
+    all of the work we would normally have to do with a sync v2 response.
+
+    Request query parameters:
+        timeout: How long to wait for new events in milliseconds.
+        pos: Stream position token when asking for incremental deltas.
+
+    Request body::
+        {
+            // Sliding Window API
+            "lists": {
+                "foo-list": {
+                    "ranges": [ [0, 99] ],
+                    "sort": [ "by_notification_level", "by_recency", "by_name" ],
+                    "required_state": [
+                        ["m.room.join_rules", ""],
+                        ["m.room.history_visibility", ""],
+                        ["m.space.child", "*"]
+                    ],
+                    "timeline_limit": 10,
+                    "filters": {
+                        "is_dm": true
+                    },
+                    "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
+                }
+            },
+            // Room Subscriptions API
+            "room_subscriptions": {
+                "!sub1:bar": {
+                    "required_state": [ ["*","*"] ],
+                    "timeline_limit": 10,
+                    "include_old_rooms": {
+                        "timeline_limit": 1,
+                        "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
+                    }
+                }
+            },
+            // Extensions API
+            "extensions": {}
+        }
+
+    Response JSON::
+        {
+            "next_pos": "s58_224_0_13_10_1_1_16_0_1",
+            "lists": {
+                "foo-list": {
+                    "count": 1337,
+                    "ops": [{
+                        "op": "SYNC",
+                        "range": [0, 99],
+                        "room_ids": [
+                            "!foo:bar",
+                            // ... 99 more room IDs
+                        ]
+                    }]
+                }
+            },
+            // Aggregated rooms from lists and room subscriptions
+            "rooms": {
+                // Room from room subscription
+                "!sub1:bar": {
+                    "name": "Alice and Bob",
+                    "avatar": "mxc://...",
+                    "initial": true,
+                    "required_state": [
+                        {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+                        {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}}
+                    ],
+                    "timeline": [
+                        {"sender":"@alice:example.com","type":"m.room.create", "state_key":"", "content":{"creator":"@alice:example.com"}},
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+                        {"sender":"@alice:example.com","type":"m.room.member", "state_key":"@alice:example.com", "content":{"membership":"join"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+                    ],
+                    "prev_batch": "t111_222_333",
+                    "joined_count": 41,
+                    "invited_count": 1,
+                    "notification_count": 1,
+                    "highlight_count": 0
+                },
+                // rooms from list
+                "!foo:bar": {
+                    "name": "The calculated room name",
+                    "avatar": "mxc://...",
+                    "initial": true,
+                    "required_state": [
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.history_visibility", "state_key":"", "content":{"history_visibility":"joined"}},
+                        {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!foo:example.com", "content":{"via":["example.com"]}},
+                        {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!bar:example.com", "content":{"via":["example.com"]}},
+                        {"sender":"@alice:example.com","type":"m.space.child", "state_key":"!baz:example.com", "content":{"via":["example.com"]}}
+                    ],
+                    "timeline": [
+                        {"sender":"@alice:example.com","type":"m.room.join_rules", "state_key":"", "content":{"join_rule":"invite"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"A"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"B"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"C"}},
+                        {"sender":"@alice:example.com","type":"m.room.message", "content":{"body":"D"}},
+                    ],
+                    "prev_batch": "t111_222_333",
+                    "joined_count": 4,
+                    "invited_count": 0,
+                    "notification_count": 54,
+                    "highlight_count": 3
+                },
+                 // ... 99 more items
+            },
+            "extensions": {}
+        }
+    """
+
+    PATTERNS = client_patterns(
+        "/org.matrix.msc3575/sync$", releases=[], v1=False, unstable=True
+    )
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__()
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastores().main
+        self.filtering = hs.get_filtering()
+        self.sliding_sync_handler = hs.get_sliding_sync_handler()
+
+    # TODO: Update this to `on_GET` once we figure out how we want to handle params
+    async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
+        requester = await self.auth.get_user_by_req(request, allow_guest=True)
+        user = requester.user
+        device_id = requester.device_id
+
+        timeout = parse_integer(request, "timeout", default=0)
+        # Position in the stream
+        from_token_string = parse_string(request, "pos")
+
+        from_token = None
+        if from_token_string is not None:
+            from_token = await StreamToken.from_string(self.store, from_token_string)
+
+        # TODO: We currently don't know whether we're going to use sticky params or
+        # a filter mechanism like sync v2, where filters are built up once and
+        # referenced by filter ID. For now, we will just prototype by always passing
+        # everything in.
+        body = parse_and_validate_json_object_from_request(request, SlidingSyncBody)
+        logger.info("Sliding sync request: %r", body)
+
+        sync_config = SlidingSyncConfig(
+            user=user,
+            device_id=device_id,
+            # FIXME: Currently, we're just manually copying the fields from the
+            # `SlidingSyncBody` into the config. How can we guarantee that we don't
+            # forget any in the future? I would like something more structured like
+            # `copy_attributes(from=body, to=config)`
+            lists=body.lists,
+            room_subscriptions=body.room_subscriptions,
+            extensions=body.extensions,
+        )
+
+        sliding_sync_results = await self.sliding_sync_handler.wait_for_sync_for_user(
+            requester,
+            sync_config,
+            from_token,
+            timeout,
+        )
+
+        # The client may have disconnected by now; don't bother to serialize the
+        # response if so.
+        if request._disconnected:
+            logger.info("Client has disconnected; not serializing response.")
+            return 200, {}
+
+        response_content = await self.encode_response(sliding_sync_results)
+
+        return 200, response_content
+
+    # TODO: Is there a better way to encode things?
+    async def encode_response(
+        self,
+        sliding_sync_result: SlidingSyncResult,
+    ) -> JsonDict:
+        response: JsonDict = defaultdict(dict)
+
+        response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
+        serialized_lists = self.encode_lists(sliding_sync_result.lists)
+        if serialized_lists:
+            response["lists"] = serialized_lists
+        response["rooms"] = {}  # TODO: sliding_sync_result.rooms
+        response["extensions"] = {}  # TODO: sliding_sync_result.extensions
+
+        return response
+
+    def encode_lists(
+        self, lists: Dict[str, SlidingSyncResult.SlidingWindowList]
+    ) -> JsonDict:
+        def encode_operation(
+            operation: SlidingSyncResult.SlidingWindowList.Operation,
+        ) -> JsonDict:
+            return {
+                "op": operation.op.value,
+                "range": operation.range,
+                "room_ids": operation.room_ids,
+            }
+
+        serialized_lists = {}
+        for list_key, list_result in lists.items():
+            serialized_lists[list_key] = {
+                "count": list_result.count,
+                "ops": [encode_operation(op) for op in list_result.ops],
+            }
+
+        return serialized_lists
+
+
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     SyncRestServlet(hs).register(http_server)
 
     if hs.config.experimental.msc3575_enabled:
+        SlidingSyncRestServlet(hs).register(http_server)
         SlidingSyncE2eeRestServlet(hs).register(http_server)
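For reference, a request against the servlet registered above might look like the following (homeserver URL and access token are placeholders; the endpoint is only registered when `msc3575_enabled` is set):

    import json
    import urllib.request

    req = urllib.request.Request(
        "https://matrix.example.com/_matrix/client/unstable/org.matrix.msc3575/sync"
        "?timeout=0",
        data=json.dumps(
            {
                "lists": {
                    "foo-list": {
                        "ranges": [[0, 99]],
                        "required_state": [["m.room.join_rules", ""]],
                        "timeline_limit": 10,
                    }
                }
            }
        ).encode("utf-8"),
        headers={
            "Authorization": "Bearer <access token>",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    with urllib.request.urlopen(req) as resp:
        print(json.load(resp)["next_pos"])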
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index dc7325fc57..a411ed614e 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -41,9 +41,9 @@ from synapse.http.servlet import (
     parse_and_validate_json_object_from_request,
     parse_integer,
 )
-from synapse.rest.models import RequestBodyModel
 from synapse.storage.keys import FetchKeyResultForRemote
 from synapse.types import JsonDict
+from synapse.types.rest import RequestBodyModel
 from synapse.util import json_decoder
 from synapse.util.async_helpers import yieldable_gather_results
 
diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py
index 8ba723c8d4..1628d58926 100644
--- a/synapse/rest/media/download_resource.py
+++ b/synapse/rest/media/download_resource.py
@@ -97,6 +97,12 @@ class DownloadResource(RestServlet):
                 respond_404(request)
                 return
 
+            ip_address = request.getClientAddress().host
             await self.media_repo.get_remote_media(
-                request, server_name, media_id, file_name, max_timeout_ms
+                request,
+                server_name,
+                media_id,
+                file_name,
+                max_timeout_ms,
+                ip_address,
             )
diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py
index fe8fbb06e4..ce511c6dce 100644
--- a/synapse/rest/media/thumbnail_resource.py
+++ b/synapse/rest/media/thumbnail_resource.py
@@ -104,6 +104,7 @@ class ThumbnailResource(RestServlet):
                 respond_404(request)
                 return
 
+            ip_address = request.getClientAddress().host
             remote_resp_function = (
                 self.thumbnail_provider.select_or_generate_remote_thumbnail
                 if self.dynamic_thumbnails
@@ -118,5 +119,6 @@ class ThumbnailResource(RestServlet):
                 method,
                 m_type,
                 max_timeout_ms,
+                ip_address,
             )
             self.media_repo.mark_recently_accessed(server_name, media_id)
diff --git a/synapse/server.py b/synapse/server.py
index 95e319d2e6..ae927c3904 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -109,6 +109,7 @@ from synapse.handlers.room_summary import RoomSummaryHandler
 from synapse.handlers.search import SearchHandler
 from synapse.handlers.send_email import SendEmailHandler
 from synapse.handlers.set_password import SetPasswordHandler
+from synapse.handlers.sliding_sync import SlidingSyncHandler
 from synapse.handlers.sso import SsoHandler
 from synapse.handlers.stats import StatsHandler
 from synapse.handlers.sync import SyncHandler
@@ -554,6 +555,9 @@ class HomeServer(metaclass=abc.ABCMeta):
     def get_sync_handler(self) -> SyncHandler:
         return SyncHandler(self)
 
+    @cache_in_self
+    def get_sliding_sync_handler(self) -> SlidingSyncHandler:
+        return SlidingSyncHandler(self)
+
     @cache_in_self
     def get_room_list_handler(self) -> RoomListHandler:
         return RoomListHandler(self)
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 9fddbb2caf..d8b54dc4e3 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -476,7 +476,7 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         )
 
         sql = """
-            SELECT room_id, e.sender, c.membership, event_id, e.stream_ordering, r.room_version
+            SELECT room_id, e.sender, c.membership, event_id, e.instance_name, e.stream_ordering, r.room_version
             FROM local_current_membership AS c
             INNER JOIN events AS e USING (room_id, event_id)
             INNER JOIN rooms AS r USING (room_id)
@@ -488,7 +488,17 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         )
 
         txn.execute(sql, (user_id, *args))
-        results = [RoomsForUser(*r) for r in txn]
+        results = [
+            RoomsForUser(
+                room_id=room_id,
+                sender=sender,
+                membership=membership,
+                event_id=event_id,
+                event_pos=PersistedEventPosition(instance_name, stream_ordering),
+                room_version_id=room_version,
+            )
+            for room_id, sender, membership, event_id, instance_name, stream_ordering, room_version in txn
+        ]
 
         return results
 
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 0513e7dc06..6e18f714d7 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -1281,7 +1281,7 @@ def _parse_words_with_regex(search_term: str) -> List[str]:
     Break down search term into words, when we don't have ICU available.
     See: `_parse_words`
     """
-    return re.findall(r"([\w\-]+)", search_term, re.UNICODE)
+    return re.findall(r"([\w-]+)", search_term, re.UNICODE)
 
 
 def _parse_words_with_icu(search_term: str) -> List[str]:
@@ -1303,15 +1303,69 @@ def _parse_words_with_icu(search_term: str) -> List[str]:
         if j < 0:
             break
 
-        result = search_term[i:j]
+        # We want to make sure that we split on `@` and `:` specifically, as
+        # they occur in user IDs.
+        for result in re.split(r"[@:]+", search_term[i:j]):
+            results.append(result.strip())
+
+        i = j
+
+    # libicu will break up words that have punctuation in them, but to handle
+    # cases where user IDs have '-', '.' and '_' in them we want to *not* break
+    # those into words and instead allow the DB to tokenise them how it wants.
+    #
+    # In particular, "user-71" in postgres gets tokenised to "user" and "-71",
+    # and the latter will not match a query tokenised to "user" and "71".
+    new_results: List[str] = []
+    i = 0
+    while i < len(results):
+        curr = results[i]
+
+        prev = None
+        next = None
+        if i > 0:
+            prev = results[i - 1]
+        if i + 1 < len(results):
+            next = results[i + 1]
+
+        i += 1
 
         # libicu considers spaces and punctuation between words as words, but we don't
         # want to include those in results as they would result in syntax errors in SQL
         # queries (e.g. "foo bar" would result in the search query including "foo &  &
         # bar").
-        if len(re.findall(r"([\w\-]+)", result, re.UNICODE)):
-            results.append(result)
+        if not curr:
+            continue
+
+        if curr in ["-", ".", "_"]:
+            prefix = ""
+            suffix = ""
+
+            # Check if the next item is a word, and if so use it as the suffix.
+            # We check whether it's a word, as we don't want to concatenate
+            # multiple punctuation marks.
+            if next is not None and re.match(r"\w", next):
+                suffix = next
+                i += 1  # We're using next, so we skip it in the outer loop.
+            else:
+                # We want to avoid creating terms like "user-", as we should
+                # strip trailing punctuation.
+                continue
 
-        i = j
+            if prev and re.match(r"\w", prev) and new_results:
+                prefix = new_results[-1]
+                new_results.pop()
+
+            # We might not have a prefix here, but that's fine as we want to
+            # ensure that we don't strip preceding punctuation e.g. '-71'
+            # shouldn't be converted to '71'.
+
+            new_results.append(f"{prefix}{curr}{suffix}")
+            continue
+        elif not re.match(r"\w", curr):
+            # Ignore other punctuation
+            continue
+
+        new_results.append(curr)
 
-    return results
+    return new_results
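The recombination pass above is subtle, so here is a hedged, standalone sketch of the same idea (not the Synapse function itself): rejoin '-', '.' and '_' with a neighbouring word so that terms like "user-71" survive tokenisation intact:

    import re
    from typing import List

    def rejoin_punctuation(tokens: List[str]) -> List[str]:
        out: List[str] = []
        i = 0
        while i < len(tokens):
            curr = tokens[i]
            i += 1
            if not curr:
                continue
            if curr in ("-", ".", "_"):
                if i < len(tokens) and re.match(r"\w", tokens[i]):
                    suffix = tokens[i]
                    i += 1  # consume the suffix token
                else:
                    continue  # drop trailing punctuation: "user-" -> "user"
                # Prepend the previous word, if any; keep '-71' as-is otherwise.
                prefix = out.pop() if out and re.match(r"\w", out[-1]) else ""
                out.append(f"{prefix}{curr}{suffix}")
            elif re.match(r"\w", curr):
                out.append(curr)
        return out

    assert rejoin_punctuation(["user", "-", "71"]) == ["user-71"]
    assert rejoin_punctuation(["-", "71"]) == ["-71"]
    assert rejoin_punctuation(["user", "-"]) == ["user"]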
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index 7471f81a19..80c9630867 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -35,7 +35,7 @@ class RoomsForUser:
     sender: str
     membership: str
     event_id: str
-    stream_ordering: int
+    event_pos: PersistedEventPosition
     room_version_id: str
 
 
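Carrying a `PersistedEventPosition` (rather than an `int`) means callers can compare the membership event's position against stream tokens in a worker-aware way. A small sketch, assuming Synapse's `persisted_after` helper behaves as its name suggests:

    from synapse.types import PersistedEventPosition, RoomStreamToken

    pos = PersistedEventPosition(instance_name="master", stream=42)
    token = RoomStreamToken(stream=40)
    # True: this event was persisted after the position the token describes.
    assert pos.persisted_after(token)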
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
new file mode 100644
index 0000000000..1d65551d5b
--- /dev/null
+++ b/synapse/types/handlers/__init__.py
@@ -0,0 +1,252 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+from enum import Enum
+from typing import TYPE_CHECKING, Dict, Final, List, Optional, Tuple
+
+import attr
+from typing_extensions import TypedDict
+
+from synapse._pydantic_compat import HAS_PYDANTIC_V2
+
+if TYPE_CHECKING or HAS_PYDANTIC_V2:
+    from pydantic.v1 import Extra
+else:
+    from pydantic import Extra
+
+from synapse.events import EventBase
+from synapse.types import JsonMapping, StreamToken, UserID
+from synapse.types.rest.client import SlidingSyncBody
+
+
+class ShutdownRoomParams(TypedDict):
+    """
+    Attributes:
+        requester_user_id:
+            User who requested the action. Will be recorded as putting the room on the
+            blocking list.
+        new_room_user_id:
+            If set, a new room will be created with this user ID
+            as the creator and admin, and all users in the old room will be
+            moved into that room. If not set, no new room will be created
+            and the users will just be removed from the old room.
+        new_room_name:
+            A string representing the name of the room that new users will
+            be invited to. Defaults to `Content Violation Notification`
+        message:
+            A string containing the first message that will be sent as
+            `new_room_user_id` in the new room. Ideally this will clearly
+            convey why the original room was shut down.
+            Defaults to `Sharing illegal content on this server is not
+            permitted and rooms in violation will be blocked.`
+        block:
+            If set to `true`, this room will be added to a blocking list,
+            preventing future attempts to join the room. Defaults to `false`.
+        purge:
+            If set to `true`, purge the given room from the database.
+        force_purge:
+            If set to `true`, the room will be purged from the database
+            even if there are still users joined to the room.
+    """
+
+    requester_user_id: Optional[str]
+    new_room_user_id: Optional[str]
+    new_room_name: Optional[str]
+    message: Optional[str]
+    block: bool
+    purge: bool
+    force_purge: bool
+
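For orientation, a hypothetical `ShutdownRoomParams` value (field names from the TypedDict above; the concrete values are illustrative only):

    params: ShutdownRoomParams = {
        "requester_user_id": "@admin:example.com",
        "new_room_user_id": "@abuse:example.com",
        "new_room_name": None,  # fall back to the default room name
        "message": None,  # fall back to the default message
        "block": True,
        "purge": True,
        "force_purge": False,
    }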
+
+class ShutdownRoomResponse(TypedDict):
+    """
+    Attributes:
+        kicked_users: An array of users (`user_id`) that were kicked.
+        failed_to_kick_users:
+            An array of users (`user_id`) that were not kicked.
+        local_aliases:
+            An array of strings representing the local aliases that were
+            migrated from the old room to the new.
+        new_room_id: A string representing the room ID of the new room.
+    """
+
+    kicked_users: List[str]
+    failed_to_kick_users: List[str]
+    local_aliases: List[str]
+    new_room_id: Optional[str]
+
+
+class SlidingSyncConfig(SlidingSyncBody):
+    """
+    Inherit from `SlidingSyncBody` since we need all of the same fields, plus a few
+    extra fields that we need in the handler.
+    """
+
+    user: UserID
+    device_id: Optional[str]
+
+    # Pydantic config
+    class Config:
+        # By default, ignore fields that we don't recognise.
+        extra = Extra.ignore
+        # By default, don't allow fields to be reassigned after parsing.
+        allow_mutation = False
+        # Allow custom types like `UserID` to be used in the model
+        arbitrary_types_allowed = True
+
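A quick sketch of what this Config buys (values are hypothetical):

    from synapse.types import UserID

    config = SlidingSyncConfig(
        user=UserID.from_string("@alice:example.com"),  # arbitrary_types_allowed
        device_id="DEVICEID",
        unknown_field="ignored",  # Extra.ignore: silently dropped
    )
    # allow_mutation = False: reassignment after parsing raises an error.
    # config.device_id = "OTHER"  # would raise a TypeError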
+
+class OperationType(Enum):
+    """
+    Represents the operation types in a Sliding Sync window.
+
+    Attributes:
+        SYNC: Sets a range of entries. Clients SHOULD discard what they previously knew about
+            entries in this range.
+        INSERT: Sets a single entry. If the position is not empty then clients MUST move
+            entries to the left or the right depending on where the closest empty space is.
+        DELETE: Remove a single entry. Often comes before an INSERT to allow entries to move
+            places.
+        INVALIDATE: Remove a range of entries. Clients MAY persist the invalidated range for
+            offline support, but they should be treated as empty when additional operations
+            which concern indexes in the range arrive from the server.
+    """
+
+    SYNC: Final = "SYNC"
+    INSERT: Final = "INSERT"
+    DELETE: Final = "DELETE"
+    INVALIDATE: Final = "INVALIDATE"
+
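To make the operation semantics concrete, a hypothetical sequence for a window covering indexes 0-2, using MSC3575's wire shapes (SYNC/INVALIDATE carry a `range`, INSERT/DELETE a single `index`):

    ops = [
        # Fill the whole window on the first response.
        {"op": "SYNC", "range": [0, 2], "room_ids": ["!a:x", "!b:x", "!c:x"]},
        # A room jumps to the top: free a slot, then insert at index 0.
        {"op": "DELETE", "index": 2},
        {"op": "INSERT", "index": 0, "room_id": "!c:x"},
    ]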
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class SlidingSyncResult:
+    """
+    The Sliding Sync result to be serialized to JSON for a response.
+
+    Attributes:
+        next_pos: The next position token in the sliding window to request (next_batch).
+        lists: Sliding window API. A map of list key to list results.
+        rooms: Room subscription API. A map of room ID to room results.
+        extensions: Extensions API. A map of extension key to extension results.
+    """
+
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class RoomResult:
+        """
+        Attributes:
+            name: Room name or calculated room name.
+            avatar: Room avatar
+            heroes: List of stripped membership events (containing `user_id` and optionally
+                `avatar_url` and `displayname`) for the users used to calculate the room name.
+            initial: Flag which is set when this is the first time the server is sending this
+                data on this connection. Clients can use this flag to replace or update
+                their local state. When there is an update, servers MUST omit this flag
+                entirely and NOT send "initial":false as this is wasteful on bandwidth. The
+                absence of this flag means 'false'.
+            required_state: The current state of the room.
+            timeline: Latest events in the room. The last event is the most recent.
+            is_dm: Flag to specify whether the room is a direct-message room (most likely
+                between two people).
+            invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
+                in sync v2, absent on joined/left rooms
+            prev_batch: A token that can be passed as a start parameter to the
+                `/rooms/<room_id>/messages` API to retrieve earlier messages.
+            limited: True if there are more events than fit between the given position and now.
+                Sync again to get more.
+            joined_count: The number of users with membership of join, including the client's
+                own user ID. (same as sync v2 `m.joined_member_count`)
+            invited_count: The number of users with membership of invite. (same as sync v2
+                `m.invited_member_count`)
+            notification_count: The total number of unread notifications for this room. (same
+                as sync v2)
+            highlight_count: The number of unread notifications for this room with the highlight
+                flag set. (same as sync v2)
+            num_live: The number of timeline events which have just occurred and are not historical.
+                The last N events are 'live' and should be treated as such. This is mostly
+                useful to determine whether a given @mention event should make a noise or not.
+                Clients cannot rely solely on the absence of `initial: true` to determine live
+                events because if a room not in the sliding window bumps into the window because
+                of an @mention it will have `initial: true` yet contain a single live event
+                (with potentially other old events in the timeline).
+        """
+
+        name: str
+        avatar: Optional[str]
+        heroes: Optional[List[EventBase]]
+        initial: bool
+        required_state: List[EventBase]
+        timeline: List[EventBase]
+        is_dm: bool
+        invite_state: List[EventBase]
+        prev_batch: StreamToken
+        limited: bool
+        joined_count: int
+        invited_count: int
+        notification_count: int
+        highlight_count: int
+        num_live: int
+
+    @attr.s(slots=True, frozen=True, auto_attribs=True)
+    class SlidingWindowList:
+        """
+        Attributes:
+            count: The total number of entries in the list. Always present if this list
+                is.
+            ops: The sliding list operations to perform.
+        """
+
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class Operation:
+            """
+            Attributes:
+                op: The operation type to perform.
+                range: Which index positions are affected by this operation. These are
+                    both inclusive.
+                room_ids: Which room IDs are affected by this operation. These IDs match
+                    up to the positions in the `range`, so the last room ID in this list
+                    matches the last index in the range. The room data is held in a
+                    separate object.
+            """
+
+            op: OperationType
+            range: Tuple[int, int]
+            room_ids: List[str]
+
+        count: int
+        ops: List[Operation]
+
+    next_pos: StreamToken
+    lists: Dict[str, SlidingWindowList]
+    rooms: Dict[str, RoomResult]
+    extensions: JsonMapping
+
+    def __bool__(self) -> bool:
+        """Make the result appear empty if there are no updates. This is used
+        to tell if the notifier needs to wait for more events when polling for
+        events.
+        """
+        return bool(self.lists or self.rooms or self.extensions)
+
+    @staticmethod
+    def empty(next_pos: StreamToken) -> "SlidingSyncResult":
+        "Return a new empty result"
+        return SlidingSyncResult(
+            next_pos=next_pos,
+            lists={},
+            rooms={},
+            extensions={},
+        )
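A minimal sketch of the `__bool__` behaviour, assuming `StreamToken.START` is available as a convenient empty token:

    import attr

    from synapse.types import StreamToken

    result = SlidingSyncResult.empty(next_pos=StreamToken.START)
    assert not result  # nothing to send: the notifier keeps waiting

    # Any list, room, or extension data makes the result truthy.
    result = attr.evolve(result, extensions={"e2ee": {}})
    assert result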
diff --git a/synapse/rest/models.py b/synapse/types/rest/__init__.py
index 2b6f5ed35a..2b6f5ed35a 100644
--- a/synapse/rest/models.py
+++ b/synapse/types/rest/__init__.py
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
new file mode 100644
index 0000000000..ef261518a0
--- /dev/null
+++ b/synapse/types/rest/client/__init__.py
@@ -0,0 +1,284 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+# Copyright (C) 2023 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union
+
+from synapse._pydantic_compat import HAS_PYDANTIC_V2
+
+if TYPE_CHECKING or HAS_PYDANTIC_V2:
+    from pydantic.v1 import (
+        Extra,
+        StrictBool,
+        StrictInt,
+        StrictStr,
+        conint,
+        constr,
+        validator,
+    )
+else:
+    from pydantic import (
+        Extra,
+        StrictBool,
+        StrictInt,
+        StrictStr,
+        conint,
+        constr,
+        validator,
+    )
+
+from synapse.types.rest import RequestBodyModel
+from synapse.util.threepids import validate_email
+
+
+class AuthenticationData(RequestBodyModel):
+    """
+    Data used during user-interactive authentication.
+
+    (The name "Authentication Data" is taken directly from the spec.)
+
+    Additional keys will be present, depending on the `type` field. Use
+    `.dict(exclude_unset=True)` to access them.
+    """
+
+    class Config:
+        extra = Extra.allow
+
+    session: Optional[StrictStr] = None
+    type: Optional[StrictStr] = None
+
+
+if TYPE_CHECKING:
+    ClientSecretStr = StrictStr
+else:
+    # See also assert_valid_client_secret()
+    ClientSecretStr = constr(
+        regex="[0-9a-zA-Z.=_-]",  # noqa: F722
+        min_length=1,
+        max_length=255,
+        strict=True,
+    )
+
+
+class ThreepidRequestTokenBody(RequestBodyModel):
+    client_secret: ClientSecretStr
+    id_server: Optional[StrictStr]
+    id_access_token: Optional[StrictStr]
+    next_link: Optional[StrictStr]
+    send_attempt: StrictInt
+
+    @validator("id_access_token", always=True)
+    def token_required_for_identity_server(
+        cls, token: Optional[str], values: Dict[str, object]
+    ) -> Optional[str]:
+        if values.get("id_server") is not None and token is None:
+            raise ValueError("id_access_token is required if an id_server is supplied.")
+        return token
+
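A quick sketch of the validator above in action (hypothetical values; the pydantic import path follows the v1-compat pattern used in this file):

    from synapse._pydantic_compat import HAS_PYDANTIC_V2

    if HAS_PYDANTIC_V2:
        from pydantic.v1 import ValidationError
    else:
        from pydantic import ValidationError

    try:
        ThreepidRequestTokenBody.parse_obj(
            {
                "client_secret": "abc123",
                "id_server": "id.example.com",  # supplied without a token...
                "send_attempt": 1,
            }
        )
    except ValidationError:
        pass  # ...so the id_access_token check fails, as intended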
+
+class EmailRequestTokenBody(ThreepidRequestTokenBody):
+    email: StrictStr
+
+    # Canonicalise the email address. The addresses are all stored canonicalised
+    # in the database. This allows the user to reset their password without having
+    # to know the exact spelling (e.g. upper and lower case) of the address in the
+    # database.
+    # Without this, an email stored in the database as "foo@bar.com" would cause
+    # user requests for "FOO@bar.com" to raise a Not Found error.
+    _email_validator = validator("email", allow_reuse=True)(validate_email)
+
+
+if TYPE_CHECKING:
+    ISO3116_1_Alpha_2 = StrictStr
+else:
+    # Per spec: two-letter uppercase ISO-3166-1-alpha-2
+    ISO3116_1_Alpha_2 = constr(regex="[A-Z]{2}", strict=True)
+
+
+class MsisdnRequestTokenBody(ThreepidRequestTokenBody):
+    country: ISO3116_1_Alpha_2
+    phone_number: StrictStr
+
+
+class SlidingSyncBody(RequestBodyModel):
+    """
+    Sliding Sync API request body.
+
+    Attributes:
+        lists: Sliding window API. A map of list key to list information
+            (:class:`SlidingSyncList`). Max lists: 100. The list keys should be
+            arbitrary strings which the client is using to refer to the list. Keep this
+            small as it needs to be sent a lot. Max length: 64 bytes.
+        room_subscriptions: Room subscription API. A map of room ID to room subscription
+            information. Used to subscribe to a specific room. Sometimes clients know
+            exactly which room they want to get information about, e.g. by following a
+            permalink or by refreshing a webapp currently viewing a specific room. The
+            sliding window API alone is insufficient for this use case because there's
+            no way to say "please track this room explicitly".
+        extensions: Extensions API. A map of extension key to extension config.
+    """
+
+    class CommonRoomParameters(RequestBodyModel):
+        """
+        Common parameters shared between the sliding window and room subscription APIs.
+
+        Attributes:
+            required_state: Required state for each room returned. An array of event
+                type and state key tuples. Elements in this array are ORd together to
+                produce the final set of state events to return. One unique exception is
+                when you request all state events via `["*", "*"]`. When used, all state
+                events are returned by default, and additional entries FILTER OUT the
+                returned set of state events. These additional entries cannot use `*`
+                themselves. For example, `["*", "*"], ["m.room.member",
+                "@alice:example.com"]` will *exclude* every `m.room.member` event
+                *except* for `@alice:example.com`, and include every other state event.
+                In addition, `["*", "*"], ["m.space.child", "*"]` is an error, the
+                `m.space.child` filter is not required as it would have been returned
+                anyway.
+            timeline_limit: The maximum number of timeline events to return per response.
+                (Max 1000 messages)
+            include_old_rooms: Determines if `predecessor` rooms are included in the
+                `rooms` response. The user MUST be joined to old rooms for them to show up
+                in the response.
+        """
+
+        class IncludeOldRooms(RequestBodyModel):
+            timeline_limit: StrictInt
+            required_state: List[Tuple[StrictStr, StrictStr]]
+
+        required_state: List[Tuple[StrictStr, StrictStr]]
+        # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
+        if TYPE_CHECKING:
+            timeline_limit: int
+        else:
+            timeline_limit: conint(le=1000, strict=True)  # type: ignore[valid-type]
+        include_old_rooms: Optional[IncludeOldRooms] = None
+
+    class SlidingSyncList(CommonRoomParameters):
+        """
+        Attributes:
+            ranges: Sliding window ranges. If this field is missing, no sliding window
+                is used and all rooms are returned in this list. Integers are
+                *inclusive*.
+            sort: How the list should be sorted on the server. The first value is
+                applied first, then tiebreaks are performed with each subsequent sort
+                listed.
+
+                    FIXME: Furthermore, it's not currently defined how servers should behave
+                    if they encounter a filter or sort operation they do not recognise. If
+                    the server rejects the request with an HTTP 400 then that will break
+                    backwards compatibility with new clients vs old servers. However, the
+                    client would be otherwise unaware that only some of the sort/filter
+                    operations have taken effect. We may need to include a "warnings"
+                    section to indicate which sort/filter operations are unrecognised,
+                    allowing for some form of graceful degradation of service.
+                    -- https://github.com/matrix-org/matrix-spec-proposals/blob/kegan/sync-v3/proposals/3575-sync.md#filter-and-sort-extensions
+
+            slow_get_all_rooms: Just get all rooms (for clients that don't want to deal with
+                sliding windows). When true, the `ranges` and `sort` fields are ignored.
+            required_state: Required state for each room returned. An array of event
+                type and state key tuples. Elements in this array are ORd together to
+                produce the final set of state events to return.
+
+                One unique exception is when you request all state events via `["*",
+                "*"]`. When used, all state events are returned by default, and
+                additional entries FILTER OUT the returned set of state events. These
+                additional entries cannot use `*` themselves. For example, `["*", "*"],
+                ["m.room.member", "@alice:example.com"]` will *exclude* every
+                `m.room.member` event *except* for `@alice:example.com`, and include
+                every other state event. In addition, `["*", "*"], ["m.space.child",
+                "*"]` is an error, the `m.space.child` filter is not required as it
+                would have been returned anyway.
+
+                Room members can be lazily-loaded by using the special `$LAZY` state key
+                (`["m.room.member", "$LAZY"]`). Typically, when you view a room, you
+                want to retrieve all state events except for m.room.member events which
+                you want to lazily load. To get this behaviour, clients can send the
+                following::
+
+                    {
+                        "required_state": [
+                            // activate lazy loading
+                            ["m.room.member", "$LAZY"],
+                            // request all state events _except_ for m.room.member
+                            // events, which are lazily loaded
+                            ["*", "*"]
+                        ]
+                    }
+
+            timeline_limit: The maximum number of timeline events to return per response.
+            include_old_rooms: Determines if `predecessor` rooms are included in the
+                `rooms` response. The user MUST be joined to old rooms for them to show up
+                in the response.
+            include_heroes: Return a stripped variant of membership events (containing
+                `user_id` and optionally `avatar_url` and `displayname`) for the users used
+                to calculate the room name.
+            filters: Filters to apply to the list before sorting.
+            bump_event_types: Allowlist of event types which should be considered recent activity
+                when sorting `by_recency`. By omitting event types from this field,
+                clients can ensure that uninteresting events (e.g. a profile rename) do
+                not cause a room to jump to the top of its list(s). Empty or omitted
+                `bump_event_types` have no effect—all events in a room will be
+                considered recent activity.
+        """
+
+        class Filters(RequestBodyModel):
+            is_dm: Optional[StrictBool] = None
+            spaces: Optional[List[StrictStr]] = None
+            is_encrypted: Optional[StrictBool] = None
+            is_invite: Optional[StrictBool] = None
+            room_types: Optional[List[Union[StrictStr, None]]] = None
+            not_room_types: Optional[List[StrictStr]] = None
+            room_name_like: Optional[StrictStr] = None
+            tags: Optional[List[StrictStr]] = None
+            not_tags: Optional[List[StrictStr]] = None
+
+        # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
+        if TYPE_CHECKING:
+            ranges: Optional[List[Tuple[int, int]]] = None
+        else:
+            ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None  # type: ignore[valid-type]
+        sort: Optional[List[StrictStr]] = None
+        slow_get_all_rooms: Optional[StrictBool] = False
+        include_heroes: Optional[StrictBool] = False
+        filters: Optional[Filters] = None
+        bump_event_types: Optional[List[StrictStr]] = None
+
+    class RoomSubscription(CommonRoomParameters):
+        pass
+
+    class Extension(RequestBodyModel):
+        enabled: Optional[StrictBool] = False
+        lists: Optional[List[StrictStr]] = None
+        rooms: Optional[List[StrictStr]] = None
+
+    # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
+    if TYPE_CHECKING:
+        lists: Optional[Dict[str, SlidingSyncList]] = None
+    else:
+        lists: Optional[Dict[constr(max_length=64, strict=True), SlidingSyncList]] = None  # type: ignore[valid-type]
+    room_subscriptions: Optional[Dict[StrictStr, RoomSubscription]] = None
+    extensions: Optional[Dict[StrictStr, Extension]] = None
+
+    @validator("lists")
+    def lists_length_check(
+        cls, value: Optional[Dict[str, SlidingSyncList]]
+    ) -> Optional[Dict[str, SlidingSyncList]]:
+        if value is not None:
+            assert len(value) <= 100, f"Max lists: 100 but saw {len(value)}"
+        return value
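An end-to-end sketch of parsing a request body with this model (field values are illustrative only):

    body = SlidingSyncBody.parse_obj(
        {
            "lists": {
                "rooms": {
                    "ranges": [[0, 99]],
                    "required_state": [
                        ["m.room.member", "$LAZY"],  # lazy-load members
                        ["*", "*"],                  # all other state
                    ],
                    "timeline_limit": 10,
                }
            },
            "room_subscriptions": {
                "!abc:example.com": {
                    "required_state": [["*", "*"]],
                    "timeline_limit": 50,
                }
            },
        }
    )
    assert body.lists is not None
    assert body.lists["rooms"].timeline_limit == 10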
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 09a947ef15..c891bd845b 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -151,7 +151,7 @@ async def filter_events_for_client(
             filter_send_to_client=filter_send_to_client,
             sender_ignored=event.sender in ignore_list,
             always_include_ids=always_include_ids,
-            retention_policy=retention_policies[room_id],
+            retention_policy=retention_policies[event.room_id],
             state=state_after_event,
             is_peeking=is_peeking,
             sender_erased=erased_senders.get(event.sender, False),