summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
authorEric Eastwood <eric.eastwood@beta.gouv.fr>2024-07-02 11:07:05 -0500
committerGitHub <noreply@github.com>2024-07-02 11:07:05 -0500
commitfa916558056013678e88d9dc2a2f64b161d9c77f (patch)
tree2a726ca48f2a131047d31199a481c9b41f539f5c /tests
parentMerge branch 'release-v1.110' into develop (diff)
downloadsynapse-fa916558056013678e88d9dc2a2f64b161d9c77f.tar.xz
Return some room data in Sliding Sync `/sync` (#17320)
 - Timeline events
 - Stripped `invite_state`

Based on [MSC3575](https://github.com/matrix-org/matrix-spec-proposals/pull/3575): Sliding Sync
Diffstat (limited to 'tests')
-rw-r--r--tests/handlers/test_sliding_sync.py772
-rw-r--r--tests/rest/client/test_sync.py1079
-rw-r--r--tests/rest/client/utils.py4
-rw-r--r--tests/storage/test_stream.py874
4 files changed, 2672 insertions, 57 deletions
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
index 8dd4521b18..3d37a696d5 100644
--- a/tests/handlers/test_sliding_sync.py
+++ b/tests/handlers/test_sliding_sync.py
@@ -63,6 +63,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
         self.store = self.hs.get_datastores().main
         self.event_sources = hs.get_event_sources()
+        self.storage_controllers = hs.get_storage_controllers()
 
     def test_no_rooms(self) -> None:
         """
@@ -90,10 +91,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
         before_room_token = self.event_sources.get_current_token()
 
-        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(room_id, user1_id, tok=user1_tok)
 
         after_room_token = self.event_sources.get_current_token()
 
@@ -106,6 +110,15 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
 
         self.assertEqual(room_id_results.keys(), {room_id})
+        # It should be pointing to the join event (latest membership event in the
+        # from/to range)
+        self.assertEqual(
+            room_id_results[room_id].event_id,
+            join_response["event_id"],
+        )
+        # We should be considered `newly_joined` because we joined during the token
+        # range
+        self.assertEqual(room_id_results[room_id].newly_joined, True)
 
     def test_get_already_joined_room(self) -> None:
         """
@@ -113,8 +126,11 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
-        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(room_id, user1_id, tok=user1_tok)
 
         after_room_token = self.event_sources.get_current_token()
 
@@ -127,6 +143,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
 
         self.assertEqual(room_id_results.keys(), {room_id})
+        # It should be pointing to the join event (latest membership event in the
+        # from/to range)
+        self.assertEqual(
+            room_id_results[room_id].event_id,
+            join_response["event_id"],
+        )
+        # We should *NOT* be `newly_joined` because we joined before the token range
+        self.assertEqual(room_id_results[room_id].newly_joined, False)
 
     def test_get_invited_banned_knocked_room(self) -> None:
         """
@@ -142,14 +166,18 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Setup the invited room (user2 invites user1 to the room)
         invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
-        self.helper.invite(invited_room_id, targ=user1_id, tok=user2_tok)
+        invite_response = self.helper.invite(
+            invited_room_id, targ=user1_id, tok=user2_tok
+        )
 
         # Setup the ban room (user2 bans user1 from the room)
         ban_room_id = self.helper.create_room_as(
             user2_id, tok=user2_tok, is_public=True
         )
         self.helper.join(ban_room_id, user1_id, tok=user1_tok)
-        self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
+        ban_response = self.helper.ban(
+            ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok
+        )
 
         # Setup the knock room (user1 knocks on the room)
         knock_room_id = self.helper.create_room_as(
@@ -162,13 +190,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
             tok=user2_tok,
         )
         # User1 knocks on the room
-        channel = self.make_request(
+        knock_channel = self.make_request(
             "POST",
             "/_matrix/client/r0/knock/%s" % (knock_room_id,),
             b"{}",
             user1_tok,
         )
-        self.assertEqual(channel.code, 200, channel.result)
+        self.assertEqual(knock_channel.code, 200, knock_channel.result)
+        knock_room_membership_state_event = self.get_success(
+            self.storage_controllers.state.get_current_state_event(
+                knock_room_id, EventTypes.Member, user1_id
+            )
+        )
+        assert knock_room_membership_state_event is not None
 
         after_room_token = self.event_sources.get_current_token()
 
@@ -189,6 +223,25 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
                 knock_room_id,
             },
         )
+        # It should be pointing to the the respective membership event (latest
+        # membership event in the from/to range)
+        self.assertEqual(
+            room_id_results[invited_room_id].event_id,
+            invite_response["event_id"],
+        )
+        self.assertEqual(
+            room_id_results[ban_room_id].event_id,
+            ban_response["event_id"],
+        )
+        self.assertEqual(
+            room_id_results[knock_room_id].event_id,
+            knock_room_membership_state_event.event_id,
+        )
+        # We should *NOT* be `newly_joined` because we were not joined at the the time
+        # of the `to_token`.
+        self.assertEqual(room_id_results[invited_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[ban_room_id].newly_joined, False)
+        self.assertEqual(room_id_results[knock_room_id].newly_joined, False)
 
     def test_get_kicked_room(self) -> None:
         """
@@ -206,7 +259,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
         self.helper.join(kick_room_id, user1_id, tok=user1_tok)
         # Kick user1 from the room
-        self.helper.change_membership(
+        kick_response = self.helper.change_membership(
             room=kick_room_id,
             src=user2_id,
             targ=user1_id,
@@ -229,6 +282,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # The kicked room should show up
         self.assertEqual(room_id_results.keys(), {kick_room_id})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[kick_room_id].event_id,
+            kick_response["event_id"],
+        )
+        # We should *NOT* be `newly_joined` because we were not joined at the the time
+        # of the `to_token`.
+        self.assertEqual(room_id_results[kick_room_id].newly_joined, False)
 
     def test_forgotten_rooms(self) -> None:
         """
@@ -329,7 +390,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Leave during the from_token/to_token range (newly_left)
         room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+        _leave_response2 = self.helper.leave(room_id2, user1_id, tok=user1_tok)
 
         after_room2_token = self.event_sources.get_current_token()
 
@@ -343,6 +404,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Only the newly_left room should show up
         self.assertEqual(room_id_results.keys(), {room_id2})
+        # It should be pointing to the latest membership event in the from/to range but
+        # the `event_id` is `None` because we left the room causing the server to leave
+        # the room because no other local users are in it (quirk of the
+        # `current_state_delta_stream` table that we source things from)
+        self.assertEqual(
+            room_id_results[room_id2].event_id,
+            None,  # _leave_response2["event_id"],
+        )
+        # We should *NOT* be `newly_joined` because we are instead `newly_left`
+        self.assertEqual(room_id_results[room_id2].newly_joined, False)
 
     def test_no_joins_after_to_token(self) -> None:
         """
@@ -351,16 +422,19 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
         before_room1_token = self.event_sources.get_current_token()
 
-        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
-        # Room join after after our `to_token` shouldn't show up
-        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
-        _ = room_id2
+        # Room join after our `to_token` shouldn't show up
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -371,6 +445,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         )
 
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response1["event_id"],
+        )
+        # We should be `newly_joined` because we joined during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
 
     def test_join_during_range_and_left_room_after_to_token(self) -> None:
         """
@@ -380,15 +461,18 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
         before_room1_token = self.event_sources.get_current_token()
 
-        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Leave the room after we already have our tokens
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -401,6 +485,20 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # We should still see the room because we were joined during the
         # from_token/to_token time period.
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response": join_response["event_id"],
+                    "leave_response": leave_response["event_id"],
+                }
+            ),
+        )
+        # We should be `newly_joined` because we joined during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
 
     def test_join_before_range_and_left_room_after_to_token(self) -> None:
         """
@@ -410,13 +508,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
 
-        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Leave the room after we already have our tokens
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -428,6 +529,20 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # We should still see the room because we were joined before the `from_token`
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response": join_response["event_id"],
+                    "leave_response": leave_response["event_id"],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we joined before the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
 
     def test_kicked_before_range_and_left_after_to_token(self) -> None:
         """
@@ -444,9 +559,9 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         kick_room_id = self.helper.create_room_as(
             user2_id, tok=user2_tok, is_public=True
         )
-        self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+        join_response1 = self.helper.join(kick_room_id, user1_id, tok=user1_tok)
         # Kick user1 from the room
-        self.helper.change_membership(
+        kick_response = self.helper.change_membership(
             room=kick_room_id,
             src=user2_id,
             targ=user1_id,
@@ -463,8 +578,8 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         #
         # We have to join before we can leave (leave -> leave isn't a valid transition
         # or at least it doesn't work in Synapse, 403 forbidden)
-        self.helper.join(kick_room_id, user1_id, tok=user1_tok)
-        self.helper.leave(kick_room_id, user1_id, tok=user1_tok)
+        join_response2 = self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(kick_room_id, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -476,6 +591,22 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # We shouldn't see the room because it was forgotten
         self.assertEqual(room_id_results.keys(), {kick_room_id})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[kick_room_id].event_id,
+            kick_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response1": join_response1["event_id"],
+                    "kick_response": kick_response["event_id"],
+                    "join_response2": join_response2["event_id"],
+                    "leave_response": leave_response["event_id"],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we were kicked
+        self.assertEqual(room_id_results[kick_room_id].newly_joined, False)
 
     def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None:
         """
@@ -494,14 +625,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # leave and can still re-join.
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         # Join and leave the room during the from/to range
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Join and leave the room after we already have our tokens
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -513,6 +644,22 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Room should still show up because it's newly_left during the from/to range
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            leave_response1["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response1": join_response1["event_id"],
+                    "leave_response1": leave_response1["event_id"],
+                    "join_response2": join_response2["event_id"],
+                    "leave_response2": leave_response2["event_id"],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we left during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
 
     def test_newly_left_during_range_and_join_after_to_token(self) -> None:
         """
@@ -531,13 +678,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # leave and can still re-join.
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         # Join and leave the room during the from/to range
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Join the room after we already have our tokens
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -549,11 +696,26 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Room should still show up because it's newly_left during the from/to range
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            leave_response1["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response1": join_response1["event_id"],
+                    "leave_response1": leave_response1["event_id"],
+                    "join_response2": join_response2["event_id"],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we left during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
 
     def test_no_from_token(self) -> None:
         """
         Test that if we don't provide a `from_token`, we get all the rooms that we we're
-        joined to up to the `to_token`.
+        joined up to the `to_token`.
 
         Providing `from_token` only really has the effect that it adds `newly_left`
         rooms to the response.
@@ -569,7 +731,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
 
         # Join room1
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Join and leave the room2 before the `to_token`
         self.helper.join(room_id2, user1_id, tok=user1_tok)
@@ -590,6 +752,14 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Only rooms we were joined to before the `to_token` should show up
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response1["event_id"],
+        )
+        # We should *NOT* be `newly_joined` because there is no `from_token` to
+        # define a "live" range to compare against
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
 
     def test_from_token_ahead_of_to_token(self) -> None:
         """
@@ -609,7 +779,7 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
 
         # Join room1 before `before_room_token`
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         # Join and leave the room2 before `before_room_token`
         self.helper.join(room_id2, user1_id, tok=user1_tok)
@@ -651,6 +821,13 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # There won't be any newly_left rooms because the `from_token` is ahead of the
         # `to_token` and that range will give no membership changes to check.
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response1["event_id"],
+        )
+        # We should *NOT* be `newly_joined` because we joined `room1` before either of the tokens
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
 
     def test_leave_before_range_and_join_leave_after_to_token(self) -> None:
         """
@@ -741,16 +918,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # leave and can still re-join.
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         # Join, leave, join back to the room before the from/to range
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Leave and Join the room multiple times after we already have our tokens
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response3 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -762,6 +939,24 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Room should show up because it was newly_left and joined during the from/to range
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response2["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response1": join_response1["event_id"],
+                    "leave_response1": leave_response1["event_id"],
+                    "join_response2": join_response2["event_id"],
+                    "leave_response2": leave_response2["event_id"],
+                    "join_response3": join_response3["event_id"],
+                    "leave_response3": leave_response3["event_id"],
+                }
+            ),
+        )
+        # We should be `newly_joined` because we joined during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
 
     def test_join_leave_multiple_times_before_range_and_after_to_token(
         self,
@@ -781,16 +976,16 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         # leave and can still re-join.
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         # Join, leave, join back to the room before the from/to range
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Leave and Join the room multiple times after we already have our tokens
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
-        self.helper.join(room_id1, user1_id, tok=user1_tok)
-        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_response2 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response3 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response3 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
@@ -802,6 +997,24 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
 
         # Room should show up because we were joined before the from/to range
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response2["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response1": join_response1["event_id"],
+                    "leave_response1": leave_response1["event_id"],
+                    "join_response2": join_response2["event_id"],
+                    "leave_response2": leave_response2["event_id"],
+                    "join_response3": join_response3["event_id"],
+                    "leave_response3": leave_response3["event_id"],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we joined before the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
 
     def test_invite_before_range_and_join_leave_after_to_token(
         self,
@@ -821,24 +1034,495 @@ class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
         room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
 
         # Invited to the room before the token
-        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+        invite_response = self.helper.invite(
+            room_id1, src=user2_id, targ=user1_id, tok=user2_tok
+        )
 
         after_room1_token = self.event_sources.get_current_token()
 
         # Join and leave the room after we already have our tokens
+        join_respsonse = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        leave_response = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should show up because we were invited before the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            invite_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "invite_response": invite_response["event_id"],
+                    "join_respsonse": join_respsonse["event_id"],
+                    "leave_response": leave_response["event_id"],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we were only invited before the
+        # token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+
+    def test_join_and_display_name_changes_in_token_range(
+        self,
+    ) -> None:
+        """
+        Test that we point to the correct membership event within the from/to range even
+        if there are multiple `join` membership events in a row indicating
+        `displayname`/`avatar_url` updates.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # Update the displayname during the token range
+        displayname_change_during_token_range_response = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname during token range",
+            },
+            tok=user1_tok,
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Update the displayname after the token range
+        displayname_change_after_token_range_response = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname after token range",
+            },
+            tok=user1_tok,
+        )
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should show up because we were joined during the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            displayname_change_during_token_range_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response": join_response["event_id"],
+                    "displayname_change_during_token_range_response": displayname_change_during_token_range_response[
+                        "event_id"
+                    ],
+                    "displayname_change_after_token_range_response": displayname_change_after_token_range_response[
+                        "event_id"
+                    ],
+                }
+            ),
+        )
+        # We should be `newly_joined` because we joined during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
+
+    def test_display_name_changes_in_token_range(
+        self,
+    ) -> None:
+        """
+        Test that we point to the correct membership event within the from/to range even
+        if there is `displayname`/`avatar_url` updates.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Update the displayname during the token range
+        displayname_change_during_token_range_response = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname during token range",
+            },
+            tok=user1_tok,
+        )
+
+        after_change1_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_change1_token,
+            )
+        )
+
+        # Room should show up because we were joined during the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            displayname_change_during_token_range_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response": join_response["event_id"],
+                    "displayname_change_during_token_range_response": displayname_change_during_token_range_response[
+                        "event_id"
+                    ],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we joined before the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+
+    def test_display_name_changes_before_and_after_token_range(
+        self,
+    ) -> None:
+        """
+        Test that we point to the correct membership event even though there are no
+        membership events in the from/range but there are `displayname`/`avatar_url`
+        changes before/after the token range.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # Update the displayname before the token range
+        displayname_change_before_token_range_response = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname during token range",
+            },
+            tok=user1_tok,
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Update the displayname after the token range
+        displayname_change_after_token_range_response = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname after token range",
+            },
+            tok=user1_tok,
+        )
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should show up because we were joined before the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            displayname_change_before_token_range_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response": join_response["event_id"],
+                    "displayname_change_before_token_range_response": displayname_change_before_token_range_response[
+                        "event_id"
+                    ],
+                    "displayname_change_after_token_range_response": displayname_change_after_token_range_response[
+                        "event_id"
+                    ],
+                }
+            ),
+        )
+        # We should *NOT* be `newly_joined` because we joined before the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, False)
+
+    def test_display_name_changes_leave_after_token_range(
+        self,
+    ) -> None:
+        """
+        Test that we point to the correct membership event within the from/to range even
+        if there are multiple `join` membership events in a row indicating
+        `displayname`/`avatar_url` updates and we leave after the `to_token`.
+
+        See condition "1a)" comments in the `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # Update the displayname during the token range
+        displayname_change_during_token_range_response = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname during token range",
+            },
+            tok=user1_tok,
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Update the displayname after the token range
+        displayname_change_after_token_range_response = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname after token range",
+            },
+            tok=user1_tok,
+        )
+
+        # Leave after the token
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should show up because we were joined during the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            displayname_change_during_token_range_response["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response": join_response["event_id"],
+                    "displayname_change_during_token_range_response": displayname_change_during_token_range_response[
+                        "event_id"
+                    ],
+                    "displayname_change_after_token_range_response": displayname_change_after_token_range_response[
+                        "event_id"
+                    ],
+                }
+            ),
+        )
+        # We should be `newly_joined` because we joined during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
+
+    def test_display_name_changes_join_after_token_range(
+        self,
+    ) -> None:
+        """
+        Test that multiple `join` membership events (after the `to_token`) in a row
+        indicating `displayname`/`avatar_url` updates doesn't affect the results (we
+        joined after the token range so it shouldn't show up)
+
+        See condition "1b)" comments in the `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # Update the displayname after the token range
+        self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname after token range",
+            },
+            tok=user1_tok,
+        )
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room shouldn't show up because we joined after the from/to range
+        self.assertEqual(room_id_results.keys(), set())
+
+    def test_newly_joined_with_leave_join_in_token_range(
+        self,
+    ) -> None:
+        """
+        Test that even though we're joined before the token range, if we leave and join
+        within the token range, it's still counted as `newly_joined`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
         self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Leave and join back during the token range
         self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        join_response2 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        after_more_changes_token = self.event_sources.get_current_token()
 
         room_id_results = self.get_success(
             self.sliding_sync_handler.get_sync_room_ids_for_user(
                 UserID.from_string(user1_id),
                 from_token=after_room1_token,
+                to_token=after_more_changes_token,
+            )
+        )
+
+        # Room should show up because we were joined during the from/to range
+        self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            join_response2["event_id"],
+        )
+        # We should be considered `newly_joined` because there is some non-join event in
+        # between our latest join event.
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
+
+    def test_newly_joined_only_joins_during_token_range(
+        self,
+    ) -> None:
+        """
+        Test that a join and more joins caused by display name changes, all during the
+        token range, still count as `newly_joined`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        # Join, leave, join back to the room before the from/to range
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # Update the displayname during the token range (looks like another join)
+        displayname_change_during_token_range_response1 = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname during token range",
+            },
+            tok=user1_tok,
+        )
+        # Update the displayname during the token range (looks like another join)
+        displayname_change_during_token_range_response2 = self.helper.send_state(
+            room_id1,
+            event_type=EventTypes.Member,
+            state_key=user1_id,
+            body={
+                "membership": Membership.JOIN,
+                "displayname": "displayname during token range",
+            },
+            tok=user1_tok,
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
                 to_token=after_room1_token,
             )
         )
 
