summary refs log tree commit diff
path: root/synapse/util
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/util')
-rw-r--r--synapse/util/__init__.py7
-rw-r--r--synapse/util/async_helpers.py131
-rw-r--r--synapse/util/caches/descriptors.py85
-rw-r--r--synapse/util/caches/treecache.py2
-rw-r--r--synapse/util/check_dependencies.py86
-rw-r--r--synapse/util/templates.py5
6 files changed, 244 insertions, 72 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 58b4220ff3..d8046b7553 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -31,13 +31,6 @@ from synapse.logging import context
 if typing.TYPE_CHECKING:
     pass
 
-# FIXME Mjolnir imports glob_to_regex from this file, but it was moved to
-#       matrix_common.
-#       As a temporary workaround, we import glob_to_regex here for
-#       compatibility with current versions of Mjolnir.
-# See https://github.com/matrix-org/mjolnir/pull/174
-from matrix_common.regex import glob_to_regex  # noqa
-
 logger = logging.getLogger(__name__)
 
 
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 60c03a66fd..6a8e844d63 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -18,9 +18,10 @@ import collections
 import inspect
 import itertools
 import logging
-from contextlib import contextmanager
+from contextlib import asynccontextmanager, contextmanager
 from typing import (
     Any,
+    AsyncIterator,
     Awaitable,
     Callable,
     Collection,
@@ -40,7 +41,7 @@ from typing import (
 )
 
 import attr
-from typing_extensions import ContextManager
+from typing_extensions import AsyncContextManager, Literal
 
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
@@ -96,6 +97,10 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
 
     __slots__ = ["_deferred", "_observers", "_result"]
 
+    _deferred: "defer.Deferred[_T]"
+    _observers: Union[List["defer.Deferred[_T]"], Tuple[()]]
+    _result: Union[None, Tuple[Literal[True], _T], Tuple[Literal[False], Failure]]
+
     def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
         object.__setattr__(self, "_deferred", deferred)
         object.__setattr__(self, "_result", None)
@@ -158,12 +163,14 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
         effect the underlying deferred.
         """
         if not self._result:
+            assert isinstance(self._observers, list)
             d: "defer.Deferred[_T]" = defer.Deferred()
             self._observers.append(d)
             return d
+        elif self._result[0]:
+            return defer.succeed(self._result[1])
         else:
-            success, res = self._result
-            return defer.succeed(res) if success else defer.fail(res)
+            return defer.fail(self._result[1])
 
     def observers(self) -> "Collection[defer.Deferred[_T]]":
         return self._observers
@@ -175,6 +182,8 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
         return self._result is not None and self._result[0] is True
 
     def get_result(self) -> Union[_T, Failure]:
+        if self._result is None:
+            raise ValueError(f"{self!r} has no result yet")
         return self._result[1]
 
     def __getattr__(self, name: str) -> Any:
@@ -483,7 +492,7 @@ class ReadWriteLock:
 
     Example:
 
-        with await read_write_lock.read("test_key"):
+        async with read_write_lock.read("test_key"):
             # do some work
     """
 
@@ -506,22 +515,24 @@ class ReadWriteLock:
         # Latest writer queued
         self.key_to_current_writer: Dict[str, defer.Deferred] = {}
 
-    async def read(self, key: str) -> ContextManager:
-        new_defer: "defer.Deferred[None]" = defer.Deferred()
-
-        curr_readers = self.key_to_current_readers.setdefault(key, set())
-        curr_writer = self.key_to_current_writer.get(key, None)
+    def read(self, key: str) -> AsyncContextManager:
+        @asynccontextmanager
+        async def _ctx_manager() -> AsyncIterator[None]:
+            new_defer: "defer.Deferred[None]" = defer.Deferred()
 
-        curr_readers.add(new_defer)
+            curr_readers = self.key_to_current_readers.setdefault(key, set())
+            curr_writer = self.key_to_current_writer.get(key, None)
 
-        # We wait for the latest writer to finish writing. We can safely ignore
-        # any existing readers... as they're readers.
-        if curr_writer:
-            await make_deferred_yieldable(curr_writer)
+            curr_readers.add(new_defer)
 
-        @contextmanager
-        def _ctx_manager() -> Iterator[None]:
             try:
+                # We wait for the latest writer to finish writing. We can safely ignore
+                # any existing readers... as they're readers.
+                # May raise a `CancelledError` if the `Deferred` wrapping us is
+                # cancelled. The `Deferred` we are waiting on must not be cancelled,
+                # since we do not own it.
+                if curr_writer:
+                    await make_deferred_yieldable(stop_cancellation(curr_writer))
                 yield
             finally:
                 with PreserveLoggingContext():
@@ -530,29 +541,35 @@ class ReadWriteLock:
 
         return _ctx_manager()
 
-    async def write(self, key: str) -> ContextManager:
-        new_defer: "defer.Deferred[None]" = defer.Deferred()
+    def write(self, key: str) -> AsyncContextManager:
+        @asynccontextmanager
+        async def _ctx_manager() -> AsyncIterator[None]:
+            new_defer: "defer.Deferred[None]" = defer.Deferred()
 
-        curr_readers = self.key_to_current_readers.get(key, set())
-        curr_writer = self.key_to_current_writer.get(key, None)
+            curr_readers = self.key_to_current_readers.get(key, set())
+            curr_writer = self.key_to_current_writer.get(key, None)
 
-        # We wait on all latest readers and writer.
-        to_wait_on = list(curr_readers)
-        if curr_writer:
-            to_wait_on.append(curr_writer)
+            # We wait on all latest readers and writer.
+            to_wait_on = list(curr_readers)
+            if curr_writer:
+                to_wait_on.append(curr_writer)
 
-        # We can clear the list of current readers since the new writer waits
-        # for them to finish.
-        curr_readers.clear()
-        self.key_to_current_writer[key] = new_defer
+            # We can clear the list of current readers since `new_defer` waits
+            # for them to finish.
+            curr_readers.clear()
+            self.key_to_current_writer[key] = new_defer
 
-        await make_deferred_yieldable(defer.gatherResults(to_wait_on))
-
-        @contextmanager
-        def _ctx_manager() -> Iterator[None]:
+            to_wait_on_defer = defer.gatherResults(to_wait_on)
             try:
+                # Wait for all current readers and the latest writer to finish.
+                # May raise a `CancelledError` immediately after the wait if the
+                # `Deferred` wrapping us is cancelled. We must only release the lock
+                # once we have acquired it, hence the use of `delay_cancellation`
+                # rather than `stop_cancellation`.
+                await make_deferred_yieldable(delay_cancellation(to_wait_on_defer))
                 yield
             finally:
+                # Release the lock.
                 with PreserveLoggingContext():
                     new_defer.callback(None)
                 # `self.key_to_current_writer[key]` may be missing if there was another
@@ -678,12 +695,48 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
             Synapse logcontext rules.
 
     Returns:
-        A new `Deferred`, which will contain the result of the original `Deferred`,
-        but will not propagate cancellation through to the original. When cancelled,
-        the new `Deferred` will fail with a `CancelledError` and will not follow the
-        Synapse logcontext rules. `make_deferred_yieldable` should be used to wrap
-        the new `Deferred`.
+        A new `Deferred`, which will contain the result of the original `Deferred`.
+        The new `Deferred` will not propagate cancellation through to the original.
+        When cancelled, the new `Deferred` will fail with a `CancelledError`.
+
+        The new `Deferred` will not follow the Synapse logcontext rules and should be
+        wrapped with `make_deferred_yieldable`.
     """
-    new_deferred: defer.Deferred[T] = defer.Deferred()
+    new_deferred: "defer.Deferred[T]" = defer.Deferred()
+    deferred.chainDeferred(new_deferred)
+    return new_deferred
+
+
+def delay_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
+    """Delay cancellation of a `Deferred` until it resolves.
+
+    Has the same effect as `stop_cancellation`, but the returned `Deferred` will not
+    resolve with a `CancelledError` until the original `Deferred` resolves.
+
+    Args:
+        deferred: The `Deferred` to protect against cancellation. May optionally follow
+            the Synapse logcontext rules.
+
+    Returns:
+        A new `Deferred`, which will contain the result of the original `Deferred`.
+        The new `Deferred` will not propagate cancellation through to the original.
+        When cancelled, the new `Deferred` will wait until the original `Deferred`
+        resolves before failing with a `CancelledError`.
+
+        The new `Deferred` will follow the Synapse logcontext rules if `deferred`
+        follows the Synapse logcontext rules. Otherwise the new `Deferred` should be
+        wrapped with `make_deferred_yieldable`.
+    """
+
+    def handle_cancel(new_deferred: "defer.Deferred[T]") -> None:
+        # before the new deferred is cancelled, we `pause` it to stop the cancellation
+        # propagating. we then `unpause` it once the wrapped deferred completes, to
+        # propagate the exception.
+        new_deferred.pause()
+        new_deferred.errback(Failure(CancelledError()))
+
+        deferred.addBoth(lambda _: new_deferred.unpause())
+
+    new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
     deferred.chainDeferred(new_deferred)
     return new_deferred
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 1cdead02f1..eda92d864d 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -20,6 +20,7 @@ from typing import (
     Any,
     Awaitable,
     Callable,
+    Collection,
     Dict,
     Generic,
     Hashable,
@@ -40,6 +41,7 @@ from twisted.python.failure import Failure
 
 from synapse.logging.context import make_deferred_yieldable, preserve_fn
 from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import delay_cancellation
 from synapse.util.caches.deferred_cache import DeferredCache
 from synapse.util.caches.lrucache import LruCache
 
@@ -69,6 +71,7 @@ class _CacheDescriptorBase:
         self,
         orig: Callable[..., Any],
         num_args: Optional[int],
+        uncached_args: Optional[Collection[str]] = None,
         cache_context: bool = False,
     ):
         self.orig = orig
@@ -76,6 +79,13 @@ class _CacheDescriptorBase:
         arg_spec = inspect.getfullargspec(orig)
         all_args = arg_spec.args
 
+        # There's no reason that keyword-only arguments couldn't be supported,
+        # but right now they're buggy so do not allow them.
+        if arg_spec.kwonlyargs:
+            raise ValueError(
+                "_CacheDescriptorBase does not support keyword-only arguments."
+            )
+
         if "cache_context" in all_args:
             if not cache_context:
                 raise ValueError(
@@ -88,6 +98,9 @@ class _CacheDescriptorBase:
                 " named `cache_context`"
             )
 
+        if num_args is not None and uncached_args is not None:
+            raise ValueError("Cannot provide both num_args and uncached_args")
+
         if num_args is None:
             num_args = len(all_args) - 1
             if cache_context:
@@ -105,6 +118,12 @@ class _CacheDescriptorBase:
         # list of the names of the args used as the cache key
         self.arg_names = all_args[1 : num_args + 1]
 
+        # If there are args to not cache on, filter them out (and fix the size of num_args).
+        if uncached_args is not None:
+            include_arg_in_cache_key = [n not in uncached_args for n in self.arg_names]
+        else:
+            include_arg_in_cache_key = [True] * len(self.arg_names)
+
         # self.arg_defaults is a map of arg name to its default value for each
         # argument that has a default value
         if arg_spec.defaults:
@@ -119,8 +138,8 @@ class _CacheDescriptorBase:
 
         self.add_cache_context = cache_context
 
-        self.cache_key_builder = get_cache_key_builder(
-            self.arg_names, self.arg_defaults
+        self.cache_key_builder = _get_cache_key_builder(
+            self.arg_names, include_arg_in_cache_key, self.arg_defaults
         )
 
 
@@ -130,8 +149,7 @@ class _LruCachedFunction(Generic[F]):
 
 
 def lru_cache(
-    max_entries: int = 1000,
-    cache_context: bool = False,
+    *, max_entries: int = 1000, cache_context: bool = False
 ) -> Callable[[F], _LruCachedFunction[F]]:
     """A method decorator that applies a memoizing cache around the function.
 
@@ -186,7 +204,9 @@ class LruCacheDescriptor(_CacheDescriptorBase):
         max_entries: int = 1000,
         cache_context: bool = False,
     ):
