diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 60f0de70f7..b2a22dbd5c 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -17,6 +17,7 @@ import logging
import re
import attr
+from canonicaljson import json
from twisted.internet import defer, task
@@ -25,6 +26,19 @@ from synapse.logging import context
logger = logging.getLogger(__name__)
+def _reject_invalid_json(val):
+ """Do not allow Infinity, -Infinity, or NaN values in JSON."""
+ raise json.JSONDecodeError("Invalid JSON value: '%s'" % val)
+
+
+# Create a custom encoder to reduce the whitespace produced by JSON encoding and
+# ensure that valid JSON is produced.
+json_encoder = json.JSONEncoder(allow_nan=False, separators=(",", ":"))
+
+# Create a custom decoder to reject Python extensions to JSON.
+json_decoder = json.JSONDecoder(parse_constant=_reject_invalid_json)
+
+
def unwrapFirstError(failure):
# defer.gatherResults and DeferredLists wrap failures.
failure.trap(defer.FirstError)
@@ -55,7 +69,7 @@ class Clock(object):
return self._reactor.seconds()
def time_msec(self):
- """Returns the current system time in miliseconds since epoch."""
+ """Returns the current system time in milliseconds since epoch."""
return int(self.time() * 1000)
def looping_call(self, f, msec, *args, **kwargs):
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f7af2bca7f..f562770922 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -19,8 +19,6 @@ import logging
from contextlib import contextmanager
from typing import Dict, Sequence, Set, Union
-from six.moves import range
-
import attr
from twisted.internet import defer
@@ -95,7 +93,7 @@ class ObservableDeferred(object):
This returns a brand new deferred that is resolved when the underlying
deferred is resolved. Interacting with the returned deferred does not
- effect the underdlying deferred.
+ effect the underlying deferred.
"""
if not self._result:
d = defer.Deferred()
@@ -354,7 +352,7 @@ class ReadWriteLock(object):
# resolved when they release the lock).
#
# Read: We know its safe to acquire a read lock when the latest writer has
- # been resolved. The new reader is appeneded to the list of latest readers.
+ # been resolved. The new reader is appended to the list of latest readers.
#
# Write: We know its safe to acquire the write lock when both the latest
# writers and readers have been resolved. The new writer replaces the latest
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index cd48262420..49d9fddcf0 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -21,8 +21,6 @@ import threading
from typing import Any, Tuple, Union, cast
from weakref import WeakValueDictionary
-from six import itervalues
-
from prometheus_client import Gauge
from typing_extensions import Protocol
@@ -194,7 +192,7 @@ class Cache(object):
callbacks = [callback] if callback else []
self.check_thread()
observable = ObservableDeferred(value, consumeErrors=True)
- observer = defer.maybeDeferred(observable.observe)
+ observer = observable.observe()
entry = CacheEntry(deferred=observable, callbacks=callbacks)
existing_entry = self._pending_deferred_cache.pop(key, None)
@@ -281,22 +279,15 @@ class Cache(object):
def invalidate_all(self):
self.check_thread()
self.cache.clear()
- for entry in itervalues(self._pending_deferred_cache):
+ for entry in self._pending_deferred_cache.values():
entry.invalidate()
self._pending_deferred_cache.clear()
class _CacheDescriptorBase(object):
- def __init__(
- self, orig: _CachedFunction, num_args, inlineCallbacks, cache_context=False
- ):
+ def __init__(self, orig: _CachedFunction, num_args, cache_context=False):
self.orig = orig
- if inlineCallbacks:
- self.function_to_call = defer.inlineCallbacks(orig)
- else:
- self.function_to_call = orig
-
arg_spec = inspect.getfullargspec(orig)
all_args = arg_spec.args
@@ -366,7 +357,7 @@ class CacheDescriptor(_CacheDescriptorBase):
invalidated) by adding a special "cache_context" argument to the function
and passing that as a kwarg to all caches called. For example::
- @cachedInlineCallbacks(cache_context=True)
+ @cached(cache_context=True)
def foo(self, key, cache_context):
r1 = yield self.bar1(key, on_invalidate=cache_context.invalidate)
r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate)
@@ -384,17 +375,11 @@ class CacheDescriptor(_CacheDescriptorBase):
max_entries=1000,
num_args=None,
tree=False,
- inlineCallbacks=False,
cache_context=False,
iterable=False,
):
- super(CacheDescriptor, self).__init__(
- orig,
- num_args=num_args,
- inlineCallbacks=inlineCallbacks,
- cache_context=cache_context,
- )
+ super().__init__(orig, num_args=num_args, cache_context=cache_context)
self.max_entries = max_entries
self.tree = tree
@@ -467,9 +452,7 @@ class CacheDescriptor(_CacheDescriptorBase):
observer = defer.succeed(cached_result_d)
except KeyError:
- ret = defer.maybeDeferred(
- preserve_fn(self.function_to_call), obj, *args, **kwargs
- )
+ ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs)
def onErr(f):
cache.invalidate(cache_key)
@@ -512,23 +495,17 @@ class CacheListDescriptor(_CacheDescriptorBase):
of results.
"""
- def __init__(
- self, orig, cached_method_name, list_name, num_args=None, inlineCallbacks=False
- ):
+ def __init__(self, orig, cached_method_name, list_name, num_args=None):
"""
Args:
orig (function)
- cached_method_name (str): The name of the chached method.
+ cached_method_name (str): The name of the cached method.
list_name (str): Name of the argument which is the bulk lookup list
num_args (int): number of positional arguments (excluding ``self``,
but including list_name) to use as cache keys. Defaults to all
named args of the function.
- inlineCallbacks (bool): Whether orig is a generator that should
- be wrapped by defer.inlineCallbacks
"""
- super(CacheListDescriptor, self).__init__(
- orig, num_args=num_args, inlineCallbacks=inlineCallbacks
- )
+ super().__init__(orig, num_args=num_args)
self.list_name = list_name
@@ -633,7 +610,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
cached_defers.append(
defer.maybeDeferred(
- preserve_fn(self.function_to_call), **args_to_call
+ preserve_fn(self.orig), **args_to_call
).addCallbacks(complete_all, errback)
)
@@ -697,21 +674,7 @@ def cached(
)
-def cachedInlineCallbacks(
- max_entries=1000, num_args=None, tree=False, cache_context=False, iterable=False
-):
- return lambda orig: CacheDescriptor(
- orig,
- max_entries=max_entries,
- num_args=num_args,
- tree=tree,
- inlineCallbacks=True,
- cache_context=cache_context,
- iterable=iterable,
- )
-
-
-def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=False):
+def cachedList(cached_method_name, list_name, num_args=None):
"""Creates a descriptor that wraps a function in a `CacheListDescriptor`.
Used to do batch lookups for an already created cache. A single argument
@@ -727,8 +690,6 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal
do batch lookups in the cache.
num_args (int): Number of arguments to use as the key in the cache
(including list_name). Defaults to all named parameters.
- inlineCallbacks (bool): Should the function be wrapped in an
- `defer.inlineCallbacks`?
Example:
@@ -746,5 +707,4 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal
cached_method_name=cached_method_name,
list_name=list_name,
num_args=num_args,
- inlineCallbacks=inlineCallbacks,
)
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 2726b67b6d..89a3420f92 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,8 +16,6 @@
import logging
from collections import OrderedDict
-from six import iteritems, itervalues
-
from synapse.config import cache as cache_config
from synapse.metrics.background_process_metrics import run_as_background_process
from synapse.util.caches import register_cache
@@ -150,7 +148,7 @@ class ExpiringCache(object):
keys_to_delete = set()
- for key, cache_entry in iteritems(self._cache):
+ for key, cache_entry in self._cache.items():
if now - cache_entry.time > self._expiry_ms:
keys_to_delete.add(key)
@@ -170,7 +168,7 @@ class ExpiringCache(object):
def __len__(self):
if self.iterable:
- return sum(len(entry.value) for entry in itervalues(self._cache))
+ return sum(len(entry.value) for entry in self._cache.values())
else:
return len(self._cache)
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2a161bf244..c541bf4579 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -17,8 +17,6 @@ import logging
import math
from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
-from six import integer_types
-
from sortedcontainers import SortedDict
from synapse.types import Collection
@@ -88,7 +86,7 @@ class StreamChangeCache:
def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
"""Returns True if the entity may have been updated since stream_pos
"""
- assert type(stream_pos) in integer_types
+ assert isinstance(stream_pos, int)
if stream_pos < self._earliest_known_stream_pos:
self.metrics.inc_misses()
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 2ea4e4e911..ecd9948e79 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,7 +1,5 @@
from typing import Dict
-from six import itervalues
-
SENTINEL = object()
@@ -81,7 +79,7 @@ def iterate_tree_cache_entry(d):
can contain dicts.
"""
if isinstance(d, dict):
- for value_d in itervalues(d):
+ for value_d in d.values():
for value in iterate_tree_cache_entry(value_d):
yield value
else:
diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py
new file mode 100644
index 0000000000..23393cf49b
--- /dev/null
+++ b/synapse/util/daemonize.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2012, 2013, 2014 Ilya Otyutskiy <ilya.otyutskiy@icloud.com>
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import atexit
+import fcntl
+import logging
+import os
+import signal
+import sys
+
+
+def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -> None:
+ """daemonize the current process
+
+ This calls fork(), and has the main process exit. When it returns we will be
+ running in the child process.
+ """
+
+ # If pidfile already exists, we should read pid from there; to overwrite it, if
+ # locking will fail, because locking attempt somehow purges the file contents.
+ if os.path.isfile(pid_file):
+ with open(pid_file, "r") as pid_fh:
+ old_pid = pid_fh.read()
+
+ # Create a lockfile so that only one instance of this daemon is running at any time.
+ try:
+ lock_fh = open(pid_file, "w")
+ except IOError:
+ print("Unable to create the pidfile.")
+ sys.exit(1)
+
+ try:
+ # Try to get an exclusive lock on the file. This will fail if another process
+ # has the file locked.
+ fcntl.flock(lock_fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
+ except IOError:
+ print("Unable to lock on the pidfile.")
+ # We need to overwrite the pidfile if we got here.
+ #
+ # XXX better to avoid overwriting it, surely. this looks racey as the pid file
+ # could be created between us trying to read it and us trying to lock it.
+ with open(pid_file, "w") as pid_fh:
+ pid_fh.write(old_pid)
+ sys.exit(1)
+
+ # Fork, creating a new process for the child.
+ process_id = os.fork()
+
+ if process_id != 0:
+ # parent process: exit.
+
+ # we use os._exit to avoid running the atexit handlers. In particular, that
+ # means we don't flush the logs. This is important because if we are using
+ # a MemoryHandler, we could have logs buffered which are now buffered in both
+ # the main and the child process, so if we let the main process flush the logs,
+ # we'll get two copies.
+ os._exit(0)
+
+ # This is the child process. Continue.
+
+ # Stop listening for signals that the parent process receives.
+ # This is done by getting a new process id.
+ # setpgrp() is an alternative to setsid().
+ # setsid puts the process in a new parent group and detaches its controlling
+ # terminal.
+
+ os.setsid()
+
+ # point stdin, stdout, stderr at /dev/null
+ devnull = "/dev/null"
+ if hasattr(os, "devnull"):
+ # Python has set os.devnull on this system, use it instead as it might be
+ # different than /dev/null.
+ devnull = os.devnull
+
+ devnull_fd = os.open(devnull, os.O_RDWR)
+ os.dup2(devnull_fd, 0)
+ os.dup2(devnull_fd, 1)
+ os.dup2(devnull_fd, 2)
+ os.close(devnull_fd)
+
+ # now that we have redirected stderr to /dev/null, any uncaught exceptions will
+ # get sent to /dev/null, so make sure we log them.
+ #
+ # (we don't normally expect reactor.run to raise any exceptions, but this will
+ # also catch any other uncaught exceptions before we get that far.)
+
+ def excepthook(type_, value, traceback):
+ logger.critical("Unhanded exception", exc_info=(type_, value, traceback))
+
+ sys.excepthook = excepthook
+
+ # Set umask to default to safe file permissions when running as a root daemon. 027
+ # is an octal number which we are typing as 0o27 for Python3 compatibility.
+ os.umask(0o27)
+
+ # Change to a known directory. If this isn't done, starting a daemon in a
+ # subdirectory that needs to be deleted results in "directory busy" errors.
+ os.chdir(chdir)
+
+ try:
+ lock_fh.write("%s" % (os.getpid()))
+ lock_fh.flush()
+ except IOError:
+ logger.error("Unable to write pid to the pidfile.")
+ print("Unable to write pid to the pidfile.")
+ sys.exit(1)
+
+ # write a log line on SIGTERM.
+ def sigterm(signum, frame):
+ logger.warning("Caught signal %s. Stopping daemon." % signum)
+ sys.exit(0)
+
+ signal.signal(signal.SIGTERM, sigterm)
+
+ # Cleanup pid file at exit.
+ def exit():
+ logger.warning("Stopping daemon.")
+ os.remove(pid_file)
+ sys.exit(0)
+
+ atexit.register(exit)
+
+ logger.warning("Starting daemon.")
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 45af8d3eeb..22a857a306 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -12,10 +12,12 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
-
+import inspect
import logging
from twisted.internet import defer
+from twisted.internet.defer import Deferred, fail, succeed
+from twisted.python import failure
from synapse.logging.context import make_deferred_yieldable, run_in_background
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -39,7 +41,7 @@ class Distributor(object):
Signals are named simply by strings.
TODO(paul): It would be nice to give signals stronger object identities,
- so we can attach metadata, docstrings, detect typoes, etc... But this
+ so we can attach metadata, docstrings, detect typos, etc... But this
model will do for today.
"""
@@ -79,6 +81,28 @@ class Distributor(object):
run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
+def maybeAwaitableDeferred(f, *args, **kw):
+ """
+ Invoke a function that may or may not return a Deferred or an Awaitable.
+
+ This is a modified version of twisted.internet.defer.maybeDeferred.
+ """
+ try:
+ result = f(*args, **kw)
+ except Exception:
+ return fail(failure.Failure(captureVars=Deferred.debug))
+
+ if isinstance(result, Deferred):
+ return result
+ # Handle the additional case of an awaitable being returned.
+ elif inspect.isawaitable(result):
+ return defer.ensureDeferred(result)
+ elif isinstance(result, failure.Failure):
+ return fail(result)
+ else:
+ return succeed(result)
+
+
class Signal(object):
"""A Signal is a dispatch point that stores a list of callables as
observers of it.
@@ -122,7 +146,7 @@ class Signal(object):
),
)
- return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
+ return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb)
deferreds = [run_in_background(do, o) for o in self.observers]
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 8b17d1c8b8..6a3f6177b1 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from six.moves import queue
+import queue
from twisted.internet import threads
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 9815bb8667..0e445e01d7 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from six import binary_type, text_type
-
from canonicaljson import json
from frozendict import frozendict
@@ -26,7 +24,7 @@ def freeze(o):
if isinstance(o, frozendict):
return o
- if isinstance(o, (binary_type, text_type)):
+ if isinstance(o, (bytes, str)):
return o
try:
@@ -41,7 +39,7 @@ def unfreeze(o):
if isinstance(o, (dict, frozendict)):
return dict({k: unfreeze(v) for k, v in o.items()})
- if isinstance(o, (binary_type, text_type)):
+ if isinstance(o, (bytes, str)):
return o
try:
@@ -65,5 +63,8 @@ def _handle_frozendict(obj):
)
-# A JSONEncoder which is capable of encoding frozendicts without barfing
-frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict)
+# A JSONEncoder which is capable of encoding frozendicts without barfing.
+# Additionally reduce the whitespace produced by JSON encoding.
+frozendict_json_encoder = json.JSONEncoder(
+ default=_handle_frozendict, separators=(",", ":"),
+)
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index ec61e14423..13775b43f9 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,14 +13,12 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-import inspect
import logging
from functools import wraps
+from typing import Any, Callable, Optional, TypeVar, cast
from prometheus_client import Counter
-from twisted.internet import defer
-
from synapse.logging.context import LoggingContext, current_context
from synapse.metrics import InFlightGauge
@@ -60,29 +58,37 @@ in_flight = InFlightGauge(
sub_metrics=["real_time_max", "real_time_sum"],
)
+T = TypeVar("T", bound=Callable[..., Any])
-def measure_func(name=None):
- def wrapper(func):
- block_name = func.__name__ if name is None else name
- if inspect.iscoroutinefunction(func):
+def measure_func(name: Optional[str] = None) -> Callable[[T], T]:
+ """
+ Used to decorate an async function with a `Measure` context manager.
+
+ Usage:
+
+ @measure_func()
+ async def foo(...):
+ ...
- @wraps(func)
- async def measured_func(self, *args, **kwargs):
- with Measure(self.clock, block_name):
- r = await func(self, *args, **kwargs)
- return r
+ Which is analogous to:
- else:
+ async def foo(...):
+ with Measure(...):
+ ...
+
+ """
+
+ def wrapper(func: T) -> T:
+ block_name = func.__name__ if name is None else name
- @wraps(func)
- @defer.inlineCallbacks
- def measured_func(self, *args, **kwargs):
- with Measure(self.clock, block_name):
- r = yield func(self, *args, **kwargs)
- return r
+ @wraps(func)
+ async def measured_func(self, *args, **kwargs):
+ with Measure(self.clock, block_name):
+ r = await func(self, *args, **kwargs)
+ return r
- return measured_func
+ return cast(T, measured_func)
return wrapper
diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py
index 2605f3c65b..54c046b6e1 100644
--- a/synapse/util/patch_inline_callbacks.py
+++ b/synapse/util/patch_inline_callbacks.py
@@ -192,7 +192,7 @@ def _check_yield_points(f: Callable, changes: List[str]):
result = yield d
except Exception:
# this will fish an earlier Failure out of the stack where possible, and
- # thus is preferable to passing in an exeception to the Failure
+ # thus is preferable to passing in an exception to the Failure
# constructor, since it results in less stack-mangling.
result = Failure()
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index af69587196..919988d3bc 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -15,14 +15,12 @@
import logging
import random
-from twisted.internet import defer
-
import synapse.logging.context
from synapse.api.errors import CodeMessageException
logger = logging.getLogger(__name__)
-# the intial backoff, after the first transaction fails
+# the initial backoff, after the first transaction fails
MIN_RETRY_INTERVAL = 10 * 60 * 1000
# how much we multiply the backoff by after each subsequent fail
@@ -54,8 +52,7 @@ class NotRetryingDestination(Exception):
self.destination = destination
-@defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
+async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
"""For a given destination check if we have previously failed to
send a request there and are waiting before retrying the destination.
If we are not ready to retry the destination, this will raise a
@@ -73,9 +70,9 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
Example usage:
try:
- limiter = yield get_retry_limiter(destination, clock, store)
+ limiter = await get_retry_limiter(destination, clock, store)
with limiter:
- response = yield do_request()
+ response = await do_request()
except NotRetryingDestination:
# We aren't ready to retry that destination.
raise
@@ -83,7 +80,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
failure_ts = None
retry_last_ts, retry_interval = (0, 0)
- retry_timings = yield store.get_destination_retry_timings(destination)
+ retry_timings = await store.get_destination_retry_timings(destination)
if retry_timings:
failure_ts = retry_timings["failure_ts"]
@@ -174,7 +171,7 @@ class RetryDestinationLimiter(object):
# has been decommissioned.
# If we get a 401, then we should probably back off since they
# won't accept our requests for at least a while.
- # 429 is us being aggresively rate limited, so lets rate limit
+ # 429 is us being aggressively rate limited, so lets rate limit
# ourselves.
if exc_val.code == 404 and self.backoff_on_404:
valid_err_code = False
@@ -222,10 +219,9 @@ class RetryDestinationLimiter(object):
if self.failure_ts is None:
self.failure_ts = retry_last_ts
- @defer.inlineCallbacks
- def store_retry_timings():
+ async def store_retry_timings():
try:
- yield self.store.set_destination_retry_timings(
+ await self.store.set_destination_retry_timings(
self.destination,
self.failure_ts,
retry_last_ts,
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 08c86e92b8..61d96a6c28 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -17,16 +17,14 @@ import itertools
import random
import re
import string
-from collections import Iterable
+from collections.abc import Iterable
from synapse.api.errors import Codes, SynapseError
_string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
# https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-register-email-requesttoken
-# Note: The : character is allowed here for older clients, but will be removed in a
-# future release. Context: https://github.com/matrix-org/synapse/issues/6766
-client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-\:]+$")
+client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-]+$")
# random_string and random_string_with_symbols are used for a range of things,
# some cryptographically important, some less so. We use SystemRandom to make sure
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index 3ec1dfb0c2..43c2e0ac23 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -48,3 +48,26 @@ def check_3pid_allowed(hs, medium, address):
return True
return False
+
+
+def canonicalise_email(address: str) -> str:
+ """'Canonicalise' email address
+ Case folding of local part of email address and lowercase domain part
+ See MSC2265, https://github.com/matrix-org/matrix-doc/pull/2265
+
+ Args:
+ address: email address to be canonicalised
+ Returns:
+ The canonical form of the email address
+ Raises:
+ ValueError if the address could not be parsed.
+ """
+
+ address = address.strip()
+
+ parts = address.split("@")
+ if len(parts) != 2:
+ logger.debug("Couldn't parse email address %s", address)
+ raise ValueError("Unable to parse email address")
+
+ return parts[0].casefold() + "@" + parts[1].lower()
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 9bf6a44f75..023beb5ede 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from six.moves import range
-
class _Entry(object):
__slots__ = ["end_key", "queue"]
|