summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorErik Johnston <erik@matrix.org>2024-07-18 12:51:37 +0100
committerErik Johnston <erik@matrix.org>2024-07-18 12:51:37 +0100
commit84d14b4aa82ba73907d4db746be230a58bf65bfc (patch)
treeb463749fda5c2fd8ba7d67c1e30d94df95afa542 /synapse
parentMerge remote-tracking branch 'origin/release-v1.111' into matrix-org-hotfixes (diff)
parentAdd `m.room.create` to default bump event types (#17453) (diff)
downloadsynapse-84d14b4aa82ba73907d4db746be230a58bf65bfc.tar.xz
Merge remote-tracking branch 'origin/develop' into matrix-org-hotfixes
Diffstat (limited to 'synapse')
-rw-r--r--synapse/app/homeserver.py2
-rw-r--r--synapse/config/repository.py2
-rw-r--r--synapse/config/server.py6
-rw-r--r--synapse/handlers/sliding_sync.py754
-rw-r--r--synapse/http/matrixfederationclient.py126
-rw-r--r--synapse/rest/client/sync.py32
-rw-r--r--synapse/storage/databases/main/roommember.py57
-rw-r--r--synapse/types/__init__.py65
-rw-r--r--synapse/types/handlers/__init__.py18
-rw-r--r--synapse/types/rest/client/__init__.py4
10 files changed, 756 insertions, 310 deletions
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 2b111847b7..e114ab7ec4 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -217,7 +217,7 @@ class SynapseHomeServer(HomeServer):
             )
 
         if name in ["media", "federation", "client"]:
-            if self.config.server.enable_media_repo:
+            if self.config.media.can_load_media_repo:
                 media_repo = self.get_media_repository_resource()
                 resources.update(
                     {
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 1645470499..dc0e93ffa1 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -126,7 +126,7 @@ class ContentRepositoryConfig(Config):
         # Only enable the media repo if either the media repo is enabled or the
         # current worker app is the media repo.
         if (
-            self.root.server.enable_media_repo is False
+            config.get("enable_media_repo", True) is False
             and config.get("worker_app") != "synapse.app.media_repository"
         ):
             self.can_load_media_repo = False
diff --git a/synapse/config/server.py b/synapse/config/server.py
index a2b2305776..8bb97df175 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -395,12 +395,6 @@ class ServerConfig(Config):
                 self.presence_router_config,
             ) = load_module(presence_router_config, ("presence", "presence_router"))
 
-        # whether to enable the media repository endpoints. This should be set
-        # to false if the media repository is running as a separate endpoint;
-        # doing so ensures that we will not run cache cleanup jobs on the
-        # master, potentially causing inconsistency.
-        self.enable_media_repo = config.get("enable_media_repo", True)
-
         # Whether to require authentication to retrieve profile data (avatars,
         # display names) of other users through the client API.
         self.require_auth_for_profile_requests = config.get(
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 8e6c2fb860..a23a6b9dd9 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -19,7 +19,7 @@
 #
 import logging
 from itertools import chain
-from typing import TYPE_CHECKING, Any, Dict, Final, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Any, Dict, Final, List, Mapping, Optional, Set, Tuple
 
 import attr
 from immutabledict import immutabledict
@@ -28,7 +28,9 @@ from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membe
 from synapse.events import EventBase
 from synapse.events.utils import strip_event
 from synapse.handlers.relations import BundledAggregations
+from synapse.storage.databases.main.roommember import extract_heroes_from_room_summary
 from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.storage.roommember import MemberSummary
 from synapse.types import (
     JsonDict,
     PersistedEventPosition,
@@ -51,6 +53,7 @@ logger = logging.getLogger(__name__)
 
 # The event types that clients should consider as new activity.
 DEFAULT_BUMP_EVENT_TYPES = {
+    EventTypes.Create,
     EventTypes.Message,
     EventTypes.Encrypted,
     EventTypes.Sticker,
@@ -60,32 +63,79 @@ DEFAULT_BUMP_EVENT_TYPES = {
 }
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+    """
+    Attributes:
+        room_id: The room ID of the membership event
+        event_id: The event ID of the membership event
+        event_pos: The stream position of the membership event
+        membership: The membership state of the user in the room
+        sender: The person who sent the membership event
+        newly_joined: Whether the user newly joined the room during the given token
+            range and is still joined to the room at the end of this range.
+        newly_left: Whether the user newly left (or kicked) the room during the given
+            token range and is still "leave" at the end of this range.
+        is_dm: Whether this user considers this room as a direct-message (DM) room
+    """
+
+    room_id: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    event_id: Optional[str]
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set because `current_state_delta_stream` will note the position that the reset
+    # happened.
+    event_pos: PersistedEventPosition
+    # Even during a state reset which removes the user from the room, we expect this to
+    # be set to `LEAVE` because we can make that assumption based on the situaton (see
+    # `get_current_state_delta_membership_changes_for_user(...)`)
+    membership: str
+    # Optional because state resets can affect room membership without a corresponding event.
+    sender: Optional[str]
+    newly_joined: bool
+    newly_left: bool
+    is_dm: bool
+
+    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+        return attr.evolve(self, **kwds)
+
+
 def filter_membership_for_sync(
-    *, membership: str, user_id: str, sender: Optional[str]
+    *, user_id: str, room_membership_for_user: _RoomMembershipForUser
 ) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
     otherwise False.
 
     Attributes:
-        membership: The membership state of the user in the room.
         user_id: The user ID that the membership applies to
-        sender: The person who sent the membership event
+        room_membership_for_user: Membership information for the user in the room
     """
 
-    # Everything except `Membership.LEAVE` because we want everything that's *still*
-    # relevant to the user. There are few more things to include in the sync response
-    # (newly_left) but those are handled separately.
+    membership = room_membership_for_user.membership
+    sender = room_membership_for_user.sender
+    newly_left = room_membership_for_user.newly_left
+
+    # We want to allow everything except rooms the user has left unless `newly_left`
+    # because we want everything that's *still* relevant to the user. We include
+    # `newly_left` rooms because the last event that the user should see is their own
+    # leave event.
     #
-    # This logic includes kicks (leave events where the sender is not the same user) and
-    # can be read as "anything that isn't a leave or a leave with a different sender".
+    # A leave != kick. This logic includes kicks (leave events where the sender is not
+    # the same user).
     #
-    # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
-    # happened that removed the user from the room, or the user was the last person
-    # locally to leave the room which caused the server to leave the room. In both
-    # cases, we can just remove the rooms since they are no longer relevant to the user.
-    # They could still be added back later if they are `newly_left`.
-    return membership != Membership.LEAVE or sender not in (user_id, None)
+    # When `sender=None`, it means that a state reset happened that removed the user
+    # from the room without a corresponding leave event. We can just remove the rooms
+    # since they are no longer relevant to the user but will still appear if they are
+    # `newly_left`.
+    return (
+        # Anything except leave events
+        membership != Membership.LEAVE
+        # Unless...
+        or newly_left
+        # Allow kicks
+        or (membership == Membership.LEAVE and sender not in (user_id, None))
+    )
 
 
 # We can't freeze this class because we want to update it in place with the
@@ -279,29 +329,6 @@ class StateValues:
     LAZY: Final = "$LAZY"
 
 
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _RoomMembershipForUser:
-    """
-    Attributes:
-        event_id: The event ID of the membership event
-        event_pos: The stream position of the membership event
-        membership: The membership state of the user in the room
-        sender: The person who sent the membership event
-        newly_joined: Whether the user newly joined the room during the given token
-            range
-    """
-
-    room_id: str
-    event_id: Optional[str]
-    event_pos: PersistedEventPosition
-    membership: str
-    sender: Optional[str]
-    newly_joined: bool
-
-    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
-        return attr.evolve(self, **kwds)
-
-
 class SlidingSyncHandler:
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
@@ -420,18 +447,31 @@ class SlidingSyncHandler:
             # See https://github.com/matrix-org/matrix-doc/issues/1144
             raise NotImplementedError()
 
+        # Get all of the room IDs that the user should be able to see in the sync
+        # response
+        has_lists = sync_config.lists is not None and len(sync_config.lists) > 0
+        has_room_subscriptions = (
+            sync_config.room_subscriptions is not None
+            and len(sync_config.room_subscriptions) > 0
+        )
+        if has_lists or has_room_subscriptions:
+            room_membership_for_user_map = (
+                await self.get_room_membership_for_user_at_to_token(
+                    user=sync_config.user,
+                    to_token=to_token,
+                    from_token=from_token,
+                )
+            )
+
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
         # Keep track of the rooms that we're going to display and need to fetch more
         # info about
         relevant_room_map: Dict[str, RoomSyncConfig] = {}
-        if sync_config.lists:
-            # Get all of the room IDs that the user should be able to see in the sync
-            # response
-            sync_room_map = await self.get_sync_room_ids_for_user(
-                sync_config.user,
-                from_token=from_token,
-                to_token=to_token,
+        if has_lists and sync_config.lists is not None:
+            sync_room_map = await self.filter_rooms_relevant_for_sync(
+                user=sync_config.user,
+                room_membership_for_user_map=room_membership_for_user_map,
             )
 
             for list_key, list_config in sync_config.lists.items():
@@ -520,7 +560,35 @@ class SlidingSyncHandler:
                     ops=ops,
                 )
 
-        # TODO: if (sync_config.room_subscriptions):
+        # Handle room subscriptions
+        if has_room_subscriptions and sync_config.room_subscriptions is not None:
+            for room_id, room_subscription in sync_config.room_subscriptions.items():
+                room_membership_for_user_at_to_token = (
+                    await self.check_room_subscription_allowed_for_user(
+                        room_id=room_id,
+                        room_membership_for_user_map=room_membership_for_user_map,
+                        to_token=to_token,
+                    )
+                )
+
+                # Skip this room if the user isn't allowed to see it
+                if not room_membership_for_user_at_to_token:
+                    continue
+
+                room_membership_for_user_map[room_id] = (
+                    room_membership_for_user_at_to_token
+                )
+
+                # Take the superset of the `RoomSyncConfig` for each room.
+                #
+                # Update our `relevant_room_map` with the room we're going to display
+                # and need to fetch more info about.
+                room_sync_config = RoomSyncConfig.from_room_config(room_subscription)
+                existing_room_sync_config = relevant_room_map.get(room_id)
+                if existing_room_sync_config is not None:
+                    existing_room_sync_config.combine_room_sync_config(room_sync_config)
+                else:
+                    relevant_room_map[room_id] = room_sync_config
 
         # Fetch room data
         rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
@@ -529,7 +597,9 @@ class SlidingSyncHandler:
                 user=sync_config.user,
                 room_id=room_id,
                 room_sync_config=room_sync_config,
-                room_membership_for_user_at_to_token=sync_room_map[room_id],
+                room_membership_for_user_at_to_token=room_membership_for_user_map[
+                    room_id
+                ],
                 from_token=from_token,
                 to_token=to_token,
             )
@@ -547,28 +617,23 @@ class SlidingSyncHandler:
             extensions=extensions,
         )
 
-    async def get_sync_room_ids_for_user(
+    async def get_room_membership_for_user_at_to_token(
         self,
         user: UserID,
         to_token: StreamToken,
-        from_token: Optional[StreamToken] = None,
+        from_token: Optional[StreamToken],
     ) -> Dict[str, _RoomMembershipForUser]:
         """
-        Fetch room IDs that should be listed for this user in the sync response (the
-        full room list that will be filtered, sorted, and sliced).
+        Fetch room IDs that the user has had membership in (the full room list including
+        long-lost left rooms that will be filtered, sorted, and sliced).
 
-        We're looking for rooms where the user has the following state in the token
-        range (> `from_token` and <= `to_token`):
+        We're looking for rooms where the user has had any sort of membership in the
+        token range (> `from_token` and <= `to_token`)
 
-        - `invite`, `join`, `knock`, `ban` membership events
-        - Kicks (`leave` membership events where `sender` is different from the
-          `user_id`/`state_key`)
-        - `newly_left` (rooms that were left during the given token range)
-        - In order for bans/kicks to not show up in sync, you need to `/forget` those
-          rooms. This doesn't modify the event itself though and only adds the
-          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
-          to tell when a room was forgotten at the moment so we can't factor it into the
-          from/to range.
+        In order for bans/kicks to not show up, you need to `/forget` those rooms. This
+        doesn't modify the event itself though and only adds the `forgotten` flag to the
+        `room_memberships` table in Synapse. There isn't a way to tell when a room was
+        forgotten at the moment so we can't factor it into the token range.
 
         Args:
             user: User to fetch rooms for
@@ -576,8 +641,8 @@ class SlidingSyncHandler:
             from_token: The point in the stream to sync from.
 
         Returns:
-            A dictionary of room IDs that should be listed in the sync response along
-            with membership information in that room at the time of `to_token`.
+            A dictionary of room IDs that the user has had membership in along with
+            membership information in that room at the time of `to_token`.
         """
         user_id = user.to_string()
 
@@ -588,9 +653,6 @@ class SlidingSyncHandler:
             # We want to fetch any kind of membership (joined and left rooms) in order
             # to get the `event_pos` of the latest room membership event for the
             # user.
-            #
-            # We will filter out the rooms that don't belong below (see
-            # `filter_membership_for_sync`)
             membership_list=Membership.LIST,
             excluded_rooms=self.rooms_to_exclude_globally,
         )
@@ -610,7 +672,10 @@ class SlidingSyncHandler:
                 event_pos=room_for_user.event_pos,
                 membership=room_for_user.membership,
                 sender=room_for_user.sender,
+                # We will update these fields below to be accurate
                 newly_joined=False,
+                newly_left=False,
+                is_dm=False,
             )
             for room_for_user in room_for_user_list
         }
@@ -635,10 +700,17 @@ class SlidingSyncHandler:
                 instance_to_max_stream_ordering_map[instance_name] = stream_ordering
 
         # Then assemble the `RoomStreamToken`
+        min_stream_pos = min(instance_to_max_stream_ordering_map.values())
         membership_snapshot_token = RoomStreamToken(
             # Minimum position in the `instance_map`
-            stream=min(instance_to_max_stream_ordering_map.values()),
-            instance_map=immutabledict(instance_to_max_stream_ordering_map),
+            stream=min_stream_pos,
+            instance_map=immutabledict(
+                {
+                    instance_name: stream_pos
+                    for instance_name, stream_pos in instance_to_max_stream_ordering_map.items()
+                    if stream_pos > min_stream_pos
+                }
+            ),
         )
 
         # Since we fetched the users room list at some point in time after the from/to
@@ -648,10 +720,9 @@ class SlidingSyncHandler:
         # - 1a) Remove rooms that the user joined after the `to_token`
         # - 1b) Add back rooms that the user left after the `to_token`
         # - 1c) Update room membership events to the point in time of the `to_token`
-        # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
-        # - 3) Figure out which rooms are `newly_joined`
-
-        # 1) -----------------------------------------------------
+        # - 2) Figure out which rooms are `newly_left` rooms (> `from_token` and <= `to_token`)
+        # - 3) Figure out which rooms are `newly_joined` (> `from_token` and <= `to_token`)
+        # - 4) Figure out which rooms are DM's
 
         # 1) Fetch membership changes that fall in the range from `to_token` up to
         # `membership_snapshot_token`
@@ -711,7 +782,10 @@ class SlidingSyncHandler:
                         event_pos=first_membership_change_after_to_token.prev_event_pos,
                         membership=first_membership_change_after_to_token.prev_membership,
                         sender=first_membership_change_after_to_token.prev_sender,
+                        # We will update these fields below to be accurate
                         newly_joined=False,
+                        newly_left=False,
+                        is_dm=False,
                     )
                 else:
                     # If we can't find the previous membership event, we shouldn't
@@ -719,22 +793,6 @@ class SlidingSyncHandler:
                     # exact membership state and shouldn't rely on the current snapshot.
                     sync_room_id_set.pop(room_id, None)
 
-        # Filter the rooms that that we have updated room membership events to the point
-        # in time of the `to_token` (from the "1)" fixups)
-        filtered_sync_room_id_set = {
-            room_id: room_membership_for_user
-            for room_id, room_membership_for_user in sync_room_id_set.items()
-            if filter_membership_for_sync(
-                membership=room_membership_for_user.membership,
-                user_id=user_id,
-                sender=room_membership_for_user.sender,
-            )
-        }
-
-        # 2) -----------------------------------------------------
-        # We fix-up newly_left rooms after the first fixup because it may have removed
-        # some left rooms that we can figure out are newly_left in the following code
-
         # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
         current_state_delta_membership_changes_in_from_to_range = []
         if from_token:
@@ -796,18 +854,40 @@ class SlidingSyncHandler:
             if last_membership_change_in_from_to_range.membership == Membership.JOIN:
                 possibly_newly_joined_room_ids.add(room_id)
 
-            # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
-            # include newly_left rooms because the last event that the user should see
-            # is their own leave event
+            # 2) Figure out newly_left rooms (> `from_token` and <= `to_token`).
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
-                    room_id=room_id,
-                    event_id=last_membership_change_in_from_to_range.event_id,
-                    event_pos=last_membership_change_in_from_to_range.event_pos,
-                    membership=last_membership_change_in_from_to_range.membership,
-                    sender=last_membership_change_in_from_to_range.sender,
-                    newly_joined=False,
-                )
+                # 2) Mark this room as `newly_left`
+
+                # If we're seeing a membership change here, we should expect to already
+                # have it in our snapshot but if a state reset happens, it wouldn't have
+                # shown up in our snapshot but appear as a change here.
+                existing_sync_entry = sync_room_id_set.get(room_id)
+                if existing_sync_entry is not None:
+                    # Normal expected case
+                    sync_room_id_set[room_id] = existing_sync_entry.copy_and_replace(
+                        newly_left=True
+                    )
+                else:
+                    # State reset!
+                    logger.warn(
+                        "State reset detected for room_id %s with %s who is no longer in the room",
+                        room_id,
+                        user_id,
+                    )
+                    # Even though a state reset happened which removed the person from
+                    # the room, we still add it the list so the user knows they left the
+                    # room. Downstream code can check for a state reset by looking for
+                    # `event_id=None and membership is not None`.
+                    sync_room_id_set[room_id] = _RoomMembershipForUser(
+                        room_id=room_id,
+                        event_id=last_membership_change_in_from_to_range.event_id,
+                        event_pos=last_membership_change_in_from_to_range.event_pos,
+                        membership=last_membership_change_in_from_to_range.membership,
+                        sender=last_membership_change_in_from_to_range.sender,
+                        newly_joined=False,
+                        newly_left=True,
+                        is_dm=False,
+                    )
 
         # 3) Figure out `newly_joined`
         for room_id in possibly_newly_joined_room_ids:
@@ -818,9 +898,9 @@ class SlidingSyncHandler:
             # also some non-join in the range, we know they `newly_joined`.
             if has_non_join_in_from_to_range:
                 # We found a `newly_joined` room (we left and joined within the token range)
-                filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
-                    room_id
-                ].copy_and_replace(newly_joined=True)
+                sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                    newly_joined=True
+                )
             else:
                 prev_event_id = first_membership_change_by_room_id_in_from_to_range[
                     room_id
@@ -832,7 +912,7 @@ class SlidingSyncHandler:
                 if prev_event_id is None:
                     # We found a `newly_joined` room (we are joining the room for the
                     # first time within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
                 # Last resort, we need to step back to the previous membership event
@@ -840,11 +920,150 @@ class SlidingSyncHandler:
                 elif prev_membership != Membership.JOIN:
                     # We found a `newly_joined` room (we left before the token range
                     # and joined within the token range)
-                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    sync_room_id_set[room_id] = sync_room_id_set[
                         room_id
                     ].copy_and_replace(newly_joined=True)
 
-        return filtered_sync_room_id_set
+        # 4) Figure out which rooms the user considers to be direct-message (DM) rooms
+        #
+        # We're using global account data (`m.direct`) instead of checking for
+        # `is_direct` on membership events because that property only appears for
+        # the invitee membership event (doesn't show up for the inviter).
+        #
+        # We're unable to take `to_token` into account for global account data since
+        # we only keep track of the latest account data for the user.
+        dm_map = await self.store.get_global_account_data_by_type_for_user(
+            user_id, AccountDataTypes.DIRECT
+        )
+
+        # Flatten out the map. Account data is set by the client so it needs to be
+        # scrutinized.
+        dm_room_id_set = set()
+        if isinstance(dm_map, dict):
+            for room_ids in dm_map.values():
+                # Account data should be a list of room IDs. Ignore anything else
+                if isinstance(room_ids, list):
+                    for room_id in room_ids:
+                        if isinstance(room_id, str):
+                            dm_room_id_set.add(room_id)
+
+        # 4) Fixup
+        for room_id in sync_room_id_set:
+            sync_room_id_set[room_id] = sync_room_id_set[room_id].copy_and_replace(
+                is_dm=room_id in dm_room_id_set
+            )
+
+        return sync_room_id_set
+
+    async def filter_rooms_relevant_for_sync(
+        self,
+        user: UserID,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+    ) -> Dict[str, _RoomMembershipForUser]:
+        """
+        Filter room IDs that should/can be listed for this user in the sync response (the
+        full room list that will be further filtered, sorted, and sliced).
+
+        We're looking for rooms where the user has the following state in the token
+        range (> `from_token` and <= `to_token`):
+
+        - `invite`, `join`, `knock`, `ban` membership events
+        - Kicks (`leave` membership events where `sender` is different from the
+          `user_id`/`state_key`)
+        - `newly_left` (rooms that were left during the given token range)
+        - In order for bans/kicks to not show up in sync, you need to `/forget` those
+          rooms. This doesn't modify the event itself though and only adds the
+          `forgotten` flag to the `room_memberships` table in Synapse. There isn't a way
+          to tell when a room was forgotten at the moment so we can't factor it into the
+          from/to range.
+
+        Args:
+            user: User that is syncing
+            room_membership_for_user_map: Room membership for the user
+
+        Returns:
+            A dictionary of room IDs that should be listed in the sync response along
+            with membership information in that room at the time of `to_token`.
+        """
+        user_id = user.to_string()
+
+        # Filter rooms to only what we're interested to sync with
+        filtered_sync_room_map = {
+            room_id: room_membership_for_user
+            for room_id, room_membership_for_user in room_membership_for_user_map.items()
+            if filter_membership_for_sync(
+                user_id=user_id,
+                room_membership_for_user=room_membership_for_user,
+            )
+        }
+
+        return filtered_sync_room_map
+
+    async def check_room_subscription_allowed_for_user(
+        self,
+        room_id: str,
+        room_membership_for_user_map: Dict[str, _RoomMembershipForUser],
+        to_token: StreamToken,
+    ) -> Optional[_RoomMembershipForUser]:
+        """
+        Check whether the user is allowed to see the room based on whether they have
+        ever had membership in the room or if the room is `world_readable`.
+
+        Similar to `check_user_in_room_or_world_readable(...)`
+
+        Args:
+            room_id: Room to check
+            room_membership_for_user_map: Room membership for the user at the time of
+                the `to_token` (<= `to_token`).
+            to_token: The token to fetch rooms up to.
+
+        Returns:
+            The room membership for the user if they are allowed to subscribe to the
+            room else `None`.
+        """
+
+        # We can first check if they are already allowed to see the room based
+        # on our previous work to assemble the `room_membership_for_user_map`.
+        #
+        # If they have had any membership in the room over time (up to the `to_token`),
+        # let them subscribe and see what they can.
+        existing_membership_for_user = room_membership_for_user_map.get(room_id)
+        if existing_membership_for_user is not None:
+            return existing_membership_for_user
+
+        # TODO: Handle `world_readable` rooms
+        return None
+
+        # If the room is `world_readable`, it doesn't matter whether they can join,
+        # everyone can see the room.
+        # not_in_room_membership_for_user = _RoomMembershipForUser(
+        #     room_id=room_id,
+        #     event_id=None,
+        #     event_pos=None,
+        #     membership=None,
+        #     sender=None,
+        #     newly_joined=False,
+        #     newly_left=False,
+        #     is_dm=False,
+        # )
+        # room_state = await self.get_current_state_at(
+        #     room_id=room_id,
+        #     room_membership_for_user_at_to_token=not_in_room_membership_for_user,
+        #     state_filter=StateFilter.from_types(
+        #         [(EventTypes.RoomHistoryVisibility, "")]
+        #     ),
+        #     to_token=to_token,
+        # )
+
+        # visibility_event = room_state.get((EventTypes.RoomHistoryVisibility, ""))
+        # if (
+        #     visibility_event is not None
+        #     and visibility_event.content.get("history_visibility")
+        #     == HistoryVisibility.WORLD_READABLE
+        # ):
+        #     return not_in_room_membership_for_user
+
+        # return None
 
     async def filter_rooms(
         self,
@@ -867,41 +1086,24 @@ class SlidingSyncHandler:
             A filtered dictionary of room IDs along with membership information in the
             room at the time of `to_token`.
         """
-        user_id = user.to_string()
-
-        # TODO: Apply filters
-
         filtered_room_id_set = set(sync_room_map.keys())
 
         # Filter for Direct-Message (DM) rooms
         if filters.is_dm is not None:
-            # We're using global account data (`m.direct`) instead of checking for
-            # `is_direct` on membership events because that property only appears for
-            # the invitee membership event (doesn't show up for the inviter). Account
-            # data is set by the client so it needs to be scrutinized.
-            #
-            # We're unable to take `to_token` into account for global account data since
-            # we only keep track of the latest account data for the user.
-            dm_map = await self.store.get_global_account_data_by_type_for_user(
-                user_id, AccountDataTypes.DIRECT
-            )
-
-            # Flatten out the map
-            dm_room_id_set = set()
-            if isinstance(dm_map, dict):
-                for room_ids in dm_map.values():
-                    # Account data should be a list of room IDs. Ignore anything else
-                    if isinstance(room_ids, list):
-                        for room_id in room_ids:
-                            if isinstance(room_id, str):
-                                dm_room_id_set.add(room_id)
-
             if filters.is_dm:
                 # Only DM rooms please
-                filtered_room_id_set = filtered_room_id_set.intersection(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if sync_room_map[room_id].is_dm
+                }
             else:
                 # Only non-DM rooms please
-                filtered_room_id_set = filtered_room_id_set.difference(dm_room_id_set)
+                filtered_room_id_set = {
+                    room_id
+                    for room_id in filtered_room_id_set
+                    if not sync_room_map[room_id].is_dm
+                }
 
         if filters.spaces:
             raise NotImplementedError()
@@ -1043,6 +1245,102 @@ class SlidingSyncHandler:
             reverse=True,
         )
 
+    async def get_current_state_ids_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[str]:
+        """
+        Get current state IDs for the user in the room according to their membership. This
+        will be the current state at the time of their LEAVE/BAN, otherwise will be the
+        current state <= to_token.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_token: Membership information for the user
+                in the room at the time of `to_token`.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids: StateMap[str]
+        # People shouldn't see past their leave/ban event
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: `get_state_ids_at(...)` doesn't take into account the "current state"
+            room_state_ids = await self.storage_controllers.state.get_state_ids_at(
+                room_id,
+                stream_position=to_token.copy_and_replace(
+                    StreamKeyType.ROOM,
+                    room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
+                ),
+                state_filter=state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+        # Otherwise, we can get the latest current state in the room
+        else:
+            room_state_ids = await self.storage_controllers.state.get_current_state_ids(
+                room_id,
+                state_filter,
+                # Partially-stated rooms should have all state events except for
+                # remote membership events. Since we've already excluded
+                # partially-stated rooms unless `required_state` only has
+                # `["m.room.member", "$LAZY"]` for membership, we should be able to
+                # retrieve everything requested. When we're lazy-loading, if there
+                # are some remote senders in the timeline, we should also have their
+                # membership event because we had to auth that timeline event. Plus
+                # we don't want to block the whole sync waiting for this one room.
+                await_full_state=False,
+            )
+            # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
+
+        return room_state_ids
+
+    async def get_current_state_at(
+        self,
+        room_id: str,
+        room_membership_for_user_at_to_token: _RoomMembershipForUser,
+        state_filter: StateFilter,
+        to_token: StreamToken,
+    ) -> StateMap[EventBase]:
+        """
+        Get current state for the user in the room according to their membership. This
+        will be the current state at the time of their LEAVE/BAN, otherwise will be the
+        current state <= to_token.
+
+        Args:
+            room_id: The room ID to fetch data for
+            room_membership_for_user_at_token: Membership information for the user
+                in the room at the time of `to_token`.
+            to_token: The point in the stream to sync up to.
+        """
+        room_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=state_filter,
+            to_token=to_token,
+        )
+
+        event_map = await self.store.get_events(list(room_state_ids.values()))
+
+        state_map = {}
+        for key, event_id in room_state_ids.items():
+            event = event_map.get(event_id)
+            if event:
+                state_map[key] = event
+
+        return state_map
+
     async def get_room_sync_data(
         self,
         user: UserID,
@@ -1074,7 +1372,7 @@ class SlidingSyncHandler:
         # membership. Currently, we have to make all of these optional because
         # `invite`/`knock` rooms only have `stripped_state`. See
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
-        timeline_events: Optional[List[EventBase]] = None
+        timeline_events: List[EventBase] = []
         bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
         limited: Optional[bool] = None
         prev_batch_token: Optional[StreamToken] = None
@@ -1206,7 +1504,7 @@ class SlidingSyncHandler:
 
         # Figure out any stripped state events for invite/knocks. This allows the
         # potential joiner to identify the room.
-        stripped_state: Optional[List[JsonDict]] = None
+        stripped_state: List[JsonDict] = []
         if room_membership_for_user_at_to_token.membership in (
             Membership.INVITE,
             Membership.KNOCK,
@@ -1232,10 +1530,10 @@ class SlidingSyncHandler:
             stripped_state.append(strip_event(invite_or_knock_event))
 
         # TODO: Handle state resets. For example, if we see
-        # `room_membership_for_user_at_to_token.membership = Membership.LEAVE` but
-        # `required_state` doesn't include it, we should indicate to the client that a
-        # state reset happened. Perhaps we should indicate this by setting `initial:
-        # True` and empty `required_state`.
+        # `room_membership_for_user_at_to_token.event_id=None and
+        # room_membership_for_user_at_to_token.membership is not None`, we should
+        # indicate to the client that a state reset happened. Perhaps we should indicate
+        # this by setting `initial: True` and empty `required_state`.
 
         # TODO: Since we can't determine whether we've already sent a room down this
         # Sliding Sync connection before (we plan to add this optimization in the
@@ -1243,6 +1541,44 @@ class SlidingSyncHandler:
         # updates.
         initial = True
 
+        # Check whether the room has a name set
+        name_state_ids = await self.get_current_state_ids_at(
+            room_id=room_id,
+            room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+            state_filter=StateFilter.from_types([(EventTypes.Name, "")]),
+            to_token=to_token,
+        )
+        name_event_id = name_state_ids.get((EventTypes.Name, ""))
+
+        room_membership_summary: Mapping[str, MemberSummary]
+        empty_membership_summary = MemberSummary([], 0)
+        if room_membership_for_user_at_to_token.membership in (
+            Membership.LEAVE,
+            Membership.BAN,
+        ):
+            # TODO: Figure out how to get the membership summary for left/banned rooms
+            room_membership_summary = {}
+        else:
+            room_membership_summary = await self.store.get_room_summary(room_id)
+            # TODO: Reverse/rewind back to the `to_token`
+
+        # `heroes` are required if the room name is not set.
+        #
+        # Note: When you're the first one on your server to be invited to a new room
+        # over federation, we only have access to some stripped state in
+        # `event.unsigned.invite_room_state` which currently doesn't include `heroes`,
+        # see https://github.com/matrix-org/matrix-spec/issues/380. This means that
+        # clients won't be able to calculate the room name when necessary and just a
+        # pitfall we have to deal with until that spec issue is resolved.
+        hero_user_ids: List[str] = []
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        if name_event_id is None:
+            hero_user_ids = extract_heroes_from_room_summary(
+                room_membership_summary, me=user.to_string()
+            )
+
         # Fetch the `required_state` for the room
         #
         # No `required_state` for invite/knock rooms (just `stripped_state`)
@@ -1253,13 +1589,11 @@ class SlidingSyncHandler:
         # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
         #
         # Calculate the `StateFilter` based on the `required_state` for the room
-        room_state: Optional[StateMap[EventBase]] = None
-        required_room_state: Optional[StateMap[EventBase]] = None
+        required_state_filter = StateFilter.none()
         if room_membership_for_user_at_to_token.membership not in (
             Membership.INVITE,
             Membership.KNOCK,
         ):
-            required_state_filter = StateFilter.none()
             # If we have a double wildcard ("*", "*") in the `required_state`, we need
             # to fetch all state for the room
             #
@@ -1325,86 +1659,65 @@ class SlidingSyncHandler:
 
                 required_state_filter = StateFilter.from_types(required_state_types)
 
-            # We need this base set of info for the response so let's just fetch it along
-            # with the `required_state` for the room
-            META_ROOM_STATE = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")]
+        # We need this base set of info for the response so let's just fetch it along
+        # with the `required_state` for the room
+        meta_room_state = [(EventTypes.Name, ""), (EventTypes.RoomAvatar, "")] + [
+            (EventTypes.Member, hero_user_id) for hero_user_id in hero_user_ids
+        ]
+        state_filter = StateFilter.all()
+        if required_state_filter != StateFilter.all():
             state_filter = StateFilter(
                 types=StateFilter.from_types(
-                    chain(META_ROOM_STATE, required_state_filter.to_types())
+                    chain(meta_room_state, required_state_filter.to_types())
                 ).types,
                 include_others=required_state_filter.include_others,
             )
 
-            # We can return all of the state that was requested if this was the first
-            # time we've sent the room down this connection.
-            if initial:
-                # People shouldn't see past their leave/ban event
-                if room_membership_for_user_at_to_token.membership in (
-                    Membership.LEAVE,
-                    Membership.BAN,
-                ):
-                    room_state = await self.storage_controllers.state.get_state_at(
-                        room_id,
-                        stream_position=to_token.copy_and_replace(
-                            StreamKeyType.ROOM,
-                            room_membership_for_user_at_to_token.event_pos.to_room_stream_token(),
-                        ),
-                        state_filter=state_filter,
-                        # Partially-stated rooms should have all state events except for
-                        # remote membership events. Since we've already excluded
-                        # partially-stated rooms unless `required_state` only has
-                        # `["m.room.member", "$LAZY"]` for membership, we should be able to
-                        # retrieve everything requested. When we're lazy-loading, if there
-                        # are some remote senders in the timeline, we should also have their
-                        # membership event because we had to auth that timeline event. Plus
-                        # we don't want to block the whole sync waiting for this one room.
-                        await_full_state=False,
-                    )
-                # Otherwise, we can get the latest current state in the room
-                else:
-                    room_state = await self.storage_controllers.state.get_current_state(
-                        room_id,
-                        state_filter,
-                        # Partially-stated rooms should have all state events except for
-                        # remote membership events. Since we've already excluded
-                        # partially-stated rooms unless `required_state` only has
-                        # `["m.room.member", "$LAZY"]` for membership, we should be able to
-                        # retrieve everything requested. When we're lazy-loading, if there
-                        # are some remote senders in the timeline, we should also have their
-                        # membership event because we had to auth that timeline event. Plus
-                        # we don't want to block the whole sync waiting for this one room.
-                        await_full_state=False,
-                    )
-                    # TODO: Query `current_state_delta_stream` and reverse/rewind back to the `to_token`
-            else:
-                # TODO: Once we can figure out if we've sent a room down this connection before,
-                # we can return updates instead of the full required state.
-                raise NotImplementedError()
+        # We can return all of the state that was requested if this was the first
+        # time we've sent the room down this connection.
+        room_state: StateMap[EventBase] = {}
+        if initial:
+            room_state = await self.get_current_state_at(
+                room_id=room_id,
+                room_membership_for_user_at_to_token=room_membership_for_user_at_to_token,
+                state_filter=state_filter,
+                to_token=to_token,
+            )
+        else:
+            # TODO: Once we can figure out if we've sent a room down this connection before,
+            # we can return updates instead of the full required state.
+            raise NotImplementedError()
 
-            if required_state_filter != StateFilter.none():
-                required_room_state = required_state_filter.filter_state(room_state)
+        required_room_state: StateMap[EventBase] = {}
+        if required_state_filter != StateFilter.none():
+            required_room_state = required_state_filter.filter_state(room_state)
 
         # Find the room name and avatar from the state
         room_name: Optional[str] = None
+        # TODO: Should we also check for `EventTypes.CanonicalAlias`
+        # (`m.room.canonical_alias`) as a fallback for the room name? see
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1671260153
+        name_event = room_state.get((EventTypes.Name, ""))
+        if name_event is not None:
+            room_name = name_event.content.get("name")
+
         room_avatar: Optional[str] = None
-        if room_state is not None:
-            name_event = room_state.get((EventTypes.Name, ""))
-            if name_event is not None:
-                room_name = name_event.content.get("name")
-
-            avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
-            if avatar_event is not None:
-                room_avatar = avatar_event.content.get("url")
-        elif stripped_state is not None:
-            for event in stripped_state:
-                if event["type"] == EventTypes.Name:
-                    room_name = event.get("content", {}).get("name")
-                elif event["type"] == EventTypes.RoomAvatar:
-                    room_avatar = event.get("content", {}).get("url")
-
-                # Found everything so we can stop looking
-                if room_name is not None and room_avatar is not None:
-                    break
+        avatar_event = room_state.get((EventTypes.RoomAvatar, ""))
+        if avatar_event is not None:
+            room_avatar = avatar_event.content.get("url")
+
+        # Assemble heroes: extract the info from the state we just fetched
+        heroes: List[SlidingSyncResult.RoomResult.StrippedHero] = []
+        for hero_user_id in hero_user_ids:
+            member_event = room_state.get((EventTypes.Member, hero_user_id))
+            if member_event is not None:
+                heroes.append(
+                    SlidingSyncResult.RoomResult.StrippedHero(
+                        user_id=hero_user_id,
+                        display_name=member_event.content.get("displayname"),
+                        avatar_url=member_event.content.get("avatar_url"),
+                    )
+                )
 
         # Figure out the last bump event in the room
         last_bump_event_result = (
@@ -1423,14 +1736,10 @@ class SlidingSyncHandler:
         return SlidingSyncResult.RoomResult(
             name=room_name,
             avatar=room_avatar,
-            # TODO: Dummy value
-            heroes=None,
-            # TODO: Dummy value
-            is_dm=False,
+            heroes=heroes,
+            is_dm=room_membership_for_user_at_to_token.is_dm,
             initial=initial,
-            required_state=(
-                list(required_room_state.values()) if required_room_state else None
-            ),
+            required_state=list(required_room_state.values()),
             timeline_events=timeline_events,
             bundled_aggregations=bundled_aggregations,
             stripped_state=stripped_state,
@@ -1438,9 +1747,12 @@ class SlidingSyncHandler:
             limited=limited,
             num_live=num_live,
             bump_stamp=bump_stamp,
-            # TODO: Dummy values
-            joined_count=0,
-            invited_count=0,
+            joined_count=room_membership_summary.get(
+                Membership.JOIN, empty_membership_summary
+            ).count,
+            invited_count=room_membership_summary.get(
+                Membership.INVITE, empty_membership_summary
+            ).count,
             # TODO: These are just dummy values. We could potentially just remove these
             # since notifications can only really be done correctly on the client anyway
             # (encrypted rooms).
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 749b01dd0e..6fd75fd381 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -90,7 +90,7 @@ from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.logging.opentracing import set_tag, start_active_span, tags
 from synapse.types import JsonDict
 from synapse.util import json_decoder
-from synapse.util.async_helpers import AwakenableSleeper, timeout_deferred
+from synapse.util.async_helpers import AwakenableSleeper, Linearizer, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.util.stringutils import parse_and_validate_server_name
 
@@ -475,6 +475,8 @@ class MatrixFederationHttpClient:
             use_proxy=True,
         )
 
+        self.remote_download_linearizer = Linearizer("remote_download_linearizer", 6)
+
     def wake_destination(self, destination: str) -> None:
         """Called when the remote server may have come back online."""
 
@@ -1486,35 +1488,44 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         try:
-            # add a byte of headroom to max size as function errs at >=
-            d = read_body_with_max_size(response, output_stream, expected_size + 1)
-            d.addTimeout(self.default_timeout_seconds, self.reactor)
-            length = await make_deferred_yieldable(d)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as function errs at >=
+                d = read_body_with_max_size(response, output_stream, expected_size + 1)
+                d.addTimeout(self.default_timeout_seconds, self.reactor)
+                length = await make_deferred_yieldable(d)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1560,6 +1571,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, decrement the actual size from ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers
 
     async def federation_get_file(
@@ -1630,29 +1648,37 @@ class MatrixFederationHttpClient:
         )
 
         headers = dict(response.headers.getAllRawHeaders())
-
         expected_size = response.length
-        # if we don't get an expected length then use the max length
+
         if expected_size == UNKNOWN_LENGTH:
             expected_size = max_size
-            logger.debug(
-                f"File size unknown, assuming file is max allowable size: {max_size}"
-            )
+        else:
+            if int(expected_size) > max_size:
+                msg = "Requested file is too large > %r bytes" % (max_size,)
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(HTTPStatus.BAD_GATEWAY, msg, Codes.TOO_LARGE)
 
-        read_body, _ = await download_ratelimiter.can_do_action(
-            requester=None,
-            key=ip_address,
-            n_actions=expected_size,
-        )
-        if not read_body:
-            msg = "Requested file size exceeds ratelimits"
-            logger.warning(
-                "{%s} [%s] %s",
-                request.txn_id,
-                request.destination,
-                msg,
+            read_body, _ = await download_ratelimiter.can_do_action(
+                requester=None,
+                key=ip_address,
+                n_actions=expected_size,
             )
-            raise SynapseError(HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED)
+            if not read_body:
+                msg = "Requested file size exceeds ratelimits"
+                logger.warning(
+                    "{%s} [%s] %s",
+                    request.txn_id,
+                    request.destination,
+                    msg,
+                )
+                raise SynapseError(
+                    HTTPStatus.TOO_MANY_REQUESTS, msg, Codes.LIMIT_EXCEEDED
+                )
 
         # this should be a multipart/mixed response with the boundary string in the header
         try:
@@ -1672,11 +1698,12 @@ class MatrixFederationHttpClient:
             raise SynapseError(HTTPStatus.BAD_GATEWAY, msg)
 
         try:
-            # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
-            deferred = read_multipart_response(
-                response, output_stream, boundary, expected_size + 1
-            )
-            deferred.addTimeout(self.default_timeout_seconds, self.reactor)
+            async with self.remote_download_linearizer.queue(ip_address):
+                # add a byte of headroom to max size as `_MultipartParserProtocol.dataReceived` errs at >=
+                deferred = read_multipart_response(
+                    response, output_stream, boundary, expected_size + 1
+                )
+                deferred.addTimeout(self.default_timeout_seconds, self.reactor)
         except BodyExceededMaxSize:
             msg = "Requested file is too large > %r bytes" % (expected_size,)
             logger.warning(
@@ -1743,6 +1770,13 @@ class MatrixFederationHttpClient:
             request.method,
             request.uri.decode("ascii"),
         )
+
+        # if we didn't know the length upfront, decrement the actual size from ratelimiter
+        if response.length == UNKNOWN_LENGTH:
+            download_ratelimiter.record_action(
+                requester=None, key=ip_address, n_actions=length
+            )
+
         return length, headers, multipart_response.json
 
 
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index 94d5faf9f7..1d8cbfdf00 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -997,8 +997,21 @@ class SlidingSyncRestServlet(RestServlet):
             if room_result.avatar:
                 serialized_rooms[room_id]["avatar"] = room_result.avatar
 
-            if room_result.heroes:
-                serialized_rooms[room_id]["heroes"] = room_result.heroes
+            if room_result.heroes is not None and len(room_result.heroes) > 0:
+                serialized_heroes = []
+                for hero in room_result.heroes:
+                    serialized_hero = {
+                        "user_id": hero.user_id,
+                    }
+                    if hero.display_name is not None:
+                        # Not a typo, just how "displayname" is spelled in the spec
+                        serialized_hero["displayname"] = hero.display_name
+
+                    if hero.avatar_url is not None:
+                        serialized_hero["avatar_url"] = hero.avatar_url
+
+                    serialized_heroes.append(serialized_hero)
+                serialized_rooms[room_id]["heroes"] = serialized_heroes
 
             # We should only include the `initial` key if it's `True` to save bandwidth.
             # The absense of this flag means `False`.
@@ -1006,7 +1019,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["initial"] = room_result.initial
 
             # This will be omitted for invite/knock rooms with `stripped_state`
-            if room_result.required_state is not None:
+            if (
+                room_result.required_state is not None
+                and len(room_result.required_state) > 0
+            ):
                 serialized_required_state = (
                     await self.event_serializer.serialize_events(
                         room_result.required_state,
@@ -1017,7 +1033,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["required_state"] = serialized_required_state
 
             # This will be omitted for invite/knock rooms with `stripped_state`
-            if room_result.timeline_events is not None:
+            if (
+                room_result.timeline_events is not None
+                and len(room_result.timeline_events) > 0
+            ):
                 serialized_timeline = await self.event_serializer.serialize_events(
                     room_result.timeline_events,
                     time_now,
@@ -1045,7 +1064,10 @@ class SlidingSyncRestServlet(RestServlet):
                 serialized_rooms[room_id]["is_dm"] = room_result.is_dm
 
             # Stripped state only applies to invite/knock rooms
-            if room_result.stripped_state is not None:
+            if (
+                room_result.stripped_state is not None
+                and len(room_result.stripped_state) > 0
+            ):
                 # TODO: `knocked_state` but that isn't specced yet.
                 #
                 # TODO: Instead of adding `knocked_state`, it would be good to rename
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index 5d2fd08495..f62d9f705d 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -279,8 +279,19 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
 
     @cached(max_entries=100000)  # type: ignore[synapse-@cached-mutable]
     async def get_room_summary(self, room_id: str) -> Mapping[str, MemberSummary]:
-        """Get the details of a room roughly suitable for use by the room
+        """
+        Get the details of a room roughly suitable for use by the room
         summary extension to /sync. Useful when lazy loading room members.
+
+        Returns the total count of members in the room by membership type, and a
+        truncated list of members (the heroes). This will be the first 6 members of the
+        room:
+        - We want 5 heroes plus 1, in case one of them is the
+        calling user.
+        - They are ordered by `stream_ordering`, which are joined or
+        invited. When no joined or invited members are available, this also includes
+        banned and left users.
+
         Args:
             room_id: The room ID to query
         Returns:
@@ -308,23 +319,36 @@ class RoomMemberWorkerStore(EventsWorkerStore, CacheInvalidationWorkerStore):
             for count, membership in txn:
                 res.setdefault(membership, MemberSummary([], count))
 
-            # we order by membership and then fairly arbitrarily by event_id so
-            # heroes are consistent
-            # Note, rejected events will have a null membership field, so
-            # we we manually filter them out.
+            # Order by membership (joins -> invites -> leave (former insiders) ->
+            # everything else (outsiders like bans/knocks), then by `stream_ordering` so
+            # the first members in the room show up first and to make the sort stable
+            # (consistent heroes).
+            #
+            # Note: rejected events will have a null membership field, so we we manually
+            # filter them out.
             sql = """
                 SELECT state_key, membership, event_id
                 FROM current_state_events
                 WHERE type = 'm.room.member' AND room_id = ?
                     AND membership IS NOT NULL
                 ORDER BY
-                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 ELSE 3 END ASC,
-                    event_id ASC
+                    CASE membership WHEN ? THEN 1 WHEN ? THEN 2 WHEN ? THEN 3 ELSE 4 END ASC,
+                    event_stream_ordering ASC
                 LIMIT ?
             """
 
-            # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
-            txn.execute(sql, (room_id, Membership.JOIN, Membership.INVITE, 6))
+            txn.execute(
+                sql,
+                (
+                    room_id,
+                    # Sort order
+                    Membership.JOIN,
+                    Membership.INVITE,
+                    Membership.LEAVE,
+                    # 6 is 5 (number of heroes) plus 1, in case one of them is the calling user.
+                    6,
+                ),
+            )
             for user_id, membership, event_id in txn:
                 summary = res[membership]
                 # we will always have a summary for this membership type at this
@@ -1509,10 +1533,19 @@ def extract_heroes_from_room_summary(
 ) -> List[str]:
     """Determine the users that represent a room, from the perspective of the `me` user.
 
+    This function expects `MemberSummary.members` to already be sorted by
+    `stream_ordering` like the results from `get_room_summary(...)`.
+
     The rules which say which users we select are specified in the "Room Summary"
     section of
     https://spec.matrix.org/v1.4/client-server-api/#get_matrixclientv3sync
 
+
+    Args:
+        details: Mapping from membership type to member summary. We expect
+            `MemberSummary.members` to already be sorted by `stream_ordering`.
+        me: The user for whom we are determining the heroes for.
+
     Returns a list (possibly empty) of heroes' mxids.
     """
     empty_ms = MemberSummary([], 0)
@@ -1527,11 +1560,11 @@ def extract_heroes_from_room_summary(
         r[0] for r in details.get(Membership.LEAVE, empty_ms).members if r[0] != me
     ] + [r[0] for r in details.get(Membership.BAN, empty_ms).members if r[0] != me]
 
-    # FIXME: order by stream ordering rather than as returned by SQL
+    # We expect `MemberSummary.members` to already be sorted by `stream_ordering`
     if joined_user_ids or invited_user_ids:
-        return sorted(joined_user_ids + invited_user_ids)[0:5]
+        return (joined_user_ids + invited_user_ids)[0:5]
     else:
-        return sorted(gone_user_ids)[0:5]
+        return gone_user_ids[0:5]
 
 
 @attr.s(slots=True, auto_attribs=True)
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index b22a13ef01..3962ecc996 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -20,6 +20,7 @@
 #
 #
 import abc
+import logging
 import re
 import string
 from enum import Enum
@@ -74,6 +75,9 @@ if TYPE_CHECKING:
     from synapse.storage.databases.main import DataStore, PurgeEventsStore
     from synapse.storage.databases.main.appservice import ApplicationServiceWorkerStore
 
+
+logger = logging.getLogger(__name__)
+
 # Define a state map type from type/state_key to T (usually an event ID or
 # event)
 T = TypeVar("T")
@@ -454,6 +458,8 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     represented by a default `stream` attribute and a map of instance name to
     stream position of any writers that are ahead of the default stream
     position.
+
+    The values in `instance_map` must be greater than the `stream` attribute.
     """
 
     stream: int = attr.ib(validator=attr.validators.instance_of(int), kw_only=True)
@@ -468,6 +474,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
         kw_only=True,
     )
 
+    def __attrs_post_init__(self) -> None:
+        # Enforce that all instances have a value greater than the min stream
+        # position.
+        for i, v in self.instance_map.items():
+            if v <= self.stream:
+                raise ValueError(
+                    f"'instance_map' includes a stream position before the main 'stream' attribute. Instance: {i}"
+                )
+
     @classmethod
     @abc.abstractmethod
     async def parse(cls, store: "DataStore", string: str) -> "Self":
@@ -494,6 +509,9 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
             for instance in set(self.instance_map).union(other.instance_map)
         }
 
+        # Filter out any redundant entries.
+        instance_map = {i: s for i, s in instance_map.items() if s > max_stream}
+
         return attr.evolve(
             self, stream=max_stream, instance_map=immutabledict(instance_map)
         )
@@ -539,10 +557,15 @@ class AbstractMultiWriterStreamToken(metaclass=abc.ABCMeta):
     def bound_stream_token(self, max_stream: int) -> "Self":
         """Bound the stream positions to a maximum value"""
 
+        min_pos = min(self.stream, max_stream)
         return type(self)(
-            stream=min(self.stream, max_stream),
+            stream=min_pos,
             instance_map=immutabledict(
-                {k: min(s, max_stream) for k, s in self.instance_map.items()}
+                {
+                    k: min(s, max_stream)
+                    for k, s in self.instance_map.items()
+                    if min(s, max_stream) > min_pos
+                }
             ),
         )
 
@@ -637,6 +660,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 "Cannot set both 'topological' and 'instance_map' on 'RoomStreamToken'."
             )
 
