summary refs log tree commit diff
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-22 12:08:55 +0100
committerErik Johnston <erik@matrix.org>2024-07-22 12:11:09 +0100
commit94f06143fcbeb597505b394aba5b0908995177c0 (patch)
treed325599a8485a1f57afc6d8326f497f70e0ae6da
parentRemove fiddling (diff)
parentWIP faster sort (diff)
downloadsynapse-94f06143fcbeb597505b394aba5b0908995177c0.tar.xz
Merge branch 'erikj/ss_faster_sort' into erikj/ss_hacks
-rw-r--r--CHANGES.md7
-rw-r--r--changelog.d/17433.feature1
-rw-r--r--changelog.d/17435.bugfix1
-rw-r--r--changelog.d/17451.doc1
-rw-r--r--changelog.d/17453.misc1
-rw-r--r--changelog.d/17458.misc1
-rw-r--r--changelog.d/17460.misc1
-rw-r--r--debian/changelog6
-rw-r--r--docs/usage/configuration/config_documentation.md44
-rw-r--r--poetry.lock23
-rw-r--r--pyproject.toml2
-rwxr-xr-xsynapse/_scripts/synapse_port_db.py5
-rw-r--r--synapse/api/constants.py2
-rw-r--r--synapse/config/repository.py4
-rw-r--r--synapse/handlers/room.py2
-rw-r--r--synapse/handlers/sliding_sync.py38
-rw-r--r--synapse/media/media_repository.py39
-rw-r--r--synapse/media/thumbnailer.py48
-rw-r--r--synapse/rest/media/download_resource.py3
-rw-r--r--synapse/rest/media/thumbnail_resource.py5
-rw-r--r--synapse/storage/_base.py5
-rw-r--r--synapse/storage/databases/main/cache.py6
-rw-r--r--synapse/storage/databases/main/media_repository.py28
-rw-r--r--synapse/storage/databases/main/roommember.py84
-rw-r--r--synapse/storage/databases/main/stream.py90
-rw-r--r--synapse/storage/schema/__init__.py5
-rw-r--r--synapse/storage/schema/main/delta/86/01_authenticate_media.sql15
-rw-r--r--tests/handlers/test_sync.py1
-rw-r--r--tests/rest/client/test_media.py209
-rw-r--r--tests/rest/client/test_sync.py21
-rw-r--r--tests/storage/test_roommember.py403
31 files changed, 1022 insertions, 79 deletions
diff --git a/CHANGES.md b/CHANGES.md
index 8279960b5b..0a2b816ed1 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -1,3 +1,10 @@
+# Synapse 1.111.0 (2024-07-16)
+
+No significant changes since 1.111.0rc2.
+
+
+
+
 # Synapse 1.111.0rc2 (2024-07-10)
 
 ### Bugfixes
