Diffstat (limited to 'tests')
21 files changed, 2456 insertions, 106 deletions
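The federation media tests added below assert against raw multipart/mixed bodies by splitting on the boundary advertised in the Content-Type header. As a rough, self-contained sketch of that parsing pattern (illustrative only; the body and boundary here are invented, this is not Synapse code):

```python
# Split a multipart/mixed body the way the tests below do: parts are
# delimited by CRLF + "--" + boundary, with a trailing "--" terminator.
def split_parts(body: bytes, boundary: str) -> list:
    delimiter = b"\r\n--" + boundary.encode("utf-8")
    # Drop the preamble (everything before the first delimiter) and the
    # closing "--" marker that follows the final delimiter.
    parts = body.split(delimiter)
    return [part for part in parts[1:] if part.strip() != b"--"]

body = (
    b"preamble\r\n--abc\r\nContent-Type: application/json\r\n\r\n{}"
    b"\r\n--abc\r\nContent-Type: text/plain\r\n\r\nfile_to_stream"
    b"\r\n--abc--"
)
assert len(split_parts(body, "abc")) == 2
```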
diff --git a/tests/events/test_utils.py b/tests/events/test_utils.py index d5ac66a6ed..30f8787758 100644 --- a/tests/events/test_utils.py +++ b/tests/events/test_utils.py @@ -625,6 +625,8 @@ class CloneEventTestCase(stdlib_unittest.TestCase): ) original.internal_metadata.stream_ordering = 1234 self.assertEqual(original.internal_metadata.stream_ordering, 1234) + original.internal_metadata.instance_name = "worker1" + self.assertEqual(original.internal_metadata.instance_name, "worker1") cloned = clone_event(original) cloned.unsigned["b"] = 3 @@ -632,6 +634,7 @@ class CloneEventTestCase(stdlib_unittest.TestCase): self.assertEqual(original.unsigned, {"a": 1, "b": 2}) self.assertEqual(cloned.unsigned, {"a": 1, "b": 3}) self.assertEqual(cloned.internal_metadata.stream_ordering, 1234) + self.assertEqual(cloned.internal_metadata.instance_name, "worker1") self.assertEqual(cloned.internal_metadata.txn_id, "txn") diff --git a/tests/federation/test_federation_media.py b/tests/federation/test_federation_media.py new file mode 100644 index 0000000000..1c89d19e99 --- /dev/null +++ b/tests/federation/test_federation_media.py @@ -0,0 +1,234 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# +# Originally licensed under the Apache License, Version 2.0: +# <http://www.apache.org/licenses/LICENSE-2.0>. +# +# [This file includes modifications made by New Vector Limited] +# +# +import io +import os +import shutil +import tempfile +from typing import Optional + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.media._base import FileInfo, Responder +from synapse.media.filepath import MediaFilePaths +from synapse.media.media_storage import MediaStorage +from synapse.media.storage_provider import ( + FileStorageProviderBackend, + StorageProviderWrapper, +) +from synapse.server import HomeServer +from synapse.storage.databases.main.media_repository import LocalMedia +from synapse.types import JsonDict, UserID +from synapse.util import Clock + +from tests import unittest +from tests.test_utils import SMALL_PNG +from tests.unittest import override_config + + +class FederationUnstableMediaDownloadsTest(unittest.FederatingHomeserverTestCase): + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + super().prepare(reactor, clock, hs) + self.test_dir = tempfile.mkdtemp(prefix="synapse-tests-") + self.addCleanup(shutil.rmtree, self.test_dir) + self.primary_base_path = os.path.join(self.test_dir, "primary") + self.secondary_base_path = os.path.join(self.test_dir, "secondary") + + hs.config.media.media_store_path = self.primary_base_path + + storage_providers = [ + StorageProviderWrapper( + FileStorageProviderBackend(hs, self.secondary_base_path), + store_local=True, + store_remote=False, + store_synchronous=True, + ) + ] + + self.filepaths = MediaFilePaths(self.primary_base_path) + self.media_storage = MediaStorage( + hs, self.primary_base_path, self.filepaths, storage_providers + ) + self.media_repo = hs.get_media_repository() + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": True}} 
+ ) + def test_file_download(self) -> None: + content = io.BytesIO(b"file_to_stream") + content_uri = self.get_success( + self.media_repo.create_content( + "text/plain", + "test_upload", + content, + 46, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with a text file + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + ) + self.pump() + self.assertEqual(200, channel.code) + + content_type = channel.headers.getRawHeaders("content-type") + assert content_type is not None + assert "multipart/mixed" in content_type[0] + assert "boundary" in content_type[0] + + # extract boundary + boundary = content_type[0].split("boundary=")[1] + # split on boundary and check that json field and expected value exist + stripped = channel.text_body.split("\r\n" + "--" + boundary) + # TODO: the json object expected will change once MSC3911 is implemented, currently + # {} is returned for all requests as a placeholder (per MSC3916) + found_json = any( + "\r\nContent-Type: application/json\r\n{}" in field for field in stripped + ) + self.assertTrue(found_json) + + # check that text file and expected value exist + found_file = any( + "\r\nContent-Type: text/plain\r\nfile_to_stream" in field + for field in stripped + ) + self.assertTrue(found_file) + + content = io.BytesIO(SMALL_PNG) + content_uri = self.get_success( + self.media_repo.create_content( + "image/png", + "test_png_upload", + content, + 67, + UserID.from_string("@user_id:whatever.org"), + ) + ) + # test with an image file + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + ) + self.pump() + self.assertEqual(200, channel.code) + + content_type = channel.headers.getRawHeaders("content-type") + assert content_type is not None + assert "multipart/mixed" in content_type[0] + assert "boundary" in content_type[0] + + # extract boundary + boundary = content_type[0].split("boundary=")[1] + # split on boundary and check that json field and expected value exist + body = channel.result.get("body") + assert body is not None + stripped_bytes = body.split(b"\r\n" + b"--" + boundary.encode("utf-8")) + found_json = any( + b"\r\nContent-Type: application/json\r\n{}" in field + for field in stripped_bytes + ) + self.assertTrue(found_json) + + # check that png file exists and matches what was uploaded + found_file = any(SMALL_PNG in field for field in stripped_bytes) + self.assertTrue(found_file) + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": False}} + ) + def test_disable_config(self) -> None: + content = io.BytesIO(b"file_to_stream") + content_uri = self.get_success( + self.media_repo.create_content( + "text/plain", + "test_upload", + content, + 46, + UserID.from_string("@user_id:whatever.org"), + ) + ) + channel = self.make_signed_federation_request( + "GET", + f"/_matrix/federation/unstable/org.matrix.msc3916/media/download/{content_uri.media_id}", + ) + self.pump() + self.assertEqual(404, channel.code) + self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED") + + +class FakeFileStorageProviderBackend: + """ + Fake storage provider stub with incompatible `fetch` signature for testing + """ + + def __init__(self, hs: "HomeServer", config: str): + self.hs = hs + self.cache_directory = hs.config.media.media_store_path + self.base_directory = config + + def __str__(self) -> str: + return
"FakeFileStorageProviderBackend[%s]" % (self.base_directory,) + + async def fetch( + self, path: str, file_info: FileInfo, media_info: Optional[LocalMedia] = None + ) -> Optional[Responder]: + pass + + +TEST_DIR = tempfile.mkdtemp(prefix="synapse-tests-") + + +class FederationUnstableMediaEndpointCompatibilityTest( + unittest.FederatingHomeserverTestCase +): + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + super().prepare(reactor, clock, hs) + self.test_dir = TEST_DIR + self.addCleanup(shutil.rmtree, self.test_dir) + self.media_repo = hs.get_media_repository() + + def default_config(self) -> JsonDict: + config = super().default_config() + primary_base_path = os.path.join(TEST_DIR, "primary") + config["media_storage_providers"] = [ + { + "module": "tests.federation.test_federation_media.FakeFileStorageProviderBackend", + "store_local": "True", + "store_remote": "False", + "store_synchronous": "False", + "config": {"directory": primary_base_path}, + } + ] + return config + + @override_config( + {"experimental_features": {"msc3916_authenticated_media_enabled": True}} + ) + def test_incompatible_storage_provider_fails_to_load_endpoint(self) -> None: + channel = self.make_signed_federation_request( + "GET", + "/_matrix/federation/unstable/org.matrix.msc3916/media/download/xyz", + ) + self.pump() + self.assertEqual(404, channel.code) + self.assertEqual(channel.json_body.get("errcode"), "M_UNRECOGNIZED") diff --git a/tests/handlers/test_oauth_delegation.py b/tests/handlers/test_oauth_delegation.py index 9387d07de8..036c539db2 100644 --- a/tests/handlers/test_oauth_delegation.py +++ b/tests/handlers/test_oauth_delegation.py @@ -541,6 +541,8 @@ class MSC3861OAuthDelegation(HomeserverTestCase): self.assertEqual(channel.code, 200, channel.json_body) + # Try uploading *different* keys; it should cause a 501 error. + keys_upload_body = self.make_device_keys(USER_ID, DEVICE) channel = self.make_request( "POST", "/_matrix/client/v3/keys/device_signing/upload", diff --git a/tests/handlers/test_sliding_sync.py b/tests/handlers/test_sliding_sync.py new file mode 100644 index 0000000000..62fe1214fe --- /dev/null +++ b/tests/handlers/test_sliding_sync.py @@ -0,0 +1,1246 @@ +# +# This file is licensed under the Affero General Public License (AGPL) version 3. +# +# Copyright (C) 2024 New Vector, Ltd +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# See the GNU Affero General Public License for more details: +# <https://www.gnu.org/licenses/agpl-3.0.html>. +# +# Originally licensed under the Apache License, Version 2.0: +# <http://www.apache.org/licenses/LICENSE-2.0>. 
+# +# [This file includes modifications made by New Vector Limited] +# +# +import logging +from unittest.mock import patch + +from twisted.test.proto_helpers import MemoryReactor + +from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership +from synapse.api.room_versions import RoomVersions +from synapse.handlers.sliding_sync import SlidingSyncConfig +from synapse.rest import admin +from synapse.rest.client import knock, login, room +from synapse.server import HomeServer +from synapse.storage.util.id_generators import MultiWriterIdGenerator +from synapse.types import JsonDict, UserID +from synapse.util import Clock + +from tests.replication._base import BaseMultiWorkerStreamTestCase +from tests.unittest import HomeserverTestCase + +logger = logging.getLogger(__name__) + + +class GetSyncRoomIdsForUserTestCase(HomeserverTestCase): + """ + Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it returns + the correct list of rooms IDs. + """ + + servlets = [ + admin.register_servlets, + knock.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def test_no_rooms(self) -> None: + """ + Test when the user has never joined any rooms before + """ + user1_id = self.register_user("user1", "pass") + # user1_tok = self.login(user1_id, "pass") + + now_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=now_token, + to_token=now_token, + ) + ) + + self.assertEqual(room_id_results, set()) + + def test_get_newly_joined_room(self) -> None: + """ + Test that rooms that the user has newly_joined show up. newly_joined is when you + join after the `from_token` and <= `to_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room_token, + to_token=after_room_token, + ) + ) + + self.assertEqual(room_id_results, {room_id}) + + def test_get_already_joined_room(self) -> None: + """ + Test that rooms that the user is already joined show up. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room_token, + to_token=after_room_token, + ) + ) + + self.assertEqual(room_id_results, {room_id}) + + def test_get_invited_banned_knocked_room(self) -> None: + """ + Test that rooms that the user is invited to, banned from, and knocked on show + up. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + # Setup the invited room (user2 invites user1 to the room) + invited_room_id = self.helper.create_room_as(user2_id, tok=user2_tok) + self.helper.invite(invited_room_id, targ=user1_id, tok=user2_tok) + + # Setup the ban room (user2 bans user1 from the room) + ban_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(ban_room_id, user1_id, tok=user1_tok) + self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok) + + # Setup the knock room (user1 knocks on the room) + knock_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, room_version=RoomVersions.V7.identifier + ) + self.helper.send_state( + knock_room_id, + EventTypes.JoinRules, + {"join_rule": JoinRules.KNOCK}, + tok=user2_tok, + ) + # User1 knocks on the room + channel = self.make_request( + "POST", + "/_matrix/client/r0/knock/%s" % (knock_room_id,), + b"{}", + user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + after_room_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room_token, + to_token=after_room_token, + ) + ) + + # Ensure that the invited, ban, and knock rooms show up + self.assertEqual( + room_id_results, + { + invited_room_id, + ban_room_id, + knock_room_id, + }, + ) + + def test_get_kicked_room(self) -> None: + """ + Test that a room that the user was kicked from still shows up. When the user + comes back to their client, they should see that they were kicked. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Setup the kick room (user2 kicks user1 from the room) + kick_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + # Kick user1 from the room + self.helper.change_membership( + room=kick_room_id, + src=user2_id, + targ=user1_id, + tok=user2_tok, + membership=Membership.LEAVE, + extra_data={ + "reason": "Bad manners", + }, + ) + + after_kick_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_kick_token, + to_token=after_kick_token, + ) + ) + + # The kicked room should show up + self.assertEqual(room_id_results, {kick_room_id}) + + def test_forgotten_rooms(self) -> None: + """ + Forgotten rooms do not show up even if we forget after the from/to range. + + Ideally, we would be able to track when the `/forget` happens and apply it + accordingly in the token range but the forgotten flag is only an extra bool in + the `room_memberships` table. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Setup a normal room that we leave. This won't show up in the sync response + # because we left it before our token but is good to check anyway. 
+ leave_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(leave_room_id, user1_id, tok=user1_tok) + self.helper.leave(leave_room_id, user1_id, tok=user1_tok) + + # Setup the ban room (user2 bans user1 from the room) + ban_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(ban_room_id, user1_id, tok=user1_tok) + self.helper.ban(ban_room_id, src=user2_id, targ=user1_id, tok=user2_tok) + + # Setup the kick room (user2 kicks user1 from the room) + kick_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + # Kick user1 from the room + self.helper.change_membership( + room=kick_room_id, + src=user2_id, + targ=user1_id, + tok=user2_tok, + membership=Membership.LEAVE, + extra_data={ + "reason": "Bad manners", + }, + ) + + before_room_forgets = self.event_sources.get_current_token() + + # Forget the room after we already have our tokens. This doesn't change + # the membership event itself but will mark it internally in Synapse + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{leave_room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{ban_room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + channel = self.make_request( + "POST", + f"/_matrix/client/r0/rooms/{kick_room_id}/forget", + content={}, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.result) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room_forgets, + to_token=before_room_forgets, + ) + ) + + # We shouldn't see the room because it was forgotten + self.assertEqual(room_id_results, set()) + + def test_only_newly_left_rooms_show_up(self) -> None: + """ + Test that newly_left rooms still show up in the sync response but rooms that + were left before the `from_token` don't show up. See condition "2)" comments in + the `get_sync_room_ids_for_user` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + # Leave before we calculate the `from_token` + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave during the from_token/to_token range (newly_left) + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + self.helper.leave(room_id2, user1_id, tok=user1_tok) + + after_room2_token = self.event_sources.get_current_token() + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room2_token, + ) + ) + + # Only the newly_left room should show up + self.assertEqual(room_id_results, {room_id2}) + + def test_no_joins_after_to_token(self) -> None: + """ + Rooms we join after the `to_token` should *not* show up. See condition "1b)" + comments in the `get_sync_room_ids_for_user()` method. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Room join after after our `to_token` shouldn't show up + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok) + _ = room_id2 + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + self.assertEqual(room_id_results, {room_id1}) + + def test_join_during_range_and_left_room_after_to_token(self) -> None: + """ + Room still shows up if we left the room but were joined during the + from_token/to_token. See condition "1a)" comments in the + `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave the room after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # We should still see the room because we were joined during the + # from_token/to_token time period. + self.assertEqual(room_id_results, {room_id1}) + + def test_join_before_range_and_left_room_after_to_token(self) -> None: + """ + Room still shows up if we left the room but were joined before the `from_token` + so it should show up. See condition "1a)" comments in the + `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave the room after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # We should still see the room because we were joined before the `from_token` + self.assertEqual(room_id_results, {room_id1}) + + def test_kicked_before_range_and_left_after_to_token(self) -> None: + """ + Room still shows up if we left the room but were kicked before the `from_token` + so it should show up. See condition "1a)" comments in the + `get_sync_room_ids_for_user()` method. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Setup the kick room (user2 kicks user1 from the room) + kick_room_id = self.helper.create_room_as( + user2_id, tok=user2_tok, is_public=True + ) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + # Kick user1 from the room + self.helper.change_membership( + room=kick_room_id, + src=user2_id, + targ=user1_id, + tok=user2_tok, + membership=Membership.LEAVE, + extra_data={ + "reason": "Bad manners", + }, + ) + + after_kick_token = self.event_sources.get_current_token() + + # Leave the room after we already have our tokens + # + # We have to join before we can leave (leave -> leave isn't a valid transition + # or at least it doesn't work in Synapse, 403 forbidden) + self.helper.join(kick_room_id, user1_id, tok=user1_tok) + self.helper.leave(kick_room_id, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_kick_token, + to_token=after_kick_token, + ) + ) + + # We shouldn't see the room because it was forgotten + self.assertEqual(room_id_results, {kick_room_id}) + + def test_newly_left_during_range_and_join_leave_after_to_token(self) -> None: + """ + Newly left room should show up. But we're also testing that joining and leaving + after the `to_token` doesn't mess with the results. See condition "2)" and "1a)" + comments in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room during the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should still show up because it's newly_left during the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_newly_left_during_range_and_join_after_to_token(self) -> None: + """ + Newly left room should show up. But we're also testing that joining after the + `to_token` doesn't mess with the results. See condition "2)" and "1b)" comments + in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. 
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room during the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should still show up because it's newly_left during the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_no_from_token(self) -> None: + """ + Test that if we don't provide a `from_token`, we get all the rooms that we were + joined to up to the `to_token`. + + Providing `from_token` only really has the effect that it adds `newly_left` + rooms to the response. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Join room1 + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Join and leave the room2 before the `to_token` + self.helper.join(room_id2, user1_id, tok=user1_tok) + self.helper.leave(room_id2, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join the room2 after we already have our tokens + self.helper.join(room_id2, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=None, + to_token=after_room1_token, + ) + ) + + # Only rooms we were joined to before the `to_token` should show up + self.assertEqual(room_id_results, {room_id1}) + + def test_from_token_ahead_of_to_token(self) -> None: + """ + Test when the provided `from_token` comes after the `to_token`. We should + basically expect the same result as having no `from_token`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id4 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Join room1 before `before_room_token` + self.helper.join(room_id1, user1_id, tok=user1_tok) + + # Join and leave the room2 before `before_room_token` + self.helper.join(room_id2, user1_id, tok=user1_tok) + self.helper.leave(room_id2, user1_id, tok=user1_tok) + + # Note: These are purposely swapped.
The `from_token` should come after + # the `to_token` in this test + to_token = self.event_sources.get_current_token() + + # Join room2 after `before_room_token` + self.helper.join(room_id2, user1_id, tok=user1_tok) + + # -------- + + # Join room3 after `before_room_token` + self.helper.join(room_id3, user1_id, tok=user1_tok) + + # Join and leave the room4 after `before_room_token` + self.helper.join(room_id4, user1_id, tok=user1_tok) + self.helper.leave(room_id4, user1_id, tok=user1_tok) + + # Note: These are purposely swapped. The `from_token` should come after the + # `to_token` in this test + from_token = self.event_sources.get_current_token() + + # Join the room4 after we already have our tokens + self.helper.join(room_id4, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=from_token, + to_token=to_token, + ) + ) + + # Only rooms we were joined to before the `to_token` should show up + # + # There won't be any newly_left rooms because the `from_token` is ahead of the + # `to_token` and that range will give no membership changes to check. + self.assertEqual(room_id_results, {room_id1}) + + def test_leave_before_range_and_join_leave_after_to_token(self) -> None: + """ + Old left room shouldn't show up. But we're also testing that joining and leaving + after the `to_token` doesn't mess with the results. See condition "1a)" comments + in the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room shouldn't show up because it was left before the `from_token` + self.assertEqual(room_id_results, set()) + + def test_leave_before_range_and_join_after_to_token(self) -> None: + """ + Old left room shouldn't show up. But we're also testing that joining after the + `to_token` doesn't mess with the results. See condition "1b)" comments in the + `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. 
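(A quick illustration of why the swapped tokens in the previous test yield no newly_left rooms: membership changes are scanned over the window after `from_token` up to `to_token`, and a reversed pair gives an empty window. A toy model with integer positions, not the real token type:)

```python
def membership_change_window(from_pos: int, to_pos: int) -> range:
    # Changes are collected over (from_pos, to_pos]; reversed tokens give an
    # empty range, so no newly_left rooms can be picked up.
    return range(from_pos + 1, to_pos + 1)

assert len(membership_change_window(10, 5)) == 0   # from_token ahead of to_token
assert list(membership_change_window(5, 7)) == [6, 7]
```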
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join and leave the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room shouldn't show up because it was left before the `from_token` + self.assertEqual(room_id_results, set()) + + def test_join_leave_multiple_times_during_range_and_after_to_token( + self, + ) -> None: + """ + Join and leave multiple times shouldn't affect rooms from showing up. It just + matters that we were joined or newly_left in the from/to range. But we're also + testing that joining and leaving after the `to_token` doesn't mess with the + results. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + before_room1_token = self.event_sources.get_current_token() + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join, leave, join back to the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave and Join the room multiple times after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because it was newly_left and joined during the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_join_leave_multiple_times_before_range_and_after_to_token( + self, + ) -> None: + """ + Join and leave multiple times before the from/to range shouldn't affect rooms + from showing up. It just matters that we were joined or newly_left in the + from/to range. But we're also testing that joining and leaving after the + `to_token` doesn't mess with the results. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. 
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + # Join, leave, join back to the room before the from/to range + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Leave and Join the room multiple times after we already have our tokens + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were joined before the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_invite_before_range_and_join_leave_after_to_token( + self, + ) -> None: + """ + Make it look like we joined after the token range but we were invited before the + from/to range so the room should still show up. See condition "1a)" comments in + the `get_sync_room_ids_for_user()` method. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. + room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Invited to the room before the token + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + + after_room1_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=after_room1_token, + to_token=after_room1_token, + ) + ) + + # Room should show up because we were invited before the from/to range + self.assertEqual(room_id_results, {room_id1}) + + def test_multiple_rooms_are_not_confused( + self, + ) -> None: + """ + Test that multiple rooms are not confused as we fixup the list. This test is + spawning from a real world bug in the code where I was accidentally using + `event.room_id` in one of the fix-up loops but the `event` being referenced was + actually from a different loop. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # We create the room with user2 so the room isn't left with no members when we + # leave and can still re-join. 
+ room_id1 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + room_id2 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + + # Invited and left the room before the token + self.helper.invite(room_id1, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + # Invited to room2 + self.helper.invite(room_id2, src=user2_id, targ=user1_id, tok=user2_tok) + + before_room3_token = self.event_sources.get_current_token() + + # Invited and left room3 during the from/to range + room_id3 = self.helper.create_room_as(user2_id, tok=user2_tok, is_public=True) + self.helper.invite(room_id3, src=user2_id, targ=user1_id, tok=user2_tok) + self.helper.leave(room_id3, user1_id, tok=user1_tok) + + after_room3_token = self.event_sources.get_current_token() + + # Join and leave the room after we already have our tokens + self.helper.join(room_id1, user1_id, tok=user1_tok) + self.helper.leave(room_id1, user1_id, tok=user1_tok) + # Leave room2 + self.helper.leave(room_id2, user1_id, tok=user1_tok) + # Leave room3 + self.helper.leave(room_id3, user1_id, tok=user1_tok) + + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_room3_token, + to_token=after_room3_token, + ) + ) + + self.assertEqual( + room_id_results, + { + # `room_id1` shouldn't show up because we left before the from/to range + # + # Room should show up because we were invited before the from/to range + room_id2, + # Room should show up because it was newly_left during the from/to range + room_id3, + }, + ) + + +class GetSyncRoomIdsForUserEventShardTestCase(BaseMultiWorkerStreamTestCase): + """ + Tests Sliding Sync handler `get_sync_room_ids_for_user()` to make sure it works with + sharded event stream_writers enabled + """ + + servlets = [ + admin.register_servlets_for_client_rest_resource, + room.register_servlets, + login.register_servlets, + ] + + def default_config(self) -> dict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + + # Enable shared event stream_writers + config["stream_writers"] = {"events": ["worker1", "worker2", "worker3"]} + config["instance_map"] = { + "main": {"host": "testserv", "port": 8765}, + "worker1": {"host": "testserv", "port": 1001}, + "worker2": {"host": "testserv", "port": 1002}, + "worker3": {"host": "testserv", "port": 1003}, + } + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def _create_room(self, room_id: str, user_id: str, tok: str) -> None: + """ + Create a room with a specific room_id. We use this so that that we have a + consistent room_id across test runs that hashes to the same value and will be + sharded to a known worker in the tests. + """ + + # We control the room ID generation by patching out the + # `_generate_room_id` method + with patch( + "synapse.handlers.room.RoomCreationHandler._generate_room_id" + ) as mock: + mock.side_effect = lambda: room_id + self.helper.create_room_as(user_id, tok=tok) + + def test_sharded_event_persisters(self) -> None: + """ + This test should catch bugs that would come from flawed stream position + (`stream_ordering`) comparisons or making `RoomStreamToken`'s naively. 
To + compare event positions properly, you need to consider both the `instance_name` + and `stream_ordering` together. + + The test creates three event persister workers and a room that is sharded to + each worker. On worker2, we make the event stream position stuck so that it lags + behind the other workers and we start getting `RoomStreamToken`s that have an + `instance_map` component (i.e. `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). + + We then send some events to advance the stream positions of worker1 and worker3 + but worker2 is lagging behind because it's stuck. We are specifically testing + that `get_sync_room_ids_for_user(from_token=xxx, to_token=xxx)` should work + correctly in these adverse conditions. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "worker1"}, + ) + + worker_hs2 = self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "worker2"}, + ) + + self.make_worker_hs( + "synapse.app.generic_worker", + {"worker_name": "worker3"}, + ) + + # Specially crafted room IDs that get persisted on different workers. + # + # Sharded to worker1 + room_id1 = "!fooo:test" + # Sharded to worker2 + room_id2 = "!bar:test" + # Sharded to worker3 + room_id3 = "!quux:test" + + # Create rooms on the different workers. + self._create_room(room_id1, user2_id, user2_tok) + self._create_room(room_id2, user2_id, user2_tok) + self._create_room(room_id3, user2_id, user2_tok) + join_response1 = self.helper.join(room_id1, user1_id, tok=user1_tok) + join_response2 = self.helper.join(room_id2, user1_id, tok=user1_tok) + # Leave room2 + self.helper.leave(room_id2, user1_id, tok=user1_tok) + join_response3 = self.helper.join(room_id3, user1_id, tok=user1_tok) + # Leave room3 + self.helper.leave(room_id3, user1_id, tok=user1_tok) + + # Ensure that the events were sharded to different workers. + pos1 = self.get_success( + self.store.get_position_for_event(join_response1["event_id"]) + ) + self.assertEqual(pos1.instance_name, "worker1") + pos2 = self.get_success( + self.store.get_position_for_event(join_response2["event_id"]) + ) + self.assertEqual(pos2.instance_name, "worker2") + pos3 = self.get_success( + self.store.get_position_for_event(join_response3["event_id"]) + ) + self.assertEqual(pos3.instance_name, "worker3") + + before_stuck_activity_token = self.event_sources.get_current_token() + + # We now gut wrench into the events stream `MultiWriterIdGenerator` on worker2 to + # mimic it getting stuck persisting an event. This ensures that when we send an + # event on worker1/worker3 we end up in a state where worker2's event stream + # position lags that on worker1/worker3, resulting in a RoomStreamToken with a + # non-empty `instance_map` component. + # + # Worker2's event stream position will not advance until we call `__aexit__` + # again. + worker_store2 = worker_hs2.get_datastores().main + assert isinstance(worker_store2._stream_id_gen, MultiWriterIdGenerator) + actx = worker_store2._stream_id_gen.get_next() + self.get_success(actx.__aenter__()) + + # For room_id1/worker1: leave and join the room to advance the stream position + # and generate membership changes. + self.helper.leave(room_id1, user1_id, tok=user1_tok) + self.helper.join(room_id1, user1_id, tok=user1_tok) + # For room_id2/worker2 (currently stuck): join the room.
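(As an aside before the assertions that follow: whether an event is covered by a multi-writer token depends on the position the token records for that event's own writer, falling back to the token's minimum stream position for writers without an entry. A toy sketch of that comparison, an assumed shape loosely mirroring `RoomStreamToken.get_stream_pos_for_instance`, not the real class:)

```python
# Illustrative only: a multi-writer token like m5~worker1.10~worker3.12
# records min_pos=5 plus explicit positions for some writers.
def covered_by_token(
    writer: str, stream: int, min_pos: int, instance_map: dict
) -> bool:
    # An event is within the token iff it is at or below the position the
    # token holds for that event's writer (min_pos if the writer is absent).
    return stream <= instance_map.get(writer, min_pos)

token_map = {"worker1": 10, "worker3": 12}
assert covered_by_token("worker3", 11, min_pos=5, instance_map=token_map)
assert not covered_by_token("worker2", 7, min_pos=5, instance_map=token_map)
```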
+ join_on_worker2_response = self.helper.join(room_id2, user1_id, tok=user1_tok) + # For room_id3/worker3: leave and join the room to advance the stream position + # and generate membership changes. + self.helper.leave(room_id3, user1_id, tok=user1_tok) + join_on_worker3_response = self.helper.join(room_id3, user1_id, tok=user1_tok) + + # Get a token while things are stuck after our activity + stuck_activity_token = self.event_sources.get_current_token() + logger.info("stuck_activity_token %s", stuck_activity_token) + # Let's make sure we're working with a token that has an `instance_map` + self.assertNotEqual(len(stuck_activity_token.room_key.instance_map), 0) + + # Just double check that the join event on worker2 (that is stuck) happened + # after the position recorded for worker2 in the token but before the max + # position in the token. This is crucial for the behavior we're trying to test. + join_on_worker2_pos = self.get_success( + self.store.get_position_for_event(join_on_worker2_response["event_id"]) + ) + logger.info("join_on_worker2_pos %s", join_on_worker2_pos) + # Ensure the join technically came after our token + self.assertGreater( + join_on_worker2_pos.stream, + stuck_activity_token.room_key.get_stream_pos_for_instance("worker2"), + ) + # But less than the max stream position of some other worker + self.assertLess( + join_on_worker2_pos.stream, + # max + stuck_activity_token.room_key.get_max_stream_pos(), + ) + + # Just double check that the join event on worker3 happened after the min stream + # value in the token but still within the position recorded for worker3. This is + # crucial for the behavior we're trying to test. + join_on_worker3_pos = self.get_success( + self.store.get_position_for_event(join_on_worker3_response["event_id"]) + ) + logger.info("join_on_worker3_pos %s", join_on_worker3_pos) + # Ensure the join came after the min but still encapsulated by the token + self.assertGreaterEqual( + join_on_worker3_pos.stream, + # min + stuck_activity_token.room_key.stream, + ) + self.assertLessEqual( + join_on_worker3_pos.stream, + stuck_activity_token.room_key.get_stream_pos_for_instance("worker3"), + ) + + # We finish the fake event persist we started above and advance worker2's + # event stream position (unsticking worker2). + self.get_success(actx.__aexit__(None, None, None)) + + # The function under test + room_id_results = self.get_success( + self.sliding_sync_handler.get_sync_room_ids_for_user( + UserID.from_string(user1_id), + from_token=before_stuck_activity_token, + to_token=stuck_activity_token, + ) + ) + + self.assertEqual( + room_id_results, + { + room_id1, + # room_id2 shouldn't show up because we left before the from/to range + # and the join event during the range happened while worker2 was stuck. + # This means that from the perspective of the master, where the + # `stuck_activity_token` is generated, the stream position for worker2 + # wasn't advanced to the join yet. Looking at the `instance_map`, the + # join technically comes after `stuck_activity_token`. + # + # room_id2, + room_id3, + }, + ) + + +class FilterRoomsTestCase(HomeserverTestCase): + """ + Tests Sliding Sync handler `filter_rooms()` to make sure it includes/excludes rooms + correctly.
+ """ + + servlets = [ + admin.register_servlets, + knock.register_servlets, + login.register_servlets, + room.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.sliding_sync_handler = self.hs.get_sliding_sync_handler() + self.store = self.hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def _create_dm_room( + self, + inviter_user_id: str, + inviter_tok: str, + invitee_user_id: str, + invitee_tok: str, + ) -> str: + """ + Helper to create a DM room as the "inviter" and invite the "invitee" user to the room. The + "invitee" user also will join the room. The `m.direct` account data will be set + for both users. + """ + + # Create a room and send an invite the other user + room_id = self.helper.create_room_as( + inviter_user_id, + is_public=False, + tok=inviter_tok, + ) + self.helper.invite( + room_id, + src=inviter_user_id, + targ=invitee_user_id, + tok=inviter_tok, + extra_data={"is_direct": True}, + ) + # Person that was invited joins the room + self.helper.join(room_id, invitee_user_id, tok=invitee_tok) + + # Mimic the client setting the room as a direct message in the global account + # data + self.get_success( + self.store.add_account_data_for_user( + invitee_user_id, + AccountDataTypes.DIRECT, + {inviter_user_id: [room_id]}, + ) + ) + self.get_success( + self.store.add_account_data_for_user( + inviter_user_id, + AccountDataTypes.DIRECT, + {invitee_user_id: [room_id]}, + ) + ) + + return room_id + + def test_filter_dm_rooms(self) -> None: + """ + Test `filter.is_dm` for DM rooms + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create a normal room + room_id = self.helper.create_room_as( + user1_id, + is_public=False, + tok=user1_tok, + ) + + # Create a DM room + dm_room_id = self._create_dm_room( + inviter_user_id=user1_id, + inviter_tok=user1_tok, + invitee_user_id=user2_id, + invitee_tok=user2_tok, + ) + + after_rooms_token = self.event_sources.get_current_token() + + # Try with `is_dm=True` + truthy_filtered_room_ids = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + {room_id, dm_room_id}, + SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=True, + ), + after_rooms_token, + ) + ) + + self.assertEqual(truthy_filtered_room_ids, {dm_room_id}) + + # Try with `is_dm=False` + falsy_filtered_room_ids = self.get_success( + self.sliding_sync_handler.filter_rooms( + UserID.from_string(user1_id), + {room_id, dm_room_id}, + SlidingSyncConfig.SlidingSyncList.Filters( + is_dm=False, + ), + after_rooms_token, + ) + ) + + self.assertEqual(falsy_filtered_room_ids, {room_id}) diff --git a/tests/handlers/test_user_directory.py b/tests/handlers/test_user_directory.py index 77c6cac449..878d9683b6 100644 --- a/tests/handlers/test_user_directory.py +++ b/tests/handlers/test_user_directory.py @@ -1061,6 +1061,45 @@ class UserDirectoryTestCase(unittest.HomeserverTestCase): {alice: ProfileInfo(display_name=None, avatar_url=MXC_DUMMY)}, ) + def test_search_punctuation(self) -> None: + """Test that you can search for a user that includes punctuation""" + + searching_user = self.register_user("searcher", "password") + searching_user_tok = 
self.login("searcher", "password") + + room_id = self.helper.create_room_as( + searching_user, + room_version=RoomVersions.V1.identifier, + tok=searching_user_tok, + ) + + # We want to test searching for users of the form e.g. "user-1", with + # various punctuation. We also test both where the prefix is numeric and + # alphanumeric, as e.g. postgres tokenises "user-1" as "user" and "-1". + i = 1 + for char in ["-", ".", "_"]: + for use_numeric in [False, True]: + if use_numeric: + prefix1 = f"{i}" + prefix2 = f"{i+1}" + else: + prefix1 = f"a{i}" + prefix2 = f"a{i+1}" + + local_user_1 = self.register_user(f"user{char}{prefix1}", "password") + local_user_2 = self.register_user(f"user{char}{prefix2}", "password") + + self._add_user_to_room(room_id, RoomVersions.V1, local_user_1) + self._add_user_to_room(room_id, RoomVersions.V1, local_user_2) + + results = self.get_success( + self.handler.search_users(searching_user, local_user_1, 20) + )["results"] + received_user_id_ordering = [result["user_id"] for result in results] + self.assertSequenceEqual(received_user_id_ordering[:1], [local_user_1]) + + i += 2 + class TestUserDirSearchDisabled(unittest.HomeserverTestCase): servlets = [ diff --git a/tests/media/test_media_storage.py b/tests/media/test_media_storage.py index 1bd51ceba2..47a89e9c66 100644 --- a/tests/media/test_media_storage.py +++ b/tests/media/test_media_storage.py @@ -25,7 +25,7 @@ import tempfile from binascii import unhexlify from io import BytesIO from typing import Any, BinaryIO, ClassVar, Dict, List, Optional, Tuple, Union -from unittest.mock import Mock +from unittest.mock import MagicMock, Mock, patch from urllib import parse import attr @@ -37,16 +37,22 @@ from twisted.internet import defer from twisted.internet.defer import Deferred from twisted.python.failure import Failure from twisted.test.proto_helpers import MemoryReactor +from twisted.web.http_headers import Headers +from twisted.web.iweb import UNKNOWN_LENGTH, IResponse from twisted.web.resource import Resource from synapse.api.errors import Codes, HttpResponseException +from synapse.api.ratelimiting import Ratelimiter from synapse.events import EventBase from synapse.http.types import QueryParams from synapse.logging.context import make_deferred_yieldable from synapse.media._base import FileInfo, ThumbnailInfo from synapse.media.filepath import MediaFilePaths from synapse.media.media_storage import MediaStorage, ReadableFileWrapper -from synapse.media.storage_provider import FileStorageProviderBackend +from synapse.media.storage_provider import ( + FileStorageProviderBackend, + StorageProviderWrapper, +) from synapse.media.thumbnailer import ThumbnailProvider from synapse.module_api import ModuleApi from synapse.module_api.callbacks.spamchecker_callbacks import load_legacy_spam_checkers @@ -59,6 +65,7 @@ from synapse.util import Clock from tests import unittest from tests.server import FakeChannel from tests.test_utils import SMALL_PNG +from tests.unittest import override_config from tests.utils import default_config @@ -74,7 +81,14 @@ class MediaStorageTests(unittest.HomeserverTestCase): hs.config.media.media_store_path = self.primary_base_path - storage_providers = [FileStorageProviderBackend(hs, self.secondary_base_path)] + storage_providers = [ + StorageProviderWrapper( + FileStorageProviderBackend(hs, self.secondary_base_path), + store_local=True, + store_remote=False, + store_synchronous=True, + ) + ] self.filepaths = MediaFilePaths(self.primary_base_path) self.media_storage = MediaStorage( @@ -251,9 +265,11 
@@ class MediaRepoTests(unittest.HomeserverTestCase): destination: str, path: str, output_stream: BinaryIO, + download_ratelimiter: Ratelimiter, + ip_address: Any, + max_size: int, args: Optional[QueryParams] = None, retry_on_dns_fail: bool = True, - max_size: Optional[int] = None, ignore_backoff: bool = False, follow_redirects: bool = False, ) -> "Deferred[Tuple[int, Dict[bytes, List[bytes]]]]": @@ -878,3 +894,218 @@ class SpamCheckerTestCase(unittest.HomeserverTestCase): tok=self.tok, expect_code=400, ) + + +class RemoteDownloadLimiterTestCase(unittest.HomeserverTestCase): + def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer: + config = self.default_config() + + self.storage_path = self.mktemp() + self.media_store_path = self.mktemp() + os.mkdir(self.storage_path) + os.mkdir(self.media_store_path) + config["media_store_path"] = self.media_store_path + + provider_config = { + "module": "synapse.media.storage_provider.FileStorageProviderBackend", + "store_local": True, + "store_synchronous": False, + "store_remote": True, + "config": {"directory": self.storage_path}, + } + + config["media_storage_providers"] = [provider_config] + + return self.setup_test_homeserver(config=config) + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.repo = hs.get_media_repository() + self.client = hs.get_federation_http_client() + self.store = hs.get_datastores().main + + def create_resource_dict(self) -> Dict[str, Resource]: + # We need to manually set the resource tree to include media, the + # default only does `/_matrix/client` APIs. + return {"/_matrix/media": self.hs.get_media_repository_resource()} + + # mock actually reading file body + def read_body_with_max_size_30MiB(*args: Any, **kwargs: Any) -> Deferred: + d: Deferred = defer.Deferred() + d.callback(31457280) + return d + + def read_body_with_max_size_50MiB(*args: Any, **kwargs: Any) -> Deferred: + d: Deferred = defer.Deferred() + d.callback(52428800) + return d + + @patch( + "synapse.http.matrixfederationclient.read_body_with_max_size", + read_body_with_max_size_30MiB, + ) + def test_download_ratelimit_default(self) -> None: + """ + Test remote media download ratelimiting against default configuration - 500MB bucket + and 87kb/second drain rate + """ + + # mock out actually sending the request, returns a 30MiB response + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = 31457280 + resp.headers = Headers({"Content-Type": ["application/octet-stream"]}) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + # first request should go through + channel = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyz", + shorthand=False, + ) + assert channel.code == 200 + + # next 15 should go through + for i in range(15): + channel2 = self.make_request( + "GET", + f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}", + shorthand=False, + ) + assert channel2.code == 200 + + # 17th will hit ratelimit + channel3 = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx", + shorthand=False, + ) + assert channel3.code == 429 + + # however, a request from a different IP will go through + channel4 = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyz", + shorthand=False, + client_ip="187.233.230.159", + ) + assert channel4.code == 
+ + # at 87KiB/s it should take about 2 minutes for enough to drain from the bucket that another + # 30MiB download is authorized - The last download was blocked at 503,316,480. + # The next download will be authorized when the bucket hits 492,830,720 + # (524,288,000 total capacity - 31,457,280 download size) so 503,316,480 - 492,830,720 ~= 10,485,760 + # needs to drain before another download will be authorized; that will take ~= + # 2 minutes (10,485,760/89,088/60) + self.reactor.pump([2.0 * 60.0]) + + # enough has drained and the next request goes through + channel5 = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyb", + shorthand=False, + ) + assert channel5.code == 200 + + @override_config( + { + "remote_media_download_per_second": "50M", + "remote_media_download_burst_count": "50M", + } + ) + @patch( + "synapse.http.matrixfederationclient.read_body_with_max_size", + read_body_with_max_size_50MiB, + ) + def test_download_rate_limit_config(self) -> None: + """ + Test that download rate limit config options are correctly picked up and applied + """ + + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = 52428800 + resp.headers = Headers({"Content-Type": ["application/octet-stream"]}) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + # first request should go through + channel = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyz", + shorthand=False, + ) + assert channel.code == 200 + + # immediate second request should fail + channel = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy1", + shorthand=False, + ) + assert channel.code == 429 + + # advance half a second + self.reactor.pump([0.5]) + + # request still fails + channel = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy2", + shorthand=False, + ) + assert channel.code == 429 + + # advance another half second + self.reactor.pump([0.5]) + + # enough has drained from the bucket and the request is successful + channel = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy3", + shorthand=False, + ) + assert channel.code == 200 + + @patch( + "synapse.http.matrixfederationclient.read_body_with_max_size", + read_body_with_max_size_30MiB, + ) + def test_download_ratelimit_max_size_sub(self) -> None: + """ + Test that if no content-length is provided, the default max size is applied instead + """ + + # mock out actually sending the request + async def _send_request(*args: Any, **kwargs: Any) -> IResponse: + resp = MagicMock(spec=IResponse) + resp.code = 200 + resp.length = UNKNOWN_LENGTH + resp.headers = Headers({"Content-Type": ["application/octet-stream"]}) + resp.phrase = b"OK" + return resp + + self.client._send_request = _send_request # type: ignore + + # ten requests should go through, each counted at the default max size (500MiB burst / 50MiB max size) + for i in range(10): + channel2 = self.make_request( + "GET", + f"/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxy{i}", + shorthand=False, + ) + assert channel2.code == 200 + + # the eleventh will hit the ratelimit + channel3 = self.make_request( + "GET", + "/_matrix/media/v3/download/remote.org/abcdefghijklmnopqrstuvwxyx", + shorthand=False, + ) + assert channel3.code == 429 diff --git a/tests/push/test_email.py b/tests/push/test_email.py index c927a73fa6..e0aab1c046 100644 ---
a/tests/push/test_email.py +++ b/tests/push/test_email.py @@ -205,8 +205,24 @@ class EmailPusherTests(HomeserverTestCase): # Multipart: plain text, base 64 encoded; html, base 64 encoded multipart_msg = email.message_from_bytes(msg) - txt = multipart_msg.get_payload()[0].get_payload(decode=True).decode() - html = multipart_msg.get_payload()[1].get_payload(decode=True).decode() + + # Extract the text (non-HTML) portion of the multipart Message, + # as a Message. + txt_message = multipart_msg.get_payload(i=0) + assert isinstance(txt_message, email.message.Message) + + # Extract the actual bytes from the Message object, and decode them to a `str`. + txt_bytes = txt_message.get_payload(decode=True) + assert isinstance(txt_bytes, bytes) + txt = txt_bytes.decode() + + # Do the same for the HTML portion of the multipart Message. + html_message = multipart_msg.get_payload(i=1) + assert isinstance(html_message, email.message.Message) + html_bytes = html_message.get_payload(decode=True) + assert isinstance(html_bytes, bytes) + html = html_bytes.decode() + self.assertIn("/_synapse/client/unsubscribe", txt) self.assertIn("/_synapse/client/unsubscribe", html) @@ -347,12 +363,17 @@ class EmailPusherTests(HomeserverTestCase): # That email should contain the room's avatar msg: bytes = args[5] # Multipart: plain text, base 64 encoded; html, base 64 encoded - html = ( - email.message_from_bytes(msg) - .get_payload()[1] - .get_payload(decode=True) - .decode() - ) + + # Extract the html Message object from the Multipart Message. + # We need the asserts to convince mypy that this is OK. + html_message = email.message_from_bytes(msg).get_payload(i=1) + assert isinstance(html_message, email.message.Message) + + # Extract the `bytes` from the html Message object, and decode to a `str`. 
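+ # (`get_payload(decode=True)` returns `None` for multipart containers, which is + # why the isinstance assertion below is needed to narrow the type for mypy.)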
+ html = html_message.get_payload(decode=True) + assert isinstance(html, bytes) + html = html.decode() + self.assertIn("_matrix/media/v1/thumbnail/DUMMY_MEDIA_ID", html) def test_empty_room(self) -> None: diff --git a/tests/replication/storage/test_events.py b/tests/replication/storage/test_events.py index 86c8f14d1b..a56f1e2d5d 100644 --- a/tests/replication/storage/test_events.py +++ b/tests/replication/storage/test_events.py @@ -141,6 +141,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): self.persist(type="m.room.create", key="", creator=USER_ID) self.check("get_invited_rooms_for_local_user", [USER_ID_2], []) event = self.persist(type="m.room.member", key=USER_ID_2, membership="invite") + assert event.internal_metadata.instance_name is not None assert event.internal_metadata.stream_ordering is not None self.replicate() @@ -154,7 +155,10 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): USER_ID, "invite", event.event_id, - event.internal_metadata.stream_ordering, + PersistedEventPosition( + event.internal_metadata.instance_name, + event.internal_metadata.stream_ordering, + ), RoomVersions.V1.identifier, ) ], @@ -229,11 +233,12 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): j2 = self.persist( type="m.room.member", sender=USER_ID_2, key=USER_ID_2, membership="join" ) + assert j2.internal_metadata.instance_name is not None assert j2.internal_metadata.stream_ordering is not None self.replicate() expected_pos = PersistedEventPosition( - "master", j2.internal_metadata.stream_ordering + j2.internal_metadata.instance_name, j2.internal_metadata.stream_ordering ) self.check( "get_rooms_for_user_with_stream_ordering", @@ -285,6 +290,7 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): msg, msgctx = self.build_event() self.get_success(self.persistance.persist_events([(j2, j2ctx), (msg, msgctx)])) self.replicate() + assert j2.internal_metadata.instance_name is not None assert j2.internal_metadata.stream_ordering is not None event_source = RoomEventSource(self.hs) @@ -326,7 +332,8 @@ class EventsWorkerStoreTestCase(BaseWorkerStoreTestCase): # joined_rooms list. 
if membership_changes: expected_pos = PersistedEventPosition( - "master", j2.internal_metadata.stream_ordering + j2.internal_metadata.instance_name, + j2.internal_metadata.stream_ordering, ) self.assertEqual( joined_rooms, diff --git a/tests/rest/admin/test_admin.py b/tests/rest/admin/test_admin.py index 22106eb786..5f6f7213b3 100644 --- a/tests/rest/admin/test_admin.py +++ b/tests/rest/admin/test_admin.py @@ -435,10 +435,6 @@ class ExperimentalFeaturesTestCase(unittest.HomeserverTestCase): True, channel.json_body["features"]["msc3881"], ) - self.assertEqual( - False, - channel.json_body["features"]["msc3967"], - ) # test nothing blows up if you try to disable a feature that isn't already enabled url = f"{self.url}/{self.other_user}" diff --git a/tests/rest/admin/test_event_reports.py b/tests/rest/admin/test_event_reports.py index a0f978911a..feb410a11d 100644 --- a/tests/rest/admin/test_event_reports.py +++ b/tests/rest/admin/test_event_reports.py @@ -24,7 +24,7 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.errors import Codes -from synapse.rest.client import login, report_event, room +from synapse.rest.client import login, reporting, room from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -37,7 +37,7 @@ class EventReportsTestCase(unittest.HomeserverTestCase): synapse.rest.admin.register_servlets, login.register_servlets, room.register_servlets, - report_event.register_servlets, + reporting.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -453,7 +453,7 @@ class EventReportDetailTestCase(unittest.HomeserverTestCase): synapse.rest.admin.register_servlets, login.register_servlets, room.register_servlets, - report_event.register_servlets, + reporting.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: diff --git a/tests/rest/client/test_account.py b/tests/rest/client/test_account.py index 992421ffe2..a85ea994de 100644 --- a/tests/rest/client/test_account.py +++ b/tests/rest/client/test_account.py @@ -427,13 +427,23 @@ class PasswordResetTestCase(unittest.HomeserverTestCase): text = None for part in mail.walk(): if part.get_content_type() == "text/plain": - text = part.get_payload(decode=True).decode("UTF-8") + text = part.get_payload(decode=True) + if text is not None: + # According to the logic table in `get_payload`, we know that + # the result of `get_payload` will be `bytes`, but mypy doesn't + # know this and complains. Thus, we assert the type. + assert isinstance(text, bytes) + text = text.decode("UTF-8") + break if not text: self.fail("Could not find text portion of email to parse") - assert text is not None + # `text` must be a `str`, after being decoded and determined just above + # to not be `None` or an empty `str`. + assert isinstance(text, str) + match = re.search(r"https://example.com\S+", text) assert match, "Could not find link in email" @@ -1209,13 +1219,23 @@ class ThreepidEmailRestTestCase(unittest.HomeserverTestCase): text = None for part in mail.walk(): if part.get_content_type() == "text/plain": - text = part.get_payload(decode=True).decode("UTF-8") + text = part.get_payload(decode=True) + if text is not None: + # According to the logic table in `get_payload`, we know that + # the result of `get_payload` will be `bytes`, but mypy doesn't + # know this and complains. Thus, we assert the type. 
+ assert isinstance(text, bytes) + text = text.decode("UTF-8") + break if not text: self.fail("Could not find text portion of email to parse") - assert text is not None + # `text` must be a `str`, after being decoded and determined just above + # to not be `None` or an empty `str`. + assert isinstance(text, str) + match = re.search(r"https://example.com\S+", text) assert match, "Could not find link in email" diff --git a/tests/rest/client/test_keys.py b/tests/rest/client/test_keys.py index 5f0c005576..8bbd109092 100644 --- a/tests/rest/client/test_keys.py +++ b/tests/rest/client/test_keys.py @@ -155,71 +155,6 @@ class KeyQueryTestCase(unittest.HomeserverTestCase): } def test_device_signing_with_uia(self) -> None: - """Device signing key upload requires UIA.""" - password = "wonderland" - device_id = "ABCDEFGHI" - alice_id = self.register_user("alice", password) - alice_token = self.login("alice", password, device_id=device_id) - - content = self.make_device_keys(alice_id, device_id) - - channel = self.make_request( - "POST", - "/_matrix/client/v3/keys/device_signing/upload", - content, - alice_token, - ) - - self.assertEqual(channel.code, HTTPStatus.UNAUTHORIZED, channel.result) - # Grab the session - session = channel.json_body["session"] - # Ensure that flows are what is expected. - self.assertIn({"stages": ["m.login.password"]}, channel.json_body["flows"]) - - # add UI auth - content["auth"] = { - "type": "m.login.password", - "identifier": {"type": "m.id.user", "user": alice_id}, - "password": password, - "session": session, - } - - channel = self.make_request( - "POST", - "/_matrix/client/v3/keys/device_signing/upload", - content, - alice_token, - ) - - self.assertEqual(channel.code, HTTPStatus.OK, channel.result) - - @override_config({"ui_auth": {"session_timeout": "15m"}}) - def test_device_signing_with_uia_session_timeout(self) -> None: - """Device signing key upload requires UIA buy passes with grace period.""" - password = "wonderland" - device_id = "ABCDEFGHI" - alice_id = self.register_user("alice", password) - alice_token = self.login("alice", password, device_id=device_id) - - content = self.make_device_keys(alice_id, device_id) - - channel = self.make_request( - "POST", - "/_matrix/client/v3/keys/device_signing/upload", - content, - alice_token, - ) - - self.assertEqual(channel.code, HTTPStatus.OK, channel.result) - - @override_config( - { - "experimental_features": {"msc3967_enabled": True}, - "ui_auth": {"session_timeout": "15s"}, - } - ) - def test_device_signing_with_msc3967(self) -> None: - """Device signing key follows MSC3967 behaviour when enabled.""" password = "wonderland" device_id = "ABCDEFGHI" alice_id = self.register_user("alice", password) diff --git a/tests/rest/client/test_models.py b/tests/rest/client/test_models.py index 534dd7bcf4..f8a56c80ca 100644 --- a/tests/rest/client/test_models.py +++ b/tests/rest/client/test_models.py @@ -24,7 +24,7 @@ from typing import TYPE_CHECKING from typing_extensions import Literal from synapse._pydantic_compat import HAS_PYDANTIC_V2 -from synapse.rest.client.models import EmailRequestTokenBody +from synapse.types.rest.client import EmailRequestTokenBody if TYPE_CHECKING or HAS_PYDANTIC_V2: from pydantic.v1 import BaseModel, ValidationError diff --git a/tests/rest/client/test_report_event.py b/tests/rest/client/test_reporting.py index 5903771e52..009deb9cb0 100644 --- a/tests/rest/client/test_report_event.py +++ b/tests/rest/client/test_reporting.py @@ -22,7 +22,7 @@ from twisted.test.proto_helpers import MemoryReactor 
import synapse.rest.admin -from synapse.rest.client import login, report_event, room +from synapse.rest.client import login, reporting, room from synapse.server import HomeServer from synapse.types import JsonDict from synapse.util import Clock @@ -35,7 +35,7 @@ class ReportEventTestCase(unittest.HomeserverTestCase): synapse.rest.admin.register_servlets, login.register_servlets, room.register_servlets, - report_event.register_servlets, + reporting.register_servlets, ] def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: @@ -139,3 +139,92 @@ class ReportEventTestCase(unittest.HomeserverTestCase): "POST", self.report_path, data, access_token=self.other_user_tok ) self.assertEqual(response_status, channel.code, msg=channel.result["body"]) + + +class ReportRoomTestCase(unittest.HomeserverTestCase): + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + reporting.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.other_user = self.register_user("user", "pass") + self.other_user_tok = self.login("user", "pass") + + self.room_id = self.helper.create_room_as( + self.other_user, tok=self.other_user_tok, is_public=True + ) + self.report_path = ( + f"/_matrix/client/unstable/org.matrix.msc4151/rooms/{self.room_id}/report" + ) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_reason_str(self) -> None: + data = {"reason": "this makes me sad"} + self._assert_status(200, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_no_reason(self) -> None: + data = {"not_reason": "for typechecking"} + self._assert_status(400, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_reason_nonstring(self) -> None: + data = {"reason": 42} + self._assert_status(400, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_reason_null(self) -> None: + data = {"reason": None} + self._assert_status(400, data) + + @unittest.override_config( + { + "experimental_features": {"msc4151_enabled": True}, + } + ) + def test_cannot_report_nonexistent_room(self) -> None: + """ + Tests that we don't accept event reports for rooms which do not exist. 
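+ Such a request should be rejected with a 404 and a "Room does not exist" error.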
+ """ + channel = self.make_request( + "POST", + "/_matrix/client/unstable/org.matrix.msc4151/rooms/!bloop:example.org/report", + {"reason": "i am very sad"}, + access_token=self.other_user_tok, + shorthand=False, + ) + self.assertEqual(404, channel.code, msg=channel.result["body"]) + self.assertEqual( + "Room does not exist", + channel.json_body["error"], + msg=channel.result["body"], + ) + + def _assert_status(self, response_status: int, data: JsonDict) -> None: + channel = self.make_request( + "POST", + self.report_path, + data, + access_token=self.other_user_tok, + shorthand=False, + ) + self.assertEqual(response_status, channel.code, msg=channel.result["body"]) diff --git a/tests/rest/client/test_retention.py b/tests/rest/client/test_retention.py index ceae40498e..1e5a1b0a4d 100644 --- a/tests/rest/client/test_retention.py +++ b/tests/rest/client/test_retention.py @@ -167,7 +167,6 @@ class RetentionTestCase(unittest.HomeserverTestCase): storage_controllers, self.user_id, events, - msc4115_membership_on_events=True, ) ) diff --git a/tests/rest/client/test_sync.py b/tests/rest/client/test_sync.py index daeb1d3ddd..40870b2cfe 100644 --- a/tests/rest/client/test_sync.py +++ b/tests/rest/client/test_sync.py @@ -27,6 +27,7 @@ from twisted.test.proto_helpers import MemoryReactor import synapse.rest.admin from synapse.api.constants import ( + AccountDataTypes, EventContentFields, EventTypes, ReceiptTypes, @@ -34,7 +35,7 @@ ) from synapse.rest.client import devices, knock, login, read_marker, receipts, room, sync from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, RoomStreamToken, StreamKeyType from synapse.util import Clock from tests import unittest @@ -1204,3 +1205,261 @@ class ExcludeRoomTestCase(unittest.HomeserverTestCase): self.assertNotIn(self.excluded_room_id, channel.json_body["rooms"]["join"]) self.assertIn(self.included_room_id, channel.json_body["rooms"]["join"]) + + +class SlidingSyncTestCase(unittest.HomeserverTestCase): + """ + Tests regarding the MSC3575 Sliding Sync `/sync` endpoint. + """ + + servlets = [ + synapse.rest.admin.register_servlets, + login.register_servlets, + room.register_servlets, + sync.register_servlets, + devices.register_servlets, + ] + + def default_config(self) -> JsonDict: + config = super().default_config() + # Enable sliding sync + config["experimental_features"] = {"msc3575_enabled": True} + return config + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.sync_endpoint = "/_matrix/client/unstable/org.matrix.msc3575/sync" + self.event_sources = hs.get_event_sources() + + def _create_dm_room( + self, + inviter_user_id: str, + inviter_tok: str, + invitee_user_id: str, + invitee_tok: str, + ) -> str: + """ + Helper to create a DM room as the "inviter" and invite the "invitee" user to the + room. The "invitee" user will also join the room. The `m.direct` account data + will be set for both users.
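+ Returns the room ID of the new DM room.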
+ """ + + # Create a room and send an invite to the other user + room_id = self.helper.create_room_as( + inviter_user_id, + is_public=False, + tok=inviter_tok, + ) + self.helper.invite( + room_id, + src=inviter_user_id, + targ=invitee_user_id, + tok=inviter_tok, + extra_data={"is_direct": True}, + ) + # Person that was invited joins the room + self.helper.join(room_id, invitee_user_id, tok=invitee_tok) + + # Mimic the client setting the room as a direct message in the global account + # data + self.get_success( + self.store.add_account_data_for_user( + invitee_user_id, + AccountDataTypes.DIRECT, + {inviter_user_id: [room_id]}, + ) + ) + self.get_success( + self.store.add_account_data_for_user( + inviter_user_id, + AccountDataTypes.DIRECT, + {invitee_user_id: [room_id]}, + ) + ) + + return room_id + + def test_sync_list(self) -> None: + """ + Test that room IDs show up in the Sliding Sync lists + """ + alice_user_id = self.register_user("alice", "correcthorse") + alice_access_token = self.login(alice_user_id, "correcthorse") + + room_id = self.helper.create_room_as( + alice_user_id, tok=alice_access_token, is_public=True + ) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "sort": ["by_notification_level", "by_recency", "by_name"], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + }, + access_token=alice_access_token, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the foo-list we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["foo-list"], + channel.json_body["lists"].keys(), + ) + + # Make sure the list includes the room we are joined to + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 99], + "room_ids": [room_id], + } + ], + channel.json_body["lists"]["foo-list"], + ) + + def test_wait_for_sync_token(self) -> None: + """ + Test that the worker will wait until it catches up to the given token + """ + alice_user_id = self.register_user("alice", "correcthorse") + alice_access_token = self.login(alice_user_id, "correcthorse") + + # Create a future token that will cause us to wait. Since we never send a new + # event to reach that future stream_ordering, the worker will wait until the + # full timeout.
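+ # (The future token is simply the current token with its room stream + # position advanced by one - a position no persisted event has reached.)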
+ current_token = self.event_sources.get_current_token() + future_position_token = current_token.copy_and_replace( + StreamKeyType.ROOM, + RoomStreamToken(stream=current_token.room_key.stream + 1), + ) + + future_position_token_serialized = self.get_success( + future_position_token.to_string(self.store) + ) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint + f"?pos={future_position_token_serialized}", + { + "lists": { + "foo-list": { + "ranges": [[0, 99]], + "sort": ["by_notification_level", "by_recency", "by_name"], + "required_state": [ + ["m.room.join_rules", ""], + ["m.room.history_visibility", ""], + ["m.space.child", "*"], + ], + "timeline_limit": 1, + } + } + }, + access_token=alice_access_token, + await_result=False, + ) + # Block for 10 seconds to make `notifier.wait_for_stream_token(from_token)` + # timeout + with self.assertRaises(TimedOutException): + channel.await_result(timeout_ms=9900) + channel.await_result(timeout_ms=200) + self.assertEqual(channel.code, 200, channel.json_body) + + # We expect the `next_pos` in the result to be the same token we requested + # because we weren't able to find anything new yet. + self.assertEqual( + channel.json_body["next_pos"], future_position_token_serialized + ) + + def test_filter_list(self) -> None: + """ + Test that filters apply to lists + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + user2_id = self.register_user("user2", "pass") + user2_tok = self.login(user2_id, "pass") + + # Create a DM room + dm_room_id = self._create_dm_room( + inviter_user_id=user1_id, + inviter_tok=user1_tok, + invitee_user_id=user2_id, + invitee_tok=user2_tok, + ) + + # Create a normal room + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + # Make the Sliding Sync request + channel = self.make_request( + "POST", + self.sync_endpoint, + { + "lists": { + "dms": { + "ranges": [[0, 99]], + "sort": ["by_recency"], + "required_state": [], + "timeline_limit": 1, + "filters": {"is_dm": True}, + }, + "foo-list": { + "ranges": [[0, 99]], + "sort": ["by_recency"], + "required_state": [], + "timeline_limit": 1, + "filters": {"is_dm": False}, + }, + } + }, + access_token=user1_tok, + ) + self.assertEqual(channel.code, 200, channel.json_body) + + # Make sure it has the lists we requested + self.assertListEqual( + list(channel.json_body["lists"].keys()), + ["dms", "foo-list"], + channel.json_body["lists"].keys(), + ) + + # Make sure each list includes the room we expect + self.assertListEqual( + list(channel.json_body["lists"]["dms"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 99], + "room_ids": [dm_room_id], + } + ], + list(channel.json_body["lists"]["dms"]), + ) + self.assertListEqual( + list(channel.json_body["lists"]["foo-list"]["ops"]), + [ + { + "op": "SYNC", + "range": [0, 99], + "room_ids": [room_id], + } + ], + list(channel.json_body["lists"]["foo-list"]), + ) diff --git a/tests/rest/client/utils.py b/tests/rest/client/utils.py index 7362bde7ab..f0ba40a1f1 100644 --- a/tests/rest/client/utils.py +++ b/tests/rest/client/utils.py @@ -330,9 +330,12 @@ class RestHelper: data, ) - assert channel.code == expect_code, "Expected: %d, got: %d, resp: %r" % ( + assert ( + channel.code == expect_code + ), "Expected: %d, got: %d, PUT %s -> resp: %r" % ( expect_code, channel.code, + path, channel.result["body"], ) diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py index 27d5b0125f..81feb3ec29
100644 --- a/tests/storage/test_event_chain.py +++ b/tests/storage/test_event_chain.py @@ -431,6 +431,7 @@ class EventChainStoreTestCase(HomeserverTestCase): for e in events: e.internal_metadata.stream_ordering = self._next_stream_ordering + e.internal_metadata.instance_name = self.hs.get_instance_name() self._next_stream_ordering += 1 def _persist(txn: LoggingTransaction) -> None: diff --git a/tests/storage/test_stream.py b/tests/storage/test_stream.py index 2029cd9c68..ee34baf46f 100644 --- a/tests/storage/test_stream.py +++ b/tests/storage/test_stream.py @@ -19,7 +19,10 @@ # # -from typing import List +import logging +from typing import List, Tuple + +from immutabledict import immutabledict from twisted.test.proto_helpers import MemoryReactor @@ -28,11 +31,13 @@ from synapse.api.filtering import Filter from synapse.rest import admin from synapse.rest.client import login, room from synapse.server import HomeServer -from synapse.types import JsonDict +from synapse.types import JsonDict, PersistedEventPosition, RoomStreamToken from synapse.util import Clock from tests.unittest import HomeserverTestCase +logger = logging.getLogger(__name__) + class PaginationTestCase(HomeserverTestCase): """ @@ -268,3 +273,263 @@ class PaginationTestCase(HomeserverTestCase): } chunk = self._filter_messages(filter) self.assertEqual(chunk, [self.event_id_1, self.event_id_2, self.event_id_none]) + + +class GetLastEventInRoomBeforeStreamOrderingTestCase(HomeserverTestCase): + """ + Test `get_last_event_in_room_before_stream_ordering(...)` + """ + + servlets = [ + admin.register_servlets, + room.register_servlets, + login.register_servlets, + ] + + def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None: + self.store = hs.get_datastores().main + self.event_sources = hs.get_event_sources() + + def _update_persisted_instance_name_for_event( + self, event_id: str, instance_name: str + ) -> None: + """ + Update the `instance_name` recorded as having persisted the event in the database. + """ + return self.get_success( + self.store.db_pool.simple_update_one( + "events", + keyvalues={"event_id": event_id}, + updatevalues={"instance_name": instance_name}, + ) + ) + + def _send_event_on_instance( + self, instance_name: str, room_id: str, access_token: str + ) -> Tuple[JsonDict, PersistedEventPosition]: + """ + Send an event in a room and mimic that it was persisted by a specific + instance/worker.
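+ Returns the event send response and the event's `PersistedEventPosition`.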
+ """ + event_response = self.helper.send( + room_id, f"{instance_name} message", tok=access_token + ) + + self._update_persisted_instance_name_for_event( + event_response["event_id"], instance_name + ) + + event_pos = self.get_success( + self.store.get_position_for_event(event_response["event_id"]) + ) + + return event_response, event_pos + + def test_before_room_created(self) -> None: + """ + Test that no event is returned if we are using a token before the room was even created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + before_room_token = self.event_sources.get_current_token() + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=before_room_token.room_key, + ) + ) + + self.assertIsNone(last_event) + + def test_after_room_created(self) -> None: + """ + Test that an event is returned if we are using a token after the room was created + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id, + end_token=after_room_token.room_key, + ) + ) + + self.assertIsNotNone(last_event) + + def test_activity_in_other_rooms(self) -> None: + """ + Test to make sure that the last event in the room is returned even if the + `stream_ordering` has advanced from activity in other rooms. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + # Create another room to advance the stream_ordering + self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + + after_room_token = self.event_sources.get_current_token() + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the event we expect (which also means we know it's from the + # correct room) + self.assertEqual(last_event, event_response["event_id"]) + + def test_activity_after_token_has_no_effect(self) -> None: + """ + Test to make sure we return the last event before the token even if there is + activity after it. 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response = self.helper.send(room_id1, "target!", tok=user1_tok) + + after_room_token = self.event_sources.get_current_token() + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=after_room_token.room_key, + ) + ) + + # Make sure it's the last event before the token + self.assertEqual(last_event, event_response["event_id"]) + + def test_last_event_within_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that that is *within* the sharded + token (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). We are specifically testing + that we can find an event within the tokens minimum and instance + `stream_ordering`. + """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + # so we can sandwich event3 in the middle of the token + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event1 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos2.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event at/before the token in room1 + self.assertEqual( + last_event, + event_response3["event_id"], + f"We expected {event_response3['event_id']} but saw {last_event} which corresponds to " + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + "event3": event_response3["event_id"], + } + ), + ) + + def test_last_event_before_sharded_token(self) -> None: + """ + Test to make sure we can find the last event that is *before* the sharded token + (a token that has an `instance_map` and looks like + `m{min_pos}~{writer1}.{pos1}~{writer2}.{pos2}`). 
+ """ + user1_id = self.register_user("user1", "pass") + user1_tok = self.login(user1_id, "pass") + + room_id1 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response1, event_pos1 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + event_response2, event_pos2 = self._send_event_on_instance( + "worker1", room_id1, user1_tok + ) + + # Create another room to advance the `stream_ordering` on the same worker + room_id2 = self.helper.create_room_as(user1_id, tok=user1_tok, is_public=True) + event_response3, event_pos3 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + event_response4, event_pos4 = self._send_event_on_instance( + "worker1", room_id2, user1_tok + ) + + # Assemble a token that encompasses event3 -> event4 on worker1 + end_token = RoomStreamToken( + stream=event_pos3.stream, + instance_map=immutabledict({"worker1": event_pos4.stream}), + ) + + # Send some events after the token + self.helper.send(room_id1, "after1", tok=user1_tok) + self.helper.send(room_id1, "after2", tok=user1_tok) + + last_event = self.get_success( + self.store.get_last_event_in_room_before_stream_ordering( + room_id=room_id1, + end_token=end_token, + ) + ) + + # Should find closest event at/before the token in room1 + self.assertEqual( + last_event, + event_response2["event_id"], + f"We expected {event_response2['event_id']} but saw {last_event} which corresponds to " + + str( + { + "event1": event_response1["event_id"], + "event2": event_response2["event_id"], + } + ), + ) diff --git a/tests/storage/test_user_directory.py b/tests/storage/test_user_directory.py index 156a610faa..c26932069f 100644 --- a/tests/storage/test_user_directory.py +++ b/tests/storage/test_user_directory.py @@ -711,6 +711,10 @@ class UserDirectoryICUTestCase(HomeserverTestCase): ), ) + self.assertEqual(_parse_words_with_icu("user-1"), ["user-1"]) + self.assertEqual(_parse_words_with_icu("user-ab"), ["user-ab"]) + self.assertEqual(_parse_words_with_icu("user.--1"), ["user", "-1"]) + def test_regex_word_boundary_punctuation(self) -> None: """ Tests the behaviour of punctuation with the non-ICU tokeniser diff --git a/tests/test_visibility.py b/tests/test_visibility.py index 3e2100eab4..89cbe4e54b 100644 --- a/tests/test_visibility.py +++ b/tests/test_visibility.py @@ -336,7 +336,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase): self.hs.get_storage_controllers(), "@joiner:test", events_to_filter, - msc4115_membership_on_events=True, ) ) resident_filtered_events = self.get_success( @@ -344,7 +343,6 @@ class FilterEventsForClientTestCase(HomeserverTestCase): self.hs.get_storage_controllers(), "@resident:test", events_to_filter, - msc4115_membership_on_events=True, ) ) @@ -357,7 +355,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase): self.assertEqual( ["join", "join", "leave"], [ - e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP] + e.unsigned[EventUnsignedContentFields.MEMBERSHIP] for e in joiner_filtered_events ], ) @@ -379,7 +377,7 @@ class FilterEventsForClientTestCase(HomeserverTestCase): self.assertEqual( ["join", "join", "join", "join", "join"], [ - e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP] + e.unsigned[EventUnsignedContentFields.MEMBERSHIP] for e in resident_filtered_events ], ) @@ -441,7 +439,6 @@ class FilterEventsOutOfBandEventsForClientTestCase( self.hs.get_storage_controllers(), "@user:test", [invite_event, reject_event], - msc4115_membership_on_events=True, ) ) self.assertEqual( @@ -451,7 +448,7 @@ class 
FilterEventsOutOfBandEventsForClientTestCase( self.assertEqual( ["invite", "leave"], [ - e.unsigned[EventUnsignedContentFields.MSC4115_MEMBERSHIP] + e.unsigned[EventUnsignedContentFields.MEMBERSHIP] for e in filtered_events ], ) @@ -463,7 +460,6 @@ class FilterEventsOutOfBandEventsForClientTestCase( self.hs.get_storage_controllers(), "@other:test", [invite_event, reject_event], - msc4115_membership_on_events=True, ) ), [],