-        super().__init__(orig, num_args=None, cache_context=cache_context)
+        super().__init__(
+            orig, num_args=None, uncached_args=None, cache_context=cache_context
+        )
         self.max_entries = max_entries
 
     def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
@@ -260,6 +280,9 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
         num_args: number of positional arguments (excluding ``self`` and
             ``cache_context``) to use as cache keys. Defaults to all named
             args of the function.
+        uncached_args: a list of argument names to not use as the cache key.
+            (``self`` and ``cache_context`` are always ignored.) Cannot be used
+            with num_args.
         tree:
         cache_context:
         iterable:
@@ -273,12 +296,18 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
         orig: Callable[..., Any],
         max_entries: int = 1000,
         num_args: Optional[int] = None,
+        uncached_args: Optional[Collection[str]] = None,
         tree: bool = False,
         cache_context: bool = False,
         iterable: bool = False,
         prune_unread_entries: bool = True,
     ):
-        super().__init__(orig, num_args=num_args, cache_context=cache_context)
+        super().__init__(
+            orig,
+            num_args=num_args,
+            uncached_args=uncached_args,
+            cache_context=cache_context,
+        )
 
         if tree and self.num_args < 2:
             raise RuntimeError(
@@ -322,6 +351,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
                 ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs)
                 ret = cache.set(cache_key, ret, callback=invalidate_callback)
 
