diff options
author | Jonathan de Jong <jonathan@automatia.nl> | 2021-07-15 18:46:54 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-07-15 12:46:54 -0400 |
commit | bdfde6dca11a9468372b3c9b327ad3327cbdbe4a (patch) | |
tree | e3185688882f25f08cc0aefa80d8e1944c5004d9 /synapse/util | |
parent | Reduce likelihood of Postgres table scanning `state_groups_state`. (#10359) (diff) | |
download | synapse-bdfde6dca11a9468372b3c9b327ad3327cbdbe4a.tar.xz |
Use inline type hints in `http/federation/`, `storage/` and `util/` (#10381)
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/async_helpers.py | 8 | ||||
-rw-r--r-- | synapse/util/batching_queue.py | 8 | ||||
-rw-r--r-- | synapse/util/caches/__init__.py | 4 | ||||
-rw-r--r-- | synapse/util/caches/cached_call.py | 6 | ||||
-rw-r--r-- | synapse/util/caches/deferred_cache.py | 12 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 36 | ||||
-rw-r--r-- | synapse/util/caches/dictionary_cache.py | 6 | ||||
-rw-r--r-- | synapse/util/caches/expiringcache.py | 4 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 8 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 2 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 6 | ||||
-rw-r--r-- | synapse/util/caches/ttlcache.py | 6 | ||||
-rw-r--r-- | synapse/util/iterutils.py | 2 | ||||
-rw-r--r-- | synapse/util/macaroons.py | 2 | ||||
-rw-r--r-- | synapse/util/metrics.py | 2 | ||||
-rw-r--r-- | synapse/util/patch_inline_callbacks.py | 4 |
16 files changed, 57 insertions, 59 deletions
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index 061102c3c8..014db1355b 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -257,7 +257,7 @@ class Linearizer: max_count: The maximum number of concurrent accesses """ if name is None: - self.name = id(self) # type: Union[str, int] + self.name: Union[str, int] = id(self) else: self.name = name @@ -269,7 +269,7 @@ class Linearizer: self.max_count = max_count # key_to_defer is a map from the key to a _LinearizerEntry. - self.key_to_defer = {} # type: Dict[Hashable, _LinearizerEntry] + self.key_to_defer: Dict[Hashable, _LinearizerEntry] = {} def is_queued(self, key: Hashable) -> bool: """Checks whether there is a process queued up waiting""" @@ -409,10 +409,10 @@ class ReadWriteLock: def __init__(self): # Latest readers queued - self.key_to_current_readers = {} # type: Dict[str, Set[defer.Deferred]] + self.key_to_current_readers: Dict[str, Set[defer.Deferred]] = {} # Latest writer queued - self.key_to_current_writer = {} # type: Dict[str, defer.Deferred] + self.key_to_current_writer: Dict[str, defer.Deferred] = {} async def read(self, key: str) -> ContextManager: new_defer = defer.Deferred() diff --git a/synapse/util/batching_queue.py b/synapse/util/batching_queue.py index 8fd5bfb69b..274cea7eb7 100644 --- a/synapse/util/batching_queue.py +++ b/synapse/util/batching_queue.py @@ -93,11 +93,11 @@ class BatchingQueue(Generic[V, R]): self._clock = clock # The set of keys currently being processed. - self._processing_keys = set() # type: Set[Hashable] + self._processing_keys: Set[Hashable] = set() # The currently pending batch of values by key, with a Deferred to call # with the result of the corresponding `_process_batch_callback` call. - self._next_values = {} # type: Dict[Hashable, List[Tuple[V, defer.Deferred]]] + self._next_values: Dict[Hashable, List[Tuple[V, defer.Deferred]]] = {} # The function to call with batches of values. self._process_batch_callback = process_batch_callback @@ -108,9 +108,7 @@ class BatchingQueue(Generic[V, R]): number_of_keys.labels(self._name).set_function(lambda: len(self._next_values)) - self._number_in_flight_metric = number_in_flight.labels( - self._name - ) # type: Gauge + self._number_in_flight_metric: Gauge = number_in_flight.labels(self._name) async def add_to_queue(self, value: V, key: Hashable = ()) -> R: """Adds the value to the queue with the given key, returning the result diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index ca36f07c20..9012034b7a 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -29,8 +29,8 @@ logger = logging.getLogger(__name__) TRACK_MEMORY_USAGE = False -caches_by_name = {} # type: Dict[str, Sized] -collectors_by_name = {} # type: Dict[str, CacheMetric] +caches_by_name: Dict[str, Sized] = {} +collectors_by_name: Dict[str, "CacheMetric"] = {} cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py index a301c9e89b..891bee0b33 100644 --- a/synapse/util/caches/cached_call.py +++ b/synapse/util/caches/cached_call.py @@ -63,9 +63,9 @@ class CachedCall(Generic[TV]): f: The underlying function. Only one call to this function will be alive at once (per instance of CachedCall) """ - self._callable = f # type: Optional[Callable[[], Awaitable[TV]]] - self._deferred = None # type: Optional[Deferred] - self._result = None # type: Union[None, Failure, TV] + self._callable: Optional[Callable[[], Awaitable[TV]]] = f + self._deferred: Optional[Deferred] = None + self._result: Union[None, Failure, TV] = None async def get(self) -> TV: """Kick off the call if necessary, and return the result""" diff --git a/synapse/util/caches/deferred_cache.py b/synapse/util/caches/deferred_cache.py index 1044139119..8c6fafc677 100644 --- a/synapse/util/caches/deferred_cache.py +++ b/synapse/util/caches/deferred_cache.py @@ -80,25 +80,25 @@ class DeferredCache(Generic[KT, VT]): cache_type = TreeCache if tree else dict # _pending_deferred_cache maps from the key value to a `CacheEntry` object. - self._pending_deferred_cache = ( - cache_type() - ) # type: Union[TreeCache, MutableMapping[KT, CacheEntry]] + self._pending_deferred_cache: Union[ + TreeCache, "MutableMapping[KT, CacheEntry]" + ] = cache_type() def metrics_cb(): cache_pending_metric.labels(name).set(len(self._pending_deferred_cache)) # cache is used for completed results and maps to the result itself, rather than # a Deferred. - self.cache = LruCache( + self.cache: LruCache[KT, VT] = LruCache( max_size=max_entries, cache_name=name, cache_type=cache_type, size_callback=(lambda d: len(d) or 1) if iterable else None, metrics_collection_callback=metrics_cb, apply_cache_factor_from_config=apply_cache_factor_from_config, - ) # type: LruCache[KT, VT] + ) - self.thread = None # type: Optional[threading.Thread] + self.thread: Optional[threading.Thread] = None @property def max_entries(self): diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index d77e8edeea..1e8e6b1d01 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -46,17 +46,17 @@ F = TypeVar("F", bound=Callable[..., Any]) class _CachedFunction(Generic[F]): - invalidate = None # type: Any - invalidate_all = None # type: Any - prefill = None # type: Any - cache = None # type: Any - num_args = None # type: Any + invalidate: Any = None + invalidate_all: Any = None + prefill: Any = None + cache: Any = None + num_args: Any = None - __name__ = None # type: str + __name__: str # Note: This function signature is actually fiddled with by the synapse mypy # plugin to a) make it a bound method, and b) remove any `cache_context` arg. - __call__ = None # type: F + __call__: F class _CacheDescriptorBase: @@ -115,8 +115,8 @@ class _CacheDescriptorBase: class _LruCachedFunction(Generic[F]): - cache = None # type: LruCache[CacheKey, Any] - __call__ = None # type: F + cache: LruCache[CacheKey, Any] + __call__: F def lru_cache( @@ -180,10 +180,10 @@ class LruCacheDescriptor(_CacheDescriptorBase): self.max_entries = max_entries def __get__(self, obj, owner): - cache = LruCache( + cache: LruCache[CacheKey, Any] = LruCache( cache_name=self.orig.__name__, max_size=self.max_entries, - ) # type: LruCache[CacheKey, Any] + ) get_cache_key = self.cache_key_builder sentinel = LruCacheDescriptor._Sentinel.sentinel @@ -271,12 +271,12 @@ class DeferredCacheDescriptor(_CacheDescriptorBase): self.iterable = iterable def __get__(self, obj, owner): - cache = DeferredCache( + cache: DeferredCache[CacheKey, Any] = DeferredCache( name=self.orig.__name__, max_entries=self.max_entries, tree=self.tree, iterable=self.iterable, - ) # type: DeferredCache[CacheKey, Any] + ) get_cache_key = self.cache_key_builder @@ -359,7 +359,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase): def __get__(self, obj, objtype=None): cached_method = getattr(obj, self.cached_method_name) - cache = cached_method.cache # type: DeferredCache[CacheKey, Any] + cache: DeferredCache[CacheKey, Any] = cached_method.cache num_args = cached_method.num_args @functools.wraps(self.orig) @@ -472,15 +472,15 @@ class _CacheContext: Cache = Union[DeferredCache, LruCache] - _cache_context_objects = ( - WeakValueDictionary() - ) # type: WeakValueDictionary[Tuple[_CacheContext.Cache, CacheKey], _CacheContext] + _cache_context_objects: """WeakValueDictionary[ + Tuple["_CacheContext.Cache", CacheKey], "_CacheContext" + ]""" = WeakValueDictionary() def __init__(self, cache: "_CacheContext.Cache", cache_key: CacheKey) -> None: self._cache = cache self._cache_key = cache_key - def invalidate(self): # type: () -> None + def invalidate(self) -> None: """Invalidates the cache entry referred to by the context.""" self._cache.invalidate(self._cache_key) diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py index 56d94d96ce..3f852edd7f 100644 --- a/synapse/util/caches/dictionary_cache.py +++ b/synapse/util/caches/dictionary_cache.py @@ -62,13 +62,13 @@ class DictionaryCache(Generic[KT, DKT]): """ def __init__(self, name: str, max_entries: int = 1000): - self.cache = LruCache( + self.cache: LruCache[KT, DictionaryEntry] = LruCache( max_size=max_entries, cache_name=name, size_callback=len - ) # type: LruCache[KT, DictionaryEntry] + ) self.name = name self.sequence = 0 - self.thread = None # type: Optional[threading.Thread] + self.thread: Optional[threading.Thread] = None def check_thread(self) -> None: expected_thread = self.thread diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index ac47a31cd7..bde16b8577 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -27,7 +27,7 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) -SENTINEL = object() # type: Any +SENTINEL: Any = object() T = TypeVar("T") @@ -71,7 +71,7 @@ class ExpiringCache(Generic[KT, VT]): self._expiry_ms = expiry_ms self._reset_expiry_on_get = reset_expiry_on_get - self._cache = OrderedDict() # type: OrderedDict[KT, _CacheEntry] + self._cache: OrderedDict[KT, _CacheEntry] = OrderedDict() self.iterable = iterable diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 4b9d0433ff..efeba0cb96 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -226,7 +226,7 @@ class _Node: # footprint down. Storing `None` is free as its a singleton, while empty # lists are 56 bytes (and empty sets are 216 bytes, if we did the naive # thing and used sets). - self.callbacks = None # type: Optional[List[Callable[[], None]]] + self.callbacks: Optional[List[Callable[[], None]]] = None self.add_callbacks(callbacks) @@ -362,15 +362,15 @@ class LruCache(Generic[KT, VT]): # register_cache might call our "set_cache_factor" callback; there's nothing to # do yet when we get resized. - self._on_resize = None # type: Optional[Callable[[],None]] + self._on_resize: Optional[Callable[[], None]] = None if cache_name is not None: - metrics = register_cache( + metrics: Optional[CacheMetric] = register_cache( "lru_cache", cache_name, self, collect_callback=metrics_collection_callback, - ) # type: Optional[CacheMetric] + ) else: metrics = None diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 34c662c4db..ed7204336f 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -66,7 +66,7 @@ class ResponseCache(Generic[KV]): # This is poorly-named: it includes both complete and incomplete results. # We keep complete results rather than switching to absolute values because # that makes it easier to cache Failure results. - self.pending_result_cache = {} # type: Dict[KV, ObservableDeferred] + self.pending_result_cache: Dict[KV, ObservableDeferred] = {} self.clock = clock self.timeout_sec = timeout_ms / 1000.0 diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index e81e468899..3a41a8baa6 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -45,10 +45,10 @@ class StreamChangeCache: ): self._original_max_size = max_size self._max_size = math.floor(max_size) - self._entity_to_key = {} # type: Dict[EntityType, int] + self._entity_to_key: Dict[EntityType, int] = {} # map from stream id to the a set of entities which changed at that stream id. - self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]] + self._cache: SortedDict[int, Set[EntityType]] = SortedDict() # the earliest stream_pos for which we can reliably answer # get_all_entities_changed. In other words, one less than the earliest @@ -155,7 +155,7 @@ class StreamChangeCache: if stream_pos < self._earliest_known_stream_pos: return None - changed_entities = [] # type: List[EntityType] + changed_entities: List[EntityType] = [] for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)): changed_entities.extend(self._cache[k]) diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index c276107d56..46afe3f934 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -23,7 +23,7 @@ from synapse.util.caches import register_cache logger = logging.getLogger(__name__) -SENTINEL = object() # type: Any +SENTINEL: Any = object() T = TypeVar("T") KT = TypeVar("KT") @@ -35,10 +35,10 @@ class TTLCache(Generic[KT, VT]): def __init__(self, cache_name: str, timer: Callable[[], float] = time.time): # map from key to _CacheEntry - self._data = {} # type: Dict[KT, _CacheEntry] + self._data: Dict[KT, _CacheEntry] = {} # the _CacheEntries, sorted by expiry time - self._expiry_list = SortedList() # type: SortedList[_CacheEntry] + self._expiry_list: SortedList[_CacheEntry] = SortedList() self._timer = timer diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py index 886afa9d19..8ac3eab2f5 100644 --- a/synapse/util/iterutils.py +++ b/synapse/util/iterutils.py @@ -68,7 +68,7 @@ def sorted_topologically( # This is implemented by Kahn's algorithm. degree_map = {node: 0 for node in nodes} - reverse_graph = {} # type: Dict[T, Set[T]] + reverse_graph: Dict[T, Set[T]] = {} for node, edges in graph.items(): if node not in degree_map: diff --git a/synapse/util/macaroons.py b/synapse/util/macaroons.py index f6ebfd7e7d..d1f76e3dc5 100644 --- a/synapse/util/macaroons.py +++ b/synapse/util/macaroons.py @@ -39,7 +39,7 @@ def get_value_from_macaroon(macaroon: pymacaroons.Macaroon, key: str) -> str: caveat in the macaroon, or if the caveat was not found in the macaroon. """ prefix = key + " = " - result = None # type: Optional[str] + result: Optional[str] = None for caveat in macaroon.caveats: if not caveat.caveat_id.startswith(prefix): continue diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 45353d41c5..1b82dca81b 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -124,7 +124,7 @@ class Measure: assert isinstance(curr_context, LoggingContext) parent_context = curr_context self._logging_context = LoggingContext(str(curr_context), parent_context) - self.start = None # type: Optional[int] + self.start: Optional[int] = None def __enter__(self) -> "Measure": if self.start is not None: diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py index eed0291cae..99f01e325c 100644 --- a/synapse/util/patch_inline_callbacks.py +++ b/synapse/util/patch_inline_callbacks.py @@ -41,7 +41,7 @@ def do_patch(): @functools.wraps(f) def wrapped(*args, **kwargs): start_context = current_context() - changes = [] # type: List[str] + changes: List[str] = [] orig = orig_inline_callbacks(_check_yield_points(f, changes)) try: @@ -131,7 +131,7 @@ def _check_yield_points(f: Callable, changes: List[str]): gen = f(*args, **kwargs) last_yield_line_no = gen.gi_frame.f_lineno - result = None # type: Any + result: Any = None while True: expected_context = current_context() |