path: root/synapse/util
Diffstat (limited to 'synapse/util')
-rw-r--r--  synapse/util/__init__.py                   |  18
-rw-r--r--  synapse/util/async_helpers.py              |  30
-rw-r--r--  synapse/util/caches/__init__.py            |   2
-rw-r--r--  synapse/util/caches/descriptors.py         | 108
-rw-r--r--  synapse/util/caches/dictionary_cache.py    |   4
-rw-r--r--  synapse/util/caches/expiringcache.py       |  10
-rw-r--r--  synapse/util/caches/lrucache.py            |   4
-rw-r--r--  synapse/util/caches/response_cache.py      |   2
-rw-r--r--  synapse/util/caches/stream_change_cache.py |   4
-rw-r--r--  synapse/util/caches/treecache.py           |   8
-rw-r--r--  synapse/util/caches/ttlcache.py            |   4
-rw-r--r--  synapse/util/daemonize.py                  | 137
-rw-r--r--  synapse/util/distributor.py                |  34
-rw-r--r--  synapse/util/file_consumer.py              |   4
-rw-r--r--  synapse/util/frozenutils.py                |  13
-rw-r--r--  synapse/util/jsonobject.py                 |   2
-rw-r--r--  synapse/util/metrics.py                    |   48
-rw-r--r--  synapse/util/patch_inline_callbacks.py     |   2
-rw-r--r--  synapse/util/ratelimitutils.py             |   4
-rw-r--r--  synapse/util/retryutils.py                 |  22
-rw-r--r--  synapse/util/stringutils.py                |   6
-rw-r--r--  synapse/util/threepids.py                  |  23
-rw-r--r--  synapse/util/wheel_timer.py                |   6
23 files changed, 329 insertions, 166 deletions
diff --git a/synapse/util/__init__.py b/synapse/util/__init__.py
index 60f0de70f7..a13f11f8d8 100644
--- a/synapse/util/__init__.py
+++ b/synapse/util/__init__.py
@@ -17,6 +17,7 @@ import logging
 import re
 
 import attr
+from canonicaljson import json
 
 from twisted.internet import defer, task
 
@@ -25,6 +26,19 @@ from synapse.logging import context
 logger = logging.getLogger(__name__)
 
 
+def _reject_invalid_json(val):
+    """Do not allow Infinity, -Infinity, or NaN values in JSON."""
+    raise ValueError("Invalid JSON value: '%s'" % val)
+
+
+# Create a custom encoder to reduce the whitespace produced by JSON encoding and
+# ensure that valid JSON is produced.
+json_encoder = json.JSONEncoder(allow_nan=False, separators=(",", ":"))
+
+# Create a custom decoder to reject Python extensions to JSON.
+json_decoder = json.JSONDecoder(parse_constant=_reject_invalid_json)
+
+
 def unwrapFirstError(failure):
     # defer.gatherResults and DeferredLists wrap failures.
     failure.trap(defer.FirstError)
@@ -32,7 +46,7 @@ def unwrapFirstError(failure):
 
 
 @attr.s
-class Clock(object):
+class Clock:
     """
     A Clock wraps a Twisted reactor and provides utilities on top of it.
 
@@ -55,7 +69,7 @@ class Clock(object):
         return self._reactor.seconds()
 
     def time_msec(self):
-        """Returns the current system time in miliseconds since epoch."""
+        """Returns the current system time in milliseconds since epoch."""
         return int(self.time() * 1000)
 
     def looping_call(self, f, msec, *args, **kwargs):
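
A quick sketch of what the two new helpers give us (a hedged example, assuming synapse.util is importable):

# Illustrative sketch of the json_encoder / json_decoder additions above.
from synapse.util import json_decoder, json_encoder

# Compact separators: no whitespace after "," or ":".
print(json_encoder.encode({"a": 1, "b": [2, 3]}))  # {"a":1,"b":[2,3]}

# allow_nan=False: encoding a non-finite float raises ValueError.
try:
    json_encoder.encode({"bad": float("inf")})
except ValueError:
    pass

# parse_constant=_reject_invalid_json: decoding NaN/Infinity raises ValueError.
try:
    json_decoder.decode('{"bad": NaN}')
except ValueError:
    pass
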
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f7af2bca7f..bb57e27beb 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -19,9 +19,8 @@ import logging
 from contextlib import contextmanager
 from typing import Dict, Sequence, Set, Union
 
-from six.moves import range
-
 import attr
+from typing_extensions import ContextManager
 
 from twisted.internet import defer
 from twisted.internet.defer import CancelledError
@@ -37,7 +36,7 @@ from synapse.util import Clock, unwrapFirstError
 logger = logging.getLogger(__name__)
 
 
-class ObservableDeferred(object):
+class ObservableDeferred:
     """Wraps a deferred object so that we can add observer deferreds. These
     observer deferreds do not affect the callback chain of the original
     deferred.
@@ -95,7 +94,7 @@ class ObservableDeferred(object):
 
         This returns a brand new deferred that is resolved when the underlying
         deferred is resolved. Interacting with the returned deferred does not
-        effect the underdlying deferred.
+        affect the underlying deferred.
         """
         if not self._result:
             d = defer.Deferred()
@@ -189,7 +188,7 @@ def yieldable_gather_results(func, iter, *args, **kwargs):
     ).addErrback(unwrapFirstError)
 
 
-class Linearizer(object):
+class Linearizer:
     """Limits concurrent access to resources based on a key. Useful to ensure
     only a few things happen at a time on a given resource.
 
@@ -339,12 +338,12 @@ class Linearizer(object):
         return new_defer
 
 
-class ReadWriteLock(object):
-    """A deferred style read write lock.
+class ReadWriteLock:
+    """An async read write lock.
 
     Example:
 
-        with (yield read_write_lock.read("test_key")):
+        with await read_write_lock.read("test_key"):
             # do some work
     """
 
@@ -354,7 +353,7 @@ class ReadWriteLock(object):
     # resolved when they release the lock).
     #
     # Read: We know its safe to acquire a read lock when the latest writer has
-    # been resolved. The new reader is appeneded to the list of latest readers.
+    # been resolved. The new reader is appended to the list of latest readers.
     #
     # Write: We know its safe to acquire the write lock when both the latest
     # writers and readers have been resolved. The new writer replaces the latest
@@ -367,8 +366,7 @@ class ReadWriteLock(object):
         # Latest writer queued
         self.key_to_current_writer = {}  # type: Dict[str, defer.Deferred]
 
-    @defer.inlineCallbacks
-    def read(self, key):
+    async def read(self, key: str) -> ContextManager:
         new_defer = defer.Deferred()
 
         curr_readers = self.key_to_current_readers.setdefault(key, set())
@@ -378,7 +376,8 @@ class ReadWriteLock(object):
 
         # We wait for the latest writer to finish writing. We can safely ignore
         # any existing readers... as they're readers.
-        yield make_deferred_yieldable(curr_writer)
+        if curr_writer:
+            await make_deferred_yieldable(curr_writer)
 
         @contextmanager
         def _ctx_manager():
@@ -390,8 +389,7 @@ class ReadWriteLock(object):
 
         return _ctx_manager()
 
-    @defer.inlineCallbacks
-    def write(self, key):
+    async def write(self, key: str) -> ContextManager:
         new_defer = defer.Deferred()
 
         curr_readers = self.key_to_current_readers.get(key, set())
@@ -407,7 +405,7 @@ class ReadWriteLock(object):
         curr_readers.clear()
         self.key_to_current_writer[key] = new_defer
 
-        yield make_deferred_yieldable(defer.gatherResults(to_wait_on))
+        await make_deferred_yieldable(defer.gatherResults(to_wait_on))
 
         @contextmanager
         def _ctx_manager():
@@ -504,7 +502,7 @@ def timeout_deferred(deferred, timeout, reactor, on_timeout_cancel=None):
 
 
 @attr.s(slots=True, frozen=True)
-class DoneAwaitable(object):
+class DoneAwaitable:
     """Simple awaitable that returns the provided value.
     """
 
diff --git a/synapse/util/caches/__init__.py b/synapse/util/caches/__init__.py
index dd356bf156..237f588658 100644
--- a/synapse/util/caches/__init__.py
+++ b/synapse/util/caches/__init__.py
@@ -43,7 +43,7 @@ response_cache_total = Gauge("synapse_util_caches_response_cache:total", "", ["n
 
 
 @attr.s
-class CacheMetric(object):
+class CacheMetric:
 
     _cache = attr.ib()
     _cache_type = attr.ib(type=str)
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index cd48262420..98b34f2223 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -18,13 +18,10 @@ import functools
 import inspect
 import logging
 import threading
-from typing import Any, Tuple, Union, cast
+from typing import Any, Callable, Generic, Optional, Tuple, TypeVar, Union, cast
 from weakref import WeakValueDictionary
 
-from six import itervalues
-
 from prometheus_client import Gauge
-from typing_extensions import Protocol
 
 from twisted.internet import defer
 
@@ -40,8 +37,10 @@ logger = logging.getLogger(__name__)
 
 CacheKey = Union[Tuple, Any]
 
+F = TypeVar("F", bound=Callable[..., Any])
+
 
-class _CachedFunction(Protocol):
+class _CachedFunction(Generic[F]):
     invalidate = None  # type: Any
     invalidate_all = None  # type: Any
     invalidate_many = None  # type: Any
@@ -49,8 +48,11 @@ class _CachedFunction(Protocol):
     cache = None  # type: Any
     num_args = None  # type: Any
 
-    def __name__(self):
-        ...
+    __name__ = None  # type: str
+
+    # Note: This function signature is actually fiddled with by the synapse mypy
+    # plugin to a) make it a bound method, and b) remove any `cache_context` arg.
+    __call__ = None  # type: F
 
 
 cache_pending_metric = Gauge(
@@ -62,7 +64,7 @@ cache_pending_metric = Gauge(
 _CacheSentinel = object()
 
 
-class CacheEntry(object):
+class CacheEntry:
     __slots__ = ["deferred", "callbacks", "invalidated"]
 
     def __init__(self, deferred, callbacks):
@@ -78,7 +80,7 @@ class CacheEntry(object):
             self.callbacks.clear()
 
 
-class Cache(object):
+class Cache:
     __slots__ = (
         "cache",
         "name",
@@ -125,7 +127,7 @@ class Cache(object):
 
         self.name = name
         self.keylen = keylen
-        self.thread = None
+        self.thread = None  # type: Optional[threading.Thread]
         self.metrics = register_cache(
             "cache",
             name,
@@ -194,7 +196,7 @@ class Cache(object):
         callbacks = [callback] if callback else []
         self.check_thread()
         observable = ObservableDeferred(value, consumeErrors=True)
-        observer = defer.maybeDeferred(observable.observe)
+        observer = observable.observe()
         entry = CacheEntry(deferred=observable, callbacks=callbacks)
 
         existing_entry = self._pending_deferred_cache.pop(key, None)
@@ -281,22 +283,15 @@ class Cache(object):
     def invalidate_all(self):
         self.check_thread()
         self.cache.clear()
-        for entry in itervalues(self._pending_deferred_cache):
+        for entry in self._pending_deferred_cache.values():
             entry.invalidate()
         self._pending_deferred_cache.clear()
 
 
-class _CacheDescriptorBase(object):
-    def __init__(
-        self, orig: _CachedFunction, num_args, inlineCallbacks, cache_context=False
-    ):
+class _CacheDescriptorBase:
+    def __init__(self, orig: _CachedFunction, num_args, cache_context=False):
         self.orig = orig
 
-        if inlineCallbacks:
-            self.function_to_call = defer.inlineCallbacks(orig)
-        else:
-            self.function_to_call = orig
-
         arg_spec = inspect.getfullargspec(orig)
         all_args = arg_spec.args
 
@@ -366,7 +361,7 @@ class CacheDescriptor(_CacheDescriptorBase):
     invalidated) by adding a special "cache_context" argument to the function
     and passing that as a kwarg to all caches called. For example::
 
-        @cachedInlineCallbacks(cache_context=True)
+        @cached(cache_context=True)
         def foo(self, key, cache_context):
             r1 = yield self.bar1(key, on_invalidate=cache_context.invalidate)
             r2 = yield self.bar2(key, on_invalidate=cache_context.invalidate)
@@ -384,17 +379,11 @@ class CacheDescriptor(_CacheDescriptorBase):
         max_entries=1000,
         num_args=None,
         tree=False,
-        inlineCallbacks=False,
         cache_context=False,
         iterable=False,
     ):
 
-        super(CacheDescriptor, self).__init__(
-            orig,
-            num_args=num_args,
-            inlineCallbacks=inlineCallbacks,
-            cache_context=cache_context,
-        )
+        super().__init__(orig, num_args=num_args, cache_context=cache_context)
 
         self.max_entries = max_entries
         self.tree = tree
@@ -467,9 +456,7 @@ class CacheDescriptor(_CacheDescriptorBase):
                     observer = defer.succeed(cached_result_d)
 
             except KeyError:
-                ret = defer.maybeDeferred(
-                    preserve_fn(self.function_to_call), obj, *args, **kwargs
-                )
+                ret = defer.maybeDeferred(preserve_fn(self.orig), obj, *args, **kwargs)
 
                 def onErr(f):
                     cache.invalidate(cache_key)
@@ -512,23 +499,17 @@ class CacheListDescriptor(_CacheDescriptorBase):
     of results.
     """
 
-    def __init__(
-        self, orig, cached_method_name, list_name, num_args=None, inlineCallbacks=False
-    ):
+    def __init__(self, orig, cached_method_name, list_name, num_args=None):
         """
         Args:
             orig (function)
-            cached_method_name (str): The name of the chached method.
+            cached_method_name (str): The name of the cached method.
             list_name (str): Name of the argument which is the bulk lookup list
             num_args (int): number of positional arguments (excluding ``self``,
                 but including list_name) to use as cache keys. Defaults to all
                 named args of the function.
-            inlineCallbacks (bool): Whether orig is a generator that should
-                be wrapped by defer.inlineCallbacks
         """
-        super(CacheListDescriptor, self).__init__(
-            orig, num_args=num_args, inlineCallbacks=inlineCallbacks
-        )
+        super().__init__(orig, num_args=num_args)
 
         self.list_name = list_name
 
@@ -633,7 +614,7 @@ class CacheListDescriptor(_CacheDescriptorBase):
 
                 cached_defers.append(
                     defer.maybeDeferred(
-                        preserve_fn(self.function_to_call), **args_to_call
+                        preserve_fn(self.orig), **args_to_call
                     ).addCallbacks(complete_all, errback)
                 )
 
@@ -685,9 +666,13 @@ class _CacheContext:
 
 
 def cached(
-    max_entries=1000, num_args=None, tree=False, cache_context=False, iterable=False
-):
-    return lambda orig: CacheDescriptor(
+    max_entries: int = 1000,
+    num_args: Optional[int] = None,
+    tree: bool = False,
+    cache_context: bool = False,
+    iterable: bool = False,
+) -> Callable[[F], _CachedFunction[F]]:
+    func = lambda orig: CacheDescriptor(
         orig,
         max_entries=max_entries,
         num_args=num_args,
@@ -696,22 +681,12 @@ def cached(
         iterable=iterable,
     )
 
-
-def cachedInlineCallbacks(
-    max_entries=1000, num_args=None, tree=False, cache_context=False, iterable=False
-):
-    return lambda orig: CacheDescriptor(
-        orig,
-        max_entries=max_entries,
-        num_args=num_args,
-        tree=tree,
-        inlineCallbacks=True,
-        cache_context=cache_context,
-        iterable=iterable,
-    )
+    return cast(Callable[[F], _CachedFunction[F]], func)
 
 
-def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=False):
+def cachedList(
+    cached_method_name: str, list_name: str, num_args: Optional[int] = None
+) -> Callable[[F], _CachedFunction[F]]:
     """Creates a descriptor that wraps a function in a `CacheListDescriptor`.
 
     Used to do batch lookups for an already created cache. A single argument
@@ -721,18 +696,16 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal
     cache.
 
     Args:
-        cached_method_name (str): The name of the single-item lookup method.
+        cached_method_name: The name of the single-item lookup method.
             This is only used to find the cache to use.
-        list_name (str): The name of the argument that is the list to use to
+        list_name: The name of the argument that is the list to use to
             do batch lookups in the cache.
-        num_args (int): Number of arguments to use as the key in the cache
+        num_args: Number of arguments to use as the key in the cache
             (including list_name). Defaults to all named parameters.
-        inlineCallbacks (bool): Should the function be wrapped in an
-            `defer.inlineCallbacks`?
 
     Example:
 
-        class Example(object):
+        class Example:
             @cached(num_args=2)
             def do_something(self, first_arg):
                 ...
@@ -741,10 +714,11 @@ def cachedList(cached_method_name, list_name, num_args=None, inlineCallbacks=Fal
             def batch_do_something(self, first_arg, second_args):
                 ...
     """
-    return lambda orig: CacheListDescriptor(
+    func = lambda orig: CacheListDescriptor(
         orig,
         cached_method_name=cached_method_name,
         list_name=list_name,
         num_args=num_args,
-        inlineCallbacks=inlineCallbacks,
     )
+
+    return cast(Callable[[F], _CachedFunction[F]], func)
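
With the inlineCallbacks variants removed, @cached and @cachedList decorate native coroutine methods directly. A hedged sketch (the store class and its fetch_name call are hypothetical):

# Sketch: native-async methods replace the removed @cachedInlineCallbacks and
# inlineCallbacks=True forms. `self.db.fetch_name` is a made-up storage call.
from synapse.util.caches.descriptors import cached, cachedList

class UserStore:
    @cached(num_args=1)
    async def get_user_name(self, user_id):
        return await self.db.fetch_name(user_id)

    @cachedList(cached_method_name="get_user_name", list_name="user_ids")
    async def get_user_names(self, user_ids):
        return {uid: await self.db.fetch_name(uid) for uid in user_ids}
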
diff --git a/synapse/util/caches/dictionary_cache.py b/synapse/util/caches/dictionary_cache.py
index 6834e6f3ae..8592b93689 100644
--- a/synapse/util/caches/dictionary_cache.py
+++ b/synapse/util/caches/dictionary_cache.py
@@ -40,7 +40,7 @@ class DictionaryEntry(namedtuple("DictionaryEntry", ("full", "known_absent", "va
         return len(self.value)
 
 
-class DictionaryCache(object):
+class DictionaryCache:
     """Caches key -> dictionary lookups, supporting caching partial dicts, i.e.
     fetching a subset of dictionary keys for a particular key.
     """
@@ -53,7 +53,7 @@ class DictionaryCache(object):
         self.thread = None
         # caches_by_name[name] = self.cache
 
-        class Sentinel(object):
+        class Sentinel:
             __slots__ = []
 
         self.sentinel = Sentinel()
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 2726b67b6d..e15f7ee698 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,8 +16,6 @@
 import logging
 from collections import OrderedDict
 
-from six import iteritems, itervalues
-
 from synapse.config import cache as cache_config
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache
@@ -28,7 +26,7 @@ logger = logging.getLogger(__name__)
 SENTINEL = object()
 
 
-class ExpiringCache(object):
+class ExpiringCache:
     def __init__(
         self,
         cache_name,
@@ -150,7 +148,7 @@ class ExpiringCache(object):
 
         keys_to_delete = set()
 
-        for key, cache_entry in iteritems(self._cache):
+        for key, cache_entry in self._cache.items():
             if now - cache_entry.time > self._expiry_ms:
                 keys_to_delete.add(key)
 
@@ -170,7 +168,7 @@ class ExpiringCache(object):
 
     def __len__(self):
         if self.iterable:
-            return sum(len(entry.value) for entry in itervalues(self._cache))
+            return sum(len(entry.value) for entry in self._cache.values())
         else:
             return len(self._cache)
 
@@ -192,7 +190,7 @@ class ExpiringCache(object):
         return False
 
 
-class _CacheEntry(object):
+class _CacheEntry:
     __slots__ = ["time", "value"]
 
     def __init__(self, time, value):
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index df4ea5901d..4bc1a67b58 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -30,7 +30,7 @@ def enumerate_leaves(node, depth):
                 yield m
 
 
-class _Node(object):
+class _Node:
     __slots__ = ["prev_node", "next_node", "key", "value", "callbacks"]
 
     def __init__(self, prev_node, next_node, key, value, callbacks=set()):
@@ -41,7 +41,7 @@ class _Node(object):
         self.callbacks = callbacks
 
 
-class LruCache(object):
+class LruCache:
     """
     Least-recently-used cache.
     Supports del_multi only if cache_type=TreeCache
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index a6c60888e5..df1a721add 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -23,7 +23,7 @@ from synapse.util.caches import register_cache
 logger = logging.getLogger(__name__)
 
 
-class ResponseCache(object):
+class ResponseCache:
     """
     This caches a deferred response. Until the deferred completes it will be
     returned from the cache. This means that if the client retries the request
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2a161bf244..c541bf4579 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -17,8 +17,6 @@ import logging
 import math
 from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
 
-from six import integer_types
-
 from sortedcontainers import SortedDict
 
 from synapse.types import Collection
@@ -88,7 +86,7 @@ class StreamChangeCache:
     def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) in integer_types
+        assert isinstance(stream_pos, int)
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 2ea4e4e911..eb4d98f683 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,11 +1,9 @@
 from typing import Dict
 
-from six import itervalues
-
 SENTINEL = object()
 
 
-class TreeCache(object):
+class TreeCache:
     """
     Tree-based backing store for LruCache. Allows subtrees of data to be deleted
     efficiently.
@@ -81,7 +79,7 @@ def iterate_tree_cache_entry(d):
     can contain dicts.
     """
     if isinstance(d, dict):
-        for value_d in itervalues(d):
+        for value_d in d.values():
             for value in iterate_tree_cache_entry(value_d):
                 yield value
     else:
@@ -91,7 +89,7 @@ def iterate_tree_cache_entry(d):
             yield d
 
 
-class _Entry(object):
+class _Entry:
     __slots__ = ["value"]
 
     def __init__(self, value):
diff --git a/synapse/util/caches/ttlcache.py b/synapse/util/caches/ttlcache.py
index 6437aa907e..3e180cafd3 100644
--- a/synapse/util/caches/ttlcache.py
+++ b/synapse/util/caches/ttlcache.py
@@ -26,7 +26,7 @@ logger = logging.getLogger(__name__)
 SENTINEL = object()
 
 
-class TTLCache(object):
+class TTLCache:
     """A key/value cache implementation where each entry has its own TTL"""
 
     def __init__(self, cache_name, timer=time.time):
@@ -154,7 +154,7 @@ class TTLCache(object):
 
 
 @attr.s(frozen=True, slots=True)
-class _CacheEntry(object):
+class _CacheEntry:
     """TTLCache entry"""
 
     # expiry_time is the first attribute, so that entries are sorted by expiry.
diff --git a/synapse/util/daemonize.py b/synapse/util/daemonize.py
new file mode 100644
index 0000000000..23393cf49b
--- /dev/null
+++ b/synapse/util/daemonize.py
@@ -0,0 +1,137 @@
+# -*- coding: utf-8 -*-
+# Copyright (c) 2012, 2013, 2014 Ilya Otyutskiy <ilya.otyutskiy@icloud.com>
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import atexit
+import fcntl
+import logging
+import os
+import signal
+import sys
+
+
+def daemonize_process(pid_file: str, logger: logging.Logger, chdir: str = "/") -> None:
+    """daemonize the current process
+
+    This calls fork(), and has the main process exit. When it returns we will be
+    running in the child process.
+    """
+
+    # If the pidfile already exists, read the old pid from it first: if locking
+    # fails we need to write that pid back, since the locking attempt somehow
+    # purges the file contents.
+    if os.path.isfile(pid_file):
+        with open(pid_file, "r") as pid_fh:
+            old_pid = pid_fh.read()
+
+    # Create a lockfile so that only one instance of this daemon is running at any time.
+    try:
+        lock_fh = open(pid_file, "w")
+    except IOError:
+        print("Unable to create the pidfile.")
+        sys.exit(1)
+
+    try:
+        # Try to get an exclusive lock on the file. This will fail if another process
+        # has the file locked.
+        fcntl.flock(lock_fh, fcntl.LOCK_EX | fcntl.LOCK_NB)
+    except IOError:
+        print("Unable to lock on the pidfile.")
+        # We need to overwrite the pidfile if we got here.
+        #
+        # XXX better to avoid overwriting it, surely. this looks racey as the pid file
+        # could be created between us trying to read it and us trying to lock it.
+        with open(pid_file, "w") as pid_fh:
+            pid_fh.write(old_pid)
+        sys.exit(1)
+
+    # Fork, creating a new process for the child.
+    process_id = os.fork()
+
+    if process_id != 0:
+        # parent process: exit.
+
+        # we use os._exit to avoid running the atexit handlers. In particular, that
+        # means we don't flush the logs. This is important because if we are using
+        # a MemoryHandler, we could have logs buffered which are now buffered in both
+        # the main and the child process, so if we let the main process flush the logs,
+        # we'll get two copies.
+        os._exit(0)
+
+    # This is the child process. Continue.
+
+    # Stop listening for signals that the parent process receives.
+    # This is done by getting a new process id.
+    # setpgrp() is an alternative to setsid().
+    # setsid puts the process in a new session and process group, and detaches
+    # its controlling terminal.
+
+    os.setsid()
+
+    # point stdin, stdout, stderr at /dev/null
+    devnull = "/dev/null"
+    if hasattr(os, "devnull"):
+        # Python has set os.devnull on this system, use it instead as it might be
+        # different than /dev/null.
+        devnull = os.devnull
+
+    devnull_fd = os.open(devnull, os.O_RDWR)
+    os.dup2(devnull_fd, 0)
+    os.dup2(devnull_fd, 1)
+    os.dup2(devnull_fd, 2)
+    os.close(devnull_fd)
+
+    # now that we have redirected stderr to /dev/null, any uncaught exceptions will
+    # get sent to /dev/null, so make sure we log them.
+    #
+    # (we don't normally expect reactor.run to raise any exceptions, but this will
+    # also catch any other uncaught exceptions before we get that far.)
+
+    def excepthook(type_, value, traceback):
+        logger.critical("Unhanded exception", exc_info=(type_, value, traceback))
+
+    sys.excepthook = excepthook
+
+    # Set umask to default to safe file permissions when running as a root daemon. 027
+    # is an octal number which we are typing as 0o27 for Python3 compatibility.
+    os.umask(0o27)
+
+    # Change to a known directory. If this isn't done, starting a daemon in a
+    # subdirectory that needs to be deleted results in "directory busy" errors.
+    os.chdir(chdir)
+
+    try:
+        lock_fh.write("%s" % (os.getpid()))
+        lock_fh.flush()
+    except IOError:
+        logger.error("Unable to write pid to the pidfile.")
+        print("Unable to write pid to the pidfile.")
+        sys.exit(1)
+
+    # write a log line on SIGTERM.
+    def sigterm(signum, frame):
+        logger.warning("Caught signal %s. Stopping daemon." % signum)
+        sys.exit(0)
+
+    signal.signal(signal.SIGTERM, sigterm)
+
+    # Cleanup pid file at exit.
+    def exit():
+        logger.warning("Stopping daemon.")
+        os.remove(pid_file)
+        sys.exit(0)
+
+    atexit.register(exit)
+
+    logger.warning("Starting daemon.")
diff --git a/synapse/util/distributor.py b/synapse/util/distributor.py
index 45af8d3eeb..a750261e77 100644
--- a/synapse/util/distributor.py
+++ b/synapse/util/distributor.py
@@ -12,10 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import inspect
 import logging
 
 from twisted.internet import defer
+from twisted.internet.defer import Deferred, fail, succeed
+from twisted.python import failure
 
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -32,14 +34,14 @@ def user_joined_room(distributor, user, room_id):
     distributor.fire("user_joined_room", user=user, room_id=room_id)
 
 
-class Distributor(object):
+class Distributor:
     """A central dispatch point for loosely-connected pieces of code to
     register, observe, and fire signals.
 
     Signals are named simply by strings.
 
     TODO(paul): It would be nice to give signals stronger object identities,
-      so we can attach metadata, docstrings, detect typoes, etc... But this
+      so we can attach metadata, docstrings, detect typos, etc... But this
       model will do for today.
     """
 
@@ -79,7 +81,29 @@ class Distributor(object):
         run_as_background_process(name, self.signals[name].fire, *args, **kwargs)
 
 
-class Signal(object):
+def maybeAwaitableDeferred(f, *args, **kw):
+    """
+    Invoke a function that may or may not return a Deferred or an Awaitable.
+
+    This is a modified version of twisted.internet.defer.maybeDeferred.
+    """
+    try:
+        result = f(*args, **kw)
+    except Exception:
+        return fail(failure.Failure(captureVars=Deferred.debug))
+
+    if isinstance(result, Deferred):
+        return result
+    # Handle the additional case of an awaitable being returned.
+    elif inspect.isawaitable(result):
+        return defer.ensureDeferred(result)
+    elif isinstance(result, failure.Failure):
+        return fail(result)
+    else:
+        return succeed(result)
+
+
+class Signal:
     """A Signal is a dispatch point that stores a list of callables as
     observers of it.
 
@@ -122,7 +146,7 @@ class Signal(object):
                     ),
                 )
 
-            return defer.maybeDeferred(observer, *args, **kwargs).addErrback(eb)
+            return maybeAwaitableDeferred(observer, *args, **kwargs).addErrback(eb)
 
         deferreds = [run_in_background(do, o) for o in self.observers]
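
A minimal sketch of what maybeAwaitableDeferred adds over defer.maybeDeferred: a coroutine returned by an observer is wrapped via ensureDeferred instead of being left un-awaited:

# Sketch: both observer styles are normalised into a Deferred.
from synapse.util.distributor import maybeAwaitableDeferred

def sync_observer(room_id):
    return "plain result"  # -> succeed("plain result")

async def async_observer(room_id):
    return "awaited result"  # -> ensureDeferred(<coroutine>)

d1 = maybeAwaitableDeferred(sync_observer, "!room:example.org")
d2 = maybeAwaitableDeferred(async_observer, "!room:example.org")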
 
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 8b17d1c8b8..733f5e26e6 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,14 +13,14 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six.moves import queue
+import queue
 
 from twisted.internet import threads
 
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 
 
-class BackgroundFileConsumer(object):
+class BackgroundFileConsumer:
     """A consumer that writes to a file like object. Supports both push
     and pull producers
 
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 9815bb8667..0e445e01d7 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import binary_type, text_type
-
 from canonicaljson import json
 from frozendict import frozendict
 
@@ -26,7 +24,7 @@ def freeze(o):
     if isinstance(o, frozendict):
         return o
 
-    if isinstance(o, (binary_type, text_type)):
+    if isinstance(o, (bytes, str)):
         return o
 
     try:
@@ -41,7 +39,7 @@ def unfreeze(o):
     if isinstance(o, (dict, frozendict)):
         return dict({k: unfreeze(v) for k, v in o.items()})
 
-    if isinstance(o, (binary_type, text_type)):
+    if isinstance(o, (bytes, str)):
         return o
 
     try:
@@ -65,5 +63,8 @@ def _handle_frozendict(obj):
     )
 
 
-# A JSONEncoder which is capable of encoding frozendicts without barfing
-frozendict_json_encoder = json.JSONEncoder(default=_handle_frozendict)
+# A JSONEncoder which is capable of encoding frozendicts without barfing.
+# Additionally reduce the whitespace produced by JSON encoding.
+frozendict_json_encoder = json.JSONEncoder(
+    default=_handle_frozendict, separators=(",", ":"),
+)
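
A sketch of the encoder's output after this change (the event content is illustrative):

# Sketch: frozendicts are serialised via _handle_frozendict, and the compact
# separators drop the whitespace the default encoder would emit.
from frozendict import frozendict

from synapse.util.frozenutils import frozendict_json_encoder

content = frozendict({"msgtype": "m.text", "body": "hi"})
print(frozendict_json_encoder.encode(content))
# -> {"msgtype":"m.text","body":"hi"}
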
diff --git a/synapse/util/jsonobject.py b/synapse/util/jsonobject.py
index 6dce03dd3a..50516926f3 100644
--- a/synapse/util/jsonobject.py
+++ b/synapse/util/jsonobject.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 
-class JsonEncodedObject(object):
+class JsonEncodedObject:
     """ A common base class for defining protocol units that are represented
     as JSON.
 
diff --git a/synapse/util/metrics.py b/synapse/util/metrics.py
index ec61e14423..6e57c1ee72 100644
--- a/synapse/util/metrics.py
+++ b/synapse/util/metrics.py
@@ -13,14 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import inspect
 import logging
 from functools import wraps
+from typing import Any, Callable, Optional, TypeVar, cast
 
 from prometheus_client import Counter
 
-from twisted.internet import defer
-
 from synapse.logging.context import LoggingContext, current_context
 from synapse.metrics import InFlightGauge
 
@@ -60,34 +58,42 @@ in_flight = InFlightGauge(
     sub_metrics=["real_time_max", "real_time_sum"],
 )
 
+T = TypeVar("T", bound=Callable[..., Any])
 
-def measure_func(name=None):
-    def wrapper(func):
-        block_name = func.__name__ if name is None else name
 
-        if inspect.iscoroutinefunction(func):
+def measure_func(name: Optional[str] = None) -> Callable[[T], T]:
+    """
+    Used to decorate an async function with a `Measure` context manager.
+
+    Usage:
+
+    @measure_func()
+    async def foo(...):
+        ...
 
-            @wraps(func)
-            async def measured_func(self, *args, **kwargs):
-                with Measure(self.clock, block_name):
-                    r = await func(self, *args, **kwargs)
-                return r
+    Which is analogous to:
 
-        else:
+    async def foo(...):
+        with Measure(...):
+            ...
+
+    """
+
+    def wrapper(func: T) -> T:
+        block_name = func.__name__ if name is None else name
 
-            @wraps(func)
-            @defer.inlineCallbacks
-            def measured_func(self, *args, **kwargs):
-                with Measure(self.clock, block_name):
-                    r = yield func(self, *args, **kwargs)
-                return r
+        @wraps(func)
+        async def measured_func(self, *args, **kwargs):
+            with Measure(self.clock, block_name):
+                r = await func(self, *args, **kwargs)
+            return r
 
-        return measured_func
+        return cast(T, measured_func)
 
     return wrapper
 
 
-class Measure(object):
+class Measure:
     __slots__ = [
         "clock",
         "name",
diff --git a/synapse/util/patch_inline_callbacks.py b/synapse/util/patch_inline_callbacks.py
index 2605f3c65b..54c046b6e1 100644
--- a/synapse/util/patch_inline_callbacks.py
+++ b/synapse/util/patch_inline_callbacks.py
@@ -192,7 +192,7 @@ def _check_yield_points(f: Callable, changes: List[str]):
                 result = yield d
             except Exception:
                 # this will fish an earlier Failure out of the stack where possible, and
-                # thus is preferable to passing in an exeception to the Failure
+                # thus is preferable to passing in an exception to the Failure
                 # constructor, since it results in less stack-mangling.
                 result = Failure()
 
diff --git a/synapse/util/ratelimitutils.py b/synapse/util/ratelimitutils.py
index e5efdfcd02..70d11e1ec3 100644
--- a/synapse/util/ratelimitutils.py
+++ b/synapse/util/ratelimitutils.py
@@ -29,7 +29,7 @@ from synapse.logging.context import (
 logger = logging.getLogger(__name__)
 
 
-class FederationRateLimiter(object):
+class FederationRateLimiter:
     def __init__(self, clock, config):
         """
         Args:
@@ -60,7 +60,7 @@ class FederationRateLimiter(object):
         return self.ratelimiters[host].ratelimit()
 
 
-class _PerHostRatelimiter(object):
+class _PerHostRatelimiter:
     def __init__(self, clock, config):
         """
         Args:
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index af69587196..79869aaa44 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -15,14 +15,12 @@
 import logging
 import random
 
-from twisted.internet import defer
-
 import synapse.logging.context
 from synapse.api.errors import CodeMessageException
 
 logger = logging.getLogger(__name__)
 
-# the intial backoff, after the first transaction fails
+# the initial backoff, after the first transaction fails
 MIN_RETRY_INTERVAL = 10 * 60 * 1000
 
 # how much we multiply the backoff by after each subsequent fail
@@ -54,8 +52,7 @@ class NotRetryingDestination(Exception):
         self.destination = destination
 
 
-@defer.inlineCallbacks
-def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
+async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs):
     """For a given destination check if we have previously failed to
     send a request there and are waiting before retrying the destination.
     If we are not ready to retry the destination, this will raise a
@@ -73,9 +70,9 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
     Example usage:
 
         try:
-            limiter = yield get_retry_limiter(destination, clock, store)
+            limiter = await get_retry_limiter(destination, clock, store)
             with limiter:
-                response = yield do_request()
+                response = await do_request()
         except NotRetryingDestination:
             # We aren't ready to retry that destination.
             raise
@@ -83,7 +80,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
     failure_ts = None
     retry_last_ts, retry_interval = (0, 0)
 
-    retry_timings = yield store.get_destination_retry_timings(destination)
+    retry_timings = await store.get_destination_retry_timings(destination)
 
     if retry_timings:
         failure_ts = retry_timings["failure_ts"]
@@ -117,7 +114,7 @@ def get_retry_limiter(destination, clock, store, ignore_backoff=False, **kwargs)
     )
 
 
-class RetryDestinationLimiter(object):
+class RetryDestinationLimiter:
     def __init__(
         self,
         destination,
@@ -174,7 +171,7 @@ class RetryDestinationLimiter(object):
             # has been decommissioned.
             # If we get a 401, then we should probably back off since they
             # won't accept our requests for at least a while.
-            # 429 is us being aggresively rate limited, so lets rate limit
+            # 429 is us being aggressively rate limited, so let's rate limit
             # ourselves.
             if exc_val.code == 404 and self.backoff_on_404:
                 valid_err_code = False
@@ -222,10 +219,9 @@ class RetryDestinationLimiter(object):
             if self.failure_ts is None:
                 self.failure_ts = retry_last_ts
 
-        @defer.inlineCallbacks
-        def store_retry_timings():
+        async def store_retry_timings():
             try:
-                yield self.store.set_destination_retry_timings(
+                await self.store.set_destination_retry_timings(
                     self.destination,
                     self.failure_ts,
                     retry_last_ts,
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 08c86e92b8..61d96a6c28 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -17,16 +17,14 @@ import itertools
 import random
 import re
 import string
-from collections import Iterable
+from collections.abc import Iterable
 
 from synapse.api.errors import Codes, SynapseError
 
 _string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
 
 # https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-register-email-requesttoken
-# Note: The : character is allowed here for older clients, but will be removed in a
-# future release. Context: https://github.com/matrix-org/synapse/issues/6766
-client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-\:]+$")
+client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-]+$")
 
 # random_string and random_string_with_symbols are used for a range of things,
 # some cryptographically important, some less so. We use SystemRandom to make sure
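
The tightened pattern drops ':' from the allowed set, completing the deprecation described in the removed comment; a quick sketch:

# Sketch: client secrets containing ':' are no longer accepted.
import re

client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-]+$")

assert client_secret_regex.match("this_is-a.valid=secret")
assert client_secret_regex.match("legacy:secret") is None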
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index 20cf4c4a81..bd63b9107e 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -74,3 +74,26 @@ async def check_3pid_allowed(hs, medium, address):
         return True
 
     return False
+
+
+def canonicalise_email(address: str) -> str:
+    """'Canonicalise' email address
+    Case folding of local part of email address and lowercase domain part
+    See MSC2265, https://github.com/matrix-org/matrix-doc/pull/2265
+
+    Args:
+        address: email address to be canonicalised
+    Returns:
+        The canonical form of the email address
+    Raises:
+        ValueError if the address could not be parsed.
+    """
+
+    address = address.strip()
+
+    parts = address.split("@")
+    if len(parts) != 2:
+        logger.debug("Couldn't parse email address %s", address)
+        raise ValueError("Unable to parse email address")
+
+    return parts[0].casefold() + "@" + parts[1].lower()
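
A sketch of the new helper's behaviour:

# Sketch: whitespace is stripped, the local part is case-folded and the
# domain part lowercased, per MSC2265.
from synapse.util.threepids import canonicalise_email

assert canonicalise_email("  Alice@Example.ORG ") == "alice@example.org"

try:
    canonicalise_email("not-an-email")
except ValueError:
    pass  # anything without exactly one "@" is rejected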
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 9bf6a44f75..be3b22469d 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -13,10 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six.moves import range
 
-
-class _Entry(object):
+class _Entry:
     __slots__ = ["end_key", "queue"]
 
     def __init__(self, end_key):
@@ -24,7 +22,7 @@ class _Entry(object):
         self.queue = []
 
 
-class WheelTimer(object):
+class WheelTimer:
     """Stores arbitrary objects that will be returned after their timers have
     expired.
     """