diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index 1e6696332f..14bd3ba3b0 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -21,10 +21,19 @@
import enum
import logging
import threading
-from typing import Dict, Generic, Iterable, Optional, Set, Tuple, TypeVar, Union
+from typing import (
+ Dict,
+ Generic,
+ Iterable,
+ Literal,
+ Optional,
+ Set,
+ Tuple,
+ TypeVar,
+ Union,
+)
import attr
-from typing_extensions import Literal
from synapse.util.caches.lrucache import LruCache
from synapse.util.caches.treecache import TreeCache
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 8017c031ee..3198fdd2ed 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -21,10 +21,9 @@
import logging
from collections import OrderedDict
-from typing import Any, Generic, Iterable, Optional, TypeVar, Union, overload
+from typing import Any, Generic, Iterable, Literal, Optional, TypeVar, Union, overload
import attr
-from typing_extensions import Literal
from twisted.internet import defer
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 481a1a621e..2e5efa3a52 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -34,6 +34,7 @@ from typing import (
Generic,
Iterable,
List,
+ Literal,
Optional,
Set,
Tuple,
@@ -44,8 +45,6 @@ from typing import (
overload,
)
-from typing_extensions import Literal
-
from twisted.internet import reactor
from twisted.internet.interfaces import IReactorTime
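The three files above make the same mechanical change: `Literal` is now imported from the standard-library `typing` module (available since Python 3.8) rather than from the `typing_extensions` backport. A minimal sketch of the pattern follows; the `CachePolicy` alias and `describe` function are made-up names for illustration, not Synapse code:

```python
# Illustrative only (not Synapse code): on Python 3.8+, Literal can be imported
# straight from the standard library, so the typing_extensions backport that
# these hunks stop importing is no longer required for it.
from typing import Literal

CachePolicy = Literal["lru", "expiring"]

def describe(policy: CachePolicy) -> str:
    # A type checker rejects any string other than the two listed literals.
    return f"using a {policy} cache"

print(describe("lru"))
```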
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 96b7ca83dc..54b99134b9 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -101,7 +101,13 @@ class ResponseCache(Generic[KV]):
used rather than trying to compute a new response.
"""
- def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
+ def __init__(
+ self,
+ clock: Clock,
+ name: str,
+ timeout_ms: float = 0,
+ enable_logging: bool = True,
+ ):
self._result_cache: Dict[KV, ResponseCacheEntry] = {}
self.clock = clock
@@ -109,6 +115,7 @@ class ResponseCache(Generic[KV]):
self._name = name
self._metrics = register_cache("response_cache", name, self, resizable=False)
+ self._enable_logging = enable_logging
def size(self) -> int:
return len(self._result_cache)
@@ -246,9 +253,12 @@ class ResponseCache(Generic[KV]):
"""
entry = self._get(key)
if not entry:
- logger.debug(
- "[%s]: no cached result for [%s], calculating new one", self._name, key
- )
+ if self._enable_logging:
+ logger.debug(
+ "[%s]: no cached result for [%s], calculating new one",
+ self._name,
+ key,
+ )
context = ResponseCacheContext(cache_key=key)
if cache_context:
kwargs["cache_context"] = context
@@ -269,12 +279,15 @@ class ResponseCache(Generic[KV]):
return await make_deferred_yieldable(entry.result.observe())
result = entry.result.observe()
- if result.called:
- logger.info("[%s]: using completed cached result for [%s]", self._name, key)
- else:
- logger.info(
- "[%s]: using incomplete cached result for [%s]", self._name, key
- )
+ if self._enable_logging:
+ if result.called:
+ logger.info(
+ "[%s]: using completed cached result for [%s]", self._name, key
+ )
+ else:
+ logger.info(
+ "[%s]: using incomplete cached result for [%s]", self._name, key
+ )
span_context = entry.opentracing_span_context
with start_active_span_follows_from(
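The `response_cache.py` hunks add an `enable_logging` flag to the constructor and gate the per-request debug/info lines in `wrap()` behind it, so that hot code paths can opt out of the log noise. Below is a hedged usage sketch, not part of this patch: the `"federation_state_ids"` name is illustrative, and the `hs.get_clock()` wiring is an assumption about how the `Clock` would normally be obtained.

```python
# Hypothetical usage sketch (not part of this patch): build a ResponseCache with
# the new enable_logging flag turned off so cache hits/misses on a hot path are
# not logged.
from synapse.util.caches.response_cache import ResponseCache

def make_quiet_cache(hs) -> ResponseCache:
    return ResponseCache(
        hs.get_clock(),            # Clock driving the timeout bookkeeping
        "federation_state_ids",    # illustrative cache name
        timeout_ms=30 * 1000,      # keep completed results for 30 seconds
        enable_logging=False,      # new flag added by this patch
    )
```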
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 16fcb00206..5ac8643eef 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -142,9 +142,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
- # _cache is not valid at or before the earliest known stream position, so
+ # _cache is not valid before the earliest known stream position, so
# return that the entity has changed.
- if stream_pos <= self._earliest_known_stream_pos:
+ if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -186,7 +186,7 @@ class StreamChangeCache:
This will be all entities if the given stream position is at or earlier
than the earliest known stream position.
"""
- if not self._cache or stream_pos <= self._earliest_known_stream_pos:
+ if not self._cache or stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return set(entities)
@@ -238,9 +238,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
- # _cache is not valid at or before the earliest known stream position, so
+ # _cache is not valid before the earliest known stream position, so
# return that an entity has changed.
- if stream_pos <= self._earliest_known_stream_pos:
+ if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
return True
@@ -270,9 +270,9 @@ class StreamChangeCache:
"""
assert isinstance(stream_pos, int)
- # _cache is not valid at or before the earliest known stream position, so
+ # _cache is not valid before the earliest known stream position, so
# return None to mark that it is unknown if an entity has changed.
- if stream_pos <= self._earliest_known_stream_pos:
+ if stream_pos < self._earliest_known_stream_pos:
return AllEntitiesChangedResult(None)
changed_entities: List[EntityType] = []
@@ -314,6 +314,15 @@ class StreamChangeCache:
self._entity_to_key[entity] = stream_pos
self._evict()
+ def all_entities_changed(self, stream_pos: int) -> None:
+ """
+ Mark all entities as changed. This is useful when the cache is invalidated and
+ any of the entities may have changed.
+ """
+ self._cache.clear()
+ self._entity_to_key.clear()
+ self._earliest_known_stream_pos = stream_pos
+
def _evict(self) -> None:
"""
Ensure the cache has not exceeded the maximum size.
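The `stream_change_cache.py` hunks relax the validity check from `<=` to `<`, so a query at exactly the earliest known stream position is now answered from the cache instead of being treated as a forced miss, and they add `all_entities_changed()`, which clears both maps and resets `_earliest_known_stream_pos`. The snippet below is a simplified stand-in for the new boundary check, not the real `StreamChangeCache` API:

```python
# Simplified stand-in for the new boundary check (not the real StreamChangeCache):
# after the change from "<=" to "<", a query at exactly the earliest known stream
# position no longer has to assume that everything changed.
def must_assume_changed(stream_pos: int, earliest_known_stream_pos: int) -> bool:
    # Previously this comparison was "<=", so stream_pos == earliest was a miss.
    return stream_pos < earliest_known_stream_pos

EARLIEST = 5
print(must_assume_changed(4, EARLIEST))  # True: position predates what the cache knows
print(must_assume_changed(5, EARLIEST))  # False now (was True before this patch)
print(must_assume_changed(6, EARLIEST))  # False: the cache covers this position
```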