diff --git a/changelog.d/17433.feature b/changelog.d/17433.feature
new file mode 100644
index 0000000000..ac9b5dee69
--- /dev/null
+++ b/changelog.d/17433.feature
@@ -0,0 +1 @@
+Prepare for authenticated media freeze.
\ No newline at end of file
diff --git a/changelog.d/17435.bugfix b/changelog.d/17435.bugfix
new file mode 100644
index 0000000000..2d06a7c7fc
--- /dev/null
+++ b/changelog.d/17435.bugfix
@@ -0,0 +1 @@
+Order `heroes` by `stream_ordering` as the Matrix specification states (applies to `/sync`).
diff --git a/changelog.d/17451.doc b/changelog.d/17451.doc
new file mode 100644
index 0000000000..357ac2c906
--- /dev/null
+++ b/changelog.d/17451.doc
@@ -0,0 +1 @@
+Improve documentation for the [`default_power_level_content_override`](https://element-hq.github.io/synapse/latest/usage/configuration/config_documentation.html#default_power_level_content_override) config option.
diff --git a/changelog.d/17453.misc b/changelog.d/17453.misc
new file mode 100644
index 0000000000..2978a52477
--- /dev/null
+++ b/changelog.d/17453.misc
@@ -0,0 +1 @@
+Update experimental [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575) Sliding Sync `/sync` endpoint to bump room when it is created.
diff --git a/changelog.d/17458.misc b/changelog.d/17458.misc
new file mode 100644
index 0000000000..09cce15d0d
--- /dev/null
+++ b/changelog.d/17458.misc
@@ -0,0 +1 @@
+Speed up generating sliding sync responses.
diff --git a/changelog.d/17460.misc b/changelog.d/17460.misc
new file mode 100644
index 0000000000..fd99da5a95
--- /dev/null
+++ b/changelog.d/17460.misc
@@ -0,0 +1 @@
+Add cache to `get_rooms_for_local_user_where_membership_is` to speed up sliding sync.
diff --git a/debian/changelog b/debian/changelog
index 0f3dcc64e6..0470e25f2d 100644
--- a/debian/changelog
+++ b/debian/changelog
@@ -1,3 +1,9 @@
+matrix-synapse-py3 (1.111.0) stable; urgency=medium
+
+  * New Synapse release 1.111.0.
+
+ -- Synapse Packaging team <packages@matrix.org>  Tue, 16 Jul 2024 12:42:46 +0200
+
 matrix-synapse-py3 (1.111.0~rc2) stable; urgency=medium
 
   * New synapse release 1.111.0rc2.
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 65b03ad0f8..e8bc2df798 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -1864,6 +1864,18 @@ federation_rr_transactions_per_room_per_second: 40
 Config options related to Synapse's media store.
 
 ---
+### `enable_authenticated_media`
+
+When set to true, all subsequent media uploads will be marked as authenticated, and will not be available over legacy
+unauthenticated media endpoints (`/_matrix/media/(r0|v3|v1)/download` and `/_matrix/media/(r0|v3|v1)/thumbnail`) - requests for authenticated media over these endpoints will result in a 404. All media, including authenticated media, will be available over the authenticated media endpoints `_matrix/client/v1/media/download` and `_matrix/client/v1/media/thumbnail`. Media uploaded prior to setting this option to true will still be available over the legacy endpoints. Note if the setting is switched to false
+after enabling, media marked as authenticated will be available over legacy endpoints. Defaults to false, but
+this will change to true in a future Synapse release.
+
+Example configuration:
+```yaml
+enable_authenticated_media: true
+```
+---
 ### `enable_media_repo`
 
 Enable the media store service in the Synapse master. Defaults to true.
@@ -4134,6 +4146,38 @@ default_power_level_content_override:
    trusted_private_chat: null
    public_chat: null
 ```
+
+The default power levels for each preset are:
+```yaml
+"m.room.name": 50
+"m.room.power_levels": 100
+"m.room.history_visibility": 100
+"m.room.canonical_alias": 50
+"m.room.avatar": 50
+"m.room.tombstone": 100
+"m.room.server_acl": 100
+"m.room.encryption": 100
+```
+
+So a complete example where the default power-levels for a preset are maintained
+but the power level for a new key is set is:
+```yaml
+default_power_level_content_override:
+   private_chat:
+    events:
+      "com.example.foo": 0
+      "m.room.name": 50
+      "m.room.power_levels": 100
+      "m.room.history_visibility": 100
+      "m.room.canonical_alias": 50
+      "m.room.avatar": 50
+      "m.room.tombstone": 100
+      "m.room.server_acl": 100
+      "m.room.encryption": 100
+   trusted_private_chat: null
+   public_chat: null
+```
+
 ---
 ### `forget_rooms_on_leave`
 
diff --git a/poetry.lock b/poetry.lock
index 4092a41884..2bfcb59cf2 100644
--- a/poetry.lock
+++ b/poetry.lock
@@ -2430,13 +2430,13 @@ doc = ["Sphinx", "sphinx-rtd-theme"]
 
 [[package]]
 name = "sentry-sdk"
-version = "2.6.0"
+version = "2.8.0"
 description = "Python client for Sentry (https://sentry.io)"
 optional = true
 python-versions = ">=3.6"
 files = [
-    {file = "sentry_sdk-2.6.0-py2.py3-none-any.whl", hash = "sha256:422b91cb49378b97e7e8d0e8d5a1069df23689d45262b86f54988a7db264e874"},
-    {file = "sentry_sdk-2.6.0.tar.gz", hash = "sha256:65cc07e9c6995c5e316109f138570b32da3bd7ff8d0d0ee4aaf2628c3dd8127d"},
+    {file = "sentry_sdk-2.8.0-py2.py3-none-any.whl", hash = "sha256:6051562d2cfa8087bb8b4b8b79dc44690f8a054762a29c07e22588b1f619bfb5"},
+    {file = "sentry_sdk-2.8.0.tar.gz", hash = "sha256:aa4314f877d9cd9add5a0c9ba18e3f27f99f7de835ce36bd150e48a41c7c646f"},
 ]
 
 [package.dependencies]
@@ -2466,7 +2466,7 @@ langchain = ["langchain (>=0.0.210)"]
 loguru = ["loguru (>=0.5)"]
 openai = ["openai (>=1.0.0)", "tiktoken (>=0.3.0)"]
 opentelemetry = ["opentelemetry-distro (>=0.35b0)"]
-opentelemetry-experimental = ["opentelemetry-distro (>=0.40b0,<1.0)", "opentelemetry-instrumentation-aiohttp-client (>=0.40b0,<1.0)", "opentelemetry-instrumentation-django (>=0.40b0,<1.0)", "opentelemetry-instrumentation-fastapi (>=0.40b0,<1.0)", "opentelemetry-instrumentation-flask (>=0.40b0,<1.0)", "opentelemetry-instrumentation-requests (>=0.40b0,<1.0)", "opentelemetry-instrumentation-sqlite3 (>=0.40b0,<1.0)", "opentelemetry-instrumentation-urllib (>=0.40b0,<1.0)"]
+opentelemetry-experimental = ["opentelemetry-instrumentation-aio-pika (==0.46b0)", "opentelemetry-instrumentation-aiohttp-client (==0.46b0)", "opentelemetry-instrumentation-aiopg (==0.46b0)", "opentelemetry-instrumentation-asgi (==0.46b0)", "opentelemetry-instrumentation-asyncio (==0.46b0)", "opentelemetry-instrumentation-asyncpg (==0.46b0)", "opentelemetry-instrumentation-aws-lambda (==0.46b0)", "opentelemetry-instrumentation-boto (==0.46b0)", "opentelemetry-instrumentation-boto3sqs (==0.46b0)", "opentelemetry-instrumentation-botocore (==0.46b0)", "opentelemetry-instrumentation-cassandra (==0.46b0)", "opentelemetry-instrumentation-celery (==0.46b0)", "opentelemetry-instrumentation-confluent-kafka (==0.46b0)", "opentelemetry-instrumentation-dbapi (==0.46b0)", "opentelemetry-instrumentation-django (==0.46b0)", "opentelemetry-instrumentation-elasticsearch (==0.46b0)", "opentelemetry-instrumentation-falcon (==0.46b0)", "opentelemetry-instrumentation-fastapi (==0.46b0)", "opentelemetry-instrumentation-flask (==0.46b0)", "opentelemetry-instrumentation-grpc (==0.46b0)", "opentelemetry-instrumentation-httpx (==0.46b0)", "opentelemetry-instrumentation-jinja2 (==0.46b0)", "opentelemetry-instrumentation-kafka-python (==0.46b0)", "opentelemetry-instrumentation-logging (==0.46b0)", "opentelemetry-instrumentation-mysql (==0.46b0)", "opentelemetry-instrumentation-mysqlclient (==0.46b0)", "opentelemetry-instrumentation-pika (==0.46b0)", "opentelemetry-instrumentation-psycopg (==0.46b0)", "opentelemetry-instrumentation-psycopg2 (==0.46b0)", "opentelemetry-instrumentation-pymemcache (==0.46b0)", "opentelemetry-instrumentation-pymongo (==0.46b0)", "opentelemetry-instrumentation-pymysql (==0.46b0)", "opentelemetry-instrumentation-pyramid (==0.46b0)", "opentelemetry-instrumentation-redis (==0.46b0)", "opentelemetry-instrumentation-remoulade (==0.46b0)", "opentelemetry-instrumentation-requests (==0.46b0)", "opentelemetry-instrumentation-sklearn (==0.46b0)", "opentelemetry-instrumentation-sqlalchemy (==0.46b0)", "opentelemetry-instrumentation-sqlite3 (==0.46b0)", "opentelemetry-instrumentation-starlette (==0.46b0)", "opentelemetry-instrumentation-system-metrics (==0.46b0)", "opentelemetry-instrumentation-threading (==0.46b0)", "opentelemetry-instrumentation-tornado (==0.46b0)", "opentelemetry-instrumentation-tortoiseorm (==0.46b0)", "opentelemetry-instrumentation-urllib (==0.46b0)", "opentelemetry-instrumentation-urllib3 (==0.46b0)", "opentelemetry-instrumentation-wsgi (==0.46b0)"]
 pure-eval = ["asttokens", "executing", "pure-eval"]
 pymongo = ["pymongo (>=3.1)"]
 pyspark = ["pyspark (>=2.4.4)"]
@@ -2476,7 +2476,7 @@ sanic = ["sanic (>=0.8)"]
 sqlalchemy = ["sqlalchemy (>=1.2)"]
 starlette = ["starlette (>=0.19.1)"]
 starlite = ["starlite (>=1.48)"]
-tornado = ["tornado (>=5)"]
+tornado = ["tornado (>=6)"]
 
 [[package]]
 name = "service-identity"
@@ -2504,18 +2504,19 @@ tests = ["coverage[toml] (>=5.0.2)", "pytest"]
 
 [[package]]
 name = "setuptools"
-version = "70.0.0"
+version = "67.6.0"
 description = "Easily download, build, install, upgrade, and uninstall Python packages"
 optional = false
-python-versions = ">=3.8"
+python-versions = ">=3.7"
 files = [
-    {file = "setuptools-70.0.0-py3-none-any.whl", hash = "sha256:54faa7f2e8d2d11bcd2c07bed282eef1046b5c080d1c32add737d7b5817b1ad4"},
-    {file = "setuptools-70.0.0.tar.gz", hash = "sha256:f211a66637b8fa059bb28183da127d4e86396c991a942b028c6650d4319c3fd0"},
+    {file = "setuptools-67.6.0-py3-none-any.whl", hash = "sha256:b78aaa36f6b90a074c1fa651168723acbf45d14cb1196b6f02c0fd07f17623b2"},
+    {file = "setuptools-67.6.0.tar.gz", hash = "sha256:2ee892cd5f29f3373097f5a814697e397cf3ce313616df0af11231e2ad118077"},
 ]
 
 [package.extras]
-docs = ["furo", "jaraco.packaging (>=9.3)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "pyproject-hooks (!=1.1)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (>=1,<2)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
-testing = ["build[virtualenv] (>=1.0.3)", "filelock (>=3.4.0)", "importlib-metadata", "ini2toml[lite] (>=0.14)", "jaraco.develop (>=7.21)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "mypy (==1.9)", "packaging (>=23.2)", "pip (>=19.1)", "pyproject-hooks (!=1.1)", "pytest (>=6,!=8.1.1)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=2.2)", "pytest-home (>=0.5)", "pytest-mypy", "pytest-perf", "pytest-ruff (>=0.2.1)", "pytest-subprocess", "pytest-timeout", "pytest-xdist (>=3)", "tomli", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
+docs = ["furo", "jaraco.packaging (>=9)", "jaraco.tidelift (>=1.4)", "pygments-github-lexers (==0.0.5)", "rst.linker (>=1.9)", "sphinx (>=3.5)", "sphinx-favicon", "sphinx-hoverxref (<2)", "sphinx-inline-tabs", "sphinx-lint", "sphinx-notfound-page (==0.8.3)", "sphinx-reredirects", "sphinxcontrib-towncrier"]
+testing = ["build[virtualenv]", "filelock (>=3.4.0)", "flake8 (<5)", "flake8-2020", "ini2toml[lite] (>=0.9)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pip (>=19.1)", "pip-run (>=8.8)", "pytest (>=6)", "pytest-black (>=0.3.7)", "pytest-checkdocs (>=2.4)", "pytest-cov", "pytest-enabler (>=1.3)", "pytest-flake8", "pytest-mypy (>=0.9.1)", "pytest-perf", "pytest-timeout", "pytest-xdist", "tomli-w (>=1.0.0)", "virtualenv (>=13.0.0)", "wheel"]
+testing-integration = ["build[virtualenv]", "filelock (>=3.4.0)", "jaraco.envs (>=2.2)", "jaraco.path (>=3.2.0)", "pytest", "pytest-enabler", "pytest-xdist", "tomli", "virtualenv (>=13.0.0)", "wheel"]
 
 [[package]]
 name = "setuptools-rust"
diff --git a/pyproject.toml b/pyproject.toml
index 3df42ae448..1f9ee2b944 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -97,7 +97,7 @@ module-name = "synapse.synapse_rust"
 
 [tool.poetry]
 name = "matrix-synapse"
-version = "1.111.0rc2"
+version = "1.111.0"
 description = "Homeserver for the Matrix decentralised comms protocol"
 authors = ["Matrix.org Team and Contributors <packages@matrix.org>"]
 license = "AGPL-3.0-or-later"
diff --git a/synapse/_scripts/synapse_port_db.py b/synapse/_scripts/synapse_port_db.py
index 3bb4a34938..5c6db8118f 100755
--- a/synapse/_scripts/synapse_port_db.py
+++ b/synapse/_scripts/synapse_port_db.py
@@ -119,18 +119,19 @@ BOOLEAN_COLUMNS = {
     "e2e_room_keys": ["is_verified"],
     "event_edges": ["is_state"],
     "events": ["processed", "outlier", "contains_url"],
-    "local_media_repository": ["safe_from_quarantine"],
+    "local_media_repository": ["safe_from_quarantine", "authenticated"],
+    "per_user_experimental_features": ["enabled"],
     "presence_list": ["accepted"],
     "presence_stream": ["currently_active"],
     "public_room_list_stream": ["visibility"],
     "pushers": ["enabled"],
     "redactions": ["have_censored"],
+    "remote_media_cache": ["authenticated"],
     "room_stats_state": ["is_federatable"],
     "rooms": ["is_public", "has_auth_chain_index"],
     "users": ["shadow_banned", "approved", "locked", "suspended"],
     "un_partial_stated_event_stream": ["rejection_status_changed"],
     "users_who_share_rooms": ["share_private"],
-    "per_user_experimental_features": ["enabled"],
 }
 
 
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 12d18137e0..85001d9676 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -50,7 +50,7 @@ class Membership:
     KNOCK: Final = "knock"
     LEAVE: Final = "leave"
     BAN: Final = "ban"
-    LIST: Final = {INVITE, JOIN, KNOCK, LEAVE, BAN}
+    LIST: Final = frozenset((INVITE, JOIN, KNOCK, LEAVE, BAN))
 
 
 class PresenceState:
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index dc0e93ffa1..97ce6de528 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -272,6 +272,10 @@ class ContentRepositoryConfig(Config):
                 remote_media_lifetime
             )
 
+        self.enable_authenticated_media = config.get(
+            "enable_authenticated_media", False
+        )
+
     def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
         assert data_dir_path is not None
         media_store = os.path.join(data_dir_path, "media_store")
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 2302d283a7..262d9f4044 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -1188,6 +1188,8 @@ class RoomCreationHandler:
             )
             events_to_send.append((power_event, power_context))
         else:
+            # Please update the docs for `default_power_level_content_override` when
+            # updating the `events` dict below
             power_level_content: JsonDict = {
                 "users": {creator_id: 100},
                 "users_default": 0,
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 5257a85557..b07b62a8fc 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -59,6 +59,7 @@ logger = logging.getLogger(__name__)
 
 # The event types that clients should consider as new activity.
 DEFAULT_BUMP_EVENT_TYPES = {
+    EventTypes.Create,
     EventTypes.Message,
     EventTypes.Encrypted,
     EventTypes.Sticker,
@@ -644,7 +645,7 @@ class SlidingSyncHandler:
             room_sync_result = await self.get_room_sync_data(
                 sync_config=sync_config,
                 room_id=room_id,
-                room_sync_config=room_sync_config,
+                room_sync_config=relevant_room_map[room_id],
                 room_membership_for_user_at_to_token=room_membership_for_user_map[
                     room_id
                 ],
@@ -1274,19 +1275,11 @@ class SlidingSyncHandler:
         # Assemble a map of room ID to the `stream_ordering` of the last activity that the
         # user should see in the room (<= `to_token`)
         last_activity_in_room_map: Dict[str, int] = {}
-        to_fetch = []
+
         for room_id, room_for_user in sync_room_map.items():
             # If they are fully-joined to the room, let's find the latest activity
             # at/before the `to_token`.
-            if room_for_user.membership == Membership.JOIN:
-                stream = self.store._events_stream_cache._entity_to_key.get(room_id)
-                if stream is not None:
-                    if stream <= to_token.room_key.stream:
-                        last_activity_in_room_map[room_id] = stream
-                        continue
-
-                to_fetch.append(room_id)
-            else:
+            if room_for_user.membership != Membership.JOIN:
                 # Otherwise, if the user has left/been invited/knocked/been banned from
                 # a room, they shouldn't see anything past that point.
                 #
@@ -1296,19 +1289,18 @@ class SlidingSyncHandler:
                 # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
                 last_activity_in_room_map[room_id] = room_for_user.event_pos.stream
 
-        ordering_map = await self.store.get_max_stream_ordering_in_rooms(to_fetch)
-        for room_id, stream_pos in ordering_map.items():
-            if stream_pos is None:
-                continue
-
-            if stream_pos.persisted_after(to_token.room_key):
-                continue
-
-            last_activity_in_room_map[room_id] = stream_pos.stream
+        joined_room_positions = (
+            await self.store.bulk_get_last_event_pos_in_room_before_stream_ordering(
+                [
+                    room_id
+                    for room_id, room_for_user in sync_room_map.items()
+                    if room_for_user.membership == Membership.JOIN
+                ],
+                to_token.room_key,
+            )
+        )
 
-        for room_id in sync_room_map.keys() - last_activity_in_room_map.keys():
-            # TODO: Handle better
-            last_activity_in_room_map[room_id] = sync_room_map[room_id].event_pos.stream
+        last_activity_in_room_map.update(joined_room_positions)
 
         return sorted(
             sync_room_map.values(),
diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py
index 87c929eb20..8bc92305fe 100644
--- a/synapse/media/media_repository.py
+++ b/synapse/media/media_repository.py
@@ -430,6 +430,7 @@ class MediaRepository:
         media_id: str,
         name: Optional[str],
         max_timeout_ms: int,
+        allow_authenticated: bool = True,
         federation: bool = False,
     ) -> None:
         """Responds to requests for local media, if exists, or returns 404.
@@ -442,6 +443,7 @@ class MediaRepository:
                 the filename in the Content-Disposition header of the response.
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
+            allow_authenticated: whether media marked as authenticated may be served to this request
             federation: whether the local media being fetched is for a federation request
 
         Returns:
@@ -451,6 +453,10 @@ class MediaRepository:
         if not media_info:
             return
 
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         self.mark_recently_accessed(None, media_id)
 
         media_type = media_info.media_type
@@ -481,6 +487,7 @@ class MediaRepository:
         max_timeout_ms: int,
         ip_address: str,
         use_federation_endpoint: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         """Respond to requests for remote media.
 
@@ -495,6 +502,8 @@ class MediaRepository:
             ip_address: the IP address of the requester
             use_federation_endpoint: whether to request the remote media over the new
                 federation `/download` endpoint
+            allow_authenticated: whether media marked as authenticated may be served to this
+                request
 
         Returns:
             Resolves once a response has successfully been written to request
@@ -526,6 +535,7 @@ class MediaRepository:
                 self.download_ratelimiter,
                 ip_address,
                 use_federation_endpoint,
+                allow_authenticated,
             )
 
         # We deliberately stream the file outside the lock
@@ -548,6 +558,7 @@ class MediaRepository:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool,
     ) -> RemoteMedia:
         """Gets the media info associated with the remote file, downloading
         if necessary.
@@ -560,6 +571,8 @@ class MediaRepository:
             ip_address: IP address of the requester
             use_federation: if a download is necessary, whether to request the remote file
                 over the federation `/download` endpoint
+            allow_authenticated: whether media marked as authenticated may be served to this
+                request
 
         Returns:
             The media info of the file
@@ -581,6 +594,7 @@ class MediaRepository:
                 self.download_ratelimiter,
                 ip_address,
                 use_federation,
+                allow_authenticated,
             )
 
         # Ensure we actually use the responder so that it releases resources
@@ -598,6 +612,7 @@ class MediaRepository:
         download_ratelimiter: Ratelimiter,
         ip_address: str,
         use_federation_endpoint: bool,
+        allow_authenticated: bool,
     ) -> Tuple[Optional[Responder], RemoteMedia]:
         """Looks for media in local cache, if not there then attempt to
         download from remote server.
@@ -619,6 +634,11 @@ class MediaRepository:
         """
         media_info = await self.store.get_cached_remote_media(server_name, media_id)
 
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            # if it isn't cached then don't fetch it or if it's authenticated then don't serve it
+            if not media_info or media_info.authenticated:
+                raise NotFoundError()
+
         # file_id is the ID we use to track the file locally. If we've already
         # seen the file then reuse the existing ID, otherwise generate a new
         # one.
@@ -792,6 +812,11 @@ class MediaRepository:
 
         logger.info("Stored remote media in file %r", fname)
 
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         return RemoteMedia(
             media_origin=server_name,
             media_id=media_id,
@@ -802,6 +827,7 @@ class MediaRepository:
             filesystem_id=file_id,
             last_access_ts=time_now_ms,
             quarantined_by=None,
+            authenticated=authenticated,
         )
 
     async def _federation_download_remote_file(
@@ -915,6 +941,11 @@ class MediaRepository:
 
         logger.debug("Stored remote media in file %r", fname)
 
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         return RemoteMedia(
             media_origin=server_name,
             media_id=media_id,
@@ -925,6 +956,7 @@ class MediaRepository:
             filesystem_id=file_id,
             last_access_ts=time_now_ms,
             quarantined_by=None,
+            authenticated=authenticated,
         )
 
     def _get_thumbnail_requirements(
@@ -1030,7 +1062,12 @@ class MediaRepository:
             t_len = os.path.getsize(output_path)
 
             await self.store.store_local_thumbnail(
-                media_id, t_width, t_height, t_type, t_method, t_len
+                media_id,
+                t_width,
+                t_height,
+                t_type,
+                t_method,
+                t_len,
             )
 
             return output_path
diff --git a/synapse/media/thumbnailer.py b/synapse/media/thumbnailer.py
index 413a720e40..ef6aa8ccf5 100644
--- a/synapse/media/thumbnailer.py
+++ b/synapse/media/thumbnailer.py
@@ -26,7 +26,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple, Type
 
 from PIL import Image
 
-from synapse.api.errors import Codes, SynapseError, cs_error
+from synapse.api.errors import Codes, NotFoundError, SynapseError, cs_error
 from synapse.config.repository import THUMBNAIL_SUPPORTED_MEDIA_FORMAT_MAP
 from synapse.http.server import respond_with_json
 from synapse.http.site import SynapseRequest
@@ -274,6 +274,7 @@ class ThumbnailProvider:
         m_type: str,
         max_timeout_ms: int,
         for_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_local_media_info(
             request, media_id, max_timeout_ms
@@ -281,6 +282,12 @@ class ThumbnailProvider:
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
         await self._select_and_respond_with_thumbnail(
             request,
@@ -307,14 +314,20 @@ class ThumbnailProvider:
         desired_type: str,
         max_timeout_ms: int,
         for_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_local_media_info(
             request, media_id, max_timeout_ms
         )
-
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_local_media_thumbnails(media_id)
         for info in thumbnail_infos:
             t_w = info.width == desired_width
@@ -381,14 +394,27 @@ class ThumbnailProvider:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms, ip_address, use_federation
+            server_name,
+            media_id,
+            max_timeout_ms,
+            ip_address,
+            use_federation,
+            allow_authenticated,
         )
         if not media_info:
             respond_404(request)
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                respond_404(request)
+                return
+
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
         )
@@ -446,16 +472,28 @@ class ThumbnailProvider:
         max_timeout_ms: int,
         ip_address: str,
         use_federation: bool,
+        allow_authenticated: bool = True,
     ) -> None:
         # TODO: Don't download the whole remote file
         # We should proxy the thumbnail from the remote server instead of
         # downloading the remote file and generating our own thumbnails.
         media_info = await self.media_repo.get_remote_media_info(
-            server_name, media_id, max_timeout_ms, ip_address, use_federation
+            server_name,
+            media_id,
+            max_timeout_ms,
+            ip_address,
+            use_federation,
+            allow_authenticated,
         )
         if not media_info:
             return
 
+        # if the media the thumbnail is generated from is authenticated, don't serve the
+        # thumbnail over an unauthenticated endpoint
+        if self.hs.config.media.enable_authenticated_media and not allow_authenticated:
+            if media_info.authenticated:
+                raise NotFoundError()
+
         thumbnail_infos = await self.store.get_remote_media_thumbnails(
             server_name, media_id
         )
@@ -485,8 +523,8 @@ class ThumbnailProvider:
         file_id: str,
         url_cache: bool,
         for_federation: bool,
-        server_name: Optional[str] = None,
         media_info: Optional[LocalMedia] = None,
+        server_name: Optional[str] = None,
     ) -> None:
         """
         Respond to a request with an appropriate thumbnail from the previously generated thumbnails.
diff --git a/synapse/rest/media/download_resource.py b/synapse/rest/media/download_resource.py
index c32c626905..3c3f703667 100644
--- a/synapse/rest/media/download_resource.py
+++ b/synapse/rest/media/download_resource.py
@@ -84,7 +84,7 @@ class DownloadResource(RestServlet):
 
         if self._is_mine_server_name(server_name):
             await self.media_repo.get_local_media(
-                request, media_id, file_name, max_timeout_ms
+                request, media_id, file_name, max_timeout_ms, allow_authenticated=False
             )
         else:
             allow_remote = parse_boolean(request, "allow_remote", default=True)
@@ -106,4 +106,5 @@ class DownloadResource(RestServlet):
                 max_timeout_ms,
                 ip_address,
                 False,
+                allow_authenticated=False,
             )
diff --git a/synapse/rest/media/thumbnail_resource.py b/synapse/rest/media/thumbnail_resource.py
index 70354aa439..536fea4c32 100644
--- a/synapse/rest/media/thumbnail_resource.py
+++ b/synapse/rest/media/thumbnail_resource.py
@@ -96,6 +96,7 @@ class ThumbnailResource(RestServlet):
                     m_type,
                     max_timeout_ms,
                     False,
+                    allow_authenticated=False,
                 )
             else:
                 await self.thumbnail_provider.respond_local_thumbnail(
@@ -107,6 +108,7 @@ class ThumbnailResource(RestServlet):
                     m_type,
                     max_timeout_ms,
                     False,
+                    allow_authenticated=False,
                 )
             self.media_repo.mark_recently_accessed(None, media_id)
         else:
@@ -134,6 +136,7 @@ class ThumbnailResource(RestServlet):
                 m_type,
                 max_timeout_ms,
                 ip_address,
-                False,
+                use_federation=False,
+                allow_authenticated=False,
             )
             self.media_repo.mark_recently_accessed(server_name, media_id)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index ebe2723095..066f3d08ae 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -121,7 +121,7 @@ class SQLBaseStore(metaclass=ABCMeta):
             )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (user_id,))
             self._attempt_to_invalidate_cache(
-                "get_rooms_for_local_user_where_membership_is", (user_id,)
+                "_get_rooms_for_local_user_where_membership_is_inner", (user_id,)
             )
 
         # Purge other caches based on room state.
@@ -149,6 +149,9 @@ class SQLBaseStore(metaclass=ABCMeta):
         self._attempt_to_invalidate_cache("does_pair_of_users_share_a_room", None)
         self._attempt_to_invalidate_cache("get_user_in_room_with_profile", None)
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache(
+            "_get_rooms_for_local_user_where_membership_is_inner", None
+        )
         self._attempt_to_invalidate_cache("get_room_summary", (room_id,))
 
     def _attempt_to_invalidate_cache(
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2fcd927089..8c2c0c5ab0 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -334,6 +334,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
                 "get_invited_rooms_for_local_user", (state_key,)
             )
             self._attempt_to_invalidate_cache("get_rooms_for_user", (state_key,))
+            self._attempt_to_invalidate_cache(
+                "_get_rooms_for_local_user_where_membership_is_inner", (state_key,)
+            )
 
             self._attempt_to_invalidate_cache(
                 "did_forget",
@@ -396,6 +399,9 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         self._attempt_to_invalidate_cache("get_thread_id_for_receipts", None)
         self._attempt_to_invalidate_cache("get_invited_rooms_for_local_user", None)
         self._attempt_to_invalidate_cache("get_rooms_for_user", None)
+        self._attempt_to_invalidate_cache(
+            "_get_rooms_for_local_user_where_membership_is_inner", None
+        )
         self._attempt_to_invalidate_cache("did_forget", None)
         self._attempt_to_invalidate_cache("get_forgotten_rooms_for_user", None)
         self._attempt_to_invalidate_cache("get_references_for_event", None)
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index 6128332af8..7617fd3ad4 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -64,6 +64,7 @@ class LocalMedia:
     quarantined_by: Optional[str]
     safe_from_quarantine: bool
     user_id: Optional[str]
+    authenticated: Optional[bool]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -77,6 +78,7 @@ class RemoteMedia:
     created_ts: int
     last_access_ts: int
     quarantined_by: Optional[str]
+    authenticated: Optional[bool]
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -218,6 +220,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "last_access_ts",
                 "safe_from_quarantine",
                 "user_id",
+                "authenticated",
             ),
             allow_none=True,
             desc="get_local_media",
@@ -235,6 +238,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             last_access_ts=row[6],
             safe_from_quarantine=row[7],
             user_id=row[8],
+            authenticated=row[9],
         )
 
     async def get_local_media_by_user_paginate(
@@ -290,7 +294,8 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     last_access_ts,
                     quarantined_by,
                     safe_from_quarantine,
-                    user_id
+                    user_id,
+                    authenticated
                 FROM local_media_repository
                 WHERE user_id = ?
                 ORDER BY {order_by_column} {order}, media_id ASC
@@ -314,6 +319,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                     quarantined_by=row[7],
                     safe_from_quarantine=bool(row[8]),
                     user_id=row[9],
+                    authenticated=row[10],
                 )
                 for row in txn
             ]
@@ -417,12 +423,18 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         time_now_ms: int,
         user_id: UserID,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "local_media_repository",
             {
                 "media_id": media_id,
                 "created_ts": time_now_ms,
                 "user_id": user_id.to_string(),
+                "authenticated": authenticated,
             },
             desc="store_local_media_id",
         )
@@ -438,6 +450,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         user_id: UserID,
         url_cache: Optional[str] = None,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "local_media_repository",
             {
@@ -448,6 +465,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "media_length": media_length,
                 "user_id": user_id.to_string(),
                 "url_cache": url_cache,
+                "authenticated": authenticated,
             },
             desc="store_local_media",
         )
@@ -638,6 +656,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "filesystem_id",
                 "last_access_ts",
                 "quarantined_by",
+                "authenticated",
             ),
             allow_none=True,
             desc="get_cached_remote_media",
@@ -654,6 +673,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             filesystem_id=row[4],
             last_access_ts=row[5],
             quarantined_by=row[6],
+            authenticated=row[7],
         )
 
     async def store_cached_remote_media(
@@ -666,6 +686,11 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
         upload_name: Optional[str],
         filesystem_id: str,
     ) -> None:
+        if self.hs.config.media.enable_authenticated_media:
+            authenticated = True
+        else:
+            authenticated = False
+
         await self.db_pool.simple_insert(
             "remote_media_cache",
             {
@@ -677,6 +702,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
                 "upload_name": upload_name,
                 "filesystem_id": filesystem_id,
                 "last_access_ts": time_now_ms,
+                "authenticated": authenticated,
             },
             desc="store_cached_remote_media",
         )
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 8bd35af56e..2e0e6afac5 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -279,8 +279,19 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
     @cached(max_entries=100000)  # type: ignore[synapse-@cached-mutable]
     async def get_room_summary(self, room_id: str) -> Mapping[str, MemberSummary]:
-        """Get the details of a room roughly suitable for use by the room
+        """
+        Get the details of a room roughly suitable for use by the room
         summary extension to /sync. Useful when lazy loading room members.
+
+        Returns the total count of members in the room by membership type, and a
+        truncated list of members (the heroes). This will be the first 6 members of the
+        room:
+        - We want 5 heroes plus 1, in case one of them is the
+        calling user.
+        - They are ordered by `stream_ordering`, which are joined or
+        invited. When no joined or invited members are available, this also includes
+        banned and left users.
+
         Args:
             room_id: The room ID to query
         Returns:
@@ -308,23 +319,36 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             for count, membership in txn:
                 res.setdefault(membership, MemberSummary([], count))
 
-            # we order by membership and then fairly arbitrarily by event_id so
-            # heroes are consistent
-            # Note, rejected events will have a null membership field, so
-            # we we manually filter them out.
+            # Order by membership (joins -> invites -> leave (former insiders) ->
+            # everything else (outsiders like bans/knocks), then by `stream_ordering` so
+            # the first members in the room show up first and to make the sort stable
+            # (consistent heroes).
+            #
+            # Note: rejected events will have a null membership field, so we we manually
+            # filter them out.
             sql = """
                 SELECT state_key, membership, event_id
                 FROM current_state_events
                 WHERE type = 'm.room.member' AND room_id = ?
                     AND membership IS NOT NULL
                 ORDER BY
-                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                    event_id ASC
+                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 WHEN ? THEN 3 ELSE 4 END ASC,
+                    event_stream_ordering ASC
                 LIMIT ?
             """
 
-            # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
-            txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    # Sort order
+                    Membership.JOIN,
+                    Membership.INVITE,
+                    Membership.LEAVE,
+                    # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
+                    6,
+                ),
+            )
             for user_id, membership, event_id in txn:
                 summary = res[membership]
                 # we will always have a summary for this membership type at this
@@ -398,7 +422,6 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
                 return invite
         return None
 
-    @cached(max_entries=1000, uncached_args=["excluded_rooms"], tree=True)
     async def get_rooms_for_local_user_where_membership_is(
         self,
         user_id: str,
@@ -422,9 +445,11 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
         if not membership_list:
             return []
 
-        rooms = await self.db_pool.runInteraction(
-            "get_rooms_for_local_user_where_membership_is",
-            self._get_rooms_for_local_user_where_membership_is_txn,
+        # Convert membership list to frozen set as a) it needs to be hashable,
+        # and b) we don't care about the order.
+        membership_list = frozenset(membership_list)
+
+        rooms = await self._get_rooms_for_local_user_where_membership_is_inner(
             user_id,
             membership_list,
         )
@@ -443,6 +468,24 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
         return [room for room in rooms if room.room_id not in rooms_to_exclude]
 
+    @cached(max_entries=1000, tree=True)
+    async def _get_rooms_for_local_user_where_membership_is_inner(
+        self,
+        user_id: str,
+        membership_list: Collection[str],
+    ) -> Sequence[RoomsForUser]:
+        if not membership_list:
+            return []
+
+        rooms = await self.db_pool.runInteraction(
+            "get_rooms_for_local_user_where_membership_is",
+            self._get_rooms_for_local_user_where_membership_is_txn,
+            user_id,
+            membership_list,
+        )
+
+        return rooms
+
     def _get_rooms_for_local_user_where_membership_is_txn(
         self,
         txn: LoggingTransaction,
@@ -1510,10 +1553,19 @@ def extract_heroes_from_room_summary(
 ) -> List[str]:
     """Determine the users that represent a room, from the perspective of the `me` user.
 
+    This function expects `MemberSummary.members` to already be sorted by
+    `stream_ordering` like the results from `get_room_summary(...)`.
+
     The rules which say which users we select are specified in the "Room Summary"
     section of
     https://spec.matrix.org/v1.4/client-server-api/#get_matrixclientv3sync
 
+
+    Args:
+        details: Mapping from membership type to member summary. We expect
+            `MemberSummary.members` to already be sorted by `stream_ordering`.
+        me: The user for whom we are determining the heroes for.
+
     Returns a list (possibly empty) of heroes' mxids.
     """
     empty_ms = MemberSummary([], 0)
@@ -1528,11 +1580,11 @@ def extract_heroes_from_room_summary(
         r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
     ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
 
-    # FIXME: order by stream ordering rather than as returned by SQL
+    # We expect `MemberSummary.members` to already be sorted by `stream_ordering`
     if joined_user_ids or invited_user_ids:
-        return sorted(joined_user_ids + invited_user_ids)[0:5]
+        return (joined_user_ids + invited_user_ids)[0:5]
     else:
-        return sorted(gone_user_ids)[0:5]
+        return gone_user_ids[0:5]
 
 
 @attr.s(slots=True, auto_attribs=True)
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index d3e4340d4c..7df811e451 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -88,6 +88,7 @@ from synapse.types import (
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.cancellation import cancellable
+from synapse.util.iterutils import batch_iter
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -1349,6 +1350,95 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             get_last_event_pos_in_room_before_stream_ordering_txn,
         )
 
+    async def bulk_get_last_event_pos_in_room_before_stream_ordering(
+        self,
+        room_ids: StrCollection,
+        end_token: RoomStreamToken,
+    ) -> Dict[str, int]:
+        """Bulk fetch the latest event position in the given rooms"""
+
+        min_token = end_token.stream
+        max_token = end_token.get_max_stream_pos()
+        results: Dict[str, int] = {}
+
+        missing_room_ids: Set[str] = set()
+        for room_id in room_ids:
+            stream_pos = self._events_stream_cache._entity_to_key.get(room_id)
+            if stream_pos and stream_pos < max_token:
+                results[room_id] = stream_pos
+            else:
+                missing_room_ids.add(room_id)
+
+        def bulk_get_last_event_pos_txn(
+            txn: LoggingTransaction, batch_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batch_room_ids
+            )
+            sql = f"""
+                SELECT room_id, (
+                    SELECT stream_ordering FROM events AS e
+                    WHERE e.room_id = r.room_id
+                        AND stream_ordering <= ?
+                        AND NOT outlier
+                    ORDER BY stream_ordering DESC
+                    LIMIT 1
+                )
+                FROM rooms AS r
+                WHERE {clause}
+            """
+            txn.execute(sql, [max_token] + args)
+            return {row[0]: row[1] for row in txn}
+
+        recheck_rooms: Set[str] = set()
+        for batched in batch_iter(missing_room_ids, 1000):
+            result = await self.db_pool.runInteraction(
+                "bulk_get_last_event_pos_in_room_before_stream_ordering",
+                bulk_get_last_event_pos_txn,
+                batched,
+            )
+
+            for room_id, stream in result.items():
+                if min_token < stream:
+                    recheck_rooms.add(room_id)
+                else:
+                    results[room_id] = stream
+
+        if not recheck_rooms:
+            return results
+
+        def bulk_get_last_event_pos_recheck_txn(
+            txn: LoggingTransaction, batch_room_ids: StrCollection
+        ) -> Dict[str, int]:
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "room_id", batch_room_ids
+            )
+            sql = f"""
+                SELECT room_id, instance_name, stream_ordering
+                WHERE ? < stream_ordering AND stream_ordering <= ?
+                    AND {clause}
+                ORDER BY stream_ordering ASC
+            """
+            txn.execute(sql, [min_token, max_token] + args)
+            results: Dict[str, int] = {}
+            for row in txn:
+                room_id = row[0]
+                event_pos = PersistedEventPosition(row[1], row[2])
+                if not event_pos.persisted_after(end_token):
+                    results[room_id] = event_pos.stream
+
+            return results
+
+        for batched in batch_iter(recheck_rooms, 1000):
+            recheck_result = await self.db_pool.runInteraction(
+                "bulk_get_last_event_pos_in_room_before_stream_ordering_recheck",
+                bulk_get_last_event_pos_recheck_txn,
+                batched,
+            )
+            results.update(recheck_result)
+
+        return results
+
     async def get_current_room_stream_token_for_room_id(
         self, room_id: str
     ) -> RoomStreamToken:
diff --git a/synapse/storage/schema/__init__.py b/synapse/storage/schema/__init__.py
index 0dc5d24249..581d00346b 100644
--- a/synapse/storage/schema/__init__.py
+++ b/synapse/storage/schema/__init__.py
@@ -19,7 +19,7 @@
 #
 #
 
-SCHEMA_VERSION = 85  # remember to update the list below when updating
+SCHEMA_VERSION = 86  # remember to update the list below when updating
 """Represents the expectations made by the codebase about the database schema
 
 This should be incremented whenever the codebase changes its requirements on the
@@ -139,6 +139,9 @@ Changes in SCHEMA_VERSION = 84
 
 Changes in SCHEMA_VERSION = 85
     - Add a column `suspended` to the `users` table
+
+Changes in SCHEMA_VERSION = 86
+    - Add a column `authenticated` to the tables `local_media_repository` and `remote_media_cache`
 """
 
 
diff --git a/synapse/storage/schema/main/delta/86/01_authenticate_media.sql b/synapse/storage/schema/main/delta/86/01_authenticate_media.sql
new file mode 100644
index 0000000000..c1ac01ae95
--- /dev/null
+++ b/synapse/storage/schema/main/delta/86/01_authenticate_media.sql
@@ -0,0 +1,15 @@
+--
+-- This file is licensed under the Affero General Public License (AGPL) version 3.
+--
+-- Copyright (C) 2024 New Vector, Ltd
+--
+-- This program is free software: you can redistribute it and/or modify
+-- it under the terms of the GNU Affero General Public License as
+-- published by the Free Software Foundation, either version 3 of the
+-- License, or (at your option) any later version.
+--
+-- See the GNU Affero General Public License for more details:
+-- <https://www.gnu.org/licenses/agpl-3.0.html>.
+
+ALTER TABLE remote_media_cache ADD COLUMN authenticated BOOLEAN DEFAULT FALSE NOT NULL;
+ALTER TABLE local_media_repository ADD COLUMN authenticated BOOLEAN DEFAULT FALSE NOT NULL;
diff --git a/tests/handlers/test_sync.py b/tests/handlers/test_sync.py
index 77aafa492e..fa55f76916 100644
--- a/tests/handlers/test_sync.py
+++ b/tests/handlers/test_sync.py
@@ -211,6 +211,7 @@ class SyncTestCase(tests.unittest.HomeserverTestCase):
 
         # Blow away caches (supported room versions can only change due to a restart).
         self.store.get_rooms_for_user.invalidate_all()
+        self.store._get_rooms_for_local_user_where_membership_is_inner.invalidate_all()
         self.store._get_event_cache.clear()
         self.store._event_ref.clear()
 
diff --git a/tests/rest/client/test_media.py b/tests/rest/client/test_media.py
index 466c5a0b70..30b6d31d0a 100644
--- a/tests/rest/client/test_media.py
+++ b/tests/rest/client/test_media.py
@@ -43,6 +43,7 @@ from twisted.python.failure import Failure
 from twisted.test.proto_helpers import AccumulatingProtocol, MemoryReactor
 from twisted.web.http_headers import Headers
 from twisted.web.iweb import UNKNOWN_LENGTH, IResponse
+from twisted.web.resource import Resource
 
 from synapse.api.errors import HttpResponseException
 from synapse.api.ratelimiting import Ratelimiter
@@ -2466,3 +2467,211 @@ class DownloadAndThumbnailTestCase(unittest.HomeserverTestCase):
                 server_name=None,
             )
         )
+
+
+configs = [
+    {"extra_config": {"dynamic_thumbnails": True}},
+    {"extra_config": {"dynamic_thumbnails": False}},
+]
+
+
+@parameterized_class(configs)
+class AuthenticatedMediaTestCase(unittest.HomeserverTestCase):
+    extra_config: Dict[str, Any]
+    servlets = [
+        media.register_servlets,
+        login.register_servlets,
+        admin.register_servlets,
+    ]
+
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        config = self.default_config()
+
+        self.clock = clock
+        self.storage_path = self.mktemp()
+        self.media_store_path = self.mktemp()
+        os.mkdir(self.storage_path)
+        os.mkdir(self.media_store_path)
+        config["media_store_path"] = self.media_store_path
+        config["enable_authenticated_media"] = True
+
+        provider_config = {
+            "module": "synapse.media.storage_provider.FileStorageProviderBackend",
+            "store_local": True,
+            "store_synchronous": False,
+            "store_remote": True,
+            "config": {"directory": self.storage_path},
+        }
+
+        config["media_storage_providers"] = [provider_config]
+        config.update(self.extra_config)
+
+        return self.setup_test_homeserver(config=config)
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.repo = hs.get_media_repository()
+        self.client = hs.get_federation_http_client()
+        self.store = hs.get_datastores().main
+        self.user = self.register_user("user", "pass")
+        self.tok = self.login("user", "pass")
+
+    def create_resource_dict(self) -> Dict[str, Resource]:
+        resources = super().create_resource_dict()
+        resources["/_matrix/media"] = self.hs.get_media_repository_resource()
+        return resources
+
+    def test_authenticated_media(self) -> None:
+        # upload some local media with authentication on
+        channel = self.make_request(
+            "POST",
+            "_matrix/media/v3/upload?filename=test_png_upload",
+            SMALL_PNG,
+            self.tok,
+            shorthand=False,
+            content_type=b"image/png",
+            custom_headers=[("Content-Length", str(67))],
+        )
+        self.assertEqual(channel.code, 200)
+        res = channel.json_body.get("content_uri")
+        assert res is not None
+        uri = res.split("mxc://")[1]
+
+        # request media over authenticated endpoint, should be found
+        channel2 = self.make_request(
+            "GET",
+            f"_matrix/client/v1/media/download/{uri}",
+            access_token=self.tok,
+            shorthand=False,
+        )
+        self.assertEqual(channel2.code, 200)
+
+        # request same media over unauthenticated media, should raise 404 not found
+        channel3 = self.make_request(
+            "GET", f"_matrix/media/v3/download/{uri}", shorthand=False
+        )
+        self.assertEqual(channel3.code, 404)
+
+        # check thumbnails as well
+        params = "?width=32&height=32&method=crop"
+        channel4 = self.make_request(
+            "GET",
+            f"/_matrix/client/v1/media/thumbnail/{uri}{params}",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        self.assertEqual(channel4.code, 200)
+
+        params = "?width=32&height=32&method=crop"
+        channel5 = self.make_request(
+            "GET",
+            f"/_matrix/media/r0/thumbnail/{uri}{params}",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        self.assertEqual(channel5.code, 404)
+
+        # Inject a piece of remote media.
+        file_id = "abcdefg12345"
+        file_info = FileInfo(server_name="lonelyIsland", file_id=file_id)
+
+        media_storage = self.hs.get_media_repository().media_storage
+
+        ctx = media_storage.store_into_file(file_info)
+        (f, fname) = self.get_success(ctx.__aenter__())
+        f.write(SMALL_PNG)
+        self.get_success(ctx.__aexit__(None, None, None))
+
+        # we write the authenticated status when storing media, so this should pick up
+        # config and authenticate the media
+        self.get_success(
+            self.store.store_cached_remote_media(
+                origin="lonelyIsland",
+                media_id="52",
+                media_type="image/png",
+                media_length=1,
+                time_now_ms=self.clock.time_msec(),
+                upload_name="remote_test.png",
+                filesystem_id=file_id,
+            )
+        )
+
+        # ensure we have thumbnails for the non-dynamic code path
+        if self.extra_config == {"dynamic_thumbnails": False}:
+            self.get_success(
+                self.repo._generate_thumbnails(
+                    "lonelyIsland", "52", file_id, "image/png"
+                )
+            )
+
+        channel6 = self.make_request(
+            "GET",
+            "_matrix/client/v1/media/download/lonelyIsland/52",
+            access_token=self.tok,
+            shorthand=False,
+        )
+        self.assertEqual(channel6.code, 200)
+
+        channel7 = self.make_request(
+            "GET", f"_matrix/media/v3/download/{uri}", shorthand=False
+        )
+        self.assertEqual(channel7.code, 404)
+
+        params = "?width=32&height=32&method=crop"
+        channel8 = self.make_request(
+            "GET",
+            f"/_matrix/client/v1/media/thumbnail/lonelyIsland/52{params}",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        self.assertEqual(channel8.code, 200)
+
+        channel9 = self.make_request(
+            "GET",
+            f"/_matrix/media/r0/thumbnail/lonelyIsland/52{params}",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        self.assertEqual(channel9.code, 404)
+
+        # Inject a piece of local media that isn't authenticated
+        file_id = "abcdefg123456"
+        file_info = FileInfo(None, file_id=file_id)
+
+        ctx = media_storage.store_into_file(file_info)
+        (f, fname) = self.get_success(ctx.__aenter__())
+        f.write(SMALL_PNG)
+        self.get_success(ctx.__aexit__(None, None, None))
+
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                "local_media_repository",
+                {
+                    "media_id": "abcdefg123456",
+                    "media_type": "image/png",
+                    "created_ts": self.clock.time_msec(),
+                    "upload_name": "test_local",
+                    "media_length": 1,
+                    "user_id": "someone",
+                    "url_cache": None,
+                    "authenticated": False,
+                },
+                desc="store_local_media",
+            )
+        )
+
+        # check that unauthenticated media is still available over both endpoints
+        channel9 = self.make_request(
+            "GET",
+            "/_matrix/client/v1/media/download/test/abcdefg123456",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        self.assertEqual(channel9.code, 200)
+
+        channel10 = self.make_request(
+            "GET",
+            "/_matrix/media/r0/download/test/abcdefg123456",
+            shorthand=False,
+            access_token=self.tok,
+        )
+        self.assertEqual(channel10.code, 200)
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 0c66111e07..2a27571929 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -2167,18 +2167,15 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
         # Room2 doesn't have a name so we should see `heroes` populated
         self.assertIsNone(channel.json_body["rooms"][room_id1].get("name"))
-        # FIXME: Remove this basic assertion and uncomment the better assertion below
-        # after https://github.com/element-hq/synapse/pull/17435 merges
-        self.assertEqual(len(channel.json_body["rooms"][room_id1].get("heroes", [])), 5)
-        # self.assertCountEqual(
-        #     [
-        #         hero["user_id"]
-        #         for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
-        #     ],
-        #     # Heroes should be the first 5 users in the room (excluding the user
-        #     # themselves, we shouldn't see `user1`)
-        #     [user2_id, user3_id, user4_id, user5_id, user6_id],
-        # )
+        self.assertCountEqual(
+            [
+                hero["user_id"]
+                for hero in channel.json_body["rooms"][room_id1].get("heroes", [])
+            ],
+            # Heroes should be the first 5 users in the room (excluding the user
+            # themselves, we shouldn't see `user1`)
+            [user2_id, user3_id, user4_id, user5_id, user6_id],
+        )
         self.assertEqual(
             channel.json_body["rooms"][room_id1]["joined_count"],
             7,
diff --git a/tests/storage/test_roommember.py b/tests/storage/test_roommember.py
index dd475f54fa..e2f19e25e3 100644
--- a/tests/storage/test_roommember.py
+++ b/tests/storage/test_roommember.py
@@ -19,20 +19,28 @@
 # [This file includes modifications made by New Vector Limited]
 #
 #
+import logging
 from typing import List, Optional, Tuple, cast
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.constants import Membership
+from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.room_versions import RoomVersions
+from synapse.rest import admin
 from synapse.rest.admin import register_servlets_for_client_rest_resource
-from synapse.rest.client import login, room
+from synapse.rest.client import knock, login, room
 from synapse.server import HomeServer
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
+from synapse.storage.roommember import MemberSummary
 from synapse.types import UserID, create_requester
 from synapse.util import Clock
 
 from tests import unittest
 from tests.server import TestHomeServer
 from tests.test_utils import event_injection
+from tests.unittest import skip_unless
+
+logger = logging.getLogger(__name__)
 
 
 class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
@@ -240,6 +248,397 @@ class RoomMemberStoreTestCase(unittest.HomeserverTestCase):
         )
 
 
+class RoomSummaryTestCase(unittest.HomeserverTestCase):
+    """
+    Test `/sync` room summary related logic like `get_room_summary(...)` and
+    `extract_heroes_from_room_summary(...)`
+    """
+
+    servlets = [
+        admin.register_servlets,
+        knock.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        self.store = self.hs.get_datastores().main
+
+    def _assert_member_summary(
+        self,
+        actual_member_summary: MemberSummary,
+        expected_member_list: List[str],
+        *,
+        expected_member_count: Optional[int] = None,
+    ) -> None:
+        """
+        Assert that the `MemberSummary` object has the expected members.
+        """
+        self.assertListEqual(
+            [
+                user_id
+                for user_id, _membership_event_id in actual_member_summary.members
+            ],
+            expected_member_list,
+        )
+        self.assertEqual(
+            actual_member_summary.count,
+            (
+                expected_member_count
+                if expected_member_count is not None
+                else len(expected_member_list)
+            ),
+        )
+
+    def test_get_room_summary_membership(self) -> None:
+        """
+        Test that `get_room_summary(...)` gets every kind of membership when there
+        aren't that many members in the room.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+
+        # Setup a room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 is invited by user1
+        self.helper.invite(room_id, targ=user3_id, tok=user1_tok)
+
+        # User4 leaves
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.leave(room_id, user4_id, tok=user4_tok)
+
+        # User5 joins
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+        empty_ms = MemberSummary([], 0)
+
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.JOIN, empty_ms),
+            [user1_id, user5_id],
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.INVITE, empty_ms), [user3_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.LEAVE, empty_ms), [user4_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.BAN, empty_ms), [user2_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.KNOCK, empty_ms),
+            [
+                # No one knocked
+            ],
+        )
+
+    def test_get_room_summary_membership_order(self) -> None:
+        """
+        Test that `get_room_summary(...)` stacks our limit of 6 in this order: joins ->
+        invites -> leave -> everything else (bans/knocks)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+        user6_id = self.register_user("user6", "pass")
+        user6_tok = self.login(user6_id, "pass")
+        user7_id = self.register_user("user7", "pass")
+        user7_tok = self.login(user7_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # We expect the order to be joins -> invites -> leave -> bans so setup the users
+        # *NOT* in that same order to make sure we're actually sorting them.
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 is invited by user1
+        self.helper.invite(room_id, targ=user3_id, tok=user1_tok)
+
+        # User4 leaves
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.leave(room_id, user4_id, tok=user4_tok)
+
+        # User5, User6, User7 joins
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+        self.helper.join(room_id, user6_id, tok=user6_tok)
+        self.helper.join(room_id, user7_id, tok=user7_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+        empty_ms = MemberSummary([], 0)
+
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.JOIN, empty_ms),
+            [user1_id, user5_id, user6_id, user7_id],
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.INVITE, empty_ms), [user3_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.LEAVE, empty_ms), [user4_id]
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.BAN, empty_ms),
+            [
+                # The banned user is not in the summary because the summary can only fit
+                # 6 members and prefers everything else before bans
+                #
+                # user2_id
+            ],
+            # But we still see the count of banned users
+            expected_member_count=1,
+        )
+        self._assert_member_summary(
+            room_membership_summary.get(Membership.KNOCK, empty_ms),
+            [
+                # No one knocked
+            ],
+        )
+
+    def test_extract_heroes_from_room_summary_excludes_self(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` does not include the user
+        itself.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 joins
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        # We first ask from the perspective of a random fake user
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # Make sure user1 is in the room (ensure our test setup is correct)
+        self.assertListEqual(hero_user_ids, [user1_id, user2_id])
+
+        # Now, we ask for the room summary from the perspective of user1
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me=user1_id
+        )
+
+        # User1 should not be included in the list of heroes because they are the one
+        # asking
+        self.assertListEqual(hero_user_ids, [user2_id])
+
+    def test_extract_heroes_from_room_summary_first_five_joins(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` returns the first 5 joins.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+        user6_id = self.register_user("user6", "pass")
+        user6_tok = self.login(user6_id, "pass")
+        user7_id = self.register_user("user7", "pass")
+        user7_tok = self.login(user7_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 -> User7 joins
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.join(room_id, user3_id, tok=user3_tok)
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+        self.helper.join(room_id, user6_id, tok=user6_tok)
+        self.helper.join(room_id, user7_id, tok=user7_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakuser"
+        )
+
+        # First 5 users to join the room
+        self.assertListEqual(
+            hero_user_ids, [user1_id, user2_id, user3_id, user4_id, user5_id]
+        )
+
+    def test_extract_heroes_from_room_summary_membership_order(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` prefers joins/invites over
+        everything else.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        user4_tok = self.login(user4_id, "pass")
+        user5_id = self.register_user("user5", "pass")
+        user5_tok = self.login(user5_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # We expect the order to be joins -> invites -> leave -> bans so setup the users
+        # *NOT* in that same order to make sure we're actually sorting them.
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 is invited by user1
+        self.helper.invite(room_id, targ=user3_id, tok=user1_tok)
+
+        # User4 leaves
+        self.helper.join(room_id, user4_id, tok=user4_tok)
+        self.helper.leave(room_id, user4_id, tok=user4_tok)
+
+        # User5 joins
+        self.helper.join(room_id, user5_id, tok=user5_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # Prefer joins -> invites, over everything else
+        self.assertListEqual(
+            hero_user_ids,
+            [
+                # The joins
+                user1_id,
+                user5_id,
+                # The invites
+                user3_id,
+            ],
+        )
+
+    @skip_unless(
+        False,
+        "Test is not possible because when everyone leaves the room, "
+        + "the server is `no_longer_in_room` and we don't have any `current_state_events` to query",
+    )
+    def test_extract_heroes_from_room_summary_fallback_leave_ban(self) -> None:
+        """
+        Test that `extract_heroes_from_room_summary(...)` falls back to leave/ban if
+        there aren't any joins/invites.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        user3_tok = self.login(user3_id, "pass")
+
+        # Setup the room (user1 is the creator and is joined to the room)
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        # User2 is banned
+        self.helper.join(room_id, user2_id, tok=user2_tok)
+        self.helper.ban(room_id, src=user1_id, targ=user2_id, tok=user1_tok)
+
+        # User3 leaves
+        self.helper.join(room_id, user3_id, tok=user3_tok)
+        self.helper.leave(room_id, user3_id, tok=user3_tok)
+
+        # User1 leaves (we're doing this last because they're the room creator)
+        self.helper.leave(room_id, user1_id, tok=user1_tok)
+
+        room_membership_summary = self.get_success(self.store.get_room_summary(room_id))
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # Fallback to people who left -> banned
+        self.assertListEqual(
+            hero_user_ids,
+            [user3_id, user1_id, user3_id],
+        )
+
+    def test_extract_heroes_from_room_summary_excludes_knocks(self) -> None:
+        """
+        People who knock on the room have (potentially) never been in the room before
+        and are total outsiders. Plus the spec doesn't mention them at all for heroes.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup the knock room (user1 is the creator and is joined to the room)
+        knock_room_id = self.helper.create_room_as(
+            user1_id, tok=user1_tok, room_version=RoomVersions.V7.identifier
+        )
+        self.helper.send_state(
+            knock_room_id,
+            EventTypes.JoinRules,
+            {"join_rule": JoinRules.KNOCK},
+            tok=user1_tok,
+        )
+
+        # User2 knocks on the room
+        knock_channel = self.make_request(
+            "POST",
+            "/_matrix/client/r0/knock/%s" % (knock_room_id,),
+            b"{}",
+            user2_tok,
+        )
+        self.assertEqual(knock_channel.code, 200, knock_channel.result)
+
+        room_membership_summary = self.get_success(
+            self.store.get_room_summary(knock_room_id)
+        )
+
+        hero_user_ids = extract_heroes_from_room_summary(
+            room_membership_summary, me="@fakeuser"
+        )
+
+        # user1 is the creator and is joined to the room (should show up as a hero)
+        # user2 is knocking on the room (should not show up as a hero)
+        self.assertListEqual(
+            hero_user_ids,
+            [user1_id],
+        )
+
+
 class CurrentStateMembershipUpdateTestCase(unittest.HomeserverTestCase):
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
         self.store = hs.get_datastores().main