diff options
author | Brendan Abolivier <babolivier@matrix.org> | 2020-06-10 11:42:30 +0100 |
---|---|---|
committer | Brendan Abolivier <babolivier@matrix.org> | 2020-06-10 11:42:30 +0100 |
commit | ec0a7b9034806d6b2ba086bae58f5c6b0fd14672 (patch) | |
tree | f2af547b1342795e10548f8fb7a9cfc93e03df37 /synapse/util | |
parent | changelog (diff) | |
parent | 1.15.0rc1 (diff) | |
download | synapse-ec0a7b9034806d6b2ba086bae58f5c6b0fd14672.tar.xz |
Merge branch 'develop' into babolivier/mark_unread
Diffstat (limited to 'synapse/util')
-rw-r--r-- | synapse/util/__init__.py | 17 | ||||
-rw-r--r-- | synapse/util/async_helpers.py | 77 | ||||
-rw-r--r-- | synapse/util/caches/__init__.py | 150 | ||||
-rw-r--r-- | synapse/util/caches/descriptors.py | 115 | ||||
-rw-r--r-- | synapse/util/caches/expiringcache.py | 29 | ||||
-rw-r--r-- | synapse/util/caches/lrucache.py | 56 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 4 | ||||
-rw-r--r-- | synapse/util/caches/snapshot_cache.py | 94 | ||||
-rw-r--r-- | synapse/util/caches/stream_change_cache.py | 158 | ||||
-rw-r--r-- | synapse/util/caches/treecache.py | 4 | ||||
-rw-r--r-- | synapse/util/caches/ttlcache.py | 2 | ||||
-rw-r--r-- | synapse/util/frozenutils.py | 4 | ||||
-rw-r--r-- | synapse/util/httpresourcetree.py | 2 | ||||
-rw-r--r-- | synapse/util/iterutils.py | 48 | ||||
-rw-r--r-- | synapse/util/metrics.py | 89 | ||||
-rw-r--r-- | synapse/util/module_loader.py | 22 | ||||
-rw-r--r-- | synapse/util/patch_inline_callbacks.py | 224 | ||||
-rw-r--r-- | synapse/util/ratelimitutils.py | 2 | ||||
-rw-r--r-- | synapse/util/retryutils.py | 2 | ||||
-rw-r--r-- | synapse/util/rlimit.py | 2 | ||||
-rw-r--r-- | synapse/util/stringutils.py | 102 | ||||
-rw-r--r-- | synapse/util/versionstring.py | 10 |
22 files changed, 799 insertions, 414 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py index 7856353002..60f0de70f7 100644 --- a/synapse/util/__init__.py +++ b/synapse/util/__init__.py @@ -15,7 +15,6 @@ import logging import re -from itertools import islice import attr @@ -107,22 +106,6 @@ class Clock(object): raise -def batch_iter(iterable, size): - """batch an iterable up into tuples with a maximum size - - Args: - iterable (iterable): the iterable to slice - size (int): the maximum batch size - - Returns: - an iterator over the chunks - """ - # make sure we can deal with iterables like lists too - sourceiter = iter(iterable) - # call islice until it returns an empty tuple - return iter(lambda: tuple(islice(sourceiter, size)), ()) - - def log_failure(failure, msg, consumeErrors=True): """Creates a function suitable for passing to `Deferred.addErrback` that logs any failures that occur. diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index f1c46836b1..f7af2bca7f 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -13,12 +13,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import collections import logging from contextlib import contextmanager +from typing import Dict, Sequence, Set, Union from six.moves import range +import attr + from twisted.internet import defer from twisted.internet.defer import CancelledError from twisted.python import failure @@ -69,6 +73,10 @@ class ObservableDeferred(object): def errback(f): object.__setattr__(self, "_result", (False, f)) while self._observers: + # This is a little bit of magic to correctly propagate stack + # traces when we `await` on one of the observer deferreds. + f.value.__failure__ = f + try: # TODO: Handle errors here. self._observers.pop().errback(f) @@ -82,11 +90,12 @@ class ObservableDeferred(object): deferred.addCallbacks(callback, errback) - def observe(self): + def observe(self) -> defer.Deferred: """Observe the underlying deferred. - Can return either a deferred if the underlying deferred is still pending - (or has failed), or the actual value. Callers may need to use maybeDeferred. + This returns a brand new deferred that is resolved when the underlying + deferred is resolved. Interacting with the returned deferred does not + effect the underdlying deferred. """ if not self._result: d = defer.Deferred() @@ -101,7 +110,7 @@ class ObservableDeferred(object): return d else: success, res = self._result - return res if success else defer.fail(res) + return defer.succeed(res) if success else defer.fail(res) def observers(self): return self._observers @@ -134,9 +143,9 @@ def concurrently_execute(func, args, limit): the number of concurrent executions. Args: - func (func): Function to execute, should return a deferred. - args (list): List of arguments to pass to func, each invocation of func - gets a signle argument. + func (func): Function to execute, should return a deferred or coroutine. + args (Iterable): List of arguments to pass to func, each invocation of func + gets a single argument. limit (int): Maximum number of conccurent executions. Returns: @@ -144,11 +153,10 @@ def concurrently_execute(func, args, limit): """ it = iter(args) - @defer.inlineCallbacks - def _concurrently_execute_inner(): + async def _concurrently_execute_inner(): try: while True: - yield func(next(it)) + await maybe_awaitable(func(next(it))) except StopIteration: pass @@ -213,7 +221,21 @@ class Linearizer(object): # the first element is the number of things executing, and # the second element is an OrderedDict, where the keys are deferreds for the # things blocked from executing. - self.key_to_defer = {} + self.key_to_defer = ( + {} + ) # type: Dict[str, Sequence[Union[int, Dict[defer.Deferred, int]]]] + + def is_queued(self, key) -> bool: + """Checks whether there is a process queued up waiting + """ + entry = self.key_to_defer.get(key) + if not entry: + # No entry so nothing is waiting. + return False + + # There are waiting deferreds only in the OrderedDict of deferreds is + # non-empty. + return bool(entry[1]) def queue(self, key): # we avoid doing defer.inlineCallbacks here, so that cancellation works correctly. @@ -303,7 +325,7 @@ class Linearizer(object): ) else: - logger.warn( + logger.warning( "Unexpected exception waiting for linearizer lock %r for key %r", self.name, key, @@ -340,10 +362,10 @@ class ReadWriteLock(object): def __init__(self): # Latest readers queued - self.key_to_current_readers = {} + self.key_to_current_readers = {} # type: Dict[str, Set[defer.Deferred]] # Latest writer queued - self.key_to_current_writer = {} + self.key_to_current_writer = {} # type: Dict[str, defer.Deferred] @defer.inlineCallbacks def read(self, key): @@ -479,3 +501,30 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None): deferred.addCallbacks(success_cb, failure_cb) return new_d + + +@attr.s(slots=True, frozen=True) +class DoneAwaitable(object): + """Simple awaitable that returns the provided value. + """ + + value = attr.ib() + + def __await__(self): + return self + + def __iter__(self): + return self + + def __next__(self): + raise StopIteration(self.value) + + +def maybe_awaitable(value): + """Convert a value to an awaitable if not already an awaitable. + """ + + if hasattr(value, "__await__"): + return value + + return DoneAwaitable(value) diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py index b50e3503f0..dd356bf156 100644 --- a/synapse/util/caches/__init__.py +++ b/synapse/util/caches/__init__.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2015, 2016 OpenMarket Ltd -# Copyright 2019 The Matrix.org Foundation C.I.C. +# Copyright 2019, 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -15,34 +15,24 @@ # limitations under the License. import logging -import os +from sys import intern +from typing import Callable, Dict, Optional -import six -from six.moves import intern +import attr +from prometheus_client.core import Gauge -from prometheus_client.core import REGISTRY, Gauge, GaugeMetricFamily +from synapse.config.cache import add_resizable_cache logger = logging.getLogger(__name__) -CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.5)) - - -def get_cache_factor_for(cache_name): - env_var = "SYNAPSE_CACHE_FACTOR_" + cache_name.upper() - factor = os.environ.get(env_var) - if factor: - return float(factor) - - return CACHE_SIZE_FACTOR - - caches_by_name = {} -collectors_by_name = {} +collectors_by_name = {} # type: Dict cache_size = Gauge("synapse_util_caches_cache:size", "", ["name"]) cache_hits = Gauge("synapse_util_caches_cache:hits", "", ["name"]) cache_evicted = Gauge("synapse_util_caches_cache:evicted_size", "", ["name"]) cache_total = Gauge("synapse_util_caches_cache:total", "", ["name"]) +cache_max_size = Gauge("synapse_util_caches_cache_max_size", "", ["name"]) response_cache_size = Gauge("synapse_util_caches_response_cache:size", "", ["name"]) response_cache_hits = Gauge("synapse_util_caches_response_cache:hits", "", ["name"]) @@ -52,67 +42,82 @@ response_cache_evicted = Gauge( response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["name"]) -def register_cache(cache_type, cache_name, cache, collect_callback=None): - """Register a cache object for metric collection. +@attr.s +class CacheMetric(object): + + _cache = attr.ib() + _cache_type = attr.ib(type=str) + _cache_name = attr.ib(type=str) + _collect_callback = attr.ib(type=Optional[Callable]) + + hits = attr.ib(default=0) + misses = attr.ib(default=0) + evicted_size = attr.ib(default=0) + + def inc_hits(self): + self.hits += 1 + + def inc_misses(self): + self.misses += 1 + + def inc_evictions(self, size=1): + self.evicted_size += size + + def describe(self): + return [] + + def collect(self): + try: + if self._cache_type == "response_cache": + response_cache_size.labels(self._cache_name).set(len(self._cache)) + response_cache_hits.labels(self._cache_name).set(self.hits) + response_cache_evicted.labels(self._cache_name).set(self.evicted_size) + response_cache_total.labels(self._cache_name).set( + self.hits + self.misses + ) + else: + cache_size.labels(self._cache_name).set(len(self._cache)) + cache_hits.labels(self._cache_name).set(self.hits) + cache_evicted.labels(self._cache_name).set(self.evicted_size) + cache_total.labels(self._cache_name).set(self.hits + self.misses) + if getattr(self._cache, "max_size", None): + cache_max_size.labels(self._cache_name).set(self._cache.max_size) + if self._collect_callback: + self._collect_callback() + except Exception as e: + logger.warning("Error calculating metrics for %s: %s", self._cache_name, e) + raise + + +def register_cache( + cache_type: str, + cache_name: str, + cache, + collect_callback: Optional[Callable] = None, + resizable: bool = True, + resize_callback: Optional[Callable] = None, +) -> CacheMetric: + """Register a cache object for metric collection and resizing. Args: - cache_type (str): - cache_name (str): name of the cache - cache (object): cache itself - collect_callback (callable|None): if not None, a function which is called during - metric collection to update additional metrics. + cache_type + cache_name: name of the cache + cache: cache itself + collect_callback: If given, a function which is called during metric + collection to update additional metrics. + resizable: Whether this cache supports being resized. + resize_callback: A function which can be called to resize the cache. Returns: CacheMetric: an object which provides inc_{hits,misses,evictions} methods """ + if resizable: + if not resize_callback: + resize_callback = getattr(cache, "set_cache_factor") + add_resizable_cache(cache_name, resize_callback) - # Check if the metric is already registered. Unregister it, if so. - # This usually happens during tests, as at runtime these caches are - # effectively singletons. + metric = CacheMetric(cache, cache_type, cache_name, collect_callback) metric_name = "cache_%s_%s" % (cache_type, cache_name) - if metric_name in collectors_by_name.keys(): - REGISTRY.unregister(collectors_by_name[metric_name]) - - class CacheMetric(object): - - hits = 0 - misses = 0 - evicted_size = 0 - - def inc_hits(self): - self.hits += 1 - - def inc_misses(self): - self.misses += 1 - - def inc_evictions(self, size=1): - self.evicted_size += size - - def describe(self): - return [] - - def collect(self): - try: - if cache_type == "response_cache": - response_cache_size.labels(cache_name).set(len(cache)) - response_cache_hits.labels(cache_name).set(self.hits) - response_cache_evicted.labels(cache_name).set(self.evicted_size) - response_cache_total.labels(cache_name).set(self.hits + self.misses) - else: - cache_size.labels(cache_name).set(len(cache)) - cache_hits.labels(cache_name).set(self.hits) - cache_evicted.labels(cache_name).set(self.evicted_size) - cache_total.labels(cache_name).set(self.hits + self.misses) - if collect_callback: - collect_callback() - except Exception as e: - logger.warn("Error calculating metrics for %s: %s", cache_name, e) - raise - - yield GaugeMetricFamily("__unused", "") - - metric = CacheMetric() - REGISTRY.register(metric) caches_by_name[cache_name] = cache collectors_by_name[metric_name] = metric return metric @@ -147,9 +152,6 @@ def intern_string(string): return None try: - if six.PY2: - string = string.encode("ascii") - return intern(string) except UnicodeEncodeError: return string diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py index 43f66ec4be..cd48262420 100644 --- a/synapse/util/caches/descriptors.py +++ b/synapse/util/caches/descriptors.py @@ -13,22 +13,24 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + import functools import inspect import logging import threading -from collections import namedtuple +from typing import Any, Tuple, Union, cast +from weakref import WeakValueDictionary from six import itervalues from prometheus_client import Gauge +from typing_extensions import Protocol from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, preserve_fn from synapse.util import unwrapFirstError from synapse.util.async_helpers import ObservableDeferred -from synapse.util.caches import get_cache_factor_for from synapse.util.caches.lrucache import LruCache from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry @@ -36,6 +38,20 @@ from . import register_cache logger = logging.getLogger(__name__) +CacheKey = Union[Tuple, Any] + + +class _CachedFunction(Protocol): + invalidate = None # type: Any + invalidate_all = None # type: Any + invalidate_many = None # type: Any + prefill = None # type: Any + cache = None # type: Any + num_args = None # type: Any + + def __name__(self): + ... + cache_pending_metric = Gauge( "synapse_util_caches_cache_pending", @@ -65,7 +81,6 @@ class CacheEntry(object): class Cache(object): __slots__ = ( "cache", - "max_entries", "name", "keylen", "thread", @@ -73,7 +88,29 @@ class Cache(object): "_pending_deferred_cache", ) - def __init__(self, name, max_entries=1000, keylen=1, tree=False, iterable=False): + def __init__( + self, + name: str, + max_entries: int = 1000, + keylen: int = 1, + tree: bool = False, + iterable: bool = False, + apply_cache_factor_from_config: bool = True, + ): + """ + Args: + name: The name of the cache + max_entries: Maximum amount of entries that the cache will hold + keylen: The length of the tuple used as the cache key + tree: Use a TreeCache instead of a dict as the underlying cache type + iterable: If True, count each item in the cached object as an entry, + rather than each cached object + apply_cache_factor_from_config: Whether cache factors specified in the + config file affect `max_entries` + + Returns: + Cache + """ cache_type = TreeCache if tree else dict self._pending_deferred_cache = cache_type() @@ -83,6 +120,7 @@ class Cache(object): cache_type=cache_type, size_callback=(lambda d: len(d)) if iterable else None, evicted_callback=self._on_evicted, + apply_cache_factor_from_config=apply_cache_factor_from_config, ) self.name = name @@ -95,6 +133,10 @@ class Cache(object): collect_callback=self._metrics_collection_callback, ) + @property + def max_entries(self): + return self.cache.max_size + def _on_evicted(self, evicted_count): self.metrics.inc_evictions(evicted_count) @@ -245,7 +287,9 @@ class Cache(object): class _CacheDescriptorBase(object): - def __init__(self, orig, num_args, inlineCallbacks, cache_context=False): + def __init__( + self, orig: _CachedFunction, num_args, inlineCallbacks, cache_context=False + ): self.orig = orig if inlineCallbacks: @@ -253,7 +297,7 @@ class _CacheDescriptorBase(object): else: self.function_to_call = orig - arg_spec = inspect.getargspec(orig) + arg_spec = inspect.getfullargspec(orig) all_args = arg_spec.args if "cache_context" in all_args: @@ -352,13 +396,11 @@ class CacheDescriptor(_CacheDescriptorBase): cache_context=cache_context, ) - max_entries = int(max_entries * get_cache_factor_for(orig.__name__)) - self.max_entries = max_entries self.tree = tree self.iterable = iterable - def __get__(self, obj, objtype=None): + def __get__(self, obj, owner): cache = Cache( name=self.orig.__name__, max_entries=self.max_entries, @@ -404,7 +446,7 @@ class CacheDescriptor(_CacheDescriptorBase): return tuple(get_cache_key_gen(args, kwargs)) @functools.wraps(self.orig) - def wrapped(*args, **kwargs): + def _wrapped(*args, **kwargs): # If we're passed a cache_context then we'll want to call its invalidate() # whenever we are invalidated invalidate_callback = kwargs.pop("on_invalidate", None) @@ -414,7 +456,7 @@ class CacheDescriptor(_CacheDescriptorBase): # Add our own `cache_context` to argument list if the wrapped function # has asked for one if self.add_cache_context: - kwargs["cache_context"] = _CacheContext(cache, cache_key) + kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key) try: cached_result_d = cache.get(cache_key, callback=invalidate_callback) @@ -422,7 +464,7 @@ class CacheDescriptor(_CacheDescriptorBase): if isinstance(cached_result_d, ObservableDeferred): observer = cached_result_d.observe() else: - observer = cached_result_d + observer = defer.succeed(cached_result_d) except KeyError: ret = defer.maybeDeferred( @@ -440,6 +482,8 @@ class CacheDescriptor(_CacheDescriptorBase): return make_deferred_yieldable(observer) + wrapped = cast(_CachedFunction, _wrapped) + if self.num_args == 1: wrapped.invalidate = lambda key: cache.invalidate(key[0]) wrapped.prefill = lambda key, val: cache.prefill(key[0], val) @@ -464,9 +508,8 @@ class CacheListDescriptor(_CacheDescriptorBase): Given a list of keys it looks in the cache to find any hits, then passes the list of missing keys to the wrapped function. - Once wrapped, the function returns either a Deferred which resolves to - the list of results, or (if all results were cached), just the list of - results. + Once wrapped, the function returns a Deferred which resolves to the list + of results. """ def __init__( @@ -600,21 +643,45 @@ class CacheListDescriptor(_CacheDescriptorBase): ) return make_deferred_yieldable(d) else: - return results + return defer.succeed(results) obj.__dict__[self.orig.__name__] = wrapped return wrapped -class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))): - # We rely on _CacheContext implementing __eq__ and __hash__ sensibly, - # which namedtuple does for us (i.e. two _CacheContext are the same if - # their caches and keys match). This is important in particular to - # dedupe when we add callbacks to lru cache nodes, otherwise the number - # of callbacks would grow. - def invalidate(self): - self.cache.invalidate(self.key) +class _CacheContext: + """Holds cache information from the cached function higher in the calling order. + + Can be used to invalidate the higher level cache entry if something changes + on a lower level. + """ + + _cache_context_objects = ( + WeakValueDictionary() + ) # type: WeakValueDictionary[Tuple[Cache, CacheKey], _CacheContext] + + def __init__(self, cache, cache_key): # type: (Cache, CacheKey) -> None + self._cache = cache + self._cache_key = cache_key + + def invalidate(self): # type: () -> None + """Invalidates the cache entry referred to by the context.""" + self._cache.invalidate(self._cache_key) + + @classmethod + def get_instance(cls, cache, cache_key): # type: (Cache, CacheKey) -> _CacheContext + """Returns an instance constructed with the given arguments. + + A new instance is only created if none already exists. + """ + + # We make sure there are no identical _CacheContext instances. This is + # important in particular to dedupe when we add callbacks to lru cache + # nodes, otherwise the number of callbacks would grow. + return cls._cache_context_objects.setdefault( + (cache, cache_key), cls(cache, cache_key) + ) def cached( diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py index cddf1ed515..2726b67b6d 100644 --- a/synapse/util/caches/expiringcache.py +++ b/synapse/util/caches/expiringcache.py @@ -18,6 +18,7 @@ from collections import OrderedDict from six import iteritems, itervalues +from synapse.config import cache as cache_config from synapse.metrics.background_process_metrics import run_as_background_process from synapse.util.caches import register_cache @@ -51,15 +52,16 @@ class ExpiringCache(object): an item on access. Defaults to False. iterable (bool): If true, the size is calculated by summing the sizes of all entries, rather than the number of entries. - """ self._cache_name = cache_name + self._original_max_size = max_len + + self._max_size = int(max_len * cache_config.properties.default_factor_size) + self._clock = clock - self._max_len = max_len self._expiry_ms = expiry_ms - self._reset_expiry_on_get = reset_expiry_on_get self._cache = OrderedDict() @@ -82,9 +84,11 @@ class ExpiringCache(object): def __setitem__(self, key, value): now = self._clock.time_msec() self._cache[key] = _CacheEntry(now, value) + self.evict() + def evict(self): # Evict if there are now too many items - while self._max_len and len(self) > self._max_len: + while self._max_size and len(self) > self._max_size: _key, value = self._cache.popitem(last=False) if self.iterable: self.metrics.inc_evictions(len(value.value)) @@ -170,6 +174,23 @@ class ExpiringCache(object): else: return len(self._cache) + def set_cache_factor(self, factor: float) -> bool: + """ + Set the cache factor for this individual cache. + + This will trigger a resize if it changes, which may require evicting + items from the cache. + + Returns: + bool: Whether the cache changed size or not. + """ + new_size = int(self._original_max_size * factor) + if new_size != self._max_size: + self._max_size = new_size + self.evict() + return True + return False + class _CacheEntry(object): __slots__ = ["time", "value"] diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py index 1536cb64f3..df4ea5901d 100644 --- a/synapse/util/caches/lrucache.py +++ b/synapse/util/caches/lrucache.py @@ -13,10 +13,11 @@ # See the License for the specific language governing permissions and # limitations under the License. - import threading from functools import wraps +from typing import Callable, Optional, Type, Union +from synapse.config import cache as cache_config from synapse.util.caches.treecache import TreeCache @@ -52,17 +53,18 @@ class LruCache(object): def __init__( self, - max_size, - keylen=1, - cache_type=dict, - size_callback=None, - evicted_callback=None, + max_size: int, + keylen: int = 1, + cache_type: Type[Union[dict, TreeCache]] = dict, + size_callback: Optional[Callable] = None, + evicted_callback: Optional[Callable] = None, + apply_cache_factor_from_config: bool = True, ): """ Args: - max_size (int): + max_size: The maximum amount of entries the cache can hold - keylen (int): + keylen: The length of the tuple used as the cache key cache_type (type): type of underlying cache to be used. Typically one of dict @@ -73,9 +75,24 @@ class LruCache(object): evicted_callback (func(int)|None): if not None, called on eviction with the size of the evicted entry + + apply_cache_factor_from_config (bool): If true, `max_size` will be + multiplied by a cache factor derived from the homeserver config """ cache = cache_type() self.cache = cache # Used for introspection. + self.apply_cache_factor_from_config = apply_cache_factor_from_config + + # Save the original max size, and apply the default size factor. + self._original_max_size = max_size + # We previously didn't apply the cache factor here, and as such some caches were + # not affected by the global cache factor. Add an option here to disable applying + # the cache factor when a cache is created + if apply_cache_factor_from_config: + self.max_size = int(max_size * cache_config.properties.default_factor_size) + else: + self.max_size = int(max_size) + list_root = _Node(None, None, None, None) list_root.next_node = list_root list_root.prev_node = list_root @@ -83,7 +100,7 @@ class LruCache(object): lock = threading.Lock() def evict(): - while cache_len() > max_size: + while cache_len() > self.max_size: todelete = list_root.prev_node evicted_len = delete_node(todelete) cache.pop(todelete.key, None) @@ -236,6 +253,7 @@ class LruCache(object): return key in cache self.sentinel = object() + self._on_resize = evict self.get = cache_get self.set = cache_set self.setdefault = cache_set_default @@ -266,3 +284,23 @@ class LruCache(object): def __contains__(self, key): return self.contains(key) + + def set_cache_factor(self, factor: float) -> bool: + """ + Set the cache factor for this individual cache. + + This will trigger a resize if it changes, which may require evicting + items from the cache. + + Returns: + bool: Whether the cache changed size or not. + """ + if not self.apply_cache_factor_from_config: + return False + + new_size = int(self._original_max_size * factor) + if new_size != self.max_size: + self.max_size = new_size + self._on_resize() + return True + return False diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 82d3eefe0e..a6c60888e5 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -38,7 +38,7 @@ class ResponseCache(object): self.timeout_sec = timeout_ms / 1000.0 self._name = name - self._metrics = register_cache("response_cache", name, self) + self._metrics = register_cache("response_cache", name, self, resizable=False) def size(self): return len(self.pending_result_cache) @@ -144,7 +144,7 @@ class ResponseCache(object): """ result = self.get(key) if not result: - logger.info( + logger.debug( "[%s]: no cached result for [%s], calculating new one", self._name, key ) d = run_in_background(callback, *args, **kwargs) diff --git a/synapse/util/caches/snapshot_cache.py b/synapse/util/caches/snapshot_cache.py deleted file mode 100644 index 8318db8d2c..0000000000 --- a/synapse/util/caches/snapshot_cache.py +++ /dev/null @@ -1,94 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2015, 2016 OpenMarket Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from synapse.util.async_helpers import ObservableDeferred - - -class SnapshotCache(object): - """Cache for snapshots like the response of /initialSync. - The response of initialSync only has to be a recent snapshot of the - server state. It shouldn't matter to clients if it is a few minutes out - of date. - - This caches a deferred response. Until the deferred completes it will be - returned from the cache. This means that if the client retries the request - while the response is still being computed, that original response will be - used rather than trying to compute a new response. - - Once the deferred completes it will removed from the cache after 5 minutes. - We delay removing it from the cache because a client retrying its request - could race with us finishing computing the response. - - Rather than tracking precisely how long something has been in the cache we - keep two generations of completed responses. Every 5 minutes discard the - old generation, move the new generation to the old generation, and set the - new generation to be empty. This means that a result will be in the cache - somewhere between 5 and 10 minutes. - """ - - DURATION_MS = 5 * 60 * 1000 # Cache results for 5 minutes. - - def __init__(self): - self.pending_result_cache = {} # Request that haven't finished yet. - self.prev_result_cache = {} # The older requests that have finished. - self.next_result_cache = {} # The newer requests that have finished. - self.time_last_rotated_ms = 0 - - def rotate(self, time_now_ms): - # Rotate once if the cache duration has passed since the last rotation. - if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: - self.prev_result_cache = self.next_result_cache - self.next_result_cache = {} - self.time_last_rotated_ms += self.DURATION_MS - - # Rotate again if the cache duration has passed twice since the last - # rotation. - if time_now_ms - self.time_last_rotated_ms >= self.DURATION_MS: - self.prev_result_cache = self.next_result_cache - self.next_result_cache = {} - self.time_last_rotated_ms = time_now_ms - - def get(self, time_now_ms, key): - self.rotate(time_now_ms) - # This cache is intended to deduplicate requests, so we expect it to be - # missed most of the time. So we just lookup the key in all of the - # dictionaries rather than trying to short circuit the lookup if the - # key is found. - result = self.prev_result_cache.get(key) - result = self.next_result_cache.get(key, result) - result = self.pending_result_cache.get(key, result) - if result is not None: - return result.observe() - else: - return None - - def set(self, time_now_ms, key, deferred): - self.rotate(time_now_ms) - - result = ObservableDeferred(deferred) - - self.pending_result_cache[key] = result - - def shuffle_along(r): - # When the deferred completes we shuffle it along to the first - # generation of the result cache. So that it will eventually - # expire from the rotation of that cache. - self.next_result_cache[key] = result - self.pending_result_cache.pop(key, None) - return r - - result.addBoth(shuffle_along) - - return result.observe() diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py index 235f64049c..2a161bf244 100644 --- a/synapse/util/caches/stream_change_cache.py +++ b/synapse/util/caches/stream_change_cache.py @@ -14,17 +14,23 @@ # limitations under the License. import logging +import math +from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union from six import integer_types from sortedcontainers import SortedDict +from synapse.types import Collection from synapse.util import caches logger = logging.getLogger(__name__) +# for now, assume all entities in the cache are strings +EntityType = str -class StreamChangeCache(object): + +class StreamChangeCache: """Keeps track of the stream positions of the latest change in a set of entities. Typically the entity will be a room or user id. @@ -34,19 +40,52 @@ class StreamChangeCache(object): old then the cache will simply return all given entities. """ - def __init__(self, name, current_stream_pos, max_size=10000, prefilled_cache=None): - self._max_size = int(max_size * caches.CACHE_SIZE_FACTOR) - self._entity_to_key = {} - self._cache = SortedDict() + def __init__( + self, + name: str, + current_stream_pos: int, + max_size=10000, + prefilled_cache: Optional[Mapping[EntityType, int]] = None, + ): + self._original_max_size = max_size + self._max_size = math.floor(max_size) + self._entity_to_key = {} # type: Dict[EntityType, int] + + # map from stream id to the a set of entities which changed at that stream id. + self._cache = SortedDict() # type: SortedDict[int, Set[EntityType]] + + # the earliest stream_pos for which we can reliably answer + # get_all_entities_changed. In other words, one less than the earliest + # stream_pos for which we know _cache is valid. + # self._earliest_known_stream_pos = current_stream_pos self.name = name - self.metrics = caches.register_cache("cache", self.name, self._cache) + self.metrics = caches.register_cache( + "cache", self.name, self._cache, resize_callback=self.set_cache_factor + ) if prefilled_cache: for entity, stream_pos in prefilled_cache.items(): self.entity_has_changed(entity, stream_pos) - def has_entity_changed(self, entity, stream_pos): + def set_cache_factor(self, factor: float) -> bool: + """ + Set the cache factor for this individual cache. + + This will trigger a resize if it changes, which may require evicting + items from the cache. + + Returns: + bool: Whether the cache changed size or not. + """ + new_size = math.floor(self._original_max_size * factor) + if new_size != self._max_size: + self.max_size = new_size + self._evict() + return True + return False + + def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool: """Returns True if the entity may have been updated since stream_pos """ assert type(stream_pos) in integer_types @@ -67,22 +106,27 @@ class StreamChangeCache(object): self.metrics.inc_hits() return False - def get_entities_changed(self, entities, stream_pos): + def get_entities_changed( + self, entities: Collection[EntityType], stream_pos: int + ) -> Union[Set[EntityType], FrozenSet[EntityType]]: """ Returns subset of entities that have had new things since the given position. Entities unknown to the cache will be returned. If the position is too old it will just return the given list. """ - assert type(stream_pos) is int - - if stream_pos >= self._earliest_known_stream_pos: - changed_entities = { - self._cache[k] - for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)) - } - - result = changed_entities.intersection(entities) - + changed_entities = self.get_all_entities_changed(stream_pos) + if changed_entities is not None: + # We now do an intersection, trying to do so in the most efficient + # way possible (some of these sets are *large*). First check in the + # given iterable is already set that we can reuse, otherwise we + # create a set of the *smallest* of the two iterables and call + # `intersection(..)` on it (this can be twice as fast as the reverse). + if isinstance(entities, (set, frozenset)): + result = entities.intersection(changed_entities) + elif len(changed_entities) < len(entities): + result = set(changed_entities).intersection(entities) + else: + result = set(entities).intersection(changed_entities) self.metrics.inc_hits() else: result = set(entities) @@ -90,13 +134,13 @@ class StreamChangeCache(object): return result - def has_any_entity_changed(self, stream_pos): + def has_any_entity_changed(self, stream_pos: int) -> bool: """Returns if any entity has changed """ assert type(stream_pos) is int if not self._cache: - # If we have no cache, nothing can have changed. + # If the cache is empty, nothing can have changed. return False if stream_pos >= self._earliest_known_stream_pos: @@ -106,42 +150,66 @@ class StreamChangeCache(object): self.metrics.inc_misses() return True - def get_all_entities_changed(self, stream_pos): - """Returns all entites that have had new things since the given + def get_all_entities_changed(self, stream_pos: int) -> Optional[List[EntityType]]: + """Returns all entities that have had new things since the given position. If the position is too old it will return None. + + Returns the entities in the order that they were changed. """ assert type(stream_pos) is int - if stream_pos >= self._earliest_known_stream_pos: - return [ - self._cache[k] - for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)) - ] - else: + if stream_pos < self._earliest_known_stream_pos: return None - def entity_has_changed(self, entity, stream_pos): + changed_entities = [] # type: List[EntityType] + + for k in self._cache.islice(start=self._cache.bisect_right(stream_pos)): + changed_entities.extend(self._cache[k]) + return changed_entities + + def entity_has_changed(self, entity: EntityType, stream_pos: int) -> None: """Informs the cache that the entity has been changed at the given position. """ assert type(stream_pos) is int - if stream_pos > self._earliest_known_stream_pos: - old_pos = self._entity_to_key.get(entity, None) - if old_pos is not None: - stream_pos = max(stream_pos, old_pos) - self._cache.pop(old_pos, None) - self._cache[stream_pos] = entity - self._entity_to_key[entity] = stream_pos - - while len(self._cache) > self._max_size: - k, r = self._cache.popitem(0) - self._earliest_known_stream_pos = max( - k, self._earliest_known_stream_pos - ) - self._entity_to_key.pop(r, None) - - def get_max_pos_of_last_change(self, entity): + if stream_pos <= self._earliest_known_stream_pos: + return + + old_pos = self._entity_to_key.get(entity, None) + if old_pos is not None: + if old_pos >= stream_pos: + # nothing to do + return + e = self._cache[old_pos] + e.remove(entity) + if not e: + # cache at this point is now empty + del self._cache[old_pos] + + e1 = self._cache.get(stream_pos) + if e1 is None: + e1 = self._cache[stream_pos] = set() + e1.add(entity) + self._entity_to_key[entity] = stream_pos + self._evict() + + # if the cache is too big, remove entries + while len(self._cache) > self._max_size: + k, r = self._cache.popitem(0) + self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + for entity in r: + del self._entity_to_key[entity] + + def _evict(self): + while len(self._cache) > self._max_size: + k, r = self._cache.popitem(0) + self._earliest_known_stream_pos = max(k, self._earliest_known_stream_pos) + for entity in r: + self._entity_to_key.pop(entity, None) + + def get_max_pos_of_last_change(self, entity: EntityType) -> int: + """Returns an upper bound of the stream id of the last change to an entity. """ diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py index 9a72218d85..2ea4e4e911 100644 --- a/synapse/util/caches/treecache.py +++ b/synapse/util/caches/treecache.py @@ -1,3 +1,5 @@ +from typing import Dict + from six import itervalues SENTINEL = object() @@ -12,7 +14,7 @@ class TreeCache(object): def __init__(self): self.size = 0 - self.root = {} + self.root = {} # type: Dict def __setitem__(self, key, value): return self.set(key, value) diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py index 99646c7cf0..6437aa907e 100644 --- a/synapse/util/caches/ttlcache.py +++ b/synapse/util/caches/ttlcache.py @@ -38,7 +38,7 @@ class TTLCache(object): self._timer = timer - self._metrics = register_cache("ttl", cache_name, self) + self._metrics = register_cache("ttl", cache_name, self, resizable=False) def set(self, key, value, ttl): """Add/update an entry in the cache diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py index 635b897d6c..9815bb8667 100644 --- a/synapse/util/frozenutils.py +++ b/synapse/util/frozenutils.py @@ -30,7 +30,7 @@ def freeze(o): return o try: - return tuple([freeze(i) for i in o]) + return tuple(freeze(i) for i in o) except TypeError: pass @@ -65,5 +65,5 @@ def _handle_frozendict(obj): ) -# A JSONEncoder which is capable of encoding frozendics without barfing +# A JSONEncoder which is capable of encoding frozendicts without barfing frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict) diff --git a/synapse/util/httpresourcetree.py b/synapse/util/httpresourcetree.py index 1a20c596bf..3c0e8469f3 100644 --- a/synapse/util/httpresourcetree.py +++ b/synapse/util/httpresourcetree.py @@ -20,7 +20,7 @@ logger = logging.getLogger(__name__) def create_resource_tree(desired_tree, root_resource): - """Create the resource tree for this Home Server. + """Create the resource tree for this homeserver. This in unduly complicated because Twisted does not support putting child resources more than 1 level deep at a time. diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py new file mode 100644 index 0000000000..06faeebe7f --- /dev/null +++ b/synapse/util/iterutils.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Copyright 2014-2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from itertools import islice +from typing import Iterable, Iterator, Sequence, Tuple, TypeVar + +T = TypeVar("T") + + +def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T]]: + """batch an iterable up into tuples with a maximum size + + Args: + iterable (iterable): the iterable to slice + size (int): the maximum batch size + + Returns: + an iterator over the chunks + """ + # make sure we can deal with iterables like lists too + sourceiter = iter(iterable) + # call islice until it returns an empty tuple + return iter(lambda: tuple(islice(sourceiter, size)), ()) + + +ISeq = TypeVar("ISeq", bound=Sequence, covariant=True) + + +def chunk_seq(iseq: ISeq, maxlen: int) -> Iterable[ISeq]: + """Split the given sequence into chunks of the given size + + The last chunk may be shorter than the given size. + + If the input is empty, no chunks are returned. + """ + return (iseq[i : i + maxlen] for i in range(0, len(iseq), maxlen)) diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py index 0910930c21..ec61e14423 100644 --- a/synapse/util/metrics.py +++ b/synapse/util/metrics.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import inspect import logging from functools import wraps @@ -20,7 +21,7 @@ from prometheus_client import Counter from twisted.internet import defer -from synapse.logging.context import LoggingContext +from synapse.logging.context import LoggingContext, current_context from synapse.metrics import InFlightGauge logger = logging.getLogger(__name__) @@ -60,14 +61,26 @@ in_flight = InFlightGauge( ) -def measure_func(name): +def measure_func(name=None): def wrapper(func): - @wraps(func) - @defer.inlineCallbacks - def measured_func(self, *args, **kwargs): - with Measure(self.clock, name): - r = yield func(self, *args, **kwargs) - return r + block_name = func.__name__ if name is None else name + + if inspect.iscoroutinefunction(func): + + @wraps(func) + async def measured_func(self, *args, **kwargs): + with Measure(self.clock, block_name): + r = await func(self, *args, **kwargs) + return r + + else: + + @wraps(func) + @defer.inlineCallbacks + def measured_func(self, *args, **kwargs): + with Measure(self.clock, block_name): + r = yield func(self, *args, **kwargs) + return r return measured_func @@ -78,72 +91,48 @@ class Measure(object): __slots__ = [ "clock", "name", - "start_context", + "_logging_context", "start", - "created_context", - "start_usage", ] def __init__(self, clock, name): self.clock = clock self.name = name - self.start_context = None + self._logging_context = None self.start = None - self.created_context = False def __enter__(self): - self.start = self.clock.time() - self.start_context = LoggingContext.current_context() - if not self.start_context: - self.start_context = LoggingContext("Measure") - self.start_context.__enter__() - self.created_context = True - - self.start_usage = self.start_context.get_resource_usage() + if self._logging_context: + raise RuntimeError("Measure() objects cannot be re-used") + self.start = self.clock.time() + parent_context = current_context() + self._logging_context = LoggingContext( + "Measure[%s]" % (self.name,), parent_context + ) + self._logging_context.__enter__() in_flight.register((self.name,), self._update_in_flight) def __exit__(self, exc_type, exc_val, exc_tb): - if isinstance(exc_type, Exception) or not self.start_context: - return - - in_flight.unregister((self.name,), self._update_in_flight) + if not self._logging_context: + raise RuntimeError("Measure() block exited without being entered") duration = self.clock.time() - self.start + usage = self._logging_context.get_resource_usage() - block_counter.labels(self.name).inc() - block_timer.labels(self.name).inc(duration) - - context = LoggingContext.current_context() - - if context != self.start_context: - logger.warn( - "Context has unexpectedly changed from '%s' to '%s'. (%r)", - self.start_context, - context, - self.name, - ) - return - - if not context: - logger.warn("Expected context. (%r)", self.name) - return + in_flight.unregister((self.name,), self._update_in_flight) + self._logging_context.__exit__(exc_type, exc_val, exc_tb) - current = context.get_resource_usage() - usage = current - self.start_usage try: + block_counter.labels(self.name).inc() + block_timer.labels(self.name).inc(duration) block_ru_utime.labels(self.name).inc(usage.ru_utime) block_ru_stime.labels(self.name).inc(usage.ru_stime) block_db_txn_count.labels(self.name).inc(usage.db_txn_count) block_db_txn_duration.labels(self.name).inc(usage.db_txn_duration_sec) block_db_sched_duration.labels(self.name).inc(usage.db_sched_duration_sec) except ValueError: - logger.warn( - "Failed to save metrics! OLD: %r, NEW: %r", self.start_usage, current - ) - - if self.created_context: - self.start_context.__exit__(exc_type, exc_val, exc_tb) + logger.warning("Failed to save metrics! Usage: %s", usage) def _update_in_flight(self, metrics): """Gets called when processing in flight metrics diff --git a/synapse/util/module_loader.py b/synapse/util/module_loader.py index 522acd5aa8..bb62db4637 100644 --- a/synapse/util/module_loader.py +++ b/synapse/util/module_loader.py @@ -14,12 +14,13 @@ # limitations under the License. import importlib +import importlib.util from synapse.config._base import ConfigError def load_module(provider): - """ Loads a module with its config + """ Loads a synapse module with its config Take a dict with keys 'module' (the module name) and 'config' (the config dict). @@ -33,8 +34,25 @@ def load_module(provider): provider_class = getattr(module, clz) try: - provider_config = provider_class.parse_config(provider["config"]) + provider_config = provider_class.parse_config(provider.get("config")) except Exception as e: raise ConfigError("Failed to parse config for %r: %r" % (provider["module"], e)) return provider_class, provider_config + + +def load_python_module(location: str): + """Load a python module, and return a reference to its global namespace + + Args: + location (str): path to the module + + Returns: + python module object + """ + spec = importlib.util.spec_from_file_location(location, location) + if spec is None: + raise Exception("Unable to load module at %s" % (location,)) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) # type: ignore + return mod diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py new file mode 100644 index 0000000000..2605f3c65b --- /dev/null +++ b/synapse/util/patch_inline_callbacks.py @@ -0,0 +1,224 @@ +# -*- coding: utf-8 -*- +# Copyright 2018 New Vector Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +from __future__ import print_function + +import functools +import sys +from typing import Any, Callable, List + +from twisted.internet import defer +from twisted.internet.defer import Deferred +from twisted.python.failure import Failure + +# Tracks if we've already patched inlineCallbacks +_already_patched = False + + +def do_patch(): + """ + Patch defer.inlineCallbacks so that it checks the state of the logcontext on exit + """ + + from synapse.logging.context import current_context + + global _already_patched + + orig_inline_callbacks = defer.inlineCallbacks + if _already_patched: + return + + def new_inline_callbacks(f): + @functools.wraps(f) + def wrapped(*args, **kwargs): + start_context = current_context() + changes = [] # type: List[str] + orig = orig_inline_callbacks(_check_yield_points(f, changes)) + + try: + res = orig(*args, **kwargs) + except Exception: + if current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + + err = "%s changed context from %s to %s on exception" % ( + f, + start_context, + current_context(), + ) + print(err, file=sys.stderr) + raise Exception(err) + raise + + if not isinstance(res, Deferred) or res.called: + if current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + + err = "Completed %s changed context from %s to %s" % ( + f, + start_context, + current_context(), + ) + # print the error to stderr because otherwise all we + # see in travis-ci is the 500 error + print(err, file=sys.stderr) + raise Exception(err) + return res + + if current_context(): + err = ( + "%s returned incomplete deferred in non-sentinel context " + "%s (start was %s)" + ) % (f, current_context(), start_context) + print(err, file=sys.stderr) + raise Exception(err) + + def check_ctx(r): + if current_context() != start_context: + for err in changes: + print(err, file=sys.stderr) + err = "%s completion of %s changed context from %s to %s" % ( + "Failure" if isinstance(r, Failure) else "Success", + f, + start_context, + current_context(), + ) + print(err, file=sys.stderr) + raise Exception(err) + return r + + res.addBoth(check_ctx) + return res + + return wrapped + + defer.inlineCallbacks = new_inline_callbacks + _already_patched = True + + +def _check_yield_points(f: Callable, changes: List[str]): + """Wraps a generator that is about to be passed to defer.inlineCallbacks + checking that after every yield the log contexts are correct. + + It's perfectly valid for log contexts to change within a function, e.g. due + to new Measure blocks, so such changes are added to the given `changes` + list instead of triggering an exception. + + Args: + f: generator function to wrap + changes: A list of strings detailing how the contexts + changed within a function. + + Returns: + function + """ + + from synapse.logging.context import current_context + + @functools.wraps(f) + def check_yield_points_inner(*args, **kwargs): + gen = f(*args, **kwargs) + + last_yield_line_no = gen.gi_frame.f_lineno + result = None # type: Any + while True: + expected_context = current_context() + + try: + isFailure = isinstance(result, Failure) + if isFailure: + d = result.throwExceptionIntoGenerator(gen) + else: + d = gen.send(result) + except (StopIteration, defer._DefGen_Return) as e: + if current_context() != expected_context: + # This happens when the context is lost sometime *after* the + # final yield and returning. E.g. we forgot to yield on a + # function that returns a deferred. + # + # We don't raise here as it's perfectly valid for contexts to + # change in a function, as long as it sets the correct context + # on resolving (which is checked separately). + err = ( + "Function %r returned and changed context from %s to %s," + " in %s between %d and end of func" + % ( + f.__qualname__, + expected_context, + current_context(), + f.__code__.co_filename, + last_yield_line_no, + ) + ) + changes.append(err) + return getattr(e, "value", None) + + frame = gen.gi_frame + + if isinstance(d, defer.Deferred) and not d.called: + # This happens if we yield on a deferred that doesn't follow + # the log context rules without wrapping in a `make_deferred_yieldable`. + # We raise here as this should never happen. + if current_context(): + err = ( + "%s yielded with context %s rather than sentinel," + " yielded on line %d in %s" + % ( + frame.f_code.co_name, + current_context(), + frame.f_lineno, + frame.f_code.co_filename, + ) + ) + raise Exception(err) + + # the wrapped function yielded a Deferred: yield it back up to the parent + # inlineCallbacks(). + try: + result = yield d + except Exception: + # this will fish an earlier Failure out of the stack where possible, and + # thus is preferable to passing in an exeception to the Failure + # constructor, since it results in less stack-mangling. + result = Failure() + + if current_context() != expected_context: + + # This happens because the context is lost sometime *after* the + # previous yield and *after* the current yield. E.g. the + # deferred we waited on didn't follow the rules, or we forgot to + # yield on a function between the two yield points. + # + # We don't raise here as its perfectly valid for contexts to + # change in a function, as long as it sets the correct context + # on resolving (which is checked separately). + err = ( + "%s changed context from %s to %s, happened between lines %d and %d in %s" + % ( + frame.f_code.co_name, + expected_context, + current_context(), + last_yield_line_no, + frame.f_lineno, + frame.f_code.co_filename, + ) + ) + changes.append(err) + + last_yield_line_no = frame.f_lineno + + return check_yield_points_inner diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py index 5ca4521ce3..e5efdfcd02 100644 --- a/synapse/util/ratelimitutils.py +++ b/synapse/util/ratelimitutils.py @@ -43,7 +43,7 @@ class FederationRateLimiter(object): self.ratelimiters = collections.defaultdict(new_limiter) def ratelimit(self, host): - """Used to ratelimit an incoming request from given host + """Used to ratelimit an incoming request from a given host Example usage: diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py index a5f2fbef5c..af69587196 100644 --- a/synapse/util/retryutils.py +++ b/synapse/util/retryutils.py @@ -29,7 +29,7 @@ MIN_RETRY_INTERVAL = 10 * 60 * 1000 RETRY_MULTIPLIER = 5 # a cap on the backoff. (Essentially none) -MAX_RETRY_INTERVAL = 2 ** 63 +MAX_RETRY_INTERVAL = 2 ** 62 class NotRetryingDestination(Exception): diff --git a/synapse/util/rlimit.py b/synapse/util/rlimit.py index 6c0f2bb0cf..207cd17c2a 100644 --- a/synapse/util/rlimit.py +++ b/synapse/util/rlimit.py @@ -33,4 +33,4 @@ def change_resource_limit(soft_file_no): resource.RLIMIT_CORE, (resource.RLIM_INFINITY, resource.RLIM_INFINITY) ) except (ValueError, resource.error) as e: - logger.warn("Failed to set file or core limit: %s", e) + logger.warning("Failed to set file or core limit: %s", e) diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py index 982c6d81ca..08c86e92b8 100644 --- a/synapse/util/stringutils.py +++ b/synapse/util/stringutils.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2014-2016 OpenMarket Ltd +# Copyright 2020 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -12,16 +13,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. - +import itertools import random +import re import string +from collections import Iterable -import six -from six import PY2, PY3 -from six.moves import range +from synapse.api.errors import Codes, SynapseError _string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@" +# https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-register-email-requesttoken +# Note: The : character is allowed here for older clients, but will be removed in a +# future release. Context: https://github.com/matrix-org/synapse/issues/6766 +client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-\:]+$") + # random_string and random_string_with_symbols are used for a range of things, # some cryptographically important, some less so. We use SystemRandom to make sure # we get cryptographically-secure randoms. @@ -37,75 +43,37 @@ def random_string_with_symbols(length): def is_ascii(s): - - if PY3: - if isinstance(s, bytes): - try: - s.decode("ascii").encode("ascii") - except UnicodeDecodeError: - return False - except UnicodeEncodeError: - return False - return True - - try: - s.encode("ascii") - except UnicodeEncodeError: - return False - except UnicodeDecodeError: - return False - else: + if isinstance(s, bytes): + try: + s.decode("ascii").encode("ascii") + except UnicodeDecodeError: + return False + except UnicodeEncodeError: + return False return True -def to_ascii(s): - """Converts a string to ascii if it is ascii, otherwise leave it alone. - - If given None then will return None. - """ - if PY3: - return s - - if s is None: - return None +def assert_valid_client_secret(client_secret): + """Validate that a given string matches the client_secret regex defined by the spec""" + if client_secret_regex.match(client_secret) is None: + raise SynapseError( + 400, "Invalid client_secret parameter", errcode=Codes.INVALID_PARAM + ) - try: - return s.encode("ascii") - except UnicodeEncodeError: - return s +def shortstr(iterable: Iterable, maxitems: int = 5) -> str: + """If iterable has maxitems or fewer, return the stringification of a list + containing those items. -def exception_to_unicode(e): - """Helper function to extract the text of an exception as a unicode string + Otherwise, return the stringification of a a list with the first maxitems items, + followed by "...". Args: - e (Exception): exception to be stringified - - Returns: - unicode + iterable: iterable to truncate + maxitems: number of items to return before truncating """ - # urgh, this is a mess. The basic problem here is that psycopg2 constructs its - # exceptions with PyErr_SetString, with a (possibly non-ascii) argument. str() will - # then produce the raw byte sequence. Under Python 2, this will then cause another - # error if it gets mixed with a `unicode` object, as per - # https://github.com/matrix-org/synapse/issues/4252 - - # First of all, if we're under python3, everything is fine because it will sort this - # nonsense out for us. - if not PY2: - return str(e) - - # otherwise let's have a stab at decoding the exception message. We'll circumvent - # Exception.__str__(), which would explode if someone raised Exception(u'non-ascii') - # and instead look at what is in the args member. - - if len(e.args) == 0: - return "" - elif len(e.args) > 1: - return six.text_type(repr(e.args)) - - msg = e.args[0] - if isinstance(msg, bytes): - return msg.decode("utf-8", errors="replace") - else: - return msg + + items = list(itertools.islice(iterable, maxitems + 1)) + if len(items) <= maxitems: + return str(items) + return "[" + ", ".join(repr(r) for r in items[:maxitems]) + ", ...]" diff --git a/synapse/util/versionstring.py b/synapse/util/versionstring.py index fa404b9d75..ab7d03af3a 100644 --- a/synapse/util/versionstring.py +++ b/synapse/util/versionstring.py @@ -42,6 +42,7 @@ def get_version_string(module): try: null = open(os.devnull, "w") cwd = os.path.dirname(os.path.abspath(module.__file__)) + try: git_branch = ( subprocess.check_output( @@ -51,7 +52,8 @@ def get_version_string(module): .decode("ascii") ) git_branch = "b=" + git_branch - except subprocess.CalledProcessError: + except (subprocess.CalledProcessError, FileNotFoundError): + # FileNotFoundError can arise when git is not installed git_branch = "" try: @@ -63,7 +65,7 @@ def get_version_string(module): .decode("ascii") ) git_tag = "t=" + git_tag - except subprocess.CalledProcessError: + except (subprocess.CalledProcessError, FileNotFoundError): git_tag = "" try: @@ -74,7 +76,7 @@ def get_version_string(module): .strip() .decode("ascii") ) - except subprocess.CalledProcessError: + except (subprocess.CalledProcessError, FileNotFoundError): git_commit = "" try: @@ -89,7 +91,7 @@ def get_version_string(module): ) git_dirty = "dirty" if is_dirty else "" - except subprocess.CalledProcessError: + except (subprocess.CalledProcessError, FileNotFoundError): git_dirty = "" if git_branch or git_tag or git_commit or git_dirty: |