summary refs log tree commit diff
path: root/tests
diff options
context:
space:
mode:
Diffstat (limited to 'tests')
-rw-r--r--tests/events/test_utils.py3
-rw-r--r--tests/federation/test_federation_media.py234
-rw-r--r--tests/handlers/test_oauth_delegation.py2
-rw-r--r--tests/handlers/test_sliding_sync.py1246
-rw-r--r--tests/handlers/test_user_directory.py39
-rw-r--r--tests/media/test_media_storage.py239
-rw-r--r--tests/push/test_email.py37
-rw-r--r--tests/replication/storage/test_events.py13
-rw-r--r--tests/rest/admin/test_admin.py4
-rw-r--r--tests/rest/admin/test_event_reports.py6
-rw-r--r--tests/rest/client/test_account.py28
-rw-r--r--tests/rest/client/test_keys.py65
-rw-r--r--tests/rest/client/test_models.py2
-rw-r--r--tests/rest/client/test_reporting.py (renamed from tests/rest/client/test_report_event.py)93
-rw-r--r--tests/rest/client/test_retention.py1
-rw-r--r--tests/rest/client/test_sync.py261
-rw-r--r--tests/rest/client/utils.py5
-rw-r--r--tests/storage/test_event_chain.py1
-rw-r--r--tests/storage/test_stream.py269
-rw-r--r--tests/storage/test_user_directory.py4
-rw-r--r--tests/test_visibility.py10
21 files changed, 2456 insertions, 106 deletions
diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py
index d5ac66a6ed..30f8787758 100644
--- a/tests/events/test_utils.py
+++ b/tests/events/test_utils.py
@@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
         )
         original.internal_metadata.stream_ordering = 1234
         self.assertEqual(original.internal_metadata.stream_ordering, 1234)
+        original.internal_metadata.instance_name = "worker1"
+        self.assertEqual(original.internal_metadata.instance_name, "worker1")
 
         cloned = clone_event(original)
         cloned.unsigned["b"] = 3
@@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase):
         self.assertEqual(original.unsigned, {"a": 1, "b": 2})
         self.assertEqual(cloned.unsigned, {"a": 1, "b": 3})
         self.assertEqual(cloned.internal_metadata.stream_ordering, 1234)
+        self.assertEqual(cloned.internal_metadata.instance_name, "worker1")
         self.assertEqual(cloned.internal_metadata.txn_id, "txn")
 
 
diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py
new file mode 100644
index 0000000000..1c89d19e99
--- /dev/null
+++ b/tests/federation/test_federation_media.py
@@ -0,0 +1,234 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+import io
+import os
+import shutil
+import tempfile
+from typing import Optional
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.media._base import FileInfo, Responder
+from synapse.media.filepath import MediaFilePaths
+from synapse.media.media_storage import MediaStorage
+from synapse.media.storage_provider import (
+    FileStorageProviderBackend,
+    StorageProviderWrapper,
+)
+from synapse.server import HomeServer
+from synapse.storage.databases.main.media_repository import LocalMedia
+from synapse.types import JsonDict, UserID
+from synapse.util import Clock
+
+from tests import unittest
+from tests.test_utils import SMALL_PNG
+from tests.unittest import override_config
+
+
+class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase):
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        super().prepare(reactor, clock, hs)
+        self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-")
+        self.addCleanup(shutil.rmtree, self.test_dir)
+        self.primary_base_path = os.path.join(self.test_dir, "primary")
+        self.secondary_base_path = os.path.join(self.test_dir, "secondary")
+
+        hs.config.media.media_store_path = self.primary_base_path
+
+        storage_providers = [
+            StorageProviderWrapper(
+                FileStorageProviderBackend(hs, self.secondary_base_path),
+                store_local=True,
+                store_remote=False,
+                store_synchronous=True,
+            )
+        ]
+
+        self.filepaths = MediaFilePaths(self.primary_base_path)
+        self.media_storage = MediaStorage(
+            hs, self.primary_base_path, self.filepaths, storage_providers
+        )
+        self.media_repo = hs.get_media_repository()
+
+    @override_config(
+        {"experimental_features": {"msc3916_authenticated_media_enabled": True}}
+    )
+    def test_file_download(self) -> None:
+        content = io.BytesIO(b"file_to_stream")
+        content_uri = self.get_success(
+            self.media_repo.create_content(
+                "text/plain",
+                "test_upload",
+                content,
+                46,
+                UserID.from_string("@user_id:whatever.org"),
+            )
+        )
+        # test with a text file
+        channel = self.make_signed_federation_request(
+            "GET",
+            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
+        )
+        self.pump()
+        self.assertEqual(200, channel.code)
+
+        content_type = channel.headers.getRawHeaders("content-type")
+        assert content_type is not None
+        assert "multipart/mixed" in content_type[0]
+        assert "boundary" in content_type[0]
+
+        # extract boundary
+        boundary = content_type[0].split("boundary=")[1]
+        # split on boundary and check that json field and expected value exist
+        stripped = channel.text_body.split("\r\n" + "--" + boundary)
+        # TODO: the json object expected will change once MSC3911 is implemented, currently
+        # {} is returned for all requests as a placeholder (per MSC3916)
+        found_json = any(
+            "\r\nContent-Type: application/json\r\n{}" in field for field in stripped
+        )
+        self.assertTrue(found_json)
+
+        # check that text file and expected value exist
+        found_file = any(
+            "\r\nContent-Type: text/plain\r\nfile_to_stream" in field
+            for field in stripped
+        )
+        self.assertTrue(found_file)
+
+        content = io.BytesIO(SMALL_PNG)
+        content_uri = self.get_success(
+            self.media_repo.create_content(
+                "image/png",
+                "test_png_upload",
+                content,
+                67,
+                UserID.from_string("@user_id:whatever.org"),
+            )
+        )
+        # test with an image file
+        channel = self.make_signed_federation_request(
+            "GET",
+            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
+        )
+        self.pump()
+        self.assertEqual(200, channel.code)
+
+        content_type = channel.headers.getRawHeaders("content-type")
+        assert content_type is not None
+        assert "multipart/mixed" in content_type[0]
+        assert "boundary" in content_type[0]
+
+        # extract boundary
+        boundary = content_type[0].split("boundary=")[1]
+        # split on boundary and check that json field and expected value exist
+        body = channel.result.get("body")
+        assert body is not None
+        stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8"))
+        found_json = any(
+            b"\r\nContent-Type: application/json\r\n{}" in field
+            for field in stripped_bytes
+        )
+        self.assertTrue(found_json)
+
+        # check that png file exists and matches what was uploaded
+        found_file = any(SMALL_PNG in field for field in stripped_bytes)
+        self.assertTrue(found_file)
+
+    @override_config(
+        {"experimental_features": {"msc3916_authenticated_media_enabled": False}}
+    )
+    def test_disable_config(self) -> None:
+        content = io.BytesIO(b"file_to_stream")
+        content_uri = self.get_success(
+            self.media_repo.create_content(
+                "text/plain",
+                "test_upload",
+                content,
+                46,
+                UserID.from_string("@user_id:whatever.org"),
+            )
+        )
+        channel = self.make_signed_federation_request(
+            "GET",
+            f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}",
+        )
+        self.pump()
+        self.assertEqual(404, channel.code)
+        self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED")
+
+
+class FakeFileStorageProviderBackend:
+    """
+    Fake storage provider stub with incompatible `fetch` signature for testing
+    """
+
+    def __init__(self, hs: "HomeServer", config: str):
+        self.hs = hs
+        self.cache_directory = hs.config.media.media_store_path
+        self.base_directory = config
+
+    def __str__(self) -> str:
+        return "FakeFileStorageProviderBackend[%s]" % (self.base_directory,)
+
+    async def fetch(
+        self, path: str, file_info: FileInfo, media_info: Optional[LocalMedia] = None
+    ) -> Optional[Responder]:
+        pass
+
+
+TEST_DIR = tempfile.mkdtemp(prefix="synapse-tests-")
+
+
+class FederationUnstableMediaEndpointCompatibilityTest(
+    unittest.FederatingHomeserverTestCase
+):
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        super().prepare(reactor, clock, hs)
+        self.test_dir = TEST_DIR
+        self.addCleanup(shutil.rmtree, self.test_dir)
+        self.media_repo = hs.get_media_repository()
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        primary_base_path = os.path.join(TEST_DIR, "primary")
+        config["media_storage_providers"] = [
+            {
+                "module": "tests.federation.test_federation_media.FakeFileStorageProviderBackend",
+                "store_local": "True",
+                "store_remote": "False",
+                "store_synchronous": "False",
+                "config": {"directory": primary_base_path},
+            }
+        ]
+        return config
+
+    @override_config(
+        {"experimental_features": {"msc3916_authenticated_media_enabled": True}}
+    )
+    def test_incompatible_storage_provider_fails_to_load_endpoint(self) -> None:
+        channel = self.make_signed_federation_request(
+            "GET",
+            "/_matrix/federation/unstable/org.matrix.msc3916/media/download/xyz",
+        )
+        self.pump()
+        self.assertEqual(404, channel.code)
+        self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED")
diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py
index 9387d07de8..036c539db2 100644
--- a/tests/handlers/test_oauth_delegation.py
+++ b/tests/handlers/test_oauth_delegation.py
@@ -541,6 +541,8 @@ class MSC3861OAuthDelegation(HomeserverTestCase):
 
         self.assertEqual(channel.code, 200, channel.json_body)
 
