diff options
author | Erik Johnston <erikj@jki.re> | 2016-06-29 15:50:57 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2016-06-29 15:50:57 +0100 |
commit | aac546c978e40c832854eb48cefeddaebca12b3e (patch) | |
tree | c09ca640a0e61e65dc5913bf7d6c1b65f4d4de28 /synapse/rest | |
parent | Merge pull request #900 from RickCogley/RickCogley-coturn-readme-2 (diff) | |
parent | Remove race (diff) | |
download | synapse-aac546c978e40c832854eb48cefeddaebca12b3e.tar.xz |
Merge pull request #902 from matrix-org/erikj/expire_media
Feature: Implement purge_media_cache admin API
Diffstat (limited to 'synapse/rest')
-rw-r--r-- | synapse/rest/client/v1/admin.py | 32 | ||||
-rw-r--r-- | synapse/rest/media/v1/filepath.py | 6 | ||||
-rw-r--r-- | synapse/rest/media/v1/media_repository.py | 91 |
3 files changed, 109 insertions, 20 deletions
diff --git a/synapse/rest/client/v1/admin.py b/synapse/rest/client/v1/admin.py index aa05b3f023..8ec8569a49 100644 --- a/synapse/rest/client/v1/admin.py +++ b/synapse/rest/client/v1/admin.py @@ -46,5 +46,37 @@ class WhoisRestServlet(ClientV1RestServlet): defer.returnValue((200, ret)) +class PurgeMediaCacheRestServlet(ClientV1RestServlet): + PATTERNS = client_path_patterns("/admin/purge_media_cache") + + def __init__(self, hs): + self.media_repository = hs.get_media_repository() + super(PurgeMediaCacheRestServlet, self).__init__(hs) + + @defer.inlineCallbacks + def on_POST(self, request): + requester = yield self.auth.get_user_by_req(request) + is_admin = yield self.auth.is_server_admin(requester.user) + + if not is_admin: + raise AuthError(403, "You are not a server admin") + + before_ts = request.args.get("before_ts", None) + if not before_ts: + raise SynapseError(400, "Missing 'before_ts' arg") + + logger.info("before_ts: %r", before_ts[0]) + + try: + before_ts = int(before_ts[0]) + except Exception: + raise SynapseError(400, "Invalid 'before_ts' arg") + + ret = yield self.media_repository.delete_old_remote_media(before_ts) + + defer.returnValue((200, ret)) + + def register_servlets(hs, http_server): WhoisRestServlet(hs).register(http_server) + PurgeMediaCacheRestServlet(hs).register(http_server) diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py index 422ab86fb3..0137458f71 100644 --- a/synapse/rest/media/v1/filepath.py +++ b/synapse/rest/media/v1/filepath.py @@ -65,3 +65,9 @@ class MediaFilePaths(object): file_id[0:2], file_id[2:4], file_id[4:], file_name ) + + def remote_media_thumbnail_dir(self, server_name, file_id): + return os.path.join( + self.base_path, "remote_thumbnail", server_name, + file_id[0:2], file_id[2:4], file_id[4:], + ) diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py index 2468c3ac42..692e078419 100644 --- a/synapse/rest/media/v1/media_repository.py +++ b/synapse/rest/media/v1/media_repository.py @@ -30,11 +30,13 @@ from synapse.api.errors import SynapseError from twisted.internet import defer, threads -from synapse.util.async import ObservableDeferred +from synapse.util.async import Linearizer from synapse.util.stringutils import is_ascii from synapse.util.logcontext import preserve_context_over_fn import os +import errno +import shutil import cgi import logging @@ -43,8 +45,11 @@ import urlparse logger = logging.getLogger(__name__) +UPDATE_RECENTLY_ACCESSED_REMOTES_TS = 60 * 1000 + + class MediaRepository(object): - def __init__(self, hs, filepaths): + def __init__(self, hs): self.auth = hs.get_auth() self.client = MatrixFederationHttpClient(hs) self.clock = hs.get_clock() @@ -52,11 +57,28 @@ class MediaRepository(object): self.store = hs.get_datastore() self.max_upload_size = hs.config.max_upload_size self.max_image_pixels = hs.config.max_image_pixels - self.filepaths = filepaths - self.downloads = {} + self.filepaths = MediaFilePaths(hs.config.media_store_path) self.dynamic_thumbnails = hs.config.dynamic_thumbnails self.thumbnail_requirements = hs.config.thumbnail_requirements + self.remote_media_linearizer = Linearizer() + + self.recently_accessed_remotes = set() + + self.clock.looping_call( + self._update_recently_accessed_remotes, + UPDATE_RECENTLY_ACCESSED_REMOTES_TS + ) + + @defer.inlineCallbacks + def _update_recently_accessed_remotes(self): + media = self.recently_accessed_remotes + self.recently_accessed_remotes = set() + + yield self.store.update_cached_last_access_time( + media, self.clock.time_msec() + ) + @staticmethod def _makedirs(filepath): dirname = os.path.dirname(filepath) @@ -93,22 +115,12 @@ class MediaRepository(object): defer.returnValue("mxc://%s/%s" % (self.server_name, media_id)) + @defer.inlineCallbacks def get_remote_media(self, server_name, media_id): key = (server_name, media_id) - download = self.downloads.get(key) - if download is None: - download = self._get_remote_media_impl(server_name, media_id) - download = ObservableDeferred( - download, - consumeErrors=True - ) - self.downloads[key] = download - - @download.addBoth - def callback(media_info): - del self.downloads[key] - return media_info - return download.observe() + with (yield self.remote_media_linearizer.queue(key)): + media_info = yield self._get_remote_media_impl(server_name, media_id) + defer.returnValue(media_info) @defer.inlineCallbacks def _get_remote_media_impl(self, server_name, media_id): @@ -119,6 +131,11 @@ class MediaRepository(object): media_info = yield self._download_remote_file( server_name, media_id ) + else: + self.recently_accessed_remotes.add((server_name, media_id)) + yield self.store.update_cached_last_access_time( + [(server_name, media_id)], self.clock.time_msec() + ) defer.returnValue(media_info) @defer.inlineCallbacks @@ -416,6 +433,41 @@ class MediaRepository(object): "height": m_height, }) + @defer.inlineCallbacks + def delete_old_remote_media(self, before_ts): + old_media = yield self.store.get_remote_media_before(before_ts) + + deleted = 0 + + for media in old_media: + origin = media["media_origin"] + media_id = media["media_id"] + file_id = media["filesystem_id"] + key = (origin, media_id) + + logger.info("Deleting: %r", key) + + with (yield self.remote_media_linearizer.queue(key)): + full_path = self.filepaths.remote_media_filepath(origin, file_id) + try: + os.remove(full_path) + except OSError as e: + logger.warn("Failed to remove file: %r", full_path) + if e.errno == errno.ENOENT: + pass + else: + continue + + thumbnail_dir = self.filepaths.remote_media_thumbnail_dir( + origin, file_id + ) + shutil.rmtree(thumbnail_dir, ignore_errors=True) + + yield self.store.delete_remote_media(origin, media_id) + deleted += 1 + + defer.returnValue({"deleted": deleted}) + class MediaRepositoryResource(Resource): """File uploading and downloading. @@ -464,9 +516,8 @@ class MediaRepositoryResource(Resource): def __init__(self, hs): Resource.__init__(self) - filepaths = MediaFilePaths(hs.config.media_store_path) - media_repo = MediaRepository(hs, filepaths) + media_repo = hs.get_media_repository() self.putChild("upload", UploadResource(hs, media_repo)) self.putChild("download", DownloadResource(hs, media_repo)) |