diff options
Diffstat (limited to 'synapse/util/caches')
-rw-r--r-- | synapse/util/caches/cached_call.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/deferred_cache.py | 11 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 57 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 6 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 6 | ||||
-rw-r--r-- | synapse/util/caches/ttlcache.py | 12 |
6 files changed, 45 insertions, 49 deletions
diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py index e58dd91eda..470f4f91a5 100644 --- a/synapse/util/caches/cached_call.py +++ b/synapse/util/caches/cached_call.py @@ -85,7 +85,7 @@ class CachedCall(Generic[TV]): # result in the deferred, since `awaiting` a deferred destroys its result. # (Also, if it's a Failure, GCing the deferred would log a critical error # about unhandled Failures) - def got_result(r): + def got_result(r: Union[TV, Failure]) -> None: self._result = r self._deferred.addBoth(got_result) diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 6262efe072..da502aec11 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -31,6 +31,7 @@ from prometheus_client import Gauge from twisted.internet import defer from twisted.python import failure +from twisted.python.failure import Failure from synapse.util.async_helpers import ObservableDeferred from synapse.util.caches.lrucache import LruCache @@ -112,7 +113,7 @@ class DeferredCache(Generic[KT, VT]): self.thread: Optional[threading.Thread] = None @property - def max_entries(self): + def max_entries(self) -> int: return self.cache.max_size def check_thread(self) -> None: @@ -258,7 +259,7 @@ class DeferredCache(Generic[KT, VT]): return False - def cb(result) -> None: + def cb(result: VT) -> None: if compare_and_pop(): self.cache.set(key, result, entry.callbacks) else: @@ -270,7 +271,7 @@ class DeferredCache(Generic[KT, VT]): # not have been. Either way, let's double-check now. entry.invalidate() - def eb(_fail) -> None: + def eb(_fail: Failure) -> None: compare_and_pop() entry.invalidate() @@ -284,11 +285,11 @@ class DeferredCache(Generic[KT, VT]): def prefill( self, key: KT, value: VT, callback: Optional[Callable[[], None]] = None - ): + ) -> None: callbacks = [callback] if callback else [] self.cache.set(key, value, callbacks=callbacks) - def invalidate(self, key): + def invalidate(self, key) -> None: """Delete a key, or tree of entries If the cache is backed by a regular dict, then "key" must be of diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 4ff62b403f..a0a7a9de32 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -52,7 +52,7 @@ logger = logging.getLogger(__name__) try: from pympler.asizeof import Asizer - def _get_size_of(val: Any, *, recurse=True) -> int: + def _get_size_of(val: Any, *, recurse: bool = True) -> int: """Get an estimate of the size in bytes of the object. Args: @@ -71,7 +71,7 @@ try: except ImportError: - def _get_size_of(val: Any, *, recurse=True) -> int: + def _get_size_of(val: Any, *, recurse: bool = True) -> int: return 0 @@ -85,15 +85,6 @@ VT = TypeVar("VT") # a general type var, distinct from either KT or VT T = TypeVar("T") - -def enumerate_leaves(node, depth): - if depth == 0: - yield node - else: - for n in node.values(): - yield from enumerate_leaves(n, depth - 1) - - P = TypeVar("P") @@ -102,7 +93,7 @@ class _TimedListNode(ListNode[P]): __slots__ = ["last_access_ts_secs"] - def update_last_access(self, clock: Clock): + def update_last_access(self, clock: Clock) -> None: self.last_access_ts_secs = int(clock.time()) @@ -115,7 +106,7 @@ GLOBAL_ROOT = ListNode["_Node"].create_root_node() @wrap_as_background_process("LruCache._expire_old_entries") -async def _expire_old_entries(clock: Clock, expiry_seconds: int): +async def _expire_old_entries(clock: Clock, expiry_seconds: int) -> None: """Walks the global cache list to find cache entries that haven't been accessed in the given number of seconds. """ @@ -163,7 +154,7 @@ async def _expire_old_entries(clock: Clock, expiry_seconds: int): logger.info("Dropped %d items from caches", i) -def setup_expire_lru_cache_entries(hs: "HomeServer"): +def setup_expire_lru_cache_entries(hs: "HomeServer") -> None: """Start a background job that expires all cache entries if they have not been accessed for the given number of seconds. """ @@ -183,7 +174,7 @@ def setup_expire_lru_cache_entries(hs: "HomeServer"): ) -class _Node: +class _Node(Generic[KT, VT]): __slots__ = [ "_list_node", "_global_list_node", @@ -197,8 +188,8 @@ class _Node: def __init__( self, root: "ListNode[_Node]", - key, - value, + key: KT, + value: VT, cache: "weakref.ReferenceType[LruCache]", clock: Clock, callbacks: Collection[Callable[[], None]] = (), @@ -409,7 +400,7 @@ class LruCache(Generic[KT, VT]): def synchronized(f: FT) -> FT: @wraps(f) - def inner(*args, **kwargs): + def inner(*args: Any, **kwargs: Any) -> Any: with lock: return f(*args, **kwargs) @@ -418,17 +409,19 @@ class LruCache(Generic[KT, VT]): cached_cache_len = [0] if size_callback is not None: - def cache_len(): + def cache_len() -> int: return cached_cache_len[0] else: - def cache_len(): + def cache_len() -> int: return len(cache) self.len = synchronized(cache_len) - def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()): + def add_node( + key: KT, value: VT, callbacks: Collection[Callable[[], None]] = () + ) -> None: node = _Node( list_root, key, @@ -446,7 +439,7 @@ class LruCache(Generic[KT, VT]): if caches.TRACK_MEMORY_USAGE and metrics: metrics.inc_memory_usage(node.memory) - def move_node_to_front(node: _Node): + def move_node_to_front(node: _Node) -> None: node.move_to_front(real_clock, list_root) def delete_node(node: _Node) -> int: @@ -488,7 +481,7 @@ class LruCache(Generic[KT, VT]): default: Optional[T] = None, callbacks: Collection[Callable[[], None]] = (), update_metrics: bool = True, - ): + ) -> Union[None, T, VT]: node = cache.get(key, None) if node is not None: move_node_to_front(node) @@ -502,7 +495,9 @@ class LruCache(Generic[KT, VT]): return default @synchronized - def cache_set(key: KT, value: VT, callbacks: Iterable[Callable[[], None]] = ()): + def cache_set( + key: KT, value: VT, callbacks: Iterable[Callable[[], None]] = () + ) -> None: node = cache.get(key, None) if node is not None: # We sometimes store large objects, e.g. dicts, which cause @@ -547,7 +542,7 @@ class LruCache(Generic[KT, VT]): ... @synchronized - def cache_pop(key: KT, default: Optional[T] = None): + def cache_pop(key: KT, default: Optional[T] = None) -> Union[None, T, VT]: node = cache.get(key, None) if node: delete_node(node) @@ -612,25 +607,25 @@ class LruCache(Generic[KT, VT]): self.contains = cache_contains self.clear = cache_clear - def __getitem__(self, key): + def __getitem__(self, key: KT) -> VT: result = self.get(key, self.sentinel) if result is self.sentinel: raise KeyError() else: - return result + return cast(VT, result) - def __setitem__(self, key, value): + def __setitem__(self, key: KT, value: VT) -> None: self.set(key, value) - def __delitem__(self, key, value): + def __delitem__(self, key: KT, value: VT) -> None: result = self.pop(key, self.sentinel) if result is self.sentinel: raise KeyError() - def __len__(self): + def __len__(self) -> int: return self.len() - def __contains__(self, key): + def __contains__(self, key: KT) -> bool: return self.contains(key) def set_cache_factor(self, factor: float) -> bool: diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index ed7204336f..88ccf44337 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -104,8 +104,8 @@ class ResponseCache(Generic[KV]): return None def _set( - self, context: ResponseCacheContext[KV], deferred: defer.Deferred - ) -> defer.Deferred: + self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]" + ) -> "defer.Deferred[RV]": """Set the entry for the given key to the given deferred. *deferred* should run its callbacks in the sentinel logcontext (ie, @@ -126,7 +126,7 @@ class ResponseCache(Generic[KV]): key = context.cache_key self.pending_result_cache[key] = result - def on_complete(r): + def on_complete(r: RV) -> RV: # if this cache has a non-zero timeout, and the callback has not cleared # the should_cache bit, we leave it in the cache for now and schedule # its removal later. diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 27b1da235e..330709b8b7 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -40,10 +40,10 @@ class StreamChangeCache: self, name: str, current_stream_pos: int, - max_size=10000, + max_size: int = 10000, prefilled_cache: Optional[Mapping[EntityType, int]] = None, - ): - self._original_max_size = max_size + ) -> None: + self._original_max_size: int = max_size self._max_size = math.floor(max_size) self._entity_to_key: Dict[EntityType, int] = {} diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index 46afe3f934..0b9ac26b69 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -159,12 +159,12 @@ class TTLCache(Generic[KT, VT]): del self._expiry_list[0] -@attr.s(frozen=True, slots=True) -class _CacheEntry: +@attr.s(frozen=True, slots=True, auto_attribs=True) +class _CacheEntry: # Should be Generic[KT, VT]. See python-attrs/attrs#313 """TTLCache entry""" # expiry_time is the first attribute, so that entries are sorted by expiry. - expiry_time = attr.ib(type=float) - ttl = attr.ib(type=float) - key = attr.ib() - value = attr.ib() + expiry_time: float + ttl: float + key: Any # should be KT + value: Any # should be VT |