-        # Room should show up because we were invited before the from/to range
+        # Room should show up because it was newly_left and joined during the from/to range
         self.assertEqual(room_id_results.keys(), {room_id1})
+        # It should be pointing to the latest membership event in the from/to range
+        self.assertEqual(
+            room_id_results[room_id1].event_id,
+            displayname_change_during_token_range_response2["event_id"],
+            "Corresponding map to disambiguate the opaque event IDs: "
+            + str(
+                {
+                    "join_response1": join_response1["event_id"],
+                    "displayname_change_during_token_range_response1": displayname_change_during_token_range_response1[
+                        "event_id"
+                    ],
+                    "displayname_change_during_token_range_response2": displayname_change_during_token_range_response2[
+                        "event_id"
+                    ],
+                }
+            ),
+        )
+        # We should be `newly_joined` because we first joined during the token range
+        self.assertEqual(room_id_results[room_id1].newly_joined, True)
 
     def test_multiple_rooms_are_not_confused(
         self,
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index 12c11f342c..966c622e14 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -31,12 +31,13 @@ from synapse.api.constants import (
     AccountDataTypes,
     EventContentFields,
     EventTypes,
+    HistoryVisibility,
     ReceiptTypes,
     RelationTypes,
 )
 from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
 from synapse.server import HomeServer
-from synapse.types import JsonDict, RoomStreamToken, StreamKeyType
+from synapse.types import JsonDict, RoomStreamToken, StreamKeyType, StreamToken, UserID
 from synapse.util import Clock
 
 from tests import unittest
@@ -1326,7 +1327,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
     def test_sync_list(self) -> None:
         """
-        Test that room IDs show up in the Sliding Sync lists
+        Test that room IDs show up in the Sliding Sync `lists`
         """
         alice_user_id = self.register_user("alice", "correcthorse")
         alice_access_token = self.login(alice_user_id, "correcthorse")
@@ -1425,15 +1426,13 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
         channel.await_result(timeout_ms=200)
         self.assertEqual(channel.code, 200, channel.json_body)
 
-        # We expect the `next_pos` in the result to be the same as what we requested
+        # We expect the next `pos` in the result to be the same as what we requested
         # with because we weren't able to find anything new yet.
-        self.assertEqual(
-            channel.json_body["next_pos"], future_position_token_serialized
-        )
+        self.assertEqual(channel.json_body["pos"], future_position_token_serialized)
 
     def test_filter_list(self) -> None:
         """
-        Test that filters apply to lists
+        Test that filters apply to `lists`
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1564,7 +1563,7 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
 
     def test_sort_list(self) -> None:
         """
-        Test that the lists are sorted by `stream_ordering`
+        Test that the `lists` are sorted by `stream_ordering`
         """
         user1_id = self.register_user("user1", "pass")
         user1_tok = self.login(user1_id, "pass")
@@ -1618,3 +1617,1067 @@ class SlidingSyncTestCase(unittest.HomeserverTestCase):
             ],
             channel.json_body["lists"]["foo-list"],
         )
+
+    def test_sliced_windows(self) -> None:
+        """
+        Test that the `lists` `ranges` are sliced correctly. Both sides of each range
+        are inclusive.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        _room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        room_id3 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        # Make the Sliding Sync request for a single room
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 0]],
+                        "required_state": [
+                            ["m.room.join_rules", ""],
+                            ["m.room.history_visibility", ""],
+                            ["m.space.child", "*"],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make sure it has the foo-list we requested
+        self.assertListEqual(
+            list(channel.json_body["lists"].keys()),
+            ["foo-list"],
+            channel.json_body["lists"].keys(),
+        )
+        # Make sure the list is sorted in the way we expect
+        self.assertListEqual(
+            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 0],
+                    "room_ids": [room_id3],
+                }
+            ],
+            channel.json_body["lists"]["foo-list"],
+        )
+
+        # Make the Sliding Sync request for the first two rooms
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [
+                            ["m.room.join_rules", ""],
+                            ["m.room.history_visibility", ""],
+                            ["m.space.child", "*"],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make sure it has the foo-list we requested
+        self.assertListEqual(
+            list(channel.json_body["lists"].keys()),
+            ["foo-list"],
+            channel.json_body["lists"].keys(),
+        )
+        # Make sure the list is sorted in the way we expect
+        self.assertListEqual(
+            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 1],
+                    "room_ids": [room_id3, room_id2],
+                }
+            ],
+            channel.json_body["lists"]["foo-list"],
+        )
+
+    def test_rooms_limited_initial_sync(self) -> None:
+        """
+        Test that we mark `rooms` as `limited=True` when we saturate the `timeline_limit`
+        on initial sync.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity1", tok=user2_tok)
+        self.helper.send(room_id1, "activity2", tok=user2_tok)
+        event_response3 = self.helper.send(room_id1, "activity3", tok=user2_tok)
+        event_pos3 = self.get_success(
+            self.store.get_position_for_event(event_response3["event_id"])
+        )
+        event_response4 = self.helper.send(room_id1, "activity4", tok=user2_tok)
+        event_pos4 = self.get_success(
+            self.store.get_position_for_event(event_response4["event_id"])
+        )
+        event_response5 = self.helper.send(room_id1, "activity5", tok=user2_tok)
+        user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 3,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We expect to saturate the `timeline_limit` (there are more than 3 messages in the room)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            True,
+            channel.json_body["rooms"][room_id1],
+        )
+        # Check to make sure the latest events are returned
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                event_response4["event_id"],
+                event_response5["event_id"],
+                user1_join_response["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+
+        # Check to make sure the `prev_batch` points at the right place
+        prev_batch_token = self.get_success(
+            StreamToken.from_string(
+                self.store, channel.json_body["rooms"][room_id1]["prev_batch"]
+            )
+        )
+        prev_batch_room_stream_token_serialized = self.get_success(
+            prev_batch_token.room_key.to_string(self.store)
+        )
+        # If we use the `prev_batch` token to look backwards, we should see `event3`
+        # next so make sure the token encompasses it
+        self.assertEqual(
+            event_pos3.persisted_after(prev_batch_token.room_key),
+            False,
+            f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be >= event_pos3={self.get_success(event_pos3.to_room_stream_token().to_string(self.store))}",
+        )
+        # If we use the `prev_batch` token to look backwards, we shouldn't see `event4`
+        # anymore since it was just returned in this response.
+        self.assertEqual(
+            event_pos4.persisted_after(prev_batch_token.room_key),
+            True,
+            f"`prev_batch` token {prev_batch_room_stream_token_serialized} should be < event_pos4={self.get_success(event_pos4.to_room_stream_token().to_string(self.store))}",
+        )
+
+        # With no `from_token` (initial sync), it's all historical since there is no
+        # "live" range
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_rooms_not_limited_initial_sync(self) -> None:
+        """
+        Test that we mark `rooms` as `limited=False` when there are no more events to
+        paginate to.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity1", tok=user2_tok)
+        self.helper.send(room_id1, "activity2", tok=user2_tok)
+        self.helper.send(room_id1, "activity3", tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Make the Sliding Sync request
+        timeline_limit = 100
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": timeline_limit,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # The timeline should be `limited=False` because we have all of the events (no
+        # more to paginate to)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            False,
+            channel.json_body["rooms"][room_id1],
+        )
+        expected_number_of_events = 9
+        # We're just looking to make sure we got all of the events before hitting the `timeline_limit`
+        self.assertEqual(
+            len(channel.json_body["rooms"][room_id1]["timeline"]),
+            expected_number_of_events,
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        self.assertLessEqual(expected_number_of_events, timeline_limit)
+
+        # With no `from_token` (initial sync), it's all historical since there is no
+        # "live" token range.
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_rooms_incremental_sync(self) -> None:
+        """
+        Test `rooms` data during an incremental sync after an initial sync.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.send(room_id1, "activity before initial sync1", tok=user2_tok)
+
+        # Make an initial Sliding Sync request to grab a token. This is also a sanity
+        # check that we can go from initial to incremental sync.
+        sync_params = {
+            "lists": {
+                "foo-list": {
+                    "ranges": [[0, 1]],
+                    "required_state": [],
+                    "timeline_limit": 3,
+                }
+            }
+        }
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            sync_params,
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+        next_pos = channel.json_body["pos"]
+
+        # Send some events but don't send enough to saturate the `timeline_limit`.
+        # We want to later test that we only get the new events since the `next_pos`
+        event_response2 = self.helper.send(room_id1, "activity after2", tok=user2_tok)
+        event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
+
+        # Make an incremental Sliding Sync request (what we're trying to test)
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?pos={next_pos}",
+            sync_params,
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We only expect to see the new events since the last sync which isn't enough to
+        # fill up the `timeline_limit`.
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            False,
+            f'Our `timeline_limit` was {sync_params["lists"]["foo-list"]["timeline_limit"]} '
+            + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+            + str(channel.json_body["rooms"][room_id1]),
+        )
+        # Check to make sure the latest events are returned
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                event_response2["event_id"],
+                event_response3["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+
+        # All events are "live"
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            2,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_rooms_newly_joined_incremental_sync(self) -> None:
+        """
+        Test that when we make an incremental sync with a `newly_joined` `rooms`, we are
+        able to see some historical events before the `from_token`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity before token1", tok=user2_tok)
+        event_response2 = self.helper.send(
+            room_id1, "activity before token2", tok=user2_tok
+        )
+
+        from_token = self.event_sources.get_current_token()
+
+        # Join the room after the `from_token` which will make us consider this room as
+        # `newly_joined`.
+        user1_join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Send some events but don't send enough to saturate the `timeline_limit`.
+        # We want to later test that we only get the new events since the `next_pos`
+        event_response3 = self.helper.send(
+            room_id1, "activity after token3", tok=user2_tok
+        )
+        event_response4 = self.helper.send(
+            room_id1, "activity after token4", tok=user2_tok
+        )
+
+        # The `timeline_limit` is set to 4 so we can at least see one historical event
+        # before the `from_token`. We should see historical events because this is a
+        # `newly_joined` room.
+        timeline_limit = 4
+        # Make an incremental Sliding Sync request (what we're trying to test)
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": timeline_limit,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should see the new events and the rest should be filled with historical
+        # events which will make us `limited=True` since there are more to paginate to.
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            True,
+            f"Our `timeline_limit` was {timeline_limit} "
+            + f'and {len(channel.json_body["rooms"][room_id1]["timeline"])} events were returned in the timeline. '
+            + str(channel.json_body["rooms"][room_id1]),
+        )
+        # Check to make sure that the "live" and historical events are returned
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                event_response2["event_id"],
+                user1_join_response["event_id"],
+                event_response3["event_id"],
+                event_response4["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+
+        # Only events after the `from_token` are "live" (join, event3, event4)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            3,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_rooms_invite_shared_history_initial_sync(self) -> None:
+        """
+        Test that `rooms` we are invited to have some stripped `invite_state` during an
+        initial sync.
+
+        This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+        but we also shouldn't see any timeline events because the history visiblity is
+        `shared` and we haven't joined the room yet.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user1 = UserID.from_string(user1_id)
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user2 = UserID.from_string(user2_id)
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        # Ensure we're testing with a room with `shared` history visibility which means
+        # history visible until you actually join the room.
+        history_visibility_response = self.helper.get_state(
+            room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+        )
+        self.assertEqual(
+            history_visibility_response.get("history_visibility"),
+            HistoryVisibility.SHARED,
+        )
+
+        self.helper.send(room_id1, "activity before1", tok=user2_tok)
+        self.helper.send(room_id1, "activity before2", tok=user2_tok)
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity after3", tok=user2_tok)
+        self.helper.send(room_id1, "activity after4", tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 3,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # `timeline` is omitted for `invite` rooms with `stripped_state`
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("timeline"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("num_live"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("limited"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("prev_batch"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # We should have some `stripped_state` so the potential joiner can identify the
+        # room (we don't care about the order).
+        self.assertCountEqual(
+            channel.json_body["rooms"][room_id1]["invite_state"],
+            [
+                {
+                    "content": {"creator": user2_id, "room_version": "10"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.create",
+                },
+                {
+                    "content": {"join_rule": "public"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.join_rules",
+                },
+                {
+                    "content": {"displayname": user2.localpart, "membership": "join"},
+                    "sender": user2_id,
+                    "state_key": user2_id,
+                    "type": "m.room.member",
+                },
+                {
+                    "content": {"displayname": user1.localpart, "membership": "invite"},
+                    "sender": user2_id,
+                    "state_key": user1_id,
+                    "type": "m.room.member",
+                },
+            ],
+            channel.json_body["rooms"][room_id1]["invite_state"],
+        )
+
+    def test_rooms_invite_shared_history_incremental_sync(self) -> None:
+        """
+        Test that `rooms` we are invited to have some stripped `invite_state` during an
+        incremental sync.
+
+        This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+        but we also shouldn't see any timeline events because the history visiblity is
+        `shared` and we haven't joined the room yet.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user1 = UserID.from_string(user1_id)
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user2 = UserID.from_string(user2_id)
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        # Ensure we're testing with a room with `shared` history visibility which means
+        # history visible until you actually join the room.
+        history_visibility_response = self.helper.get_state(
+            room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+        )
+        self.assertEqual(
+            history_visibility_response.get("history_visibility"),
+            HistoryVisibility.SHARED,
+        )
+
+        self.helper.send(room_id1, "activity before invite1", tok=user2_tok)
+        self.helper.send(room_id1, "activity before invite2", tok=user2_tok)
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
+        self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        self.helper.send(room_id1, "activity after token5", tok=user2_tok)
+        self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 3,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # `timeline` is omitted for `invite` rooms with `stripped_state`
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("timeline"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("num_live"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("limited"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("prev_batch"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # We should have some `stripped_state` so the potential joiner can identify the
+        # room (we don't care about the order).
+        self.assertCountEqual(
+            channel.json_body["rooms"][room_id1]["invite_state"],
+            [
+                {
+                    "content": {"creator": user2_id, "room_version": "10"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.create",
+                },
+                {
+                    "content": {"join_rule": "public"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.join_rules",
+                },
+                {
+                    "content": {"displayname": user2.localpart, "membership": "join"},
+                    "sender": user2_id,
+                    "state_key": user2_id,
+                    "type": "m.room.member",
+                },
+                {
+                    "content": {"displayname": user1.localpart, "membership": "invite"},
+                    "sender": user2_id,
+                    "state_key": user1_id,
+                    "type": "m.room.member",
+                },
+            ],
+            channel.json_body["rooms"][room_id1]["invite_state"],
+        )
+
+    def test_rooms_invite_world_readable_history_initial_sync(self) -> None:
+        """
+        Test that `rooms` we are invited to have some stripped `invite_state` during an
+        initial sync.
+
+        This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+        but depending on the semantics we decide, we could potentially see some
+        historical events before/after the `from_token` because the history is
+        `world_readable`. Same situation for events after the `from_token` if the
+        history visibility was set to `invited`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user1 = UserID.from_string(user1_id)
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user2 = UserID.from_string(user2_id)
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "preset": "public_chat",
+                "initial_state": [
+                    {
+                        "content": {
+                            "history_visibility": HistoryVisibility.WORLD_READABLE
+                        },
+                        "state_key": "",
+                        "type": EventTypes.RoomHistoryVisibility,
+                    }
+                ],
+            },
+        )
+        # Ensure we're testing with a room with `world_readable` history visibility
+        # which means events are visible to anyone even without membership.
+        history_visibility_response = self.helper.get_state(
+            room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+        )
+        self.assertEqual(
+            history_visibility_response.get("history_visibility"),
+            HistoryVisibility.WORLD_READABLE,
+        )
+
+        self.helper.send(room_id1, "activity before1", tok=user2_tok)
+        self.helper.send(room_id1, "activity before2", tok=user2_tok)
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity after3", tok=user2_tok)
+        self.helper.send(room_id1, "activity after4", tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        # Large enough to see the latest events and before the invite
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # `timeline` is omitted for `invite` rooms with `stripped_state`
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("timeline"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("num_live"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("limited"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("prev_batch"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # We should have some `stripped_state` so the potential joiner can identify the
+        # room (we don't care about the order).
+        self.assertCountEqual(
+            channel.json_body["rooms"][room_id1]["invite_state"],
+            [
+                {
+                    "content": {"creator": user2_id, "room_version": "10"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.create",
+                },
+                {
+                    "content": {"join_rule": "public"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.join_rules",
+                },
+                {
+                    "content": {"displayname": user2.localpart, "membership": "join"},
+                    "sender": user2_id,
+                    "state_key": user2_id,
+                    "type": "m.room.member",
+                },
+                {
+                    "content": {"displayname": user1.localpart, "membership": "invite"},
+                    "sender": user2_id,
+                    "state_key": user1_id,
+                    "type": "m.room.member",
+                },
+            ],
+            channel.json_body["rooms"][room_id1]["invite_state"],
+        )
+
+    def test_rooms_invite_world_readable_history_incremental_sync(self) -> None:
+        """
+        Test that `rooms` we are invited to have some stripped `invite_state` during an
+        incremental sync.
+
+        This is an `invite` room so we should only have `stripped_state` (no `timeline`)
+        but depending on the semantics we decide, we could potentially see some
+        historical events before/after the `from_token` because the history is
+        `world_readable`. Same situation for events after the `from_token` if the
+        history visibility was set to `invited`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user1 = UserID.from_string(user1_id)
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user2 = UserID.from_string(user2_id)
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "preset": "public_chat",
+                "initial_state": [
+                    {
+                        "content": {
+                            "history_visibility": HistoryVisibility.WORLD_READABLE
+                        },
+                        "state_key": "",
+                        "type": EventTypes.RoomHistoryVisibility,
+                    }
+                ],
+            },
+        )
+        # Ensure we're testing with a room with `world_readable` history visibility
+        # which means events are visible to anyone even without membership.
+        history_visibility_response = self.helper.get_state(
+            room_id1, EventTypes.RoomHistoryVisibility, tok=user2_tok
+        )
+        self.assertEqual(
+            history_visibility_response.get("history_visibility"),
+            HistoryVisibility.WORLD_READABLE,
+        )
+
+        self.helper.send(room_id1, "activity before invite1", tok=user2_tok)
+        self.helper.send(room_id1, "activity before invite2", tok=user2_tok)
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity after invite3", tok=user2_tok)
+        self.helper.send(room_id1, "activity after invite4", tok=user2_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        self.helper.send(room_id1, "activity after token5", tok=user2_tok)
+        self.helper.send(room_id1, "activity after toekn6", tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        # Large enough to see the latest events and before the invite
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # `timeline` is omitted for `invite` rooms with `stripped_state`
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("timeline"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `num_live` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("num_live"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `limited` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("limited"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # `prev_batch` is omitted for `invite` rooms with `stripped_state` (no timeline anyway)
+        self.assertIsNone(
+            channel.json_body["rooms"][room_id1].get("prev_batch"),
+            channel.json_body["rooms"][room_id1],
+        )
+        # We should have some `stripped_state` so the potential joiner can identify the
+        # room (we don't care about the order).
+        self.assertCountEqual(
+            channel.json_body["rooms"][room_id1]["invite_state"],
+            [
+                {
+                    "content": {"creator": user2_id, "room_version": "10"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.create",
+                },
+                {
+                    "content": {"join_rule": "public"},
+                    "sender": user2_id,
+                    "state_key": "",
+                    "type": "m.room.join_rules",
+                },
+                {
+                    "content": {"displayname": user2.localpart, "membership": "join"},
+                    "sender": user2_id,
+                    "state_key": user2_id,
+                    "type": "m.room.member",
+                },
+                {
+                    "content": {"displayname": user1.localpart, "membership": "invite"},
+                    "sender": user2_id,
+                    "state_key": user1_id,
+                    "type": "m.room.member",
+                },
+            ],
+            channel.json_body["rooms"][room_id1]["invite_state"],
+        )
+
+    def test_rooms_ban_initial_sync(self) -> None:
+        """
+        Test that `rooms` we are banned from in an intial sync only allows us to see
+        timeline events up to the ban event.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity before1", tok=user2_tok)
+        self.helper.send(room_id1, "activity before2", tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
+        event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
+        user1_ban_response = self.helper.ban(
+            room_id1, src=user2_id, targ=user1_id, tok=user2_tok
+        )
+
+        self.helper.send(room_id1, "activity after5", tok=user2_tok)
+        self.helper.send(room_id1, "activity after6", tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 3,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should see events before the ban but not after
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                event_response3["event_id"],
+                event_response4["event_id"],
+                user1_ban_response["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        # No "live" events in an initial sync (no `from_token` to define the "live"
+        # range)
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+        # There are more events to paginate to
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            True,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_rooms_ban_incremental_sync1(self) -> None:
+        """
+        Test that `rooms` we are banned from during the next incremental sync only
+        allows us to see timeline events up to the ban event.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity before1", tok=user2_tok)
+        self.helper.send(room_id1, "activity before2", tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        event_response3 = self.helper.send(room_id1, "activity after3", tok=user2_tok)
+        event_response4 = self.helper.send(room_id1, "activity after4", tok=user2_tok)
+        # The ban is within the token range (between the `from_token` and the sliding
+        # sync request)
+        user1_ban_response = self.helper.ban(
+            room_id1, src=user2_id, targ=user1_id, tok=user2_tok
+        )
+
+        self.helper.send(room_id1, "activity after5", tok=user2_tok)
+        self.helper.send(room_id1, "activity after6", tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We should see events before the ban but not after
+        self.assertEqual(
+            [
+                event["event_id"]
+                for event in channel.json_body["rooms"][room_id1]["timeline"]
+            ],
+            [
+                event_response3["event_id"],
+                event_response4["event_id"],
+                user1_ban_response["event_id"],
+            ],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        # All live events in the incremental sync
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            3,
+            channel.json_body["rooms"][room_id1],
+        )
+        # There aren't anymore events to paginate to in this range
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            False,
+            channel.json_body["rooms"][room_id1],
+        )
+
+    def test_rooms_ban_incremental_sync2(self) -> None:
+        """
+        Test that `rooms` we are banned from before the incremental sync don't return
+        any events in the timeline.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.send(room_id1, "activity before1", tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        self.helper.send(room_id1, "activity after2", tok=user2_tok)
+        # The ban is before we get our `from_token`
+        self.helper.ban(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        self.helper.send(room_id1, "activity after3", tok=user2_tok)
+
+        from_token = self.event_sources.get_current_token()
+
+        self.helper.send(room_id1, "activity after4", tok=user2_tok)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint
+            + f"?pos={self.get_success(from_token.to_string(self.store))}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 1]],
+                        "required_state": [],
+                        "timeline_limit": 4,
+                    }
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Nothing to see for this banned user in the room in the token range
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["timeline"],
+            [],
+            channel.json_body["rooms"][room_id1]["timeline"],
+        )
+        # No events returned in the timeline so nothing is "live"
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["num_live"],
+            0,
+            channel.json_body["rooms"][room_id1],
+        )
+        # There aren't anymore events to paginate to in this range
+        self.assertEqual(
+            channel.json_body["rooms"][room_id1]["limited"],
+            False,
+            channel.json_body["rooms"][room_id1],
+        )
diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py
index f0ba40a1f1..e43140720d 100644
--- a/tests/rest/client/utils.py
+++ b/tests/rest/client/utils.py
@@ -261,9 +261,9 @@ class RestHelper:
         targ: str,
         expect_code: int = HTTPStatus.OK,
         tok: Optional[str] = None,
-    ) -> None:
+    ) -> JsonDict:
         """A convenience helper: `change_membership` with `membership` preset to "ban"."""
-        self.change_membership(
+        return self.change_membership(
             room=room,
             src=src,
             targ=targ,
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index fe1e873e15..aad46b1b44 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -21,20 +21,32 @@
 
 import logging
 from typing import List, Tuple
+from unittest.mock import AsyncMock, patch
 
 from immutabledict import immutabledict
 
 from twisted.test.proto_helpers import MemoryReactor
 
-from synapse.api.constants import Direction, EventTypes, RelationTypes
+from synapse.api.constants import Direction, EventTypes, Membership, RelationTypes
 from synapse.api.filtering import Filter
+from synapse.crypto.event_signing import add_hashes_and_signatures
+from synapse.events import FrozenEventV3
+from synapse.federation.federation_client import SendJoinResult
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.server import HomeServer
-from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
+from synapse.storage.databases.main.stream import CurrentStateDeltaMembership
+from synapse.types import (
+    JsonDict,
+    PersistedEventPosition,
+    RoomStreamToken,
+    UserID,
+    create_requester,
+)
 from synapse.util import Clock
 
-from tests.unittest import HomeserverTestCase
+from tests.test_utils.event_injection import create_event
+from tests.unittest import FederatingHomeserverTestCase, HomeserverTestCase
 
 logger = logging.getLogger(__name__)
 
@@ -543,3 +555,859 @@ class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
                 }
             ),
         )
+
+
+class GetCurrentStateDeltaMembershipChangesForUserTestCase(HomeserverTestCase):
+    """
+    Test `get_current_state_delta_membership_changes_for_user(...)`
+    """
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+        self.state_handler = self.hs.get_state_handler()
+        persistence = hs.get_storage_controllers().persistence
+        assert persistence is not None
+        self.persistence = persistence
+
+    def test_returns_membership_events(self) -> None:
+        """
+        A basic test that a membership event in the token range is returned for the user.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_pos = self.get_success(
+            self.store.get_position_for_event(join_response["event_id"])
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_room1_token.room_key,
+                to_key=after_room1_token.room_key,
+            )
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=join_response["event_id"],
+                    event_pos=join_pos,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                )
+            ],
+        )
+
+    def test_server_left_room_after_us(self) -> None:
+        """
+        Test that when probing over part of the DAG where the server left the room *after
+        us*, we still see the join and leave changes.
+
+        This is to make sure we play nicely with this behavior: When the server leaves a
+        room, it will insert new rows with `event_id = null` into the
+        `current_state_delta_stream` table for all current state.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "power_level_content_override": {
+                    "users": {
+                        user2_id: 100,
+                        # Allow user1 to send state in the room
+                        user1_id: 100,
+                    }
+                }
+            },
+        )
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_pos1 = self.get_success(
+            self.store.get_position_for_event(join_response1["event_id"])
+        )
+        # Make sure that random other non-member state that happens to have a `state_key`
+        # matching the user ID doesn't mess with things.
+        self.helper.send_state(
+            room_id1,
+            event_type="foobarbazdummy",
+            state_key=user1_id,
+            body={"foo": "bar"},
+            tok=user1_tok,
+        )
+        # User1 should leave the room first
+        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_pos1 = self.get_success(
+            self.store.get_position_for_event(leave_response1["event_id"])
+        )
+
+        # User2 should also leave the room (everyone has left the room which means the
+        # server is no longer in the room).
+        self.helper.leave(room_id1, user2_id, tok=user2_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Get the membership changes for the user.
+        #
+        # At this point, the `current_state_delta_stream` table should look like the
+        # following. When the server leaves a room, it will insert new rows with
+        # `event_id = null` for all current state.
+        #
+        # | stream_id | room_id  | type                        | state_key      | event_id | prev_event_id |
+        # |-----------|----------|-----------------------------|----------------|----------|---------------|
+        # | 2         | !x:test  | 'm.room.create'             | ''             | $xxx     | None          |
+        # | 3         | !x:test  | 'm.room.member'             | '@user2:test'  | $aaa     | None          |
+        # | 4         | !x:test  | 'm.room.history_visibility' | ''             | $xxx     | None          |
+        # | 4         | !x:test  | 'm.room.join_rules'         | ''             | $xxx     | None          |
+        # | 4         | !x:test  | 'm.room.power_levels'       | ''             | $xxx     | None          |
+        # | 7         | !x:test  | 'm.room.member'             | '@user1:test'  | $ooo     | None          |
+        # | 8         | !x:test  | 'foobarbazdummy'            | '@user1:test'  | $xxx     | None          |
+        # | 9         | !x:test  | 'm.room.member'             | '@user1:test'  | $ppp     | $ooo          |
+        # | 10        | !x:test  | 'foobarbazdummy'            | '@user1:test'  | None     | $xxx          |
+        # | 10        | !x:test  | 'm.room.create'             | ''             | None     | $xxx          |
+        # | 10        | !x:test  | 'm.room.history_visibility' | ''             | None     | $xxx          |
+        # | 10        | !x:test  | 'm.room.join_rules'         | ''             | None     | $xxx          |
+        # | 10        | !x:test  | 'm.room.member'             | '@user1:test'  | None     | $ppp          |
+        # | 10        | !x:test  | 'm.room.member'             | '@user2:test'  | None     | $aaa          |
+        # | 10        | !x:test  | 'm.room.power_levels'       |                | None     | $xxx          |
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_room1_token.room_key,
+                to_key=after_room1_token.room_key,
+            )
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=join_response1["event_id"],
+                    event_pos=join_pos1,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                ),
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=leave_response1["event_id"],
+                    event_pos=leave_pos1,
+                    membership="leave",
+                    sender=user1_id,
+                    prev_event_id=join_response1["event_id"],
+                    prev_event_pos=join_pos1,
+                    prev_membership="join",
+                    prev_sender=user1_id,
+                ),
+            ],
+        )
+
+    def test_server_left_room_after_us_later(self) -> None:
+        """
+        Test when the user leaves the room, then sometime later, everyone else leaves
+        the room, causing the server to leave the room, we shouldn't see any membership
+        changes.
+
+        This is to make sure we play nicely with this behavior: When the server leaves a
+        room, it will insert new rows with `event_id = null` into the
+        `current_state_delta_stream` table for all current state.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # User1 should leave the room first
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        after_user1_leave_token = self.event_sources.get_current_token()
+
+        # User2 should also leave the room (everyone has left the room which means the
+        # server is no longer in the room).
+        self.helper.leave(room_id1, user2_id, tok=user2_tok)
+
+        after_server_leave_token = self.event_sources.get_current_token()
+
+        # Join another room as user1 just to advance the stream_ordering and bust
+        # `_membership_stream_cache`
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+        # Get the membership changes for the user.
+        #
+        # At this point, the `current_state_delta_stream` table should look like the
+        # following. When the server leaves a room, it will insert new rows with
+        # `event_id = null` for all current state.
+        #
+        # TODO: Add DB rows to better see what's going on.
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=after_user1_leave_token.room_key,
+                to_key=after_server_leave_token.room_key,
+            )
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [],
+        )
+
+    def test_we_cause_server_left_room(self) -> None:
+        """
+        Test that when probing over part of the DAG where the user leaves the room
+        causing the server to leave the room (because we were the last local user in the
+        room), we still see the join and leave changes.
+
+        This is to make sure we play nicely with this behavior: When the server leaves a
+        room, it will insert new rows with `event_id = null` into the
+        `current_state_delta_stream` table for all current state.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        room_id1 = self.helper.create_room_as(
+            user2_id,
+            tok=user2_tok,
+            extra_content={
+                "power_level_content_override": {
+                    "users": {
+                        user2_id: 100,
+                        # Allow user1 to send state in the room
+                        user1_id: 100,
+                    }
+                }
+            },
+        )
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_pos1 = self.get_success(
+            self.store.get_position_for_event(join_response1["event_id"])
+        )
+        # Make sure that random other non-member state that happens to have a `state_key`
+        # matching the user ID doesn't mess with things.
+        self.helper.send_state(
+            room_id1,
+            event_type="foobarbazdummy",
+            state_key=user1_id,
+            body={"foo": "bar"},
+            tok=user1_tok,
+        )
+
+        # User2 should leave the room first.
+        self.helper.leave(room_id1, user2_id, tok=user2_tok)
+
+        # User1 (the person we're testing with) should also leave the room (everyone has
+        # left the room which means the server is no longer in the room).
+        leave_response1 = self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        leave_pos1 = self.get_success(
+            self.store.get_position_for_event(leave_response1["event_id"])
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Get the membership changes for the user.
+        #
+        # At this point, the `current_state_delta_stream` table should look like the
+        # following. When the server leaves a room, it will insert new rows with
+        # `event_id = null` for all current state.
+        #
+        # | stream_id | room_id   | type                        | state_key     | event_id | prev_event_id |
+        # |-----------|-----------|-----------------------------|---------------|----------|---------------|
+        # | 2         | '!x:test' | 'm.room.create'             | ''            | '$xxx'   | None          |
+        # | 3         | '!x:test' | 'm.room.member'             | '@user2:test' | '$aaa'   | None          |
+        # | 4         | '!x:test' | 'm.room.history_visibility' | ''            | '$xxx'   | None          |
+        # | 4         | '!x:test' | 'm.room.join_rules'         | ''            | '$xxx'   | None          |
+        # | 4         | '!x:test' | 'm.room.power_levels'       | ''            | '$xxx'   | None          |
+        # | 7         | '!x:test' | 'm.room.member'             | '@user1:test' | '$ooo'   | None          |
+        # | 8         | '!x:test' | 'foobarbazdummy'            | '@user1:test' | '$xxx'   | None          |
+        # | 9         | '!x:test' | 'm.room.member'             | '@user2:test' | '$bbb'   | '$aaa'        |
+        # | 10        | '!x:test' | 'foobarbazdummy'            | '@user1:test' | None     | '$xxx'        |
+        # | 10        | '!x:test' | 'm.room.create'             | ''            | None     | '$xxx'        |
+        # | 10        | '!x:test' | 'm.room.history_visibility' | ''            | None     | '$xxx'        |
+        # | 10        | '!x:test' | 'm.room.join_rules'         | ''            | None     | '$xxx'        |
+        # | 10        | '!x:test' | 'm.room.member'             | '@user1:test' | None     | '$ooo'        |
+        # | 10        | '!x:test' | 'm.room.member'             | '@user2:test' | None     | '$bbb'        |
+        # | 10        | '!x:test' | 'm.room.power_levels'       | ''            | None     | '$xxx'        |
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_room1_token.room_key,
+                to_key=after_room1_token.room_key,
+            )
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=join_response1["event_id"],
+                    event_pos=join_pos1,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                ),
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=None,  # leave_response1["event_id"],
+                    event_pos=leave_pos1,
+                    membership="leave",
+                    sender=None,  # user1_id,
+                    prev_event_id=join_response1["event_id"],
+                    prev_event_pos=join_pos1,
+                    prev_membership="join",
+                    prev_sender=user1_id,
+                ),
+            ],
+        )
+
+    def test_different_user_membership_persisted_in_same_batch(self) -> None:
+        """
+        Test batch of membership events from different users being processed at once.
+        This will result in all of the memberships being stored in the
+        `current_state_delta_stream` table with the same `stream_ordering` even though
+        the individual events have different `stream_ordering`s.
+        """
+        user1_id = self.register_user("user1", "pass")
+        _user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+        user3_id = self.register_user("user3", "pass")
+        _user3_tok = self.login(user3_id, "pass")
+        user4_id = self.register_user("user4", "pass")
+        _user4_tok = self.login(user4_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # User2 is just the designated person to create the room (we do this across the
+        # tests to be consistent)
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+
+        # Persist the user1, user3, and user4 join events in the same batch so they all
+        # end up in the `current_state_delta_stream` table with the same
+        # stream_ordering.
+        join_event3, join_event_context3 = self.get_success(
+            create_event(
+                self.hs,
+                sender=user3_id,
+                type=EventTypes.Member,
+                state_key=user3_id,
+                content={"membership": "join"},
+                room_id=room_id1,
+            )
+        )
+        # We want to put user1 in the middle of the batch. This way, regardless of the
+        # implementation that inserts rows into current_state_delta_stream` (whether it
+        # be minimum/maximum of stream position of the batch), we will still catch bugs.
+        join_event1, join_event_context1 = self.get_success(
+            create_event(
+                self.hs,
+                sender=user1_id,
+                type=EventTypes.Member,
+                state_key=user1_id,
+                content={"membership": "join"},
+                room_id=room_id1,
+            )
+        )
+        join_event4, join_event_context4 = self.get_success(
+            create_event(
+                self.hs,
+                sender=user4_id,
+                type=EventTypes.Member,
+                state_key=user4_id,
+                content={"membership": "join"},
+                room_id=room_id1,
+            )
+        )
+        self.get_success(
+            self.persistence.persist_events(
+                [
+                    (join_event3, join_event_context3),
+                    (join_event1, join_event_context1),
+                    (join_event4, join_event_context4),
+                ]
+            )
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Get the membership changes for the user.
+        #
+        # At this point, the `current_state_delta_stream` table should look like (notice
+        # those three memberships at the end with `stream_id=7` because we persisted
+        # them in the same batch):
+        #
+        # | stream_id | room_id   | type                       | state_key        | event_id | prev_event_id |
+        # |-----------|-----------|----------------------------|------------------|----------|---------------|
+        # | 2         | '!x:test' | 'm.room.create'            | ''               | '$xxx'   | None          |
+        # | 3         | '!x:test' | 'm.room.member'            | '@user2:test'    | '$xxx'   | None          |
+        # | 4         | '!x:test' | 'm.room.history_visibility'| ''               | '$xxx'   | None          |
+        # | 4         | '!x:test' | 'm.room.join_rules'        | ''               | '$xxx'   | None          |
+        # | 4         | '!x:test' | 'm.room.power_levels'      | ''               | '$xxx'   | None          |
+        # | 7         | '!x:test' | 'm.room.member'            | '@user3:test'    | '$xxx'   | None          |
+        # | 7         | '!x:test' | 'm.room.member'            | '@user1:test'    | '$xxx'   | None          |
+        # | 7         | '!x:test' | 'm.room.member'            | '@user4:test'    | '$xxx'   | None          |
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_room1_token.room_key,
+                to_key=after_room1_token.room_key,
+            )
+        )
+
+        join_pos3 = self.get_success(
+            self.store.get_position_for_event(join_event3.event_id)
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=join_event1.event_id,
+                    # Ideally, this would be `join_pos1` (to match the `event_id`) but
+                    # when events are persisted in a batch, they are all stored in the
+                    # `current_state_delta_stream` table with the minimum
+                    # `stream_ordering` from the batch.
+                    event_pos=join_pos3,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                ),
+            ],
+        )
+
+    def test_state_reset(self) -> None:
+        """
+        Test a state reset scenario where the user gets removed from the room (when
+        there is no corresponding leave event)
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_pos1 = self.get_success(
+            self.store.get_position_for_event(join_response1["event_id"])
+        )
+
+        before_reset_token = self.event_sources.get_current_token()
+
+        # Send another state event to make a position for the state reset to happen at
+        dummy_state_response = self.helper.send_state(
+            room_id1,
+            event_type="foobarbaz",
+            state_key="",
+            body={"foo": "bar"},
+            tok=user2_tok,
+        )
+        dummy_state_pos = self.get_success(
+            self.store.get_position_for_event(dummy_state_response["event_id"])
+        )
+
+        # Mock a state reset removing the membership for user1 in the current state
+        self.get_success(
+            self.store.db_pool.simple_delete(
+                table="current_state_events",
+                keyvalues={
+                    "room_id": room_id1,
+                    "type": EventTypes.Member,
+                    "state_key": user1_id,
+                },
+                desc="state reset user in current_state_delta_stream",
+            )
+        )
+        self.get_success(
+            self.store.db_pool.simple_insert(
+                table="current_state_delta_stream",
+                values={
+                    "stream_id": dummy_state_pos.stream,
+                    "room_id": room_id1,
+                    "type": EventTypes.Member,
+                    "state_key": user1_id,
+                    "event_id": None,
+                    "prev_event_id": join_response1["event_id"],
+                    "instance_name": dummy_state_pos.instance_name,
+                },
+                desc="state reset user in current_state_delta_stream",
+            )
+        )
+
+        # Manually bust the cache since we we're just manually messing with the database
+        # and not causing an actual state reset.
+        self.store._membership_stream_cache.entity_has_changed(
+            user1_id, dummy_state_pos.stream
+        )
+
+        after_reset_token = self.event_sources.get_current_token()
+
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_reset_token.room_key,
+                to_key=after_reset_token.room_key,
+            )
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=None,
+                    event_pos=dummy_state_pos,
+                    membership="leave",
+                    sender=None,  # user1_id,
+                    prev_event_id=join_response1["event_id"],
+                    prev_event_pos=join_pos1,
+                    prev_membership="join",
+                    prev_sender=user1_id,
+                ),
+            ],
+        )
+
+    def test_excluded_room_ids(self) -> None:
+        """
+        Test that the `excluded_room_ids` option excludes changes from the specified rooms.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_pos1 = self.get_success(
+            self.store.get_position_for_event(join_response1["event_id"])
+        )
+
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok)
+        join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
+        join_pos2 = self.get_success(
+            self.store.get_position_for_event(join_response2["event_id"])
+        )
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # First test the the room is returned without the `excluded_room_ids` option
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_room1_token.room_key,
+                to_key=after_room1_token.room_key,
+            )
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=join_response1["event_id"],
+                    event_pos=join_pos1,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                ),
+                CurrentStateDeltaMembership(
+                    room_id=room_id2,
+                    event_id=join_response2["event_id"],
+                    event_pos=join_pos2,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                ),
+            ],
+        )
+
+        # The test that `excluded_room_ids` excludes room2 as expected
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_room1_token.room_key,
+                to_key=after_room1_token.room_key,
+                excluded_room_ids=[room_id2],
+            )
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=room_id1,
+                    event_id=join_response1["event_id"],
+                    event_pos=join_pos1,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                )
+            ],
+        )
+
+
+class GetCurrentStateDeltaMembershipChangesForUserFederationTestCase(
+    FederatingHomeserverTestCase
+):
+    """
+    Test `get_current_state_delta_membership_changes_for_user(...)` when joining remote federated rooms.
+    """
+
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        self.store = self.hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+        self.room_member_handler = hs.get_room_member_handler()
+
+    def test_remote_join(self) -> None:
+        """
+        Test remote join where the first rows in `current_state_delta_stream` will just
+        be the state when you joined the remote room.
+        """
+        user1_id = self.register_user("user1", "pass")
+        _user1_tok = self.login(user1_id, "pass")
+
+        before_join_token = self.event_sources.get_current_token()
+
+        intially_unjoined_room_id = f"!example:{self.OTHER_SERVER_NAME}"
+
+        # Remotely join a room on another homeserver.
+        #
+        # To do this we have to mock the responses from the remote homeserver. We also
+        # patch out a bunch of event checks on our end.
+        create_event_source = {
+            "auth_events": [],
+            "content": {
+                "creator": f"@creator:{self.OTHER_SERVER_NAME}",
+                "room_version": self.hs.config.server.default_room_version.identifier,
+            },
+            "depth": 0,
+            "origin_server_ts": 0,
+            "prev_events": [],
+            "room_id": intially_unjoined_room_id,
+            "sender": f"@creator:{self.OTHER_SERVER_NAME}",
+            "state_key": "",
+            "type": EventTypes.Create,
+        }
+        self.add_hashes_and_signatures_from_other_server(
+            create_event_source,
+            self.hs.config.server.default_room_version,
+        )
+        create_event = FrozenEventV3(
+            create_event_source,
+            self.hs.config.server.default_room_version,
+            {},
+            None,
+        )
+        creator_join_event_source = {
+            "auth_events": [create_event.event_id],
+            "content": {
+                "membership": "join",
+            },
+            "depth": 1,
+            "origin_server_ts": 1,
+            "prev_events": [],
+            "room_id": intially_unjoined_room_id,
+            "sender": f"@creator:{self.OTHER_SERVER_NAME}",
+            "state_key": f"@creator:{self.OTHER_SERVER_NAME}",
+            "type": EventTypes.Member,
+        }
+        self.add_hashes_and_signatures_from_other_server(
+            creator_join_event_source,
+            self.hs.config.server.default_room_version,
+        )
+        creator_join_event = FrozenEventV3(
+            creator_join_event_source,
+            self.hs.config.server.default_room_version,
+            {},
+            None,
+        )
+
+        # Our local user is going to remote join the room
+        join_event_source = {
+            "auth_events": [create_event.event_id],
+            "content": {"membership": "join"},
+            "depth": 1,
+            "origin_server_ts": 100,
+            "prev_events": [creator_join_event.event_id],
+            "sender": user1_id,
+            "state_key": user1_id,
+            "room_id": intially_unjoined_room_id,
+            "type": EventTypes.Member,
+        }
+        add_hashes_and_signatures(
+            self.hs.config.server.default_room_version,
+            join_event_source,
+            self.hs.hostname,
+            self.hs.signing_key,
+        )
+        join_event = FrozenEventV3(
+            join_event_source,
+            self.hs.config.server.default_room_version,
+            {},
+            None,
+        )
+
+        mock_make_membership_event = AsyncMock(
+            return_value=(
+                self.OTHER_SERVER_NAME,
+                join_event,
+                self.hs.config.server.default_room_version,
+            )
+        )
+        mock_send_join = AsyncMock(
+            return_value=SendJoinResult(
+                join_event,
+                self.OTHER_SERVER_NAME,
+                state=[create_event, creator_join_event],
+                auth_chain=[create_event, creator_join_event],
+                partial_state=False,
+                servers_in_room=frozenset(),
+            )
+        )
+
+        with patch.object(
+            self.room_member_handler.federation_handler.federation_client,
+            "make_membership_event",
+            mock_make_membership_event,
+        ), patch.object(
+            self.room_member_handler.federation_handler.federation_client,
+            "send_join",
+            mock_send_join,
+        ), patch(
+            "synapse.event_auth._is_membership_change_allowed",
+            return_value=None,
+        ), patch(
+            "synapse.handlers.federation_event.check_state_dependent_auth_rules",
+            return_value=None,
+        ):
+            self.get_success(
+                self.room_member_handler.update_membership(
+                    requester=create_requester(user1_id),
+                    target=UserID.from_string(user1_id),
+                    room_id=intially_unjoined_room_id,
+                    action=Membership.JOIN,
+                    remote_room_hosts=[self.OTHER_SERVER_NAME],
+                )
+            )
+
+        after_join_token = self.event_sources.get_current_token()
+
+        # Get the membership changes for the user.
+        #
+        # At this point, the `current_state_delta_stream` table should look like the
+        # following. Notice that all of the events are at the same `stream_id` because
+        # the current state starts out where we remotely joined:
+        #
+        # | stream_id | room_id                      | type            | state_key                    | event_id | prev_event_id |
+        # |-----------|------------------------------|-----------------|------------------------------|----------|---------------|
+        # | 2         | '!example:other.example.com' | 'm.room.member' | '@user1:test'                | '$xxx'   | None          |
+        # | 2         | '!example:other.example.com' | 'm.room.create' | ''                           | '$xxx'   | None          |
+        # | 2         | '!example:other.example.com' | 'm.room.member' | '@creator:other.example.com' | '$xxx'   | None          |
+        membership_changes = self.get_success(
+            self.store.get_current_state_delta_membership_changes_for_user(
+                user1_id,
+                from_key=before_join_token.room_key,
+                to_key=after_join_token.room_key,
+            )
+        )
+
+        join_pos = self.get_success(
+            self.store.get_position_for_event(join_event.event_id)
+        )
+
+        # Let the whole diff show on failure
+        self.maxDiff = None
+        self.assertEqual(
+            membership_changes,
+            [
+                CurrentStateDeltaMembership(
+                    room_id=intially_unjoined_room_id,
+                    event_id=join_event.event_id,
+                    event_pos=join_pos,
+                    membership="join",
+                    sender=user1_id,
+                    prev_event_id=None,
+                    prev_event_pos=None,
+                    prev_membership=None,
+                    prev_sender=None,
+                ),
+            ],
+        )