+        # Try uploading *different* keys; it should cause a 501 error.
+        keys_upload_body = self.make_device_keys(USER_ID, DEVICE)
         channel = self.make_request(
             "POST",
             "/_matrix/client/v3/keys/device_signing/upload",
diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py
new file mode 100644
index 0000000000..62fe1214fe
--- /dev/null
+++ b/tests/handlers/test_sliding_sync.py
@@ -0,0 +1,1246 @@
+#
+# This file is licensed under the Affero General Public License (AGPL) version 3.
+#
+# Copyright (C) 2024 New Vector, Ltd
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License as
+# published by the Free Software Foundation, either version 3 of the
+# License, or (at your option) any later version.
+#
+# See the GNU Affero General Public License for more details:
+# <https://www.gnu.org/licenses/agpl-3.0.html>.
+#
+# Originally licensed under the Apache License, Version 2.0:
+# <http://www.apache.org/licenses/LICENSE-2.0>.
+#
+# [This file includes modifications made by New Vector Limited]
+#
+#
+import logging
+from unittest.mock import patch
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
+from synapse.api.room_versions import RoomVersions
+from synapse.handlers.sliding_sync import SlidingSyncConfig
+from synapse.rest import admin
+from synapse.rest.client import knock, login, room
+from synapse.server import HomeServer
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
+from synapse.types import JsonDict, UserID
+from synapse.util import Clock
+
+from tests.replication._base import BaseMultiWorkerStreamTestCase
+from tests.unittest import HomeserverTestCase
+
+logger = logging.getLogger(__name__)
+
+
+class GetSyncRoomIdsForUserTestCase(HomeserverTestCase):
+    """
+    Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns
+    the correct list of room IDs.
+    """
+
+    servlets = [
+        admin.register_servlets,
+        knock.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        self.store = self.hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+
+    def test_no_rooms(self) -> None:
+        """
+        Test when the user has never joined any rooms before
+        """
+        user1_id = self.register_user("user1", "pass")
+        # user1_tok = self.login(user1_id, "pass")
+
+        now_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=now_token,
+                to_token=now_token,
+            )
+        )
+
+        self.assertEqual(room_id_results, set())
+
+    def test_get_newly_joined_room(self) -> None:
+        """
+        Test that rooms that the user has newly_joined show up. newly_joined is when you
+        join after the `from_token` and <= `to_token`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        before_room_token = self.event_sources.get_current_token()
+
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room_token,
+                to_token=after_room_token,
+            )
+        )
+
+        self.assertEqual(room_id_results, {room_id})
+
+    def test_get_already_joined_room(self) -> None:
+        """
+        Test that rooms that the user is already joined show up.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room_token,
+                to_token=after_room_token,
+            )
+        )
+
+        self.assertEqual(room_id_results, {room_id})
+
+    def test_get_invited_banned_knocked_room(self) -> None:
+        """
+        Test that rooms that the user is invited to, banned from, and knocked on show
+        up.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room_token = self.event_sources.get_current_token()
+
+        # Setup the invited room (user2 invites user1 to the room)
+        invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok)
+        self.helper.invite(invited_room_id, targ=user1_id, tok=user2_tok)
+
+        # Setup the ban room (user2 bans user1 from the room)
+        ban_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(ban_room_id, user1_id, tok=user1_tok)
+        self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        # Setup the knock room (user1 knocks on the room)
+        knock_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, room_version=RoomVersions.V7.identifier
+        )
+        self.helper.send_state(
+            knock_room_id,
+            EventTypes.JoinRules,
+            {"join_rule": JoinRules.KNOCK},
+            tok=user2_tok,
+        )
+        # User1 knocks on the room
+        channel = self.make_request(
+            "POST",
+            "/_matrix/client/r0/knock/%s" % (knock_room_id,),
+            b"{}",
+            user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room_token,
+                to_token=after_room_token,
+            )
+        )
+
+        # Ensure that the invited, ban, and knock rooms show up
+        self.assertEqual(
+            room_id_results,
+            {
+                invited_room_id,
+                ban_room_id,
+                knock_room_id,
+            },
+        )
+
+    def test_get_kicked_room(self) -> None:
+        """
+        Test that a room that the user was kicked from still shows up. When the user
+        comes back to their client, they should see that they were kicked.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup the kick room (user2 kicks user1 from the room)
+        kick_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+        # Kick user1 from the room
+        self.helper.change_membership(
+            room=kick_room_id,
+            src=user2_id,
+            targ=user1_id,
+            tok=user2_tok,
+            membership=Membership.LEAVE,
+            extra_data={
+                "reason": "Bad manners",
+            },
+        )
+
+        after_kick_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_kick_token,
+                to_token=after_kick_token,
+            )
+        )
+
+        # The kicked room should show up
+        self.assertEqual(room_id_results, {kick_room_id})
+
+    def test_forgotten_rooms(self) -> None:
+        """
+        Forgotten rooms do not show up even if we forget after the from/to range.
+
+        Ideally, we would be able to track when the `/forget` happens and apply it
+        accordingly in the token range but the forgotten flag is only an extra bool in
+        the `room_memberships` table.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup a normal room that we leave. This won't show up in the sync response
+        # because we left it before our token but is good to check anyway.
+        leave_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(leave_room_id, user1_id, tok=user1_tok)
+        self.helper.leave(leave_room_id, user1_id, tok=user1_tok)
+
+        # Setup the ban room (user2 bans user1 from the room)
+        ban_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(ban_room_id, user1_id, tok=user1_tok)
+        self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        # Setup the kick room (user2 kicks user1 from the room)
+        kick_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+        # Kick user1 from the room
+        self.helper.change_membership(
+            room=kick_room_id,
+            src=user2_id,
+            targ=user1_id,
+            tok=user2_tok,
+            membership=Membership.LEAVE,
+            extra_data={
+                "reason": "Bad manners",
+            },
+        )
+
+        before_room_forgets = self.event_sources.get_current_token()
+
+        # Forget the room after we already have our tokens. This doesn't change
+        # the membership event itself but will mark it internally in Synapse
+        channel = self.make_request(
+            "POST",
+            f"/_matrix/client/r0/rooms/{leave_room_id}/forget",
+            content={},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+        channel = self.make_request(
+            "POST",
+            f"/_matrix/client/r0/rooms/{ban_room_id}/forget",
+            content={},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+        channel = self.make_request(
+            "POST",
+            f"/_matrix/client/r0/rooms/{kick_room_id}/forget",
+            content={},
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.result)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room_forgets,
+                to_token=before_room_forgets,
+            )
+        )
+
+        # We shouldn't see the room because it was forgotten
+        self.assertEqual(room_id_results, set())
+
+    def test_only_newly_left_rooms_show_up(self) -> None:
+        """
+        Test that newly_left rooms still show up in the sync response but rooms that
+        were left before the `from_token` don't show up. See condition "2)" comments in
+        the `get_sync_room_ids_for_user` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        # Leave before we calculate the `from_token`
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Leave during the from_token/to_token range (newly_left)
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+
+        after_room2_token = self.event_sources.get_current_token()
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room2_token,
+            )
+        )
+
+        # Only the newly_left room should show up
+        self.assertEqual(room_id_results, {room_id2})
+
+    def test_no_joins_after_to_token(self) -> None:
+        """
+        Rooms we join after the `to_token` should *not* show up. See condition "1b)"
+        comments in the `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Room join after our `to_token` shouldn't show up
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok)
+        _ = room_id2
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_join_during_range_and_left_room_after_to_token(self) -> None:
+        """
+        Room still shows up if we left the room but were joined during the
+        from_token/to_token. See condition "1a)" comments in the
+        `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Leave the room after we already have our tokens
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # We should still see the room because we were joined during the
+        # from_token/to_token time period.
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_join_before_range_and_left_room_after_to_token(self) -> None:
+        """
+        Room still shows up if we left the room but were joined before the `from_token`
+        so it should show up. See condition "1a)" comments in the
+        `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Leave the room after we already have our tokens
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # We should still see the room because we were joined before the `from_token`
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_kicked_before_range_and_left_after_to_token(self) -> None:
+        """
+        Room still shows up if we left the room but were kicked before the `from_token`
+        so it should show up. See condition "1a)" comments in the
+        `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Setup the kick room (user2 kicks user1 from the room)
+        kick_room_id = self.helper.create_room_as(
+            user2_id, tok=user2_tok, is_public=True
+        )
+        self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+        # Kick user1 from the room
+        self.helper.change_membership(
+            room=kick_room_id,
+            src=user2_id,
+            targ=user1_id,
+            tok=user2_tok,
+            membership=Membership.LEAVE,
+            extra_data={
+                "reason": "Bad manners",
+            },
+        )
+
+        after_kick_token = self.event_sources.get_current_token()
+
+        # Leave the room after we already have our tokens
+        #
+        # We have to join before we can leave (leave -> leave isn't a valid transition
+        # or at least it doesn't work in Synapse, 403 forbidden)
+        self.helper.join(kick_room_id, user1_id, tok=user1_tok)
+        self.helper.leave(kick_room_id, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_kick_token,
+                to_token=after_kick_token,
+            )
+        )
+
+        # We should still see the room because we were kicked before the `from_token`
+        self.assertEqual(room_id_results, {kick_room_id})
+
+    def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None:
+        """
+        Newly left room should show up. But we're also testing that joining and leaving
+        after the `to_token` doesn't mess with the results. See condition "2)" and "1a)"
+        comments in the `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        # Join and leave the room during the from/to range
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Join and leave the room after we already have our tokens
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should still show up because it's newly_left during the from/to range
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_newly_left_during_range_and_join_after_to_token(self) -> None:
+        """
+        Newly left room should show up. But we're also testing that joining after the
+        `to_token` doesn't mess with the results. See condition "2)" and "1b)" comments
+        in the `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        # Join and leave the room during the from/to range
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Join the room after we already have our tokens
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should still show up because it's newly_left during the from/to range
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_no_from_token(self) -> None:
+        """
+        Test that if we don't provide a `from_token`, we get all the rooms that we were
+        joined to up to the `to_token`.
+
+        Providing `from_token` only really has the effect that it adds `newly_left`
+        rooms to the response.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+        # Join room1
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Join and leave the room2 before the `to_token`
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Join the room2 after we already have our tokens
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=None,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Only rooms we were joined to before the `to_token` should show up
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_from_token_ahead_of_to_token(self) -> None:
+        """
+        Test when the provided `from_token` comes after the `to_token`. We should
+        basically expect the same result as having no `from_token`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+        # Join room1 before `to_token`
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        # Join and leave the room2 before `to_token`
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+
+        # Note: These are purposely swapped. The `from_token` should come after
+        # the `to_token` in this test
+        to_token = self.event_sources.get_current_token()
+
+        # Join room2 after `to_token`
+        self.helper.join(room_id2, user1_id, tok=user1_tok)
+
+        # --------
+
+        # Join room3 after `to_token`
+        self.helper.join(room_id3, user1_id, tok=user1_tok)
+
+        # Join and leave the room4 after `to_token`
+        self.helper.join(room_id4, user1_id, tok=user1_tok)
+        self.helper.leave(room_id4, user1_id, tok=user1_tok)
+
+        # Note: These are purposely swapped. The `from_token` should come after the
+        # `to_token` in this test
+        from_token = self.event_sources.get_current_token()
+
+        # Join the room4 after we already have our tokens
+        self.helper.join(room_id4, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=from_token,
+                to_token=to_token,
+            )
+        )
+
+        # Only rooms we were joined to before the `to_token` should show up
+        #
+        # There won't be any newly_left rooms because the `from_token` is ahead of the
+        # `to_token` and that range will give no membership changes to check.
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_leave_before_range_and_join_leave_after_to_token(self) -> None:
+        """
+        Old left room shouldn't show up. But we're also testing that joining and leaving
+        after the `to_token` doesn't mess with the results. See condition "1a)" comments
+        in the `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        # Join and leave the room before the from/to range
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Join and leave the room after we already have our tokens
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room shouldn't show up because it was left before the `from_token`
+        self.assertEqual(room_id_results, set())
+
+    def test_leave_before_range_and_join_after_to_token(self) -> None:
+        """
+        Old left room shouldn't show up. But we're also testing that joining after the
+        `to_token` doesn't mess with the results. See condition "1b)" comments in the
+        `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        # Join and leave the room before the from/to range
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Join the room after we already have our tokens
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room shouldn't show up because it was left before the `from_token`
+        self.assertEqual(room_id_results, set())
+
+    def test_join_leave_multiple_times_during_range_and_after_to_token(
+        self,
+    ) -> None:
+        """
+        Join and leave multiple times shouldn't affect rooms from showing up. It just
+        matters that we were joined or newly_left in the from/to range. But we're also
+        testing that joining and leaving after the `to_token` doesn't mess with the
+        results.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        before_room1_token = self.event_sources.get_current_token()
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        # Join, leave, join back to the room during the from/to range
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Leave and Join the room multiple times after we already have our tokens
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should show up because it was newly_left and joined during the from/to range
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_join_leave_multiple_times_before_range_and_after_to_token(
+        self,
+    ) -> None:
+        """
+        Join and leave multiple times before the from/to range shouldn't affect rooms
+        from showing up. It just matters that we were joined or newly_left in the
+        from/to range. But we're also testing that joining and leaving after the
+        `to_token` doesn't mess with the results.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        # Join, leave, join back to the room before the from/to range
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Leave and Join the room multiple times after we already have our tokens
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should show up because we were joined before the from/to range
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_invite_before_range_and_join_leave_after_to_token(
+        self,
+    ) -> None:
+        """
+        Make it look like we joined after the token range but we were invited before the
+        from/to range so the room should still show up. See condition "1a)" comments in
+        the `get_sync_room_ids_for_user()` method.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+        # Invited to the room before the token
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        after_room1_token = self.event_sources.get_current_token()
+
+        # Join and leave the room after we already have our tokens
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=after_room1_token,
+                to_token=after_room1_token,
+            )
+        )
+
+        # Room should show up because we were invited before the from/to range
+        self.assertEqual(room_id_results, {room_id1})
+
+    def test_multiple_rooms_are_not_confused(
+        self,
+    ) -> None:
+        """
+        Test that multiple rooms are not confused as we fixup the list. This test is
+        spawning from a real world bug in the code where I was accidentally using
+        `event.room_id` in one of the fix-up loops but the `event` being referenced was
+        actually from a different loop.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # We create the room with user2 so the room isn't left with no members when we
+        # leave and can still re-join.
+        room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+
+        # Invited and left the room before the token
+        self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        # Invited to room2
+        self.helper.invite(room_id2, src=user2_id, targ=user1_id, tok=user2_tok)
+
+        before_room3_token = self.event_sources.get_current_token()
+
+        # Invited and left room3 during the from/to range
+        room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True)
+        self.helper.invite(room_id3, src=user2_id, targ=user1_id, tok=user2_tok)
+        self.helper.leave(room_id3, user1_id, tok=user1_tok)
+
+        after_room3_token = self.event_sources.get_current_token()
+
+        # Join and leave the room after we already have our tokens
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        # Leave room2
+        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+        # Leave room3
+        self.helper.leave(room_id3, user1_id, tok=user1_tok)
+
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_room3_token,
+                to_token=after_room3_token,
+            )
+        )
+
+        self.assertEqual(
+            room_id_results,
+            {
+                # `room_id1` shouldn't show up because we left before the from/to range
+                #
+                # Room should show up because we were invited before the from/to range
+                room_id2,
+                # Room should show up because it was newly_left during the from/to range
+                room_id3,
+            },
+        )
+
+
+class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase):
+    """
+    Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it works with
+    sharded event stream_writers enabled
+    """
+
+    servlets = [
+        admin.register_servlets_for_client_rest_resource,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def default_config(self) -> dict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+
+        # Enable sharded event stream_writers
+        config["stream_writers"] = {"events": ["worker1", "worker2", "worker3"]}
+        config["instance_map"] = {
+            "main": {"host": "testserv", "port": 8765},
+            "worker1": {"host": "testserv", "port": 1001},
+            "worker2": {"host": "testserv", "port": 1002},
+            "worker3": {"host": "testserv", "port": 1003},
+        }
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        self.store = self.hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+
+    def _create_room(self, room_id: str, user_id: str, tok: str) -> None:
+        """
+        Create a room with a specific room_id. We use this so that we have a
+        consistent room_id across test runs that hashes to the same value and will be
+        sharded to a known worker in the tests.
+        """
+
+        # We control the room ID generation by patching out the
+        # `_generate_room_id` method
+        with patch(
+            "synapse.handlers.room.RoomCreationHandler._generate_room_id"
+        ) as mock:
+            mock.side_effect = lambda: room_id
+            self.helper.create_room_as(user_id, tok=tok)
+
+    def test_sharded_event_persisters(self) -> None:
+        """
+        This test should catch bugs that would come from flawed stream position
+        (`stream_ordering`) comparisons or making `RoomStreamToken`s naively. To
+        compare event positions properly, you need to consider both the `instance_name`
+        and `stream_ordering` together.
+
+        The test creates three event persister workers and a room that is sharded to
+        each worker. On worker2, we make the event stream position stuck so that it lags
+        behind the other workers and we start getting `RoomStreamToken` that have an
+        `instance_map` component (i.e. `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
+
+        We then send some events to advance the stream positions of worker1 and worker3
+        but worker2 is lagging behind because it's stuck. We are specifically testing
+        that `get_sync_room_ids_for_user(from_token=xxx, to_token=xxx)` should work
+        correctly in these adverse conditions.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "worker1"},
+        )
+
+        worker_hs2 = self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "worker2"},
+        )
+
+        self.make_worker_hs(
+            "synapse.app.generic_worker",
+            {"worker_name": "worker3"},
+        )
+
+        # Specially crafted room IDs that get persisted on different workers.
+        #
+        # Sharded to worker1
+        room_id1 = "!fooo:test"
+        # Sharded to worker2
+        room_id2 = "!bar:test"
+        # Sharded to worker3
+        room_id3 = "!quux:test"
+
+        # Create rooms on the different workers.
+        self._create_room(room_id1, user2_id, user2_tok)
+        self._create_room(room_id2, user2_id, user2_tok)
+        self._create_room(room_id3, user2_id, user2_tok)
+        join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok)
+        join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok)
+        # Leave room2
+        self.helper.leave(room_id2, user1_id, tok=user1_tok)
+        join_response3 = self.helper.join(room_id3, user1_id, tok=user1_tok)
+        # Leave room3
+        self.helper.leave(room_id3, user1_id, tok=user1_tok)
+
+        # Ensure that the events were sharded to different workers.
+        pos1 = self.get_success(
+            self.store.get_position_for_event(join_response1["event_id"])
+        )
+        self.assertEqual(pos1.instance_name, "worker1")
+        pos2 = self.get_success(
+            self.store.get_position_for_event(join_response2["event_id"])
+        )
+        self.assertEqual(pos2.instance_name, "worker2")
+        pos3 = self.get_success(
+            self.store.get_position_for_event(join_response3["event_id"])
+        )
+        self.assertEqual(pos3.instance_name, "worker3")
+
+        before_stuck_activity_token = self.event_sources.get_current_token()
+
+        # We now gut wrench into the events stream `MultiWriterIdGenerator` on worker2 to
+        # mimic it getting stuck persisting an event. This ensures that when we send an
+        # event on worker1/worker3 we end up in a state where worker2 events stream
+        # position lags that on worker1/worker3, resulting in a RoomStreamToken with a
+        # non-empty `instance_map` component.
+        #
+        # Worker2's event stream position will not advance until we call `__aexit__`
+        # again.
+        worker_store2 = worker_hs2.get_datastores().main
+        assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator)
+        actx = worker_store2._stream_id_gen.get_next()
+        self.get_success(actx.__aenter__())
+
+        # For room_id1/worker1: leave and join the room to advance the stream position
+        # and generate membership changes.
+        self.helper.leave(room_id1, user1_id, tok=user1_tok)
+        self.helper.join(room_id1, user1_id, tok=user1_tok)
+        # For room_id2/worker2: which is currently stuck, join the room.
+        join_on_worker2_response = self.helper.join(room_id2, user1_id, tok=user1_tok)
+        # For room_id3/worker3: leave and join the room to advance the stream position
+        # and generate membership changes.
+        self.helper.leave(room_id3, user1_id, tok=user1_tok)
+        join_on_worker3_response = self.helper.join(room_id3, user1_id, tok=user1_tok)
+
+        # Get a token while things are stuck after our activity
+        stuck_activity_token = self.event_sources.get_current_token()
+        logger.info("stuck_activity_token %s", stuck_activity_token)
+        # Let's make sure we're working with a token that has an `instance_map`
+        self.assertNotEqual(len(stuck_activity_token.room_key.instance_map), 0)
+
+        # Just double check that the join event on worker2 (that is stuck) happened
+        # after the position recorded for worker2 in the token but before the max
+        # position in the token. This is crucial for the behavior we're trying to test.
+        join_on_worker2_pos = self.get_success(
+            self.store.get_position_for_event(join_on_worker2_response["event_id"])
+        )
+        logger.info("join_on_worker2_pos %s", join_on_worker2_pos)
+        # Ensure the join technically came after our token
+        self.assertGreater(
+            join_on_worker2_pos.stream,
+            stuck_activity_token.room_key.get_stream_pos_for_instance("worker2"),
+        )
+        # But less than the max stream position of some other worker
+        self.assertLess(
+            join_on_worker2_pos.stream,
+            # max
+            stuck_activity_token.room_key.get_max_stream_pos(),
+        )
+
+        # Just double check that the join event on worker3 happened after the min stream
+        # value in the token but still within the position recorded for worker3. This is
+        # crucial for the behavior we're trying to test.
+        join_on_worker3_pos = self.get_success(
+            self.store.get_position_for_event(join_on_worker3_response["event_id"])
+        )
+        logger.info("join_on_worker3_pos %s", join_on_worker3_pos)
+        # Ensure the join came after the min but still encapsulated by the token
+        self.assertGreaterEqual(
+            join_on_worker3_pos.stream,
+            # min
+            stuck_activity_token.room_key.stream,
+        )
+        self.assertLessEqual(
+            join_on_worker3_pos.stream,
+            stuck_activity_token.room_key.get_stream_pos_for_instance("worker3"),
+        )
+
+        # We finish the fake persisting an event we started above and advance worker2's
+        # event stream position (unstuck worker2).
+        self.get_success(actx.__aexit__(None, None, None))
+
+        # The function under test
+        room_id_results = self.get_success(
+            self.sliding_sync_handler.get_sync_room_ids_for_user(
+                UserID.from_string(user1_id),
+                from_token=before_stuck_activity_token,
+                to_token=stuck_activity_token,
+            )
+        )
+
+        self.assertEqual(
+            room_id_results,
+            {
+                room_id1,
+                # room_id2 shouldn't show up because we left before the from/to range
+                # and the join event during the range happened while worker2 was stuck.
+                # This means that from the perspective of the master, where the
+                # `stuck_activity_token` is generated, the stream position for worker2
+                # wasn't advanced to the join yet. Looking at the `instance_map`, the
+                # join technically comes after `stuck_activity_token`.
+                #
+                # room_id2,
+                room_id3,
+            },
+        )
+
+
+class FilterRoomsTestCase(HomeserverTestCase):
+    """
+    Tests Sliding Sync handler `filter_rooms()` to make sure it includes/excludes rooms
+    correctly.
+    """
+
+    servlets = [
+        admin.register_servlets,
+        knock.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.sliding_sync_handler = self.hs.get_sliding_sync_handler()
+        self.store = self.hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+
+    def _create_dm_room(
+        self,
+        inviter_user_id: str,
+        inviter_tok: str,
+        invitee_user_id: str,
+        invitee_tok: str,
+    ) -> str:
+        """
+        Helper to create a DM room as the "inviter" and invite the "invitee" user to the room. The
+        "invitee" user also will join the room. The `m.direct` account data will be set
+        for both users.
+        """
+
+        # Create a room and send an invite to the other user
+        room_id = self.helper.create_room_as(
+            inviter_user_id,
+            is_public=False,
+            tok=inviter_tok,
+        )
+        self.helper.invite(
+            room_id,
+            src=inviter_user_id,
+            targ=invitee_user_id,
+            tok=inviter_tok,
+            extra_data={"is_direct": True},
+        )
+        # Person that was invited joins the room
+        self.helper.join(room_id, invitee_user_id, tok=invitee_tok)
+
+        # Mimic the client setting the room as a direct message in the global account
+        # data
+        self.get_success(
+            self.store.add_account_data_for_user(
+                invitee_user_id,
+                AccountDataTypes.DIRECT,
+                {inviter_user_id: [room_id]},
+            )
+        )
+        self.get_success(
+            self.store.add_account_data_for_user(
+                inviter_user_id,
+                AccountDataTypes.DIRECT,
+                {invitee_user_id: [room_id]},
+            )
+        )
+
+        return room_id
+
+    def test_filter_dm_rooms(self) -> None:
+        """
+        Test `filter.is_dm` for DM rooms
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Create a normal room
+        room_id = self.helper.create_room_as(
+            user1_id,
+            is_public=False,
+            tok=user1_tok,
+        )
+
+        # Create a DM room
+        dm_room_id = self._create_dm_room(
+            inviter_user_id=user1_id,
+            inviter_tok=user1_tok,
+            invitee_user_id=user2_id,
+            invitee_tok=user2_tok,
+        )
+
+        after_rooms_token = self.event_sources.get_current_token()
+
+        # Try with `is_dm=True`
+        truthy_filtered_room_ids = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                {room_id, dm_room_id},
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_dm=True,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(truthy_filtered_room_ids, {dm_room_id})
+
+        # Try with `is_dm=False`
+        falsy_filtered_room_ids = self.get_success(
+            self.sliding_sync_handler.filter_rooms(
+                UserID.from_string(user1_id),
+                {room_id, dm_room_id},
+                SlidingSyncConfig.SlidingSyncList.Filters(
+                    is_dm=False,
+                ),
+                after_rooms_token,
+            )
+        )
+
+        self.assertEqual(falsy_filtered_room_ids, {room_id})
diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py
index 77c6cac449..878d9683b6 100644
--- a/tests/handlers/test_user_directory.py
+++ b/tests/handlers/test_user_directory.py
@@ -1061,6 +1061,45 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase):
             {alice: ProfileInfo(display_name=None, avatar_url=MXC_DUMMY)},
         )
 
+    def test_search_punctuation(self) -> None:
+        """Test that you can search for a user that includes punctuation"""
+
+        searching_user = self.register_user("searcher", "password")
+        searching_user_tok = self.login("searcher", "password")
+
+        room_id = self.helper.create_room_as(
+            searching_user,
+            room_version=RoomVersions.V1.identifier,
+            tok=searching_user_tok,
+        )
+
+        # We want to test searching for users of the form e.g. "user-1", with
+        # various punctuation. We test both a numeric and an alphanumeric part after
+        # the punctuation, as e.g. postgres tokenises "user-1" as "user" and "-1".
+        i = 1
+        for char in ["-", ".", "_"]:
+            for use_numeric in [False, True]:
+                if use_numeric:
+                    prefix1 = f"{i}"
+                    prefix2 = f"{i+1}"
+                else:
+                    prefix1 = f"a{i}"
+                    prefix2 = f"a{i+1}"
+
+                local_user_1 = self.register_user(f"user{char}{prefix1}", "password")
+                local_user_2 = self.register_user(f"user{char}{prefix2}", "password")
+
+                self._add_user_to_room(room_id, RoomVersions.V1, local_user_1)
+                self._add_user_to_room(room_id, RoomVersions.V1, local_user_2)
+
+                results = self.get_success(
+                    self.handler.search_users(searching_user, local_user_1, 20)
+                )["results"]
+                received_user_id_ordering = [result["user_id"] for result in results]
+                self.assertSequenceEqual(received_user_id_ordering[:1], [local_user_1])
+
+                i += 2
+
 
 class TestUserDirSearchDisabled(unittest.HomeserverTestCase):
     servlets = [
diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py
index 1bd51ceba2..47a89e9c66 100644
--- a/tests/media/test_media_storage.py
+++ b/tests/media/test_media_storage.py
@@ -25,7 +25,7 @@ import tempfile
 from binascii import unhexlify
 from io import BytesIO
 from typing import Any, BinaryIO, ClassVar, Dict, List, Optional, Tuple, Union
-from unittest.mock import Mock
+from unittest.mock import MagicMock, Mock, patch
 from urllib import parse
 
 import attr
@@ -37,16 +37,22 @@ from twisted.internet import defer
 from twisted.internet.defer import Deferred
 from twisted.python.failure import Failure
 from twisted.test.proto_helpers import MemoryReactor
+from twisted.web.http_headers import Headers
+from twisted.web.iweb import UNKNOWN_LENGTH, IResponse
 from twisted.web.resource import Resource
 
 from synapse.api.errors import Codes, HttpResponseException
+from synapse.api.ratelimiting import Ratelimiter
 from synapse.events import EventBase
 from synapse.http.types import QueryParams
 from synapse.logging.context import make_deferred_yieldable
 from synapse.media._base import FileInfo, ThumbnailInfo
 from synapse.media.filepath import MediaFilePaths
 from synapse.media.media_storage import MediaStorage, ReadableFileWrapper
-from synapse.media.storage_provider import FileStorageProviderBackend
+from synapse.media.storage_provider import (
+    FileStorageProviderBackend,
+    StorageProviderWrapper,
+)
 from synapse.media.thumbnailer import ThumbnailProvider
 from synapse.module_api import ModuleApi
 from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers
@@ -59,6 +65,7 @@ from synapse.util import Clock
 from tests import unittest
 from tests.server import FakeChannel
 from tests.test_utils import SMALL_PNG
+from tests.unittest import override_config
 from tests.utils import default_config
 
 
@@ -74,7 +81,14 @@ class MediaStorageTests(unittest.HomeserverTestCase):
 
         hs.config.media.media_store_path = self.primary_base_path
 
-        storage_providers = [FileStorageProviderBackend(hs, self.secondary_base_path)]
+        storage_providers = [
+            StorageProviderWrapper(
+                FileStorageProviderBackend(hs, self.secondary_base_path),
+                store_local=True,
+                store_remote=False,
+                store_synchronous=True,
+            )
+        ]
 
         self.filepaths = MediaFilePaths(self.primary_base_path)
         self.media_storage = MediaStorage(
@@ -251,9 +265,11 @@ class MediaRepoTests(unittest.HomeserverTestCase):
             destination: str,
             path: str,
             output_stream: BinaryIO,
+            download_ratelimiter: Ratelimiter,
+            ip_address: Any,
+            max_size: int,
             args: Optional[QueryParams] = None,
             retry_on_dns_fail: bool = True,
-            max_size: Optional[int] = None,
             ignore_backoff: bool = False,
             follow_redirects: bool = False,
         ) -> "Deferred[Tuple[int, Dict[bytes, List[bytes]]]]":
@@ -878,3 +894,218 @@ class SpamCheckerTestCase(unittest.HomeserverTestCase):
             tok=self.tok,
             expect_code=400,
         )
+
+
+class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase):
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        config = self.default_config()
+
+        self.storage_path = self.mktemp()
+        self.media_store_path = self.mktemp()
+        os.mkdir(self.storage_path)
+        os.mkdir(self.media_store_path)
+        config["media_store_path"] = self.media_store_path
+
+        provider_config = {
+            "module": "synapse.media.storage_provider.FileStorageProviderBackend",
+            "store_local": True,
+            "store_synchronous": False,
+            "store_remote": True,
+            "config": {"directory": self.storage_path},
+        }
+
+        config["media_storage_providers"] = [provider_config]
+
+        return self.setup_test_homeserver(config=config)
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.repo = hs.get_media_repository()
+        self.client = hs.get_federation_http_client()
+        self.store = hs.get_datastores().main
+
+    def create_resource_dict(self) -> Dict[str, Resource]:
+        # We need to manually set the resource tree to include media, the
+        # default only does `/_matrix/client` APIs.
+        return {"/_matrix/media": self.hs.get_media_repository_resource()}
+
+    # mock actually reading file body
+    def read_body_with_max_size_30MiB(*args: Any, **kwargs: Any) -> Deferred:
+        d: Deferred = defer.Deferred()
+        d.callback(31457280)
+        return d
+
+    def read_body_with_max_size_50MiB(*args: Any, **kwargs: Any) -> Deferred:
+        d: Deferred = defer.Deferred()
+        d.callback(52428800)
+        return d
+
+    @patch(
+        "synapse.http.matrixfederationclient.read_body_with_max_size",
+        read_body_with_max_size_30MiB,
+    )
+    def test_download_ratelimit_default(self) -> None:
+        """
+        Test remote media download ratelimiting against default configuration - 500MiB bucket
+        and 87KiB/second drain rate
+        """
+
+        # mock out actually sending the request, returns a 30MiB response
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 31457280
+            resp.headers = Headers({"Content-Type": ["application/octet-stream"]})
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        # first request should go through
+        channel = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyz",
+            shorthand=False,
+        )
+        assert channel.code == 200
+
+        # next 15 should go through
+        for i in range(15):
+            channel2 = self.make_request(
+                "GET",
+                f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}",
+                shorthand=False,
+            )
+            assert channel2.code == 200
+
+        # 17th will hit ratelimit
+        channel3 = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx",
+            shorthand=False,
+        )
+        assert channel3.code == 429
+
+        # however, a request from a different IP will go through
+        channel4 = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyz",
+            shorthand=False,
+            client_ip="187.233.230.159",
+        )
+        assert channel4.code == 200
+
+        # At 87KiB/s it should take about 2 minutes for enough to drain from the bucket that
+        # another 30MiB download is authorized. The last download was blocked at 503,316,480.
+        # The next download will be authorized when the bucket hits 492,830,720
+        # (524,288,000 total capacity - 31,457,280 download size), so 503,316,480 - 492,830,720 =
+        # 10,485,760 needs to drain before another download will be authorized; that will
+        # take ~= 2 minutes (10,485,760/89,088/60).
+        self.reactor.pump([2.0 * 60.0])
+
+        # enough has drained and next request goes through
+        channel5 = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyb",
+            shorthand=False,
+        )
+        assert channel5.code == 200
+
+    @override_config(
+        {
+            "remote_media_download_per_second": "50M",
+            "remote_media_download_burst_count": "50M",
+        }
+    )
+    @patch(
+        "synapse.http.matrixfederationclient.read_body_with_max_size",
+        read_body_with_max_size_50MiB,
+    )
+    def test_download_rate_limit_config(self) -> None:
+        """
+        Test that download rate limit config options are correctly picked up and applied
+        """
+
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = 52428800
+            resp.headers = Headers({"Content-Type": ["application/octet-stream"]})
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        # first request should go through
+        channel = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyz",
+            shorthand=False,
+        )
+        assert channel.code == 200
+
+        # immediate second request should fail
+        channel = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy1",
+            shorthand=False,
+        )
+        assert channel.code == 429
+
+        # advance half a second
+        self.reactor.pump([0.5])
+
+        # request still fails
+        channel = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy2",
+            shorthand=False,
+        )
+        assert channel.code == 429
+
+        # advance another half second
+        self.reactor.pump([0.5])
+
+        # enough has drained from bucket and request is successful
+        channel = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy3",
+            shorthand=False,
+        )
+        assert channel.code == 200
+
+    @patch(
+        "synapse.http.matrixfederationclient.read_body_with_max_size",
+        read_body_with_max_size_30MiB,
+    )
+    def test_download_ratelimit_max_size_sub(self) -> None:
+        """
+        Test that if no content-length is provided, the default max size is applied instead
+        """
+
+        # mock out actually sending the request
+        async def _send_request(*args: Any, **kwargs: Any) -> IResponse:
+            resp = MagicMock(spec=IResponse)
+            resp.code = 200
+            resp.length = UNKNOWN_LENGTH
+            resp.headers = Headers({"Content-Type": ["application/octet-stream"]})
+            resp.phrase = b"OK"
+            return resp
+
+        self.client._send_request = _send_request  # type: ignore
+
+        # ten requests should go through using the max size (500MB/50MB)
+        for i in range(10):
+            channel2 = self.make_request(
+                "GET",
+                f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}",
+                shorthand=False,
+            )
+            assert channel2.code == 200
+
+        # eleventh will hit ratelimit
+        channel3 = self.make_request(
+            "GET",
+            "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx",
+            shorthand=False,
+        )
+        assert channel3.code == 429
diff --git a/tests/push/test_email.py b/tests/push/test_email.py
index c927a73fa6..e0aab1c046 100644
--- a/tests/push/test_email.py
+++ b/tests/push/test_email.py
@@ -205,8 +205,24 @@ class EmailPusherTests(HomeserverTestCase):
 
         # Multipart: plain text, base 64 encoded; html, base 64 encoded
         multipart_msg = email.message_from_bytes(msg)
-        txt = multipart_msg.get_payload()[0].get_payload(decode=True).decode()
-        html = multipart_msg.get_payload()[1].get_payload(decode=True).decode()
+
+        # Extract the text (non-HTML) portion of the multipart Message,
+        # as a Message.
+        txt_message = multipart_msg.get_payload(i=0)
+        assert isinstance(txt_message, email.message.Message)
+
+        # Extract the actual bytes from the Message object, and decode them to a `str`.
+        txt_bytes = txt_message.get_payload(decode=True)
+        assert isinstance(txt_bytes, bytes)
+        txt = txt_bytes.decode()
+
+        # Do the same for the HTML portion of the multipart Message.
+        html_message = multipart_msg.get_payload(i=1)
+        assert isinstance(html_message, email.message.Message)
+        html_bytes = html_message.get_payload(decode=True)
+        assert isinstance(html_bytes, bytes)
+        html = html_bytes.decode()
+
         self.assertIn("/_synapse/client/unsubscribe", txt)
         self.assertIn("/_synapse/client/unsubscribe", html)
 
@@ -347,12 +363,17 @@ class EmailPusherTests(HomeserverTestCase):
         # That email should contain the room's avatar
         msg: bytes = args[5]
         # Multipart: plain text, base 64 encoded; html, base 64 encoded
-        html = (
-            email.message_from_bytes(msg)
-            .get_payload()[1]
-            .get_payload(decode=True)
-            .decode()
-        )
+
+        # Extract the html Message object from the Multipart Message.
+        # We need the asserts to convince mypy that this is OK.
+        html_message = email.message_from_bytes(msg).get_payload(i=1)
+        assert isinstance(html_message, email.message.Message)
+
+        # Extract the `bytes` from the html Message object, and decode to a `str`.
+        html = html_message.get_payload(decode=True)
+        assert isinstance(html, bytes)
+        html = html.decode()
+
         self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html)
 
     def test_empty_room(self) -> None:
diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py
index 86c8f14d1b..a56f1e2d5d 100644
--- a/tests/replication/storage/test_events.py
+++ b/tests/replication/storage/test_events.py
@@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
         self.persist(type="m.room.create", key="", creator=USER_ID)
         self.check("get_invited_rooms_for_local_user", [USER_ID_2], [])
         event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite")
+        assert event.internal_metadata.instance_name is not None
         assert event.internal_metadata.stream_ordering is not None
 
         self.replicate()
@@ -154,7 +155,10 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
                     USER_ID,
                     "invite",
                     event.event_id,
-                    event.internal_metadata.stream_ordering,
+                    PersistedEventPosition(
+                        event.internal_metadata.instance_name,
+                        event.internal_metadata.stream_ordering,
+                    ),
                     RoomVersions.V1.identifier,
                 )
             ],
@@ -229,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
         j2 = self.persist(
             type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join"
         )
+        assert j2.internal_metadata.instance_name is not None
         assert j2.internal_metadata.stream_ordering is not None
         self.replicate()
 
         expected_pos = PersistedEventPosition(
-            "master", j2.internal_metadata.stream_ordering
+            j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering
         )
         self.check(
             "get_rooms_for_user_with_stream_ordering",
@@ -285,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
         msg, msgctx = self.build_event()
         self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)]))
         self.replicate()
+        assert j2.internal_metadata.instance_name is not None
         assert j2.internal_metadata.stream_ordering is not None
 
         event_source = RoomEventSource(self.hs)
@@ -326,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase):
             # joined_rooms list.
             if membership_changes:
                 expected_pos = PersistedEventPosition(
-                    "master", j2.internal_metadata.stream_ordering
+                    j2.internal_metadata.instance_name,
+                    j2.internal_metadata.stream_ordering,
                 )
                 self.assertEqual(
                     joined_rooms,
diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py
index 22106eb786..5f6f7213b3 100644
--- a/tests/rest/admin/test_admin.py
+++ b/tests/rest/admin/test_admin.py
@@ -435,10 +435,6 @@ class ExperimentalFeaturesTestCase(unittest.HomeserverTestCase):
             True,
             channel.json_body["features"]["msc3881"],
         )
-        self.assertEqual(
-            False,
-            channel.json_body["features"]["msc3967"],
-        )
 
         # test nothing blows up if you try to disable a feature that isn't already enabled
         url = f"{self.url}/{self.other_user}"
diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py
index a0f978911a..feb410a11d 100644
--- a/tests/rest/admin/test_event_reports.py
+++ b/tests/rest/admin/test_event_reports.py
@@ -24,7 +24,7 @@ from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.api.errors import Codes
-from synapse.rest.client import login, report_event, room
+from synapse.rest.client import login, reporting, room
 from synapse.server import HomeServer
 from synapse.types import JsonDict
 from synapse.util import Clock
@@ -37,7 +37,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase):
         synapse.rest.admin.register_servlets,
         login.register_servlets,
         room.register_servlets,
-        report_event.register_servlets,
+        reporting.register_servlets,
     ]
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -453,7 +453,7 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase):
         synapse.rest.admin.register_servlets,
         login.register_servlets,
         room.register_servlets,
-        report_event.register_servlets,
+        reporting.register_servlets,
     ]
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py
index 992421ffe2..a85ea994de 100644
--- a/tests/rest/client/test_account.py
+++ b/tests/rest/client/test_account.py
@@ -427,13 +427,23 @@ class PasswordResetTestCase(unittest.HomeserverTestCase):
         text = None
         for part in mail.walk():
             if part.get_content_type() == "text/plain":
-                text = part.get_payload(decode=True).decode("UTF-8")
+                text = part.get_payload(decode=True)
+                if text is not None:
+                    # According to the logic table in `get_payload`, we know that
+                    # the result of `get_payload` will be `bytes`, but mypy doesn't
+                    # know this and complains. Thus, we assert the type.
+                    assert isinstance(text, bytes)
+                    text = text.decode("UTF-8")
+
                 break
 
         if not text:
             self.fail("Could not find text portion of email to parse")
 
-        assert text is not None
+        # `text` must be a `str`, after being decoded and determined just above
+        # to not be `None` or an empty `str`.
+        assert isinstance(text, str)
+
         match = re.search(r"https://example.com\S+", text)
         assert match, "Could not find link in email"
 
@@ -1209,13 +1219,23 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase):
         text = None
         for part in mail.walk():
             if part.get_content_type() == "text/plain":
-                text = part.get_payload(decode=True).decode("UTF-8")
+                text = part.get_payload(decode=True)
+                if text is not None:
+                    # According to the logic table in `get_payload`, we know that
+                    # the result of `get_payload` will be `bytes`, but mypy doesn't
+                    # know this and complains. Thus, we assert the type.
+                    assert isinstance(text, bytes)
+                    text = text.decode("UTF-8")
+
                 break
 
         if not text:
             self.fail("Could not find text portion of email to parse")
 
-        assert text is not None
+        # `text` must be a `str`, after being decoded and determined just above
+        # to not be `None` or an empty `str`.
+        assert isinstance(text, str)
+
         match = re.search(r"https://example.com\S+", text)
         assert match, "Could not find link in email"
 
diff --git a/tests/rest/client/test_keys.py b/tests/rest/client/test_keys.py
index 5f0c005576..8bbd109092 100644
--- a/tests/rest/client/test_keys.py
+++ b/tests/rest/client/test_keys.py
@@ -155,71 +155,6 @@ class KeyQueryTestCase(unittest.HomeserverTestCase):
         }
 
     def test_device_signing_with_uia(self) -> None:
-        """Device signing key upload requires UIA."""
-        password = "wonderland"
-        device_id = "ABCDEFGHI"
-        alice_id = self.register_user("alice", password)
-        alice_token = self.login("alice", password, device_id=device_id)
-
-        content = self.make_device_keys(alice_id, device_id)
-
-        channel = self.make_request(
-            "POST",
-            "/_matrix/client/v3/keys/device_signing/upload",
-            content,
-            alice_token,
-        )
-
-        self.assertEqual(channel.code, HTTPStatus.UNAUTHORIZED, channel.result)
-        # Grab the session
-        session = channel.json_body["session"]
-        # Ensure that flows are what is expected.
-        self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"])
-
-        # add UI auth
-        content["auth"] = {
-            "type": "m.login.password",
-            "identifier": {"type": "m.id.user", "user": alice_id},
-            "password": password,
-            "session": session,
-        }
-
-        channel = self.make_request(
-            "POST",
-            "/_matrix/client/v3/keys/device_signing/upload",
-            content,
-            alice_token,
-        )
-
-        self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
-
-    @override_config({"ui_auth": {"session_timeout": "15m"}})
-    def test_device_signing_with_uia_session_timeout(self) -> None:
-        """Device signing key upload requires UIA buy passes with grace period."""
-        password = "wonderland"
-        device_id = "ABCDEFGHI"
-        alice_id = self.register_user("alice", password)
-        alice_token = self.login("alice", password, device_id=device_id)
-
-        content = self.make_device_keys(alice_id, device_id)
-
-        channel = self.make_request(
-            "POST",
-            "/_matrix/client/v3/keys/device_signing/upload",
-            content,
-            alice_token,
-        )
-
-        self.assertEqual(channel.code, HTTPStatus.OK, channel.result)
-
-    @override_config(
-        {
-            "experimental_features": {"msc3967_enabled": True},
-            "ui_auth": {"session_timeout": "15s"},
-        }
-    )
-    def test_device_signing_with_msc3967(self) -> None:
-        """Device signing key follows MSC3967 behaviour when enabled."""
         password = "wonderland"
         device_id = "ABCDEFGHI"
         alice_id = self.register_user("alice", password)
diff --git a/tests/rest/client/test_models.py b/tests/rest/client/test_models.py
index 534dd7bcf4..f8a56c80ca 100644
--- a/tests/rest/client/test_models.py
+++ b/tests/rest/client/test_models.py
@@ -24,7 +24,7 @@ from typing import TYPE_CHECKING
 from typing_extensions import Literal
 
 from synapse._pydantic_compat import HAS_PYDANTIC_V2
-from synapse.rest.client.models import EmailRequestTokenBody
+from synapse.types.rest.client import EmailRequestTokenBody
 
 if TYPE_CHECKING or HAS_PYDANTIC_V2:
     from pydantic.v1 import BaseModel, ValidationError
diff --git a/tests/rest/client/test_report_event.py b/tests/rest/client/test_reporting.py
index 5903771e52..009deb9cb0 100644
--- a/tests/rest/client/test_report_event.py
+++ b/tests/rest/client/test_reporting.py
@@ -22,7 +22,7 @@
 from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
-from synapse.rest.client import login, report_event, room
+from synapse.rest.client import login, reporting, room
 from synapse.server import HomeServer
 from synapse.types import JsonDict
 from synapse.util import Clock
@@ -35,7 +35,7 @@ class ReportEventTestCase(unittest.HomeserverTestCase):
         synapse.rest.admin.register_servlets,
         login.register_servlets,
         room.register_servlets,
-        report_event.register_servlets,
+        reporting.register_servlets,
     ]
 
     def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
@@ -139,3 +139,92 @@ class ReportEventTestCase(unittest.HomeserverTestCase):
             "POST", self.report_path, data, access_token=self.other_user_tok
         )
         self.assertEqual(response_status, channel.code, msg=channel.result["body"])
+
+
+class ReportRoomTestCase(unittest.HomeserverTestCase):
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        reporting.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.other_user = self.register_user("user", "pass")
+        self.other_user_tok = self.login("user", "pass")
+
+        self.room_id = self.helper.create_room_as(
+            self.other_user, tok=self.other_user_tok, is_public=True
+        )
+        self.report_path = (
+            f"/_matrix/client/unstable/org.matrix.msc4151/rooms/{self.room_id}/report"
+        )
+
+    @unittest.override_config(
+        {
+            "experimental_features": {"msc4151_enabled": True},
+        }
+    )
+    def test_reason_str(self) -> None:
+        data = {"reason": "this makes me sad"}
+        self._assert_status(200, data)
+
+    @unittest.override_config(
+        {
+            "experimental_features": {"msc4151_enabled": True},
+        }
+    )
+    def test_no_reason(self) -> None:
+        data = {"not_reason": "for typechecking"}
+        self._assert_status(400, data)
+
+    @unittest.override_config(
+        {
+            "experimental_features": {"msc4151_enabled": True},
+        }
+    )
+    def test_reason_nonstring(self) -> None:
+        data = {"reason": 42}
+        self._assert_status(400, data)
+
+    @unittest.override_config(
+        {
+            "experimental_features": {"msc4151_enabled": True},
+        }
+    )
+    def test_reason_null(self) -> None:
+        data = {"reason": None}
+        self._assert_status(400, data)
+
+    @unittest.override_config(
+        {
+            "experimental_features": {"msc4151_enabled": True},
+        }
+    )
+    def test_cannot_report_nonexistent_room(self) -> None:
+        """
+        Tests that we don't accept room reports for rooms which do not exist.
+        """
+        channel = self.make_request(
+            "POST",
+            "/_matrix/client/unstable/org.matrix.msc4151/rooms/!bloop:example.org/report",
+            {"reason": "i am very sad"},
+            access_token=self.other_user_tok,
+            shorthand=False,
+        )
+        self.assertEqual(404, channel.code, msg=channel.result["body"])
+        self.assertEqual(
+            "Room does not exist",
+            channel.json_body["error"],
+            msg=channel.result["body"],
+        )
+
+    def _assert_status(self, response_status: int, data: JsonDict) -> None:
+        channel = self.make_request(
+            "POST",
+            self.report_path,
+            data,
+            access_token=self.other_user_tok,
+            shorthand=False,
+        )
+        self.assertEqual(response_status, channel.code, msg=channel.result["body"])
diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py
index ceae40498e..1e5a1b0a4d 100644
--- a/tests/rest/client/test_retention.py
+++ b/tests/rest/client/test_retention.py
@@ -167,7 +167,6 @@ class RetentionTestCase(unittest.HomeserverTestCase):
                 storage_controllers,
                 self.user_id,
                 events,
-                msc4115_membership_on_events=True,
             )
         )
 
diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py
index daeb1d3ddd..40870b2cfe 100644
--- a/tests/rest/client/test_sync.py
+++ b/tests/rest/client/test_sync.py
@@ -27,6 +27,7 @@ from twisted.test.proto_helpers import MemoryReactor
 
 import synapse.rest.admin
 from synapse.api.constants import (
+    AccountDataTypes,
     EventContentFields,
     EventTypes,
     ReceiptTypes,
@@ -34,7 +35,7 @@ from synapse.api.constants import (
 )
 from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, RoomStreamToken, StreamKeyType
 from synapse.util import Clock
 
 from tests import unittest
@@ -1204,3 +1205,261 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase):
 
         self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"])
         self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"])
+
+
+class SlidingSyncTestCase(unittest.HomeserverTestCase):
+    """
+    Tests regarding MSC3575 Sliding Sync `/sync` endpoint.
+    """
+
+    servlets = [
+        synapse.rest.admin.register_servlets,
+        login.register_servlets,
+        room.register_servlets,
+        sync.register_servlets,
+        devices.register_servlets,
+    ]
+
+    def default_config(self) -> JsonDict:
+        config = super().default_config()
+        # Enable sliding sync
+        config["experimental_features"] = {"msc3575_enabled": True}
+        return config
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync"
+        self.store = hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+
+    def _create_dm_room(
+        self,
+        inviter_user_id: str,
+        inviter_tok: str,
+        invitee_user_id: str,
+        invitee_tok: str,
+    ) -> str:
+        """
+        Helper to create a DM room as the "inviter" and invite the "invitee" user to the
+        room. The "invitee" user also will join the room. The `m.direct` account data
+        will be set for both users.
+        """
+
+        # Create a room and send an invite to the other user
+        room_id = self.helper.create_room_as(
+            inviter_user_id,
+            is_public=False,
+            tok=inviter_tok,
+        )
+        self.helper.invite(
+            room_id,
+            src=inviter_user_id,
+            targ=invitee_user_id,
+            tok=inviter_tok,
+            extra_data={"is_direct": True},
+        )
+        # Person that was invited joins the room
+        self.helper.join(room_id, invitee_user_id, tok=invitee_tok)
+
+        # Mimic the client setting the room as a direct message in the global account
+        # data
+        self.get_success(
+            self.store.add_account_data_for_user(
+                invitee_user_id,
+                AccountDataTypes.DIRECT,
+                {inviter_user_id: [room_id]},
+            )
+        )
+        self.get_success(
+            self.store.add_account_data_for_user(
+                inviter_user_id,
+                AccountDataTypes.DIRECT,
+                {invitee_user_id: [room_id]},
+            )
+        )
+
+        return room_id
+
+    def test_sync_list(self) -> None:
+        """
+        Test that room IDs show up in the Sliding Sync lists
+        """
+        alice_user_id = self.register_user("alice", "correcthorse")
+        alice_access_token = self.login(alice_user_id, "correcthorse")
+
+        room_id = self.helper.create_room_as(
+            alice_user_id, tok=alice_access_token, is_public=True
+        )
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 99]],
+                        "sort": ["by_notification_level", "by_recency", "by_name"],
+                        "required_state": [
+                            ["m.room.join_rules", ""],
+                            ["m.room.history_visibility", ""],
+                            ["m.space.child", "*"],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                }
+            },
+            access_token=alice_access_token,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make sure it has the foo-list we requested
+        self.assertListEqual(
+            list(channel.json_body["lists"].keys()),
+            ["foo-list"],
+            channel.json_body["lists"].keys(),
+        )
+
+        # Make sure the list includes the room we are joined to
+        self.assertListEqual(
+            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 99],
+                    "room_ids": [room_id],
+                }
+            ],
+            channel.json_body["lists"]["foo-list"],
+        )
+
+    def test_wait_for_sync_token(self) -> None:
+        """
+        Test that worker will wait until it catches up to the given token
+        """
+        alice_user_id = self.register_user("alice", "correcthorse")
+        alice_access_token = self.login(alice_user_id, "correcthorse")
+
+        # Create a future token that will cause us to wait. Since we never send a new
+        # event to reach that future stream_ordering, the worker will wait until the
+        # full timeout.
+        current_token = self.event_sources.get_current_token()
+        future_position_token = current_token.copy_and_replace(
+            StreamKeyType.ROOM,
+            RoomStreamToken(stream=current_token.room_key.stream + 1),
+        )
+
+        future_position_token_serialized = self.get_success(
+            future_position_token.to_string(self.store)
+        )
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint + f"?pos={future_position_token_serialized}",
+            {
+                "lists": {
+                    "foo-list": {
+                        "ranges": [[0, 99]],
+                        "sort": ["by_notification_level", "by_recency", "by_name"],
+                        "required_state": [
+                            ["m.room.join_rules", ""],
+                            ["m.room.history_visibility", ""],
+                            ["m.space.child", "*"],
+                        ],
+                        "timeline_limit": 1,
+                    }
+                }
+            },
+            access_token=alice_access_token,
+            await_result=False,
+        )
+        # Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)`
+        # time out
+        with self.assertRaises(TimedOutException):
+            channel.await_result(timeout_ms=9900)
+        channel.await_result(timeout_ms=200)
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # We expect the `next_pos` in the result to be the same as what we requested
+        # with because we weren't able to find anything new yet.
+        self.assertEqual(
+            channel.json_body["next_pos"], future_position_token_serialized
+        )
+
+    def test_filter_list(self) -> None:
+        """
+        Test that filters apply to lists
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+        user2_id = self.register_user("user2", "pass")
+        user2_tok = self.login(user2_id, "pass")
+
+        # Create a DM room
+        dm_room_id = self._create_dm_room(
+            inviter_user_id=user1_id,
+            inviter_tok=user1_tok,
+            invitee_user_id=user2_id,
+            invitee_tok=user2_tok,
+        )
+
+        # Create a normal room
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        # Make the Sliding Sync request
+        channel = self.make_request(
+            "POST",
+            self.sync_endpoint,
+            {
+                "lists": {
+                    "dms": {
+                        "ranges": [[0, 99]],
+                        "sort": ["by_recency"],
+                        "required_state": [],
+                        "timeline_limit": 1,
+                        "filters": {"is_dm": True},
+                    },
+                    "foo-list": {
+                        "ranges": [[0, 99]],
+                        "sort": ["by_recency"],
+                        "required_state": [],
+                        "timeline_limit": 1,
+                        "filters": {"is_dm": False},
+                    },
+                }
+            },
+            access_token=user1_tok,
+        )
+        self.assertEqual(channel.code, 200, channel.json_body)
+
+        # Make sure it has both of the lists we requested (`dms` and `foo-list`)
+        self.assertListEqual(
+            list(channel.json_body["lists"].keys()),
+            ["dms", "foo-list"],
+            channel.json_body["lists"].keys(),
+        )
+
+        # Make sure each list only includes the room that matches its `is_dm` filter
+        self.assertListEqual(
+            list(channel.json_body["lists"]["dms"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 99],
+                    "room_ids": [dm_room_id],
+                }
+            ],
+            list(channel.json_body["lists"]["dms"]),
+        )
+        self.assertListEqual(
+            list(channel.json_body["lists"]["foo-list"]["ops"]),
+            [
+                {
+                    "op": "SYNC",
+                    "range": [0, 99],
+                    "room_ids": [room_id],
+                }
+            ],
+            list(channel.json_body["lists"]["foo-list"]),
+        )
diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py
index 7362bde7ab..f0ba40a1f1 100644
--- a/tests/rest/client/utils.py
+++ b/tests/rest/client/utils.py
@@ -330,9 +330,12 @@ class RestHelper:
             data,
         )
 
