diff options
Diffstat (limited to 'tests/rest')
-rw-r--r-- | tests/rest/media/test_url_preview.py | 194 |
1 files changed, 190 insertions, 4 deletions
diff --git a/tests/rest/media/test_url_preview.py b/tests/rest/media/test_url_preview.py index e44beae8c1..7517155cf3 100644 --- a/tests/rest/media/test_url_preview.py +++ b/tests/rest/media/test_url_preview.py @@ -653,6 +653,57 @@ class URLPreviewTests(unittest.HomeserverTestCase): server.data, ) + def test_image(self) -> None: + """An image should be precached if mentioned in the HTML.""" + self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")] + self.lookups["cdn.matrix.org"] = [(IPv4Address, "10.1.2.4")] + + result = ( + b"""<html><body><img src="http://cdn.matrix.org/foo.png"></body></html>""" + ) + + channel = self.make_request( + "GET", + "preview_url?url=http://matrix.org", + shorthand=False, + await_result=False, + ) + self.pump() + + # Respond with the HTML. + client = self.reactor.tcpClients[0][2].buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, self.reactor)) + client.makeConnection(FakeTransport(server, self.reactor)) + client.dataReceived( + ( + b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n" + b'Content-Type: text/html; charset="utf8"\r\n\r\n' + ) + % (len(result),) + + result + ) + self.pump() + + # Respond with the photo. + client = self.reactor.tcpClients[1][2].buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, self.reactor)) + client.makeConnection(FakeTransport(server, self.reactor)) + client.dataReceived( + ( + b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n" + b"Content-Type: image/png\r\n\r\n" + ) + % (len(SMALL_PNG),) + + SMALL_PNG + ) + self.pump() + + # The image should be in the result. + self.assertEqual(channel.code, 200) + self._assert_small_png(channel.json_body) + def test_nonexistent_image(self) -> None: """If the preview image doesn't exist, ensure some data is returned.""" self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")] @@ -683,9 +734,53 @@ class URLPreviewTests(unittest.HomeserverTestCase): ) self.pump() + + # There should not be a second connection. + self.assertEqual(len(self.reactor.tcpClients), 1) + + # The image should not be in the result. self.assertEqual(channel.code, 200) + self.assertNotIn("og:image", channel.json_body) + + @unittest.override_config( + {"url_preview_url_blacklist": [{"netloc": "cdn.matrix.org"}]} + ) + def test_image_blocked(self) -> None: + """If the preview image doesn't exist, ensure some data is returned.""" + self.lookups["matrix.org"] = [(IPv4Address, "10.1.2.3")] + self.lookups["cdn.matrix.org"] = [(IPv4Address, "10.1.2.4")] + + result = ( + b"""<html><body><img src="http://cdn.matrix.org/foo.jpg"></body></html>""" + ) + + channel = self.make_request( + "GET", + "preview_url?url=http://matrix.org", + shorthand=False, + await_result=False, + ) + self.pump() + + client = self.reactor.tcpClients[0][2].buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, self.reactor)) + client.makeConnection(FakeTransport(server, self.reactor)) + client.dataReceived( + ( + b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n" + b'Content-Type: text/html; charset="utf8"\r\n\r\n' + ) + % (len(result),) + + result + ) + self.pump() + + # There should not be a second connection. + self.assertEqual(len(self.reactor.tcpClients), 1) # The image should not be in the result. + self.assertEqual(channel.code, 200) self.assertNotIn("og:image", channel.json_body) def test_oembed_failure(self) -> None: @@ -880,6 +975,11 @@ class URLPreviewTests(unittest.HomeserverTestCase): ) self.pump() + + # Double check that the proper host is being connected to. (Note that + # twitter.com can't be resolved so this is already implicitly checked.) + self.assertIn(b"\r\nHost: publish.twitter.com\r\n", server.data) + self.assertEqual(channel.code, 200) body = channel.json_body self.assertEqual( @@ -940,6 +1040,22 @@ class URLPreviewTests(unittest.HomeserverTestCase): }, ) + @unittest.override_config( + {"url_preview_url_blacklist": [{"netloc": "publish.twitter.com"}]} + ) + def test_oembed_blocked(self) -> None: + """The oEmbed URL should not be downloaded if the oEmbed URL is blocked.""" + self.lookups["twitter.com"] = [(IPv4Address, "10.1.2.3")] + + channel = self.make_request( + "GET", + "preview_url?url=http://twitter.com/matrixdotorg/status/12345", + shorthand=False, + await_result=False, + ) + self.pump() + self.assertEqual(channel.code, 403, channel.result) + def test_oembed_autodiscovery(self) -> None: """ Autodiscovery works by finding the link in the HTML response and then requesting an oEmbed URL. @@ -980,7 +1096,6 @@ class URLPreviewTests(unittest.HomeserverTestCase): % (len(result),) + result ) - self.pump() # The oEmbed response. @@ -1004,7 +1119,6 @@ class URLPreviewTests(unittest.HomeserverTestCase): % (len(oembed_content),) + oembed_content ) - self.pump() # Ensure the URL is what was requested. @@ -1023,7 +1137,6 @@ class URLPreviewTests(unittest.HomeserverTestCase): % (len(SMALL_PNG),) + SMALL_PNG ) - self.pump() # Ensure the URL is what was requested. @@ -1036,6 +1149,59 @@ class URLPreviewTests(unittest.HomeserverTestCase): ) self._assert_small_png(body) + @unittest.override_config( + {"url_preview_url_blacklist": [{"netloc": "publish.twitter.com"}]} + ) + def test_oembed_autodiscovery_blocked(self) -> None: + """ + If the discovered oEmbed URL is blocked, it should be discarded. + """ + # This is a little cheesy in that we use the www subdomain (which isn't the + # list of oEmbed patterns) to get "raw" HTML response. + self.lookups["www.twitter.com"] = [(IPv4Address, "10.1.2.3")] + self.lookups["publish.twitter.com"] = [(IPv4Address, "10.1.2.4")] + + result = b""" + <title>Test</title> + <link rel="alternate" type="application/json+oembed" + href="http://publish.twitter.com/oembed?url=http%3A%2F%2Fcdn.twitter.com%2Fmatrixdotorg%2Fstatus%2F12345&format=json" + title="matrixdotorg" /> + """ + + channel = self.make_request( + "GET", + "preview_url?url=http://www.twitter.com/matrixdotorg/status/12345", + shorthand=False, + await_result=False, + ) + self.pump() + + client = self.reactor.tcpClients[0][2].buildProtocol(None) + server = AccumulatingProtocol() + server.makeConnection(FakeTransport(client, self.reactor)) + client.makeConnection(FakeTransport(server, self.reactor)) + client.dataReceived( + ( + b"HTTP/1.0 200 OK\r\nContent-Length: %d\r\n" + b'Content-Type: text/html; charset="utf8"\r\n\r\n' + ) + % (len(result),) + + result + ) + + self.pump() + + # Ensure there's no additional connections. + self.assertEqual(len(self.reactor.tcpClients), 1) + + # Ensure the URL is what was requested. + self.assertIn(b"\r\nHost: www.twitter.com\r\n", server.data) + + self.assertEqual(channel.code, 200) + body = channel.json_body + self.assertEqual(body["og:title"], "Test") + self.assertNotIn("og:image", body) + def _download_image(self) -> Tuple[str, str]: """Downloads an image into the URL cache. Returns: @@ -1192,7 +1358,7 @@ class URLPreviewTests(unittest.HomeserverTestCase): ) @unittest.override_config({"url_preview_url_blacklist": [{"port": "*"}]}) - def test_blacklist_port(self) -> None: + def test_blocked_port(self) -> None: """Tests that blacklisting URLs with a port makes previewing such URLs fail with a 403 error and doesn't impact other previews. """ @@ -1230,3 +1396,23 @@ class URLPreviewTests(unittest.HomeserverTestCase): self.pump() self.assertEqual(channel.code, 200) + + @unittest.override_config( + {"url_preview_url_blacklist": [{"netloc": "example.com"}]} + ) + def test_blocked_url(self) -> None: + """Tests that blacklisting URLs with a host makes previewing such URLs + fail with a 403 error. + """ + self.lookups["example.com"] = [(IPv4Address, "10.1.2.3")] + + bad_url = quote("http://example.com/foo") + + channel = self.make_request( + "GET", + "preview_url?url=" + bad_url, + shorthand=False, + await_result=False, + ) + self.pump() + self.assertEqual(channel.code, 403, channel.result) |