summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/12732.feature1
-rw-r--r--docs/usage/configuration/config_documentation.md29
-rw-r--r--synapse/config/repository.py16
-rw-r--r--synapse/rest/media/v1/media_repository.py71
-rw-r--r--tests/rest/media/test_media_retention.py238
5 files changed, 353 insertions, 2 deletions
diff --git a/changelog.d/12732.feature b/changelog.d/12732.feature
new file mode 100644
index 0000000000..3c73363d28
--- /dev/null
+++ b/changelog.d/12732.feature
@@ -0,0 +1 @@
+Add new `media_retention` options to the homeserver config for routinely cleaning up non-recently accessed media.
\ No newline at end of file
diff --git a/docs/usage/configuration/config_documentation.md b/docs/usage/configuration/config_documentation.md
index 88b9e5744d..1c75a23a36 100644
--- a/docs/usage/configuration/config_documentation.md
+++ b/docs/usage/configuration/config_documentation.md
@@ -1459,7 +1459,7 @@ federation_rr_transactions_per_room_per_second: 40
 ```
 ---
 ## Media Store ##
-Config options relating to Synapse media store.
+Config options related to Synapse's media store.
 
 ---
 Config option: `enable_media_repo` 
@@ -1563,6 +1563,33 @@ thumbnail_sizes:
     height: 600
     method: scale
 ```
+---
+Config option: `media_retention`
+
+Controls whether local media and entries in the remote media cache
+(media that is downloaded from other homeservers) should be removed
+under certain conditions, typically for the purpose of saving space.
+
+Purging media files will be the carried out by the media worker
+(that is, the worker that has the `enable_media_repo` homeserver config
+option set to 'true'). This may be the main process.
+
+The `media_retention.local_media_lifetime` and
+`media_retention.remote_media_lifetime` config options control whether
+media will be purged if it has not been accessed in a given amount of
+time. Note that media is 'accessed' when loaded in a room in a client, or
+otherwise downloaded by a local or remote user. If the media has never
+been accessed, the media's creation time is used instead. Both thumbnails
+and the original media will be removed. If either of these options are unset,
+then media of that type will not be purged.
+
+Example configuration:
+```yaml
+media_retention:
+    local_media_lifetime: 90d
+    remote_media_lifetime: 14d
+```
+---
 Config option: `url_preview_enabled`
 
 This setting determines whether the preview URL API is enabled.
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index 98d8a16621..f9c55143c3 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -223,6 +223,22 @@ class ContentRepositoryConfig(Config):
                 "url_preview_accept_language"
             ) or ["en"]
 
+        media_retention = config.get("media_retention") or {}
+
+        self.media_retention_local_media_lifetime_ms = None
+        local_media_lifetime = media_retention.get("local_media_lifetime")
+        if local_media_lifetime is not None:
+            self.media_retention_local_media_lifetime_ms = self.parse_duration(
+                local_media_lifetime
+            )
+
+        self.media_retention_remote_media_lifetime_ms = None
+        remote_media_lifetime = media_retention.get("remote_media_lifetime")
+        if remote_media_lifetime is not None:
+            self.media_retention_remote_media_lifetime_ms = self.parse_duration(
+                remote_media_lifetime
+            )
+
     def generate_config_section(self, data_dir_path: str, **kwargs: Any) -> str:
         assert data_dir_path is not None
         media_store = os.path.join(data_dir_path, "media_store")
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index 3e5d6c6294..20af366538 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -65,7 +65,12 @@ if TYPE_CHECKING:
 logger = logging.getLogger(__name__)
 
 
-UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000
+# How often to run the background job to update the "recently accessed"
+# attribute of local and remote media.
+UPDATE_RECENTLY_ACCESSED_TS = 60 * 1000  # 1 minute
+# How often to run the background job to check for local and remote media
+# that should be purged according to the configured media retention settings.
+MEDIA_RETENTION_CHECK_PERIOD_MS = 60 * 60 * 1000  # 1 hour
 
 
 class MediaRepository:
@@ -122,11 +127,36 @@ class MediaRepository:
             self._start_update_recently_accessed, UPDATE_RECENTLY_ACCESSED_TS
         )
 
+        # Media retention configuration options
+        self._media_retention_local_media_lifetime_ms = (
+            hs.config.media.media_retention_local_media_lifetime_ms
+        )
+        self._media_retention_remote_media_lifetime_ms = (
+            hs.config.media.media_retention_remote_media_lifetime_ms
+        )
+
+        # Check whether local or remote media retention is configured
+        if (
+            hs.config.media.media_retention_local_media_lifetime_ms is not None
+            or hs.config.media.media_retention_remote_media_lifetime_ms is not None
+        ):
+            # Run the background job to apply media retention rules routinely,
+            # with the duration between runs dictated by the homeserver config.
+            self.clock.looping_call(
+                self._start_apply_media_retention_rules,
+                MEDIA_RETENTION_CHECK_PERIOD_MS,
+            )
+
     def _start_update_recently_accessed(self) -> Deferred:
         return run_as_background_process(
             "update_recently_accessed_media", self._update_recently_accessed
         )
 