+                # We started a new call to `self.orig`, so we must always wait for it to
+                # complete. Otherwise we might mark our current logging context as
+                # finished while `self.orig` is still using it in the background.
+                ret = delay_cancellation(ret)
+
             return make_deferred_yieldable(ret)
 
         wrapped = cast(_CachedFunction, _wrapped)
@@ -369,7 +403,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
                 but including list_name) to use as cache keys. Defaults to all
                 named args of the function.
         """
-        super().__init__(orig, num_args=num_args)
+        super().__init__(orig, num_args=num_args, uncached_args=None)
 
         self.list_name = list_name
 
@@ -482,6 +516,11 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
                 d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
                     lambda _: results, unwrapFirstError
                 )
+                if missing:
+                    # We started a new call to `self.orig`, so we must always wait for it to
+                    # complete. Otherwise we might mark our current logging context as
+                    # finished while `self.orig` is still using it in the background.
+                    d = delay_cancellation(d)
                 return make_deferred_yieldable(d)
             else:
                 return defer.succeed(results)
@@ -530,8 +569,10 @@ class _CacheContext:
 
 
 def cached(
+    *,
     max_entries: int = 1000,
     num_args: Optional[int] = None,
+    uncached_args: Optional[Collection[str]] = None,
     tree: bool = False,
     cache_context: bool = False,
     iterable: bool = False,
@@ -541,6 +582,7 @@ def cached(
         orig,
         max_entries=max_entries,
         num_args=num_args,
+        uncached_args=uncached_args,
         tree=tree,
         cache_context=cache_context,
         iterable=iterable,
@@ -551,7 +593,7 @@ def cached(
 
 
 def cachedList(
-    cached_method_name: str, list_name: str, num_args: Optional[int] = None
+    *, cached_method_name: str, list_name: str, num_args: Optional[int] = None
 ) -> Callable[[F], _CachedFunction[F]]:
     """Creates a descriptor that wraps a function in a `CacheListDescriptor`.
 
