diff options
-rw-r--r-- | changelog.d/11607.misc | 1 | ||||
-rw-r--r-- | synapse/handlers/room.py | 2 | ||||
-rw-r--r-- | synapse/util/async_helpers.py | 22 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 127 | ||||
-rw-r--r-- | tests/util/caches/test_response_cache.py | 45 |
5 files changed, 149 insertions, 48 deletions
diff --git a/changelog.d/11607.misc b/changelog.d/11607.misc new file mode 100644 index 0000000000..e82f467763 --- /dev/null +++ b/changelog.d/11607.misc @@ -0,0 +1 @@ +Improve opentracing support for requests which use a `ResponseCache`. diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index ead2198e14..b9c1cbffa5 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -172,7 +172,7 @@ class RoomCreationHandler: user_id = requester.user.to_string() # Check if this room is already being upgraded by another person - for key in self._upgrade_response_cache.pending_result_cache: + for key in self._upgrade_response_cache.keys(): if key[0] == old_room_id and key[1] != user_id: # Two different people are trying to upgrade the same room. # Send the second an error. diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py index bde99ea878..150a04b53e 100644 --- a/synapse/util/async_helpers.py +++ b/synapse/util/async_helpers.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import abc import collections import inspect import itertools @@ -57,7 +58,26 @@ logger = logging.getLogger(__name__) _T = TypeVar("_T") -class ObservableDeferred(Generic[_T]): +class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta): + """Abstract base class defining the consumer interface of ObservableDeferred""" + + __slots__ = () + + @abc.abstractmethod + def observe(self) -> "defer.Deferred[_T]": + """Add a new observer for this ObservableDeferred + + This returns a brand new deferred that is resolved when the underlying + deferred is resolved. Interacting with the returned deferred does not + effect the underlying deferred. + + Note that the returned Deferred doesn't follow the Synapse logcontext rules - + you will probably want to `make_deferred_yieldable` it. + """ + ... + + +class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]): """Wraps a deferred object so that we can add observer deferreds. These observer deferreds do not affect the callback chain of the original deferred. diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index 88ccf44337..a3eb5f741b 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -12,19 +12,37 @@ # See the License for the specific language governing permissions and # limitations under the License. import logging -from typing import Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar +from typing import ( + TYPE_CHECKING, + Any, + Awaitable, + Callable, + Dict, + Generic, + Iterable, + Optional, + TypeVar, +) import attr from twisted.internet import defer from synapse.logging.context import make_deferred_yieldable, run_in_background +from synapse.logging.opentracing import ( + active_span, + start_active_span, + start_active_span_follows_from, +) from synapse.util import Clock -from synapse.util.async_helpers import ObservableDeferred +from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred from synapse.util.caches import register_cache logger = logging.getLogger(__name__) +if TYPE_CHECKING: + import opentracing + # the type of the key in the cache KV = TypeVar("KV") @@ -54,6 +72,20 @@ class ResponseCacheContext(Generic[KV]): """ +@attr.s(auto_attribs=True) +class ResponseCacheEntry: + result: AbstractObservableDeferred + """The (possibly incomplete) result of the operation. + + Note that we continue to store an ObservableDeferred even after the operation + completes (rather than switching to an immediate value), since that makes it + easier to cache Failure results. + """ + + opentracing_span_context: "Optional[opentracing.SpanContext]" + """The opentracing span which generated/is generating the result""" + + class ResponseCache(Generic[KV]): """ This caches a deferred response. Until the deferred completes it will be @@ -63,10 +95,7 @@ class ResponseCache(Generic[KV]): """ def __init__(self, clock: Clock, name: str, timeout_ms: float = 0): - # This is poorly-named: it includes both complete and incomplete results. - # We keep complete results rather than switching to absolute values because - # that makes it easier to cache Failure results. - self.pending_result_cache: Dict[KV, ObservableDeferred] = {} + self._result_cache: Dict[KV, ResponseCacheEntry] = {} self.clock = clock self.timeout_sec = timeout_ms / 1000.0 @@ -75,56 +104,63 @@ class ResponseCache(Generic[KV]): self._metrics = register_cache("response_cache", name, self, resizable=False) def size(self) -> int: - return len(self.pending_result_cache) + return len(self._result_cache) def __len__(self) -> int: return self.size() - def get(self, key: KV) -> Optional[defer.Deferred]: - """Look up the given key. + def keys(self) -> Iterable[KV]: + """Get the keys currently in the result cache - Returns a new Deferred (which also doesn't follow the synapse - logcontext rules). You will probably want to make_deferred_yieldable the result. + Returns both incomplete entries, and (if the timeout on this cache is non-zero), + complete entries which are still in the cache. - If there is no entry for the key, returns None. + Note that the returned iterator is not safe in the face of concurrent execution: + behaviour is undefined if `wrap` is called during iteration. + """ + return self._result_cache.keys() + + def _get(self, key: KV) -> Optional[ResponseCacheEntry]: + """Look up the given key. Args: - key: key to get/set in the cache + key: key to get in the cache Returns: - None if there is no entry for this key; otherwise a deferred which - resolves to the result. + The entry for this key, if any; else None. """ - result = self.pending_result_cache.get(key) - if result is not None: + entry = self._result_cache.get(key) + if entry is not None: self._metrics.inc_hits() - return result.observe() + return entry else: self._metrics.inc_misses() return None def _set( - self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]" - ) -> "defer.Deferred[RV]": + self, + context: ResponseCacheContext[KV], + deferred: "defer.Deferred[RV]", + opentracing_span_context: "Optional[opentracing.SpanContext]", + ) -> ResponseCacheEntry: """Set the entry for the given key to the given deferred. *deferred* should run its callbacks in the sentinel logcontext (ie, you should wrap normal synapse deferreds with synapse.logging.context.run_in_background). - Returns a new Deferred (which also doesn't follow the synapse logcontext rules). - You will probably want to make_deferred_yieldable the result. - Args: context: Information about the cache miss deferred: The deferred which resolves to the result. + opentracing_span_context: An opentracing span wrapping the calculation Returns: - A new deferred which resolves to the actual result. + The cache entry object. """ result = ObservableDeferred(deferred, consumeErrors=True) key = context.cache_key - self.pending_result_cache[key] = result + entry = ResponseCacheEntry(result, opentracing_span_context) + self._result_cache[key] = entry def on_complete(r: RV) -> RV: # if this cache has a non-zero timeout, and the callback has not cleared @@ -132,18 +168,18 @@ class ResponseCache(Generic[KV]): # its removal later. if self.timeout_sec and context.should_cache: self.clock.call_later( - self.timeout_sec, self.pending_result_cache.pop, key, None + self.timeout_sec, self._result_cache.pop, key, None ) else: # otherwise, remove the result immediately. - self.pending_result_cache.pop(key, None) + self._result_cache.pop(key, None) return r - # make sure we do this *after* adding the entry to pending_result_cache, + # make sure we do this *after* adding the entry to result_cache, # in case the result is already complete (in which case flipping the order would # leave us with a stuck entry in the cache). result.addBoth(on_complete) - return result.observe() + return entry async def wrap( self, @@ -189,20 +225,41 @@ class ResponseCache(Generic[KV]): Returns: The result of the callback (from the cache, or otherwise) """ - result = self.get(key) - if not result: + entry = self._get(key) + if not entry: logger.debug( "[%s]: no cached result for [%s], calculating new one", self._name, key ) context = ResponseCacheContext(cache_key=key) if cache_context: kwargs["cache_context"] = context - d = run_in_background(callback, *args, **kwargs) - result = self._set(context, d) - elif not isinstance(result, defer.Deferred) or result.called: + + span_context: Optional[opentracing.SpanContext] = None + + async def cb() -> RV: + # NB it is important that we do not `await` before setting span_context! + nonlocal span_context + with start_active_span(f"ResponseCache[{self._name}].calculate"): + span = active_span() + if span: + span_context = span.context + return await callback(*args, **kwargs) + + d = run_in_background(cb) + entry = self._set(context, d, span_context) + return await make_deferred_yieldable(entry.result.observe()) + + result = entry.result.observe() + if result.called: logger.info("[%s]: using completed cached result for [%s]", self._name, key) else: logger.info( "[%s]: using incomplete cached result for [%s]", self._name, key ) - return await make_deferred_yieldable(result) + + span_context = entry.opentracing_span_context + with start_active_span_follows_from( + f"ResponseCache[{self._name}].wait", + contexts=(span_context,) if span_context else (), + ): + return await make_deferred_yieldable(result) diff --git a/tests/util/caches/test_response_cache.py b/tests/util/caches/test_response_cache.py index 1e83ef2f33..025b73e32f 100644 --- a/tests/util/caches/test_response_cache.py +++ b/tests/util/caches/test_response_cache.py @@ -11,6 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. + +from unittest.mock import Mock + from parameterized import parameterized from twisted.internet import defer @@ -60,10 +63,15 @@ class ResponseCacheTestCase(TestCase): self.successResultOf(wrap_d), "initial wrap result should be the same", ) + + # a second call should return the result without a call to the wrapped function + unexpected = Mock(spec=()) + wrap2_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), - "cache should have the result", + self.successResultOf(wrap2_d), + "cache should still have the result", ) def test_cache_miss(self): @@ -80,7 +88,7 @@ class ResponseCacheTestCase(TestCase): self.successResultOf(wrap_d), "initial wrap result should be the same", ) - self.assertIsNone(cache.get(0), "cache should not have the result now") + self.assertCountEqual([], cache.keys(), "cache should not have the result now") def test_cache_expire(self): cache = self.with_cache("short_cache", ms=1000) @@ -92,16 +100,20 @@ class ResponseCacheTestCase(TestCase): ) self.assertEqual(expected_result, self.successResultOf(wrap_d)) + + # a second call should return the result without a call to the wrapped function + unexpected = Mock(spec=()) + wrap2_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), + self.successResultOf(wrap2_d), "cache should still have the result", ) # cache eviction timer is handled self.reactor.pump((2,)) - - self.assertIsNone(cache.get(0), "cache should not have the result now") + self.assertCountEqual([], cache.keys(), "cache should not have the result now") def test_cache_wait_hit(self): cache = self.with_cache("neutral_cache") @@ -133,16 +145,21 @@ class ResponseCacheTestCase(TestCase): self.reactor.pump((1, 1)) self.assertEqual(expected_result, self.successResultOf(wrap_d)) + + # a second call should immediately return the result without a call to the + # wrapped function + unexpected = Mock(spec=()) + wrap2_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), + self.successResultOf(wrap2_d), "cache should still have the result", ) # (1 + 1 + 2) > 3.0, cache eviction timer is handled self.reactor.pump((2,)) - - self.assertIsNone(cache.get(0), "cache should not have the result now") + self.assertCountEqual([], cache.keys(), "cache should not have the result now") @parameterized.expand([(True,), (False,)]) def test_cache_context_nocache(self, should_cache: bool): @@ -183,10 +200,16 @@ class ResponseCacheTestCase(TestCase): self.assertEqual(expected_result, self.successResultOf(wrap2_d)) if should_cache: + unexpected = Mock(spec=()) + wrap3_d = defer.ensureDeferred(cache.wrap(0, unexpected)) + unexpected.assert_not_called() self.assertEqual( expected_result, - self.successResultOf(cache.get(0)), + self.successResultOf(wrap3_d), "cache should still have the result", ) + else: - self.assertIsNone(cache.get(0), "cache should not have the result") + self.assertCountEqual( + [], cache.keys(), "cache should not have the result now" + ) |