summary refs log tree commit diff
path: root/synapse/rest
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/rest')
-rw-r--r--synapse/rest/client/v1/admin.py32
-rw-r--r--synapse/rest/media/v1/filepath.py6
-rw-r--r--synapse/rest/media/v1/media_repository.py91
3 files changed, 109 insertions, 20 deletions
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py
index aa05b3f023..8ec8569a49 100644
--- a/synapse/rest/client/v1/admin.py
+++ b/synapse/rest/client/v1/admin.py
@@ -46,5 +46,37 @@ class WhoisRestServlet(ClientV1RestServlet):
         defer.returnValue((200, ret))
 
 
+class PurgeMediaCacheRestServlet(ClientV1RestServlet):
+    PATTERNS = client_path_patterns("/admin/purge_media_cache")
+
+    def __init__(self, hs):
+        self.media_repository = hs.get_media_repository()
+        super(PurgeMediaCacheRestServlet, self).__init__(hs)
+
+    @defer.inlineCallbacks
+    def on_POST(self, request):
+        requester = yield self.auth.get_user_by_req(request)
+        is_admin = yield self.auth.is_server_admin(requester.user)
+
+        if not is_admin:
+            raise AuthError(403, "You are not a server admin")
+
+        before_ts = request.args.get("before_ts", None)
+        if not before_ts:
+            raise SynapseError(400, "Missing 'before_ts' arg")
+
+        logger.info("before_ts: %r", before_ts[0])
+
+        try:
+            before_ts = int(before_ts[0])
+        except Exception:
+            raise SynapseError(400, "Invalid 'before_ts' arg")
+
+        ret = yield self.media_repository.delete_old_remote_media(before_ts)
+
+        defer.returnValue((200, ret))
+
+
 def register_servlets(hs, http_server):
     WhoisRestServlet(hs).register(http_server)
+    PurgeMediaCacheRestServlet(hs).register(http_server)
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 422ab86fb3..0137458f71 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -65,3 +65,9 @@ class MediaFilePaths(object):
             file_id[0:2], file_id[2:4], file_id[4:],
             file_name
         )
+
+    def remote_media_thumbnail_dir(self, server_name, file_id):
+        return os.path.join(
+            self.base_path, "remote_thumbnail", server_name,
+            file_id[0:2], file_id[2:4], file_id[4:],
+        )
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 2468c3ac42..692e078419 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -30,11 +30,13 @@ from synapse.api.errors import SynapseError
 
 from twisted.internet import defer, threads
 
-from synapse.util.async import ObservableDeferred
+from synapse.util.async import Linearizer
 from synapse.util.stringutils import is_ascii
 from synapse.util.logcontext import preserve_context_over_fn
 
 import os
+import errno
+import shutil
 
 import cgi
 import logging
@@ -43,8 +45,11 @@ import urlparse
 logger = logging.getLogger(__name__)
 
 
+UPDATE_RECENTLY_ACCESSED_REMOTES_TS = 60 * 1000
+
+
 class MediaRepository(object):
-    def __init__(self, hs, filepaths):
+    def __init__(self, hs):
         self.auth = hs.get_auth()
         self.client = MatrixFederationHttpClient(hs)
         self.clock = hs.get_clock()
@@ -52,11 +57,28 @@ class MediaRepository(object):
         self.store = hs.get_datastore()
         self.max_upload_size = hs.config.max_upload_size
         self.max_image_pixels = hs.config.max_image_pixels
-        self.filepaths = filepaths
-        self.downloads = {}
+        self.filepaths = MediaFilePaths(hs.config.media_store_path)
         self.dynamic_thumbnails = hs.config.dynamic_thumbnails
         self.thumbnail_requirements = hs.config.thumbnail_requirements
 
+        self.remote_media_linearizer = Linearizer()
+
+        self.recently_accessed_remotes = set()
+
+        self.clock.looping_call(
+            self._update_recently_accessed_remotes,
+            UPDATE_RECENTLY_ACCESSED_REMOTES_TS
+        )
+
+    @defer.inlineCallbacks
+    def _update_recently_accessed_remotes(self):
+        media = self.recently_accessed_remotes
+        self.recently_accessed_remotes = set()
+
+        yield self.store.update_cached_last_access_time(
+            media, self.clock.time_msec()
+        )
+
     @staticmethod
     def _makedirs(filepath):
         dirname = os.path.dirname(filepath)
@@ -93,22 +115,12 @@ class MediaRepository(object):
 
         defer.returnValue("mxc://%s/%s" % (self.server_name, media_id))
 
+    @defer.inlineCallbacks
     def get_remote_media(self, server_name, media_id):
         key = (server_name, media_id)
-        download = self.downloads.get(key)
-        if download is None:
-            download = self._get_remote_media_impl(server_name, media_id)
-            download = ObservableDeferred(
-                download,
-                consumeErrors=True
-            )
-            self.downloads[key] = download
-
-            @download.addBoth
-            def callback(media_info):
-                del self.downloads[key]
-                return media_info
-        return download.observe()
+        with (yield self.remote_media_linearizer.queue(key)):
+            media_info = yield self._get_remote_media_impl(server_name, media_id)
+        defer.returnValue(media_info)
 
     @defer.inlineCallbacks
     def _get_remote_media_impl(self, server_name, media_id):
@@ -119,6 +131,11 @@ class MediaRepository(object):
             media_info = yield self._download_remote_file(
                 server_name, media_id
             )
+        else:
+            self.recently_accessed_remotes.add((server_name, media_id))
+            yield self.store.update_cached_last_access_time(
+                [(server_name, media_id)], self.clock.time_msec()
+            )
         defer.returnValue(media_info)
 
     @defer.inlineCallbacks
@@ -416,6 +433,41 @@ class MediaRepository(object):
             "height": m_height,
         })
 
+    @defer.inlineCallbacks
+    def delete_old_remote_media(self, before_ts):
+        old_media = yield self.store.get_remote_media_before(before_ts)
+
+        deleted = 0
+
+        for media in old_media:
+            origin = media["media_origin"]
+            media_id = media["media_id"]
+            file_id = media["filesystem_id"]
+            key = (origin, media_id)
+
+            logger.info("Deleting: %r", key)
+
+            with (yield self.remote_media_linearizer.queue(key)):
+                full_path = self.filepaths.remote_media_filepath(origin, file_id)
+                try:
+                    os.remove(full_path)
+                except OSError as e:
+                    logger.warn("Failed to remove file: %r", full_path)
+                    if e.errno == errno.ENOENT:
+                        pass
+                    else:
+                        continue
+
+                thumbnail_dir = self.filepaths.remote_media_thumbnail_dir(
+                    origin, file_id
+                )
+                shutil.rmtree(thumbnail_dir, ignore_errors=True)
+
+                yield self.store.delete_remote_media(origin, media_id)
+                deleted += 1
+
+        defer.returnValue({"deleted": deleted})
+
 
 class MediaRepositoryResource(Resource):
     """File uploading and downloading.
@@ -464,9 +516,8 @@ class MediaRepositoryResource(Resource):
 
     def __init__(self, hs):
         Resource.__init__(self)
-        filepaths = MediaFilePaths(hs.config.media_store_path)
 
-        media_repo = MediaRepository(hs, filepaths)
+        media_repo = hs.get_media_repository()
 
         self.putChild("upload", UploadResource(hs, media_repo))
         self.putChild("download", DownloadResource(hs, media_repo))