@@ -590,13 +632,16 @@ def cachedList(
     return cast(Callable[[F], _CachedFunction[F]], func)
 
 
-def get_cache_key_builder(
-    param_names: Sequence[str], param_defaults: Mapping[str, Any]
+def _get_cache_key_builder(
+    param_names: Sequence[str],
+    include_params: Sequence[bool],
+    param_defaults: Mapping[str, Any],
 ) -> Callable[[Sequence[Any], Mapping[str, Any]], CacheKey]:
     """Construct a function which will build cache keys suitable for a cached function
 
     Args:
         param_names: list of formal parameter names for the cached function
+        include_params: list of bools of whether to include the parameter name in the cache key
         param_defaults: a mapping from parameter name to default value for that param
 
     Returns:
@@ -608,6 +653,7 @@ def get_cache_key_builder(
 
     if len(param_names) == 1:
         nm = param_names[0]
+        assert include_params[0] is True
 
         def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
             if nm in kwargs:
@@ -620,13 +666,18 @@ def get_cache_key_builder(
     else:
 
         def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
-            return tuple(_get_cache_key_gen(param_names, param_defaults, args, kwargs))
+            return tuple(
+                _get_cache_key_gen(
+                    param_names, include_params, param_defaults, args, kwargs
+                )
+            )
 
     return get_cache_key
 
 
 def _get_cache_key_gen(
     param_names: Iterable[str],
+    include_params: Iterable[bool],
     param_defaults: Mapping[str, Any],
     args: Sequence[Any],
     kwargs: Mapping[str, Any],
@@ -637,16 +688,18 @@ def _get_cache_key_gen(
     This is essentially the same operation as `inspect.getcallargs`, but optimised so
     that we don't need to inspect the target function for each call.
     """
-
     # We loop through each arg name, looking up if its in the `kwargs`,
     # otherwise using the next argument in `args`. If there are no more
     # args then we try looking the arg name up in the defaults.
     pos = 0
-    for nm in param_names:
+    for nm, inc in zip(param_names, include_params):
         if nm in kwargs:
-            yield kwargs[nm]
+            if inc:
+                yield kwargs[nm]
         elif pos < len(args):
-            yield args[pos]
+            if inc:
+                yield args[pos]
             pos += 1
         else:
-            yield param_defaults[nm]
+            if inc:
+                yield param_defaults[nm]
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 563845f867..e78305f787 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -22,8 +22,6 @@ class TreeCacheNode(dict):
     leaves.
     """
 
-    pass
-
 
 class TreeCache:
     """
diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py
index 3a1f6b3c75..66f1da7502 100644
--- a/synapse/util/check_dependencies.py
+++ b/synapse/util/check_dependencies.py
@@ -1,3 +1,25 @@
+#  Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+#  Licensed under the Apache License, Version 2.0 (the "License");
+#  you may not use this file except in compliance with the License.
+#  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+"""
+This module exposes a single function which checks synapse's dependencies are present
+and correctly versioned. It makes use of `importlib.metadata` to do so. The details
+are a bit murky: there's no easy way to get a map from "extras" to the packages they
+require. But this is probably just symptomatic of Python's package management.
+"""
+
 import logging
 from typing import Iterable, NamedTuple, Optional
 
@@ -10,6 +32,8 @@ try:
 except ImportError:
     import importlib_metadata as metadata  # type: ignore[no-redef]
 
+__all__ = ["check_requirements"]
+
 
 class DependencyException(Exception):
     @property
@@ -29,7 +53,17 @@ class DependencyException(Exception):
             yield '"' + i + '"'
 
 
-EXTRAS = set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra"))
+DEV_EXTRAS = {"lint", "mypy", "test", "dev"}
+RUNTIME_EXTRAS = (
+    set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS
+)
+VERSION = metadata.version(DISTRIBUTION_NAME)
+
+
+def _is_dev_dependency(req: Requirement) -> bool:
+    return req.marker is not None and any(
+        req.marker.evaluate({"extra": e}) for e in DEV_EXTRAS
+    )
 
 
 class Dependency(NamedTuple):
@@ -43,6 +77,9 @@ def _generic_dependencies() -> Iterable[Dependency]:
     assert requirements is not None
     for raw_requirement in requirements:
         req = Requirement(raw_requirement)
+        if _is_dev_dependency(req):
+            continue
+
         # https://packaging.pypa.io/en/latest/markers.html#usage notes that
         #   > Evaluating an extra marker with no environment is an error
         # so we pass in a dummy empty extra value here.
@@ -56,6 +93,8 @@ def _dependencies_for_extra(extra: str) -> Iterable[Dependency]:
     assert requirements is not None
     for raw_requirement in requirements:
         req = Requirement(raw_requirement)
+        if _is_dev_dependency(req):
+            continue
         # Exclude mandatory deps by only selecting deps needed with this extra.
         if (
             req.marker is not None
@@ -67,18 +106,39 @@ def _dependencies_for_extra(extra: str) -> Iterable[Dependency]:
 
 def _not_installed(requirement: Requirement, extra: Optional[str] = None) -> str:
     if extra:
-        return f"Need {requirement.name} for {extra}, but it is not installed"
+        return (
+            f"Synapse {VERSION} needs {requirement.name} for {extra}, "
+            f"but it is not installed"
+        )
     else:
-        return f"Need {requirement.name}, but it is not installed"
+        return f"Synapse {VERSION} needs {requirement.name}, but it is not installed"
 
 
 def _incorrect_version(
     requirement: Requirement, got: str, extra: Optional[str] = None
 ) -> str:
     if extra:
-        return f"Need {requirement} for {extra}, but got {requirement.name}=={got}"
+        return (
+            f"Synapse {VERSION} needs {requirement} for {extra}, "
+            f"but got {requirement.name}=={got}"
+        )
+    else:
+        return (
+            f"Synapse {VERSION} needs {requirement}, but got {requirement.name}=={got}"
+        )
+
+
+def _no_reported_version(requirement: Requirement, extra: Optional[str] = None) -> str:
+    if extra:
+        return (
+            f"Synapse {VERSION} needs {requirement} for {extra}, "
+            f"but can't determine {requirement.name}'s version"
+        )
     else:
-        return f"Need {requirement}, but got {requirement.name}=={got}"
+        return (
+            f"Synapse {VERSION} needs {requirement}, "
+            f"but can't determine {requirement.name}'s version"
+        )
 
 
 def check_requirements(extra: Optional[str] = None) -> None:
@@ -100,10 +160,10 @@ def check_requirements(extra: Optional[str] = None) -> None:
     # First work out which dependencies are required, and which are optional.
     if extra is None:
         dependencies = _generic_dependencies()
-    elif extra in EXTRAS:
+    elif extra in RUNTIME_EXTRAS:
         dependencies = _dependencies_for_extra(extra)
     else:
-        raise ValueError(f"Synapse does not provide the feature '{extra}'")
+        raise ValueError(f"Synapse {VERSION} does not provide the feature '{extra}'")
 
     deps_unfulfilled = []
     errors = []
@@ -116,7 +176,17 @@ def check_requirements(extra: Optional[str] = None) -> None:
                 deps_unfulfilled.append(requirement.name)
                 errors.append(_not_installed(requirement, extra))
         else:
-            if not requirement.specifier.contains(dist.version):
+            if dist.version is None:
+                # This shouldn't happen---it suggests a borked virtualenv. (See #12223)
+                # Try to give a vaguely helpful error message anyway.
+                # Type-ignore: the annotations don't reflect reality: see
+                #     https://github.com/python/typeshed/issues/7513
+                #     https://bugs.python.org/issue47060
+                deps_unfulfilled.append(requirement.name)  # type: ignore[unreachable]
+                errors.append(_no_reported_version(requirement, extra))
+
+            # We specify prereleases=True to allow prereleases such as RCs.
+            elif not requirement.specifier.contains(dist.version, prereleases=True):
                 deps_unfulfilled.append(requirement.name)
                 errors.append(_incorrect_version(requirement, dist.version, extra))
 
diff --git a/synapse/util/templates.py b/synapse/util/templates.py
index 12941065ca..fb758b7180 100644
--- a/synapse/util/templates.py
+++ b/synapse/util/templates.py
@@ -64,6 +64,7 @@ def build_jinja_env(
         {
             "format_ts": _format_ts_filter,
             "mxc_to_http": _create_mxc_to_http_filter(config.server.public_baseurl),
+            "localpart_from_email": _localpart_from_email_filter,
         }
     )
 
@@ -112,3 +113,7 @@ def _create_mxc_to_http_filter(
 
 def _format_ts_filter(value: int, format: str) -> str:
     return time.strftime(format, time.localtime(value / 1000))
+
+
+def _localpart_from_email_filter(address: str) -> str:
+    return address.rsplit("@", 1)[0]