Diffstat (limited to 'synapse/util')
-rw-r--r--  synapse/util/__init__.py            |   7
-rw-r--r--  synapse/util/async_helpers.py       | 131
-rw-r--r--  synapse/util/caches/descriptors.py  |  85
-rw-r--r--  synapse/util/caches/treecache.py    |   2
-rw-r--r--  synapse/util/check_dependencies.py  |  86
-rw-r--r--  synapse/util/templates.py           |   5
6 files changed, 244 insertions, 72 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 58b4220ff3..d8046b7553 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -31,13 +31,6 @@ from synapse.logging import context
 if typing.TYPE_CHECKING:
     pass
 
-# FIXME Mjolnir imports glob_to_regex from this file, but it was moved to
-# matrix_common.
-# As a temporary workaround, we import glob_to_regex here for
-# compatibility with current versions of Mjolnir.
-# See https://github.com/matrix-org/mjolnir/pull/174
-from matrix_common.regex import glob_to_regex  # noqa
-
 logger = logging.getLogger(__name__)
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 60c03a66fd..6a8e844d63 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -18,9 +18,10 @@ import collections
 import inspect
 import itertools
 import logging
-from contextlib import contextmanager
+from contextlib import asynccontextmanager, contextmanager
 from typing import (
     Any,
+    AsyncIterator,
     Awaitable,
     Callable,
     Collection,
@@ -40,7 +41,7 @@ from typing import (
 )
 
 import attr
-from typing_extensions import ContextManager
+from typing_extensions import AsyncContextManager, Literal
 
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
@@ -96,6 +97,10 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
 
     __slots__ = ["_deferred", "_observers", "_result"]
 
+    _deferred: "defer.Deferred[_T]"
+    _observers: Union[List["defer.Deferred[_T]"], Tuple[()]]
+    _result: Union[None, Tuple[Literal[True], _T], Tuple[Literal[False], Failure]]
+
     def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
         object.__setattr__(self, "_deferred", deferred)
        object.__setattr__(self, "_result", None)
@@ -158,12 +163,14 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
         effect the underlying deferred.
         """
         if not self._result:
+            assert isinstance(self._observers, list)
             d: "defer.Deferred[_T]" = defer.Deferred()
             self._observers.append(d)
             return d
+        elif self._result[0]:
+            return defer.succeed(self._result[1])
         else:
-            success, res = self._result
-            return defer.succeed(res) if success else defer.fail(res)
+            return defer.fail(self._result[1])
 
     def observers(self) -> "Collection[defer.Deferred[_T]]":
         return self._observers
@@ -175,6 +182,8 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
         return self._result is not None and self._result[0] is True
 
     def get_result(self) -> Union[_T, Failure]:
+        if self._result is None:
+            raise ValueError(f"{self!r} has no result yet")
         return self._result[1]
 
     def __getattr__(self, name: str) -> Any:
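
The `ObservableDeferred` changes above replace the implicit tuple-unpacking of `_result` with explicit class-level annotations and a three-way branch in `observe()`, and make `get_result()` raise rather than crash on an unresolved instance. A minimal sketch of the observable behaviour (the Deferreds here are stand-ins, not taken from the diff):

    from twisted.internet import defer

    from synapse.util.async_helpers import ObservableDeferred

    underlying: "defer.Deferred[str]" = defer.Deferred()
    observable = ObservableDeferred(underlying)

    # Each observe() call hands back an independent Deferred that fires
    # when (and only when) the underlying Deferred resolves.
    first = observable.observe()
    second = observable.observe()

    underlying.callback("hello")

    # The result is now cached as (True, "hello"); further observe() calls
    # short-circuit to defer.succeed(...), and get_result() is safe to call.
    assert observable.has_succeeded()
    assert observable.get_result() == "hello"
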
""" if not self._result: + assert isinstance(self._observers, list) d: "defer.Deferred[_T]" = defer.Deferred() self._observers.append(d) return d + elif self._result[0]: + return defer.succeed(self._result[1]) else: - success, res = self._result - return defer.succeed(res) if success else defer.fail(res) + return defer.fail(self._result[1]) def observers(self) -> "Collection[defer.Deferred[_T]]": return self._observers @@ -175,6 +182,8 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]): return self._result is not None and self._result[0] is True def get_result(self) -> Union[_T, Failure]: + if self._result is None: + raise ValueError(f"{self!r} has no result yet") return self._result[1] def __getattr__(self, name: str) -> Any: @@ -483,7 +492,7 @@ class ReadWriteLock: Example: - with await read_write_lock.read("test_key"): + async with read_write_lock.read("test_key"): # do some work """ @@ -506,22 +515,24 @@ class ReadWriteLock: # Latest writer queued self.key_to_current_writer: Dict[str, defer.Deferred] = {} - async def read(self, key: str) -> ContextManager: - new_defer: "defer.Deferred[None]" = defer.Deferred() - - curr_readers = self.key_to_current_readers.setdefault(key, set()) - curr_writer = self.key_to_current_writer.get(key, None) + def read(self, key: str) -> AsyncContextManager: + @asynccontextmanager + async def _ctx_manager() -> AsyncIterator[None]: + new_defer: "defer.Deferred[None]" = defer.Deferred() - curr_readers.add(new_defer) + curr_readers = self.key_to_current_readers.setdefault(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) - # We wait for the latest writer to finish writing. We can safely ignore - # any existing readers... as they're readers. - if curr_writer: - await make_deferred_yieldable(curr_writer) + curr_readers.add(new_defer) - @contextmanager - def _ctx_manager() -> Iterator[None]: try: + # We wait for the latest writer to finish writing. We can safely ignore + # any existing readers... as they're readers. + # May raise a `CancelledError` if the `Deferred` wrapping us is + # cancelled. The `Deferred` we are waiting on must not be cancelled, + # since we do not own it. + if curr_writer: + await make_deferred_yieldable(stop_cancellation(curr_writer)) yield finally: with PreserveLoggingContext(): @@ -530,29 +541,35 @@ class ReadWriteLock: return _ctx_manager() - async def write(self, key: str) -> ContextManager: - new_defer: "defer.Deferred[None]" = defer.Deferred() + def write(self, key: str) -> AsyncContextManager: + @asynccontextmanager + async def _ctx_manager() -> AsyncIterator[None]: + new_defer: "defer.Deferred[None]" = defer.Deferred() - curr_readers = self.key_to_current_readers.get(key, set()) - curr_writer = self.key_to_current_writer.get(key, None) + curr_readers = self.key_to_current_readers.get(key, set()) + curr_writer = self.key_to_current_writer.get(key, None) - # We wait on all latest readers and writer. - to_wait_on = list(curr_readers) - if curr_writer: - to_wait_on.append(curr_writer) + # We wait on all latest readers and writer. + to_wait_on = list(curr_readers) + if curr_writer: + to_wait_on.append(curr_writer) - # We can clear the list of current readers since the new writer waits - # for them to finish. - curr_readers.clear() - self.key_to_current_writer[key] = new_defer + # We can clear the list of current readers since `new_defer` waits + # for them to finish. 
@@ -678,12 +695,48 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
             Synapse logcontext rules.
 
     Returns:
-        A new `Deferred`, which will contain the result of the original `Deferred`,
-        but will not propagate cancellation through to the original. When cancelled,
-        the new `Deferred` will fail with a `CancelledError` and will not follow the
-        Synapse logcontext rules. `make_deferred_yieldable` should be used to wrap
-        the new `Deferred`.
+        A new `Deferred`, which will contain the result of the original `Deferred`.
+        The new `Deferred` will not propagate cancellation through to the original.
+        When cancelled, the new `Deferred` will fail with a `CancelledError`.
+
+        The new `Deferred` will not follow the Synapse logcontext rules and should be
+        wrapped with `make_deferred_yieldable`.
     """
-    new_deferred: defer.Deferred[T] = defer.Deferred()
+    new_deferred: "defer.Deferred[T]" = defer.Deferred()
+    deferred.chainDeferred(new_deferred)
+    return new_deferred
+
+
+def delay_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
+    """Delay cancellation of a `Deferred` until it resolves.
+
+    Has the same effect as `stop_cancellation`, but the returned `Deferred` will not
+    resolve with a `CancelledError` until the original `Deferred` resolves.
+
+    Args:
+        deferred: The `Deferred` to protect against cancellation. May optionally follow
+            the Synapse logcontext rules.
+
+    Returns:
+        A new `Deferred`, which will contain the result of the original `Deferred`.
+        The new `Deferred` will not propagate cancellation through to the original.
+        When cancelled, the new `Deferred` will wait until the original `Deferred`
+        resolves before failing with a `CancelledError`.
+
+        The new `Deferred` will follow the Synapse logcontext rules if `deferred`
+        follows the Synapse logcontext rules. Otherwise the new `Deferred` should be
+        wrapped with `make_deferred_yieldable`.
+    """
+
+    def handle_cancel(new_deferred: "defer.Deferred[T]") -> None:
+        # before the new deferred is cancelled, we `pause` it to stop the cancellation
+        # propagating. we then `unpause` it once the wrapped deferred completes, to
+        # propagate the exception.
+        new_deferred.pause()
+        new_deferred.errback(Failure(CancelledError()))
+
+        deferred.addBoth(lambda _: new_deferred.unpause())
+
+    new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
     deferred.chainDeferred(new_deferred)
     return new_deferred
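
The new `delay_cancellation` differs from `stop_cancellation` only in *when* the wrapper fails: both shield the wrapped `Deferred` from cancellation, but the delayed variant holds back its `CancelledError` until the wrapped `Deferred` resolves, which is what lets the lock code above release only after it has actually acquired. A sketch of the contrast, following the docstrings above (the `Deferred`s here are stand-ins):

    from twisted.internet import defer

    from synapse.util.async_helpers import delay_cancellation, stop_cancellation

    wrapped: "defer.Deferred[None]" = defer.Deferred()
    stopped = stop_cancellation(wrapped)
    stopped.cancel()
    # `stopped` fails with CancelledError immediately; `wrapped` is untouched.

    wrapped2: "defer.Deferred[None]" = defer.Deferred()
    delayed = delay_cancellation(wrapped2)
    delayed.cancel()
    # No CancelledError has been delivered yet: `delayed` is paused...
    wrapped2.callback(None)
    # ...and only now does `delayed` fail with CancelledError.
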
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 1cdead02f1..eda92d864d 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -20,6 +20,7 @@ from typing import (
     Any,
     Awaitable,
     Callable,
+    Collection,
     Dict,
     Generic,
     Hashable,
@@ -40,6 +41,7 @@ from twisted.python.failure import Failure
 
 from synapse.logging.context import make_deferred_yieldable, preserve_fn
 from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import delay_cancellation
 from synapse.util.caches.deferred_cache import DeferredCache
 from synapse.util.caches.lrucache import LruCache
 
@@ -69,6 +71,7 @@ class _CacheDescriptorBase:
         self,
         orig: Callable[..., Any],
         num_args: Optional[int],
+        uncached_args: Optional[Collection[str]] = None,
         cache_context: bool = False,
     ):
         self.orig = orig
@@ -76,6 +79,13 @@ class _CacheDescriptorBase:
         arg_spec = inspect.getfullargspec(orig)
         all_args = arg_spec.args
 
+        # There's no reason that keyword-only arguments couldn't be supported,
+        # but right now they're buggy so do not allow them.
+        if arg_spec.kwonlyargs:
+            raise ValueError(
+                "_CacheDescriptorBase does not support keyword-only arguments."
+            )
+
         if "cache_context" in all_args:
             if not cache_context:
                 raise ValueError(
@@ -88,6 +98,9 @@ class _CacheDescriptorBase:
                     " named `cache_context`"
                 )
 
+        if num_args is not None and uncached_args is not None:
+            raise ValueError("Cannot provide both num_args and uncached_args")
+
         if num_args is None:
             num_args = len(all_args) - 1
             if cache_context:
@@ -105,6 +118,12 @@ class _CacheDescriptorBase:
         # list of the names of the args used as the cache key
         self.arg_names = all_args[1 : num_args + 1]
 
+        # If there are args to not cache on, filter them out (and fix the size of num_args).
+        if uncached_args is not None:
+            include_arg_in_cache_key = [n not in uncached_args for n in self.arg_names]
+        else:
+            include_arg_in_cache_key = [True] * len(self.arg_names)
+
         # self.arg_defaults is a map of arg name to its default value for each
         # argument that has a default value
         if arg_spec.defaults:
@@ -119,8 +138,8 @@ class _CacheDescriptorBase:
 
         self.add_cache_context = cache_context
 
-        self.cache_key_builder = get_cache_key_builder(
-            self.arg_names, self.arg_defaults
+        self.cache_key_builder = _get_cache_key_builder(
+            self.arg_names, include_arg_in_cache_key, self.arg_defaults
         )
 
 
@@ -130,8 +149,7 @@ class _LruCachedFunction(Generic[F]):
 
 
 def lru_cache(
-    max_entries: int = 1000,
-    cache_context: bool = False,
+    *, max_entries: int = 1000, cache_context: bool = False
 ) -> Callable[[F], _LruCachedFunction[F]]:
     """A method decorator that applies a memoizing cache around the function.
 
@@ -186,7 +204,9 @@ class LruCacheDescriptor(_CacheDescriptorBase):
         max_entries: int = 1000,
         cache_context: bool = False,
     ):
-        super().__init__(orig, num_args=None, cache_context=cache_context)
+        super().__init__(
+            orig, num_args=None, uncached_args=None, cache_context=cache_context
+        )
         self.max_entries = max_entries
 
     def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
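
Since the descriptor is constructed when the decorator is applied, the new `kwonlyargs` guard turns a latent cache-key bug into a failure at class-definition time. A hypothetical store method that would now be rejected (the class and method names are made up):

    from synapse.util.caches.descriptors import cached

    class ThingStore:
        @cached()  # raises ValueError as soon as the class body is evaluated
        async def get_thing(self, thing_id: str, *, allow_stale: bool = False):
            ...
    # ValueError: _CacheDescriptorBase does not support keyword-only arguments.
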
@@ -260,6 +280,9 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
         num_args: number of positional arguments (excluding ``self`` and
             ``cache_context``) to use as cache keys. Defaults to all named
             args of the function.
+        uncached_args: a list of argument names to not use as the cache key.
+            (``self`` and ``cache_context`` are always ignored.) Cannot be used
+            with num_args.
         tree:
         cache_context:
         iterable:
@@ -273,12 +296,18 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
         orig: Callable[..., Any],
         max_entries: int = 1000,
         num_args: Optional[int] = None,
+        uncached_args: Optional[Collection[str]] = None,
         tree: bool = False,
         cache_context: bool = False,
         iterable: bool = False,
         prune_unread_entries: bool = True,
     ):
-        super().__init__(orig, num_args=num_args, cache_context=cache_context)
+        super().__init__(
+            orig,
+            num_args=num_args,
+            uncached_args=uncached_args,
+            cache_context=cache_context,
+        )
 
         if tree and self.num_args < 2:
             raise RuntimeError(
@@ -322,6 +351,11 @@
                 ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs)
                 ret = cache.set(cache_key, ret, callback=invalidate_callback)
 
+                # We started a new call to `self.orig`, so we must always wait for it to
+                # complete. Otherwise we might mark our current logging context as
+                # finished while `self.orig` is still using it in the background.
+                ret = delay_cancellation(ret)
+
             return make_deferred_yieldable(ret)
 
         wrapped = cast(_CachedFunction, _wrapped)
@@ -369,7 +403,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
                 but including list_name) to use as cache keys. Defaults to all
                 named args of the function.
         """
-        super().__init__(orig, num_args=num_args)
+        super().__init__(orig, num_args=num_args, uncached_args=None)
 
         self.list_name = list_name
 
@@ -482,6 +516,11 @@
             d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
                 lambda _: results, unwrapFirstError
            )
+            if missing:
+                # We started a new call to `self.orig`, so we must always wait for it to
+                # complete. Otherwise we might mark our current logging context as
+                # finished while `self.orig` is still using it in the background.
+                d = delay_cancellation(d)
             return make_deferred_yieldable(d)
         else:
             return defer.succeed(results)
@@ -530,8 +569,10 @@ class _CacheContext:
 
 
 def cached(
+    *,
     max_entries: int = 1000,
     num_args: Optional[int] = None,
+    uncached_args: Optional[Collection[str]] = None,
     tree: bool = False,
     cache_context: bool = False,
     iterable: bool = False,
@@ -541,6 +582,7 @@ def cached(
         orig,
         max_entries=max_entries,
        num_args=num_args,
+        uncached_args=uncached_args,
         tree=tree,
         cache_context=cache_context,
         iterable=iterable,
@@ -551,7 +593,7 @@ def cached(
 
 
 def cachedList(
-    cached_method_name: str, list_name: str, num_args: Optional[int] = None
+    *, cached_method_name: str, list_name: str, num_args: Optional[int] = None
 ) -> Callable[[F], _CachedFunction[F]]:
     """Creates a descriptor that wraps a function in a `CacheListDescriptor`.
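
With the plumbing above, `uncached_args` can exclude an argument from the cache key while still passing it through to the wrapped method, and `cached`/`cachedList` must now be invoked with keyword arguments. A hypothetical usage (store, method, and argument names are illustrative):

    from typing import Collection

    from synapse.util.caches.descriptors import cached, cachedList

    class UserStore:
        @cached(uncached_args=("fetch_remote",))
        async def get_profile(self, user_id: str, fetch_remote: bool = True):
            # Cached under (user_id,) only; `fetch_remote` never affects the key.
            ...

        # Positional arguments are no longer accepted by the decorator:
        @cachedList(cached_method_name="get_profile", list_name="user_ids")
        async def get_profiles(self, user_ids: Collection[str]):
            ...
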
@@ -590,13 +632,16 @@ def cachedList(
     return cast(Callable[[F], _CachedFunction[F]], func)
 
 
-def get_cache_key_builder(
-    param_names: Sequence[str], param_defaults: Mapping[str, Any]
+def _get_cache_key_builder(
+    param_names: Sequence[str],
+    include_params: Sequence[bool],
+    param_defaults: Mapping[str, Any],
 ) -> Callable[[Sequence[Any], Mapping[str, Any]], CacheKey]:
     """Construct a function which will build cache keys suitable for a cached function
 
     Args:
         param_names: list of formal parameter names for the cached function
+        include_params: list of bools of whether to include the parameter name in the cache key
         param_defaults: a mapping from parameter name to default value for that param
 
     Returns:
@@ -608,6 +653,7 @@
 
     if len(param_names) == 1:
         nm = param_names[0]
+        assert include_params[0] is True
 
         def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
             if nm in kwargs:
@@ -620,13 +666,18 @@
     else:
 
         def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
-            return tuple(_get_cache_key_gen(param_names, param_defaults, args, kwargs))
+            return tuple(
+                _get_cache_key_gen(
+                    param_names, include_params, param_defaults, args, kwargs
+                )
+            )
 
     return get_cache_key
 
 
 def _get_cache_key_gen(
     param_names: Iterable[str],
+    include_params: Iterable[bool],
     param_defaults: Mapping[str, Any],
     args: Sequence[Any],
     kwargs: Mapping[str, Any],
@@ -637,16 +688,18 @@ def _get_cache_key_gen(
     This is essentially the same operation as `inspect.getcallargs`, but optimised
     so that we don't need to inspect the target function for each call.
     """
-
     # We loop through each arg name, looking up if its in the `kwargs`,
     # otherwise using the next argument in `args`. If there are no more
     # args then we try looking the arg name up in the defaults.
     pos = 0
-    for nm in param_names:
+    for nm, inc in zip(param_names, include_params):
         if nm in kwargs:
-            yield kwargs[nm]
+            if inc:
+                yield kwargs[nm]
         elif pos < len(args):
-            yield args[pos]
+            if inc:
+                yield args[pos]
             pos += 1
         else:
-            yield param_defaults[nm]
+            if inc:
+                yield param_defaults[nm]
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 563845f867..e78305f787 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -22,8 +22,6 @@ class TreeCacheNode(dict):
     leaves.
     """
 
-    pass
-
 
 class TreeCache:
     """
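
The renamed `_get_cache_key_builder` threads a parallel `include_params` list through `_get_cache_key_gen`, which still consumes positional arguments in order but only yields the included ones. A standalone re-implementation of that loop for illustration (not an import from synapse; the argument names are made up):

    def make_key(param_names, include_params, param_defaults, args, kwargs):
        pos = 0
        key = []
        for nm, inc in zip(param_names, include_params):
            if nm in kwargs:
                value = kwargs[nm]
            elif pos < len(args):
                value = args[pos]
                pos += 1  # consume the positional slot even if excluded
            else:
                value = param_defaults[nm]
            if inc:
                key.append(value)
        return tuple(key)

    # "limit" is excluded from the key, but its default is still resolved.
    assert make_key(
        ["room_id", "limit"], [True, False], {"limit": 10}, ("!room:example.org",), {}
    ) == ("!room:example.org",)
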
diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py
index 3a1f6b3c75..66f1da7502 100644
--- a/synapse/util/check_dependencies.py
+++ b/synapse/util/check_dependencies.py
@@ -1,3 +1,25 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module exposes a single function which checks synapse's dependencies are present
+and correctly versioned. It makes use of `importlib.metadata` to do so. The details
+are a bit murky: there's no easy way to get a map from "extras" to the packages they
+require. But this is probably just symptomatic of Python's package management.
+"""
+
 import logging
 from typing import Iterable, NamedTuple, Optional
 
@@ -10,6 +32,8 @@ try:
 except ImportError:
     import importlib_metadata as metadata  # type: ignore[no-redef]
 
+__all__ = ["check_requirements"]
+
 
 class DependencyException(Exception):
     @property
@@ -29,7 +53,17 @@ class DependencyException(Exception):
             yield '"' + i + '"'
 
 
-EXTRAS = set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra"))
+DEV_EXTRAS = {"lint", "mypy", "test", "dev"}
+RUNTIME_EXTRAS = (
+    set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS
+)
+VERSION = metadata.version(DISTRIBUTION_NAME)
+
+
+def _is_dev_dependency(req: Requirement) -> bool:
+    return req.marker is not None and any(
+        req.marker.evaluate({"extra": e}) for e in DEV_EXTRAS
+    )
 
 
 class Dependency(NamedTuple):
@@ -43,6 +77,9 @@ def _generic_dependencies() -> Iterable[Dependency]:
     assert requirements is not None
     for raw_requirement in requirements:
         req = Requirement(raw_requirement)
+        if _is_dev_dependency(req):
+            continue
+
         # https://packaging.pypa.io/en/latest/markers.html#usage notes that
         # > Evaluating an extra marker with no environment is an error
         # so we pass in a dummy empty extra value here.
@@ -56,6 +93,8 @@ def _dependencies_for_extra(extra: str) -> Iterable[Dependency]:
     assert requirements is not None
     for raw_requirement in requirements:
         req = Requirement(raw_requirement)
+        if _is_dev_dependency(req):
+            continue
         # Exclude mandatory deps by only selecting deps needed with this extra.
         if (
             req.marker is not None
@@ -67,18 +106,39 @@ def _dependencies_for_extra(extra: str) -> Iterable[Dependency]:
 
 def _not_installed(requirement: Requirement, extra: Optional[str] = None) -> str:
     if extra:
-        return f"Need {requirement.name} for {extra}, but it is not installed"
+        return (
+            f"Synapse {VERSION} needs {requirement.name} for {extra}, "
+            f"but it is not installed"
+        )
     else:
-        return f"Need {requirement.name}, but it is not installed"
+        return f"Synapse {VERSION} needs {requirement.name}, but it is not installed"
 
 
 def _incorrect_version(
     requirement: Requirement, got: str, extra: Optional[str] = None
 ) -> str:
     if extra:
-        return f"Need {requirement} for {extra}, but got {requirement.name}=={got}"
+        return (
+            f"Synapse {VERSION} needs {requirement} for {extra}, "
+            f"but got {requirement.name}=={got}"
+        )
+    else:
+        return (
+            f"Synapse {VERSION} needs {requirement}, but got {requirement.name}=={got}"
+        )
+
+
+def _no_reported_version(requirement: Requirement, extra: Optional[str] = None) -> str:
+    if extra:
+        return (
+            f"Synapse {VERSION} needs {requirement} for {extra}, "
+            f"but can't determine {requirement.name}'s version"
+        )
     else:
-        return f"Need {requirement}, but got {requirement.name}=={got}"
+        return (
+            f"Synapse {VERSION} needs {requirement}, "
+            f"but can't determine {requirement.name}'s version"
+        )
 
 
 def check_requirements(extra: Optional[str] = None) -> None:
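
`_is_dev_dependency` relies on the `packaging` library's marker evaluation: a requirement carrying an `extra == "..."` marker for one of the `DEV_EXTRAS` is skipped by both dependency generators, so lint/test tooling never blocks startup. A quick sketch with made-up requirement strings:

    from packaging.requirements import Requirement

    DEV_EXTRAS = {"lint", "mypy", "test", "dev"}

    dev_req = Requirement('black==22.1.0; extra == "lint"')
    assert dev_req.marker is not None
    assert any(dev_req.marker.evaluate({"extra": e}) for e in DEV_EXTRAS)

    runtime_req = Requirement("jsonschema>=3.0.0")
    assert runtime_req.marker is None  # no marker: a mandatory runtime dependency
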
@@ -100,10 +160,10 @@ def check_requirements(extra: Optional[str] = None) -> None:
     # First work out which dependencies are required, and which are optional.
     if extra is None:
         dependencies = _generic_dependencies()
-    elif extra in EXTRAS:
+    elif extra in RUNTIME_EXTRAS:
         dependencies = _dependencies_for_extra(extra)
     else:
-        raise ValueError(f"Synapse does not provide the feature '{extra}'")
+        raise ValueError(f"Synapse {VERSION} does not provide the feature '{extra}'")
 
     deps_unfulfilled = []
     errors = []
@@ -116,7 +176,17 @@ def check_requirements(extra: Optional[str] = None) -> None:
             deps_unfulfilled.append(requirement.name)
             errors.append(_not_installed(requirement, extra))
         else:
-            if not requirement.specifier.contains(dist.version):
+            if dist.version is None:
+                # This shouldn't happen---it suggests a borked virtualenv. (See #12223)
+                # Try to give a vaguely helpful error message anyway.
+                # Type-ignore: the annotations don't reflect reality: see
+                #     https://github.com/python/typeshed/issues/7513
+                #     https://bugs.python.org/issue47060
+                deps_unfulfilled.append(requirement.name)  # type: ignore[unreachable]
+                errors.append(_no_reported_version(requirement, extra))
+
+            # We specify prereleases=True to allow prereleases such as RCs.
+            elif not requirement.specifier.contains(dist.version, prereleases=True):
                 deps_unfulfilled.append(requirement.name)
                 errors.append(_incorrect_version(requirement, dist.version, extra))
 
diff --git a/synapse/util/templates.py b/synapse/util/templates.py
index 12941065ca..fb758b7180 100644
--- a/synapse/util/templates.py
+++ b/synapse/util/templates.py
@@ -64,6 +64,7 @@ def build_jinja_env(
         {
             "format_ts": _format_ts_filter,
             "mxc_to_http": _create_mxc_to_http_filter(config.server.public_baseurl),
+            "localpart_from_email": _localpart_from_email_filter,
         }
     )
 
@@ -112,3 +113,7 @@ def _create_mxc_to_http_filter(
 
 def _format_ts_filter(value: int, format: str) -> str:
     return time.strftime(format, time.localtime(value / 1000))
+
+
+def _localpart_from_email_filter(address: str) -> str:
+    return address.rsplit("@", 1)[0]
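
The new `localpart_from_email` filter simply strips everything from the last `@` onwards. A self-contained demonstration wiring the same one-liner into a bare Jinja environment (the template string and address are made up):

    from jinja2 import Environment

    env = Environment()
    env.filters["localpart_from_email"] = lambda address: address.rsplit("@", 1)[0]

    template = env.from_string("Hi {{ user_email | localpart_from_email }}!")
    assert template.render(user_email="alice@example.com") == "Hi alice!"
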