-        assert channel.code == expect_code, "Expected: %d, got: %d, resp: %r" % (
+        assert (
+            channel.code == expect_code
+        ), "Expected: %d, got: %d, PUT %s -> resp: %r" % (
             expect_code,
             channel.code,
+            path,
             channel.result["body"],
         )
 
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index 27d5b0125f..81feb3ec29 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase):
 
         for e in events:
             e.internal_metadata.stream_ordering = self._next_stream_ordering
+            e.internal_metadata.instance_name = self.hs.get_instance_name()
             self._next_stream_ordering += 1
 
         def _persist(txn: LoggingTransaction) -> None:
diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py
index 2029cd9c68..ee34baf46f 100644
--- a/tests/storage/test_stream.py
+++ b/tests/storage/test_stream.py
@@ -19,7 +19,10 @@
 #
 #
 
-from typing import List
+import logging
+from typing import List, Tuple
+
+from immutabledict import immutabledict
 
 from twisted.test.proto_helpers import MemoryReactor
 
@@ -28,11 +31,13 @@ from synapse.api.filtering import Filter
 from synapse.rest import admin
 from synapse.rest.client import login, room
 from synapse.server import HomeServer
-from synapse.types import JsonDict
+from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken
 from synapse.util import Clock
 
 from tests.unittest import HomeserverTestCase
 
