diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 9aba9f13f0..7938fe7bc8 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -27,6 +27,9 @@ from .identicon_resource import IdenticonResource
from .preview_url_resource import PreviewUrlResource
from .filepath import MediaFilePaths
from .thumbnailer import Thumbnailer
+from .storage_provider import (
+ StorageProviderWrapper, FileStorageProviderBackend,
+)
from .media_storage import MediaStorage
from synapse.http.matrixfederationclient import MatrixFederationHttpClient
@@ -66,10 +69,6 @@ class MediaRepository(object):
self.primary_base_path = hs.config.media_store_path
self.filepaths = MediaFilePaths(self.primary_base_path)
- self.backup_base_path = hs.config.backup_media_store_path
-
- self.synchronous_backup_media_store = hs.config.synchronous_backup_media_store
-
self.dynamic_thumbnails = hs.config.dynamic_thumbnails
self.thumbnail_requirements = hs.config.thumbnail_requirements
@@ -77,7 +76,27 @@ class MediaRepository(object):
self.recently_accessed_remotes = set()
- self.media_storage = MediaStorage(self.primary_base_path, self.filepaths)
+ # List of StorageProvider's where we should search for media and
+ # potentially upload to.
+ self.storage_providers = []
+
+ # TODO: Move this into config and allow other storage providers to be
+ # defined.
+ if hs.config.backup_media_store_path:
+ backend = FileStorageProviderBackend(
+ self.primary_base_path, hs.config.backup_media_store_path,
+ )
+ provider = StorageProviderWrapper(
+ backend,
+ store=True,
+ store_synchronous=hs.config.synchronous_backup_media_store,
+ store_remote=True,
+ )
+ self.storage_providers.append(provider)
+
+ self.media_storage = MediaStorage(
+ self.primary_base_path, self.filepaths, self.storage_providers,
+ )
self.clock.looping_call(
self._update_recently_accessed_remotes,
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 052745e6f6..a1ec6cadb1 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -32,9 +32,10 @@ class MediaStorage(object):
"""Responsible for storing/fetching files from local sources.
"""
- def __init__(self, local_media_directory, filepaths):
+ def __init__(self, local_media_directory, filepaths, storage_providers):
self.local_media_directory = local_media_directory
self.filepaths = filepaths
+ self.storage_providers = storage_providers
@defer.inlineCallbacks
def store_file(self, source, file_info):
@@ -90,11 +91,12 @@ class MediaStorage(object):
finished_called = [False]
+ @defer.inlineCallbacks
def finish():
- # This will be used later when we want to hit out to other storage
- # places
+ for provider in self.storage_providers:
+ yield provider.store_file(path, file_info)
+
finished_called[0] = True
- return defer.succeed(None)
try:
with open(fname, "wb") as f:
@@ -127,6 +129,11 @@ class MediaStorage(object):
if os.path.exists(local_path):
defer.returnValue(FileResponder(open(local_path, "rb")))
+ for provider in self.storage_providers:
+ res = yield provider.fetch(path, file_info)
+ if res:
+ defer.returnValue(res)
+
defer.returnValue(None)
def _file_info_to_path(self, file_info):
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
new file mode 100644
index 0000000000..2ad602e101
--- /dev/null
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -0,0 +1,127 @@
+# -*- coding: utf-8 -*-
+# Copyright 2018 New Vector Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+from twisted.internet import defer, threads
+
+from .media_storage import FileResponder
+
+from synapse.util.logcontext import preserve_fn
+
+import logging
+import os
+import shutil
+
+
+logger = logging.getLogger(__name__)
+
+
+class StorageProvider(object):
+ """A storage provider is a service that can store uploaded media and
+ retrieve them.
+ """
+ def store_file(self, path, file_info):
+ """Store the file described by file_info. The actual contents can be
+ retrieved by reading the file in file_info.upload_path.
+
+ Args:
+ path (str): Relative path of file in local cache
+ file_info (FileInfo)
+
+ Returns:
+ Deferred
+ """
+ pass
+
+ def fetch(self, path, file_info):
+ """Attempt to fetch the file described by file_info and stream it
+ into writer.
+
+ Args:
+ path (str): Relative path of file in local cache
+ file_info (FileInfo)
+
+ Returns:
+ Deferred(Responder): Returns a Responder if the provider has the file,
+ otherwise returns None.
+ """
+ pass
+
+
+class StorageProviderWrapper(StorageProvider):
+ """Wraps a storage provider and provides various config options
+
+ Args:
+ backend (StorageProvider)
+ store (bool): Whether to store new files or not.
+ store_synchronous (bool): Whether to wait for file to be successfully
+ uploaded, or todo the upload in the backgroud.
+ store_remote (bool): Whether remote media should be uploaded
+ """
+ def __init__(self, backend, store, store_synchronous, store_remote):
+ self.backend = backend
+ self.store = store
+ self.store_synchronous = store_synchronous
+ self.store_remote = store_remote
+
+ def store_file(self, path, file_info):
+ if not self.store:
+ return defer.succeed(None)
+
+ if file_info.server_name and not self.store_remote:
+ return defer.succeed(None)
+
+ if self.store_synchronous:
+ return self.backend.store_file(path, file_info)
+ else:
+ # TODO: Handle errors.
+ preserve_fn(self.backend.store_file)(path, file_info)
+ return defer.succeed(None)
+
+ def fetch(self, path, file_info):
+ return self.backend.fetch(path, file_info)
+
+
+class FileStorageProviderBackend(StorageProvider):
+ """A storage provider that stores files in a directory on a filesystem.
+
+ Args:
+ cache_directory (str): Base path of the local media repository
+ base_directory (str): Base path to store new files
+ """
+
+ def __init__(self, cache_directory, base_directory):
+ self.cache_directory = cache_directory
+ self.base_directory = base_directory
+
+ def store_file(self, path, file_info):
+ """See StorageProvider.store_file"""
+
+ primary_fname = os.path.join(self.cache_directory, path)
+ backup_fname = os.path.join(self.base_directory, path)
+
+ dirname = os.path.dirname(backup_fname)
+ if not os.path.exists(dirname):
+ os.makedirs(dirname)
+
+ return threads.deferToThread(
+ shutil.copyfile, primary_fname, backup_fname,
+ )
+
+ def fetch(self, path, file_info):
+ """See StorageProvider.fetch"""
+
+ backup_fname = os.path.join(self.base_directory, path)
+ if os.path.isfile(backup_fname):
+ return FileResponder(open(backup_fname, "rb"))
|