summary refs log tree commit diff
path: root/synapse/media/media_repository.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/media/media_repository.py')
-rw-r--r--synapse/media/media_repository.py151
1 files changed, 148 insertions, 3 deletions
diff --git a/synapse/media/media_repository.py b/synapse/media/media_repository.py
index 1436329fad..542642b900 100644
--- a/synapse/media/media_repository.py
+++ b/synapse/media/media_repository.py
@@ -480,6 +480,7 @@ class MediaRepository:
         name: Optional[str],
         max_timeout_ms: int,
         ip_address: str,
+        use_federation_endpoint: bool,
     ) -> None:
         """Respond to requests for remote media.
 
@@ -492,6 +493,8 @@ class MediaRepository:
             max_timeout_ms: the maximum number of milliseconds to wait for the
                 media to be uploaded.
             ip_address: the IP address of the requester
+            use_federation_endpoint: whether to request the remote media over the new
+                federation `/download` endpoint
 
         Returns:
             Resolves once a response has successfully been written to request
@@ -522,6 +525,7 @@ class MediaRepository:
                 max_timeout_ms,
                 self.download_ratelimiter,
                 ip_address,
+                use_federation_endpoint,
             )
 
         # We deliberately stream the file outside the lock
@@ -569,6 +573,7 @@ class MediaRepository:
                 max_timeout_ms,
                 self.download_ratelimiter,
                 ip_address,
+                False,
             )
 
         # Ensure we actually use the responder so that it releases resources
@@ -585,6 +590,7 @@ class MediaRepository:
         max_timeout_ms: int,
         download_ratelimiter: Ratelimiter,
         ip_address: str,
+        use_federation_endpoint: bool,
     ) -> Tuple[Optional[Responder], RemoteMedia]:
         """Looks for media in local cache, if not there then attempt to
         download from remote server.
@@ -598,6 +604,8 @@ class MediaRepository:
             download_ratelimiter: a ratelimiter limiting remote media downloads, keyed to
                 requester IP.
             ip_address: the IP address of the requester
+            use_federation_endpoint: whether to request the remote media over the new federation
+            /download endpoint
 
         Returns:
             A tuple of responder and the media info of the file.
@@ -629,9 +637,23 @@ class MediaRepository:
         # Failed to find the file anywhere, lets download it.
 
         try:
-            media_info = await self._download_remote_file(
-                server_name, media_id, max_timeout_ms, download_ratelimiter, ip_address
-            )
+            if not use_federation_endpoint:
+                media_info = await self._download_remote_file(
+                    server_name,
+                    media_id,
+                    max_timeout_ms,
+                    download_ratelimiter,
+                    ip_address,
+                )
+            else:
+                media_info = await self._federation_download_remote_file(
+                    server_name,
+                    media_id,
+                    max_timeout_ms,
+                    download_ratelimiter,
+                    ip_address,
+                )
+
         except SynapseError:
             raise
         except Exception as e:
@@ -775,6 +797,129 @@ class MediaRepository:
             quarantined_by=None,
         )
 
+    async def _federation_download_remote_file(
+        self,
+        server_name: str,
+        media_id: str,
+        max_timeout_ms: int,
+        download_ratelimiter: Ratelimiter,
+        ip_address: str,
+    ) -> RemoteMedia:
+        """Attempt to download the remote file from the given server name.
+        Uses the given file_id as the local id and downloads the file over the federation
+        v1 download endpoint
+
+        Args:
+            server_name: Originating server
+            media_id: The media ID of the content (as defined by the
+                remote server). This is different than the file_id, which is
+                locally generated.
+            max_timeout_ms: the maximum number of milliseconds to wait for the
+                media to be uploaded.
+            download_ratelimiter: a ratelimiter limiting remote media downloads, keyed to
+                requester IP
+            ip_address: the IP address of the requester
+
+        Returns:
+            The media info of the file.
+        """
+
+        file_id = random_string(24)
+
+        file_info = FileInfo(server_name=server_name, file_id=file_id)
+
+        async with self.media_storage.store_into_file(file_info) as (f, fname):
+            try:
+                res = await self.client.federation_download_media(
+                    server_name,
+                    media_id,
+                    output_stream=f,
+                    max_size=self.max_upload_size,
+                    max_timeout_ms=max_timeout_ms,
+                    download_ratelimiter=download_ratelimiter,
+                    ip_address=ip_address,
+                )
+                # if we had to fall back to the _matrix/media endpoint it will only return
+                # the headers and length, check the length of the tuple before unpacking
+                if len(res) == 3:
+                    length, headers, json = res
+                else:
+                    length, headers = res
+            except RequestSendFailed as e:
+                logger.warning(
+                    "Request failed fetching remote media %s/%s: %r",
+                    server_name,
+                    media_id,
+                    e,
+                )
+                raise SynapseError(502, "Failed to fetch remote media")
+
+            except HttpResponseException as e:
+                logger.warning(
+                    "HTTP error fetching remote media %s/%s: %s",
+                    server_name,
+                    media_id,
+                    e.response,
+                )
+                if e.code == twisted.web.http.NOT_FOUND:
+                    raise e.to_synapse_error()
+                raise SynapseError(502, "Failed to fetch remote media")
+
+            except SynapseError:
+                logger.warning(
+                    "Failed to fetch remote media %s/%s", server_name, media_id
+                )
+                raise
+            except NotRetryingDestination:
+                logger.warning("Not retrying destination %r", server_name)
+                raise SynapseError(502, "Failed to fetch remote media")
+            except Exception:
+                logger.exception(
+                    "Failed to fetch remote media %s/%s", server_name, media_id
+                )
+                raise SynapseError(502, "Failed to fetch remote media")
+
+            if b"Content-Type" in headers:
+                media_type = headers[b"Content-Type"][0].decode("ascii")
+            else:
+                media_type = "application/octet-stream"
+            upload_name = get_filename_from_headers(headers)
+            time_now_ms = self.clock.time_msec()
+
+            # Multiple remote media download requests can race (when using
+            # multiple media repos), so this may throw a violation constraint
+            # exception. If it does we'll delete the newly downloaded file from
+            # disk (as we're in the ctx manager).
+            #
+            # However: we've already called `finish()` so we may have also
+            # written to the storage providers. This is preferable to the
+            # alternative where we call `finish()` *after* this, where we could
+            # end up having an entry in the DB but fail to write the files to
+            # the storage providers.
+            await self.store.store_cached_remote_media(
+                origin=server_name,
+                media_id=media_id,
+                media_type=media_type,
+                time_now_ms=time_now_ms,
+                upload_name=upload_name,
+                media_length=length,
+                filesystem_id=file_id,
+            )
+
+        logger.debug("Stored remote media in file %r", fname)
+
+        return RemoteMedia(
+            media_origin=server_name,
+            media_id=media_id,
+            media_type=media_type,
+            media_length=length,
+            upload_name=upload_name,
+            created_ts=time_now_ms,
+            filesystem_id=file_id,
+            last_access_ts=time_now_ms,
+            quarantined_by=None,
+        )
+
     def _get_thumbnail_requirements(
         self, media_type: str
     ) -> Tuple[ThumbnailRequirement, ...]: