1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
|
#
# This file is licensed under the Affero General Public License (AGPL) version 3.
#
# Copyright (C) 2023 New Vector, Ltd
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as
# published by the Free Software Foundation, either version 3 of the
# License, or (at your option) any later version.
#
# See the GNU Affero General Public License for more details:
# <https://www.gnu.org/licenses/agpl-3.0.html>.
#
# Originally licensed under the Apache License, Version 2.0:
# <http://www.apache.org/licenses/LICENSE-2.0>.
#
# [This file includes modifications made by New Vector Limited]
#
#
from typing import Dict
from twisted.test.proto_helpers import MemoryReactor
from twisted.web.resource import Resource
from synapse.media._base import FileInfo
from synapse.server import HomeServer
from synapse.util import Clock
from tests import unittest
from tests.test_utils import SMALL_PNG
from tests.unittest import override_config
class MediaDomainBlockingTests(unittest.HomeserverTestCase):
    """Tests for the `prevent_media_downloads_from` config option.

    `prepare` seeds one piece of remote media into the local cache. Each test
    then requests that media (or a thumbnail of it) through the
    `/_matrix/media` resource and checks the status code: 404 when the media's
    origin is listed in `prevent_media_downloads_from`, 200 when only an
    unrelated domain is listed.
    """

    # Identifier and origin of the cached remote media used by every test. The
    # config overrides and request paths below are all derived from these two
    # attributes so that they cannot drift apart.
    remote_media_id = "doesnotmatter"
    remote_server_name = "evil.com"

    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
        """Seed the media store and database with one cached remote image.

        Args:
            reactor: The test reactor (unused here).
            clock: Supplies the timestamp recorded for the cached media.
            hs: The homeserver under test.
        """
        self.store = hs.get_datastores().main

        # Inject a piece of media. We'll use this to ensure we're returning a sane
        # response when we're not supposed to block it, distinguishing a media block
        # from a regular 404.
        file_id = "abcdefg12345"
        file_info = FileInfo(server_name=self.remote_server_name, file_id=file_id)
        with hs.get_media_repository().media_storage.store_into_file(file_info) as (
            f,
            _fname,
            finish,
        ):
            f.write(SMALL_PNG)
            self.get_success(finish())

        # NOTE(review): `media_length` is recorded as 1 rather than the size of
        # SMALL_PNG. The tests below only assert on the response code, so this
        # presumably does not matter here -- confirm before relying on it.
        self.get_success(
            self.store.store_cached_remote_media(
                origin=self.remote_server_name,
                media_id=self.remote_media_id,
                media_type="image/png",
                media_length=1,
                time_now_ms=clock.time_msec(),
                upload_name="test.png",
                filesystem_id=file_id,
            )
        )

    def create_resource_dict(self) -> Dict[str, Resource]:
        """Return the resource tree for the test server, with media mounted."""
        # We need to manually set the resource tree to include media, the
        # default only does `/_matrix/client` APIs.
        return {"/_matrix/media": self.hs.get_media_repository_resource()}

    @override_config(
        {
            # Disable downloads from the domain we'll be trying to download from.
            # Should result in a 404.
            "prevent_media_downloads_from": [remote_server_name]
        }
    )
    def test_cannot_download_blocked_media(self) -> None:
        """
        Tests to ensure that remote media which is blocked cannot be downloaded.
        """
        response = self.make_request(
            "GET",
            f"/_matrix/media/v3/download/{self.remote_server_name}/{self.remote_media_id}",
            shorthand=False,
        )
        self.assertEqual(response.code, 404)

    @override_config(
        {
            # Disable downloads from a domain we won't be requesting downloads from.
            # This proves we haven't broken anything.
            "prevent_media_downloads_from": ["not-listed.com"]
        }
    )
    def test_remote_media_normally_unblocked(self) -> None:
        """
        Tests to ensure that remote media can still be downloaded when its
        origin is not one of the blocked domains.
        """
        response = self.make_request(
            "GET",
            f"/_matrix/media/v3/download/{self.remote_server_name}/{self.remote_media_id}",
            shorthand=False,
        )
        self.assertEqual(response.code, 200)

    @override_config(
        {
            # Disable downloads from the domain we'll be trying to download from.
            # Should result in a 404.
            "prevent_media_downloads_from": [remote_server_name],
            "dynamic_thumbnails": True,
        }
    )
    def test_cannot_download_blocked_media_thumbnail(self) -> None:
        """
        Same test as test_cannot_download_blocked_media but for thumbnails.
        """
        # The thumbnail dimensions travel in the query string only; a GET
        # request needs no body, matching the unblocked thumbnail test.
        response = self.make_request(
            "GET",
            f"/_matrix/media/v3/thumbnail/{self.remote_server_name}"
            f"/{self.remote_media_id}?width=100&height=100",
            shorthand=False,
        )
        self.assertEqual(response.code, 404)

    @override_config(
        {
            # Disable downloads from a domain we won't be requesting downloads from.
            # This proves we haven't broken anything.
            "prevent_media_downloads_from": ["not-listed.com"],
            "dynamic_thumbnails": True,
        }
    )
    def test_remote_media_thumbnail_normally_unblocked(self) -> None:
        """
        Same test as test_remote_media_normally_unblocked but for thumbnails.
        """
        response = self.make_request(
            "GET",
            f"/_matrix/media/v3/thumbnail/{self.remote_server_name}"
            f"/{self.remote_media_id}?width=100&height=100",
            shorthand=False,
        )
        self.assertEqual(response.code, 200)
|