+logger = logging.getLogger(__name__)
+
 
 class PaginationTestCase(HomeserverTestCase):
     """
@@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase):
         }
         chunk = self._filter_messages(filter)
         self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none])
+
+
+class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase):
+    """
+    Test `get_last_event_in_room_before_stream_ordering(...)`
+    """
+
+    servlets = [
+        admin.register_servlets,
+        room.register_servlets,
+        login.register_servlets,
+    ]
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.store = hs.get_datastores().main
+        self.event_sources = hs.get_event_sources()
+
+    def _update_persisted_instance_name_for_event(
+        self, event_id: str, instance_name: str
+    ) -> None:
+        """
+        Update the `instance_name` that persisted the event in the database.
+        """
+        return self.get_success(
+            self.store.db_pool.simple_update_one(
+                "events",
+                keyvalues={"event_id": event_id},
+                updatevalues={"instance_name": instance_name},
+            )
+        )
+
+    def _send_event_on_instance(
+        self, instance_name: str, room_id: str, access_token: str
+    ) -> Tuple[JsonDict, PersistedEventPosition]:
+        """
+        Send an event in a room and mimic that it was persisted by a specific
+        instance/worker.
+        """
+        event_response = self.helper.send(
+            room_id, f"{instance_name} message", tok=access_token
+        )
+
+        self._update_persisted_instance_name_for_event(
+            event_response["event_id"], instance_name
+        )
+
+        event_pos = self.get_success(
+            self.store.get_position_for_event(event_response["event_id"])
+        )
+
+        return event_response, event_pos
+
+    def test_before_room_created(self) -> None:
+        """
+        Test that no event is returned if we are using a token before the room was even created
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        before_room_token = self.event_sources.get_current_token()
+
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id,
+                end_token=before_room_token.room_key,
+            )
+        )
+
+        self.assertIsNone(last_event)
+
+    def test_after_room_created(self) -> None:
+        """
+        Test that an event is returned if we are using a token after the room was created
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id,
+                end_token=after_room_token.room_key,
+            )
+        )
+
+        self.assertIsNotNone(last_event)
+
+    def test_activity_in_other_rooms(self) -> None:
+        """
+        Test to make sure that the last event in the room is returned even if the
+        `stream_ordering` has advanced from activity in other rooms.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
+        # Create another room to advance the stream_ordering
+        self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=after_room_token.room_key,
+            )
+        )
+
+        # Make sure it's the event we expect (which also means we know it's from the
+        # correct room)
+        self.assertEqual(last_event, event_response["event_id"])
+
+    def test_activity_after_token_has_no_effect(self) -> None:
+        """
+        Test to make sure we return the last event before the token even if there is
+        activity after it.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response = self.helper.send(room_id1, "target!", tok=user1_tok)
+
+        after_room_token = self.event_sources.get_current_token()
+
+        # Send some events after the token
+        self.helper.send(room_id1, "after1", tok=user1_tok)
+        self.helper.send(room_id1, "after2", tok=user1_tok)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=after_room_token.room_key,
+            )
+        )
+
+        # Make sure it's the last event before the token
+        self.assertEqual(last_event, event_response["event_id"])
+
+    def test_last_event_within_sharded_token(self) -> None:
+        """
+        Test to make sure we can find the last event that is *within* the sharded
+        token (a token that has an `instance_map` and looks like
+        `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing
+        that we can find an event within the token's minimum and instance
+        `stream_ordering`.
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response1, event_pos1 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+        event_response2, event_pos2 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+        event_response3, event_pos3 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+
+        # Create another room to advance the `stream_ordering` on the same worker
+        # so we can sandwich event3 in the middle of the token
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response4, event_pos4 = self._send_event_on_instance(
+            "worker1", room_id2, user1_tok
+        )
+
+        # Assemble a token that encompasses event1 -> event4 on worker1
+        end_token = RoomStreamToken(
+            stream=event_pos2.stream,
+            instance_map=immutabledict({"worker1": event_pos4.stream}),
+        )
+
+        # Send some events after the token
+        self.helper.send(room_id1, "after1", tok=user1_tok)
+        self.helper.send(room_id1, "after2", tok=user1_tok)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=end_token,
+            )
+        )
+
+        # Should find closest event at/before the token in room1
+        self.assertEqual(
+            last_event,
+            event_response3["event_id"],
+            f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to "
+            + str(
+                {
+                    "event1": event_response1["event_id"],
+                    "event2": event_response2["event_id"],
+                    "event3": event_response3["event_id"],
+                }
+            ),
+        )
+
+    def test_last_event_before_sharded_token(self) -> None:
+        """
+        Test to make sure we can find the last event that is *before* the sharded token
+        (a token that has an `instance_map` and looks like
+        `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`).
+        """
+        user1_id = self.register_user("user1", "pass")
+        user1_tok = self.login(user1_id, "pass")
+
+        room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response1, event_pos1 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+        event_response2, event_pos2 = self._send_event_on_instance(
+            "worker1", room_id1, user1_tok
+        )
+
+        # Create another room to advance the `stream_ordering` on the same worker
+        room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True)
+        event_response3, event_pos3 = self._send_event_on_instance(
+            "worker1", room_id2, user1_tok
+        )
+        event_response4, event_pos4 = self._send_event_on_instance(
+            "worker1", room_id2, user1_tok
+        )
+
+        # Assemble a token that encompasses event3 -> event4 on worker1
+        end_token = RoomStreamToken(
+            stream=event_pos3.stream,
+            instance_map=immutabledict({"worker1": event_pos4.stream}),
+        )
+
+        # Send some events after the token
+        self.helper.send(room_id1, "after1", tok=user1_tok)
+        self.helper.send(room_id1, "after2", tok=user1_tok)
+
+        last_event = self.get_success(
+            self.store.get_last_event_in_room_before_stream_ordering(
+                room_id=room_id1,
+                end_token=end_token,
+            )
+        )
+
+        # Should find closest event at/before the token in room1
+        self.assertEqual(
+            last_event,
+            event_response2["event_id"],
+            f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to "
+            + str(
+                {
+                    "event1": event_response1["event_id"],
+                    "event2": event_response2["event_id"],
+                }
+            ),
+        )
diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py
index 156a610faa..c26932069f 100644
--- a/tests/storage/test_user_directory.py
+++ b/tests/storage/test_user_directory.py
@@ -711,6 +711,10 @@ class UserDirectoryICUTestCase(HomeserverTestCase):
             ),
         )
 
+        self.assertEqual(_parse_words_with_icu("user-1"), ["user-1"])
+        self.assertEqual(_parse_words_with_icu("user-ab"), ["user-ab"])
+        self.assertEqual(_parse_words_with_icu("user.--1"), ["user", "-1"])
+
     def test_regex_word_boundary_punctuation(self) -> None:
         """
         Tests the behaviour of punctuation with the non-ICU tokeniser
diff --git a/tests/test_visibility.py b/tests/test_visibility.py
index 3e2100eab4..89cbe4e54b 100644
--- a/tests/test_visibility.py
+++ b/tests/test_visibility.py
@@ -336,7 +336,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
                 self.hs.get_storage_controllers(),
                 "@joiner:test",
                 events_to_filter,
-                msc4115_membership_on_events=True,
             )
         )
         resident_filtered_events = self.get_success(
@@ -344,7 +343,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
                 self.hs.get_storage_controllers(),
                 "@resident:test",
                 events_to_filter,
-                msc4115_membership_on_events=True,
             )
         )
 
@@ -357,7 +355,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
         self.assertEqual(
             ["join", "join", "leave"],
             [
-                e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
+                e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
                 for e in joiner_filtered_events
             ],
         )
@@ -379,7 +377,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase):
         self.assertEqual(
             ["join", "join", "join", "join", "join"],
             [
-                e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
+                e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
                 for e in resident_filtered_events
             ],
         )
@@ -441,7 +439,6 @@ class FilterEventsOutOfBandEventsForClientTestCase(
                 self.hs.get_storage_controllers(),
                 "@user:test",
                 [invite_event, reject_event],
-                msc4115_membership_on_events=True,
             )
         )
         self.assertEqual(
@@ -451,7 +448,7 @@ class FilterEventsOutOfBandEventsForClientTestCase(
         self.assertEqual(
             ["invite", "leave"],
             [
-                e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP]
+                e.unsigned[EventUnsignedContentFields.MEMBERSHIP]
                 for e in filtered_events
             ],
         )
@@ -463,7 +460,6 @@ class FilterEventsOutOfBandEventsForClientTestCase(
                     self.hs.get_storage_controllers(),
                     "@other:test",
                     [invite_event, reject_event],
-                    msc4115_membership_on_events=True,
                 )
             ),
             [],