summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-07-02 11:07:05 -0500
committerGitHub <noreply@github.com>2024-07-02 11:07:05 -0500
commitfa916558056013678e88d9dc2a2f64b161d9c77f (patch)
tree2a726ca48f2a131047d31199a481c9b41f539f5c /synapse
parentMerge branch 'release-v1.110' into develop (diff)
downloadsynapse-fa916558056013678e88d9dc2a2f64b161d9c77f.tar.xz
Return some room data in Sliding Sync `/sync` (#17320)
 - Timeline events
 - Stripped `invite_state`

Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync
Diffstat (limited to 'synapse')
-rw-r--r--synapse/events/utils.py18
-rw-r--r--synapse/handlers/sliding_sync.py642
-rw-r--r--synapse/rest/client/sync.py112
-rw-r--r--synapse/storage/databases/main/events_worker.py12
-rw-r--r--synapse/storage/databases/main/stream.py282
-rw-r--r--synapse/storage/schema/main/delta/42/current_state_delta.sql5
-rw-r--r--synapse/types/__init__.py3
-rw-r--r--synapse/types/handlers/__init__.py37
-rw-r--r--synapse/types/rest/client/__init__.py11
9 files changed, 920 insertions, 202 deletions
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index b997d82d71..f937fd4698 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -836,3 +836,21 @@ def maybe_upsert_event_field(
             del container[key]
 
     return upsert_okay
+
+
+def strip_event(event: EventBase) -> JsonDict:
+    """
+    Used for "stripped state" events which provide a simplified view of the state of a
+    room intended to help a potential joiner identify the room (relevant when the user
+    is invited or knocked).
+
+    Stripped state events can only have the `sender`, `type`, `state_key` and `content`
+    properties present.
+    """
+
+    return {
+        "type": event.type,
+        "state_key": event.state_key,
+        "content": event.content,
+        "sender": event.sender,
+    }
diff --git a/synapse/handlers/sliding_sync.py b/synapse/handlers/sliding_sync.py
index 847a638bba..8622ef8472 100644
--- a/synapse/handlers/sliding_sync.py
+++ b/synapse/handlers/sliding_sync.py
@@ -18,22 +18,28 @@
 #
 #
 import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Set, Tuple
 
+import attr
 from immutabledict import immutabledict
 
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, Direction, EventTypes, Membership
 from synapse.events import EventBase
-from synapse.storage.roommember import RoomsForUser
+from synapse.events.utils import strip_event
+from synapse.handlers.relations import BundledAggregations
+from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
 from synapse.types import (
+    JsonDict,
     PersistedEventPosition,
     Requester,
     RoomStreamToken,
+    StreamKeyType,
     StreamToken,
     UserID,
 )
 from synapse.types.handlers import OperationType, SlidingSyncConfig, SlidingSyncResult
 from synapse.types.state import StateFilter
+from synapse.visibility import filter_events_for_client
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -41,28 +47,9 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-def convert_event_to_rooms_for_user(event: EventBase) -> RoomsForUser:
-    """
-    Quick helper to convert an event to a `RoomsForUser` object.
-    """
-    # These fields should be present for all persisted events
-    assert event.internal_metadata.stream_ordering is not None
-    assert event.internal_metadata.instance_name is not None
-
-    return RoomsForUser(
-        room_id=event.room_id,
-        sender=event.sender,
-        membership=event.membership,
-        event_id=event.event_id,
-        event_pos=PersistedEventPosition(
-            event.internal_metadata.instance_name,
-            event.internal_metadata.stream_ordering,
-        ),
-        room_version_id=event.room_version.identifier,
-    )
-
-
-def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) -> bool:
+def filter_membership_for_sync(
+    *, membership: str, user_id: str, sender: Optional[str]
+) -> bool:
     """
     Returns True if the membership event should be included in the sync response,
     otherwise False.
@@ -79,7 +66,54 @@ def filter_membership_for_sync(*, membership: str, user_id: str, sender: str) ->
     #
     # This logic includes kicks (leave events where the sender is not the same user) and
     # can be read as "anything that isn't a leave or a leave with a different sender".
-    return membership != Membership.LEAVE or sender != user_id
+    #
+    # When `sender=None` and `membership=Membership.LEAVE`, it means that a state reset
+    # happened that removed the user from the room, or the user was the last person
+    # locally to leave the room which caused the server to leave the room. In both
+    # cases, we can just remove the rooms since they are no longer relevant to the user.
+    # They could still be added back later if they are `newly_left`.
+    return membership != Membership.LEAVE or sender not in (user_id, None)
+
+
+# We can't freeze this class because we want to update it in place with the
+# de-duplicated data.
+@attr.s(slots=True, auto_attribs=True)
+class RoomSyncConfig:
+    """
+    Holds the config for what data we should fetch for a room in the sync response.
+
+    Attributes:
+        timeline_limit: The maximum number of events to return in the timeline.
+        required_state: The set of state events requested for the room. The
+            values are close to `StateKey` but actually use a syntax where you can
+            provide `*` wildcard and `$LAZY` for lazy room members as the `state_key` part
+            of the tuple (type, state_key).
+    """
+
+    timeline_limit: int
+    required_state: Set[Tuple[str, str]]
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _RoomMembershipForUser:
+    """
+    Attributes:
+        event_id: The event ID of the membership event
+        event_pos: The stream position of the membership event
+        membership: The membership state of the user in the room
+        sender: The person who sent the membership event
+        newly_joined: Whether the user newly joined the room during the given token
+            range
+    """
+
+    event_id: Optional[str]
+    event_pos: PersistedEventPosition
+    membership: str
+    sender: Optional[str]
+    newly_joined: bool
+
+    def copy_and_replace(self, **kwds: Any) -> "_RoomMembershipForUser":
+        return attr.evolve(self, **kwds)
 
 
 class SlidingSyncHandler:
@@ -90,6 +124,7 @@ class SlidingSyncHandler:
         self.auth_blocking = hs.get_auth_blocking()
         self.notifier = hs.get_notifier()
         self.event_sources = hs.get_event_sources()
+        self.relations_handler = hs.get_relations_handler()
         self.rooms_to_exclude_globally = hs.config.server.rooms_to_exclude_from_sync
 
     async def wait_for_sync_for_user(
@@ -201,6 +236,7 @@ class SlidingSyncHandler:
 
         # Assemble sliding window lists
         lists: Dict[str, SlidingSyncResult.SlidingWindowList] = {}
+        relevant_room_map: Dict[str, RoomSyncConfig] = {}
         if sync_config.lists:
             # Get all of the room IDs that the user should be able to see in the sync
             # response
@@ -225,29 +261,67 @@ class SlidingSyncHandler:
                 ops: List[SlidingSyncResult.SlidingWindowList.Operation] = []
                 if list_config.ranges:
                     for range in list_config.ranges:
+                        sliced_room_ids = [
+                            room_id
+                            # Both sides of range are inclusive
+                            for room_id, _ in sorted_room_info[range[0] : range[1] + 1]
+                        ]
+
                         ops.append(
                             SlidingSyncResult.SlidingWindowList.Operation(
                                 op=OperationType.SYNC,
                                 range=range,
-                                room_ids=[
-                                    room_id
-                                    for room_id, _ in sorted_room_info[
-                                        range[0] : range[1]
-                                    ]
-                                ],
+                                room_ids=sliced_room_ids,
                             )
                         )
 
+                        # Take the superset of the `RoomSyncConfig` for each room
+                        for room_id in sliced_room_ids:
+                            if relevant_room_map.get(room_id) is not None:
+                                # Take the highest timeline limit
+                                if (
+                                    relevant_room_map[room_id].timeline_limit
+                                    < list_config.timeline_limit
+                                ):
+                                    relevant_room_map[room_id].timeline_limit = (
+                                        list_config.timeline_limit
+                                    )
+
+                                # Union the required state
+                                relevant_room_map[room_id].required_state.update(
+                                    list_config.required_state
+                                )
+                            else:
+                                relevant_room_map[room_id] = RoomSyncConfig(
+                                    timeline_limit=list_config.timeline_limit,
+                                    required_state=set(list_config.required_state),
+                                )
+
                 lists[list_key] = SlidingSyncResult.SlidingWindowList(
                     count=len(sorted_room_info),
                     ops=ops,
                 )
 
+        # TODO: if (sync_config.room_subscriptions):
+
+        # Fetch room data
+        rooms: Dict[str, SlidingSyncResult.RoomResult] = {}
+        for room_id, room_sync_config in relevant_room_map.items():
+            room_sync_result = await self.get_room_sync_data(
+                user=sync_config.user,
+                room_id=room_id,
+                room_sync_config=room_sync_config,
+                rooms_membership_for_user_at_to_token=sync_room_map[room_id],
+                from_token=from_token,
+                to_token=to_token,
+            )
+
+            rooms[room_id] = room_sync_result
+
         return SlidingSyncResult(
             next_pos=to_token,
             lists=lists,
-            # TODO: Gather room data for rooms in lists and `sync_config.room_subscriptions`
-            rooms={},
+            rooms=rooms,
             extensions={},
         )
 
@@ -256,7 +330,7 @@ class SlidingSyncHandler:
         user: UserID,
         to_token: StreamToken,
         from_token: Optional[StreamToken] = None,
-    ) -> Dict[str, RoomsForUser]:
+    ) -> Dict[str, _RoomMembershipForUser]:
         """
         Fetch room IDs that should be listed for this user in the sync response (the
         full room list that will be filtered, sorted, and sliced).
@@ -305,13 +379,17 @@ class SlidingSyncHandler:
 
         # Our working list of rooms that can show up in the sync response
         sync_room_id_set = {
-            room_for_user.room_id: room_for_user
-            for room_for_user in room_for_user_list
-            if filter_membership_for_sync(
+            # Note: The `room_for_user` we're assigning here will need to be fixed up
+            # (below) because they are potentially from the current snapshot time
+            # instead from the time of the `to_token`.
+            room_for_user.room_id: _RoomMembershipForUser(
+                event_id=room_for_user.event_id,
+                event_pos=room_for_user.event_pos,
                 membership=room_for_user.membership,
-                user_id=user_id,
                 sender=room_for_user.sender,
+                newly_joined=False,
             )
+            for room_for_user in room_for_user_list
         }
 
         # Get the `RoomStreamToken` that represents the spot we queried up to when we got
@@ -346,14 +424,9 @@ class SlidingSyncHandler:
         #
         # - 1a) Remove rooms that the user joined after the `to_token`
         # - 1b) Add back rooms that the user left after the `to_token`
+        # - 1c) Update room membership events to the point in time of the `to_token`
         # - 2) Add back newly_left rooms (> `from_token` and <= `to_token`)
-        #
-        # Below, we're doing two separate lookups for membership changes. We could
-        # request everything for both fixups in one range, [`from_token.room_key`,
-        # `membership_snapshot_token`), but we want to avoid raw `stream_ordering`
-        # comparison without `instance_name` (which is flawed). We could refactor
-        # `event.internal_metadata` to include `instance_name` but it might turn out a
-        # little difficult and a bigger, broader Synapse change than we want to make.
+        # - 3) Figure out which rooms are `newly_joined`
 
         # 1) -----------------------------------------------------
 
@@ -363,159 +436,198 @@ class SlidingSyncHandler:
         # If our `to_token` is already the same or ahead of the latest room membership
         # for the user, we don't need to do any "2)" fix-ups and can just straight-up
         # use the room list from the snapshot as a base (nothing has changed)
-        membership_change_events_after_to_token = []
+        current_state_delta_membership_changes_after_to_token = []
         if not membership_snapshot_token.is_before_or_eq(to_token.room_key):
-            membership_change_events_after_to_token = (
-                await self.store.get_membership_changes_for_user(
+            current_state_delta_membership_changes_after_to_token = (
+                await self.store.get_current_state_delta_membership_changes_for_user(
                     user_id,
                     from_key=to_token.room_key,
                     to_key=membership_snapshot_token,
-                    excluded_rooms=self.rooms_to_exclude_globally,
+                    excluded_room_ids=self.rooms_to_exclude_globally,
                 )
             )
 
-        # 1) Assemble a list of the last membership events in some given ranges. Someone
-        # could have left and joined multiple times during the given range but we only
-        # care about end-result so we grab the last one.
-        last_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
-        # We also need the first membership event after the `to_token` so we can step
-        # backward to the previous membership that would apply to the from/to range.
-        first_membership_change_by_room_id_after_to_token: Dict[str, EventBase] = {}
-        for event in membership_change_events_after_to_token:
-            last_membership_change_by_room_id_after_to_token[event.room_id] = event
+        # 1) Assemble a list of the first membership event after the `to_token` so we can
+        # step backward to the previous membership that would apply to the from/to
+        # range.
+        first_membership_change_by_room_id_after_to_token: Dict[
+            str, CurrentStateDeltaMembership
+        ] = {}
+        for membership_change in current_state_delta_membership_changes_after_to_token:
             # Only set if we haven't already set it
             first_membership_change_by_room_id_after_to_token.setdefault(
-                event.room_id, event
+                membership_change.room_id, membership_change
             )
 
         # 1) Fixup
+        #
+        # Since we fetched a snapshot of the users room list at some point in time after
+        # the from/to tokens, we need to revert/rewind some membership changes to match
+        # the point in time of the `to_token`.
         for (
-            last_membership_change_after_to_token
-        ) in last_membership_change_by_room_id_after_to_token.values():
-            room_id = last_membership_change_after_to_token.room_id
-
-            # We want to find the first membership change after the `to_token` then step
-            # backward to know the membership in the from/to range.
-            first_membership_change_after_to_token = (
-                first_membership_change_by_room_id_after_to_token.get(room_id)
-            )
-            assert first_membership_change_after_to_token is not None, (
-                "If there was a `last_membership_change_after_to_token` that we're iterating over, "
-                + "then there should be corresponding a first change. For example, even if there "
-                + "is only one event after the `to_token`, the first and last event will be same event. "
-                + "This is probably a mistake in assembling the `last_membership_change_by_room_id_after_to_token`"
-                + "/`first_membership_change_by_room_id_after_to_token` dicts above."
-            )
-            # TODO: Instead of reading from `unsigned`, refactor this to use the
-            # `current_state_delta_stream` table in the future. Probably a new
-            # `get_membership_changes_for_user()` function that uses
-            # `current_state_delta_stream` with a join to `room_memberships`. This would
-            # help in state reset scenarios since `prev_content` is looking at the
-            # current branch vs the current room state. This is all just data given to
-            # the client so no real harm to data integrity, but we'd like to be nice to
-            # the client. Since the `current_state_delta_stream` table is new, it
-            # doesn't have all events in it. Since this is Sliding Sync, if we ever need
-            # to, we can signal the client to throw all of their state away by sending
-            # "operation: RESET".
-            prev_content = first_membership_change_after_to_token.unsigned.get(
-                "prev_content", {}
-            )
-            prev_membership = prev_content.get("membership", None)
-            prev_sender = first_membership_change_after_to_token.unsigned.get(
-                "prev_sender", None
-            )
-
-            # Check if the previous membership (membership that applies to the from/to
-            # range) should be included in our `sync_room_id_set`
-            should_prev_membership_be_included = (
-                prev_membership is not None
-                and prev_sender is not None
-                and filter_membership_for_sync(
-                    membership=prev_membership,
-                    user_id=user_id,
-                    sender=prev_sender,
-                )
-            )
-
-            # Check if the last membership (membership that applies to our snapshot) was
-            # already included in our `sync_room_id_set`
-            was_last_membership_already_included = filter_membership_for_sync(
-                membership=last_membership_change_after_to_token.membership,
+            room_id,
+            first_membership_change_after_to_token,
+        ) in first_membership_change_by_room_id_after_to_token.items():
+            # 1a) Remove rooms that the user joined after the `to_token`
+            if first_membership_change_after_to_token.prev_event_id is None:
+                sync_room_id_set.pop(room_id, None)
+            # 1b) 1c) From the first membership event after the `to_token`, step backward to the
+            # previous membership that would apply to the from/to range.
+            else:
+                # We don't expect these fields to be `None` if we have a `prev_event_id`
+                # but we're being defensive since it's possible that the prev event was
+                # culled from the database.
+                if (
+                    first_membership_change_after_to_token.prev_event_pos is not None
+                    and first_membership_change_after_to_token.prev_membership
+                    is not None
+                ):
+                    sync_room_id_set[room_id] = _RoomMembershipForUser(
+                        event_id=first_membership_change_after_to_token.prev_event_id,
+                        event_pos=first_membership_change_after_to_token.prev_event_pos,
+                        membership=first_membership_change_after_to_token.prev_membership,
+                        sender=first_membership_change_after_to_token.prev_sender,
+                        newly_joined=False,
+                    )
+                else:
+                    # If we can't find the previous membership event, we shouldn't
+                    # include the room in the sync response since we can't determine the
+                    # exact membership state and shouldn't rely on the current snapshot.
+                    sync_room_id_set.pop(room_id, None)
+
+        # Filter the rooms that that we have updated room membership events to the point
+        # in time of the `to_token` (from the "1)" fixups)
+        filtered_sync_room_id_set = {
+            room_id: room_membership_for_user
+            for room_id, room_membership_for_user in sync_room_id_set.items()
+            if filter_membership_for_sync(
+                membership=room_membership_for_user.membership,
                 user_id=user_id,
-                sender=last_membership_change_after_to_token.sender,
+                sender=room_membership_for_user.sender,
             )
-
-            # 1a) Add back rooms that the user left after the `to_token`
-            #
-            # For example, if the last membership event after the `to_token` is a leave
-            # event, then the room was excluded from `sync_room_id_set` when we first
-            # crafted it above. We should add these rooms back as long as the user also
-            # was part of the room before the `to_token`.
-            if (
-                not was_last_membership_already_included
-                and should_prev_membership_be_included
-            ):
-                sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
-                    last_membership_change_after_to_token
-                )
-            # 1b) Remove rooms that the user joined (hasn't left) after the `to_token`
-            #
-            # For example, if the last membership event after the `to_token` is a "join"
-            # event, then the room was included `sync_room_id_set` when we first crafted
-            # it above. We should remove these rooms as long as the user also wasn't
-            # part of the room before the `to_token`.
-            elif (
-                was_last_membership_already_included
-                and not should_prev_membership_be_included
-            ):
-                del sync_room_id_set[room_id]
+        }
 
         # 2) -----------------------------------------------------
         # We fix-up newly_left rooms after the first fixup because it may have removed
-        # some left rooms that we can figure out our newly_left in the following code
+        # some left rooms that we can figure out are newly_left in the following code
 
         # 2) Fetch membership changes that fall in the range from `from_token` up to `to_token`
-        membership_change_events_in_from_to_range = []
+        current_state_delta_membership_changes_in_from_to_range = []
         if from_token:
-            membership_change_events_in_from_to_range = (
-                await self.store.get_membership_changes_for_user(
+            current_state_delta_membership_changes_in_from_to_range = (
+                await self.store.get_current_state_delta_membership_changes_for_user(
                     user_id,
                     from_key=from_token.room_key,
                     to_key=to_token.room_key,
-                    excluded_rooms=self.rooms_to_exclude_globally,
+                    excluded_room_ids=self.rooms_to_exclude_globally,
                 )
             )
 
         # 2) Assemble a list of the last membership events in some given ranges. Someone
         # could have left and joined multiple times during the given range but we only
         # care about end-result so we grab the last one.
-        last_membership_change_by_room_id_in_from_to_range: Dict[str, EventBase] = {}
-        for event in membership_change_events_in_from_to_range:
-            last_membership_change_by_room_id_in_from_to_range[event.room_id] = event
+        last_membership_change_by_room_id_in_from_to_range: Dict[
+            str, CurrentStateDeltaMembership
+        ] = {}
+        # We also want to assemble a list of the first membership events during the token
+        # range so we can step backward to the previous membership that would apply to
+        # before the token range to see if we have `newly_joined` the room.
+        first_membership_change_by_room_id_in_from_to_range: Dict[
+            str, CurrentStateDeltaMembership
+        ] = {}
+        # Keep track if the room has a non-join event in the token range so we can later
+        # tell if it was a `newly_joined` room. If the last membership event in the
+        # token range is a join and there is also some non-join in the range, we know
+        # they `newly_joined`.
+        has_non_join_event_by_room_id_in_from_to_range: Dict[str, bool] = {}
+        for (
+            membership_change
+        ) in current_state_delta_membership_changes_in_from_to_range:
+            room_id = membership_change.room_id
+
+            last_membership_change_by_room_id_in_from_to_range[room_id] = (
+                membership_change
+            )
+            # Only set if we haven't already set it
+            first_membership_change_by_room_id_in_from_to_range.setdefault(
+                room_id, membership_change
+            )
+
+            if membership_change.membership != Membership.JOIN:
+                has_non_join_event_by_room_id_in_from_to_range[room_id] = True
 
         # 2) Fixup
+        #
+        # 3) We also want to assemble a list of possibly newly joined rooms. Someone
+        # could have left and joined multiple times during the given range but we only
+        # care about whether they are joined at the end of the token range so we are
+        # working with the last membership even in the token range.
+        possibly_newly_joined_room_ids = set()
         for (
             last_membership_change_in_from_to_range
         ) in last_membership_change_by_room_id_in_from_to_range.values():
             room_id = last_membership_change_in_from_to_range.room_id
 
+            # 3)
+            if last_membership_change_in_from_to_range.membership == Membership.JOIN:
+                possibly_newly_joined_room_ids.add(room_id)
+
             # 2) Add back newly_left rooms (> `from_token` and <= `to_token`). We
             # include newly_left rooms because the last event that the user should see
             # is their own leave event
             if last_membership_change_in_from_to_range.membership == Membership.LEAVE:
-                sync_room_id_set[room_id] = convert_event_to_rooms_for_user(
-                    last_membership_change_in_from_to_range
+                filtered_sync_room_id_set[room_id] = _RoomMembershipForUser(
+                    event_id=last_membership_change_in_from_to_range.event_id,
+                    event_pos=last_membership_change_in_from_to_range.event_pos,
+                    membership=last_membership_change_in_from_to_range.membership,
+                    sender=last_membership_change_in_from_to_range.sender,
+                    newly_joined=False,
                 )
 
-        return sync_room_id_set
+        # 3) Figure out `newly_joined`
+        for room_id in possibly_newly_joined_room_ids:
+            has_non_join_in_from_to_range = (
+                has_non_join_event_by_room_id_in_from_to_range.get(room_id, False)
+            )
+            # If the last membership event in the token range is a join and there is
+            # also some non-join in the range, we know they `newly_joined`.
+            if has_non_join_in_from_to_range:
+                # We found a `newly_joined` room (we left and joined within the token range)
+                filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                    room_id
+                ].copy_and_replace(newly_joined=True)
+            else:
+                prev_event_id = first_membership_change_by_room_id_in_from_to_range[
+                    room_id
+                ].prev_event_id
+                prev_membership = first_membership_change_by_room_id_in_from_to_range[
+                    room_id
+                ].prev_membership
+
+                if prev_event_id is None:
+                    # We found a `newly_joined` room (we are joining the room for the
+                    # first time within the token range)
+                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                        room_id
+                    ].copy_and_replace(newly_joined=True)
+                # Last resort, we need to step back to the previous membership event
+                # just before the token range to see if we're joined then or not.
+                elif prev_membership != Membership.JOIN:
+                    # We found a `newly_joined` room (we left before the token range
+                    # and joined within the token range)
+                    filtered_sync_room_id_set[room_id] = filtered_sync_room_id_set[
+                        room_id
+                    ].copy_and_replace(newly_joined=True)
+
+        return filtered_sync_room_id_set
 
     async def filter_rooms(
         self,
         user: UserID,
-        sync_room_map: Dict[str, RoomsForUser],
+        sync_room_map: Dict[str, _RoomMembershipForUser],
         filters: SlidingSyncConfig.SlidingSyncList.Filters,
         to_token: StreamToken,
-    ) -> Dict[str, RoomsForUser]:
+    ) -> Dict[str, _RoomMembershipForUser]:
         """
         Filter rooms based on the sync request.
 
@@ -629,9 +741,9 @@ class SlidingSyncHandler:
 
     async def sort_rooms(
         self,
-        sync_room_map: Dict[str, RoomsForUser],
+        sync_room_map: Dict[str, _RoomMembershipForUser],
         to_token: StreamToken,
-    ) -> List[Tuple[str, RoomsForUser]]:
+    ) -> List[Tuple[str, _RoomMembershipForUser]]:
         """
         Sort by `stream_ordering` of the last event that the user should see in the
         room. `stream_ordering` is unique so we get a stable sort.
@@ -678,3 +790,229 @@ class SlidingSyncHandler:
             # We want descending order
             reverse=True,
         )
+
+    async def get_room_sync_data(
+        self,
+        user: UserID,
+        room_id: str,
+        room_sync_config: RoomSyncConfig,
+        rooms_membership_for_user_at_to_token: _RoomMembershipForUser,
+        from_token: Optional[StreamToken],
+        to_token: StreamToken,
+    ) -> SlidingSyncResult.RoomResult:
+        """
+        Fetch room data for the sync response.
+
+        We fetch data according to the token range (> `from_token` and <= `to_token`).
+
+        Args:
+            user: User to fetch data for
+            room_id: The room ID to fetch data for
+            room_sync_config: Config for what data we should fetch for a room in the
+                sync response.
+            rooms_membership_for_user_at_to_token: Membership information for the user
+                in the room at the time of `to_token`.
+            from_token: The point in the stream to sync from.
+            to_token: The point in the stream to sync up to.
+        """
+
+        # Assemble the list of timeline events
+        #
+        # It would be nice to make the `rooms` response more uniform regardless of
+        # membership. Currently, we have to make all of these optional because
+        # `invite`/`knock` rooms only have `stripped_state`. See
+        # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1653045932
+        timeline_events: Optional[List[EventBase]] = None
+        bundled_aggregations: Optional[Dict[str, BundledAggregations]] = None
+        limited: Optional[bool] = None
+        prev_batch_token: Optional[StreamToken] = None
+        num_live: Optional[int] = None
+        if (
+            room_sync_config.timeline_limit > 0
+            # No timeline for invite/knock rooms (just `stripped_state`)
+            and rooms_membership_for_user_at_to_token.membership
+            not in (Membership.INVITE, Membership.KNOCK)
+        ):
+            limited = False
+            # We want to start off using the `to_token` (vs `from_token`) because we look
+            # backwards from the `to_token` up to the `timeline_limit` and we might not
+            # reach the `from_token` before we hit the limit. We will update the room stream
+            # position once we've fetched the events to point to the earliest event fetched.
+            prev_batch_token = to_token
+
+            # We're going to paginate backwards from the `to_token`
+            from_bound = to_token.room_key
+            # People shouldn't see past their leave/ban event
+            if rooms_membership_for_user_at_to_token.membership in (
+                Membership.LEAVE,
+                Membership.BAN,
+            ):
+                from_bound = (
+                    rooms_membership_for_user_at_to_token.event_pos.to_room_stream_token()
+                )
+
+            # Determine whether we should limit the timeline to the token range.
+            #
+            # We should return historical messages (before token range) in the
+            # following cases because we want clients to be able to show a basic
+            # screen of information:
+            #  - Initial sync (because no `from_token` to limit us anyway)
+            #  - When users `newly_joined`
+            #  - TODO: For an incremental sync where we haven't sent it down this
+            #    connection before
+            to_bound = (
+                from_token.room_key
+                if from_token is not None
+                and not rooms_membership_for_user_at_to_token.newly_joined
+                else None
+            )
+
+            timeline_events, new_room_key = await self.store.paginate_room_events(
+                room_id=room_id,
+                from_key=from_bound,
+                to_key=to_bound,
+                direction=Direction.BACKWARDS,
+                # We add one so we can determine if there are enough events to saturate
+                # the limit or not (see `limited`)
+                limit=room_sync_config.timeline_limit + 1,
+                event_filter=None,
+            )
+
+            # We want to return the events in ascending order (the last event is the
+            # most recent).
+            timeline_events.reverse()
+
+            # Determine our `limited` status based on the timeline. We do this before
+            # filtering the events so we can accurately determine if there is more to
+            # paginate even if we filter out some/all events.
+            if len(timeline_events) > room_sync_config.timeline_limit:
+                limited = True
+                # Get rid of that extra "+ 1" event because we only used it to determine
+                # if we hit the limit or not
+                timeline_events = timeline_events[-room_sync_config.timeline_limit :]
+                assert timeline_events[0].internal_metadata.stream_ordering
+                new_room_key = RoomStreamToken(
+                    stream=timeline_events[0].internal_metadata.stream_ordering - 1
+                )
+
+            # Make sure we don't expose any events that the client shouldn't see
+            timeline_events = await filter_events_for_client(
+                self.storage_controllers,
+                user.to_string(),
+                timeline_events,
+                is_peeking=rooms_membership_for_user_at_to_token.membership
+                != Membership.JOIN,
+                filter_send_to_client=True,
+            )
+            # TODO: Filter out `EventTypes.CallInvite` in public rooms,
+            # see https://github.com/element-hq/synapse/issues/17359
+
+            # TODO: Handle timeline gaps (`get_timeline_gaps()`)
+
+            # Determine how many "live" events we have (events within the given token range).
+            #
+            # This is mostly useful to determine whether a given @mention event should
+            # make a noise or not. Clients cannot rely solely on the absence of
+            # `initial: true` to determine live events because if a room not in the
+            # sliding window bumps into the window because of an @mention it will have
+            # `initial: true` yet contain a single live event (with potentially other
+            # old events in the timeline)
+            num_live = 0
+            if from_token is not None:
+                for timeline_event in reversed(timeline_events):
+                    # This fields should be present for all persisted events
+                    assert timeline_event.internal_metadata.stream_ordering is not None
+                    assert timeline_event.internal_metadata.instance_name is not None
+
+                    persisted_position = PersistedEventPosition(
+                        instance_name=timeline_event.internal_metadata.instance_name,
+                        stream=timeline_event.internal_metadata.stream_ordering,
+                    )
+                    if persisted_position.persisted_after(from_token.room_key):
+                        num_live += 1
+                    else:
+                        # Since we're iterating over the timeline events in
+                        # reverse-chronological order, we can break once we hit an event
+                        # that's not live. In the future, we could potentially optimize
+                        # this more with a binary search (bisect).
+                        break
+
+            # If the timeline is `limited=True`, the client does not have all events
+            # necessary to calculate aggregations themselves.
+            if limited:
+                bundled_aggregations = (
+                    await self.relations_handler.get_bundled_aggregations(
+                        timeline_events, user.to_string()
+                    )
+                )
+
+            # Update the `prev_batch_token` to point to the position that allows us to
+            # keep paginating backwards from the oldest event we return in the timeline.
+            prev_batch_token = prev_batch_token.copy_and_replace(
+                StreamKeyType.ROOM, new_room_key
+            )
+
+        # Figure out any stripped state events for invite/knocks. This allows the
+        # potential joiner to identify the room.
+        stripped_state: List[JsonDict] = []
+        if rooms_membership_for_user_at_to_token.membership in (
+            Membership.INVITE,
+            Membership.KNOCK,
+        ):
+            # This should never happen. If someone is invited/knocked on room, then
+            # there should be an event for it.
+            assert rooms_membership_for_user_at_to_token.event_id is not None
+
+            invite_or_knock_event = await self.store.get_event(
+                rooms_membership_for_user_at_to_token.event_id
+            )
+
+            stripped_state = []
+            if invite_or_knock_event.membership == Membership.INVITE:
+                stripped_state.extend(
+                    invite_or_knock_event.unsigned.get("invite_room_state", [])
+                )
+            elif invite_or_knock_event.membership == Membership.KNOCK:
+                stripped_state.extend(
+                    invite_or_knock_event.unsigned.get("knock_room_state", [])
+                )
+
+            stripped_state.append(strip_event(invite_or_knock_event))
+
+        # TODO: Handle state resets. For example, if we see
+        # `rooms_membership_for_user_at_to_token.membership = Membership.LEAVE` but
+        # `required_state` doesn't include it, we should indicate to the client that a
+        # state reset happened. Perhaps we should indicate this by setting `initial:
+        # True` and empty `required_state`.
+
+        return SlidingSyncResult.RoomResult(
+            # TODO: Dummy value
+            name=None,
+            # TODO: Dummy value
+            avatar=None,
+            # TODO: Dummy value
+            heroes=None,
+            # TODO: Since we can't determine whether we've already sent a room down this
+            # Sliding Sync connection before (we plan to add this optimization in the
+            # future), we're always returning the requested room state instead of
+            # updates.
+            initial=True,
+            # TODO: Dummy value
+            required_state=[],
+            timeline_events=timeline_events,
+            bundled_aggregations=bundled_aggregations,
+            # TODO: Dummy value
+            is_dm=False,
+            stripped_state=stripped_state,
+            prev_batch=prev_batch_token,
+            limited=limited,
+            # TODO: Dummy values
+            joined_count=0,
+            invited_count=0,
+            # TODO: These are just dummy values. We could potentially just remove these
+            # since notifications can only really be done correctly on the client anyway
+            # (encrypted rooms).
+            notification_count=0,
+            highlight_count=0,
+            num_live=num_live,
+        )
diff --git a/synapse/rest/client/sync.py b/synapse/rest/client/sync.py
index b5ab0d8534..1d955a2e89 100644
--- a/synapse/rest/client/sync.py
+++ b/synapse/rest/client/sync.py
@@ -761,7 +761,6 @@ class SlidingSyncRestServlet(RestServlet):
             "lists": {
                 "foo-list": {
                     "ranges": [ [0, 99] ],
-                    "sort": [ "by_notification_level", "by_recency", "by_name" ],
                     "required_state": [
                         ["m.room.join_rules", ""],
                         ["m.room.history_visibility", ""],
@@ -771,7 +770,6 @@ class SlidingSyncRestServlet(RestServlet):
                     "filters": {
                         "is_dm": true
                     },
-                    "bump_event_types": [ "m.room.message", "m.room.encrypted" ],
                 }
             },
             // Room Subscriptions API
@@ -779,10 +777,6 @@ class SlidingSyncRestServlet(RestServlet):
                 "!sub1:bar": {
                     "required_state": [ ["*","*"] ],
                     "timeline_limit": 10,
-                    "include_old_rooms": {
-                        "timeline_limit": 1,
-                        "required_state": [ ["m.room.tombstone", ""], ["m.room.create", ""] ],
-                    }
                 }
             },
             // Extensions API
@@ -791,7 +785,7 @@ class SlidingSyncRestServlet(RestServlet):
 
     Response JSON::
         {
-            "next_pos": "s58_224_0_13_10_1_1_16_0_1",
+            "pos": "s58_224_0_13_10_1_1_16_0_1",
             "lists": {
                 "foo-list": {
                     "count": 1337,
@@ -830,7 +824,8 @@ class SlidingSyncRestServlet(RestServlet):
                     "joined_count": 41,
                     "invited_count": 1,
                     "notification_count": 1,
-                    "highlight_count": 0
+                    "highlight_count": 0,
+                    "num_live": 2"
                 },
                 // rooms from list
                 "!foo:bar": {
@@ -855,7 +850,8 @@ class SlidingSyncRestServlet(RestServlet):
                     "joined_count": 4,
                     "invited_count": 0,
                     "notification_count": 54,
-                    "highlight_count": 3
+                    "highlight_count": 3,
+                    "num_live": 1,
                 },
                  // ... 99 more items
             },
@@ -871,10 +867,11 @@ class SlidingSyncRestServlet(RestServlet):
         super().__init__()
         self.auth = hs.get_auth()
         self.store = hs.get_datastores().main
+        self.clock = hs.get_clock()
         self.filtering = hs.get_filtering()
         self.sliding_sync_handler = hs.get_sliding_sync_handler()
+        self.event_serializer = hs.get_event_client_serializer()
 
-    # TODO: Update this to `on_GET` once we figure out how we want to handle params
     async def on_POST(self, request: SynapseRequest) -> Tuple[int, JsonDict]:
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         user = requester.user
@@ -920,22 +917,25 @@ class SlidingSyncRestServlet(RestServlet):
             logger.info("Client has disconnected; not serializing response.")
             return 200, {}
 
-        response_content = await self.encode_response(sliding_sync_results)
+        response_content = await self.encode_response(requester, sliding_sync_results)
 
         return 200, response_content
 
     # TODO: Is there a better way to encode things?
     async def encode_response(
         self,
+        requester: Requester,
         sliding_sync_result: SlidingSyncResult,
     ) -> JsonDict:
         response: JsonDict = defaultdict(dict)
 
-        response["next_pos"] = await sliding_sync_result.next_pos.to_string(self.store)
+        response["pos"] = await sliding_sync_result.next_pos.to_string(self.store)
         serialized_lists = self.encode_lists(sliding_sync_result.lists)
         if serialized_lists:
             response["lists"] = serialized_lists
-        response["rooms"] = {}  # TODO: sliding_sync_result.rooms
+        response["rooms"] = await self.encode_rooms(
+            requester, sliding_sync_result.rooms
+        )
         response["extensions"] = {}  # TODO: sliding_sync_result.extensions
 
         return response
@@ -961,6 +961,92 @@ class SlidingSyncRestServlet(RestServlet):
 
         return serialized_lists
 
+    async def encode_rooms(
+        self,
+        requester: Requester,
+        rooms: Dict[str, SlidingSyncResult.RoomResult],
+    ) -> JsonDict:
+        time_now = self.clock.time_msec()
+
+        serialize_options = SerializeEventConfig(
+            event_format=format_event_for_client_v2_without_room_id,
+            requester=requester,
+        )
+
+        serialized_rooms: Dict[str, JsonDict] = {}
+        for room_id, room_result in rooms.items():
+            serialized_rooms[room_id] = {
+                "joined_count": room_result.joined_count,
+                "invited_count": room_result.invited_count,
+                "notification_count": room_result.notification_count,
+                "highlight_count": room_result.highlight_count,
+            }
+
+            if room_result.name:
+                serialized_rooms[room_id]["name"] = room_result.name
+
+            if room_result.avatar:
+                serialized_rooms[room_id]["avatar"] = room_result.avatar
+
+            if room_result.heroes:
+                serialized_rooms[room_id]["heroes"] = room_result.heroes
+
+            # We should only include the `initial` key if it's `True` to save bandwidth.
+            # The absense of this flag means `False`.
+            if room_result.initial:
+                serialized_rooms[room_id]["initial"] = room_result.initial
+
+            # This will omitted for invite/knock rooms with `stripped_state`
+            if room_result.required_state is not None:
+                serialized_required_state = (
+                    await self.event_serializer.serialize_events(
+                        room_result.required_state,
+                        time_now,
+                        config=serialize_options,
+                    )
+                )
+                serialized_rooms[room_id]["required_state"] = serialized_required_state
+
+            # This will omitted for invite/knock rooms with `stripped_state`
+            if room_result.timeline_events is not None:
+                serialized_timeline = await self.event_serializer.serialize_events(
+                    room_result.timeline_events,
+                    time_now,
+                    config=serialize_options,
+                    bundle_aggregations=room_result.bundled_aggregations,
+                )
+                serialized_rooms[room_id]["timeline"] = serialized_timeline
+
+            # This will omitted for invite/knock rooms with `stripped_state`
+            if room_result.limited is not None:
+                serialized_rooms[room_id]["limited"] = room_result.limited
+
+            # This will omitted for invite/knock rooms with `stripped_state`
+            if room_result.prev_batch is not None:
+                serialized_rooms[room_id]["prev_batch"] = (
+                    await room_result.prev_batch.to_string(self.store)
+                )
+
+            # This will omitted for invite/knock rooms with `stripped_state`
+            if room_result.num_live is not None:
+                serialized_rooms[room_id]["num_live"] = room_result.num_live
+
+            # Field should be absent on non-DM rooms
+            if room_result.is_dm:
+                serialized_rooms[room_id]["is_dm"] = room_result.is_dm
+
+            # Stripped state only applies to invite/knock rooms
+            if room_result.stripped_state is not None:
+                # TODO: `knocked_state` but that isn't specced yet.
+                #
+                # TODO: Instead of adding `knocked_state`, it would be good to rename
+                # this to `stripped_state` so it can be shared between invite and knock
+                # rooms, see
+                # https://github.com/matrix-org/matrix-spec-proposals/pull/3575#discussion_r1117629919
+                serialized_rooms[room_id]["invite_state"] = room_result.stripped_state
+
+        return serialized_rooms
+
 
 def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
     SyncRestServlet(hs).register(http_server)
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 198e65cfa5..a5acea8c3b 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -55,7 +55,7 @@ from synapse.api.room_versions import (
 )
 from synapse.events import EventBase, make_event_from_dict
 from synapse.events.snapshot import EventContext
-from synapse.events.utils import prune_event
+from synapse.events.utils import prune_event, strip_event
 from synapse.logging.context import (
     PreserveLoggingContext,
     current_context,
@@ -1025,15 +1025,7 @@ class EventsWorkerStore(SQLBaseStore):
 
         state_to_include = await self.get_events(selected_state_ids.values())
 
-        return [
-            {
-                "type": e.type,
-                "state_key": e.state_key,
-                "content": e.content,
-                "sender": e.sender,
-            }
-            for e in state_to_include.values()
-        ]
+        return [strip_event(e) for e in state_to_include.values()]
 
     def _maybe_start_fetch_thread(self) -> None:
         """Starts an event fetch thread if we are not yet at the maximum number."""
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index b7eb3116ae..d34376b8df 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -44,6 +44,7 @@ what sort order was used:
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     Collection,
     Dict,
@@ -62,7 +63,7 @@ from typing_extensions import Literal
 
 from twisted.internet import defer
 
-from synapse.api.constants import Direction
+from synapse.api.constants import Direction, EventTypes, Membership
 from synapse.api.filtering import Filter
 from synapse.events import EventBase
 from synapse.logging.context import make_deferred_yieldable, run_in_background
@@ -111,6 +112,32 @@ class _EventsAround:
     end: RoomStreamToken
 
 
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class CurrentStateDeltaMembership:
+    """
+    Attributes:
+        event_id: The "current" membership event ID in this room.
+        event_pos: The position of the "current" membership event in the event stream.
+        prev_event_id: The previous membership event in this room that was replaced by
+            the "current" one. May be `None` if there was no previous membership event.
+        room_id: The room ID of the membership event.
+        membership: The membership state of the user in the room
+        sender: The person who sent the membership event
+    """
+
+    room_id: str
+    # Event
+    event_id: Optional[str]
+    event_pos: PersistedEventPosition
+    membership: str
+    sender: Optional[str]
+    # Prev event
+    prev_event_id: Optional[str]
+    prev_event_pos: Optional[PersistedEventPosition]
+    prev_membership: Optional[str]
+    prev_sender: Optional[str]
+
+
 def generate_pagination_where_clause(
     direction: Direction,
     column_names: Tuple[str, str],
@@ -390,6 +417,43 @@ def _filter_results(
     return True
 
 
+def _filter_results_by_stream(
+    lower_token: Optional[RoomStreamToken],
+    upper_token: Optional[RoomStreamToken],
+    instance_name: str,
+    stream_ordering: int,
+) -> bool:
+    """
+    This function only works with "live" tokens with `stream_ordering` only. See
+    `_filter_results(...)` if you want to work with all tokens.
+
+    Returns True if the event persisted by the given instance at the given
+    stream_ordering falls between the two tokens (taking a None
+    token to mean unbounded).
+
+    Used to filter results from fetching events in the DB against the given
+    tokens. This is necessary to handle the case where the tokens include
+    position maps, which we handle by fetching more than necessary from the DB
+    and then filtering (rather than attempting to construct a complicated SQL
+    query).
+    """
+    if lower_token:
+        assert lower_token.topological is None
+
+        # If these are live tokens we compare the stream ordering against the
+        # writers stream position.
+        if stream_ordering <= lower_token.get_stream_pos_for_instance(instance_name):
+            return False
+
+    if upper_token:
+        assert upper_token.topological is None
+
+        if upper_token.get_stream_pos_for_instance(instance_name) < stream_ordering:
+            return False
+
+    return True
+
+
 def filter_to_clause(event_filter: Optional[Filter]) -> Tuple[str, List[str]]:
     # NB: This may create SQL clauses that don't optimise well (and we don't
     # have indices on all possible clauses). E.g. it may create
@@ -734,6 +798,191 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
         return ret, key
 
+    async def get_current_state_delta_membership_changes_for_user(
+        self,
+        user_id: str,
+        from_key: RoomStreamToken,
+        to_key: RoomStreamToken,
+        excluded_room_ids: Optional[List[str]] = None,
+    ) -> List[CurrentStateDeltaMembership]:
+        """
+        Fetch membership events (and the previous event that was replaced by that one)
+        for a given user.
+
+        Note: This function only works with "live" tokens with `stream_ordering` only.
+
+        We're looking for membership changes in the token range (> `from_key` and <=
+        `to_key`).
+
+        Please be mindful to only use this with `from_key` and `to_key` tokens that are
+        recent enough to be after when the first local user joined the room. Otherwise,
+        the results may be incomplete or too greedy. For example, if you use a token
+        range before the first local user joined the room, you will see 0 events since
+        `current_state_delta_stream` tracks what the server thinks is the current state
+        of the room as time goes. It does not track how state progresses from the
+        beginning of the room. So for example, when you remotely join a room, the first
+        rows will just be the state when you joined and progress from there.
+
+        You can probably reasonably use this with `/sync` because the `to_key` passed in
+        will be the "current" now token and the range will cover when the user joined
+        the room.
+
+        Args:
+            user_id: The user ID to fetch membership events for.
+            from_key: The point in the stream to sync from (fetching events > this point).
+            to_key: The token to fetch rooms up to (fetching events <= this point).
+            excluded_room_ids: Optional list of room IDs to exclude from the results.
+
+        Returns:
+            All membership changes to the current state in the token range. Events are
+            sorted by `stream_ordering` ascending.
+        """
+        # Start by ruling out cases where a DB query is not necessary.
+        if from_key == to_key:
+            return []
+
+        if from_key:
+            has_changed = self._membership_stream_cache.has_entity_changed(
+                user_id, int(from_key.stream)
+            )
+            if not has_changed:
+                return []
+
+        def f(txn: LoggingTransaction) -> List[CurrentStateDeltaMembership]:
+            # To handle tokens with a non-empty instance_map we fetch more
+            # results than necessary and then filter down
+            min_from_id = from_key.stream
+            max_to_id = to_key.get_max_stream_pos()
+
+            args: List[Any] = [min_from_id, max_to_id, EventTypes.Member, user_id]
+
+            # TODO: It would be good to assert that the `from_token`/`to_token` is >=
+            # the first row in `current_state_delta_stream` for the rooms we're
+            # interested in. Otherwise, we will end up with empty results and not know
+            # it.
+
+            # We could `COALESCE(e.stream_ordering, s.stream_id)` to get more accurate
+            # stream positioning when available but given our usages, we can avoid the
+            # complexity. Between two (valid) stream tokens, we will still get all of
+            # the state changes. Since those events are persisted in a batch, valid
+            # tokens will either be before or after the batch of events.
+            #
+            # `stream_ordering` from the `events` table is more accurate when available
+            # since the `current_state_delta_stream` table only tracks that the current
+            # state is at this stream position (not what stream position the state event
+            # was added) and uses the *minimum* stream position for batches of events.
+            sql = """
+                SELECT
+                    s.room_id,
+                    e.event_id,
+                    s.instance_name,
+                    s.stream_id,
+                    m.membership,
+                    e.sender,
+                    s.prev_event_id,
+                    e_prev.instance_name AS prev_instance_name,
+                    e_prev.stream_ordering AS prev_stream_ordering,
+                    m_prev.membership AS prev_membership,
+                    e_prev.sender AS prev_sender
+                FROM current_state_delta_stream AS s
+                    LEFT JOIN events AS e ON e.event_id = s.event_id
+                    LEFT JOIN room_memberships AS m ON m.event_id = s.event_id
+                    LEFT JOIN events AS e_prev ON e_prev.event_id = s.prev_event_id
+                    LEFT JOIN room_memberships AS m_prev ON m_prev.event_id = s.prev_event_id
+                WHERE s.stream_id > ? AND s.stream_id <= ?
+                    AND s.type = ?
+                    AND s.state_key = ?
+                ORDER BY s.stream_id ASC
+            """
+
+            txn.execute(sql, args)
+
+            membership_changes: List[CurrentStateDeltaMembership] = []
+            for (
+                room_id,
+                event_id,
+                instance_name,
+                stream_ordering,
+                membership,
+                sender,
+                prev_event_id,
+                prev_instance_name,
+                prev_stream_ordering,
+                prev_membership,
+                prev_sender,
+            ) in txn:
+                assert room_id is not None
+                assert instance_name is not None
+                assert stream_ordering is not None
+
+                if _filter_results_by_stream(
+                    from_key,
+                    to_key,
+                    instance_name,
+                    stream_ordering,
+                ):
+                    # When the server leaves a room, it will insert new rows into the
+                    # `current_state_delta_stream` table with `event_id = null` for all
+                    # current state. This means we might already have a row for the
+                    # leave event and then another for the same leave where the
+                    # `event_id=null` but the `prev_event_id` is pointing back at the
+                    # earlier leave event. We don't want to report the leave, if we
+                    # already have a leave event.
+                    if event_id is None and prev_membership == Membership.LEAVE:
+                        continue
+
+                    membership_change = CurrentStateDeltaMembership(
+                        room_id=room_id,
+                        # Event
+                        event_id=event_id,
+                        event_pos=PersistedEventPosition(
+                            instance_name=instance_name,
+                            stream=stream_ordering,
+                        ),
+                        # When `s.event_id = null`, we won't be able to get respective
+                        # `room_membership` but can assume the user has left the room
+                        # because this only happens when the server leaves a room
+                        # (meaning everyone locally left) or a state reset which removed
+                        # the person from the room.
+                        membership=(
+                            membership if membership is not None else Membership.LEAVE
+                        ),
+                        sender=sender,
+                        # Prev event
+                        prev_event_id=prev_event_id,
+                        prev_event_pos=(
+                            PersistedEventPosition(
+                                instance_name=prev_instance_name,
+                                stream=prev_stream_ordering,
+                            )
+                            if (
+                                prev_instance_name is not None
+                                and prev_stream_ordering is not None
+                            )
+                            else None
+                        ),
+                        prev_membership=prev_membership,
+                        prev_sender=prev_sender,
+                    )
+
+                    membership_changes.append(membership_change)
+
+            return membership_changes
+
+        membership_changes = await self.db_pool.runInteraction(
+            "get_current_state_delta_membership_changes_for_user", f
+        )
+
+        room_ids_to_exclude: AbstractSet[str] = set()
+        if excluded_room_ids is not None:
+            room_ids_to_exclude = set(excluded_room_ids)
+
+        return [
+            membership_change
+            for membership_change in membership_changes
+            if membership_change.room_id not in room_ids_to_exclude
+        ]
+
     @cancellable
     async def get_membership_changes_for_user(
         self,
@@ -769,10 +1018,11 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
 
             ignore_room_clause = ""
             if excluded_rooms is not None and len(excluded_rooms) > 0:
-                ignore_room_clause = "AND e.room_id NOT IN (%s)" % ",".join(
-                    "?" for _ in excluded_rooms
+                ignore_room_clause, ignore_room_args = make_in_list_sql_clause(
+                    txn.database_engine, "e.room_id", excluded_rooms, negative=True
                 )
-                args = args + excluded_rooms
+                ignore_room_clause = f"AND {ignore_room_clause}"
+                args += ignore_room_args
 
             sql = """
                 SELECT m.event_id, instance_name, topological_ordering, stream_ordering
@@ -1554,6 +1804,9 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
     ) -> Tuple[List[EventBase], RoomStreamToken]:
         """Returns list of events before or after a given token.
 
+        When Direction.FORWARDS: from_key < x <= to_key
+        When Direction.BACKWARDS: from_key >= x > to_key
+
         Args:
             room_id
             from_key: The token used to stream from
@@ -1570,6 +1823,27 @@ class StreamWorkerStore(EventsWorkerStore, SQLBaseStore):
             and `to_key`).
         """
 
+        # We can bail early if we're looking forwards, and our `to_key` is already
+        # before our `from_key`.
+        if (
+            direction == Direction.FORWARDS
+            and to_key is not None
+            and to_key.is_before_or_eq(from_key)
+        ):
+            # Token selection matches what we do in `_paginate_room_events_txn` if there
+            # are no rows
+            return [], to_key if to_key else from_key
+        # Or vice-versa, if we're looking backwards and our `from_key` is already before
+        # our `to_key`.
+        elif (
+            direction == Direction.BACKWARDS
+            and to_key is not None
+            and from_key.is_before_or_eq(to_key)
+        ):
+            # Token selection matches what we do in `_paginate_room_events_txn` if there
+            # are no rows
+            return [], to_key if to_key else from_key
+
         rows, token = await self.db_pool.runInteraction(
             "paginate_room_events",
             self._paginate_room_events_txn,
diff --git a/synapse/storage/schema/main/delta/42/current_state_delta.sql b/synapse/storage/schema/main/delta/42/current_state_delta.sql
index 876b61e6a5..3d2fd69480 100644
--- a/synapse/storage/schema/main/delta/42/current_state_delta.sql
+++ b/synapse/storage/schema/main/delta/42/current_state_delta.sql
@@ -32,7 +32,10 @@
  * limitations under the License.
  */
 
-
+-- Tracks what the server thinks is the current state of the room as time goes. It does
+-- not track how state progresses from the beginning of the room. So for example, when
+-- you remotely join a room, the first rows will just be the state when you joined and
+-- progress from there.
 CREATE TABLE current_state_delta_stream (
     stream_id BIGINT NOT NULL,
     room_id TEXT NOT NULL,
diff --git a/synapse/types/__init__.py b/synapse/types/__init__.py
index 8ab9f90238..b22a13ef01 100644
--- a/synapse/types/__init__.py
+++ b/synapse/types/__init__.py
@@ -1096,6 +1096,9 @@ class PersistedPosition:
     stream: int
 
     def persisted_after(self, token: AbstractMultiWriterStreamToken) -> bool:
+        """
+        Checks whether this position happened after the token
+        """
         return token.get_stream_pos_for_instance(self.instance_name) < self.stream
 
 
diff --git a/synapse/types/handlers/__init__.py b/synapse/types/handlers/__init__.py
index 1d65551d5b..3cd3c8fb0f 100644
--- a/synapse/types/handlers/__init__.py
+++ b/synapse/types/handlers/__init__.py
@@ -31,9 +31,12 @@ else:
     from pydantic import Extra
 
 from synapse.events import EventBase
-from synapse.types import JsonMapping, StreamToken, UserID
+from synapse.types import JsonDict, JsonMapping, StreamToken, UserID
 from synapse.types.rest.client import SlidingSyncBody
 
+if TYPE_CHECKING:
+    from synapse.handlers.relations import BundledAggregations
+
 
 class ShutdownRoomParams(TypedDict):
     """
@@ -159,11 +162,16 @@ class SlidingSyncResult:
                 entirely and NOT send "initial":false as this is wasteful on bandwidth. The
                 absence of this flag means 'false'.
             required_state: The current state of the room
-            timeline: Latest events in the room. The last event is the most recent
+            timeline: Latest events in the room. The last event is the most recent.
+            bundled_aggregations: A mapping of event ID to the bundled aggregations for
+                the timeline events above. This allows clients to show accurate reaction
+                counts (or edits, threads), even if some of the reaction events were skipped
+                over in a gappy sync.
             is_dm: Flag to specify whether the room is a direct-message room (most likely
                 between two people).
-            invite_state: Stripped state events. Same as `rooms.invite.$room_id.invite_state`
-                in sync v2, absent on joined/left rooms
+            stripped_state: Stripped state events (for rooms where the usre is
+                invited/knocked). Same as `rooms.invite.$room_id.invite_state` in sync v2,
+                absent on joined/left rooms
             prev_batch: A token that can be passed as a start parameter to the
                 `/rooms/<room_id>/messages` API to retrieve earlier messages.
             limited: True if their are more events than fit between the given position and now.
@@ -185,21 +193,28 @@ class SlidingSyncResult:
                 (with potentially other old events in the timeline).
         """
 
-        name: str
+        name: Optional[str]
         avatar: Optional[str]
         heroes: Optional[List[EventBase]]
         initial: bool
-        required_state: List[EventBase]
-        timeline: List[EventBase]
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        required_state: Optional[List[EventBase]]
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        timeline_events: Optional[List[EventBase]]
+        bundled_aggregations: Optional[Dict[str, "BundledAggregations"]]
         is_dm: bool
-        invite_state: List[EventBase]
-        prev_batch: StreamToken
-        limited: bool
+        # Optional because it's only relevant to invite/knock rooms
+        stripped_state: Optional[List[JsonDict]]
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        prev_batch: Optional[StreamToken]
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        limited: Optional[bool]
         joined_count: int
         invited_count: int
         notification_count: int
         highlight_count: int
-        num_live: int
+        # Only optional because it won't be included for invite/knock rooms with `stripped_state`
+        num_live: Optional[int]
 
     @attr.s(slots=True, frozen=True, auto_attribs=True)
     class SlidingWindowList:
diff --git a/synapse/types/rest/client/__init__.py b/synapse/types/rest/client/__init__.py
index e2c79c4106..5d453769b5 100644
--- a/synapse/types/rest/client/__init__.py
+++ b/synapse/types/rest/client/__init__.py
@@ -152,22 +152,14 @@ class SlidingSyncBody(RequestBodyModel):
                 anyway.
             timeline_limit: The maximum number of timeline events to return per response.
                 (Max 1000 messages)
-            include_old_rooms: Determines if `predecessor` rooms are included in the
-                `rooms` response. The user MUST be joined to old rooms for them to show up
-                in the response.
         """
 
-        class IncludeOldRooms(RequestBodyModel):
-            timeline_limit: StrictInt
-            required_state: List[Tuple[StrictStr, StrictStr]]
-
         required_state: List[Tuple[StrictStr, StrictStr]]
         # mypy workaround via https://github.com/pydantic/pydantic/issues/156#issuecomment-1130883884
         if TYPE_CHECKING:
             timeline_limit: int
         else:
             timeline_limit: conint(le=1000, strict=True)  # type: ignore[valid-type]
-        include_old_rooms: Optional[IncludeOldRooms] = None
 
     class SlidingSyncList(CommonRoomParameters):
         """
@@ -208,9 +200,6 @@ class SlidingSyncBody(RequestBodyModel):
                     }
 
             timeline_limit: The maximum number of timeline events to return per response.
-            include_old_rooms: Determines if `predecessor` rooms are included in the
-                `rooms` response. The user MUST be joined to old rooms for them to show up
-                in the response.
             include_heroes: Return a stripped variant of membership events (containing
                 `user_id` and optionally `avatar_url` and `displayname`) for the users used
                 to calculate the room name.