diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 58b4220ff3..d8046b7553 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -31,13 +31,6 @@ from synapse.logging import context
if typing.TYPE_CHECKING:
pass
-# FIXME Mjolnir imports glob_to_regex from this file, but it was moved to
-# matrix_common.
-# As a temporary workaround, we import glob_to_regex here for
-# compatibility with current versions of Mjolnir.
-# See https://github.com/matrix-org/mjolnir/pull/174
-from matrix_common.regex import glob_to_regex # noqa
-
logger = logging.getLogger(__name__)
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index 60c03a66fd..6a8e844d63 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -18,9 +18,10 @@ import collections
import inspect
import itertools
import logging
-from contextlib import contextmanager
+from contextlib import asynccontextmanager, contextmanager
from typing import (
Any,
+ AsyncIterator,
Awaitable,
Callable,
Collection,
@@ -40,7 +41,7 @@ from typing import (
)
import attr
-from typing_extensions import ContextManager
+from typing_extensions import AsyncContextManager, Literal
from twisted.internet import defer
from twisted.internet.defer import CancelledError
@@ -96,6 +97,10 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
__slots__ = ["_deferred", "_observers", "_result"]
+ _deferred: "defer.Deferred[_T]"
+ _observers: Union[List["defer.Deferred[_T]"], Tuple[()]]
+ _result: Union[None, Tuple[Literal[True], _T], Tuple[Literal[False], Failure]]
+
def __init__(self, deferred: "defer.Deferred[_T]", consumeErrors: bool = False):
object.__setattr__(self, "_deferred", deferred)
object.__setattr__(self, "_result", None)
@@ -158,12 +163,14 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
effect the underlying deferred.
"""
if not self._result:
+ assert isinstance(self._observers, list)
d: "defer.Deferred[_T]" = defer.Deferred()
self._observers.append(d)
return d
+ elif self._result[0]:
+ return defer.succeed(self._result[1])
else:
- success, res = self._result
- return defer.succeed(res) if success else defer.fail(res)
+ return defer.fail(self._result[1])
def observers(self) -> "Collection[defer.Deferred[_T]]":
return self._observers
@@ -175,6 +182,8 @@ class ObservableDeferred(Generic[_T], AbstractObservableDeferred[_T]):
return self._result is not None and self._result[0] is True
def get_result(self) -> Union[_T, Failure]:
+ if self._result is None:
+ raise ValueError(f"{self!r} has no result yet")
return self._result[1]
def __getattr__(self, name: str) -> Any:
@@ -483,7 +492,7 @@ class ReadWriteLock:
Example:
- with await read_write_lock.read("test_key"):
+ async with read_write_lock.read("test_key"):
# do some work
"""
@@ -506,22 +515,24 @@ class ReadWriteLock:
# Latest writer queued
self.key_to_current_writer: Dict[str, defer.Deferred] = {}
- async def read(self, key: str) -> ContextManager:
- new_defer: "defer.Deferred[None]" = defer.Deferred()
-
- curr_readers = self.key_to_current_readers.setdefault(key, set())
- curr_writer = self.key_to_current_writer.get(key, None)
+ def read(self, key: str) -> AsyncContextManager:
+ @asynccontextmanager
+ async def _ctx_manager() -> AsyncIterator[None]:
+ new_defer: "defer.Deferred[None]" = defer.Deferred()
- curr_readers.add(new_defer)
+ curr_readers = self.key_to_current_readers.setdefault(key, set())
+ curr_writer = self.key_to_current_writer.get(key, None)
- # We wait for the latest writer to finish writing. We can safely ignore
- # any existing readers... as they're readers.
- if curr_writer:
- await make_deferred_yieldable(curr_writer)
+ curr_readers.add(new_defer)
- @contextmanager
- def _ctx_manager() -> Iterator[None]:
try:
+ # We wait for the latest writer to finish writing. We can safely ignore
+ # any existing readers... as they're readers.
+ # May raise a `CancelledError` if the `Deferred` wrapping us is
+ # cancelled. The `Deferred` we are waiting on must not be cancelled,
+ # since we do not own it.
+ if curr_writer:
+ await make_deferred_yieldable(stop_cancellation(curr_writer))
yield
finally:
with PreserveLoggingContext():
@@ -530,29 +541,35 @@ class ReadWriteLock:
return _ctx_manager()
- async def write(self, key: str) -> ContextManager:
- new_defer: "defer.Deferred[None]" = defer.Deferred()
+ def write(self, key: str) -> AsyncContextManager:
+ @asynccontextmanager
+ async def _ctx_manager() -> AsyncIterator[None]:
+ new_defer: "defer.Deferred[None]" = defer.Deferred()
- curr_readers = self.key_to_current_readers.get(key, set())
- curr_writer = self.key_to_current_writer.get(key, None)
+ curr_readers = self.key_to_current_readers.get(key, set())
+ curr_writer = self.key_to_current_writer.get(key, None)
- # We wait on all latest readers and writer.
- to_wait_on = list(curr_readers)
- if curr_writer:
- to_wait_on.append(curr_writer)
+ # We wait on all latest readers and writer.
+ to_wait_on = list(curr_readers)
+ if curr_writer:
+ to_wait_on.append(curr_writer)
- # We can clear the list of current readers since the new writer waits
- # for them to finish.
- curr_readers.clear()
- self.key_to_current_writer[key] = new_defer
+ # We can clear the list of current readers since `new_defer` waits
+ # for them to finish.
+ curr_readers.clear()
+ self.key_to_current_writer[key] = new_defer
- await make_deferred_yieldable(defer.gatherResults(to_wait_on))
-
- @contextmanager
- def _ctx_manager() -> Iterator[None]:
+ to_wait_on_defer = defer.gatherResults(to_wait_on)
try:
+ # Wait for all current readers and the latest writer to finish.
+ # May raise a `CancelledError` immediately after the wait if the
+ # `Deferred` wrapping us is cancelled. We must only release the lock
+ # once we have acquired it, hence the use of `delay_cancellation`
+ # rather than `stop_cancellation`.
+ await make_deferred_yieldable(delay_cancellation(to_wait_on_defer))
yield
finally:
+ # Release the lock.
with PreserveLoggingContext():
new_defer.callback(None)
# `self.key_to_current_writer[key]` may be missing if there was another
@@ -678,12 +695,48 @@ def stop_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
Synapse logcontext rules.
Returns:
- A new `Deferred`, which will contain the result of the original `Deferred`,
- but will not propagate cancellation through to the original. When cancelled,
- the new `Deferred` will fail with a `CancelledError` and will not follow the
- Synapse logcontext rules. `make_deferred_yieldable` should be used to wrap
- the new `Deferred`.
+ A new `Deferred`, which will contain the result of the original `Deferred`.
+ The new `Deferred` will not propagate cancellation through to the original.
+ When cancelled, the new `Deferred` will fail with a `CancelledError`.
+
+ The new `Deferred` will not follow the Synapse logcontext rules and should be
+ wrapped with `make_deferred_yieldable`.
"""
- new_deferred: defer.Deferred[T] = defer.Deferred()
+ new_deferred: "defer.Deferred[T]" = defer.Deferred()
+ deferred.chainDeferred(new_deferred)
+ return new_deferred
+
+
+def delay_cancellation(deferred: "defer.Deferred[T]") -> "defer.Deferred[T]":
+ """Delay cancellation of a `Deferred` until it resolves.
+
+ Has the same effect as `stop_cancellation`, but the returned `Deferred` will not
+ resolve with a `CancelledError` until the original `Deferred` resolves.
+
+ Args:
+ deferred: The `Deferred` to protect against cancellation. May optionally follow
+ the Synapse logcontext rules.
+
+ Returns:
+ A new `Deferred`, which will contain the result of the original `Deferred`.
+ The new `Deferred` will not propagate cancellation through to the original.
+ When cancelled, the new `Deferred` will wait until the original `Deferred`
+ resolves before failing with a `CancelledError`.
+
+ The new `Deferred` will follow the Synapse logcontext rules if `deferred`
+ follows the Synapse logcontext rules. Otherwise the new `Deferred` should be
+ wrapped with `make_deferred_yieldable`.
+ """
+
+ def handle_cancel(new_deferred: "defer.Deferred[T]") -> None:
+ # before the new deferred is cancelled, we `pause` it to stop the cancellation
+ # propagating. we then `unpause` it once the wrapped deferred completes, to
+ # propagate the exception.
+ new_deferred.pause()
+ new_deferred.errback(Failure(CancelledError()))
+
+ deferred.addBoth(lambda _: new_deferred.unpause())
+
+ new_deferred: "defer.Deferred[T]" = defer.Deferred(handle_cancel)
deferred.chainDeferred(new_deferred)
return new_deferred
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 1cdead02f1..eda92d864d 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -20,6 +20,7 @@ from typing import (
Any,
Awaitable,
Callable,
+ Collection,
Dict,
Generic,
Hashable,
@@ -40,6 +41,7 @@ from twisted.python.failure import Failure
from synapse.logging.context import make_deferred_yieldable, preserve_fn
from synapse.util import unwrapFirstError
+from synapse.util.async_helpers import delay_cancellation
from synapse.util.caches.deferred_cache import DeferredCache
from synapse.util.caches.lrucache import LruCache
@@ -69,6 +71,7 @@ class _CacheDescriptorBase:
self,
orig: Callable[..., Any],
num_args: Optional[int],
+ uncached_args: Optional[Collection[str]] = None,
cache_context: bool = False,
):
self.orig = orig
@@ -76,6 +79,13 @@ class _CacheDescriptorBase:
arg_spec = inspect.getfullargspec(orig)
all_args = arg_spec.args
+ # There's no reason that keyword-only arguments couldn't be supported,
+ # but right now they're buggy so do not allow them.
+ if arg_spec.kwonlyargs:
+ raise ValueError(
+ "_CacheDescriptorBase does not support keyword-only arguments."
+ )
+
if "cache_context" in all_args:
if not cache_context:
raise ValueError(
@@ -88,6 +98,9 @@ class _CacheDescriptorBase:
" named `cache_context`"
)
+ if num_args is not None and uncached_args is not None:
+ raise ValueError("Cannot provide both num_args and uncached_args")
+
if num_args is None:
num_args = len(all_args) - 1
if cache_context:
@@ -105,6 +118,12 @@ class _CacheDescriptorBase:
# list of the names of the args used as the cache key
self.arg_names = all_args[1 : num_args + 1]
+ # If there are args to not cache on, filter them out (and fix the size of num_args).
+ if uncached_args is not None:
+ include_arg_in_cache_key = [n not in uncached_args for n in self.arg_names]
+ else:
+ include_arg_in_cache_key = [True] * len(self.arg_names)
+
# self.arg_defaults is a map of arg name to its default value for each
# argument that has a default value
if arg_spec.defaults:
@@ -119,8 +138,8 @@ class _CacheDescriptorBase:
self.add_cache_context = cache_context
- self.cache_key_builder = get_cache_key_builder(
- self.arg_names, self.arg_defaults
+ self.cache_key_builder = _get_cache_key_builder(
+ self.arg_names, include_arg_in_cache_key, self.arg_defaults
)
@@ -130,8 +149,7 @@ class _LruCachedFunction(Generic[F]):
def lru_cache(
- max_entries: int = 1000,
- cache_context: bool = False,
+ *, max_entries: int = 1000, cache_context: bool = False
) -> Callable[[F], _LruCachedFunction[F]]:
"""A method decorator that applies a memoizing cache around the function.
@@ -186,7 +204,9 @@ class LruCacheDescriptor(_CacheDescriptorBase):
max_entries: int = 1000,
cache_context: bool = False,
):
- super().__init__(orig, num_args=None, cache_context=cache_context)
+ super().__init__(
+ orig, num_args=None, uncached_args=None, cache_context=cache_context
+ )
self.max_entries = max_entries
def __get__(self, obj: Optional[Any], owner: Optional[Type]) -> Callable[..., Any]:
@@ -260,6 +280,9 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
num_args: number of positional arguments (excluding ``self`` and
``cache_context``) to use as cache keys. Defaults to all named
args of the function.
+ uncached_args: a list of argument names to not use as the cache key.
+ (``self`` and ``cache_context`` are always ignored.) Cannot be used
+ with num_args.
tree:
cache_context:
iterable:
@@ -273,12 +296,18 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
orig: Callable[..., Any],
max_entries: int = 1000,
num_args: Optional[int] = None,
+ uncached_args: Optional[Collection[str]] = None,
tree: bool = False,
cache_context: bool = False,
iterable: bool = False,
prune_unread_entries: bool = True,
):
- super().__init__(orig, num_args=num_args, cache_context=cache_context)
+ super().__init__(
+ orig,
+ num_args=num_args,
+ uncached_args=uncached_args,
+ cache_context=cache_context,
+ )
if tree and self.num_args < 2:
raise RuntimeError(
@@ -322,6 +351,11 @@ class DeferredCacheDescriptor(_CacheDescriptorBase):
ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs)
ret = cache.set(cache_key, ret, callback=invalidate_callback)
+ # We started a new call to `self.orig`, so we must always wait for it to
+ # complete. Otherwise we might mark our current logging context as
+ # finished while `self.orig` is still using it in the background.
+ ret = delay_cancellation(ret)
+
return make_deferred_yieldable(ret)
wrapped = cast(_CachedFunction, _wrapped)
@@ -369,7 +403,7 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
but including list_name) to use as cache keys. Defaults to all
named args of the function.
"""
- super().__init__(orig, num_args=num_args)
+ super().__init__(orig, num_args=num_args, uncached_args=None)
self.list_name = list_name
@@ -482,6 +516,11 @@ class DeferredCacheListDescriptor(_CacheDescriptorBase):
d = defer.gatherResults(cached_defers, consumeErrors=True).addCallbacks(
lambda _: results, unwrapFirstError
)
+ if missing:
+ # We started a new call to `self.orig`, so we must always wait for it to
+ # complete. Otherwise we might mark our current logging context as
+ # finished while `self.orig` is still using it in the background.
+ d = delay_cancellation(d)
return make_deferred_yieldable(d)
else:
return defer.succeed(results)
@@ -530,8 +569,10 @@ class _CacheContext:
def cached(
+ *,
max_entries: int = 1000,
num_args: Optional[int] = None,
+ uncached_args: Optional[Collection[str]] = None,
tree: bool = False,
cache_context: bool = False,
iterable: bool = False,
@@ -541,6 +582,7 @@ def cached(
orig,
max_entries=max_entries,
num_args=num_args,
+ uncached_args=uncached_args,
tree=tree,
cache_context=cache_context,
iterable=iterable,
@@ -551,7 +593,7 @@ def cached(
def cachedList(
- cached_method_name: str, list_name: str, num_args: Optional[int] = None
+ *, cached_method_name: str, list_name: str, num_args: Optional[int] = None
) -> Callable[[F], _CachedFunction[F]]:
"""Creates a descriptor that wraps a function in a `CacheListDescriptor`.
@@ -590,13 +632,16 @@ def cachedList(
return cast(Callable[[F], _CachedFunction[F]], func)
-def get_cache_key_builder(
- param_names: Sequence[str], param_defaults: Mapping[str, Any]
+def _get_cache_key_builder(
+ param_names: Sequence[str],
+ include_params: Sequence[bool],
+ param_defaults: Mapping[str, Any],
) -> Callable[[Sequence[Any], Mapping[str, Any]], CacheKey]:
"""Construct a function which will build cache keys suitable for a cached function
Args:
param_names: list of formal parameter names for the cached function
+ include_params: list of bools of whether to include the parameter name in the cache key
param_defaults: a mapping from parameter name to default value for that param
Returns:
@@ -608,6 +653,7 @@ def get_cache_key_builder(
if len(param_names) == 1:
nm = param_names[0]
+ assert include_params[0] is True
def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
if nm in kwargs:
@@ -620,13 +666,18 @@ def get_cache_key_builder(
else:
def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
- return tuple(_get_cache_key_gen(param_names, param_defaults, args, kwargs))
+ return tuple(
+ _get_cache_key_gen(
+ param_names, include_params, param_defaults, args, kwargs
+ )
+ )
return get_cache_key
def _get_cache_key_gen(
param_names: Iterable[str],
+ include_params: Iterable[bool],
param_defaults: Mapping[str, Any],
args: Sequence[Any],
kwargs: Mapping[str, Any],
@@ -637,16 +688,18 @@ def _get_cache_key_gen(
This is essentially the same operation as `inspect.getcallargs`, but optimised so
that we don't need to inspect the target function for each call.
"""
-
# We loop through each arg name, looking up if its in the `kwargs`,
# otherwise using the next argument in `args`. If there are no more
# args then we try looking the arg name up in the defaults.
pos = 0
- for nm in param_names:
+ for nm, inc in zip(param_names, include_params):
if nm in kwargs:
- yield kwargs[nm]
+ if inc:
+ yield kwargs[nm]
elif pos < len(args):
- yield args[pos]
+ if inc:
+ yield args[pos]
pos += 1
else:
- yield param_defaults[nm]
+ if inc:
+ yield param_defaults[nm]
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 563845f867..e78305f787 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -22,8 +22,6 @@ class TreeCacheNode(dict):
leaves.
"""
- pass
-
class TreeCache:
"""
diff --git a/synapse/util/check_dependencies.py b/synapse/util/check_dependencies.py
index 3a1f6b3c75..66f1da7502 100644
--- a/synapse/util/check_dependencies.py
+++ b/synapse/util/check_dependencies.py
@@ -1,3 +1,25 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+This module exposes a single function which checks synapse's dependencies are present
+and correctly versioned. It makes use of `importlib.metadata` to do so. The details
+are a bit murky: there's no easy way to get a map from "extras" to the packages they
+require. But this is probably just symptomatic of Python's package management.
+"""
+
import logging
from typing import Iterable, NamedTuple, Optional
@@ -10,6 +32,8 @@ try:
except ImportError:
import importlib_metadata as metadata # type: ignore[no-redef]
+__all__ = ["check_requirements"]
+
class DependencyException(Exception):
@property
@@ -29,7 +53,17 @@ class DependencyException(Exception):
yield '"' + i + '"'
-EXTRAS = set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra"))
+DEV_EXTRAS = {"lint", "mypy", "test", "dev"}
+RUNTIME_EXTRAS = (
+ set(metadata.metadata(DISTRIBUTION_NAME).get_all("Provides-Extra")) - DEV_EXTRAS
+)
+VERSION = metadata.version(DISTRIBUTION_NAME)
+
+
+def _is_dev_dependency(req: Requirement) -> bool:
+ return req.marker is not None and any(
+ req.marker.evaluate({"extra": e}) for e in DEV_EXTRAS
+ )
class Dependency(NamedTuple):
@@ -43,6 +77,9 @@ def _generic_dependencies() -> Iterable[Dependency]:
assert requirements is not None
for raw_requirement in requirements:
req = Requirement(raw_requirement)
+ if _is_dev_dependency(req):
+ continue
+
# https://packaging.pypa.io/en/latest/markers.html#usage notes that
# > Evaluating an extra marker with no environment is an error
# so we pass in a dummy empty extra value here.
@@ -56,6 +93,8 @@ def _dependencies_for_extra(extra: str) -> Iterable[Dependency]:
assert requirements is not None
for raw_requirement in requirements:
req = Requirement(raw_requirement)
+ if _is_dev_dependency(req):
+ continue
# Exclude mandatory deps by only selecting deps needed with this extra.
if (
req.marker is not None
@@ -67,18 +106,39 @@ def _dependencies_for_extra(extra: str) -> Iterable[Dependency]:
def _not_installed(requirement: Requirement, extra: Optional[str] = None) -> str:
if extra:
- return f"Need {requirement.name} for {extra}, but it is not installed"
+ return (
+ f"Synapse {VERSION} needs {requirement.name} for {extra}, "
+ f"but it is not installed"
+ )
else:
- return f"Need {requirement.name}, but it is not installed"
+ return f"Synapse {VERSION} needs {requirement.name}, but it is not installed"
def _incorrect_version(
requirement: Requirement, got: str, extra: Optional[str] = None
) -> str:
if extra:
- return f"Need {requirement} for {extra}, but got {requirement.name}=={got}"
+ return (
+ f"Synapse {VERSION} needs {requirement} for {extra}, "
+ f"but got {requirement.name}=={got}"
+ )
+ else:
+ return (
+ f"Synapse {VERSION} needs {requirement}, but got {requirement.name}=={got}"
+ )
+
+
+def _no_reported_version(requirement: Requirement, extra: Optional[str] = None) -> str:
+ if extra:
+ return (
+ f"Synapse {VERSION} needs {requirement} for {extra}, "
+ f"but can't determine {requirement.name}'s version"
+ )
else:
- return f"Need {requirement}, but got {requirement.name}=={got}"
+ return (
+ f"Synapse {VERSION} needs {requirement}, "
+ f"but can't determine {requirement.name}'s version"
+ )
def check_requirements(extra: Optional[str] = None) -> None:
@@ -100,10 +160,10 @@ def check_requirements(extra: Optional[str] = None) -> None:
# First work out which dependencies are required, and which are optional.
if extra is None:
dependencies = _generic_dependencies()
- elif extra in EXTRAS:
+ elif extra in RUNTIME_EXTRAS:
dependencies = _dependencies_for_extra(extra)
else:
- raise ValueError(f"Synapse does not provide the feature '{extra}'")
+ raise ValueError(f"Synapse {VERSION} does not provide the feature '{extra}'")
deps_unfulfilled = []
errors = []
@@ -116,7 +176,17 @@ def check_requirements(extra: Optional[str] = None) -> None:
deps_unfulfilled.append(requirement.name)
errors.append(_not_installed(requirement, extra))
else:
- if not requirement.specifier.contains(dist.version):
+ if dist.version is None:
+ # This shouldn't happen---it suggests a borked virtualenv. (See #12223)
+ # Try to give a vaguely helpful error message anyway.
+ # Type-ignore: the annotations don't reflect reality: see
+ # https://github.com/python/typeshed/issues/7513
+ # https://bugs.python.org/issue47060
+ deps_unfulfilled.append(requirement.name) # type: ignore[unreachable]
+ errors.append(_no_reported_version(requirement, extra))
+
+ # We specify prereleases=True to allow prereleases such as RCs.
+ elif not requirement.specifier.contains(dist.version, prereleases=True):
deps_unfulfilled.append(requirement.name)
errors.append(_incorrect_version(requirement, dist.version, extra))
diff --git a/synapse/util/templates.py b/synapse/util/templates.py
index 12941065ca..fb758b7180 100644
--- a/synapse/util/templates.py
+++ b/synapse/util/templates.py
@@ -64,6 +64,7 @@ def build_jinja_env(
{
"format_ts": _format_ts_filter,
"mxc_to_http": _create_mxc_to_http_filter(config.server.public_baseurl),
+ "localpart_from_email": _localpart_from_email_filter,
}
)
@@ -112,3 +113,7 @@ def _create_mxc_to_http_filter(
def _format_ts_filter(value: int, format: str) -> str:
return time.strftime(format, time.localtime(value / 1000))
+
+
+def _localpart_from_email_filter(address: str) -> str:
+ return address.rsplit("@", 1)[0]
|