summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/databases/main/cache.py130
1 files changed, 130 insertions, 0 deletions
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c940f864d1..2fbd389c71 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -18,6 +18,8 @@ import logging
 from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple
 
 from synapse.api.constants import EventTypes
+from synapse.config._base import Config
+from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.replication.tcp.streams import BackfillStream, CachesStream
 from synapse.replication.tcp.streams.events import (
     EventsStream,
@@ -52,6 +54,21 @@ PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
 # As above, but for invalidating room caches on room deletion
 DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
 
+# How long between cache invalidation table cleanups, once we have caught up
+# with the backlog.
+REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h")
+
+# How long between cache invalidation table cleanups, before we have caught
+# up with the backlog.
+CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m")
+
+# Maximum number of cache invalidation rows to delete at once.
+CLEAN_UP_MAX_BATCH_SIZE = 20_000
+
+# Keep cache invalidations for 7 days
+# (This is likely to be quite excessive.)
+RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d")
+
 
 class CacheInvalidationWorkerStore(SQLBaseStore):
     def __init__(
@@ -98,6 +115,18 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
         else:
             self._cache_id_gen = None
 
+        # Occasionally clean up the cache invalidations stream table by deleting
+        # old rows.
+        # This is only applicable when Postgres is in use; this table is unused
+        # and not populated at all when SQLite is the active database engine.
+        if hs.config.worker.run_background_tasks and isinstance(
+            self.database_engine, PostgresEngine
+        ):
+            self.hs.get_clock().call_later(
+                CATCH_UP_CLEANUP_INTERVAL_MS / 1000,
+                self._clean_up_cache_invalidation_wrapper,
+            )
+
     async def get_all_updated_caches(
         self, instance_name: str, last_id: int, current_id: int, limit: int
     ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
@@ -554,3 +583,104 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
             return self._cache_id_gen.get_current_token_for_writer(instance_name)
         else:
             return 0
+
+    @wrap_as_background_process("clean_up_old_cache_invalidations")
+    async def _clean_up_cache_invalidation_wrapper(self) -> None:
+        """
+        Clean up cache invalidation stream table entries occasionally.
+        If we are behind (i.e. there are entries old enough to
+        be deleted but too many of them to be deleted in one go),
+        then we run slightly more frequently.
+        """
+        delete_up_to: int = (
+            self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
+        )
+
+        in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)
+
+        # Vary how long we wait before calling again depending on whether we
+        # are still sifting through backlog or we have caught up.
+        if in_backlog:
+            next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
+        else:
+            next_interval = REGULAR_CLEANUP_INTERVAL_MS
+
+        self.hs.get_clock().call_later(
+            next_interval / 1000, self._clean_up_cache_invalidation_wrapper
+        )
+
+    async def _clean_up_batch_of_old_cache_invalidations(
+        self, delete_up_to_millisec: int
+    ) -> bool:
+        """
+        Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite).
+
+        Up to `CLEAN_UP_BATCH_SIZE` rows will be deleted at once.
+
+        Returns true if and only if we were limited by batch size (i.e. we are in backlog:
+        there are more things to clean up).
+        """
+
+        def _clean_up_batch_of_old_cache_invalidations_txn(
+            txn: LoggingTransaction,
+        ) -> bool:
+            # First get the earliest stream ID
+            txn.execute(
+                """
+                SELECT stream_id FROM cache_invalidation_stream_by_instance
+                ORDER BY stream_id ASC
+                LIMIT 1
+                """
+            )
+            row = txn.fetchone()
+            if row is None:
+                return False
+            earliest_stream_id: int = row[0]
+
+            # Then find the last stream ID of the range we will delete
+            txn.execute(
+                """
+                SELECT stream_id FROM cache_invalidation_stream_by_instance
+                WHERE stream_id <= ? AND invalidation_ts <= ?
+                ORDER BY stream_id DESC
+                LIMIT 1
+                """,
+                (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec),
+            )
+            row = txn.fetchone()
+            if row is None:
+                return False
+            cutoff_stream_id: int = row[0]
+
+            # Determine whether we are caught up or still catching up
+            txn.execute(
+                """
+                SELECT invalidation_ts FROM cache_invalidation_stream_by_instance
+                WHERE stream_id > ?
+                ORDER BY stream_id ASC
+                LIMIT 1
+                """,
+                (cutoff_stream_id,),
+            )
+            row = txn.fetchone()
+            if row is None:
+                in_backlog = False
+            else:
+                # We are in backlog if the next row could have been deleted
+                # if we didn't have such a small batch size
+                in_backlog = row[0] <= delete_up_to_millisec
+
+            txn.execute(
+                """
+                DELETE FROM cache_invalidation_stream_by_instance
+                WHERE ? <= stream_id AND stream_id <= ?
+                """,
+                (earliest_stream_id, cutoff_stream_id),
+            )
+
+            return in_backlog
+
+        return await self.db_pool.runInteraction(
+            "clean_up_old_cache_invalidations",
+            _clean_up_batch_of_old_cache_invalidations_txn,
+        )