+    def _start_apply_media_retention_rules(self) -> Deferred:
+        return run_as_background_process(
+            "apply_media_retention_rules", self._apply_media_retention_rules
+        )
+
     async def _update_recently_accessed(self) -> None:
         remote_media = self.recently_accessed_remotes
         self.recently_accessed_remotes = set()
@@ -835,6 +865,45 @@ class MediaRepository:
 
         return {"width": m_width, "height": m_height}
 
+    async def _apply_media_retention_rules(self) -> None:
+        """
+        Purge old local and remote media according to the media retention rules
+        defined in the homeserver config.
+        """
+        # Purge remote media
+        if self._media_retention_remote_media_lifetime_ms is not None:
+            # Calculate a threshold timestamp derived from the configured lifetime. Any
+            # media that has not been accessed since this timestamp will be removed.
+            remote_media_threshold_timestamp_ms = (
+                self.clock.time_msec() - self._media_retention_remote_media_lifetime_ms
+            )
+
+            logger.info(
+                "Purging remote media last accessed before"
+                f" {remote_media_threshold_timestamp_ms}"
+            )
+
+            await self.delete_old_remote_media(
+                before_ts=remote_media_threshold_timestamp_ms
+            )
+
+        # And now do the same for local media
+        if self._media_retention_local_media_lifetime_ms is not None:
+            # This works the same as the remote media threshold
+            local_media_threshold_timestamp_ms = (
+                self.clock.time_msec() - self._media_retention_local_media_lifetime_ms
+            )
+
+            logger.info(
+                "Purging local media last accessed before"
+                f" {local_media_threshold_timestamp_ms}"
+            )
+
+            await self.delete_old_local_media(
+                before_ts=local_media_threshold_timestamp_ms,
+                keep_profiles=True,
+            )
+
     async def delete_old_remote_media(self, before_ts: int) -> Dict[str, int]:
         old_media = await self.store.get_remote_media_before(before_ts)
 
