diff --git a/changelog.d/15868.feature b/changelog.d/15868.feature
new file mode 100644
index 0000000000..a866bf5774
--- /dev/null
+++ b/changelog.d/15868.feature
@@ -0,0 +1 @@
+Remove old rows from the `cache_invalidation_stream_by_instance` table automatically (this table is unused in SQLite).
diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index c940f864d1..2fbd389c71 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -18,6 +18,8 @@ import logging
from typing import TYPE_CHECKING, Any, Collection, Iterable, List, Optional, Tuple
from synapse.api.constants import EventTypes
+from synapse.config._base import Config
+from synapse.metrics.background_process_metrics import wrap_as_background_process
from synapse.replication.tcp.streams import BackfillStream, CachesStream
from synapse.replication.tcp.streams.events import (
EventsStream,
@@ -52,6 +54,23 @@ PURGE_HISTORY_CACHE_NAME = "ph_cache_fake"
# As above, but for invalidating room caches on room deletion
DELETE_ROOM_CACHE_NAME = "dr_cache_fake"
+# How long between cache invalidation table cleanups, once we have caught up
+# with the backlog.
+REGULAR_CLEANUP_INTERVAL_MS = Config.parse_duration("1h")
+
+# How long between cache invalidation table cleanups, before we have caught
+# up with the backlog.
+CATCH_UP_CLEANUP_INTERVAL_MS = Config.parse_duration("1m")
+
+# Maximum number of cache invalidation rows to delete at once.
+CLEAN_UP_MAX_BATCH_SIZE = 20_000
+
+# Keep cache invalidations for 7 days
+# (This is likely to be quite excessive.)
+RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS = Config.parse_duration("7d")
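+# (`Config.parse_duration` accepts strings like "1m", "1h" or "7d" and returns
+# the duration in milliseconds, hence the `_MS` suffixes above.)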
+
class CacheInvalidationWorkerStore(SQLBaseStore):
def __init__(
@@ -98,6 +117,21 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
else:
self._cache_id_gen = None
+ # Occasionally clean up the cache invalidations stream table by deleting
+ # old rows.
+ # This is only applicable when Postgres is in use; this table is unused
+ # and not populated at all when SQLite is the active database engine.
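+ # (We use `call_later` and have the task reschedule itself, rather than
+ # a fixed-interval looping call, so the next run can come sooner while
+ # there is still a backlog; see `_clean_up_cache_invalidation_wrapper`.)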
+ if hs.config.worker.run_background_tasks and isinstance(
+ self.database_engine, PostgresEngine
+ ):
+ self.hs.get_clock().call_later(
+ CATCH_UP_CLEANUP_INTERVAL_MS / 1000,
+ self._clean_up_cache_invalidation_wrapper,
+ )
+
async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
@@ -554,3 +588,109 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
return self._cache_id_gen.get_current_token_for_writer(instance_name)
else:
return 0
+
+ @wrap_as_background_process("clean_up_old_cache_invalidations")
+ async def _clean_up_cache_invalidation_wrapper(self) -> None:
+ """
+ Clean up cache invalidation stream table entries occasionally.
+ If we are behind (i.e. there are entries old enough to
+ be deleted but too many of them to be deleted in one go),
+ then we run slightly more frequently.
+ """
+ delete_up_to: int = (
+ self.hs.get_clock().time_msec() - RETENTION_PERIOD_OF_CACHE_INVALIDATIONS_MS
+ )
+
+ in_backlog = await self._clean_up_batch_of_old_cache_invalidations(delete_up_to)
+
+ # Vary how long we wait before calling again depending on whether we
+ # are still working through the backlog or have caught up.
+ if in_backlog:
+ next_interval = CATCH_UP_CLEANUP_INTERVAL_MS
+ else:
+ next_interval = REGULAR_CLEANUP_INTERVAL_MS
+
+ self.hs.get_clock().call_later(
+ next_interval / 1000, self._clean_up_cache_invalidation_wrapper
+ )
+
+ async def _clean_up_batch_of_old_cache_invalidations(
+ self, delete_up_to_millisec: int
+ ) -> bool:
+ """
+ Remove a batch of old rows from the `cache_invalidation_stream_by_instance` table.
+
+ Up to `CLEAN_UP_MAX_BATCH_SIZE` rows will be deleted at once.
+
+ Returns True if and only if we were limited by batch size (i.e. we are in
+ backlog: there are more rows to clean up).
+ """
+
+ def _clean_up_batch_of_old_cache_invalidations_txn(
+ txn: LoggingTransaction,
+ ) -> bool:
+ # First get the earliest stream ID
+ txn.execute(
+ """
+ SELECT stream_id FROM cache_invalidation_stream_by_instance
+ ORDER BY stream_id ASC
+ LIMIT 1
+ """
+ )
+ row = txn.fetchone()
+ if row is None:
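+ # The table is empty: nothing to delete and no backlog.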
+ return False
+ earliest_stream_id: int = row[0]
+
+ # Then find the last stream ID of the range we will delete
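+ # (Since stream IDs are unique, bounding by an ID range of
+ # `CLEAN_UP_MAX_BATCH_SIZE` caps how many rows one batch can delete.)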
+ txn.execute(
+ """
+ SELECT stream_id FROM cache_invalidation_stream_by_instance
+ WHERE stream_id <= ? AND invalidation_ts <= ?
+ ORDER BY stream_id DESC
+ LIMIT 1
+ """,
+ (earliest_stream_id + CLEAN_UP_MAX_BATCH_SIZE, delete_up_to_millisec),
+ )
+ row = txn.fetchone()
+ if row is None:
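+ # No rows in this batch are old enough to delete: nothing to do.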
+ return False
+ cutoff_stream_id: int = row[0]
+
+ # Determine whether we are caught up or still catching up
+ txn.execute(
+ """
+ SELECT invalidation_ts FROM cache_invalidation_stream_by_instance
+ WHERE stream_id > ?
+ ORDER BY stream_id ASC
+ LIMIT 1
+ """,
+ (cutoff_stream_id,),
+ )
+ row = txn.fetchone()
+ if row is None:
+ in_backlog = False
+ else:
+ # We are in backlog if the next remaining row is itself old enough
+ # to be deleted: only the batch size limit stopped us deleting it.
+ in_backlog = row[0] <= delete_up_to_millisec
+
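+ # Delete the batch of rows; both bounds are inclusive.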
+ txn.execute(
+ """
+ DELETE FROM cache_invalidation_stream_by_instance
+ WHERE ? <= stream_id AND stream_id <= ?
+ """,
+ (earliest_stream_id, cutoff_stream_id),
+ )
+
+ return in_backlog
+
+ return await self.db_pool.runInteraction(
+ "clean_up_old_cache_invalidations",
+ _clean_up_batch_of_old_cache_invalidations_txn,
+ )