+        super().__attrs_post_init__()
+
     @classmethod
     async def parse(cls, store: "PurgeEventsStore", string: str) -> "RoomStreamToken":
         try:
@@ -651,6 +676,11 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -666,7 +696,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here as even though this *might* be a client
+            # handing a bad token, its more likely that Synapse returned a bad
+            # token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid room stream token %r" % (string,))
 
     @classmethod
@@ -713,6 +746,8 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
         return self.instance_map.get(instance_name, self.stream)
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.topological is not None:
             return "t%d-%d" % (self.topological, self.stream)
         elif self.instance_map:
@@ -727,8 +762,10 @@ class RoomStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return f"s{self.stream}"
         else:
             return "s%d" % (self.stream,)
 
@@ -756,6 +793,11 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
 
                 instance_map = {}
                 for part in parts[1:]:
+                    if not part:
+                        # Handle tokens of the form `m5~`, which were created by
+                        # a bug
+                        continue
+
                     key, value = part.split(".")
                     instance_id = int(key)
                     pos = int(value)
@@ -770,10 +812,15 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
         except CancelledError:
             raise
         except Exception:
-            pass
+            # We log an exception here as even though this *might* be a client
+            # handing a bad token, its more likely that Synapse returned a bad
+            # token (and we really want to catch those!).
+            logger.exception("Failed to parse stream token: %r", string)
         raise SynapseError(400, "Invalid stream token %r" % (string,))
 
     async def to_string(self, store: "DataStore") -> str:
+        """See class level docstring for information about the format."""
+
         if self.instance_map:
             entries = []
             for name, pos in self.instance_map.items():
@@ -786,8 +833,10 @@ class MultiWriterStreamToken(AbstractMultiWriterStreamToken):
                 instance_id = await store.get_id_for_instance(name)
                 entries.append(f"{instance_id}.{pos}")
 
-            encoded_map = "~".join(entries)
-            return f"m{self.stream}~{encoded_map}"
+            if entries:
+                encoded_map = "~".join(entries)
+                return f"m{self.stream}~{encoded_map}"
+            return str(self.stream)
         else:
             return str(self.stream)
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index a8a3a8f242..409120470a 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -200,18 +200,24 @@ class SlidingSyncResult:
                 flag set. (same as sync v2)
         """
 
+        @attr.s(slots=True, frozen=True, auto_attribs=True)
+        class StrippedHero:
+            user_id: str
+            display_name: Optional[str]
+            avatar_url: Optional[str]
+
         name: Optional[str]
         avatar: Optional[str]
-        heroes: Optional[List[EventBase]]
+        heroes: Optional[List[StrippedHero]]
         is_dm: bool
         initial: bool
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        required_state: Optional[List[EventBase]]
-        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
-        timeline_events: Optional[List[EventBase]]
+        # Should be empty for invite/knock rooms with `stripped_state`
+        required_state: List[EventBase]
+        # Should be empty for invite/knock rooms with `stripped_state`
+        timeline_events: List[EventBase]
         bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
         # Optional because it's only relevant to invite/knock rooms
-        stripped_state: Optional[List[JsonDict]]
+        stripped_state: List[JsonDict]
         # Only optional because it won't be included for invite/knock rooms with `stripped_state`
         prev_batch: Optional[StreamToken]
         # Only optional because it won't be included for invite/knock rooms with `stripped_state`
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index 1e8fe76c99..dbe37bc712 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -200,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel):
                     }
 
             timeline_limit: The maximum number of timeline events to return per response.
-            include_heroes: Return a stripped variant of membership events (containing
-                `user_id` and optionally `avatar_url` and `displayname`) for the users used
-                to calculate the room name.
             filters: Filters to apply to the list before sorting.
         """
 
@@ -270,7 +267,6 @@ class SlidingSyncBody(RequestBodyModel):
         else:
             ranges: Optional[List[Tuple[conint(ge=0, strict=True), conint(ge=0, strict=True)]]] = None  # type: ignore[valid-type]
         slow_get_all_rooms: Optional[StrictBool] = False
-        include_heroes: Optional[StrictBool] = False
         filters: Optional[Filters] = None
 
     class RoomSubscription(CommonRoomParameters):