summary refs log tree commit diff
path: root/synapse/rest
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/rest')
-rw-r--r--synapse/rest/media/v1/media_repository.py29
-rw-r--r--synapse/rest/media/v1/media_storage.py15
-rw-r--r--synapse/rest/media/v1/storage_provider.py127
3 files changed, 162 insertions, 9 deletions
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 9aba9f13f0..7938fe7bc8 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -27,6 +27,9 @@ from .identicon_resource import IdenticonResource
 from .preview_url_resource import PreviewUrlResource
 from .filepath import MediaFilePaths
 from .thumbnailer import Thumbnailer
+from .storage_provider import (
+    StorageProviderWrapper, FileStorageProviderBackend,
+)
 from .media_storage import MediaStorage
 
 from synapse.http.matrixfederationclient import MatrixFederationHttpClient
@@ -66,10 +69,6 @@ class MediaRepository(object):
         self.primary_base_path = hs.config.media_store_path
         self.filepaths = MediaFilePaths(self.primary_base_path)
 
-        self.backup_base_path = hs.config.backup_media_store_path
-
-        self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store
-
         self.dynamic_thumbnails = hs.config.dynamic_thumbnails
         self.thumbnail_requirements = hs.config.thumbnail_requirements
 
@@ -77,7 +76,27 @@ class MediaRepository(object):
 
         self.recently_accessed_remotes = set()
 
-        self.media_storage = MediaStorage(self.primary_base_path, self.filepaths)
+        # List of StorageProvider's where we should search for media and
+        # potentially upload to.
+        self.storage_providers = []
+
+        # TODO: Move this into config and allow other storage providers to be
+        # defined.
+        if hs.config.backup_media_store_path:
+            backend = FileStorageProviderBackend(
+                self.primary_base_path, hs.config.backup_media_store_path,
+            )
+            provider = StorageProviderWrapper(
+                backend,
+                store=True,
+                store_synchronous=hs.config.synchronous_backup_media_store,
+                store_remote=True,
+            )
+            self.storage_providers.append(provider)
+
+        self.media_storage = MediaStorage(
+            self.primary_base_path, self.filepaths, self.storage_providers,
+        )
 
         self.clock.looping_call(
             self._update_recently_accessed_remotes,
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 052745e6f6..a1ec6cadb1 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -32,9 +32,10 @@ class MediaStorage(object):
     """Responsible for storing/fetching files from local sources.
     """
 
-    def __init__(self, local_media_directory, filepaths):
+    def __init__(self, local_media_directory, filepaths, storage_providers):
         self.local_media_directory = local_media_directory
         self.filepaths = filepaths
+        self.storage_providers = storage_providers
 
     @defer.inlineCallbacks
     def store_file(self, source, file_info):
@@ -90,11 +91,12 @@ class MediaStorage(object):
 
         finished_called = [False]
 
+        @defer.inlineCallbacks
         def finish():
-            # This will be used later when we want to hit out to other storage
-            # places
+            for provider in self.storage_providers:
+                yield provider.store_file(path, file_info)
+
             finished_called[0] = True
-            return defer.succeed(None)
 
         try:
             with open(fname, "wb") as f:
@@ -127,6 +129,11 @@ class MediaStorage(object):
         if os.path.exists(local_path):
             defer.returnValue(FileResponder(open(local_path, "rb")))
 
+        for provider in self.storage_providers:
+            res = yield provider.fetch(path, file_info)
+            if res:
+                defer.returnValue(res)
+
         defer.returnValue(None)
 
     def _file_info_to_path(self, file_info):
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
new file mode 100644
index 0000000000..2ad602e101
--- /dev/null
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, threads
+
+from .media_storage import FileResponder
+
+from synapse.util.logcontext import preserve_fn
+
+import logging
+import os
+import shutil
+
+
+logger = logging.getLogger(__name__)
+
+
+class StorageProvider(object):
+    """A storage provider is a service that can store uploaded media and
+    retrieve them.
+    """
+    def store_file(self, path, file_info):
+        """Store the file described by file_info. The actual contents can be
+        retrieved by reading the file in file_info.upload_path.
+
+        Args:
+            path (str): Relative path of file in local cache
+            file_info (FileInfo)
+
+        Returns:
+            Deferred
+        """
+        pass
+
+    def fetch(self, path, file_info):
+        """Attempt to fetch the file described by file_info and stream it
+        into writer.
+
+        Args:
+            path (str): Relative path of file in local cache
+            file_info (FileInfo)
+
+        Returns:
+            Deferred(Responder): Returns a Responder if the provider has the file,
+                otherwise returns None.
+        """
+        pass
+
+
+class StorageProviderWrapper(StorageProvider):
+    """Wraps a storage provider and provides various config options
+
+    Args:
+        backend (StorageProvider)
+        store (bool): Whether to store new files or not.
+        store_synchronous (bool): Whether to wait for file to be successfully
+            uploaded, or todo the upload in the backgroud.
+        store_remote (bool): Whether remote media should be uploaded
+    """
+    def __init__(self, backend, store, store_synchronous, store_remote):
+        self.backend = backend
+        self.store = store
+        self.store_synchronous = store_synchronous
+        self.store_remote = store_remote
+
+    def store_file(self, path, file_info):
+        if not self.store:
+            return defer.succeed(None)
+
+        if file_info.server_name and not self.store_remote:
+            return defer.succeed(None)
+
+        if self.store_synchronous:
+            return self.backend.store_file(path, file_info)
+        else:
+            # TODO: Handle errors.
+            preserve_fn(self.backend.store_file)(path, file_info)
+            return defer.succeed(None)
+
+    def fetch(self, path, file_info):
+        return self.backend.fetch(path, file_info)
+
+
+class FileStorageProviderBackend(StorageProvider):
+    """A storage provider that stores files in a directory on a filesystem.
+
+    Args:
+        cache_directory (str): Base path of the local media repository
+        base_directory (str): Base path to store new files
+    """
+
+    def __init__(self, cache_directory, base_directory):
+        self.cache_directory = cache_directory
+        self.base_directory = base_directory
+
+    def store_file(self, path, file_info):
+        """See StorageProvider.store_file"""
+
+        primary_fname = os.path.join(self.cache_directory, path)
+        backup_fname = os.path.join(self.base_directory, path)
+
+        dirname = os.path.dirname(backup_fname)
+        if not os.path.exists(dirname):
+            os.makedirs(dirname)
+
+        return threads.deferToThread(
+            shutil.copyfile, primary_fname, backup_fname,
+        )
+
+    def fetch(self, path, file_info):
+        """See StorageProvider.fetch"""
+
+        backup_fname = os.path.join(self.base_directory, path)
+        if os.path.isfile(backup_fname):
+            return FileResponder(open(backup_fname, "rb"))