diff --git a/synapse/storage/databases/main/cache.py b/synapse/storage/databases/main/cache.py
index 2367ddeea3..12e9a42382 100644
--- a/synapse/storage/databases/main/cache.py
+++ b/synapse/storage/databases/main/cache.py
@@ -32,6 +32,7 @@ from synapse.storage.database import (
LoggingTransaction,
)
from synapse.storage.engines import PostgresEngine
+from synapse.storage.util.id_generators import MultiWriterIdGenerator
from synapse.util.caches.descriptors import _CachedFunction
from synapse.util.iterutils import batch_iter
@@ -65,6 +66,31 @@ class CacheInvalidationWorkerStore(SQLBaseStore):
psql_only=True, # The table is only on postgres DBs.
)
+ self._cache_id_gen: Optional[MultiWriterIdGenerator]
+ if isinstance(self.database_engine, PostgresEngine):
+ # We set the `writers` to an empty list here as we don't care about
+ # missing updates over restarts, as we'll not have anything in our
+ # caches to invalidate. (This reduces the number of writes to the DB
+ # that happen).
+ self._cache_id_gen = MultiWriterIdGenerator(
+ db_conn,
+ database,
+ stream_name="caches",
+ instance_name=hs.get_instance_name(),
+ tables=[
+ (
+ "cache_invalidation_stream_by_instance",
+ "instance_name",
+ "stream_id",
+ )
+ ],
+ sequence_name="cache_invalidation_stream_seq",
+ writers=[],
+ )
+
+ else:
+ self._cache_id_gen = None
+
async def get_all_updated_caches(
self, instance_name: str, last_id: int, current_id: int, limit: int
) -> Tuple[List[Tuple[int, tuple]], int, bool]:
|