diff --git a/synapse/util/async.py b/synapse/util/async.py
index 83875edc85..35380bf8ed 100644
--- a/synapse/util/async.py
+++ b/synapse/util/async.py
@@ -192,8 +192,11 @@ class Linearizer(object):
logger.info(
"Waiting to acquire linearizer lock %r for key %r", self.name, key
)
- with PreserveLoggingContext():
- yield current_defer
+ try:
+ with PreserveLoggingContext():
+ yield current_defer
+ except:
+ logger.exception("Unexpected exception in Linearizer")
logger.info("Acquired linearizer lock %r for key %r", self.name, key)
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index ebd715c5dc..8a7774a88e 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -40,8 +40,8 @@ def register_cache(name, cache):
)
-_string_cache = LruCache(int(5000 * CACHE_SIZE_FACTOR))
-caches_by_name["string_cache"] = _string_cache
+_string_cache = LruCache(int(100000 * CACHE_SIZE_FACTOR))
+_stirng_cache_metrics = register_cache("string_cache", _string_cache)
KNOWN_KEYS = {
@@ -69,7 +69,12 @@ KNOWN_KEYS = {
def intern_string(string):
"""Takes a (potentially) unicode string and interns using custom cache
"""
- return _string_cache.setdefault(string, string)
+ new_str = _string_cache.setdefault(string, string)
+ if new_str is string:
+ _stirng_cache_metrics.inc_hits()
+ else:
+ _stirng_cache_metrics.inc_misses()
+ return new_str
def intern_dict(dictionary):
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 8dba61d49f..998de70d29 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -17,7 +17,7 @@ import logging
from synapse.util.async import ObservableDeferred
from synapse.util import unwrapFirstError
from synapse.util.caches.lrucache import LruCache
-from synapse.util.caches.treecache import TreeCache
+from synapse.util.caches.treecache import TreeCache, iterate_tree_cache_entry
from synapse.util.logcontext import (
PreserveLoggingContext, preserve_context_over_deferred, preserve_context_over_fn
)
@@ -42,6 +42,25 @@ _CacheSentinel = object()
CACHE_SIZE_FACTOR = float(os.environ.get("SYNAPSE_CACHE_FACTOR", 0.1))
+class CacheEntry(object):
+ __slots__ = [
+ "deferred", "sequence", "callbacks", "invalidated"
+ ]
+
+ def __init__(self, deferred, sequence, callbacks):
+ self.deferred = deferred
+ self.sequence = sequence
+ self.callbacks = set(callbacks)
+ self.invalidated = False
+
+ def invalidate(self):
+ if not self.invalidated:
+ self.invalidated = True
+ for callback in self.callbacks:
+ callback()
+ self.callbacks.clear()
+
+
class Cache(object):
__slots__ = (
"cache",
@@ -51,12 +70,16 @@ class Cache(object):
"sequence",
"thread",
"metrics",
+ "_pending_deferred_cache",
)
- def __init__(self, name, max_entries=1000, keylen=1, tree=False):
+ def __init__(self, name, max_entries=1000, keylen=1, tree=False, iterable=False):
cache_type = TreeCache if tree else dict
+ self._pending_deferred_cache = cache_type()
+
self.cache = LruCache(
- max_size=max_entries, keylen=keylen, cache_type=cache_type
+ max_size=max_entries, keylen=keylen, cache_type=cache_type,
+ size_callback=(lambda d: len(d.result)) if iterable else None,
)
self.name = name
@@ -76,7 +99,15 @@ class Cache(object):
)
def get(self, key, default=_CacheSentinel, callback=None):
- val = self.cache.get(key, _CacheSentinel, callback=callback)
+ callbacks = [callback] if callback else []
+ val = self._pending_deferred_cache.get(key, _CacheSentinel)
+ if val is not _CacheSentinel:
+ if val.sequence == self.sequence:
+ val.callbacks.update(callbacks)
+ self.metrics.inc_hits()
+ return val.deferred
+
+ val = self.cache.get(key, _CacheSentinel, callbacks=callbacks)
if val is not _CacheSentinel:
self.metrics.inc_hits()
return val
@@ -88,15 +119,39 @@ class Cache(object):
else:
return default
- def update(self, sequence, key, value, callback=None):
+ def set(self, key, value, callback=None):
+ callbacks = [callback] if callback else []
self.check_thread()
- if self.sequence == sequence:
- # Only update the cache if the caches sequence number matches the
- # number that the cache had before the SELECT was started (SYN-369)
- self.prefill(key, value, callback=callback)
+ entry = CacheEntry(
+ deferred=value,
+ sequence=self.sequence,
+ callbacks=callbacks,
+ )
+
+ entry.callbacks.update(callbacks)
+
+ existing_entry = self._pending_deferred_cache.pop(key, None)
+ if existing_entry:
+ existing_entry.invalidate()
+
+ self._pending_deferred_cache[key] = entry
+
+ def shuffle(result):
+ if self.sequence == entry.sequence:
+ existing_entry = self._pending_deferred_cache.pop(key, None)
+ if existing_entry is entry:
+ self.cache.set(key, entry.deferred, entry.callbacks)
+ else:
+ entry.invalidate()
+ else:
+ entry.invalidate()
+ return result
+
+ entry.deferred.addCallback(shuffle)
def prefill(self, key, value, callback=None):
- self.cache.set(key, value, callback=callback)
+ callbacks = [callback] if callback else []
+ self.cache.set(key, value, callbacks=callbacks)
def invalidate(self, key):
self.check_thread()
@@ -108,6 +163,10 @@ class Cache(object):
# Increment the sequence number so that any SELECT statements that
# raced with the INSERT don't update the cache (SYN-369)
self.sequence += 1
+ entry = self._pending_deferred_cache.pop(key, None)
+ if entry:
+ entry.invalidate()
+
self.cache.pop(key, None)
def invalidate_many(self, key):
@@ -119,6 +178,11 @@ class Cache(object):
self.sequence += 1
self.cache.del_multi(key)
+ entry_dict = self._pending_deferred_cache.pop(key, None)
+ if entry_dict is not None:
+ for entry in iterate_tree_cache_entry(entry_dict):
+ entry.invalidate()
+
def invalidate_all(self):
self.check_thread()
self.sequence += 1
@@ -155,7 +219,7 @@ class CacheDescriptor(object):
"""
def __init__(self, orig, max_entries=1000, num_args=1, tree=False,
- inlineCallbacks=False, cache_context=False):
+ inlineCallbacks=False, cache_context=False, iterable=False):
max_entries = int(max_entries * CACHE_SIZE_FACTOR)
self.orig = orig
@@ -169,6 +233,8 @@ class CacheDescriptor(object):
self.num_args = num_args
self.tree = tree
+ self.iterable = iterable
+
all_args = inspect.getargspec(orig)
self.arg_names = all_args.args[1:num_args + 1]
@@ -203,6 +269,7 @@ class CacheDescriptor(object):
max_entries=self.max_entries,
keylen=self.num_args,
tree=self.tree,
+ iterable=self.iterable,
)
@functools.wraps(self.orig)
@@ -243,11 +310,6 @@ class CacheDescriptor(object):
return preserve_context_over_deferred(observer)
except KeyError:
- # Get the sequence number of the cache before reading from the
- # database so that we can tell if the cache is invalidated
- # while the SELECT is executing (SYN-369)
- sequence = cache.sequence
-
ret = defer.maybeDeferred(
preserve_context_over_fn,
self.function_to_call,
@@ -261,7 +323,7 @@ class CacheDescriptor(object):
ret.addErrback(onErr)
ret = ObservableDeferred(ret, consumeErrors=True)
- cache.update(sequence, cache_key, ret, callback=invalidate_callback)
+ cache.set(cache_key, ret, callback=invalidate_callback)
return preserve_context_over_deferred(ret.observe())
@@ -359,7 +421,6 @@ class CacheListDescriptor(object):
missing.append(arg)
if missing:
- sequence = cache.sequence
args_to_call = dict(arg_dict)
args_to_call[self.list_name] = missing
@@ -382,8 +443,8 @@ class CacheListDescriptor(object):
key = list(keyargs)
key[self.list_pos] = arg
- cache.update(
- sequence, tuple(key), observer,
+ cache.set(
+ tuple(key), observer,
callback=invalidate_callback
)
@@ -417,21 +478,29 @@ class CacheListDescriptor(object):
class _CacheContext(namedtuple("_CacheContext", ("cache", "key"))):
+ # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
+ # which namedtuple does for us (i.e. two _CacheContext are the same if
+ # their caches and keys match). This is important in particular to
+ # dedupe when we add callbacks to lru cache nodes, otherwise the number
+ # of callbacks would grow.
def invalidate(self):
self.cache.invalidate(self.key)
-def cached(max_entries=1000, num_args=1, tree=False, cache_context=False):
+def cached(max_entries=1000, num_args=1, tree=False, cache_context=False,
+ iterable=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
num_args=num_args,
tree=tree,
cache_context=cache_context,
+ iterable=iterable,
)
-def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False):
+def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_context=False,
+ iterable=False):
return lambda orig: CacheDescriptor(
orig,
max_entries=max_entries,
@@ -439,6 +508,7 @@ def cachedInlineCallbacks(max_entries=1000, num_args=1, tree=False, cache_contex
tree=tree,
inlineCallbacks=True,
cache_context=cache_context,
+ iterable=iterable,
)
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index b0ca1bb79d..cb6933c61c 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -23,7 +23,9 @@ import logging
logger = logging.getLogger(__name__)
-DictionaryEntry = namedtuple("DictionaryEntry", ("full", "value"))
+class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "value"))):
+ def __len__(self):
+ return len(self.value)
class DictionaryCache(object):
@@ -32,7 +34,7 @@ class DictionaryCache(object):
"""
def __init__(self, name, max_entries=1000):
- self.cache = LruCache(max_size=max_entries)
+ self.cache = LruCache(max_size=max_entries, size_callback=len)
self.name = name
self.sequence = 0
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 080388958f..2987c38a2d 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -15,6 +15,7 @@
from synapse.util.caches import register_cache
+from collections import OrderedDict
import logging
@@ -23,7 +24,7 @@ logger = logging.getLogger(__name__)
class ExpiringCache(object):
def __init__(self, cache_name, clock, max_len=0, expiry_ms=0,
- reset_expiry_on_get=False):
+ reset_expiry_on_get=False, iterable=False):
"""
Args:
cache_name (str): Name of this cache, used for logging.
@@ -36,6 +37,8 @@ class ExpiringCache(object):
evicted based on time.
reset_expiry_on_get (bool): If true, will reset the expiry time for
an item on access. Defaults to False.
+ iterable (bool): If true, the size is calculated by summing the
+ sizes of all entries, rather than the number of entries.
"""
self._cache_name = cache_name
@@ -47,9 +50,13 @@ class ExpiringCache(object):
self._reset_expiry_on_get = reset_expiry_on_get
- self._cache = {}
+ self._cache = OrderedDict()
- self.metrics = register_cache(cache_name, self._cache)
+ self.metrics = register_cache(cache_name, self)
+
+ self.iterable = iterable
+
+ self._size_estimate = 0
def start(self):
if not self._expiry_ms:
@@ -65,15 +72,14 @@ class ExpiringCache(object):
now = self._clock.time_msec()
self._cache[key] = _CacheEntry(now, value)
- # Evict if there are now too many items
- if self._max_len and len(self._cache.keys()) > self._max_len:
- sorted_entries = sorted(
- self._cache.items(),
- key=lambda item: item[1].time,
- )
+ if self.iterable:
+ self._size_estimate += len(value)
- for k, _ in sorted_entries[self._max_len:]:
- self._cache.pop(k)
+ # Evict if there are now too many items
+ while self._max_len and len(self) > self._max_len:
+ _key, value = self._cache.popitem(last=False)
+ if self.iterable:
+ self._size_estimate -= len(value.value)
def __getitem__(self, key):
try:
@@ -99,7 +105,7 @@ class ExpiringCache(object):
# zero expiry time means don't expire. This should never get called
# since we have this check in start too.
return
- begin_length = len(self._cache)
+ begin_length = len(self)
now = self._clock.time_msec()
@@ -110,15 +116,20 @@ class ExpiringCache(object):
keys_to_delete.add(key)
for k in keys_to_delete:
- self._cache.pop(k)
+ value = self._cache.pop(k)
+ if self.iterable:
+ self._size_estimate -= len(value.value)
logger.debug(
"[%s] _prune_cache before: %d, after len: %d",
- self._cache_name, begin_length, len(self._cache)
+ self._cache_name, begin_length, len(self)
)
def __len__(self):
- return len(self._cache)
+ if self.iterable:
+ return self._size_estimate
+ else:
+ return len(self._cache)
class _CacheEntry(object):
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 9c4c679175..cf5fbb679c 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -49,7 +49,7 @@ class LruCache(object):
Can also set callbacks on objects when getting/setting which are fired
when that key gets invalidated/evicted.
"""
- def __init__(self, max_size, keylen=1, cache_type=dict):
+ def __init__(self, max_size, keylen=1, cache_type=dict, size_callback=None):
cache = cache_type()
self.cache = cache # Used for introspection.
list_root = _Node(None, None, None, None)
@@ -58,6 +58,12 @@ class LruCache(object):
lock = threading.Lock()
+ def evict():
+ while cache_len() > max_size:
+ todelete = list_root.prev_node
+ delete_node(todelete)
+ cache.pop(todelete.key, None)
+
def synchronized(f):
@wraps(f)
def inner(*args, **kwargs):
@@ -66,6 +72,16 @@ class LruCache(object):
return inner
+ cached_cache_len = [0]
+ if size_callback is not None:
+ def cache_len():
+ return cached_cache_len[0]
+ else:
+ def cache_len():
+ return len(cache)
+
+ self.len = synchronized(cache_len)
+
def add_node(key, value, callbacks=set()):
prev_node = list_root
next_node = prev_node.next_node
@@ -74,6 +90,9 @@ class LruCache(object):
next_node.prev_node = node
cache[key] = node
+ if size_callback:
+ cached_cache_len[0] += size_callback(node.value)
+
def move_node_to_front(node):
prev_node = node.prev_node
next_node = node.next_node
@@ -92,23 +111,25 @@ class LruCache(object):
prev_node.next_node = next_node
next_node.prev_node = prev_node
+ if size_callback:
+ cached_cache_len[0] -= size_callback(node.value)
+
for cb in node.callbacks:
cb()
node.callbacks.clear()
@synchronized
- def cache_get(key, default=None, callback=None):
+ def cache_get(key, default=None, callbacks=[]):
node = cache.get(key, None)
if node is not None:
move_node_to_front(node)
- if callback:
- node.callbacks.add(callback)
+ node.callbacks.update(callbacks)
return node.value
else:
return default
@synchronized
- def cache_set(key, value, callback=None):
+ def cache_set(key, value, callbacks=[]):
node = cache.get(key, None)
if node is not None:
if value != node.value:
@@ -116,21 +137,18 @@ class LruCache(object):
cb()
node.callbacks.clear()
- if callback:
- node.callbacks.add(callback)
+ if size_callback:
+ cached_cache_len[0] -= size_callback(node.value)
+ cached_cache_len[0] += size_callback(value)
+
+ node.callbacks.update(callbacks)
move_node_to_front(node)
node.value = value
else:
- if callback:
- callbacks = set([callback])
- else:
- callbacks = set()
- add_node(key, value, callbacks)
- if len(cache) > max_size:
- todelete = list_root.prev_node
- delete_node(todelete)
- cache.pop(todelete.key, None)
+ add_node(key, value, set(callbacks))
+
+ evict()
@synchronized
def cache_set_default(key, value):
@@ -139,10 +157,7 @@ class LruCache(object):
return node.value
else:
add_node(key, value)
- if len(cache) > max_size:
- todelete = list_root.prev_node
- delete_node(todelete)
- cache.pop(todelete.key, None)
+ evict()
return value
@synchronized
@@ -174,10 +189,8 @@ class LruCache(object):
for cb in node.callbacks:
cb()
cache.clear()
-
- @synchronized
- def cache_len():
- return len(cache)
+ if size_callback:
+ cached_cache_len[0] = 0
@synchronized
def cache_contains(key):
@@ -190,7 +203,7 @@ class LruCache(object):
self.pop = cache_pop
if cache_type is TreeCache:
self.del_multi = cache_del_multi
- self.len = cache_len
+ self.len = synchronized(cache_len)
self.contains = cache_contains
self.clear = cache_clear
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index c31585aea3..fcc341a6b7 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -65,12 +65,27 @@ class TreeCache(object):
return popped
def values(self):
- return [e.value for e in self.root.values()]
+ return list(iterate_tree_cache_entry(self.root))
def __len__(self):
return self.size
+def iterate_tree_cache_entry(d):
+ """Helper function to iterate over the leaves of a tree, i.e. a dict of that
+ can contain dicts.
+ """
+ if isinstance(d, dict):
+ for value_d in d.itervalues():
+ for value in iterate_tree_cache_entry(value_d):
+ yield value
+ else:
+ if isinstance(d, _Entry):
+ yield d.value
+ else:
+ yield d
+
+
class _Entry(object):
__slots__ = ["value"]
diff --git a/synapse/util/debug.py b/synapse/util/debug.py
deleted file mode 100644
index dc49162e6a..0000000000
--- a/synapse/util/debug.py
+++ /dev/null
@@ -1,71 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2015, 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from twisted.internet import defer, reactor
-from functools import wraps
-from synapse.util.logcontext import LoggingContext, PreserveLoggingContext
-
-
-def debug_deferreds():
- """Cause all deferreds to wait for a reactor tick before running their
- callbacks. This increases the chance of getting a stack trace out of
- a defer.inlineCallback since the code waiting on the deferred will get
- a chance to add an errback before the deferred runs."""
-
- # Helper method for retrieving and restoring the current logging context
- # around a callback.
- def with_logging_context(fn):
- context = LoggingContext.current_context()
-
- def restore_context_callback(x):
- with PreserveLoggingContext(context):
- return fn(x)
-
- return restore_context_callback
-
- # We are going to modify the __init__ method of defer.Deferred so we
- # need to get a copy of the old method so we can still call it.
- old__init__ = defer.Deferred.__init__
-
- # We need to create a deferred to bounce the callbacks through the reactor
- # but we don't want to add a callback when we create that deferred so we
- # we create a new type of deferred that uses the old __init__ method.
- # This is safe as long as the old __init__ method doesn't invoke an
- # __init__ using super.
- class Bouncer(defer.Deferred):
- __init__ = old__init__
-
- # We'll add this as a callback to all Deferreds. Twisted will wait until
- # the bouncer deferred resolves before calling the callbacks of the
- # original deferred.
- def bounce_callback(x):
- bouncer = Bouncer()
- reactor.callLater(0, with_logging_context(bouncer.callback), x)
- return bouncer
-
- # We'll add this as an errback to all Deferreds. Twisted will wait until
- # the bouncer deferred resolves before calling the errbacks of the
- # original deferred.
- def bounce_errback(x):
- bouncer = Bouncer()
- reactor.callLater(0, with_logging_context(bouncer.errback), x)
- return bouncer
-
- @wraps(old__init__)
- def new__init__(self, *args, **kargs):
- old__init__(self, *args, **kargs)
- self.addCallbacks(bounce_callback, bounce_errback)
-
- defer.Deferred.__init__ = new__init__
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index e2de7fce91..153ef001ad 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -88,7 +88,7 @@ class RetryDestinationLimiter(object):
def __init__(self, destination, clock, store, retry_interval,
min_retry_interval=10 * 60 * 1000,
max_retry_interval=24 * 60 * 60 * 1000,
- multiplier_retry_interval=5,):
+ multiplier_retry_interval=5, backoff_on_404=False):
"""Marks the destination as "down" if an exception is thrown in the
context, except for CodeMessageException with code < 500.
@@ -107,6 +107,7 @@ class RetryDestinationLimiter(object):
a failed request, in milliseconds.
multiplier_retry_interval (int): The multiplier to use to increase
the retry interval after a failed request.
+ backoff_on_404 (bool): Back off if we get a 404
"""
self.clock = clock
self.store = store
@@ -116,6 +117,7 @@ class RetryDestinationLimiter(object):
self.min_retry_interval = min_retry_interval
self.max_retry_interval = max_retry_interval
self.multiplier_retry_interval = multiplier_retry_interval
+ self.backoff_on_404 = backoff_on_404
def __enter__(self):
pass
@@ -123,7 +125,22 @@ class RetryDestinationLimiter(object):
def __exit__(self, exc_type, exc_val, exc_tb):
valid_err_code = False
if exc_type is not None and issubclass(exc_type, CodeMessageException):
- valid_err_code = exc_val.code != 429 and 0 <= exc_val.code < 500
+ # Some error codes are perfectly fine for some APIs, whereas other
+ # APIs may expect to never received e.g. a 404. It's important to
+ # handle 404 as some remote servers will return a 404 when the HS
+ # has been decommissioned.
+ # If we get a 401, then we should probably back off since they
+ # won't accept our requests for at least a while.
+ # 429 is us being aggresively rate limited, so lets rate limit
+ # ourselves.
+ if exc_val.code == 404 and self.backoff_on_404:
+ valid_err_code = False
+ elif exc_val.code in (401, 429):
+ valid_err_code = False
+ elif exc_val.code < 500:
+ valid_err_code = True
+ else:
+ valid_err_code = False
if exc_type is None or valid_err_code:
# We connected successfully.
|