diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 95f23e27b6..f157132210 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -14,9 +14,8 @@
import json
import logging
-import re
import typing
-from typing import Any, Callable, Dict, Generator, Optional, Pattern
+from typing import Any, Callable, Dict, Generator, Optional
import attr
from frozendict import frozendict
@@ -35,9 +34,6 @@ if typing.TYPE_CHECKING:
logger = logging.getLogger(__name__)
-_WILDCARD_RUN = re.compile(r"([\?\*]+)")
-
-
def _reject_invalid_json(val: Any) -> None:
"""Do not allow Infinity, -Infinity, or NaN values in JSON."""
raise ValueError("Invalid JSON value: '%s'" % val)
@@ -185,56 +181,3 @@ def log_failure(
if not consumeErrors:
return failure
return None
-
-
-def glob_to_regex(glob: str, word_boundary: bool = False) -> Pattern:
- """Converts a glob to a compiled regex object.
-
- Args:
- glob: pattern to match
- word_boundary: If True, the pattern will be allowed to match at word boundaries
- anywhere in the string. Otherwise, the pattern is anchored at the start and
- end of the string.
-
- Returns:
- compiled regex pattern
- """
-
- # Patterns with wildcards must be simplified to avoid performance cliffs
- # - The glob `?**?**?` is equivalent to the glob `???*`
- # - The glob `???*` is equivalent to the regex `.{3,}`
- chunks = []
- for chunk in _WILDCARD_RUN.split(glob):
- # No wildcards? re.escape()
- if not _WILDCARD_RUN.match(chunk):
- chunks.append(re.escape(chunk))
- continue
-
- # Wildcards? Simplify.
- qmarks = chunk.count("?")
- if "*" in chunk:
- chunks.append(".{%d,}" % qmarks)
- else:
- chunks.append(".{%d}" % qmarks)
-
- res = "".join(chunks)
-
- if word_boundary:
- res = re_word_boundary(res)
- else:
- # \A anchors at start of string, \Z at end of string
- res = r"\A" + res + r"\Z"
-
- return re.compile(res, re.IGNORECASE)
-
-
-def re_word_boundary(r: str) -> str:
- """
- Adds word boundary characters to the start and end of an
- expression to require that the match occur as a whole word,
- but do so respecting the fact that strings starting or ending
- with non-word characters will change word boundaries.
- """
- # we can't use \b as it chokes on unicode. however \W seems to be okay
- # as shorthand for [^0-9A-Za-z_].
- return r"(^|\W)%s(\W|$)" % (r,)
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 20ce294209..150a04b53e 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+import abc
import collections
import inspect
import itertools
@@ -30,9 +31,11 @@ from typing import (
Iterator,
Optional,
Set,
+ Tuple,
TypeVar,
Union,
cast,
+ overload,
)
import attr
@@ -55,7 +58,26 @@ logger = logging.getLogger(__name__)
_T = TypeVar("_T")
-class ObservableDeferred(Generic[_T]):
+class AbstractObservableDeferred(Generic[_T], metaclass=abc.ABCMeta):
+ """Abstract base class defining the consumer interface of ObservableDeferred"""
+
+ __slots__ = ()
+
+ @abc.abstractmethod
+ def observe(self) -> "defer.Deferred[_T]":
+ """Add a new observer for this ObservableDeferred
+
+ This returns a brand new deferred that is resolved when the underlying
+ deferred is resolved. Interacting with the returned deferred does not
+        affect the underlying deferred.
+
+ Note that the returned Deferred doesn't follow the Synapse logcontext rules -
+ you will probably want to `make_deferred_yieldable` it.
+ """
+ ...
+
+
+class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
"""Wraps a deferred object so that we can add observer deferreds. These
observer deferreds do not affect the callback chain of the original
deferred.
@@ -234,6 +256,59 @@ def yieldable_gather_results(
).addErrback(unwrapFirstError)
+T1 = TypeVar("T1")
+T2 = TypeVar("T2")
+T3 = TypeVar("T3")
+
+
+@overload
+def gather_results(
+ deferredList: Tuple[()], consumeErrors: bool = ...
+) -> "defer.Deferred[Tuple[()]]":
+ ...
+
+
+@overload
+def gather_results(
+ deferredList: Tuple["defer.Deferred[T1]"],
+ consumeErrors: bool = ...,
+) -> "defer.Deferred[Tuple[T1]]":
+ ...
+
+
+@overload
+def gather_results(
+ deferredList: Tuple["defer.Deferred[T1]", "defer.Deferred[T2]"],
+ consumeErrors: bool = ...,
+) -> "defer.Deferred[Tuple[T1, T2]]":
+ ...
+
+
+@overload
+def gather_results(
+ deferredList: Tuple[
+ "defer.Deferred[T1]", "defer.Deferred[T2]", "defer.Deferred[T3]"
+ ],
+ consumeErrors: bool = ...,
+) -> "defer.Deferred[Tuple[T1, T2, T3]]":
+ ...
+
+
+def gather_results( # type: ignore[misc]
+ deferredList: Tuple["defer.Deferred[T1]", ...],
+ consumeErrors: bool = False,
+) -> "defer.Deferred[Tuple[T1, ...]]":
+ """Combines a tuple of `Deferred`s into a single `Deferred`.
+
+    Wraps `defer.gatherResults` to provide type annotations that support heterogeneous
+ lists of `Deferred`s.
+ """
+ # The `type: ignore[misc]` above suppresses
+ # "Overloaded function implementation cannot produce return type of signature 1/2/3"
+ deferred = defer.gatherResults(deferredList, consumeErrors=consumeErrors)
+ return deferred.addCallback(tuple)
+
+
@attr.s(slots=True)
class _LinearizerEntry:
# The number of things executing.
@@ -352,7 +427,7 @@ class Linearizer:
logger.debug("Waiting to acquire linearizer lock %r for key %r", self.name, key)
- new_defer = make_deferred_yieldable(defer.Deferred())
+ new_defer: "defer.Deferred[None]" = make_deferred_yieldable(defer.Deferred())
entry.deferreds[new_defer] = 1
def cb(_r: None) -> "defer.Deferred[None]":
diff --git a/synapse/util/caches/cached_call.py b/synapse/util/caches/cached_call.py
index 470f4f91a5..e325f44da3 100644
--- a/synapse/util/caches/cached_call.py
+++ b/synapse/util/caches/cached_call.py
@@ -76,6 +76,7 @@ class CachedCall(Generic[TV]):
# Fire off the callable now if this is our first time
if not self._deferred:
+ assert self._callable is not None
self._deferred = run_in_background(self._callable)
# we will never need the callable again, so make sure it can be GCed
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index eb96f7e665..3f11a2f9dd 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -69,7 +69,6 @@ try:
sizer.exclude_refs((), None, "")
return sizer.asizeof(val, limit=100 if recurse else 0)
-
except ImportError:
def _get_size_of(val: Any, *, recurse: bool = True) -> int:
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 88ccf44337..a3eb5f741b 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -12,19 +12,37 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import logging
-from typing import Any, Awaitable, Callable, Dict, Generic, Optional, TypeVar
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Awaitable,
+ Callable,
+ Dict,
+ Generic,
+ Iterable,
+ Optional,
+ TypeVar,
+)
import attr
from twisted.internet import defer
from synapse.logging.context import make_deferred_yieldable, run_in_background
+from synapse.logging.opentracing import (
+ active_span,
+ start_active_span,
+ start_active_span_follows_from,
+)
from synapse.util import Clock
-from synapse.util.async_helpers import ObservableDeferred
+from synapse.util.async_helpers import AbstractObservableDeferred, ObservableDeferred
from synapse.util.caches import register_cache
logger = logging.getLogger(__name__)
+if TYPE_CHECKING:
+ import opentracing
+
# the type of the key in the cache
KV = TypeVar("KV")
@@ -54,6 +72,20 @@ class ResponseCacheContext(Generic[KV]):
"""
+@attr.s(auto_attribs=True)
+class ResponseCacheEntry:
+ result: AbstractObservableDeferred
+ """The (possibly incomplete) result of the operation.
+
+ Note that we continue to store an ObservableDeferred even after the operation
+ completes (rather than switching to an immediate value), since that makes it
+ easier to cache Failure results.
+ """
+
+ opentracing_span_context: "Optional[opentracing.SpanContext]"
+    """The context of the opentracing span which generated/is generating the result"""
+
+
class ResponseCache(Generic[KV]):
"""
This caches a deferred response. Until the deferred completes it will be
@@ -63,10 +95,7 @@ class ResponseCache(Generic[KV]):
"""
def __init__(self, clock: Clock, name: str, timeout_ms: float = 0):
- # This is poorly-named: it includes both complete and incomplete results.
- # We keep complete results rather than switching to absolute values because
- # that makes it easier to cache Failure results.
- self.pending_result_cache: Dict[KV, ObservableDeferred] = {}
+ self._result_cache: Dict[KV, ResponseCacheEntry] = {}
self.clock = clock
self.timeout_sec = timeout_ms / 1000.0
@@ -75,56 +104,63 @@ class ResponseCache(Generic[KV]):
self._metrics = register_cache("response_cache", name, self, resizable=False)
def size(self) -> int:
- return len(self.pending_result_cache)
+ return len(self._result_cache)
def __len__(self) -> int:
return self.size()
- def get(self, key: KV) -> Optional[defer.Deferred]:
- """Look up the given key.
+ def keys(self) -> Iterable[KV]:
+ """Get the keys currently in the result cache
- Returns a new Deferred (which also doesn't follow the synapse
- logcontext rules). You will probably want to make_deferred_yieldable the result.
+ Returns both incomplete entries, and (if the timeout on this cache is non-zero),
+ complete entries which are still in the cache.
- If there is no entry for the key, returns None.
+ Note that the returned iterator is not safe in the face of concurrent execution:
+ behaviour is undefined if `wrap` is called during iteration.
+ """
+ return self._result_cache.keys()
+
+ def _get(self, key: KV) -> Optional[ResponseCacheEntry]:
+ """Look up the given key.
Args:
- key: key to get/set in the cache
+ key: key to get in the cache
Returns:
- None if there is no entry for this key; otherwise a deferred which
- resolves to the result.
+ The entry for this key, if any; else None.
"""
- result = self.pending_result_cache.get(key)
- if result is not None:
+ entry = self._result_cache.get(key)
+ if entry is not None:
self._metrics.inc_hits()
- return result.observe()
+ return entry
else:
self._metrics.inc_misses()
return None
def _set(
- self, context: ResponseCacheContext[KV], deferred: "defer.Deferred[RV]"
- ) -> "defer.Deferred[RV]":
+ self,
+ context: ResponseCacheContext[KV],
+ deferred: "defer.Deferred[RV]",
+ opentracing_span_context: "Optional[opentracing.SpanContext]",
+ ) -> ResponseCacheEntry:
"""Set the entry for the given key to the given deferred.
*deferred* should run its callbacks in the sentinel logcontext (ie,
you should wrap normal synapse deferreds with
synapse.logging.context.run_in_background).
- Returns a new Deferred (which also doesn't follow the synapse logcontext rules).
- You will probably want to make_deferred_yieldable the result.
-
Args:
context: Information about the cache miss
deferred: The deferred which resolves to the result.
+            opentracing_span_context: Context of the span wrapping the calculation
Returns:
- A new deferred which resolves to the actual result.
+ The cache entry object.
"""
result = ObservableDeferred(deferred, consumeErrors=True)
key = context.cache_key
- self.pending_result_cache[key] = result
+ entry = ResponseCacheEntry(result, opentracing_span_context)
+ self._result_cache[key] = entry
def on_complete(r: RV) -> RV:
# if this cache has a non-zero timeout, and the callback has not cleared
@@ -132,18 +168,18 @@ class ResponseCache(Generic[KV]):
# its removal later.
if self.timeout_sec and context.should_cache:
self.clock.call_later(
- self.timeout_sec, self.pending_result_cache.pop, key, None
+ self.timeout_sec, self._result_cache.pop, key, None
)
else:
# otherwise, remove the result immediately.
- self.pending_result_cache.pop(key, None)
+ self._result_cache.pop(key, None)
return r
- # make sure we do this *after* adding the entry to pending_result_cache,
+        # make sure we do this *after* adding the entry to _result_cache,
# in case the result is already complete (in which case flipping the order would
# leave us with a stuck entry in the cache).
result.addBoth(on_complete)
- return result.observe()
+ return entry
async def wrap(
self,
@@ -189,20 +225,41 @@ class ResponseCache(Generic[KV]):
Returns:
The result of the callback (from the cache, or otherwise)
"""
- result = self.get(key)
- if not result:
+ entry = self._get(key)
+ if not entry:
logger.debug(
"[%s]: no cached result for [%s], calculating new one", self._name, key
)
context = ResponseCacheContext(cache_key=key)
if cache_context:
kwargs["cache_context"] = context
- d = run_in_background(callback, *args, **kwargs)
- result = self._set(context, d)
- elif not isinstance(result, defer.Deferred) or result.called:
+
+ span_context: Optional[opentracing.SpanContext] = None
+
+ async def cb() -> RV:
+ # NB it is important that we do not `await` before setting span_context!
+ nonlocal span_context
+ with start_active_span(f"ResponseCache[{self._name}].calculate"):
+ span = active_span()
+ if span:
+ span_context = span.context
+ return await callback(*args, **kwargs)
+
+ d = run_in_background(cb)
+ entry = self._set(context, d, span_context)
+ return await make_deferred_yieldable(entry.result.observe())
+
+ result = entry.result.observe()
+ if result.called:
logger.info("[%s]: using completed cached result for [%s]", self._name, key)
else:
logger.info(
"[%s]: using incomplete cached result for [%s]", self._name, key
)
- return await make_deferred_yieldable(result)
+
+ span_context = entry.opentracing_span_context
+ with start_active_span_follows_from(
+ f"ResponseCache[{self._name}].wait",
+ contexts=(span_context,) if span_context else (),
+ ):
+ return await make_deferred_yieldable(result)
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index de2adacd70..46771a401b 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -142,6 +142,7 @@ class BackgroundFileConsumer:
def wait(self) -> "Deferred[None]":
"""Returns a deferred that resolves when finished writing to file"""
+ assert self._finished_deferred is not None
return make_deferred_yieldable(self._finished_deferred)
def _resume_paused_producer(self) -> None:
|