diff --git a/tests/rest/media/test_media_retention.py b/tests/rest/media/test_media_retention.py
new file mode 100644
index 0000000000..b98a5cd586
--- /dev/null
+++ b/tests/rest/media/test_media_retention.py
@@ -0,0 +1,238 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import io
+from typing import Iterable, Optional, Tuple
+
+from twisted.test.proto_helpers import MemoryReactor
+
+from synapse.rest import admin
+from synapse.rest.client import login, register, room
+from synapse.server import HomeServer
+from synapse.types import UserID
+from synapse.util import Clock
+
+from tests import unittest
+from tests.unittest import override_config
+from tests.utils import MockClock
+
+
+class MediaRetentionTestCase(unittest.HomeserverTestCase):
+
+    ONE_DAY_IN_MS = 24 * 60 * 60 * 1000
+    THIRTY_DAYS_IN_MS = 30 * ONE_DAY_IN_MS
+
+    servlets = [
+        room.register_servlets,
+        login.register_servlets,
+        register.register_servlets,
+        admin.register_servlets_for_client_rest_resource,
+    ]
+
+    def make_homeserver(self, reactor: MemoryReactor, clock: Clock) -> HomeServer:
+        # We need to be able to test advancing time in the homeserver, so we
+        # replace the test homeserver's default clock with a MockClock, which
+        # supports advancing time.
+        return self.setup_test_homeserver(clock=MockClock())
+
+    def prepare(self, reactor: MemoryReactor, clock: Clock, hs: HomeServer) -> None:
+        self.remote_server_name = "remote.homeserver"
+        self.store = hs.get_datastores().main
+
+        # Create a user to upload media with
+        test_user_id = self.register_user("alice", "password")
+
+        # Inject media (3 images each; recently accessed, old access, never accessed)
+        # into both the local store and the remote cache
+        media_repository = hs.get_media_repository()
+        test_media_content = b"example string"
+
+        def _create_media_and_set_last_accessed(
+            last_accessed_ms: Optional[int],
+        ) -> str:
+            # "Upload" some media to the local media store
+            mxc_uri = self.get_success(
+                media_repository.create_content(
+                    media_type="text/plain",
+                    upload_name=None,
+                    content=io.BytesIO(test_media_content),
+                    content_length=len(test_media_content),
+                    auth_user=UserID.from_string(test_user_id),
+                )
+            )
+
+            media_id = mxc_uri.split("/")[-1]
+
+            # Set the last recently accessed time for this media
+            if last_accessed_ms is not None:
+                self.get_success(
+                    self.store.update_cached_last_access_time(
+                        local_media=(media_id,),
+                        remote_media=(),
+                        time_ms=last_accessed_ms,
+                    )
+                )
+
+            return media_id
+
+        def _cache_remote_media_and_set_last_accessed(
+            media_id: str, last_accessed_ms: Optional[int]
+        ) -> str:
+            # Pretend to cache some remote media
+            self.get_success(
+                self.store.store_cached_remote_media(
+                    origin=self.remote_server_name,
+                    media_id=media_id,
+                    media_type="text/plain",
+                    media_length=1,
+                    time_now_ms=clock.time_msec(),
+                    upload_name="testfile.txt",
+                    filesystem_id="abcdefg12345",
+                )
+            )
+
+            # Set the last recently accessed time for this media
+            if last_accessed_ms is not None:
+                self.get_success(
+                    hs.get_datastores().main.update_cached_last_access_time(
+                        local_media=(),
+                        remote_media=((self.remote_server_name, media_id),),
+                        time_ms=last_accessed_ms,
+                    )
+                )
+
+            return media_id
+
+        # Start with the local media store
+        self.local_recently_accessed_media = _create_media_and_set_last_accessed(
+            self.THIRTY_DAYS_IN_MS
+        )
+        self.local_not_recently_accessed_media = _create_media_and_set_last_accessed(
+            self.ONE_DAY_IN_MS
+        )
+        self.local_never_accessed_media = _create_media_and_set_last_accessed(None)
+
+        # And now the remote media store
+        self.remote_recently_accessed_media = _cache_remote_media_and_set_last_accessed(
+            "a", self.THIRTY_DAYS_IN_MS
+        )
+        self.remote_not_recently_accessed_media = (
+            _cache_remote_media_and_set_last_accessed("b", self.ONE_DAY_IN_MS)
+        )
+        # Remote media will always have a "last accessed" attribute, as it would not
+        # be fetched from the remote homeserver unless instigated by a user.
+
+    @override_config(
+        {
+            "media_retention": {
+                # Enable retention for local media
+                "local_media_lifetime": "30d"
+                # Cached remote media should not be purged
+            }
+        }
+    )
+    def test_local_media_retention(self) -> None:
+        """
+        Tests that local media that have not been accessed recently is purged, while
+        cached remote media is unaffected.
+        """
+        # Advance 31 days (in seconds)
+        self.reactor.advance(31 * 24 * 60 * 60)
+
+        # Check that media has been correctly purged.
+        # Local media accessed <30 days ago should still exist.
+        # Remote media should be unaffected.
+        self._assert_if_mxc_uris_purged(
+            purged=[
+                (
+                    self.hs.config.server.server_name,
+                    self.local_not_recently_accessed_media,
+                ),
+                (self.hs.config.server.server_name, self.local_never_accessed_media),
+            ],
+            not_purged=[
+                (self.hs.config.server.server_name, self.local_recently_accessed_media),
+                (self.remote_server_name, self.remote_recently_accessed_media),
+                (self.remote_server_name, self.remote_not_recently_accessed_media),
+            ],
+        )
+
+    @override_config(
+        {
+            "media_retention": {
+                # Enable retention for cached remote media
+                "remote_media_lifetime": "30d"
+                # Local media should not be purged
+            }
+        }
+    )
+    def test_remote_media_cache_retention(self) -> None:
+        """
+        Tests that entries from the remote media cache that have not been accessed
+        recently is purged, while local media is unaffected.
+        """
+        # Advance 31 days (in seconds)
+        self.reactor.advance(31 * 24 * 60 * 60)
+
+        # Check that media has been correctly purged.
+        # Local media should be unaffected.
+        # Remote media accessed <30 days ago should still exist.
+        self._assert_if_mxc_uris_purged(
+            purged=[
+                (self.remote_server_name, self.remote_not_recently_accessed_media),
+            ],
+            not_purged=[
+                (self.remote_server_name, self.remote_recently_accessed_media),
+                (self.hs.config.server.server_name, self.local_recently_accessed_media),
+                (
+                    self.hs.config.server.server_name,
+                    self.local_not_recently_accessed_media,
+                ),
+                (self.hs.config.server.server_name, self.local_never_accessed_media),
+            ],
+        )
+
+    def _assert_if_mxc_uris_purged(
+        self, purged: Iterable[Tuple[str, str]], not_purged: Iterable[Tuple[str, str]]
+    ) -> None:
+        def _assert_mxc_uri_purge_state(
+            server_name: str, media_id: str, expect_purged: bool
+        ) -> None:
+            """Given an MXC URI, assert whether it has been purged or not."""
+            if server_name == self.hs.config.server.server_name:
+                found_media_dict = self.get_success(
+                    self.store.get_local_media(media_id)
+                )
+            else:
+                found_media_dict = self.get_success(
+                    self.store.get_cached_remote_media(server_name, media_id)
+                )
+
+            mxc_uri = f"mxc://{server_name}/{media_id}"
+
+            if expect_purged:
+                self.assertIsNone(
+                    found_media_dict, msg=f"{mxc_uri} unexpectedly not purged"
+                )
+            else:
+                self.assertIsNotNone(
+                    found_media_dict,
+                    msg=f"{mxc_uri} unexpectedly purged",
+                )
+
+        # Assert that the given MXC URIs have either been correctly purged or not.
+        for server_name, media_id in purged:
+            _assert_mxc_uri_purge_state(server_name, media_id, expect_purged=True)
+        for server_name, media_id in not_purged:
+            _assert_mxc_uri_purge_state(server_name, media_id, expect_purged=False)