summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/api/auth.py117
-rw-r--r--synapse/appservice/__init__.py4
-rw-r--r--synapse/config/cas.py46
-rw-r--r--synapse/config/jwt_config.py2
-rw-r--r--synapse/config/logger.py98
-rw-r--r--synapse/config/oidc_config.py7
-rw-r--r--synapse/config/registration.py2
-rw-r--r--synapse/config/room_directory.py2
-rw-r--r--synapse/config/saml2_config.py81
-rw-r--r--synapse/config/tracer.py2
-rw-r--r--synapse/crypto/context_factory.py2
-rw-r--r--synapse/events/__init__.py6
-rw-r--r--synapse/events/utils.py2
-rw-r--r--synapse/federation/transport/server.py2
-rw-r--r--synapse/groups/attestations.py2
-rw-r--r--synapse/groups/groups_server.py4
-rw-r--r--synapse/handlers/account_validity.py29
-rw-r--r--synapse/handlers/admin.py4
-rw-r--r--synapse/handlers/appservice.py121
-rw-r--r--synapse/handlers/auth.py37
-rw-r--r--synapse/handlers/cas_handler.py4
-rw-r--r--synapse/handlers/e2e_keys.py27
-rw-r--r--synapse/handlers/federation.py14
-rw-r--r--synapse/handlers/groups_local.py9
-rw-r--r--synapse/handlers/message.py37
-rw-r--r--synapse/handlers/oidc_handler.py10
-rw-r--r--synapse/handlers/presence.py16
-rw-r--r--synapse/handlers/profile.py13
-rw-r--r--synapse/handlers/register.py7
-rw-r--r--synapse/handlers/room.py31
-rw-r--r--synapse/handlers/room_member.py8
-rw-r--r--synapse/handlers/saml_handler.py4
-rw-r--r--synapse/handlers/search.py2
-rw-r--r--synapse/handlers/state_deltas.py2
-rw-r--r--synapse/handlers/sync.py4
-rw-r--r--synapse/handlers/typing.py23
-rw-r--r--synapse/handlers/user_directory.py2
-rw-r--r--synapse/http/client.py2
-rw-r--r--synapse/http/federation/well_known_resolver.py2
-rw-r--r--synapse/http/matrixfederationclient.py6
-rw-r--r--synapse/http/request_metrics.py2
-rw-r--r--synapse/http/server.py6
-rw-r--r--synapse/http/servlet.py3
-rw-r--r--synapse/http/site.py50
-rw-r--r--synapse/logging/__init__.py20
-rw-r--r--synapse/logging/_remote.py122
-rw-r--r--synapse/logging/_structured.py329
-rw-r--r--synapse/logging/_terse_json.py192
-rw-r--r--synapse/logging/filter.py33
-rw-r--r--synapse/logging/opentracing.py10
-rw-r--r--synapse/metrics/background_process_metrics.py14
-rw-r--r--synapse/notifier.py104
-rw-r--r--synapse/push/baserules.py2
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py41
-rw-r--r--synapse/push/mailer.py41
-rw-r--r--synapse/push/pusherpool.py18
-rw-r--r--synapse/replication/http/membership.py6
-rw-r--r--synapse/replication/http/send_event.py3
-rw-r--r--synapse/replication/tcp/client.py20
-rw-r--r--synapse/replication/tcp/redis.py4
-rw-r--r--synapse/replication/tcp/resource.py10
-rw-r--r--synapse/replication/tcp/streams/events.py21
-rw-r--r--synapse/rest/admin/__init__.py10
-rw-r--r--synapse/rest/admin/devices.py2
-rw-r--r--synapse/rest/admin/event_reports.py46
-rw-r--r--synapse/rest/admin/media.py81
-rw-r--r--synapse/rest/admin/users.py124
-rw-r--r--synapse/rest/client/v1/events.py3
-rw-r--r--synapse/rest/client/v1/login.py3
-rw-r--r--synapse/rest/client/v1/logout.py6
-rw-r--r--synapse/rest/client/v1/presence.py3
-rw-r--r--synapse/rest/client/v1/profile.py6
-rw-r--r--synapse/rest/client/v1/push_rule.py3
-rw-r--r--synapse/rest/client/v1/pusher.py9
-rw-r--r--synapse/rest/client/v1/room.py17
-rw-r--r--synapse/rest/client/v1/voip.py3
-rw-r--r--synapse/rest/client/v2_alpha/account.py3
-rw-r--r--synapse/rest/client/v2_alpha/auth.py3
-rw-r--r--synapse/rest/client/v2_alpha/register.py3
-rw-r--r--synapse/rest/media/v1/filepath.py17
-rw-r--r--synapse/rest/media/v1/media_repository.py237
-rw-r--r--synapse/rest/media/v1/media_storage.py30
-rw-r--r--synapse/server_notices/consent_server_notices.py2
-rw-r--r--synapse/state/__init__.py2
-rw-r--r--synapse/state/v1.py2
-rw-r--r--synapse/state/v2.py2
-rw-r--r--synapse/static/client/login/js/login.js2
-rw-r--r--synapse/storage/database.py4
-rw-r--r--synapse/storage/databases/main/__init__.py1
-rw-r--r--synapse/storage/databases/main/appservice.py98
-rw-r--r--synapse/storage/databases/main/events_bg_updates.py7
-rw-r--r--synapse/storage/databases/main/events_worker.py62
-rw-r--r--synapse/storage/databases/main/media_repository.py131
-rw-r--r--synapse/storage/databases/main/profile.py4
-rw-r--r--synapse/storage/databases/main/registration.py202
-rw-r--r--synapse/storage/databases/main/room.py104
-rw-r--r--synapse/storage/databases/main/schema/delta/58/22puppet_token.sql17
-rw-r--r--synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql2
-rw-r--r--synapse/types.py33
-rw-r--r--synapse/util/caches/descriptors.py235
-rw-r--r--synapse/util/retryutils.py2
101 files changed, 2087 insertions, 1278 deletions
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index bff87fabde..bfcaf68b2a 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.logging import opentracing as opentracing
+from synapse.storage.databases.main.registration import TokenLookupResult
 from synapse.types import StateMap, UserID
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.metrics import Measure
@@ -184,18 +185,12 @@ class Auth:
         """
         try:
             ip_addr = self.hs.get_ip_from_request(request)
-            user_agent = request.requestHeaders.getRawHeaders(
-                b"User-Agent", default=[b""]
-            )[0].decode("ascii", "surrogateescape")
+            user_agent = request.get_user_agent("")
 
             access_token = self.get_access_token_from_request(request)
 
             user_id, app_service = await self._get_appservice_user_id(request)
             if user_id:
-                request.authenticated_entity = user_id
-                opentracing.set_tag("authenticated_entity", user_id)
-                opentracing.set_tag("appservice_id", app_service.id)
-
                 if ip_addr and self._track_appservice_user_ips:
                     await self.store.insert_client_ip(
                         user_id=user_id,
@@ -205,31 +200,38 @@ class Auth:
                         device_id="dummy-device",  # stubbed
                     )
 
-                return synapse.types.create_requester(user_id, app_service=app_service)
+                requester = synapse.types.create_requester(
+                    user_id, app_service=app_service
+                )
+
+                request.requester = user_id
+                opentracing.set_tag("authenticated_entity", user_id)
+                opentracing.set_tag("user_id", user_id)
+                opentracing.set_tag("appservice_id", app_service.id)
+
+                return requester
 
             user_info = await self.get_user_by_access_token(
                 access_token, rights, allow_expired=allow_expired
             )
-            user = user_info["user"]
-            token_id = user_info["token_id"]
-            is_guest = user_info["is_guest"]
-            shadow_banned = user_info["shadow_banned"]
+            token_id = user_info.token_id
+            is_guest = user_info.is_guest
+            shadow_banned = user_info.shadow_banned
 
             # Deny the request if the user account has expired.
             if self._account_validity.enabled and not allow_expired:
-                user_id = user.to_string()
-                if await self.store.is_account_expired(user_id, self.clock.time_msec()):
+                if await self.store.is_account_expired(
+                    user_info.user_id, self.clock.time_msec()
+                ):
                     raise AuthError(
                         403, "User account has expired", errcode=Codes.EXPIRED_ACCOUNT
                     )
 
-            # device_id may not be present if get_user_by_access_token has been
-            # stubbed out.
-            device_id = user_info.get("device_id")
+            device_id = user_info.device_id
 
-            if user and access_token and ip_addr:
+            if access_token and ip_addr:
                 await self.store.insert_client_ip(
-                    user_id=user.to_string(),
+                    user_id=user_info.token_owner,
                     access_token=access_token,
                     ip=ip_addr,
                     user_agent=user_agent,
@@ -243,19 +245,23 @@ class Auth:
                     errcode=Codes.GUEST_ACCESS_FORBIDDEN,
                 )
 
-            request.authenticated_entity = user.to_string()
-            opentracing.set_tag("authenticated_entity", user.to_string())
-            if device_id:
-                opentracing.set_tag("device_id", device_id)
-
-            return synapse.types.create_requester(
-                user,
+            requester = synapse.types.create_requester(
+                user_info.user_id,
                 token_id,
                 is_guest,
                 shadow_banned,
                 device_id,
                 app_service=app_service,
+                authenticated_entity=user_info.token_owner,
             )
+
+            request.requester = requester
+            opentracing.set_tag("authenticated_entity", user_info.token_owner)
+            opentracing.set_tag("user_id", user_info.user_id)
+            if device_id:
+                opentracing.set_tag("device_id", device_id)
+
+            return requester
         except KeyError:
             raise MissingClientTokenError()
 
@@ -286,7 +292,7 @@ class Auth:
 
     async def get_user_by_access_token(
         self, token: str, rights: str = "access", allow_expired: bool = False,
-    ) -> dict:
+    ) -> TokenLookupResult:
         """ Validate access token and get user_id from it
 
         Args:
@@ -295,13 +301,7 @@ class Auth:
                 allow this
             allow_expired: If False, raises an InvalidClientTokenError
                 if the token is expired
-        Returns:
-            dict that includes:
-               `user` (UserID)
-               `is_guest` (bool)
-               `shadow_banned` (bool)
-               `token_id` (int|None): access token id. May be None if guest
-               `device_id` (str|None): device corresponding to access token
+
         Raises:
             InvalidClientTokenError if a user by that token exists, but the token is
                 expired
@@ -311,9 +311,9 @@ class Auth:
 
         if rights == "access":
             # first look in the database
-            r = await self._look_up_user_by_access_token(token)
+            r = await self.store.get_user_by_access_token(token)
             if r:
-                valid_until_ms = r["valid_until_ms"]
+                valid_until_ms = r.valid_until_ms
                 if (
                     not allow_expired
                     and valid_until_ms is not None
@@ -330,7 +330,6 @@ class Auth:
         # otherwise it needs to be a valid macaroon
         try:
             user_id, guest = self._parse_and_validate_macaroon(token, rights)
-            user = UserID.from_string(user_id)
 
             if rights == "access":
                 if not guest:
@@ -356,23 +355,17 @@ class Auth:
                     raise InvalidClientTokenError(
                         "Guest access token used for regular user"
                     )
-                ret = {
-                    "user": user,
-                    "is_guest": True,
-                    "shadow_banned": False,
-                    "token_id": None,
+
+                ret = TokenLookupResult(
+                    user_id=user_id,
+                    is_guest=True,
                     # all guests get the same device id
-                    "device_id": GUEST_DEVICE_ID,
-                }
+                    device_id=GUEST_DEVICE_ID,
+                )
             elif rights == "delete_pusher":
                 # We don't store these tokens in the database
-                ret = {
-                    "user": user,
-                    "is_guest": False,
-                    "shadow_banned": False,
-                    "token_id": None,
-                    "device_id": None,
-                }
+
+                ret = TokenLookupResult(user_id=user_id, is_guest=False)
             else:
                 raise RuntimeError("Unknown rights setting %s", rights)
             return ret
@@ -481,31 +474,15 @@ class Auth:
         now = self.hs.get_clock().time_msec()
         return now < expiry
 
-    async def _look_up_user_by_access_token(self, token):
-        ret = await self.store.get_user_by_access_token(token)
-        if not ret:
-            return None
-
-        # we use ret.get() below because *lots* of unit tests stub out
-        # get_user_by_access_token in a way where it only returns a couple of
-        # the fields.
-        user_info = {
-            "user": UserID.from_string(ret.get("name")),
-            "token_id": ret.get("token_id", None),
-            "is_guest": False,
-            "shadow_banned": ret.get("shadow_banned"),
-            "device_id": ret.get("device_id"),
-            "valid_until_ms": ret.get("valid_until_ms"),
-        }
-        return user_info
-
     def get_appservice_by_req(self, request):
         token = self.get_access_token_from_request(request)
         service = self.store.get_app_service_by_token(token)
         if not service:
             logger.warning("Unrecognised appservice access token.")
             raise InvalidClientTokenError()
-        request.authenticated_entity = service.sender
+        request.requester = synapse.types.create_requester(
+            service.sender, app_service=service
+        )
         return service
 
     async def is_server_admin(self, user: UserID) -> bool:
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index f70841ae86..3944780a42 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -52,11 +52,11 @@ class ApplicationService:
         self,
         token,
         hostname,
+        id,
+        sender,
         url=None,
         namespaces=None,
         hs_token=None,
-        sender=None,
-        id=None,
         protocols=None,
         rate_limited=True,
         ip_range_whitelist=None,
diff --git a/synapse/config/cas.py b/synapse/config/cas.py
index 4526c1a67b..2f97e6d258 100644
--- a/synapse/config/cas.py
+++ b/synapse/config/cas.py
@@ -26,14 +26,14 @@ class CasConfig(Config):
 
     def read_config(self, config, **kwargs):
         cas_config = config.get("cas_config", None)
-        if cas_config:
-            self.cas_enabled = cas_config.get("enabled", True)
+        self.cas_enabled = cas_config and cas_config.get("enabled", True)
+
+        if self.cas_enabled:
             self.cas_server_url = cas_config["server_url"]
             self.cas_service_url = cas_config["service_url"]
             self.cas_displayname_attribute = cas_config.get("displayname_attribute")
-            self.cas_required_attributes = cas_config.get("required_attributes", {})
+            self.cas_required_attributes = cas_config.get("required_attributes") or {}
         else:
-            self.cas_enabled = False
             self.cas_server_url = None
             self.cas_service_url = None
             self.cas_displayname_attribute = None
@@ -41,13 +41,35 @@ class CasConfig(Config):
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """
-        # Enable CAS for registration and login.
+        # Enable Central Authentication Service (CAS) for registration and login.
         #
-        #cas_config:
-        #   enabled: true
-        #   server_url: "https://cas-server.com"
-        #   service_url: "https://homeserver.domain.com:8448"
-        #   #displayname_attribute: name
-        #   #required_attributes:
-        #   #    name: value
+        cas_config:
+          # Uncomment the following to enable authorization against a CAS server.
+          # Defaults to false.
+          #
+          #enabled: true
+
+          # The URL of the CAS authorization endpoint.
+          #
+          #server_url: "https://cas-server.com"
+
+          # The public URL of the homeserver.
+          #
+          #service_url: "https://homeserver.domain.com:8448"
+
+          # The attribute of the CAS response to use as the display name.
+          #
+          # If unset, no displayname will be set.
+          #
+          #displayname_attribute: name
+
+          # It is possible to configure Synapse to only allow logins if CAS attributes
+          # match particular values. All of the keys in the mapping below must exist
+          # and the values must match the given value. Alternately if the given value
+          # is None then any value is allowed (the attribute just must exist).
+          # All of the listed attributes must match for the login to be permitted.
+          #
+          #required_attributes:
+          #  userGroup: "staff"
+          #  department: None
         """
diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt_config.py
index 3252ad9e7f..f30330abb6 100644
--- a/synapse/config/jwt_config.py
+++ b/synapse/config/jwt_config.py
@@ -63,7 +63,7 @@ class JWTConfig(Config):
         # and issued at ("iat") claims are validated if present.
         #
         # Note that this is a non-standard login type and client support is
-        # expected to be non-existant.
+        # expected to be non-existent.
         #
         # See https://github.com/matrix-org/synapse/blob/master/docs/jwt.md.
         #
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index 13d6f6a3ea..d4e887a3e0 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -23,7 +23,6 @@ from string import Template
 import yaml
 
 from twisted.logger import (
-    ILogObserver,
     LogBeginner,
     STDLibLogObserver,
     eventAsText,
@@ -32,11 +31,9 @@ from twisted.logger import (
 
 import synapse
 from synapse.app import _base as appbase
-from synapse.logging._structured import (
-    reload_structured_logging,
-    setup_structured_logging,
-)
+from synapse.logging._structured import setup_structured_logging
 from synapse.logging.context import LoggingContextFilter
+from synapse.logging.filter import MetadataFilter
 from synapse.util.versionstring import get_version_string
 
 from ._base import Config, ConfigError
@@ -48,7 +45,11 @@ DEFAULT_LOG_CONFIG = Template(
 # This is a YAML file containing a standard Python logging configuration
 # dictionary. See [1] for details on the valid settings.
 #
+# Synapse also supports structured logging for machine readable logs which can
+# be ingested by ELK stacks. See [2] for details.
+#
 # [1]: https://docs.python.org/3.7/library/logging.config.html#configuration-dictionary-schema
+# [2]: https://github.com/matrix-org/synapse/blob/master/docs/structured_logging.md
 
 version: 1
 
@@ -105,7 +106,7 @@ root:
     # then write them to a file.
     #
     # Replace "buffer" with "console" to log to stderr instead. (Note that you'll
-    # also need to update the configuation for the `twisted` logger above, in
+    # also need to update the configuration for the `twisted` logger above, in
     # this case.)
     #
     handlers: [buffer]
@@ -176,11 +177,11 @@ class LoggingConfig(Config):
                 log_config_file.write(DEFAULT_LOG_CONFIG.substitute(log_file=log_file))
 
 
-def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
+def _setup_stdlib_logging(config, log_config_path, logBeginner: LogBeginner) -> None:
     """
-    Set up Python stdlib logging.
+    Set up Python standard library logging.
     """
-    if log_config is None:
+    if log_config_path is None:
         log_format = (
             "%(asctime)s - %(name)s - %(lineno)d - %(levelname)s - %(request)s"
             " - %(message)s"
@@ -196,7 +197,8 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
         handler.setFormatter(formatter)
         logger.addHandler(handler)
     else:
-        logging.config.dictConfig(log_config)
+        # Load the logging configuration.
+        _load_logging_config(log_config_path)
 
     # We add a log record factory that runs all messages through the
     # LoggingContextFilter so that we get the context *at the time we log*
@@ -204,12 +206,14 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
     # filter options, but care must when using e.g. MemoryHandler to buffer
     # writes.
 
-    log_filter = LoggingContextFilter(request="")
+    log_context_filter = LoggingContextFilter(request="")
+    log_metadata_filter = MetadataFilter({"server_name": config.server_name})
     old_factory = logging.getLogRecordFactory()
 
     def factory(*args, **kwargs):
         record = old_factory(*args, **kwargs)
-        log_filter.filter(record)
+        log_context_filter.filter(record)
+        log_metadata_filter.filter(record)
         return record
 
     logging.setLogRecordFactory(factory)
@@ -255,21 +259,40 @@ def _setup_stdlib_logging(config, log_config, logBeginner: LogBeginner):
     if not config.no_redirect_stdio:
         print("Redirected stdout/stderr to logs")
 
-    return observer
-
 
-def _reload_stdlib_logging(*args, log_config=None):
-    logger = logging.getLogger("")
+def _load_logging_config(log_config_path: str) -> None:
+    """
+    Configure logging from a log config path.
+    """
+    with open(log_config_path, "rb") as f:
+        log_config = yaml.safe_load(f.read())
 
     if not log_config:
-        logger.warning("Reloaded a blank config?")
+        logging.warning("Loaded a blank logging config?")
+
+    # If the old structured logging configuration is being used, convert it to
+    # the new style configuration.
+    if "structured" in log_config and log_config.get("structured"):
+        log_config = setup_structured_logging(log_config)
 
     logging.config.dictConfig(log_config)
 
 
+def _reload_logging_config(log_config_path):
+    """
+    Reload the log configuration from the file and apply it.
+    """
+    # If no log config path was given, it cannot be reloaded.
+    if log_config_path is None:
+        return
+
+    _load_logging_config(log_config_path)
+    logging.info("Reloaded log config from %s due to SIGHUP", log_config_path)
+
+
 def setup_logging(
     hs, config, use_worker_options=False, logBeginner: LogBeginner = globalLogBeginner
-) -> ILogObserver:
+) -> None:
     """
     Set up the logging subsystem.
 
@@ -282,41 +305,18 @@ def setup_logging(
 
         logBeginner: The Twisted logBeginner to use.
 
-    Returns:
-        The "root" Twisted Logger observer, suitable for sending logs to from a
-        Logger instance.
     """
-    log_config = config.worker_log_config if use_worker_options else config.log_config
-
-    def read_config(*args, callback=None):
-        if log_config is None:
-            return None
-
-        with open(log_config, "rb") as f:
-            log_config_body = yaml.safe_load(f.read())
-
-        if callback:
-            callback(log_config=log_config_body)
-            logging.info("Reloaded log config from %s due to SIGHUP", log_config)
-
-        return log_config_body
+    log_config_path = (
+        config.worker_log_config if use_worker_options else config.log_config
+    )
 
-    log_config_body = read_config()
+    # Perform one-time logging configuration.
+    _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
+    # Add a SIGHUP handler to reload the logging configuration, if one is available.
+    appbase.register_sighup(_reload_logging_config, log_config_path)
 
-    if log_config_body and log_config_body.get("structured") is True:
-        logger = setup_structured_logging(
-            hs, config, log_config_body, logBeginner=logBeginner
-        )
-        appbase.register_sighup(read_config, callback=reload_structured_logging)
-    else:
-        logger = _setup_stdlib_logging(config, log_config_body, logBeginner=logBeginner)
-        appbase.register_sighup(read_config, callback=_reload_stdlib_logging)
-
-    # make sure that the first thing we log is a thing we can grep backwards
-    # for
+    # Log immediately so we can grep backwards.
     logging.warning("***** STARTING SERVER *****")
     logging.warning("Server %s version %s", sys.argv[0], get_version_string(synapse))
     logging.info("Server hostname: %s", config.server_name)
     logging.info("Instance name: %s", hs.get_instance_name())
-
-    return logger
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index 7597fbc864..69d188341c 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -87,11 +87,10 @@ class OIDCConfig(Config):
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
         return """\
-        # OpenID Connect integration. The following settings can be used to make Synapse
-        # use an OpenID Connect Provider for authentication, instead of its internal
-        # password database.
+        # Enable OpenID Connect (OIDC) / OAuth 2.0 for registration and login.
         #
-        # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md.
+        # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md
+        # for some example configurations.
         #
         oidc_config:
           # Uncomment the following to enable authorization against an OpenID Connect
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index d7e3690a32..b0a77a2e43 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -143,7 +143,7 @@ class RegistrationConfig(Config):
             RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
         }
 
-        # Pull the creater/inviter from the configuration, this gets used to
+        # Pull the creator/inviter from the configuration, this gets used to
         # send invites for invite-only rooms.
         mxid_localpart = config.get("auto_join_mxid_localpart")
         self.auto_join_user_id = None
diff --git a/synapse/config/room_directory.py b/synapse/config/room_directory.py
index 6de1f9d103..92e1b67528 100644
--- a/synapse/config/room_directory.py
+++ b/synapse/config/room_directory.py
@@ -99,7 +99,7 @@ class RoomDirectoryConfig(Config):
         #
         # Options for the rules include:
         #
-        #   user_id: Matches agaisnt the creator of the alias
+        #   user_id: Matches against the creator of the alias
         #   room_id: Matches against the room ID being published
         #   alias: Matches against any current local or canonical aliases
         #            associated with the room
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index 99aa8b3bf1..778750f43b 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -216,10 +216,8 @@ class SAML2Config(Config):
         return """\
         ## Single sign-on integration ##
 
-        # Enable SAML2 for registration and login. Uses pysaml2.
-        #
-        # At least one of `sp_config` or `config_path` must be set in this section to
-        # enable SAML login.
+        # The following settings can be used to make Synapse use a single sign-on
+        # provider for authentication, instead of its internal password database.
         #
         # You will probably also want to set the following options to `false` to
         # disable the regular login/registration flows:
@@ -228,6 +226,11 @@ class SAML2Config(Config):
         #
         # You will also want to investigate the settings under the "sso" configuration
         # section below.
+
+        # Enable SAML2 for registration and login. Uses pysaml2.
+        #
+        # At least one of `sp_config` or `config_path` must be set in this section to
+        # enable SAML login.
         #
         # Once SAML support is enabled, a metadata file will be exposed at
         # https://<server>:<port>/_matrix/saml2/metadata.xml, which you may be able to
@@ -243,40 +246,42 @@ class SAML2Config(Config):
           # so it is not normally necessary to specify them unless you need to
           # override them.
           #
-          #sp_config:
-          #  # point this to the IdP's metadata. You can use either a local file or
-          #  # (preferably) a URL.
-          #  metadata:
-          #    #local: ["saml2/idp.xml"]
-          #    remote:
-          #      - url: https://our_idp/metadata.xml
-          #
-          #  # By default, the user has to go to our login page first. If you'd like
-          #  # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
-          #  # 'service.sp' section:
-          #  #
-          #  #service:
-          #  #  sp:
-          #  #    allow_unsolicited: true
-          #
-          #  # The examples below are just used to generate our metadata xml, and you
-          #  # may well not need them, depending on your setup. Alternatively you
-          #  # may need a whole lot more detail - see the pysaml2 docs!
-          #
-          #  description: ["My awesome SP", "en"]
-          #  name: ["Test SP", "en"]
-          #
-          #  organization:
-          #    name: Example com
-          #    display_name:
-          #      - ["Example co", "en"]
-          #    url: "http://example.com"
-          #
-          #  contact_person:
-          #    - given_name: Bob
-          #      sur_name: "the Sysadmin"
-          #      email_address": ["admin@example.com"]
-          #      contact_type": technical
+          sp_config:
+            # Point this to the IdP's metadata. You must provide either a local
+            # file via the `local` attribute or (preferably) a URL via the
+            # `remote` attribute.
+            #
+            #metadata:
+            #  local: ["saml2/idp.xml"]
+            #  remote:
+            #    - url: https://our_idp/metadata.xml
+
+            # By default, the user has to go to our login page first. If you'd like
+            # to allow IdP-initiated login, set 'allow_unsolicited: true' in a
+            # 'service.sp' section:
+            #
+            #service:
+            #  sp:
+            #    allow_unsolicited: true
+
+            # The examples below are just used to generate our metadata xml, and you
+            # may well not need them, depending on your setup. Alternatively you
+            # may need a whole lot more detail - see the pysaml2 docs!
+
+            #description: ["My awesome SP", "en"]
+            #name: ["Test SP", "en"]
+
+            #organization:
+            #  name: Example com
+            #  display_name:
+            #    - ["Example co", "en"]
+            #  url: "http://example.com"
+
+            #contact_person:
+            #  - given_name: Bob
+            #    sur_name: "the Sysadmin"
+            #    email_address": ["admin@example.com"]
+            #    contact_type": technical
 
           # Instead of putting the config inline as above, you can specify a
           # separate pysaml2 configuration file:
diff --git a/synapse/config/tracer.py b/synapse/config/tracer.py
index 8be1346113..0c1a854f09 100644
--- a/synapse/config/tracer.py
+++ b/synapse/config/tracer.py
@@ -67,7 +67,7 @@ class TracerConfig(Config):
             # This is a list of regexes which are matched against the server_name of the
             # homeserver.
             #
-            # By defult, it is empty, so no servers are matched.
+            # By default, it is empty, so no servers are matched.
             #
             #homeserver_whitelist:
             #  - ".*"
diff --git a/synapse/crypto/context_factory.py b/synapse/crypto/context_factory.py
index 79668a402e..57fd426e87 100644
--- a/synapse/crypto/context_factory.py
+++ b/synapse/crypto/context_factory.py
@@ -149,7 +149,7 @@ class FederationPolicyForHTTPS:
         return SSLClientConnectionCreator(host, ssl_context, should_verify)
 
     def creatorForNetloc(self, hostname, port):
-        """Implements the IPolicyForHTTPS interace so that this can be passed
+        """Implements the IPolicyForHTTPS interface so that this can be passed
         directly to agents.
         """
         return self.get_options(hostname)
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 65df62107f..8028663fa8 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -59,7 +59,7 @@ class DictProperty:
             #
             # To exclude the KeyError from the traceback, we explicitly
             # 'raise from e1.__context__' (which is better than 'raise from None',
-            # becuase that would omit any *earlier* exceptions).
+            # because that would omit any *earlier* exceptions).
             #
             raise AttributeError(
                 "'%s' has no '%s' property" % (type(instance), self.key)
@@ -368,7 +368,7 @@ class FrozenEvent(EventBase):
         return self.__repr__()
 
     def __repr__(self):
-        return "<FrozenEvent event_id='%s', type='%s', state_key='%s'>" % (
+        return "<FrozenEvent event_id=%r, type=%r, state_key=%r>" % (
             self.get("event_id", None),
             self.get("type", None),
             self.get("state_key", None),
@@ -451,7 +451,7 @@ class FrozenEventV2(EventBase):
         return self.__repr__()
 
     def __repr__(self):
-        return "<%s event_id='%s', type='%s', state_key='%s'>" % (
+        return "<%s event_id=%r, type=%r, state_key=%r>" % (
             self.__class__.__name__,
             self.event_id,
             self.get("type", None),
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 355cbe05f1..14f7f1156f 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -180,7 +180,7 @@ def only_fields(dictionary, fields):
     in 'fields'.
 
     If there are no event fields specified then all fields are included.
-    The entries may include '.' charaters to indicate sub-fields.
+    The entries may include '.' characters to indicate sub-fields.
     So ['content.body'] will include the 'body' field of the 'content' object.
     A literal '.' character in a field name may be escaped using a '\'.
 
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index 3a6b95631e..a0933fae88 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -154,7 +154,7 @@ class Authenticator:
         )
 
         logger.debug("Request from %s", origin)
-        request.authenticated_entity = origin
+        request.requester = origin
 
         # If we get a valid signed request from the other side, its probably
         # alive
diff --git a/synapse/groups/attestations.py b/synapse/groups/attestations.py
index a86b3debc5..41cf07cc88 100644
--- a/synapse/groups/attestations.py
+++ b/synapse/groups/attestations.py
@@ -22,7 +22,7 @@ attestations have a validity period so need to be periodically renewed.
 If a user leaves (or gets kicked out of) a group, either side can still use
 their attestation to "prove" their membership, until the attestation expires.
 Therefore attestations shouldn't be relied on to prove membership in important
-cases, but can for less important situtations, e.g. showing a users membership
+cases, but can for less important situations, e.g. showing a users membership
 of groups on their profile, showing flairs, etc.
 
 An attestation is a signed blob of json that looks like:
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index e5f85b472d..0d042cbfac 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -113,7 +113,7 @@ class GroupsServerWorkerHandler:
             entry = await self.room_list_handler.generate_room_entry(
                 room_id, len(joined_users), with_alias=False, allow_private=True
             )
-            entry = dict(entry)  # so we don't change whats cached
+            entry = dict(entry)  # so we don't change what's cached
             entry.pop("room_id", None)
 
             room_entry["profile"] = entry
@@ -550,7 +550,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
                 group_id, room_id, is_public=is_public
             )
         else:
-            raise SynapseError(400, "Uknown config option")
+            raise SynapseError(400, "Unknown config option")
 
         return {}
 
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index fd4f762f33..664d09da1c 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -18,19 +18,22 @@ import email.utils
 import logging
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
-from typing import List
+from typing import TYPE_CHECKING, List
 
-from synapse.api.errors import StoreError
+from synapse.api.errors import StoreError, SynapseError
 from synapse.logging.context import make_deferred_yieldable
 from synapse.metrics.background_process_metrics import wrap_as_background_process
 from synapse.types import UserID
 from synapse.util import stringutils
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class AccountValidityHandler:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.config = hs.config
         self.store = self.hs.get_datastore()
@@ -67,7 +70,7 @@ class AccountValidityHandler:
                 self.clock.looping_call(self._send_renewal_emails, 30 * 60 * 1000)
 
     @wrap_as_background_process("send_renewals")
-    async def _send_renewal_emails(self):
+    async def _send_renewal_emails(self) -> None:
         """Gets the list of users whose account is expiring in the amount of time
         configured in the ``renew_at`` parameter from the ``account_validity``
         configuration, and sends renewal emails to all of these users as long as they
@@ -81,11 +84,25 @@ class AccountValidityHandler:
                     user_id=user["user_id"], expiration_ts=user["expiration_ts_ms"]
                 )
 
-    async def send_renewal_email_to_user(self, user_id: str):
+    async def send_renewal_email_to_user(self, user_id: str) -> None:
+        """
+        Send a renewal email for a specific user.
+
+        Args:
+            user_id: The user ID to send a renewal email for.
+
+        Raises:
+            SynapseError if the user is not set to renew.
+        """
         expiration_ts = await self.store.get_expiration_ts_for_user(user_id)
+
+        # If this user isn't set to be expired, raise an error.
+        if expiration_ts is None:
+            raise SynapseError(400, "User has no expiration time: %s" % (user_id,))
+
         await self._send_renewal_email(user_id, expiration_ts)
 
-    async def _send_renewal_email(self, user_id: str, expiration_ts: int):
+    async def _send_renewal_email(self, user_id: str, expiration_ts: int) -> None:
         """Sends out a renewal email to every email address attached to the given user
         with a unique link allowing them to renew their account.
 
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 1ce2091b46..a703944543 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -88,7 +88,7 @@ class AdminHandler(BaseHandler):
 
         # We only try and fetch events for rooms the user has been in. If
         # they've been e.g. invited to a room without joining then we handle
-        # those seperately.
+        # those separately.
         rooms_user_has_been_in = await self.store.get_rooms_user_has_been_in(user_id)
 
         for index, room in enumerate(rooms):
@@ -226,7 +226,7 @@ class ExfiltrationWriter:
         """
 
     def finished(self):
-        """Called when all data has succesfully been exported and written.
+        """Called when all data has successfully been exported and written.
 
         This functions return value is passed to the caller of
         `export_user_data`.
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 64dea23fc5..9fc8444228 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -12,9 +12,8 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 import logging
-from typing import Dict, List, Optional
+from typing import TYPE_CHECKING, Dict, List, Optional, Union
 
 from prometheus_client import Counter
 
@@ -30,17 +29,24 @@ from synapse.metrics import (
     event_processing_loop_counter,
     event_processing_loop_room_count,
 )
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import Collection, JsonDict, RoomStreamToken, UserID
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
+from synapse.storage.databases.main.directory import RoomAliasMapping
+from synapse.types import Collection, JsonDict, RoomAlias, RoomStreamToken, UserID
 from synapse.util.metrics import Measure
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 events_processed_counter = Counter("synapse_handlers_appservice_events_processed", "")
 
 
 class ApplicationServicesHandler:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
         self.is_mine_id = hs.is_mine_id
         self.appservice_api = hs.get_application_service_api()
@@ -53,7 +59,7 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
-    async def notify_interested_services(self, max_token: RoomStreamToken):
+    def notify_interested_services(self, max_token: RoomStreamToken):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
@@ -72,6 +78,12 @@ class ApplicationServicesHandler:
         if self.is_processing:
             return
 
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._notify_interested_services(max_token)
+
+    @wrap_as_background_process("notify_interested_services")
+    async def _notify_interested_services(self, max_token: RoomStreamToken):
         with Measure(self.clock, "notify_interested_services"):
             self.is_processing = True
             try:
@@ -166,8 +178,11 @@ class ApplicationServicesHandler:
             finally:
                 self.is_processing = False
 
-    async def notify_interested_services_ephemeral(
-        self, stream_key: str, new_token: Optional[int], users: Collection[UserID] = [],
+    def notify_interested_services_ephemeral(
+        self,
+        stream_key: str,
+        new_token: Optional[int],
+        users: Collection[Union[str, UserID]] = [],
     ):
         """This is called by the notifier in the background
         when a ephemeral event handled by the homeserver.
@@ -183,13 +198,34 @@ class ApplicationServicesHandler:
             new_token: The latest stream token
             users: The user(s) involved with the event.
         """
+        if not self.notify_appservices:
+            return
+
+        if stream_key not in ("typing_key", "receipt_key", "presence_key"):
+            return
+
         services = [
             service
             for service in self.store.get_app_services()
             if service.supports_ephemeral
         ]
-        if not services or not self.notify_appservices:
+        if not services:
             return
+
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._notify_interested_services_ephemeral(
+            services, stream_key, new_token, users
+        )
+
+    @wrap_as_background_process("notify_interested_services_ephemeral")
+    async def _notify_interested_services_ephemeral(
+        self,
+        services: List[ApplicationService],
+        stream_key: str,
+        new_token: Optional[int],
+        users: Collection[Union[str, UserID]],
+    ):
         logger.info("Checking interested services for %s" % (stream_key))
         with Measure(self.clock, "notify_interested_services_ephemeral"):
             for service in services:
@@ -214,7 +250,9 @@ class ApplicationServicesHandler:
                         service, "presence", new_token
                     )
 
-    async def _handle_typing(self, service: ApplicationService, new_token: int):
+    async def _handle_typing(
+        self, service: ApplicationService, new_token: int
+    ) -> List[JsonDict]:
         typing_source = self.event_sources.sources["typing"]
         # Get the typing events from just before current
         typing, _ = await typing_source.get_new_events_as(
@@ -226,7 +264,7 @@ class ApplicationServicesHandler:
         )
         return typing
 
-    async def _handle_receipts(self, service: ApplicationService):
+    async def _handle_receipts(self, service: ApplicationService) -> List[JsonDict]:
         from_key = await self.store.get_type_stream_id_for_appservice(
             service, "read_receipt"
         )
@@ -237,7 +275,7 @@ class ApplicationServicesHandler:
         return receipts
 
     async def _handle_presence(
-        self, service: ApplicationService, users: Collection[UserID]
+        self, service: ApplicationService, users: Collection[Union[str, UserID]]
     ) -> List[JsonDict]:
         events = []  # type: List[JsonDict]
         presence_source = self.event_sources.sources["presence"]
@@ -245,6 +283,9 @@ class ApplicationServicesHandler:
             service, "presence"
         )
         for user in users:
+            if isinstance(user, str):
+                user = UserID.from_string(user)
+
             interested = await service.is_interested_in_presence(user, self.store)
             if not interested:
                 continue
@@ -265,11 +306,11 @@ class ApplicationServicesHandler:
 
         return events
 
-    async def query_user_exists(self, user_id):
+    async def query_user_exists(self, user_id: str) -> bool:
         """Check if any application service knows this user_id exists.
 
         Args:
-            user_id(str): The user to query if they exist on any AS.
+            user_id: The user to query if they exist on any AS.
         Returns:
             True if this user exists on at least one application service.
         """
@@ -280,11 +321,13 @@ class ApplicationServicesHandler:
                 return True
         return False
 
-    async def query_room_alias_exists(self, room_alias):
+    async def query_room_alias_exists(
+        self, room_alias: RoomAlias
+    ) -> Optional[RoomAliasMapping]:
         """Check if an application service knows this room alias exists.
 
         Args:
-            room_alias(RoomAlias): The room alias to query.
+            room_alias: The room alias to query.
         Returns:
             namedtuple: with keys "room_id" and "servers" or None if no
             association can be found.
@@ -300,10 +343,13 @@ class ApplicationServicesHandler:
             )
             if is_known_alias:
                 # the alias exists now so don't query more ASes.
-                result = await self.store.get_association_from_room_alias(room_alias)
-                return result
+                return await self.store.get_association_from_room_alias(room_alias)
 
-    async def query_3pe(self, kind, protocol, fields):
+        return None
+
+    async def query_3pe(
+        self, kind: str, protocol: str, fields: Dict[bytes, List[bytes]]
+    ) -> List[JsonDict]:
         services = self._get_services_for_3pn(protocol)
 
         results = await make_deferred_yieldable(
@@ -325,7 +371,9 @@ class ApplicationServicesHandler:
 
         return ret
 
-    async def get_3pe_protocols(self, only_protocol=None):
+    async def get_3pe_protocols(
+        self, only_protocol: Optional[str] = None
+    ) -> Dict[str, JsonDict]:
         services = self.store.get_app_services()
         protocols = {}  # type: Dict[str, List[JsonDict]]
 
@@ -343,7 +391,7 @@ class ApplicationServicesHandler:
                 if info is not None:
                     protocols[p].append(info)
 
-        def _merge_instances(infos):
+        def _merge_instances(infos: List[JsonDict]) -> JsonDict:
             if not infos:
                 return {}
 
@@ -358,19 +406,17 @@ class ApplicationServicesHandler:
 
             return combined
 
-        for p in protocols.keys():
-            protocols[p] = _merge_instances(protocols[p])
-
-        return protocols
+        return {p: _merge_instances(protocols[p]) for p in protocols.keys()}
 
-    async def _get_services_for_event(self, event):
+    async def _get_services_for_event(
+        self, event: EventBase
+    ) -> List[ApplicationService]:
         """Retrieve a list of application services interested in this event.
 
         Args:
-            event(Event): The event to check. Can be None if alias_list is not.
+            event: The event to check. Can be None if alias_list is not.
         Returns:
-            list<ApplicationService>: A list of services interested in this
-            event based on the service regex.
+            A list of services interested in this event based on the service regex.
         """
         services = self.store.get_app_services()
 
@@ -384,17 +430,15 @@ class ApplicationServicesHandler:
 
         return interested_list
 
-    def _get_services_for_user(self, user_id):
+    def _get_services_for_user(self, user_id: str) -> List[ApplicationService]:
         services = self.store.get_app_services()
-        interested_list = [s for s in services if (s.is_interested_in_user(user_id))]
-        return interested_list
+        return [s for s in services if (s.is_interested_in_user(user_id))]
 
-    def _get_services_for_3pn(self, protocol):
+    def _get_services_for_3pn(self, protocol: str) -> List[ApplicationService]:
         services = self.store.get_app_services()
-        interested_list = [s for s in services if s.is_interested_in_protocol(protocol)]
-        return interested_list
+        return [s for s in services if s.is_interested_in_protocol(protocol)]
 
-    async def _is_unknown_user(self, user_id):
+    async def _is_unknown_user(self, user_id: str) -> bool:
         if not self.is_mine_id(user_id):
             # we don't know if they are unknown or not since it isn't one of our
             # users. We can't poke ASes.
@@ -409,9 +453,8 @@ class ApplicationServicesHandler:
         service_list = [s for s in services if s.sender == user_id]
         return len(service_list) == 0
 
-    async def _check_user_exists(self, user_id):
+    async def _check_user_exists(self, user_id: str) -> bool:
         unknown_user = await self._is_unknown_user(user_id)
         if unknown_user:
-            exists = await self.query_user_exists(user_id)
-            return exists
+            return await self.query_user_exists(user_id)
         return True
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 8619fbb982..ff103cbb92 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -18,10 +18,20 @@ import logging
 import time
 import unicodedata
 import urllib.parse
-from typing import Any, Callable, Dict, Iterable, List, Optional, Tuple, Union
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Callable,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+    Union,
+)
 
 import attr
-import bcrypt  # type: ignore[import]
+import bcrypt
 import pymacaroons
 
 from synapse.api.constants import LoginType
@@ -49,6 +59,9 @@ from synapse.util.threepids import canonicalise_email
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -149,11 +162,7 @@ class SsoLoginExtraAttributes:
 class AuthHandler(BaseHandler):
     SESSION_EXPIRE_MS = 48 * 60 * 60 * 1000
 
-    def __init__(self, hs):
-        """
-        Args:
-            hs (synapse.server.HomeServer):
-        """
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.checkers = {}  # type: Dict[str, UserInteractiveAuthChecker]
@@ -470,9 +479,7 @@ class AuthHandler(BaseHandler):
             # authentication flow.
             await self.store.set_ui_auth_clientdict(sid, clientdict)
 
-        user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[
-            0
-        ].decode("ascii", "surrogateescape")
+        user_agent = request.get_user_agent("")
 
         await self.store.add_user_agent_ip_to_ui_auth_session(
             session.session_id, user_agent, clientip
@@ -692,7 +699,7 @@ class AuthHandler(BaseHandler):
         Creates a new access token for the user with the given user ID.
 
         The user is assumed to have been authenticated by some other
-        machanism (e.g. CAS), and the user_id converted to the canonical case.
+        mechanism (e.g. CAS), and the user_id converted to the canonical case.
 
         The device will be recorded in the table if it is not there already.
 
@@ -984,17 +991,17 @@ class AuthHandler(BaseHandler):
                 # This might return an awaitable, if it does block the log out
                 # until it completes.
                 result = provider.on_logged_out(
-                    user_id=str(user_info["user"]),
-                    device_id=user_info["device_id"],
+                    user_id=user_info.user_id,
+                    device_id=user_info.device_id,
                     access_token=access_token,
                 )
                 if inspect.isawaitable(result):
                     await result
 
         # delete pushers associated with this access token
-        if user_info["token_id"] is not None:
+        if user_info.token_id is not None:
             await self.hs.get_pusherpool().remove_pushers_by_access_token(
-                str(user_info["user"]), (user_info["token_id"],)
+                user_info.user_id, (user_info.token_id,)
             )
 
     async def delete_access_tokens_for_user(
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index a4cc4b9a5a..048a3b3c0b 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -212,9 +212,7 @@ class CasHandler:
         else:
             if not registered_user_id:
                 # Pull out the user-agent and IP from the request.
-                user_agent = request.requestHeaders.getRawHeaders(
-                    b"User-Agent", default=[b""]
-                )[0].decode("ascii", "surrogateescape")
+                user_agent = request.get_user_agent("")
                 ip_address = self.hs.get_ip_from_request(request)
 
                 registered_user_id = await self._registration_handler.register_user(
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 611742ae72..929752150d 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -129,6 +129,11 @@ class E2eKeysHandler:
                 if user_id in local_query:
                     results[user_id] = keys
 
+        # Get cached cross-signing keys
+        cross_signing_keys = await self.get_cross_signing_keys_from_cache(
+            device_keys_query, from_user_id
+        )
+
         # Now attempt to get any remote devices from our local cache.
         remote_queries_not_in_cache = {}
         if remote_queries:
@@ -155,16 +160,28 @@ class E2eKeysHandler:
                             unsigned["device_display_name"] = device_display_name
                         user_devices[device_id] = result
 
+            # check for missing cross-signing keys.
+            for user_id in remote_queries.keys():
+                cached_cross_master = user_id in cross_signing_keys["master_keys"]
+                cached_cross_selfsigning = (
+                    user_id in cross_signing_keys["self_signing_keys"]
+                )
+
+                # check if we are missing only one of cross-signing master or
+                # self-signing key, but the other one is cached.
+                # as we need both, this will issue a federation request.
+                # if we don't have any of the keys, either the user doesn't have
+                # cross-signing set up, or the cached device list
+                # is not (yet) updated.
+                if cached_cross_master ^ cached_cross_selfsigning:
+                    user_ids_not_in_cache.add(user_id)
+
+            # add those users to the list to fetch over federation.
             for user_id in user_ids_not_in_cache:
                 domain = get_domain_from_id(user_id)
                 r = remote_queries_not_in_cache.setdefault(domain, {})
                 r[user_id] = remote_queries[user_id]
 
-        # Get cached cross-signing keys
-        cross_signing_keys = await self.get_cross_signing_keys_from_cache(
-            device_keys_query, from_user_id
-        )
-
         # Now fetch any devices that we don't have in our cache
         @trace
         async def do_remote_query(destination):
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index fde8f00531..c386957706 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -112,7 +112,7 @@ class FederationHandler(BaseHandler):
     """Handles events that originated from federation.
         Responsible for:
         a) handling received Pdus before handing them on as Events to the rest
-        of the homeserver (including auth and state conflict resoultion)
+        of the homeserver (including auth and state conflict resolutions)
         b) converting events that were produced by local clients that may need
         to be sent to remote homeservers.
         c) doing the necessary dances to invite remote users and join remote
@@ -477,7 +477,7 @@ class FederationHandler(BaseHandler):
         # ----
         #
         # Update richvdh 2018/09/18: There are a number of problems with timing this
-        # request out agressively on the client side:
+        # request out aggressively on the client side:
         #
         # - it plays badly with the server-side rate-limiter, which starts tarpitting you
         #   if you send too many requests at once, so you end up with the server carefully
@@ -495,13 +495,13 @@ class FederationHandler(BaseHandler):
         #   we'll end up back here for the *next* PDU in the list, which exacerbates the
         #   problem.
         #
-        # - the agressive 10s timeout was introduced to deal with incoming federation
+        # - the aggressive 10s timeout was introduced to deal with incoming federation
         #   requests taking 8 hours to process. It's not entirely clear why that was going
         #   on; certainly there were other issues causing traffic storms which are now
         #   resolved, and I think in any case we may be more sensible about our locking
         #   now. We're *certainly* more sensible about our logging.
         #
-        # All that said: Let's try increasing the timout to 60s and see what happens.
+        # All that said: Let's try increasing the timeout to 60s and see what happens.
 
         try:
             missing_events = await self.federation_client.get_missing_events(
@@ -1120,7 +1120,7 @@ class FederationHandler(BaseHandler):
                     logger.info(str(e))
                     continue
                 except RequestSendFailed as e:
-                    logger.info("Falied to get backfill from %s because %s", dom, e)
+                    logger.info("Failed to get backfill from %s because %s", dom, e)
                     continue
                 except FederationDeniedError as e:
                     logger.info(e)
@@ -1545,7 +1545,7 @@ class FederationHandler(BaseHandler):
         #
         # The reasons we have the destination server rather than the origin
         # server send it are slightly mysterious: the origin server should have
-        # all the neccessary state once it gets the response to the send_join,
+        # all the necessary state once it gets the response to the send_join,
         # so it could send the event itself if it wanted to. It may be that
         # doing it this way reduces failure modes, or avoids certain attacks
         # where a new server selectively tells a subset of the federation that
@@ -1649,7 +1649,7 @@ class FederationHandler(BaseHandler):
         event.internal_metadata.outlier = True
         event.internal_metadata.out_of_band_membership = True
 
-        # Try the host that we succesfully called /make_leave/ on first for
+        # Try the host that we successfully called /make_leave/ on first for
         # the /send_leave/ request.
         host_list = list(target_hosts)
         try:
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index 9684e60fc8..abd8d2af44 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -17,7 +17,7 @@
 import logging
 
 from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
-from synapse.types import get_domain_from_id
+from synapse.types import GroupID, get_domain_from_id
 
 logger = logging.getLogger(__name__)
 
@@ -28,6 +28,9 @@ def _create_rerouter(func_name):
     """
 
     async def f(self, group_id, *args, **kwargs):
+        if not GroupID.is_valid(group_id):
+            raise SynapseError(400, "%s was not legal group ID" % (group_id,))
+
         if self.is_mine_id(group_id):
             return await getattr(self.groups_server_handler, func_name)(
                 group_id, *args, **kwargs
@@ -346,7 +349,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
                 server_name=get_domain_from_id(group_id),
             )
 
-        # TODO: Check that the group is public and we're being added publically
+        # TODO: Check that the group is public and we're being added publicly
         is_publicised = content.get("publicise", False)
 
         token = await self.store.register_user_group_membership(
@@ -391,7 +394,7 @@ class GroupsLocalHandler(GroupsLocalWorkerHandler):
                 server_name=get_domain_from_id(group_id),
             )
 
-        # TODO: Check that the group is public and we're being added publically
+        # TODO: Check that the group is public and we're being added publicly
         is_publicised = content.get("publicise", False)
 
         token = await self.store.register_user_group_membership(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index fb0a04e9a7..ca5602c13e 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -656,7 +656,7 @@ class EventCreationHandler:
             context: The event context.
 
         Returns:
-            The previous verion of the event is returned, if it is found in the
+            The previous version of the event is returned, if it is found in the
             event context. Otherwise, None is returned.
         """
         prev_state_ids = await context.get_prev_state_ids()
@@ -1099,34 +1099,13 @@ class EventCreationHandler:
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.INVITE:
-
-                def is_inviter_member_event(e):
-                    return e.type == EventTypes.Member and e.sender == event.sender
-
-                current_state_ids = await context.get_current_state_ids()
-
-                # We know this event is not an outlier, so this must be
-                # non-None.
-                assert current_state_ids is not None
-
-                state_to_include_ids = [
-                    e_id
-                    for k, e_id in current_state_ids.items()
-                    if k[0] in self.room_invite_state_types
-                    or k == (EventTypes.Member, event.sender)
-                ]
-
-                state_to_include = await self.store.get_events(state_to_include_ids)
-
-                event.unsigned["invite_room_state"] = [
-                    {
-                        "type": e.type,
-                        "state_key": e.state_key,
-                        "content": e.content,
-                        "sender": e.sender,
-                    }
-                    for e in state_to_include.values()
-                ]
+                event.unsigned[
+                    "invite_room_state"
+                ] = await self.store.get_stripped_room_state_from_event_context(
+                    context,
+                    self.room_invite_state_types,
+                    membership_user_id=event.sender,
+                )
 
                 invitee = UserID.from_string(event.state_key)
                 if not self.hs.is_mine(invitee):
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 05ac86e697..331d4e7e96 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -217,7 +217,7 @@ class OidcHandler:
 
         This is based on the requested scopes: if the scopes include
         ``openid``, the provider should give use an ID token containing the
-        user informations. If not, we should fetch them using the
+        user information. If not, we should fetch them using the
         ``access_token`` with the ``userinfo_endpoint``.
         """
 
@@ -426,7 +426,7 @@ class OidcHandler:
         return resp
 
     async def _fetch_userinfo(self, token: Token) -> UserInfo:
-        """Fetch user informations from the ``userinfo_endpoint``.
+        """Fetch user information from the ``userinfo_endpoint``.
 
         Args:
             token: the token given by the ``token_endpoint``.
@@ -695,9 +695,7 @@ class OidcHandler:
                 return
 
         # Pull out the user-agent and IP from the request.
-        user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[
-            0
-        ].decode("ascii", "surrogateescape")
+        user_agent = request.get_user_agent("")
         ip_address = self.hs.get_ip_from_request(request)
 
         # Call the mapper to register/login the user
@@ -756,7 +754,7 @@ class OidcHandler:
                 Defaults to an hour.
 
         Returns:
-            A signed macaroon token with the session informations.
+            A signed macaroon token with the session information.
         """
         macaroon = pymacaroons.Macaroon(
             location=self._server_name, identifier="key", key=self._macaroon_secret_key,
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 1000ac95ff..8e014c9bb5 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -48,7 +48,7 @@ from synapse.util.wheel_timer import WheelTimer
 
 MYPY = False
 if MYPY:
-    import synapse.server
+    from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
 
@@ -101,7 +101,7 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 class BasePresenceHandler(abc.ABC):
     """Parts of the PresenceHandler that are shared between workers and master"""
 
-    def __init__(self, hs: "synapse.server.HomeServer"):
+    def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
 
@@ -199,7 +199,7 @@ class BasePresenceHandler(abc.ABC):
 
 
 class PresenceHandler(BasePresenceHandler):
-    def __init__(self, hs: "synapse.server.HomeServer"):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.hs = hs
         self.is_mine_id = hs.is_mine_id
@@ -802,7 +802,7 @@ class PresenceHandler(BasePresenceHandler):
             between the requested tokens due to the limit.
 
             The token returned can be used in a subsequent call to this
-            function to get further updatees.
+            function to get further updates.
 
             The updates are a list of 2-tuples of stream ID and the row data
         """
@@ -977,7 +977,7 @@ def should_notify(old_state, new_state):
             new_state.last_active_ts - old_state.last_active_ts
             > LAST_ACTIVE_GRANULARITY
         ):
-            # Only notify about last active bumps if we're not currently acive
+            # Only notify about last active bumps if we're not currently active
             if not new_state.currently_active:
                 notify_reason_counter.labels("last_active_change_online").inc()
                 return True
@@ -1011,7 +1011,7 @@ def format_user_presence_state(state, now, include_user_id=True):
 
 
 class PresenceEventSource:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         # We can't call get_presence_handler here because there's a cycle:
         #
         #   Presence -> Notifier -> PresenceEventSource -> Presence
@@ -1071,12 +1071,14 @@ class PresenceEventSource:
 
             users_interested_in = await self._get_interested_in(user, explicit_room_id)
 
-            user_ids_changed = set()
+            user_ids_changed = set()  # type: Collection[str]
             changed = None
             if from_key:
                 changed = stream_change_cache.get_all_entities_changed(from_key)
 
             if changed is not None and len(changed) < 500:
+                assert isinstance(user_ids_changed, set)
+
                 # For small deltas, its quicker to get all changes and then
                 # work out if we share a room or they're in our presence list
                 get_updates_counter.labels("stream").inc()
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 92700b589c..14348faaf3 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -98,11 +98,18 @@ class ProfileHandler(BaseHandler):
             except RequestSendFailed as e:
                 raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
+                if e.code < 500 and e.code != 404:
+                    # Other codes are not allowed in c2s API
+                    logger.info(
+                        "Server replied with wrong response: %s %s", e.code, e.msg
+                    )
+
+                    raise SynapseError(502, "Failed to fetch profile")
                 raise e.to_synapse_error()
 
     async def get_profile_from_cache(self, user_id: str) -> JsonDict:
         """Get the profile information from our local cache. If the user is
-        ours then the profile information will always be corect. Otherwise,
+        ours then the profile information will always be correct. Otherwise,
         it may be out of date/missing.
         """
         target_user = UserID.from_string(user_id)
@@ -124,7 +131,7 @@ class ProfileHandler(BaseHandler):
             profile = await self.store.get_from_remote_profile_cache(user_id)
             return profile or {}
 
-    async def get_displayname(self, target_user: UserID) -> str:
+    async def get_displayname(self, target_user: UserID) -> Optional[str]:
         if self.hs.is_mine(target_user):
             try:
                 displayname = await self.store.get_profile_displayname(
@@ -211,7 +218,7 @@ class ProfileHandler(BaseHandler):
 
         await self._update_join_states(requester, target_user)
 
-    async def get_avatar_url(self, target_user: UserID) -> str:
+    async def get_avatar_url(self, target_user: UserID) -> Optional[str]:
         if self.hs.is_mine(target_user):
             try:
                 avatar_url = await self.store.get_profile_avatar_url(
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index a6f1d21674..ed1ff62599 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -115,7 +115,10 @@ class RegistrationHandler(BaseHandler):
                     400, "User ID already taken.", errcode=Codes.USER_IN_USE
                 )
             user_data = await self.auth.get_user_by_access_token(guest_access_token)
-            if not user_data["is_guest"] or user_data["user"].localpart != localpart:
+            if (
+                not user_data.is_guest
+                or UserID.from_string(user_data.user_id).localpart != localpart
+            ):
                 raise AuthError(
                     403,
                     "Cannot register taken user ID without valid guest "
@@ -741,7 +744,7 @@ class RegistrationHandler(BaseHandler):
             # up when the access token is saved, but that's quite an
             # invasive change I'd rather do separately.
             user_tuple = await self.store.get_user_by_access_token(token)
-            token_id = user_tuple["token_id"]
+            token_id = user_tuple.token_id
 
             await self.pusher_pool.add_pusher(
                 user_id=user_id,
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index ec300d8877..e73031475f 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -771,22 +771,29 @@ class RoomCreationHandler(BaseHandler):
                 ratelimit=False,
             )
 
-        for invitee in invite_list:
+        # we avoid dropping the lock between invites, as otherwise joins can
+        # start coming in and making the createRoom slow.
+        #
+        # we also don't need to check the requester's shadow-ban here, as we
+        # have already done so above (and potentially emptied invite_list).
+        with (await self.room_member_handler.member_linearizer.queue((room_id,))):
             content = {}
             is_direct = config.get("is_direct", None)
             if is_direct:
                 content["is_direct"] = is_direct
 
-            # Note that update_membership with an action of "invite" can raise a
-            # ShadowBanError, but this was handled above by emptying invite_list.
-            _, last_stream_id = await self.room_member_handler.update_membership(
-                requester,
-                UserID.from_string(invitee),
-                room_id,
-                "invite",
-                ratelimit=False,
-                content=content,
-            )
+            for invitee in invite_list:
+                (
+                    _,
+                    last_stream_id,
+                ) = await self.room_member_handler.update_membership_locked(
+                    requester,
+                    UserID.from_string(invitee),
+                    room_id,
+                    "invite",
+                    ratelimit=False,
+                    content=content,
+                )
 
         for invite_3pid in invite_3pid_list:
             id_server = invite_3pid["id_server"]
@@ -1268,7 +1275,7 @@ class RoomShutdownHandler:
             )
 
             # We now wait for the create room to come back in via replication so
-            # that we can assume that all the joins/invites have propogated before
+            # that we can assume that all the joins/invites have propagated before
             # we try and auto join below.
             await self._replication.wait_for_stream_position(
                 self.hs.config.worker.events_shard_config.get_instance(new_room_id),
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ec784030e9..7cd858b7db 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -307,7 +307,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         key = (room_id,)
 
         with (await self.member_linearizer.queue(key)):
-            result = await self._update_membership(
+            result = await self.update_membership_locked(
                 requester,
                 target,
                 room_id,
@@ -322,7 +322,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         return result
 
-    async def _update_membership(
+    async def update_membership_locked(
         self,
         requester: Requester,
         target: UserID,
@@ -335,6 +335,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         content: Optional[dict] = None,
         require_consent: bool = True,
     ) -> Tuple[str, int]:
+        """Helper for update_membership.
+
+        Assumes that the membership linearizer is already held for the room.
+        """
         content_specified = bool(content)
         if content is None:
             content = {}
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml_handler.py
index 285c481a96..fd6c5e9ea8 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml_handler.py
@@ -216,9 +216,7 @@ class SamlHandler:
                 return
 
         # Pull out the user-agent and IP from the request.
-        user_agent = request.requestHeaders.getRawHeaders(b"User-Agent", default=[b""])[
-            0
-        ].decode("ascii", "surrogateescape")
+        user_agent = request.get_user_agent("")
         ip_address = self.hs.get_ip_from_request(request)
 
         # Call the mapper to register/login the user
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index e9402e6e2e..66f1bbcfc4 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -139,7 +139,7 @@ class SearchHandler(BaseHandler):
             # Filter to apply to results
             filter_dict = room_cat.get("filter", {})
 
-            # What to order results by (impacts whether pagination can be doen)
+            # What to order results by (impacts whether pagination can be done)
             order_by = room_cat.get("order_by", "rank")
 
             # Return the current state of the rooms?
diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py
index 7a4ae0727a..fb4f70e8e2 100644
--- a/synapse/handlers/state_deltas.py
+++ b/synapse/handlers/state_deltas.py
@@ -32,7 +32,7 @@ class StateDeltasHandler:
         Returns:
             None if the field in the events either both match `public_value`
             or if neither do, i.e. there has been no change.
-            True if it didnt match `public_value` but now does
+            True if it didn't match `public_value` but now does
             False if it did match `public_value` but now doesn't
         """
         prev_event = None
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index b527724bc4..32e53c2d25 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -754,7 +754,7 @@ class SyncHandler:
         """
         # TODO(mjark) Check if the state events were received by the server
         # after the previous sync, since we need to include those state
-        # updates even if they occured logically before the previous event.
+        # updates even if they occurred logically before the previous event.
         # TODO(mjark) Check for new redactions in the state events.
 
         with Measure(self.clock, "compute_state_delta"):
@@ -1882,7 +1882,7 @@ class SyncHandler:
         # members (as the client otherwise doesn't have enough info to form
         # the name itself).
         if sync_config.filter_collection.lazy_load_members() and (
-            # we recalulate the summary:
+            # we recalculate the summary:
             #   if there are membership changes in the timeline, or
             #   if membership has changed during a gappy sync, or
             #   if this is an initial sync.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index d3692842e3..e919a8f9ed 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -167,20 +167,25 @@ class FollowerTypingHandler:
             now_typing = set(row.user_ids)
             self._room_typing[row.room_id] = row.user_ids
 
-            run_as_background_process(
-                "_handle_change_in_typing",
-                self._handle_change_in_typing,
-                row.room_id,
-                prev_typing,
-                now_typing,
-            )
+            if self.federation:
+                run_as_background_process(
+                    "_send_changes_in_typing_to_remotes",
+                    self._send_changes_in_typing_to_remotes,
+                    row.room_id,
+                    prev_typing,
+                    now_typing,
+                )
 
-    async def _handle_change_in_typing(
+    async def _send_changes_in_typing_to_remotes(
         self, room_id: str, prev_typing: Set[str], now_typing: Set[str]
     ):
         """Process a change in typing of a room from replication, sending EDUs
         for any local users.
         """
+
+        if not self.federation:
+            return
+
         for user_id in now_typing - prev_typing:
             if self.is_mine_id(user_id):
                 await self._push_remote(RoomMember(room_id, user_id), True)
@@ -371,7 +376,7 @@ class TypingWriterHandler(FollowerTypingHandler):
             between the requested tokens due to the limit.
 
             The token returned can be used in a subsequent call to this
-            function to get further updatees.
+            function to get further updates.
 
             The updates are a list of 2-tuples of stream ID and the row data
         """
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 79393c8829..afbebfc200 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -31,7 +31,7 @@ class UserDirectoryHandler(StateDeltasHandler):
     N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
 
     The user directory is filled with users who this server can see are joined to a
-    world_readable or publically joinable room. We keep a database table up to date
+    world_readable or publicly joinable room. We keep a database table up to date
     by streaming changes of the current state and recalculating whether users should
     be in the directory or not when necessary.
     """
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 8324632cb6..f409368802 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -359,7 +359,7 @@ class SimpleHttpClient:
                     agent=self.agent,
                     data=body_producer,
                     headers=headers,
-                    **self._extra_treq_args
+                    **self._extra_treq_args,
                 )  # type: defer.Deferred
 
                 # we use our own timeout mechanism rather than treq's as a workaround
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index a306faa267..1cc666fbf6 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -172,7 +172,7 @@ class WellKnownResolver:
         had_valid_well_known = self._had_valid_well_known_cache.get(server_name, False)
 
         # We do this in two steps to differentiate between possibly transient
-        # errors (e.g. can't connect to host, 503 response) and more permenant
+        # errors (e.g. can't connect to host, 503 response) and more permanent
         # errors (such as getting a 404 response).
         response, body = await self._make_well_known_request(
             server_name, retry=had_valid_well_known
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index c23a4d7c0c..04766ca965 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -587,7 +587,7 @@ class MatrixFederationHttpClient:
         """
         Builds the Authorization headers for a federation request
         Args:
-            destination (bytes|None): The desination homeserver of the request.
+            destination (bytes|None): The destination homeserver of the request.
                 May be None if the destination is an identity server, in which case
                 destination_is must be non-None.
             method (bytes): The HTTP method of the request
@@ -640,7 +640,7 @@ class MatrixFederationHttpClient:
         backoff_on_404=False,
         try_trailing_slash_on_400=False,
     ):
-        """ Sends the specifed json data using PUT
+        """ Sends the specified json data using PUT
 
         Args:
             destination (str): The remote server to send the HTTP request
@@ -729,7 +729,7 @@ class MatrixFederationHttpClient:
         ignore_backoff=False,
         args={},
     ):
-        """ Sends the specifed json data using POST
+        """ Sends the specified json data using POST
 
         Args:
             destination (str): The remote server to send the HTTP request
diff --git a/synapse/http/request_metrics.py b/synapse/http/request_metrics.py
index cd94e789e8..7c5defec82 100644
--- a/synapse/http/request_metrics.py
+++ b/synapse/http/request_metrics.py
@@ -109,7 +109,7 @@ in_flight_requests_db_sched_duration = Counter(
 # The set of all in flight requests, set[RequestMetrics]
 _in_flight_requests = set()
 
-# Protects the _in_flight_requests set from concurrent accesss
+# Protects the _in_flight_requests set from concurrent access
 _in_flight_requests_lock = threading.Lock()
 
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index d8e354f0a9..c0919f8cb7 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -180,7 +180,7 @@ class HttpServer:
         """ Register a callback that gets fired if we receive a http request
         with the given method for a path that matches the given regex.
 
-        If the regex contains groups these gets passed to the calback via
+        If the regex contains groups these gets passed to the callback via
         an unpacked tuple.
 
         Args:
@@ -239,7 +239,7 @@ class _AsyncResource(resource.Resource, metaclass=abc.ABCMeta):
 
     async def _async_render(self, request: Request):
         """Delegates to `_async_render_<METHOD>` methods, or returns a 400 if
-        no appropriate method exists. Can be overriden in sub classes for
+        no appropriate method exists. Can be overridden in sub classes for
         different routing.
         """
         # Treat HEAD requests as GET requests.
@@ -384,7 +384,7 @@ class JsonResource(DirectServeJsonResource):
     async def _async_render(self, request):
         callback, servlet_classname, group_dict = self._get_handler_for_request(request)
 
-        # Make sure we have an appopriate name for this handler in prometheus
+        # Make sure we have an appropriate name for this handler in prometheus
         # (rather than the default of JsonResource).
         request.request_metrics.name = servlet_classname
 
diff --git a/synapse/http/servlet.py b/synapse/http/servlet.py
index fd90ba7828..b361b7cbaf 100644
--- a/synapse/http/servlet.py
+++ b/synapse/http/servlet.py
@@ -272,7 +272,6 @@ class RestServlet:
       on_PUT
       on_POST
       on_DELETE
-      on_OPTIONS
 
     Automatically handles turning CodeMessageExceptions thrown by these methods
     into the appropriate HTTP response.
@@ -283,7 +282,7 @@ class RestServlet:
         if hasattr(self, "PATTERNS"):
             patterns = self.PATTERNS
 
-            for method in ("GET", "PUT", "POST", "OPTIONS", "DELETE"):
+            for method in ("GET", "PUT", "POST", "DELETE"):
                 if hasattr(self, "on_%s" % (method,)):
                     servlet_classname = self.__class__.__name__
                     method_handler = getattr(self, "on_%s" % (method,))
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 6e79b47828..5f0581dc3f 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,7 +14,7 @@
 import contextlib
 import logging
 import time
-from typing import Optional
+from typing import Optional, Union
 
 from twisted.python.failure import Failure
 from twisted.web.server import Request, Site
@@ -23,6 +23,7 @@ from synapse.config.server import ListenerConfig
 from synapse.http import redact_uri
 from synapse.http.request_metrics import RequestMetrics, requests_counter
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
+from synapse.types import Requester
 
 logger = logging.getLogger(__name__)
 
@@ -54,9 +55,12 @@ class SynapseRequest(Request):
         Request.__init__(self, channel, *args, **kw)
         self.site = channel.site
         self._channel = channel  # this is used by the tests
-        self.authenticated_entity = None
         self.start_time = 0.0
 
+        # The requester, if authenticated. For federation requests this is the
+        # server name, for client requests this is the Requester object.
+        self.requester = None  # type: Optional[Union[Requester, str]]
+
         # we can't yet create the logcontext, as we don't know the method.
         self.logcontext = None  # type: Optional[LoggingContext]
 
@@ -109,8 +113,14 @@ class SynapseRequest(Request):
             method = self.method.decode("ascii")
         return method
 
-    def get_user_agent(self):
-        return self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
+    def get_user_agent(self, default: str) -> str:
+        """Return the last User-Agent header, or the given default.
+        """
+        user_agent = self.requestHeaders.getRawHeaders(b"User-Agent", [None])[-1]
+        if user_agent is None:
+            return default
+
+        return user_agent.decode("ascii", "replace")
 
     def render(self, resrc):
         # this is called once a Resource has been found to serve the request; in our
@@ -161,7 +171,9 @@ class SynapseRequest(Request):
             yield
         except Exception:
             # this should already have been caught, and sent back to the client as a 500.
-            logger.exception("Asynchronous messge handler raised an uncaught exception")
+            logger.exception(
+                "Asynchronous message handler raised an uncaught exception"
+            )
         finally:
             # the request handler has finished its work and either sent the whole response
             # back, or handed over responsibility to a Producer.
@@ -263,22 +275,30 @@ class SynapseRequest(Request):
         # to the client (nb may be negative)
         response_send_time = self.finish_time - self._processing_finished_time
 
-        # need to decode as it could be raw utf-8 bytes
-        # from a IDN servname in an auth header
-        authenticated_entity = self.authenticated_entity
-        if authenticated_entity is not None and isinstance(authenticated_entity, bytes):
-            authenticated_entity = authenticated_entity.decode("utf-8", "replace")
+        # Convert the requester into a string that we can log
+        authenticated_entity = None
+        if isinstance(self.requester, str):
+            authenticated_entity = self.requester
+        elif isinstance(self.requester, Requester):
+            authenticated_entity = self.requester.authenticated_entity
+
+            # If this is a request where the target user doesn't match the user who
+            # authenticated (e.g. and admin is puppetting a user) then we log both.
+            if self.requester.user.to_string() != authenticated_entity:
+                authenticated_entity = "{},{}".format(
+                    authenticated_entity, self.requester.user.to_string(),
+                )
+        elif self.requester is not None:
+            # This shouldn't happen, but we log it so we don't lose information
+            # and can see that we're doing something wrong.
+            authenticated_entity = repr(self.requester)  # type: ignore[unreachable]
 
         # ...or could be raw utf-8 bytes in the User-Agent header.
         # N.B. if you don't do this, the logger explodes cryptically
         # with maximum recursion trying to log errors about
         # the charset problem.
         # c.f. https://github.com/matrix-org/synapse/issues/3471
-        user_agent = self.get_user_agent()
-        if user_agent is not None:
-            user_agent = user_agent.decode("utf-8", "replace")
-        else:
-            user_agent = "-"
+        user_agent = self.get_user_agent("-")
 
         code = str(self.code)
         if not self.finished:
diff --git a/synapse/logging/__init__.py b/synapse/logging/__init__.py
index e69de29bb2..b28b7b2ef7 100644
--- a/synapse/logging/__init__.py
+++ b/synapse/logging/__init__.py
@@ -0,0 +1,20 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# These are imported to allow for nicer logging configuration files.
+from synapse.logging._remote import RemoteHandler
+from synapse.logging._terse_json import JsonFormatter, TerseJsonFormatter
+
+__all__ = ["RemoteHandler", "JsonFormatter", "TerseJsonFormatter"]
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 0caf325916..fb937b3f28 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -13,6 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
 import sys
 import traceback
 from collections import deque
@@ -21,10 +22,11 @@ from math import floor
 from typing import Callable, Optional
 
 import attr
+from typing_extensions import Deque
 from zope.interface import implementer
 
 from twisted.application.internet import ClientService
-from twisted.internet.defer import Deferred
+from twisted.internet.defer import CancelledError, Deferred
 from twisted.internet.endpoints import (
     HostnameEndpoint,
     TCP4ClientEndpoint,
@@ -32,7 +34,9 @@ from twisted.internet.endpoints import (
 )
 from twisted.internet.interfaces import IPushProducer, ITransport
 from twisted.internet.protocol import Factory, Protocol
-from twisted.logger import ILogObserver, Logger, LogLevel
+from twisted.python.failure import Failure
+
+logger = logging.getLogger(__name__)
 
 
 @attr.s
@@ -45,11 +49,11 @@ class LogProducer:
     Args:
         buffer: Log buffer to read logs from.
         transport: Transport to write to.
-        format_event: A callable to format the log entry to a string.
+        format: A callable to format the log record to a string.
     """
 
     transport = attr.ib(type=ITransport)
-    format_event = attr.ib(type=Callable[[dict], str])
+    _format = attr.ib(type=Callable[[logging.LogRecord], str])
     _buffer = attr.ib(type=deque)
     _paused = attr.ib(default=False, type=bool, init=False)
 
@@ -61,16 +65,19 @@ class LogProducer:
         self._buffer = deque()
 
     def resumeProducing(self):
+        # If we're already producing, nothing to do.
         self._paused = False
 
+        # Loop until paused.
         while self._paused is False and (self._buffer and self.transport.connected):
             try:
-                # Request the next event and format it.
-                event = self._buffer.popleft()
-                msg = self.format_event(event)
+                # Request the next record and format it.
+                record = self._buffer.popleft()
+                msg = self._format(record)
 
                 # Send it as a new line over the transport.
                 self.transport.write(msg.encode("utf8"))
+                self.transport.write(b"\n")
             except Exception:
                 # Something has gone wrong writing to the transport -- log it
                 # and break out of the while.
@@ -78,76 +85,85 @@ class LogProducer:
                 break
 
 
-@attr.s
-@implementer(ILogObserver)
-class TCPLogObserver:
+class RemoteHandler(logging.Handler):
     """
-    An IObserver that writes JSON logs to a TCP target.
+    An logging handler that writes logs to a TCP target.
 
     Args:
-        hs (HomeServer): The homeserver that is being logged for.
         host: The host of the logging target.
         port: The logging target's port.
-        format_event: A callable to format the log entry to a string.
         maximum_buffer: The maximum buffer size.
     """
 
-    hs = attr.ib()
-    host = attr.ib(type=str)
-    port = attr.ib(type=int)
-    format_event = attr.ib(type=Callable[[dict], str])
-    maximum_buffer = attr.ib(type=int)
-    _buffer = attr.ib(default=attr.Factory(deque), type=deque)
-    _connection_waiter = attr.ib(default=None, type=Optional[Deferred])
-    _logger = attr.ib(default=attr.Factory(Logger))
-    _producer = attr.ib(default=None, type=Optional[LogProducer])
-
-    def start(self) -> None:
+    def __init__(
+        self,
+        host: str,
+        port: int,
+        maximum_buffer: int = 1000,
+        level=logging.NOTSET,
+        _reactor=None,
+    ):
+        super().__init__(level=level)
+        self.host = host
+        self.port = port
+        self.maximum_buffer = maximum_buffer
+
+        self._buffer = deque()  # type: Deque[logging.LogRecord]
+        self._connection_waiter = None  # type: Optional[Deferred]
+        self._producer = None  # type: Optional[LogProducer]
 
         # Connect without DNS lookups if it's a direct IP.
+        if _reactor is None:
+            from twisted.internet import reactor
+
+            _reactor = reactor
+
         try:
             ip = ip_address(self.host)
             if isinstance(ip, IPv4Address):
-                endpoint = TCP4ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP4ClientEndpoint(_reactor, self.host, self.port)
             elif isinstance(ip, IPv6Address):
-                endpoint = TCP6ClientEndpoint(
-                    self.hs.get_reactor(), self.host, self.port
-                )
+                endpoint = TCP6ClientEndpoint(_reactor, self.host, self.port)
             else:
                 raise ValueError("Unknown IP address provided: %s" % (self.host,))
         except ValueError:
-            endpoint = HostnameEndpoint(self.hs.get_reactor(), self.host, self.port)
+            endpoint = HostnameEndpoint(_reactor, self.host, self.port)
 
         factory = Factory.forProtocol(Protocol)
-        self._service = ClientService(endpoint, factory, clock=self.hs.get_reactor())
+        self._service = ClientService(endpoint, factory, clock=_reactor)
         self._service.startService()
+        self._stopping = False
         self._connect()
 
-    def stop(self):
+    def close(self):
+        self._stopping = True
         self._service.stopService()
 
     def _connect(self) -> None:
         """
         Triggers an attempt to connect then write to the remote if not already writing.
         """
+        # Do not attempt to open multiple connections.
         if self._connection_waiter:
             return
 
         self._connection_waiter = self._service.whenConnected(failAfterFailures=1)
 
-        @self._connection_waiter.addErrback
-        def fail(r):
-            r.printTraceback(file=sys.__stderr__)
+        def fail(failure: Failure) -> None:
+            # If the Deferred was cancelled (e.g. during shutdown) do not try to
+            # reconnect (this will cause an infinite loop of errors).
+            if failure.check(CancelledError) and self._stopping:
+                return
+
+            # For a different error, print the traceback and re-connect.
+            failure.printTraceback(file=sys.__stderr__)
             self._connection_waiter = None
             self._connect()
 
-        @self._connection_waiter.addCallback
-        def writer(r):
+        def writer(result: Protocol) -> None:
             # We have a connection. If we already have a producer, and its
             # transport is the same, just trigger a resumeProducing.
-            if self._producer and r.transport is self._producer.transport:
+            if self._producer and result.transport is self._producer.transport:
                 self._producer.resumeProducing()
                 self._connection_waiter = None
                 return
@@ -158,29 +174,29 @@ class TCPLogObserver:
 
             # Make a new producer and start it.
             self._producer = LogProducer(
-                buffer=self._buffer,
-                transport=r.transport,
-                format_event=self.format_event,
+                buffer=self._buffer, transport=result.transport, format=self.format,
             )
-            r.transport.registerProducer(self._producer, True)
+            result.transport.registerProducer(self._producer, True)
             self._producer.resumeProducing()
             self._connection_waiter = None
 
+        self._connection_waiter.addCallbacks(writer, fail)
+
     def _handle_pressure(self) -> None:
         """
-        Handle backpressure by shedding events.
+        Handle backpressure by shedding records.
 
         The buffer will, in this order, until the buffer is below the maximum:
-            - Shed DEBUG events
-            - Shed INFO events
-            - Shed the middle 50% of the events.
+            - Shed DEBUG records.
+            - Shed INFO records.
+            - Shed the middle 50% of the records.
         """
         if len(self._buffer) <= self.maximum_buffer:
             return
 
         # Strip out DEBUGs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.debug, self._buffer)
+            filter(lambda record: record.levelno > logging.DEBUG, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -188,7 +204,7 @@ class TCPLogObserver:
 
         # Strip out INFOs
         self._buffer = deque(
-            filter(lambda event: event["log_level"] != LogLevel.info, self._buffer)
+            filter(lambda record: record.levelno > logging.INFO, self._buffer)
         )
 
         if len(self._buffer) <= self.maximum_buffer:
@@ -209,17 +225,17 @@ class TCPLogObserver:
 
         self._buffer.extend(reversed(end_buffer))
 
-    def __call__(self, event: dict) -> None:
-        self._buffer.append(event)
+    def emit(self, record: logging.LogRecord) -> None:
+        self._buffer.append(record)
 
         # Handle backpressure, if it exists.
         try:
             self._handle_pressure()
         except Exception:
-            # If handling backpressure fails,clear the buffer and log the
+            # If handling backpressure fails, clear the buffer and log the
             # exception.
             self._buffer.clear()
-            self._logger.failure("Failed clearing backpressure")
+            logger.warning("Failed clearing backpressure")
 
         # Try and write immediately.
         self._connect()
diff --git a/synapse/logging/_structured.py b/synapse/logging/_structured.py
index 0fc2ea609e..14d9c104c2 100644
--- a/synapse/logging/_structured.py
+++ b/synapse/logging/_structured.py
@@ -12,138 +12,12 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-import logging
 import os.path
-import sys
-import typing
-import warnings
-from typing import List
+from typing import Any, Dict, Generator, Optional, Tuple
 
-import attr
-from constantly import NamedConstant, Names, ValueConstant, Values
-from zope.interface import implementer
-
-from twisted.logger import (
-    FileLogObserver,
-    FilteringLogObserver,
-    ILogObserver,
-    LogBeginner,
-    Logger,
-    LogLevel,
-    LogLevelFilterPredicate,
-    LogPublisher,
-    eventAsText,
-    jsonFileLogObserver,
-)
+from constantly import NamedConstant, Names
 
 from synapse.config._base import ConfigError
-from synapse.logging._terse_json import (
-    TerseJSONToConsoleLogObserver,
-    TerseJSONToTCPLogObserver,
-)
-from synapse.logging.context import current_context
-
-
-def stdlib_log_level_to_twisted(level: str) -> LogLevel:
-    """
-    Convert a stdlib log level to Twisted's log level.
-    """
-    lvl = level.lower().replace("warning", "warn")
-    return LogLevel.levelWithName(lvl)
-
-
-@attr.s
-@implementer(ILogObserver)
-class LogContextObserver:
-    """
-    An ILogObserver which adds Synapse-specific log context information.
-
-    Attributes:
-        observer (ILogObserver): The target parent observer.
-    """
-
-    observer = attr.ib()
-
-    def __call__(self, event: dict) -> None:
-        """
-        Consume a log event and emit it to the parent observer after filtering
-        and adding log context information.
-
-        Args:
-            event (dict)
-        """
-        # Filter out some useless events that Twisted outputs
-        if "log_text" in event:
-            if event["log_text"].startswith("DNSDatagramProtocol starting on "):
-                return
-
-            if event["log_text"].startswith("(UDP Port "):
-                return
-
-            if event["log_text"].startswith("Timing out client") or event[
-                "log_format"
-            ].startswith("Timing out client"):
-                return
-
-        context = current_context()
-
-        # Copy the context information to the log event.
-        context.copy_to_twisted_log_entry(event)
-
-        self.observer(event)
-
-
-class PythonStdlibToTwistedLogger(logging.Handler):
-    """
-    Transform a Python stdlib log message into a Twisted one.
-    """
-
-    def __init__(self, observer, *args, **kwargs):
-        """
-        Args:
-            observer (ILogObserver): A Twisted logging observer.
-            *args, **kwargs: Args/kwargs to be passed to logging.Handler.
-        """
-        self.observer = observer
-        super().__init__(*args, **kwargs)
-
-    def emit(self, record: logging.LogRecord) -> None:
-        """
-        Emit a record to Twisted's observer.
-
-        Args:
-            record (logging.LogRecord)
-        """
-
-        self.observer(
-            {
-                "log_time": record.created,
-                "log_text": record.getMessage(),
-                "log_format": "{log_text}",
-                "log_namespace": record.name,
-                "log_level": stdlib_log_level_to_twisted(record.levelname),
-            }
-        )
-
-
-def SynapseFileLogObserver(outFile: typing.IO[str]) -> FileLogObserver:
-    """
-    A log observer that formats events like the traditional log formatter and
-    sends them to `outFile`.
-
-    Args:
-        outFile (file object): The file object to write to.
-    """
-
-    def formatEvent(_event: dict) -> str:
-        event = dict(_event)
-        event["log_level"] = event["log_level"].name.upper()
-        event["log_format"] = "- {log_namespace} - {log_level} - {request} - " + (
-            event.get("log_format", "{log_text}") or "{log_text}"
-        )
-        return eventAsText(event, includeSystem=False) + "\n"
-
-    return FileLogObserver(outFile, formatEvent)
 
 
 class DrainType(Names):
@@ -155,30 +29,12 @@ class DrainType(Names):
     NETWORK_JSON_TERSE = NamedConstant()
 
 
-class OutputPipeType(Values):
-    stdout = ValueConstant(sys.__stdout__)
-    stderr = ValueConstant(sys.__stderr__)
-
-
-@attr.s
-class DrainConfiguration:
-    name = attr.ib()
-    type = attr.ib()
-    location = attr.ib()
-    options = attr.ib(default=None)
-
-
-@attr.s
-class NetworkJSONTerseOptions:
-    maximum_buffer = attr.ib(type=int)
-
-
-DEFAULT_LOGGERS = {"synapse": {"level": "INFO"}}
+DEFAULT_LOGGERS = {"synapse": {"level": "info"}}
 
 
 def parse_drain_configs(
     drains: dict,
-) -> typing.Generator[DrainConfiguration, None, None]:
+) -> Generator[Tuple[str, Dict[str, Any]], None, None]:
     """
     Parse the drain configurations.
 
@@ -186,11 +42,12 @@ def parse_drain_configs(
         drains (dict): A list of drain configurations.
 
     Yields:
-        DrainConfiguration instances.
+        dict instances representing a logging handler.
 
     Raises:
         ConfigError: If any of the drain configuration items are invalid.
     """
+
     for name, config in drains.items():
         if "type" not in config:
             raise ConfigError("Logging drains require a 'type' key.")
@@ -202,6 +59,18 @@ def parse_drain_configs(
                 "%s is not a known logging drain type." % (config["type"],)
             )
 
+        # Either use the default formatter or the tersejson one.
+        if logging_type in (DrainType.CONSOLE_JSON, DrainType.FILE_JSON,):
+            formatter = "json"  # type: Optional[str]
+        elif logging_type in (
+            DrainType.CONSOLE_JSON_TERSE,
+            DrainType.NETWORK_JSON_TERSE,
+        ):
+            formatter = "tersejson"
+        else:
+            # A formatter of None implies using the default formatter.
+            formatter = None
+
         if logging_type in [
             DrainType.CONSOLE,
             DrainType.CONSOLE_JSON,
@@ -217,9 +86,11 @@ def parse_drain_configs(
                     % (logging_type,)
                 )
 
-            pipe = OutputPipeType.lookupByName(location).value
-
-            yield DrainConfiguration(name=name, type=logging_type, location=pipe)
+            yield name, {
+                "class": "logging.StreamHandler",
+                "formatter": formatter,
+                "stream": "ext://sys." + location,
+            }
 
         elif logging_type in [DrainType.FILE, DrainType.FILE_JSON]:
             if "location" not in config:
@@ -233,18 +104,25 @@ def parse_drain_configs(
                     "File paths need to be absolute, '%s' is a relative path"
                     % (location,)
                 )
-            yield DrainConfiguration(name=name, type=logging_type, location=location)
+
+            yield name, {
+                "class": "logging.FileHandler",
+                "formatter": formatter,
+                "filename": location,
+            }
 
         elif logging_type in [DrainType.NETWORK_JSON_TERSE]:
             host = config.get("host")
             port = config.get("port")
             maximum_buffer = config.get("maximum_buffer", 1000)
-            yield DrainConfiguration(
-                name=name,
-                type=logging_type,
-                location=(host, port),
-                options=NetworkJSONTerseOptions(maximum_buffer=maximum_buffer),
-            )
+
+            yield name, {
+                "class": "synapse.logging.RemoteHandler",
+                "formatter": formatter,
+                "host": host,
+                "port": port,
+                "maximum_buffer": maximum_buffer,
+            }
 
         else:
             raise ConfigError(
@@ -253,126 +131,29 @@ def parse_drain_configs(
             )
 
 
-class StoppableLogPublisher(LogPublisher):
+def setup_structured_logging(log_config: dict,) -> dict:
     """
-    A log publisher that can tell its observers to shut down any external
-    communications.
-    """
-
-    def stop(self):
-        for obs in self._observers:
-            if hasattr(obs, "stop"):
-                obs.stop()
-
-
-def setup_structured_logging(
-    hs,
-    config,
-    log_config: dict,
-    logBeginner: LogBeginner,
-    redirect_stdlib_logging: bool = True,
-) -> LogPublisher:
-    """
-    Set up Twisted's structured logging system.
-
-    Args:
-        hs: The homeserver to use.
-        config (HomeserverConfig): The configuration of the Synapse homeserver.
-        log_config (dict): The log configuration to use.
+    Convert a legacy structured logging configuration (from Synapse < v1.23.0)
+    to one compatible with the new standard library handlers.
     """
-    if config.no_redirect_stdio:
-        raise ConfigError(
-            "no_redirect_stdio cannot be defined using structured logging."
-        )
-
-    logger = Logger()
-
     if "drains" not in log_config:
         raise ConfigError("The logging configuration requires a list of drains.")
 
-    observers = []  # type: List[ILogObserver]
-
-    for observer in parse_drain_configs(log_config["drains"]):
-        # Pipe drains
-        if observer.type == DrainType.CONSOLE:
-            logger.debug(
-                "Starting up the {name} console logger drain", name=observer.name
-            )
-            observers.append(SynapseFileLogObserver(observer.location))
-        elif observer.type == DrainType.CONSOLE_JSON:
-            logger.debug(
-                "Starting up the {name} JSON console logger drain", name=observer.name
-            )
-            observers.append(jsonFileLogObserver(observer.location))
-        elif observer.type == DrainType.CONSOLE_JSON_TERSE:
-            logger.debug(
-                "Starting up the {name} terse JSON console logger drain",
-                name=observer.name,
-            )
-            observers.append(
-                TerseJSONToConsoleLogObserver(observer.location, metadata={})
-            )
-
-        # File drains
-        elif observer.type == DrainType.FILE:
-            logger.debug("Starting up the {name} file logger drain", name=observer.name)
-            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
-            observers.append(SynapseFileLogObserver(log_file))
-        elif observer.type == DrainType.FILE_JSON:
-            logger.debug(
-                "Starting up the {name} JSON file logger drain", name=observer.name
-            )
-            log_file = open(observer.location, "at", buffering=1, encoding="utf8")
-            observers.append(jsonFileLogObserver(log_file))
-
-        elif observer.type == DrainType.NETWORK_JSON_TERSE:
-            metadata = {"server_name": hs.config.server_name}
-            log_observer = TerseJSONToTCPLogObserver(
-                hs=hs,
-                host=observer.location[0],
-                port=observer.location[1],
-                metadata=metadata,
-                maximum_buffer=observer.options.maximum_buffer,
-            )
-            log_observer.start()
-            observers.append(log_observer)
-        else:
-            # We should never get here, but, just in case, throw an error.
-            raise ConfigError("%s drain type cannot be configured" % (observer.type,))
-
-    publisher = StoppableLogPublisher(*observers)
-    log_filter = LogLevelFilterPredicate()
-
-    for namespace, namespace_config in log_config.get(
-        "loggers", DEFAULT_LOGGERS
-    ).items():
-        # Set the log level for twisted.logger.Logger namespaces
-        log_filter.setLogLevelForNamespace(
-            namespace,
-            stdlib_log_level_to_twisted(namespace_config.get("level", "INFO")),
-        )
-
-        # Also set the log levels for the stdlib logger namespaces, to prevent
-        # them getting to PythonStdlibToTwistedLogger and having to be formatted
-        if "level" in namespace_config:
-            logging.getLogger(namespace).setLevel(namespace_config.get("level"))
-
-    f = FilteringLogObserver(publisher, [log_filter])
-    lco = LogContextObserver(f)
-
-    if redirect_stdlib_logging:
-        stuff_into_twisted = PythonStdlibToTwistedLogger(lco)
-        stdliblogger = logging.getLogger()
-        stdliblogger.addHandler(stuff_into_twisted)
-
-    # Always redirect standard I/O, otherwise other logging outputs might miss
-    # it.
-    logBeginner.beginLoggingTo([lco], redirectStandardIO=True)
+    new_config = {
+        "version": 1,
+        "formatters": {
+            "json": {"class": "synapse.logging.JsonFormatter"},
+            "tersejson": {"class": "synapse.logging.TerseJsonFormatter"},
+        },
+        "handlers": {},
+        "loggers": log_config.get("loggers", DEFAULT_LOGGERS),
+        "root": {"handlers": []},
+    }
 
-    return publisher
+    for handler_name, handler in parse_drain_configs(log_config["drains"]):
+        new_config["handlers"][handler_name] = handler
 
+        # Add each handler to the root logger.
+        new_config["root"]["handlers"].append(handler_name)
 
-def reload_structured_logging(*args, log_config=None) -> None:
-    warnings.warn(
-        "Currently the structured logging system can not be reloaded, doing nothing"
-    )
+    return new_config
diff --git a/synapse/logging/_terse_json.py b/synapse/logging/_terse_json.py
index 9b46956ca9..2fbf5549a1 100644
--- a/synapse/logging/_terse_json.py
+++ b/synapse/logging/_terse_json.py
@@ -16,141 +16,65 @@
 """
 Log formatters that output terse JSON.
 """
-
 import json
-from typing import IO
-
-from twisted.logger import FileLogObserver
-
-from synapse.logging._remote import TCPLogObserver
+import logging
 
 _encoder = json.JSONEncoder(ensure_ascii=False, separators=(",", ":"))
 
-
-def flatten_event(event: dict, metadata: dict, include_time: bool = False):
-    """
-    Flatten a Twisted logging event to an dictionary capable of being sent
-    as a log event to a logging aggregation system.
-
-    The format is vastly simplified and is not designed to be a "human readable
-    string" in the sense that traditional logs are. Instead, the structure is
-    optimised for searchability and filtering, with human-understandable log
-    keys.
-
-    Args:
-        event (dict): The Twisted logging event we are flattening.
-        metadata (dict): Additional data to include with each log message. This
-            can be information like the server name. Since the target log
-            consumer does not know who we are other than by host IP, this
-            allows us to forward through static information.
-        include_time (bool): Should we include the `time` key? If False, the
-            event time is stripped from the event.
-    """
-    new_event = {}
-
-    # If it's a failure, make the new event's log_failure be the traceback text.
-    if "log_failure" in event:
-        new_event["log_failure"] = event["log_failure"].getTraceback()
-
-    # If it's a warning, copy over a string representation of the warning.
-    if "warning" in event:
-        new_event["warning"] = str(event["warning"])
-
-    # Stdlib logging events have "log_text" as their human-readable portion,
-    # Twisted ones have "log_format". For now, include the log_format, so that
-    # context only given in the log format (e.g. what is being logged) is
-    # available.
-    if "log_text" in event:
-        new_event["log"] = event["log_text"]
-    else:
-        new_event["log"] = event["log_format"]
-
-    # We want to include the timestamp when forwarding over the network, but
-    # exclude it when we are writing to stdout. This is because the log ingester
-    # (e.g. logstash, fluentd) can add its own timestamp.
-    if include_time:
-        new_event["time"] = round(event["log_time"], 2)
-
-    # Convert the log level to a textual representation.
-    new_event["level"] = event["log_level"].name.upper()
-
-    # Ignore these keys, and do not transfer them over to the new log object.
-    # They are either useless (isError), transferred manually above (log_time,
-    # log_level, etc), or contain Python objects which are not useful for output
-    # (log_logger, log_source).
-    keys_to_delete = [
-        "isError",
-        "log_failure",
-        "log_format",
-        "log_level",
-        "log_logger",
-        "log_source",
-        "log_system",
-        "log_time",
-        "log_text",
-        "observer",
-        "warning",
-    ]
-
-    # If it's from the Twisted legacy logger (twisted.python.log), it adds some
-    # more keys we want to purge.
-    if event.get("log_namespace") == "log_legacy":
-        keys_to_delete.extend(["message", "system", "time"])
-
-    # Rather than modify the dictionary in place, construct a new one with only
-    # the content we want. The original event should be considered 'frozen'.
-    for key in event.keys():
-
-        if key in keys_to_delete:
-            continue
-
-        if isinstance(event[key], (str, int, bool, float)) or event[key] is None:
-            # If it's a plain type, include it as is.
-            new_event[key] = event[key]
-        else:
-            # If it's not one of those basic types, write out a string
-            # representation. This should probably be a warning in development,
-            # so that we are sure we are only outputting useful data.
-            new_event[key] = str(event[key])
-
-    # Add the metadata information to the event (e.g. the server_name).
-    new_event.update(metadata)
-
-    return new_event
-
-
-def TerseJSONToConsoleLogObserver(outFile: IO[str], metadata: dict) -> FileLogObserver:
-    """
-    A log observer that formats events to a flattened JSON representation.
-
-    Args:
-        outFile: The file object to write to.
-        metadata: Metadata to be added to each log object.
-    """
-
-    def formatEvent(_event: dict) -> str:
-        flattened = flatten_event(_event, metadata)
-        return _encoder.encode(flattened) + "\n"
-
-    return FileLogObserver(outFile, formatEvent)
-
-
-def TerseJSONToTCPLogObserver(
-    hs, host: str, port: int, metadata: dict, maximum_buffer: int
-) -> FileLogObserver:
-    """
-    A log observer that formats events to a flattened JSON representation.
-
-    Args:
-        hs (HomeServer): The homeserver that is being logged for.
-        host: The host of the logging target.
-        port: The logging target's port.
-        metadata: Metadata to be added to each log object.
-        maximum_buffer: The maximum buffer size.
-    """
-
-    def formatEvent(_event: dict) -> str:
-        flattened = flatten_event(_event, metadata, include_time=True)
-        return _encoder.encode(flattened) + "\n"
-
-    return TCPLogObserver(hs, host, port, formatEvent, maximum_buffer)
+# The properties of a standard LogRecord.
+_LOG_RECORD_ATTRIBUTES = {
+    "args",
+    "asctime",
+    "created",
+    "exc_info",
+    # exc_text isn't a public attribute, but is used to cache the result of formatException.
+    "exc_text",
+    "filename",
+    "funcName",
+    "levelname",
+    "levelno",
+    "lineno",
+    "message",
+    "module",
+    "msecs",
+    "msg",
+    "name",
+    "pathname",
+    "process",
+    "processName",
+    "relativeCreated",
+    "stack_info",
+    "thread",
+    "threadName",
+}
+
+
+class JsonFormatter(logging.Formatter):
+    def format(self, record: logging.LogRecord) -> str:
+        event = {
+            "log": record.getMessage(),
+            "namespace": record.name,
+            "level": record.levelname,
+        }
+
+        return self._format(record, event)
+
+    def _format(self, record: logging.LogRecord, event: dict) -> str:
+        # Add any extra attributes to the event.
+        for key, value in record.__dict__.items():
+            if key not in _LOG_RECORD_ATTRIBUTES:
+                event[key] = value
+
+        return _encoder.encode(event)
+
+
+class TerseJsonFormatter(JsonFormatter):
+    def format(self, record: logging.LogRecord) -> str:
+        event = {
+            "log": record.getMessage(),
+            "namespace": record.name,
+            "level": record.levelname,
+            "time": round(record.created, 2),
+        }
+
+        return self._format(record, event)
diff --git a/synapse/logging/filter.py b/synapse/logging/filter.py
new file mode 100644
index 0000000000..1baf8dd679
--- /dev/null
+++ b/synapse/logging/filter.py
@@ -0,0 +1,33 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+
+from typing_extensions import Literal
+
+
+class MetadataFilter(logging.Filter):
+    """Logging filter that adds constant values to each record.
+
+    Args:
+        metadata: Key-value pairs to add to each record.
+    """
+
+    def __init__(self, metadata: dict):
+        self._metadata = metadata
+
+    def filter(self, record: logging.LogRecord) -> Literal[True]:
+        for key, value in self._metadata.items():
+            setattr(record, key, value)
+        return True
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index e58850faff..ab586c318c 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -317,7 +317,7 @@ def ensure_active_span(message, ret=None):
 
 
 @contextlib.contextmanager
-def _noop_context_manager(*args, **kwargs):
+def noop_context_manager(*args, **kwargs):
     """Does exactly what it says on the tin"""
     yield
 
@@ -413,7 +413,7 @@ def start_active_span(
     """
 
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     return opentracing.tracer.start_active_span(
         operation_name,
@@ -428,7 +428,7 @@ def start_active_span(
 
 def start_active_span_follows_from(operation_name, contexts):
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     references = [opentracing.follows_from(context) for context in contexts]
     scope = start_active_span(operation_name, references=references)
@@ -459,7 +459,7 @@ def start_active_span_from_request(
     # Also, twisted uses byte arrays while opentracing expects strings.
 
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     header_dict = {
         k.decode(): v[0].decode() for k, v in request.requestHeaders.getAllRawHeaders()
@@ -497,7 +497,7 @@ def start_active_span_from_edu(
     """
 
     if opentracing is None:
-        return _noop_context_manager()
+        return noop_context_manager()
 
     carrier = json_decoder.decode(edu_content.get("context", "{}")).get(
         "opentracing", {}
diff --git a/synapse/metrics/background_process_metrics.py b/synapse/metrics/background_process_metrics.py
index ea5f1c7b62..658f6ecd72 100644
--- a/synapse/metrics/background_process_metrics.py
+++ b/synapse/metrics/background_process_metrics.py
@@ -24,7 +24,7 @@ from prometheus_client.core import REGISTRY, Counter, Gauge
 from twisted.internet import defer
 
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
-from synapse.logging.opentracing import start_active_span
+from synapse.logging.opentracing import noop_context_manager, start_active_span
 
 if TYPE_CHECKING:
     import resource
@@ -167,7 +167,7 @@ class _BackgroundProcess:
         )
 
 
-def run_as_background_process(desc: str, func, *args, **kwargs):
+def run_as_background_process(desc: str, func, *args, bg_start_span=True, **kwargs):
     """Run the given function in its own logcontext, with resource metrics
 
     This should be used to wrap processes which are fired off to run in the
@@ -181,6 +181,9 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
     Args:
         desc: a description for this background process type
         func: a function, which may return a Deferred or a coroutine
+        bg_start_span: Whether to start an opentracing span. Defaults to True.
+            Should only be disabled for processes that will not log to or tag
+            a span.
         args: positional args for func
         kwargs: keyword args for func
 
@@ -199,7 +202,10 @@ def run_as_background_process(desc: str, func, *args, **kwargs):
         with BackgroundProcessLoggingContext(desc) as context:
             context.request = "%s-%i" % (desc, count)
             try:
-                with start_active_span(desc, tags={"request_id": context.request}):
+                ctx = noop_context_manager()
+                if bg_start_span:
+                    ctx = start_active_span(desc, tags={"request_id": context.request})
+                with ctx:
                     result = func(*args, **kwargs)
 
                     if inspect.isawaitable(result):
@@ -266,7 +272,7 @@ class BackgroundProcessLoggingContext(LoggingContext):
 
         super().__exit__(type, value, traceback)
 
-        # The background process has finished. We explictly remove and manually
+        # The background process has finished. We explicitly remove and manually
         # update the metrics here so that if nothing is scraping metrics the set
         # doesn't infinitely grow.
         with _bg_metrics_lock:
diff --git a/synapse/notifier.py b/synapse/notifier.py
index 2e993411b9..a17352ef46 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -28,6 +28,7 @@ from typing import (
     Union,
 )
 
+import attr
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -40,7 +41,6 @@ from synapse.handlers.presence import format_user_presence_state
 from synapse.logging.context import PreserveLoggingContext
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
-from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.streams.config import PaginationConfig
 from synapse.types import (
     Collection,
@@ -174,6 +174,17 @@ class EventStreamResult(namedtuple("EventStreamResult", ("events", "tokens"))):
         return bool(self.events)
 
 
+@attr.s(slots=True, frozen=True)
+class _PendingRoomEventEntry:
+    event_pos = attr.ib(type=PersistedEventPosition)
+    extra_users = attr.ib(type=Collection[UserID])
+
+    room_id = attr.ib(type=str)
+    type = attr.ib(type=str)
+    state_key = attr.ib(type=Optional[str])
+    membership = attr.ib(type=Optional[str])
+
+
 class Notifier:
     """ This class is responsible for notifying any listeners when there are
     new events available for it.
@@ -191,9 +202,7 @@ class Notifier:
         self.storage = hs.get_storage()
         self.event_sources = hs.get_event_sources()
         self.store = hs.get_datastore()
-        self.pending_new_room_events = (
-            []
-        )  # type: List[Tuple[PersistedEventPosition, EventBase, Collection[UserID]]]
+        self.pending_new_room_events = []  # type: List[_PendingRoomEventEntry]
 
         # Called when there are new things to stream over replication
         self.replication_callbacks = []  # type: List[Callable[[], None]]
@@ -256,7 +265,29 @@ class Notifier:
         max_room_stream_token: RoomStreamToken,
         extra_users: Collection[UserID] = [],
     ):
-        """ Used by handlers to inform the notifier something has happened
+        """Unwraps event and calls `on_new_room_event_args`.
+        """
+        self.on_new_room_event_args(
+            event_pos=event_pos,
+            room_id=event.room_id,
+            event_type=event.type,
+            state_key=event.get("state_key"),
+            membership=event.content.get("membership"),
+            max_room_stream_token=max_room_stream_token,
+            extra_users=extra_users,
+        )
+
+    def on_new_room_event_args(
+        self,
+        room_id: str,
+        event_type: str,
+        state_key: Optional[str],
+        membership: Optional[str],
+        event_pos: PersistedEventPosition,
+        max_room_stream_token: RoomStreamToken,
+        extra_users: Collection[UserID] = [],
+    ):
+        """Used by handlers to inform the notifier something has happened
         in the room, room event wise.
 
         This triggers the notifier to wake up any listeners that are
@@ -267,7 +298,16 @@ class Notifier:
         until all previous events have been persisted before notifying
         the client streams.
         """
-        self.pending_new_room_events.append((event_pos, event, extra_users))
+        self.pending_new_room_events.append(
+            _PendingRoomEventEntry(
+                event_pos=event_pos,
+                extra_users=extra_users,
+                room_id=room_id,
+                type=event_type,
+                state_key=state_key,
+                membership=membership,
+            )
+        )
         self._notify_pending_new_room_events(max_room_stream_token)
 
         self.notify_replication()
@@ -285,18 +325,19 @@ class Notifier:
         users = set()  # type: Set[UserID]
         rooms = set()  # type: Set[str]
 
-        for event_pos, event, extra_users in pending:
-            if event_pos.persisted_after(max_room_stream_token):
-                self.pending_new_room_events.append((event_pos, event, extra_users))
+        for entry in pending:
+            if entry.event_pos.persisted_after(max_room_stream_token):
+                self.pending_new_room_events.append(entry)
             else:
                 if (
-                    event.type == EventTypes.Member
-                    and event.membership == Membership.JOIN
+                    entry.type == EventTypes.Member
+                    and entry.membership == Membership.JOIN
+                    and entry.state_key
                 ):
-                    self._user_joined_room(event.state_key, event.room_id)
+                    self._user_joined_room(entry.state_key, entry.room_id)
 
-                users.update(extra_users)
-                rooms.add(event.room_id)
+                users.update(entry.extra_users)
+                rooms.add(entry.room_id)
 
         if users or rooms:
             self.on_new_event(
@@ -310,44 +351,37 @@ class Notifier:
         """
 
         # poke any interested application service.
-        run_as_background_process(
-            "_notify_app_services", self._notify_app_services, max_room_stream_token
-        )
-
-        run_as_background_process(
-            "_notify_pusher_pool", self._notify_pusher_pool, max_room_stream_token
-        )
+        self._notify_app_services(max_room_stream_token)
+        self._notify_pusher_pool(max_room_stream_token)
 
         if self.federation_sender:
             self.federation_sender.notify_new_events(max_room_stream_token)
 
-    async def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
+    def _notify_app_services(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self.appservice_handler.notify_interested_services(
-                max_room_stream_token
-            )
+            self.appservice_handler.notify_interested_services(max_room_stream_token)
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    async def _notify_app_services_ephemeral(
+    def _notify_app_services_ephemeral(
         self,
         stream_key: str,
         new_token: Union[int, RoomStreamToken],
-        users: Collection[UserID] = [],
+        users: Collection[Union[str, UserID]] = [],
     ):
         try:
             stream_token = None
             if isinstance(new_token, int):
                 stream_token = new_token
-            await self.appservice_handler.notify_interested_services_ephemeral(
+            self.appservice_handler.notify_interested_services_ephemeral(
                 stream_key, stream_token, users
             )
         except Exception:
             logger.exception("Error notifying application services of event")
 
-    async def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
+    def _notify_pusher_pool(self, max_room_stream_token: RoomStreamToken):
         try:
-            await self._pusher_pool.on_new_notifications(max_room_stream_token)
+            self._pusher_pool.on_new_notifications(max_room_stream_token)
         except Exception:
             logger.exception("Error pusher pool of event")
 
@@ -384,16 +418,12 @@ class Notifier:
                 self.notify_replication()
 
                 # Notify appservices
-                run_as_background_process(
-                    "_notify_app_services_ephemeral",
-                    self._notify_app_services_ephemeral,
-                    stream_key,
-                    new_token,
-                    users,
+                self._notify_app_services_ephemeral(
+                    stream_key, new_token, users,
                 )
 
     def on_new_replication_data(self) -> None:
-        """Used to inform replication listeners that something has happend
+        """Used to inform replication listeners that something has happened
         without waking up any of the normal user event streams"""
         self.notify_replication()
 
diff --git a/synapse/push/baserules.py b/synapse/push/baserules.py
index 8047873ff1..2858b61fb1 100644
--- a/synapse/push/baserules.py
+++ b/synapse/push/baserules.py
@@ -37,7 +37,7 @@ def list_with_base_rules(rawrules, use_new_defaults=False):
     modified_base_rules = {r["rule_id"]: r for r in rawrules if r["priority_class"] < 0}
 
     # Remove the modified base rules from the list, They'll be added back
-    # in the default postions in the list.
+    # in the default positions in the list.
     rawrules = [r for r in rawrules if r["priority_class"] >= 0]
 
     # shove the server default rules for each kind onto the end of each
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index a701defcdd..82a72dc34f 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -15,8 +15,8 @@
 # limitations under the License.
 
 import logging
-from collections import namedtuple
 
+import attr
 from prometheus_client import Counter
 
 from synapse.api.constants import EventTypes, Membership, RelationTypes
@@ -26,7 +26,8 @@ from synapse.events.snapshot import EventContext
 from synapse.state import POWER_KEY
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import register_cache
-from synapse.util.caches.descriptors import cached
+from synapse.util.caches.descriptors import lru_cache
+from synapse.util.caches.lrucache import LruCache
 
 from .push_rule_evaluator import PushRuleEvaluatorForEvent
 
@@ -120,7 +121,7 @@ class BulkPushRuleEvaluator:
             dict of user_id -> push_rules
         """
         room_id = event.room_id
-        rules_for_room = await self._get_rules_for_room(room_id)
+        rules_for_room = self._get_rules_for_room(room_id)
 
         rules_by_user = await rules_for_room.get_rules(event, context)
 
@@ -138,7 +139,7 @@ class BulkPushRuleEvaluator:
 
         return rules_by_user
 
-    @cached()
+    @lru_cache()
     def _get_rules_for_room(self, room_id):
         """Get the current RulesForRoom object for the given room id
 
@@ -275,12 +276,14 @@ class RulesForRoom:
     the entire cache for the room.
     """
 
-    def __init__(self, hs, room_id, rules_for_room_cache, room_push_rule_cache_metrics):
+    def __init__(
+        self, hs, room_id, rules_for_room_cache: LruCache, room_push_rule_cache_metrics
+    ):
         """
         Args:
             hs (HomeServer)
             room_id (str)
-            rules_for_room_cache(Cache): The cache object that caches these
+            rules_for_room_cache: The cache object that caches these
                 RoomsForUser objects.
             room_push_rule_cache_metrics (CacheMetric)
         """
@@ -390,12 +393,12 @@ class RulesForRoom:
                     continue
 
                 # If a user has left a room we remove their push rule. If they
-                # joined then we readd it later in _update_rules_with_member_event_ids
+                # joined then we re-add it later in _update_rules_with_member_event_ids
                 ret_rules_by_user.pop(user_id, None)
                 missing_member_event_ids[user_id] = event_id
 
             if missing_member_event_ids:
-                # If we have some memebr events we haven't seen, look them up
+                # If we have some member events we haven't seen, look them up
                 # and fetch push rules for them if appropriate.
                 logger.debug("Found new member events %r", missing_member_event_ids)
                 await self._update_rules_with_member_event_ids(
@@ -489,13 +492,21 @@ class RulesForRoom:
             self.state_group = state_group
 
 
-class _Invalidation(namedtuple("_Invalidation", ("cache", "room_id"))):
-    # We rely on _CacheContext implementing __eq__ and __hash__ sensibly,
-    # which namedtuple does for us (i.e. two _CacheContext are the same if
-    # their caches and keys match). This is important in particular to
-    # dedupe when we add callbacks to lru cache nodes, otherwise the number
-    # of callbacks would grow.
+@attr.attrs(slots=True, frozen=True)
+class _Invalidation:
+    # _Invalidation is passed as an `on_invalidate` callback to bulk_get_push_rules,
+    # which means that it it is stored on the bulk_get_push_rules cache entry. In order
+    # to ensure that we don't accumulate lots of redunant callbacks on the cache entry,
+    # we need to ensure that two _Invalidation objects are "equal" if they refer to the
+    # same `cache` and `room_id`.
+    #
+    # attrs provides suitable __hash__ and __eq__ methods, provided we remember to
+    # set `frozen=True`.
+
+    cache = attr.ib(type=LruCache)
+    room_id = attr.ib(type=str)
+
     def __call__(self):
-        rules = self.cache.get_immediate(self.room_id, None, update_metrics=False)
+        rules = self.cache.get(self.room_id, None, update_metrics=False)
         if rules:
             rules.invalidate_all()
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index 155791b754..38195c8eea 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -24,7 +24,7 @@ from typing import Iterable, List, TypeVar
 import bleach
 import jinja2
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import StoreError
 from synapse.config.emailconfig import EmailSubjectConfig
 from synapse.logging.context import make_deferred_yieldable
@@ -317,9 +317,14 @@ class Mailer:
     async def get_room_vars(
         self, room_id, user_id, notifs, notif_events, room_state_ids
     ):
-        my_member_event_id = room_state_ids[("m.room.member", user_id)]
-        my_member_event = await self.store.get_event(my_member_event_id)
-        is_invite = my_member_event.content["membership"] == "invite"
+        # Check if one of the notifs is an invite event for the user.
+        is_invite = False
+        for n in notifs:
+            ev = notif_events[n["event_id"]]
+            if ev.type == EventTypes.Member and ev.state_key == user_id:
+                if ev.content.get("membership") == Membership.INVITE:
+                    is_invite = True
+                    break
 
         room_name = await calculate_room_name(self.store, room_state_ids, user_id)
 
@@ -461,16 +466,26 @@ class Mailer:
                 self.store, room_state_ids[room_id], user_id, fallback_to_members=False
             )
 
-            my_member_event_id = room_state_ids[room_id][("m.room.member", user_id)]
-            my_member_event = await self.store.get_event(my_member_event_id)
-            if my_member_event.content["membership"] == "invite":
-                inviter_member_event_id = room_state_ids[room_id][
-                    ("m.room.member", my_member_event.sender)
-                ]
-                inviter_member_event = await self.store.get_event(
-                    inviter_member_event_id
+            # See if one of the notifs is an invite event for the user
+            invite_event = None
+            for n in notifs_by_room[room_id]:
+                ev = notif_events[n["event_id"]]
+                if ev.type == EventTypes.Member and ev.state_key == user_id:
+                    if ev.content.get("membership") == Membership.INVITE:
+                        invite_event = ev
+                        break
+
+            if invite_event:
+                inviter_member_event_id = room_state_ids[room_id].get(
+                    ("m.room.member", invite_event.sender)
                 )
-                inviter_name = name_from_member_event(inviter_member_event)
+                inviter_name = invite_event.sender
+                if inviter_member_event_id:
+                    inviter_member_event = await self.store.get_event(
+                        inviter_member_event_id, allow_none=True
+                    )
+                    if inviter_member_event:
+                        inviter_name = name_from_member_event(inviter_member_event)
 
                 if room_name is None:
                     return self.email_subjects.invite_from_person % {
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 0080c68ce2..f325964983 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -19,7 +19,10 @@ from typing import TYPE_CHECKING, Dict, Union
 
 from prometheus_client import Gauge
 
-from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.metrics.background_process_metrics import (
+    run_as_background_process,
+    wrap_as_background_process,
+)
 from synapse.push import PusherConfigException
 from synapse.push.emailpusher import EmailPusher
 from synapse.push.httppusher import HttpPusher
@@ -187,7 +190,7 @@ class PusherPool:
                 )
                 await self.remove_pusher(p["app_id"], p["pushkey"], p["user_name"])
 
-    async def on_new_notifications(self, max_token: RoomStreamToken):
+    def on_new_notifications(self, max_token: RoomStreamToken):
         if not self.pushers:
             # nothing to do here.
             return
@@ -201,6 +204,17 @@ class PusherPool:
             # Nothing to do
             return
 
+        # We only start a new background process if necessary rather than
+        # optimistically (to cut down on overhead).
+        self._on_new_notifications(max_token)
+
+    @wrap_as_background_process("on_new_notifications")
+    async def _on_new_notifications(self, max_token: RoomStreamToken):
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        max_stream_id = max_token.stream
+
         prev_stream_id = self._last_room_stream_id_seen
         self._last_room_stream_id_seen = max_stream_id
 
diff --git a/synapse/replication/http/membership.py b/synapse/replication/http/membership.py
index e7cc74a5d2..f0c37eaf5e 100644
--- a/synapse/replication/http/membership.py
+++ b/synapse/replication/http/membership.py
@@ -77,8 +77,7 @@ class ReplicationRemoteJoinRestServlet(ReplicationEndpoint):
 
         requester = Requester.deserialize(self.store, content["requester"])
 
-        if requester.user:
-            request.authenticated_entity = requester.user.to_string()
+        request.requester = requester
 
         logger.info("remote_join: %s into room: %s", user_id, room_id)
 
@@ -142,8 +141,7 @@ class ReplicationRemoteRejectInviteRestServlet(ReplicationEndpoint):
 
         requester = Requester.deserialize(self.store, content["requester"])
 
-        if requester.user:
-            request.authenticated_entity = requester.user.to_string()
+        request.requester = requester
 
         # hopefully we're now on the master, so this won't recurse!
         event_id, stream_id = await self.member_handler.remote_reject_invite(
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index fc129dbaa7..8fa104c8d3 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -115,8 +115,7 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             ratelimit = content["ratelimit"]
             extra_users = [UserID.from_string(u) for u in content["extra_users"]]
 
-        if requester.user:
-            request.authenticated_entity = requester.user.to_string()
+        request.requester = requester
 
         logger.info(
             "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index e27ee216f0..2618eb1e53 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -141,21 +141,25 @@ class ReplicationDataHandler:
                 if row.type != EventsStreamEventRow.TypeId:
                     continue
                 assert isinstance(row, EventsStreamRow)
+                assert isinstance(row.data, EventsStreamEventRow)
 
-                event = await self.store.get_event(
-                    row.data.event_id, allow_rejected=True
-                )
-                if event.rejected_reason:
+                if row.data.rejected:
                     continue
 
                 extra_users = ()  # type: Tuple[UserID, ...]
-                if event.type == EventTypes.Member:
-                    extra_users = (UserID.from_string(event.state_key),)
+                if row.data.type == EventTypes.Member and row.data.state_key:
+                    extra_users = (UserID.from_string(row.data.state_key),)
 
                 max_token = self.store.get_room_max_token()
                 event_pos = PersistedEventPosition(instance_name, token)
-                self.notifier.on_new_room_event(
-                    event, event_pos, max_token, extra_users
+                self.notifier.on_new_room_event_args(
+                    event_pos=event_pos,
+                    max_room_stream_token=max_token,
+                    extra_users=extra_users,
+                    room_id=row.data.room_id,
+                    event_type=row.data.type,
+                    state_key=row.data.state_key,
+                    membership=row.data.membership,
                 )
 
         # Notify any waiting deferreds. The list is ordered by position so we
diff --git a/synapse/replication/tcp/redis.py b/synapse/replication/tcp/redis.py
index de19705c1f..bc6ba709a7 100644
--- a/synapse/replication/tcp/redis.py
+++ b/synapse/replication/tcp/redis.py
@@ -166,7 +166,9 @@ class RedisSubscriber(txredisapi.SubscriberProtocol, AbstractConnection):
         Args:
             cmd (Command)
         """
-        run_as_background_process("send-cmd", self._async_send_command, cmd)
+        run_as_background_process(
+            "send-cmd", self._async_send_command, cmd, bg_start_span=False
+        )
 
     async def _async_send_command(self, cmd: Command):
         """Encode a replication command and send it over our outbound connection"""
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 666c13fdb7..1d4ceac0f1 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -117,6 +117,16 @@ class ReplicationStreamer:
                 stream.discard_updates_and_advance()
             return
 
+        # We check up front to see if anything has actually changed, as we get
+        # poked because of changes that happened on other instances.
+        if all(
+            stream.last_token == stream.current_token(self._instance_name)
+            for stream in self.streams
+        ):
+            return
+
+        # If there are updates then we need to set this even if we're already
+        # looping, as the loop needs to know that he might need to loop again.
         self.pending_updates = True
 
         if self.is_looping:
diff --git a/synapse/replication/tcp/streams/events.py b/synapse/replication/tcp/streams/events.py
index 82e9e0d64e..86a62b71eb 100644
--- a/synapse/replication/tcp/streams/events.py
+++ b/synapse/replication/tcp/streams/events.py
@@ -15,12 +15,15 @@
 # limitations under the License.
 import heapq
 from collections.abc import Iterable
-from typing import List, Tuple, Type
+from typing import TYPE_CHECKING, List, Optional, Tuple, Type
 
 import attr
 
 from ._base import Stream, StreamUpdateResult, Token
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 """Handling of the 'events' replication stream
 
 This stream contains rows of various types. Each row therefore contains a 'type'
@@ -81,12 +84,14 @@ class BaseEventsStreamRow:
 class EventsStreamEventRow(BaseEventsStreamRow):
     TypeId = "ev"
 
-    event_id = attr.ib()  # str
-    room_id = attr.ib()  # str
-    type = attr.ib()  # str
-    state_key = attr.ib()  # str, optional
-    redacts = attr.ib()  # str, optional
-    relates_to = attr.ib()  # str, optional
+    event_id = attr.ib(type=str)
+    room_id = attr.ib(type=str)
+    type = attr.ib(type=str)
+    state_key = attr.ib(type=Optional[str])
+    redacts = attr.ib(type=Optional[str])
+    relates_to = attr.ib(type=Optional[str])
+    membership = attr.ib(type=Optional[str])
+    rejected = attr.ib(type=bool)
 
 
 @attr.s(slots=True, frozen=True)
@@ -113,7 +118,7 @@ class EventsStream(Stream):
 
     NAME = "events"
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self._store = hs.get_datastore()
         super().__init__(
             hs.get_instance_name(),
diff --git a/synapse/rest/admin/__init__.py b/synapse/rest/admin/__init__.py
index 789431ef25..fa7e9e4043 100644
--- a/synapse/rest/admin/__init__.py
+++ b/synapse/rest/admin/__init__.py
@@ -31,7 +31,10 @@ from synapse.rest.admin.devices import (
     DeviceRestServlet,
     DevicesRestServlet,
 )
-from synapse.rest.admin.event_reports import EventReportsRestServlet
+from synapse.rest.admin.event_reports import (
+    EventReportDetailRestServlet,
+    EventReportsRestServlet,
+)
 from synapse.rest.admin.groups import DeleteGroupAdminRestServlet
 from synapse.rest.admin.media import ListMediaInRoom, register_servlets_for_media_repo
 from synapse.rest.admin.purge_room_servlet import PurgeRoomServlet
@@ -47,9 +50,11 @@ from synapse.rest.admin.server_notice_servlet import SendServerNoticeServlet
 from synapse.rest.admin.users import (
     AccountValidityRenewServlet,
     DeactivateAccountRestServlet,
+    PushersRestServlet,
     ResetPasswordRestServlet,
     SearchUsersRestServlet,
     UserAdminServlet,
+    UserMediaRestServlet,
     UserMembershipRestServlet,
     UserRegisterServlet,
     UserRestServletV2,
@@ -215,13 +220,16 @@ def register_servlets(hs, http_server):
     SendServerNoticeServlet(hs).register(http_server)
     VersionServlet(hs).register(http_server)
     UserAdminServlet(hs).register(http_server)
+    UserMediaRestServlet(hs).register(http_server)
     UserMembershipRestServlet(hs).register(http_server)
     UserRestServletV2(hs).register(http_server)
     UsersRestServletV2(hs).register(http_server)
     DeviceRestServlet(hs).register(http_server)
     DevicesRestServlet(hs).register(http_server)
     DeleteDevicesRestServlet(hs).register(http_server)
+    EventReportDetailRestServlet(hs).register(http_server)
     EventReportsRestServlet(hs).register(http_server)
+    PushersRestServlet(hs).register(http_server)
 
 
 def register_servlets_for_client_rest_resource(hs, http_server):
diff --git a/synapse/rest/admin/devices.py b/synapse/rest/admin/devices.py
index a163863322..ffd3aa38f7 100644
--- a/synapse/rest/admin/devices.py
+++ b/synapse/rest/admin/devices.py
@@ -119,7 +119,7 @@ class DevicesRestServlet(RestServlet):
             raise NotFoundError("Unknown user")
 
         devices = await self.device_handler.get_devices_by_user(target_user.to_string())
-        return 200, {"devices": devices}
+        return 200, {"devices": devices, "total": len(devices)}
 
 
 class DeleteDevicesRestServlet(RestServlet):
diff --git a/synapse/rest/admin/event_reports.py b/synapse/rest/admin/event_reports.py
index 5b8d0594cd..fd482f0e32 100644
--- a/synapse/rest/admin/event_reports.py
+++ b/synapse/rest/admin/event_reports.py
@@ -15,7 +15,7 @@
 
 import logging
 
-from synapse.api.errors import Codes, SynapseError
+from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.servlet import RestServlet, parse_integer, parse_string
 from synapse.rest.admin._base import admin_patterns, assert_requester_is_admin
 
@@ -86,3 +86,47 @@ class EventReportsRestServlet(RestServlet):
             ret["next_token"] = start + len(event_reports)
 
         return 200, ret
+
+
+class EventReportDetailRestServlet(RestServlet):
+    """
+    Get a specific reported event that is known to the homeserver. Results are returned
+    in a dictionary containing report information.
+    The requester must have administrator access in Synapse.
+
+    GET /_synapse/admin/v1/event_reports/<report_id>
+    returns:
+        200 OK with details report if success otherwise an error.
+
+    Args:
+        The parameter `report_id` is the ID of the event report in the database.
+    Returns:
+        JSON blob of information about the event report
+    """
+
+    PATTERNS = admin_patterns("/event_reports/(?P<report_id>[^/]*)$")
+
+    def __init__(self, hs):
+        self.hs = hs
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+
+    async def on_GET(self, request, report_id):
+        await assert_requester_is_admin(self.auth, request)
+
+        message = (
+            "The report_id parameter must be a string representing a positive integer."
+        )
+        try:
+            report_id = int(report_id)
+        except ValueError:
+            raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
+
+        if report_id < 0:
+            raise SynapseError(400, message, errcode=Codes.INVALID_PARAM)
+
+        ret = await self.store.get_event_report(report_id)
+        if not ret:
+            raise NotFoundError("Event report not found")
+
+        return 200, ret
diff --git a/synapse/rest/admin/media.py b/synapse/rest/admin/media.py
index ee75095c0e..ba50cb876d 100644
--- a/synapse/rest/admin/media.py
+++ b/synapse/rest/admin/media.py
@@ -16,9 +16,10 @@
 
 import logging
 
-from synapse.api.errors import AuthError
-from synapse.http.servlet import RestServlet, parse_integer
+from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.http.servlet import RestServlet, parse_boolean, parse_integer
 from synapse.rest.admin._base import (
+    admin_patterns,
     assert_requester_is_admin,
     assert_user_is_admin,
     historical_admin_path_patterns,
@@ -150,6 +151,80 @@ class PurgeMediaCacheRestServlet(RestServlet):
         return 200, ret
 
 
+class DeleteMediaByID(RestServlet):
+    """Delete local media by a given ID. Removes it from this server.
+    """
+
+    PATTERNS = admin_patterns("/media/(?P<server_name>[^/]+)/(?P<media_id>[^/]+)")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+        self.server_name = hs.hostname
+        self.media_repository = hs.get_media_repository()
+
+    async def on_DELETE(self, request, server_name: str, media_id: str):
+        await assert_requester_is_admin(self.auth, request)
+
+        if self.server_name != server_name:
+            raise SynapseError(400, "Can only delete local media")
+
+        if await self.store.get_local_media(media_id) is None:
+            raise NotFoundError("Unknown media")
+
+        logging.info("Deleting local media by ID: %s", media_id)
+
+        deleted_media, total = await self.media_repository.delete_local_media(media_id)
+        return 200, {"deleted_media": deleted_media, "total": total}
+
+
+class DeleteMediaByDateSize(RestServlet):
+    """Delete local media and local copies of remote media by
+    timestamp and size.
+    """
+
+    PATTERNS = admin_patterns("/media/(?P<server_name>[^/]+)/delete")
+
+    def __init__(self, hs):
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+        self.server_name = hs.hostname
+        self.media_repository = hs.get_media_repository()
+
+    async def on_POST(self, request, server_name: str):
+        await assert_requester_is_admin(self.auth, request)
+
+        before_ts = parse_integer(request, "before_ts", required=True)
+        size_gt = parse_integer(request, "size_gt", default=0)
+        keep_profiles = parse_boolean(request, "keep_profiles", default=True)
+
+        if before_ts < 0:
+            raise SynapseError(
+                400,
+                "Query parameter before_ts must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+        if size_gt < 0:
+            raise SynapseError(
+                400,
+                "Query parameter size_gt must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        if self.server_name != server_name:
+            raise SynapseError(400, "Can only delete local media")
+
+        logging.info(
+            "Deleting local media by timestamp: %s, size larger than: %s, keep profile media: %s"
+            % (before_ts, size_gt, keep_profiles)
+        )
+
+        deleted_media, total = await self.media_repository.delete_old_local_media(
+            before_ts, size_gt, keep_profiles
+        )
+        return 200, {"deleted_media": deleted_media, "total": total}
+
+
 def register_servlets_for_media_repo(hs, http_server):
     """
     Media repo specific APIs.
@@ -159,3 +234,5 @@ def register_servlets_for_media_repo(hs, http_server):
     QuarantineMediaByID(hs).register(http_server)
     QuarantineMediaByUser(hs).register(http_server)
     ListMediaInRoom(hs).register(http_server)
+    DeleteMediaByID(hs).register(http_server)
+    DeleteMediaByDateSize(hs).register(http_server)
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 8efefbc0a0..b337311a37 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -16,6 +16,7 @@ import hashlib
 import hmac
 import logging
 from http import HTTPStatus
+from typing import Tuple
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, NotFoundError, SynapseError
@@ -27,16 +28,28 @@ from synapse.http.servlet import (
     parse_json_object_from_request,
     parse_string,
 )
+from synapse.http.site import SynapseRequest
 from synapse.rest.admin._base import (
     admin_patterns,
     assert_requester_is_admin,
     assert_user_is_admin,
     historical_admin_path_patterns,
 )
-from synapse.types import UserID
+from synapse.types import JsonDict, UserID
 
 logger = logging.getLogger(__name__)
 
+_GET_PUSHERS_ALLOWED_KEYS = {
+    "app_display_name",
+    "app_id",
+    "data",
+    "device_display_name",
+    "kind",
+    "lang",
+    "profile_tag",
+    "pushkey",
+}
+
 
 class UsersRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns("/users/(?P<user_id>[^/]*)$")
@@ -702,9 +715,114 @@ class UserMembershipRestServlet(RestServlet):
         if not self.is_mine(UserID.from_string(user_id)):
             raise SynapseError(400, "Can only lookup local users")
 
+        user = await self.store.get_user_by_id(user_id)
+        if user is None:
+            raise NotFoundError("Unknown user")
+
         room_ids = await self.store.get_rooms_for_user(user_id)
-        if not room_ids:
+        ret = {"joined_rooms": list(room_ids), "total": len(room_ids)}
+        return 200, ret
+
+
+class PushersRestServlet(RestServlet):
+    """
+    Gets information about all pushers for a specific `user_id`.
+
+    Example:
+        http://localhost:8008/_synapse/admin/v1/users/
+        @user:server/pushers
+
+    Returns:
+        pushers: Dictionary containing pushers information.
+        total: Number of pushers in dictonary `pushers`.
+    """
+
+    PATTERNS = admin_patterns("/users/(?P<user_id>[^/]*)/pushers$")
+
+    def __init__(self, hs):
+        self.is_mine = hs.is_mine
+        self.store = hs.get_datastore()
+        self.auth = hs.get_auth()
+
+    async def on_GET(
+        self, request: SynapseRequest, user_id: str
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self.auth, request)
+
+        if not self.is_mine(UserID.from_string(user_id)):
+            raise SynapseError(400, "Can only lookup local users")
+
+        if not await self.store.get_user_by_id(user_id):
             raise NotFoundError("User not found")
 
-        ret = {"joined_rooms": list(room_ids), "total": len(room_ids)}
+        pushers = await self.store.get_pushers_by_user_id(user_id)
+
+        filtered_pushers = [
+            {k: v for k, v in p.items() if k in _GET_PUSHERS_ALLOWED_KEYS}
+            for p in pushers
+        ]
+
+        return 200, {"pushers": filtered_pushers, "total": len(filtered_pushers)}
+
+
+class UserMediaRestServlet(RestServlet):
+    """
+    Gets information about all uploaded local media for a specific `user_id`.
+
+    Example:
+        http://localhost:8008/_synapse/admin/v1/users/
+        @user:server/media
+
+    Args:
+        The parameters `from` and `limit` are required for pagination.
+        By default, a `limit` of 100 is used.
+    Returns:
+        A list of media and an integer representing the total number of
+        media that exist given for this user
+    """
+
+    PATTERNS = admin_patterns("/users/(?P<user_id>[^/]+)/media$")
+
+    def __init__(self, hs):
+        self.is_mine = hs.is_mine
+        self.auth = hs.get_auth()
+        self.store = hs.get_datastore()
+
+    async def on_GET(
+        self, request: SynapseRequest, user_id: str
+    ) -> Tuple[int, JsonDict]:
+        await assert_requester_is_admin(self.auth, request)
+
+        if not self.is_mine(UserID.from_string(user_id)):
+            raise SynapseError(400, "Can only lookup local users")
+
+        user = await self.store.get_user_by_id(user_id)
+        if user is None:
+            raise NotFoundError("Unknown user")
+
+        start = parse_integer(request, "from", default=0)
+        limit = parse_integer(request, "limit", default=100)
+
+        if start < 0:
+            raise SynapseError(
+                400,
+                "Query parameter from must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        if limit < 0:
+            raise SynapseError(
+                400,
+                "Query parameter limit must be a string representing a positive integer.",
+                errcode=Codes.INVALID_PARAM,
+            )
+
+        media, total = await self.store.get_local_media_by_user_paginate(
+            start, limit, user_id
+        )
+
+        ret = {"media": media, "total": total}
+        if (start + limit) < total:
+            ret["next_token"] = start + len(media)
+
         return 200, ret
diff --git a/synapse/rest/client/v1/events.py b/synapse/rest/client/v1/events.py
index 1ecb77aa26..6de4078290 100644
--- a/synapse/rest/client/v1/events.py
+++ b/synapse/rest/client/v1/events.py
@@ -67,9 +67,6 @@ class EventStreamRestServlet(RestServlet):
 
         return 200, chunk
 
-    def on_OPTIONS(self, request):
-        return 200, {}
-
 
 class EventRestServlet(RestServlet):
     PATTERNS = client_patterns("/events/(?P<event_id>[^/]*)$", v1=True)
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index b82a4e978a..94452fcbf5 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -114,9 +114,6 @@ class LoginRestServlet(RestServlet):
 
         return 200, {"flows": flows}
 
-    def on_OPTIONS(self, request: SynapseRequest):
-        return 200, {}
-
     async def on_POST(self, request: SynapseRequest):
         self._address_ratelimiter.ratelimit(request.getClientIP())
 
diff --git a/synapse/rest/client/v1/logout.py b/synapse/rest/client/v1/logout.py
index f792b50cdc..ad8cea49c6 100644
--- a/synapse/rest/client/v1/logout.py
+++ b/synapse/rest/client/v1/logout.py
@@ -30,9 +30,6 @@ class LogoutRestServlet(RestServlet):
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
 
-    def on_OPTIONS(self, request):
-        return 200, {}
-
     async def on_POST(self, request):
         requester = await self.auth.get_user_by_req(request, allow_expired=True)
 
@@ -58,9 +55,6 @@ class LogoutAllRestServlet(RestServlet):
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
 
-    def on_OPTIONS(self, request):
-        return 200, {}
-
     async def on_POST(self, request):
         requester = await self.auth.get_user_by_req(request, allow_expired=True)
         user_id = requester.user.to_string()
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 79d8e3057f..23a529f8e3 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -86,9 +86,6 @@ class PresenceStatusRestServlet(RestServlet):
 
         return 200, {}
 
-    def on_OPTIONS(self, request):
-        return 200, {}
-
 
 def register_servlets(hs, http_server):
     PresenceStatusRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/profile.py b/synapse/rest/client/v1/profile.py
index e7fcd2b1ff..85a66458c5 100644
--- a/synapse/rest/client/v1/profile.py
+++ b/synapse/rest/client/v1/profile.py
@@ -67,9 +67,6 @@ class ProfileDisplaynameRestServlet(RestServlet):
 
         return 200, {}
 
-    def on_OPTIONS(self, request, user_id):
-        return 200, {}
-
 
 class ProfileAvatarURLRestServlet(RestServlet):
     PATTERNS = client_patterns("/profile/(?P<user_id>[^/]*)/avatar_url", v1=True)
@@ -118,9 +115,6 @@ class ProfileAvatarURLRestServlet(RestServlet):
 
         return 200, {}
 
-    def on_OPTIONS(self, request, user_id):
-        return 200, {}
-
 
 class ProfileRestServlet(RestServlet):
     PATTERNS = client_patterns("/profile/(?P<user_id>[^/]*)", v1=True)
diff --git a/synapse/rest/client/v1/push_rule.py b/synapse/rest/client/v1/push_rule.py
index f9eecb7cf5..241e535917 100644
--- a/synapse/rest/client/v1/push_rule.py
+++ b/synapse/rest/client/v1/push_rule.py
@@ -155,9 +155,6 @@ class PushRuleRestServlet(RestServlet):
         else:
             raise UnrecognizedRequestError()
 
-    def on_OPTIONS(self, request, path):
-        return 200, {}
-
     def notify_user(self, user_id):
         stream_id = self.store.get_max_push_rules_stream_id()
         self.notifier.on_new_event("push_rules_key", stream_id, users=[user_id])
diff --git a/synapse/rest/client/v1/pusher.py b/synapse/rest/client/v1/pusher.py
index 28dabf1c7a..8fe83f321a 100644
--- a/synapse/rest/client/v1/pusher.py
+++ b/synapse/rest/client/v1/pusher.py
@@ -60,9 +60,6 @@ class PushersRestServlet(RestServlet):
 
         return 200, {"pushers": filtered_pushers}
 
-    def on_OPTIONS(self, _):
-        return 200, {}
-
 
 class PushersSetRestServlet(RestServlet):
     PATTERNS = client_patterns("/pushers/set$", v1=True)
@@ -140,9 +137,6 @@ class PushersSetRestServlet(RestServlet):
 
         return 200, {}
 
-    def on_OPTIONS(self, _):
-        return 200, {}
-
 
 class PushersRemoveRestServlet(RestServlet):
     """
@@ -182,9 +176,6 @@ class PushersRemoveRestServlet(RestServlet):
         )
         return None
 
-    def on_OPTIONS(self, _):
-        return 200, {}
-
 
 def register_servlets(hs, http_server):
     PushersRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 00b4397082..25d3cc6148 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -72,20 +72,6 @@ class RoomCreateRestServlet(TransactionRestServlet):
     def register(self, http_server):
         PATTERNS = "/createRoom"
         register_txn_path(self, PATTERNS, http_server)
-        # define CORS for all of /rooms in RoomCreateRestServlet for simplicity
-        http_server.register_paths(
-            "OPTIONS",
-            client_patterns("/rooms(?:/.*)?$", v1=True),
-            self.on_OPTIONS,
-            self.__class__.__name__,
-        )
-        # define CORS for /createRoom[/txnid]
-        http_server.register_paths(
-            "OPTIONS",
-            client_patterns("/createRoom(?:/.*)?$", v1=True),
-            self.on_OPTIONS,
-            self.__class__.__name__,
-        )
 
     def on_PUT(self, request, txn_id):
         set_tag("txn_id", txn_id)
@@ -104,9 +90,6 @@ class RoomCreateRestServlet(TransactionRestServlet):
         user_supplied_config = parse_json_object_from_request(request)
         return user_supplied_config
 
-    def on_OPTIONS(self, request):
-        return 200, {}
-
 
 # TODO: Needs unit testing for generic events
 class RoomStateEventRestServlet(TransactionRestServlet):
diff --git a/synapse/rest/client/v1/voip.py b/synapse/rest/client/v1/voip.py
index b8d491ca5c..d07ca2c47c 100644
--- a/synapse/rest/client/v1/voip.py
+++ b/synapse/rest/client/v1/voip.py
@@ -69,9 +69,6 @@ class VoipRestServlet(RestServlet):
             },
         )
 
-    def on_OPTIONS(self, request):
-        return 200, {}
-
 
 def register_servlets(hs, http_server):
     VoipRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index e857cff176..51effc4d8e 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -268,9 +268,6 @@ class PasswordRestServlet(RestServlet):
 
         return 200, {}
 
-    def on_OPTIONS(self, _):
-        return 200, {}
-
 
 class DeactivateAccountRestServlet(RestServlet):
     PATTERNS = client_patterns("/account/deactivate$")
diff --git a/synapse/rest/client/v2_alpha/auth.py b/synapse/rest/client/v2_alpha/auth.py
index 5fbfae5991..fab077747f 100644
--- a/synapse/rest/client/v2_alpha/auth.py
+++ b/synapse/rest/client/v2_alpha/auth.py
@@ -176,9 +176,6 @@ class AuthRestServlet(RestServlet):
         respond_with_html(request, 200, html)
         return None
 
-    def on_OPTIONS(self, _):
-        return 200, {}
-
 
 def register_servlets(hs, http_server):
     AuthRestServlet(hs).register(http_server)
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 395b6a82a9..8f2c8cd991 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -642,9 +642,6 @@ class RegisterRestServlet(RestServlet):
 
         return 200, return_dict
 
-    def on_OPTIONS(self, _):
-        return 200, {}
-
     async def _do_appservice_registration(self, username, as_token, body):
         user_id = await self.registration_handler.appservice_register(
             username, as_token
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 7447eeaebe..9e079f672f 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -69,6 +69,23 @@ class MediaFilePaths:
 
     local_media_thumbnail = _wrap_in_base_path(local_media_thumbnail_rel)
 
+    def local_media_thumbnail_dir(self, media_id: str) -> str:
+        """
+        Retrieve the local store path of thumbnails of a given media_id
+
+        Args:
+            media_id: The media ID to query.
+        Returns:
+            Path of local_thumbnails from media_id
+        """
+        return os.path.join(
+            self.base_path,
+            "local_thumbnails",
+            media_id[0:2],
+            media_id[2:4],
+            media_id[4:],
+        )
+
     def remote_media_filepath_rel(self, server_name, file_id):
         return os.path.join(
             "remote_content", server_name, file_id[0:2], file_id[2:4], file_id[4:]
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index e1192b47cd..9cac74ebd8 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -18,7 +18,7 @@ import errno
 import logging
 import os
 import shutil
-from typing import IO, Dict, Optional, Tuple
+from typing import IO, Dict, List, Optional, Tuple
 
 import twisted.internet.error
 import twisted.web.http
@@ -305,15 +305,12 @@ class MediaRepository:
         # file_id is the ID we use to track the file locally. If we've already
         # seen the file then reuse the existing ID, otherwise genereate a new
         # one.
-        if media_info:
-            file_id = media_info["filesystem_id"]
-        else:
-            file_id = random_string(24)
-
-        file_info = FileInfo(server_name, file_id)
 
         # If we have an entry in the DB, try and look for it
         if media_info:
+            file_id = media_info["filesystem_id"]
+            file_info = FileInfo(server_name, file_id)
+
             if media_info["quarantined_by"]:
                 logger.info("Media is quarantined")
                 raise NotFoundError()
@@ -324,14 +321,34 @@ class MediaRepository:
 
         # Failed to find the file anywhere, lets download it.
 
-        media_info = await self._download_remote_file(server_name, media_id, file_id)
+        try:
+            media_info = await self._download_remote_file(server_name, media_id,)
+        except SynapseError:
+            raise
+        except Exception as e:
+            # An exception may be because we downloaded media in another
+            # process, so let's check if we magically have the media.
+            media_info = await self.store.get_cached_remote_media(server_name, media_id)
+            if not media_info:
+                raise e
+
+        file_id = media_info["filesystem_id"]
+        file_info = FileInfo(server_name, file_id)
+
+        # We generate thumbnails even if another process downloaded the media
+        # as a) it's conceivable that the other download request dies before it
+        # generates thumbnails, but mainly b) we want to be sure the thumbnails
+        # have finished being generated before responding to the client,
+        # otherwise they'll request thumbnails and get a 404 if they're not
+        # ready yet.
+        await self._generate_thumbnails(
+            server_name, media_id, file_id, media_info["media_type"]
+        )
 
         responder = await self.media_storage.fetch_media(file_info)
         return responder, media_info
 
-    async def _download_remote_file(
-        self, server_name: str, media_id: str, file_id: str
-    ) -> dict:
+    async def _download_remote_file(self, server_name: str, media_id: str,) -> dict:
         """Attempt to download the remote file from the given server name,
         using the given file_id as the local id.
 
@@ -346,6 +363,8 @@ class MediaRepository:
             The media info of the file.
         """
 
+        file_id = random_string(24)
+
         file_info = FileInfo(server_name=server_name, file_id=file_id)
 
         with self.media_storage.store_into_file(file_info) as (f, fname, finish):
@@ -401,22 +420,32 @@ class MediaRepository:
 
             await finish()
 
-        media_type = headers[b"Content-Type"][0].decode("ascii")
-        upload_name = get_filename_from_headers(headers)
-        time_now_ms = self.clock.time_msec()
+            media_type = headers[b"Content-Type"][0].decode("ascii")
+            upload_name = get_filename_from_headers(headers)
+            time_now_ms = self.clock.time_msec()
+
+            # Multiple remote media download requests can race (when using
+            # multiple media repos), so this may throw a violation constraint
+            # exception. If it does we'll delete the newly downloaded file from
+            # disk (as we're in the ctx manager).
+            #
+            # However: we've already called `finish()` so we may have also
+            # written to the storage providers. This is preferable to the
+            # alternative where we call `finish()` *after* this, where we could
+            # end up having an entry in the DB but fail to write the files to
+            # the storage providers.
+            await self.store.store_cached_remote_media(
+                origin=server_name,
+                media_id=media_id,
+                media_type=media_type,
+                time_now_ms=self.clock.time_msec(),
+                upload_name=upload_name,
+                media_length=length,
+                filesystem_id=file_id,
+            )
 
         logger.info("Stored remote media in file %r", fname)
 
-        await self.store.store_cached_remote_media(
-            origin=server_name,
-            media_id=media_id,
-            media_type=media_type,
-            time_now_ms=self.clock.time_msec(),
-            upload_name=upload_name,
-            media_length=length,
-            filesystem_id=file_id,
-        )
-
         media_info = {
             "media_type": media_type,
             "media_length": length,
@@ -425,8 +454,6 @@ class MediaRepository:
             "filesystem_id": file_id,
         }
 
-        await self._generate_thumbnails(server_name, media_id, file_id, media_type)
-
         return media_info
 
     def _get_thumbnail_requirements(self, media_type):
@@ -692,42 +719,60 @@ class MediaRepository:
             if not t_byte_source:
                 continue
 
-            try:
-                file_info = FileInfo(
-                    server_name=server_name,
-                    file_id=file_id,
-                    thumbnail=True,
-                    thumbnail_width=t_width,
-                    thumbnail_height=t_height,
-                    thumbnail_method=t_method,
-                    thumbnail_type=t_type,
-                    url_cache=url_cache,
-                )
-
-                output_path = await self.media_storage.store_file(
-                    t_byte_source, file_info
-                )
-            finally:
-                t_byte_source.close()
-
-            t_len = os.path.getsize(output_path)
+            file_info = FileInfo(
+                server_name=server_name,
+                file_id=file_id,
+                thumbnail=True,
+                thumbnail_width=t_width,
+                thumbnail_height=t_height,
+                thumbnail_method=t_method,
+                thumbnail_type=t_type,
+                url_cache=url_cache,
+            )
 
-            # Write to database
-            if server_name:
-                await self.store.store_remote_media_thumbnail(
-                    server_name,
-                    media_id,
-                    file_id,
-                    t_width,
-                    t_height,
-                    t_type,
-                    t_method,
-                    t_len,
-                )
-            else:
-                await self.store.store_local_thumbnail(
-                    media_id, t_width, t_height, t_type, t_method, t_len
-                )
+            with self.media_storage.store_into_file(file_info) as (f, fname, finish):
+                try:
+                    await self.media_storage.write_to_file(t_byte_source, f)
+                    await finish()
+                finally:
+                    t_byte_source.close()
+
+                t_len = os.path.getsize(fname)
+
+                # Write to database
+                if server_name:
+                    # Multiple remote media download requests can race (when
+                    # using multiple media repos), so this may throw a violation
+                    # constraint exception. If it does we'll delete the newly
+                    # generated thumbnail from disk (as we're in the ctx
+                    # manager).
+                    #
+                    # However: we've already called `finish()` so we may have
+                    # also written to the storage providers. This is preferable
+                    # to the alternative where we call `finish()` *after* this,
+                    # where we could end up having an entry in the DB but fail
+                    # to write the files to the storage providers.
+                    try:
+                        await self.store.store_remote_media_thumbnail(
+                            server_name,
+                            media_id,
+                            file_id,
+                            t_width,
+                            t_height,
+                            t_type,
+                            t_method,
+                            t_len,
+                        )
+                    except Exception as e:
+                        thumbnail_exists = await self.store.get_remote_media_thumbnail(
+                            server_name, media_id, t_width, t_height, t_type,
+                        )
+                        if not thumbnail_exists:
+                            raise e
+                else:
+                    await self.store.store_local_thumbnail(
+                        media_id, t_width, t_height, t_type, t_method, t_len
+                    )
 
         return {"width": m_width, "height": m_height}
 
@@ -767,6 +812,76 @@ class MediaRepository:
 
         return {"deleted": deleted}
 
+    async def delete_local_media(self, media_id: str) -> Tuple[List[str], int]:
+        """
+        Delete the given local or remote media ID from this server
+
+        Args:
+            media_id: The media ID to delete.
+        Returns:
+            A tuple of (list of deleted media IDs, total deleted media IDs).
+        """
+        return await self._remove_local_media_from_disk([media_id])
+
+    async def delete_old_local_media(
+        self, before_ts: int, size_gt: int = 0, keep_profiles: bool = True,
+    ) -> Tuple[List[str], int]:
+        """
+        Delete local or remote media from this server by size and timestamp. Removes
+        media files, any thumbnails and cached URLs.
+
+        Args:
+            before_ts: Unix timestamp in ms.
+                       Files that were last used before this timestamp will be deleted
+            size_gt: Size of the media in bytes. Files that are larger will be deleted
+            keep_profiles: Switch to delete also files that are still used in image data
+                           (e.g user profile, room avatar)
+                           If false these files will be deleted
+        Returns:
+            A tuple of (list of deleted media IDs, total deleted media IDs).
+        """
+        old_media = await self.store.get_local_media_before(
+            before_ts, size_gt, keep_profiles,
+        )
+        return await self._remove_local_media_from_disk(old_media)
+
+    async def _remove_local_media_from_disk(
+        self, media_ids: List[str]
+    ) -> Tuple[List[str], int]:
+        """
+        Delete local or remote media from this server. Removes media files,
+        any thumbnails and cached URLs.
+
+        Args:
+            media_ids: List of media_id to delete
+        Returns:
+            A tuple of (list of deleted media IDs, total deleted media IDs).
+        """
+        removed_media = []
+        for media_id in media_ids:
+            logger.info("Deleting media with ID '%s'", media_id)
+            full_path = self.filepaths.local_media_filepath(media_id)
+            try:
+                os.remove(full_path)
+            except OSError as e:
+                logger.warning("Failed to remove file: %r: %s", full_path, e)
+                if e.errno == errno.ENOENT:
+                    pass
+                else:
+                    continue
+
+            thumbnail_dir = self.filepaths.local_media_thumbnail_dir(media_id)
+            shutil.rmtree(thumbnail_dir, ignore_errors=True)
+
+            await self.store.delete_remote_media(self.server_name, media_id)
+
+            await self.store.delete_url_cache((media_id,))
+            await self.store.delete_url_cache_media((media_id,))
+
+            removed_media.append(media_id)
+
+        return removed_media, len(removed_media)
+
 
 class MediaRepositoryResource(Resource):
     """File uploading and downloading.
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index a9586fb0b7..268e0c8f50 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -52,6 +52,7 @@ class MediaStorage:
         storage_providers: Sequence["StorageProviderWrapper"],
     ):
         self.hs = hs
+        self.reactor = hs.get_reactor()
         self.local_media_directory = local_media_directory
         self.filepaths = filepaths
         self.storage_providers = storage_providers
@@ -70,13 +71,16 @@ class MediaStorage:
 
         with self.store_into_file(file_info) as (f, fname, finish_cb):
             # Write to the main repository
-            await defer_to_thread(
-                self.hs.get_reactor(), _write_file_synchronously, source, f
-            )
+            await self.write_to_file(source, f)
             await finish_cb()
 
         return fname
 
+    async def write_to_file(self, source: IO, output: IO):
+        """Asynchronously write the `source` to `output`.
+        """
+        await defer_to_thread(self.reactor, _write_file_synchronously, source, output)
+
     @contextlib.contextmanager
     def store_into_file(self, file_info: FileInfo):
         """Context manager used to get a file like object to write into, as
@@ -112,14 +116,20 @@ class MediaStorage:
 
         finished_called = [False]
 
-        async def finish():
-            for provider in self.storage_providers:
-                await provider.store_file(path, file_info)
-
-            finished_called[0] = True
-
         try:
             with open(fname, "wb") as f:
+
+                async def finish():
+                    # Ensure that all writes have been flushed and close the
+                    # file.
+                    f.flush()
+                    f.close()
+
+                    for provider in self.storage_providers:
+                        await provider.store_file(path, file_info)
+
+                    finished_called[0] = True
+
                 yield f, fname, finish
         except Exception:
             try:
@@ -210,7 +220,7 @@ class MediaStorage:
             if res:
                 with res:
                     consumer = BackgroundFileConsumer(
-                        open(local_path, "wb"), self.hs.get_reactor()
+                        open(local_path, "wb"), self.reactor
                     )
                     await res.write_to_consumer(consumer)
                     await consumer.wait()
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 3673e7f47e..9137c4edb1 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -104,7 +104,7 @@ class ConsentServerNotices:
 
 
 def copy_with_str_subst(x: Any, substitutions: Any) -> Any:
-    """Deep-copy a structure, carrying out string substitions on any strings
+    """Deep-copy a structure, carrying out string substitutions on any strings
 
     Args:
         x (object): structure to be copied
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 5b0900aa3c..1fa3b280b4 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -547,7 +547,7 @@ class StateResolutionHandler:
             event_map:
                 a dict from event_id to event, for any events that we happen to
                 have in flight (eg, those currently being persisted). This will be
-                used as a starting point fof finding the state we need; any missing
+                used as a starting point for finding the state we need; any missing
                 events will be requested via state_res_store.
 
                 If None, all events will be fetched via state_res_store.
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index a493279cbd..85edae053d 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -56,7 +56,7 @@ async def resolve_events_with_store(
         event_map:
             a dict from event_id to event, for any events that we happen to
             have in flight (eg, those currently being persisted). This will be
-            used as a starting point fof finding the state we need; any missing
+            used as a starting point for finding the state we need; any missing
             events will be requested via state_map_factory.
 
             If None, all events will be fetched via state_map_factory.
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index edf94e7ad6..f57df0d728 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -69,7 +69,7 @@ async def resolve_events_with_store(
         event_map:
             a dict from event_id to event, for any events that we happen to
             have in flight (eg, those currently being persisted). This will be
-            used as a starting point fof finding the state we need; any missing
+            used as a starting point for finding the state we need; any missing
             events will be requested via state_res_store.
 
             If None, all events will be fetched via state_res_store.
diff --git a/synapse/static/client/login/js/login.js b/synapse/static/client/login/js/login.js
index 3678670ec7..744800ec77 100644
--- a/synapse/static/client/login/js/login.js
+++ b/synapse/static/client/login/js/login.js
@@ -182,7 +182,7 @@ matrixLogin.passwordLogin = function() {
 };
 
 /*
- * The onLogin function gets called after a succesful login.
+ * The onLogin function gets called after a successful login.
  *
  * It is expected that implementations override this to be notified when the
  * login is complete. The response to the login call is provided as the single
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 0217e63108..a0572b2952 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -94,7 +94,7 @@ def make_pool(
         cp_openfun=lambda conn: engine.on_new_connection(
             LoggingDatabaseConnection(conn, engine, "on_new_connection")
         ),
-        **db_config.config.get("args", {})
+        **db_config.config.get("args", {}),
     )
 
 
@@ -632,7 +632,7 @@ class DatabasePool:
                 func,
                 *args,
                 db_autocommit=db_autocommit,
-                **kwargs
+                **kwargs,
             )
 
             for after_callback, after_args, after_kwargs in after_callbacks:
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 9b16f45f3e..43660ec4fb 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -146,7 +146,6 @@ class DataStore(
             db_conn, "e2e_cross_signing_keys", "stream_id"
         )
 
-        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
         self._event_reports_id_gen = IdGenerator(db_conn, "event_reports", "id")
         self._push_rule_id_gen = IdGenerator(db_conn, "push_rules", "id")
         self._push_rules_enable_id_gen = IdGenerator(db_conn, "push_rules_enable", "id")
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 637a938bac..26eef6eb61 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -15,21 +15,31 @@
 # limitations under the License.
 import logging
 import re
-from typing import List
+from typing import TYPE_CHECKING, List, Optional, Pattern, Tuple
 
-from synapse.appservice import ApplicationService, AppServiceTransaction
+from synapse.appservice import (
+    ApplicationService,
+    ApplicationServiceState,
+    AppServiceTransaction,
+)
 from synapse.config.appservice import load_appservices
 from synapse.events import EventBase
 from synapse.storage._base import SQLBaseStore, db_to_json
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
+from synapse.storage.types import Connection
 from synapse.types import JsonDict
 from synapse.util import json_encoder
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
-def _make_exclusive_regex(services_cache):
+def _make_exclusive_regex(
+    services_cache: List[ApplicationService],
+) -> Optional[Pattern]:
     # We precompile a regex constructed from all the regexes that the AS's
     # have registered for exclusive users.
     exclusive_user_regexes = [
@@ -39,17 +49,19 @@ def _make_exclusive_regex(services_cache):
     ]
     if exclusive_user_regexes:
         exclusive_user_regex = "|".join("(" + r + ")" for r in exclusive_user_regexes)
-        exclusive_user_regex = re.compile(exclusive_user_regex)
+        exclusive_user_pattern = re.compile(
+            exclusive_user_regex
+        )  # type: Optional[Pattern]
     else:
         # We handle this case specially otherwise the constructed regex
         # will always match
-        exclusive_user_regex = None
+        exclusive_user_pattern = None
 
-    return exclusive_user_regex
+    return exclusive_user_pattern
 
 
 class ApplicationServiceWorkerStore(SQLBaseStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         self.services_cache = load_appservices(
             hs.hostname, hs.config.app_service_config_files
         )
@@ -60,7 +72,7 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
     def get_app_services(self):
         return self.services_cache
 
-    def get_if_app_services_interested_in_user(self, user_id):
+    def get_if_app_services_interested_in_user(self, user_id: str) -> bool:
         """Check if the user is one associated with an app service (exclusively)
         """
         if self.exclusive_user_regex:
@@ -68,7 +80,7 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
         else:
             return False
 
-    def get_app_service_by_user_id(self, user_id):
+    def get_app_service_by_user_id(self, user_id: str) -> Optional[ApplicationService]:
         """Retrieve an application service from their user ID.
 
         All application services have associated with them a particular user ID.
@@ -77,35 +89,35 @@ class ApplicationServiceWorkerStore(SQLBaseStore):
         a user ID to an application service.
 
         Args:
-            user_id(str): The user ID to see if it is an application service.
+            user_id: The user ID to see if it is an application service.
         Returns:
-            synapse.appservice.ApplicationService or None.
+            The application service or None.
         """
         for service in self.services_cache:
             if service.sender == user_id:
                 return service
         return None
 
-    def get_app_service_by_token(self, token):
+    def get_app_service_by_token(self, token: str) -> Optional[ApplicationService]:
         """Get the application service with the given appservice token.
 
         Args:
-            token (str): The application service token.
+            token: The application service token.
         Returns:
-            synapse.appservice.ApplicationService or None.
+            The application service or None.
         """
         for service in self.services_cache:
             if service.token == token:
                 return service
         return None
 
-    def get_app_service_by_id(self, as_id):
+    def get_app_service_by_id(self, as_id: str) -> Optional[ApplicationService]:
         """Get the application service with the given appservice ID.
 
         Args:
-            as_id (str): The application service ID.
+            as_id: The application service ID.
         Returns:
-            synapse.appservice.ApplicationService or None.
+            The application service or None.
         """
         for service in self.services_cache:
             if service.id == as_id:
@@ -124,11 +136,13 @@ class ApplicationServiceStore(ApplicationServiceWorkerStore):
 class ApplicationServiceTransactionWorkerStore(
     ApplicationServiceWorkerStore, EventsWorkerStore
 ):
-    async def get_appservices_by_state(self, state):
+    async def get_appservices_by_state(
+        self, state: ApplicationServiceState
+    ) -> List[ApplicationService]:
         """Get a list of application services based on their state.
 
         Args:
-            state(ApplicationServiceState): The state to filter on.
+            state: The state to filter on.
         Returns:
             A list of ApplicationServices, which may be empty.
         """
@@ -145,13 +159,15 @@ class ApplicationServiceTransactionWorkerStore(
                     services.append(service)
         return services
 
-    async def get_appservice_state(self, service):
+    async def get_appservice_state(
+        self, service: ApplicationService
+    ) -> Optional[ApplicationServiceState]:
         """Get the application service state.
 
         Args:
-            service(ApplicationService): The service whose state to set.
+            service: The service whose state to set.
         Returns:
-            An ApplicationServiceState.
+            An ApplicationServiceState or none.
         """
         result = await self.db_pool.simple_select_one(
             "application_services_state",
@@ -164,12 +180,14 @@ class ApplicationServiceTransactionWorkerStore(
             return result.get("state")
         return None
 
-    async def set_appservice_state(self, service, state) -> None:
+    async def set_appservice_state(
+        self, service: ApplicationService, state: ApplicationServiceState
+    ) -> None:
         """Set the application service state.
 
         Args:
-            service(ApplicationService): The service whose state to set.
-            state(ApplicationServiceState): The connectivity state to apply.
+            service: The service whose state to set.
+            state: The connectivity state to apply.
         """
         await self.db_pool.simple_upsert(
             "application_services_state", {"as_id": service.id}, {"state": state}
@@ -226,13 +244,14 @@ class ApplicationServiceTransactionWorkerStore(
             "create_appservice_txn", _create_appservice_txn
         )
 
-    async def complete_appservice_txn(self, txn_id, service) -> None:
+    async def complete_appservice_txn(
+        self, txn_id: int, service: ApplicationService
+    ) -> None:
         """Completes an application service transaction.
 
         Args:
-            txn_id(str): The transaction ID being completed.
-            service(ApplicationService): The application service which was sent
-            this transaction.
+            txn_id: The transaction ID being completed.
+            service: The application service which was sent this transaction.
         """
         txn_id = int(txn_id)
 
@@ -242,7 +261,7 @@ class ApplicationServiceTransactionWorkerStore(
             # has probably missed some events), so whine loudly but still continue,
             # since it shouldn't fail completion of the transaction.
             last_txn_id = self._get_last_txn(txn, service.id)
-            if (last_txn_id + 1) != txn_id:
+            if (txn_id + 1) != txn_id:
                 logger.error(
                     "appservice: Completing a transaction which has an ID > 1 from "
                     "the last ID sent to this AS. We've either dropped events or "
@@ -272,12 +291,13 @@ class ApplicationServiceTransactionWorkerStore(
             "complete_appservice_txn", _complete_appservice_txn
         )
 
-    async def get_oldest_unsent_txn(self, service):
-        """Get the oldest transaction which has not been sent for this
-        service.
+    async def get_oldest_unsent_txn(
+        self, service: ApplicationService
+    ) -> Optional[AppServiceTransaction]:
+        """Get the oldest transaction which has not been sent for this service.
 
         Args:
-            service(ApplicationService): The app service to get the oldest txn.
+            service: The app service to get the oldest txn.
         Returns:
             An AppServiceTransaction or None.
         """
@@ -313,7 +333,7 @@ class ApplicationServiceTransactionWorkerStore(
             service=service, id=entry["txn_id"], events=events, ephemeral=[]
         )
 
-    def _get_last_txn(self, txn, service_id):
+    def _get_last_txn(self, txn, service_id: Optional[str]) -> int:
         txn.execute(
             "SELECT last_txn FROM application_services_state WHERE as_id=?",
             (service_id,),
@@ -324,7 +344,7 @@ class ApplicationServiceTransactionWorkerStore(
         else:
             return int(last_txn_id[0])  # select 'last_txn' col
 
-    async def set_appservice_last_pos(self, pos) -> None:
+    async def set_appservice_last_pos(self, pos: int) -> None:
         def set_appservice_last_pos_txn(txn):
             txn.execute(
                 "UPDATE appservice_stream_position SET stream_ordering = ?", (pos,)
@@ -334,7 +354,9 @@ class ApplicationServiceTransactionWorkerStore(
             "set_appservice_last_pos", set_appservice_last_pos_txn
         )
 
-    async def get_new_events_for_appservice(self, current_id, limit):
+    async def get_new_events_for_appservice(
+        self, current_id: int, limit: int
+    ) -> Tuple[int, List[EventBase]]:
         """Get all new events for an appservice"""
 
         def get_new_events_for_appservice_txn(txn):
@@ -394,7 +416,7 @@ class ApplicationServiceTransactionWorkerStore(
         )
 
     async def set_type_stream_id_for_appservice(
-        self, service: ApplicationService, type: str, pos: int
+        self, service: ApplicationService, type: str, pos: Optional[int]
     ) -> None:
         if type not in ("read_receipt", "presence"):
             raise ValueError(
diff --git a/synapse/storage/databases/main/events_bg_updates.py b/synapse/storage/databases/main/events_bg_updates.py
index 5e4af2eb51..97b6754846 100644
--- a/synapse/storage/databases/main/events_bg_updates.py
+++ b/synapse/storage/databases/main/events_bg_updates.py
@@ -92,6 +92,13 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
             where_clause="NOT have_censored",
         )
 
+        self.db_pool.updates.register_background_index_update(
+            "users_have_local_media",
+            index_name="users_have_local_media",
+            table="local_media_repository",
+            columns=["user_id", "created_ts"],
+        )
+
     async def _background_reindex_fields_sender(self, progress, batch_size):
         target_min_stream_id = progress["target_min_stream_id_inclusive"]
         max_stream_id = progress["max_stream_id_exclusive"]
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 6e7f16f39c..4732685f6e 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -31,6 +31,7 @@ from synapse.api.room_versions import (
     RoomVersions,
 )
 from synapse.events import EventBase, make_event_from_dict
+from synapse.events.snapshot import EventContext
 from synapse.events.utils import prune_event
 from synapse.logging.context import PreserveLoggingContext, current_context
 from synapse.metrics.background_process_metrics import (
@@ -44,7 +45,7 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
 from synapse.storage.database import DatabasePool
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
-from synapse.types import Collection, get_domain_from_id
+from synapse.types import Collection, JsonDict, get_domain_from_id
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
@@ -525,6 +526,57 @@ class EventsWorkerStore(SQLBaseStore):
 
         return event_map
 
+    async def get_stripped_room_state_from_event_context(
+        self,
+        context: EventContext,
+        state_types_to_include: List[EventTypes],
+        membership_user_id: Optional[str] = None,
+    ) -> List[JsonDict]:
+        """
+        Retrieve the stripped state from a room, given an event context to retrieve state
+        from as well as the state types to include. Optionally, include the membership
+        events from a specific user.
+
+        "Stripped" state means that only the `type`, `state_key`, `content` and `sender` keys
+        are included from each state event.
+
+        Args:
+            context: The event context to retrieve state of the room from.
+            state_types_to_include: The type of state events to include.
+            membership_user_id: An optional user ID to include the stripped membership state
+                events of. This is useful when generating the stripped state of a room for
+                invites. We want to send membership events of the inviter, so that the
+                invitee can display the inviter's profile information if the room lacks any.
+
+        Returns:
+            A list of dictionaries, each representing a stripped state event from the room.
+        """
+        current_state_ids = await context.get_current_state_ids()
+
+        # We know this event is not an outlier, so this must be
+        # non-None.
+        assert current_state_ids is not None
+
+        # The state to include
+        state_to_include_ids = [
+            e_id
+            for k, e_id in current_state_ids.items()
+            if k[0] in state_types_to_include
+            or (membership_user_id and k == (EventTypes.Member, membership_user_id))
+        ]
+
+        state_to_include = await self.get_events(state_to_include_ids)
+
+        return [
+            {
+                "type": e.type,
+                "state_key": e.state_key,
+                "content": e.content,
+                "sender": e.sender,
+            }
+            for e in state_to_include.values()
+        ]
+
     def _do_fetch(self, conn):
         """Takes a database connection and waits for requests for events from
         the _event_fetch_list queue.
@@ -1065,11 +1117,13 @@ class EventsWorkerStore(SQLBaseStore):
         def get_all_new_forward_event_rows(txn):
             sql = (
                 "SELECT e.stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts, relates_to_id"
+                " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                 " FROM events AS e"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
+                " LEFT JOIN room_memberships USING (event_id)"
+                " LEFT JOIN rejections USING (event_id)"
                 " WHERE ? < stream_ordering AND stream_ordering <= ?"
                 " AND instance_name = ?"
                 " ORDER BY stream_ordering ASC"
@@ -1100,12 +1154,14 @@ class EventsWorkerStore(SQLBaseStore):
         def get_ex_outlier_stream_rows_txn(txn):
             sql = (
                 "SELECT event_stream_ordering, e.event_id, e.room_id, e.type,"
-                " state_key, redacts, relates_to_id"
+                " state_key, redacts, relates_to_id, membership, rejections.reason IS NOT NULL"
                 " FROM events AS e"
                 " INNER JOIN ex_outlier_stream AS out USING (event_id)"
                 " LEFT JOIN redactions USING (event_id)"
                 " LEFT JOIN state_events USING (event_id)"
                 " LEFT JOIN event_relations USING (event_id)"
+                " LEFT JOIN room_memberships USING (event_id)"
+                " LEFT JOIN rejections USING (event_id)"
                 " WHERE ? < event_stream_ordering"
                 " AND event_stream_ordering <= ?"
                 " AND out.instance_name = ?"
diff --git a/synapse/storage/databases/main/media_repository.py b/synapse/storage/databases/main/media_repository.py
index cc538c5c10..4b2f224718 100644
--- a/synapse/storage/databases/main/media_repository.py
+++ b/synapse/storage/databases/main/media_repository.py
@@ -93,6 +93,7 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
 
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
+        self.server_name = hs.hostname
 
     async def get_local_media(self, media_id: str) -> Optional[Dict[str, Any]]:
         """Get the metadata for a local piece of media
@@ -115,6 +116,109 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="get_local_media",
         )
 
+    async def get_local_media_by_user_paginate(
+        self, start: int, limit: int, user_id: str
+    ) -> Tuple[List[Dict[str, Any]], int]:
+        """Get a paginated list of metadata for a local piece of media
+        which an user_id has uploaded
+
+        Args:
+            start: offset in the list
+            limit: maximum amount of media_ids to retrieve
+            user_id: fully-qualified user id
+        Returns:
+            A paginated list of all metadata of user's media,
+            plus the total count of all the user's media
+        """
+
+        def get_local_media_by_user_paginate_txn(txn):
+
+            args = [user_id]
+            sql = """
+                SELECT COUNT(*) as total_media
+                FROM local_media_repository
+                WHERE user_id = ?
+            """
+            txn.execute(sql, args)
+            count = txn.fetchone()[0]
+
+            sql = """
+                SELECT
+                    "media_id",
+                    "media_type",
+                    "media_length",
+                    "upload_name",
+                    "created_ts",
+                    "last_access_ts",
+                    "quarantined_by",
+                    "safe_from_quarantine"
+                FROM local_media_repository
+                WHERE user_id = ?
+                ORDER BY created_ts DESC, media_id DESC
+                LIMIT ? OFFSET ?
+            """
+
+            args += [limit, start]
+            txn.execute(sql, args)
+            media = self.db_pool.cursor_to_dict(txn)
+            return media, count
+
+        return await self.db_pool.runInteraction(
+            "get_local_media_by_user_paginate_txn", get_local_media_by_user_paginate_txn
+        )
+
+    async def get_local_media_before(
+        self, before_ts: int, size_gt: int, keep_profiles: bool,
+    ) -> Optional[List[str]]:
+
+        # to find files that have never been accessed (last_access_ts IS NULL)
+        # compare with `created_ts`
+        sql = """
+            SELECT media_id
+            FROM local_media_repository AS lmr
+            WHERE
+                ( last_access_ts < ?
+                OR ( created_ts < ? AND last_access_ts IS NULL ) )
+                AND media_length > ?
+        """
+
+        if keep_profiles:
+            sql_keep = """
+                AND (
+                    NOT EXISTS
+                        (SELECT 1
+                         FROM profiles
+                         WHERE profiles.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM groups
+                         WHERE groups.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM room_memberships
+                         WHERE room_memberships.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM user_directory
+                         WHERE user_directory.avatar_url = '{media_prefix}' || lmr.media_id)
+                    AND NOT EXISTS
+                        (SELECT 1
+                         FROM room_stats_state
+                         WHERE room_stats_state.avatar = '{media_prefix}' || lmr.media_id)
+                )
+            """.format(
+                media_prefix="mxc://%s/" % (self.server_name,),
+            )
+            sql += sql_keep
+
+        def _get_local_media_before_txn(txn):
+            txn.execute(sql, (before_ts, before_ts, size_gt))
+            return [row[0] for row in txn]
+
+        return await self.db_pool.runInteraction(
+            "get_local_media_before", _get_local_media_before_txn
+        )
+
     async def store_local_media(
         self,
         media_id,
@@ -348,6 +452,33 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="get_remote_media_thumbnails",
         )
 
+    async def get_remote_media_thumbnail(
+        self, origin: str, media_id: str, t_width: int, t_height: int, t_type: str,
+    ) -> Optional[Dict[str, Any]]:
+        """Fetch the thumbnail info of given width, height and type.
+        """
+
+        return await self.db_pool.simple_select_one(
+            table="remote_media_cache_thumbnails",
+            keyvalues={
+                "media_origin": origin,
+                "media_id": media_id,
+                "thumbnail_width": t_width,
+                "thumbnail_height": t_height,
+                "thumbnail_type": t_type,
+            },
+            retcols=(
+                "thumbnail_width",
+                "thumbnail_height",
+                "thumbnail_method",
+                "thumbnail_type",
+                "thumbnail_length",
+                "filesystem_id",
+            ),
+            allow_none=True,
+            desc="get_remote_media_thumbnail",
+        )
+
     async def store_remote_media_thumbnail(
         self,
         origin,
diff --git a/synapse/storage/databases/main/profile.py b/synapse/storage/databases/main/profile.py
index a6d1eb908a..0e25ca3d7a 100644
--- a/synapse/storage/databases/main/profile.py
+++ b/synapse/storage/databases/main/profile.py
@@ -39,7 +39,7 @@ class ProfileWorkerStore(SQLBaseStore):
             avatar_url=profile["avatar_url"], display_name=profile["displayname"]
         )
 
-    async def get_profile_displayname(self, user_localpart: str) -> str:
+    async def get_profile_displayname(self, user_localpart: str) -> Optional[str]:
         return await self.db_pool.simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
@@ -47,7 +47,7 @@ class ProfileWorkerStore(SQLBaseStore):
             desc="get_profile_displayname",
         )
 
-    async def get_profile_avatar_url(self, user_localpart: str) -> str:
+    async def get_profile_avatar_url(self, user_localpart: str) -> Optional[str]:
         return await self.db_pool.simple_select_one_onecol(
             table="profiles",
             keyvalues={"user_id": user_localpart},
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 4c843b7679..e5d07ce72a 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -16,29 +16,64 @@
 # limitations under the License.
 import logging
 import re
-from typing import Any, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
+
+import attr
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, StoreError, SynapseError, ThreepidValidationError
 from synapse.metrics.background_process_metrics import wrap_as_background_process
-from synapse.storage._base import SQLBaseStore
 from synapse.storage.database import DatabasePool
-from synapse.storage.types import Cursor
+from synapse.storage.databases.main.cache import CacheInvalidationWorkerStore
+from synapse.storage.databases.main.stats import StatsStore
+from synapse.storage.types import Connection, Cursor
+from synapse.storage.util.id_generators import IdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
 from synapse.types import UserID
 from synapse.util.caches.descriptors import cached
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 THIRTY_MINUTES_IN_MS = 30 * 60 * 1000
 
 logger = logging.getLogger(__name__)
 
 
-class RegistrationWorkerStore(SQLBaseStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+@attr.s(frozen=True, slots=True)
+class TokenLookupResult:
+    """Result of looking up an access token.
+
+    Attributes:
+        user_id: The user that this token authenticates as
+        is_guest
+        shadow_banned
+        token_id: The ID of the access token looked up
+        device_id: The device associated with the token, if any.
+        valid_until_ms: The timestamp the token expires, if any.
+        token_owner: The "owner" of the token. This is either the same as the
+            user, or a server admin who is logged in as the user.
+    """
+
+    user_id = attr.ib(type=str)
+    is_guest = attr.ib(type=bool, default=False)
+    shadow_banned = attr.ib(type=bool, default=False)
+    token_id = attr.ib(type=Optional[int], default=None)
+    device_id = attr.ib(type=Optional[str], default=None)
+    valid_until_ms = attr.ib(type=Optional[int], default=None)
+    token_owner = attr.ib(type=str)
+
+    # Make the token owner default to the user ID, which is the common case.
+    @token_owner.default
+    def _default_token_owner(self):
+        return self.user_id
+
+
+class RegistrationWorkerStore(CacheInvalidationWorkerStore):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         self.config = hs.config
-        self.clock = hs.get_clock()
 
         # Note: we don't check this sequence for consistency as we'd have to
         # call `find_max_generated_user_id_localpart` each time, which is
@@ -55,7 +90,7 @@ class RegistrationWorkerStore(SQLBaseStore):
 
         # Create a background job for culling expired 3PID validity tokens
         if hs.config.run_background_tasks:
-            self.clock.looping_call(
+            self._clock.looping_call(
                 self.cull_expired_threepid_validation_tokens, THIRTY_MINUTES_IN_MS
             )
 
@@ -92,21 +127,19 @@ class RegistrationWorkerStore(SQLBaseStore):
         if not info:
             return False
 
-        now = self.clock.time_msec()
+        now = self._clock.time_msec()
         trial_duration_ms = self.config.mau_trial_days * 24 * 60 * 60 * 1000
         is_trial = (now - info["creation_ts"] * 1000) < trial_duration_ms
         return is_trial
 
     @cached()
-    async def get_user_by_access_token(self, token: str) -> Optional[dict]:
+    async def get_user_by_access_token(self, token: str) -> Optional[TokenLookupResult]:
         """Get a user from the given access token.
 
         Args:
             token: The access token of a user.
         Returns:
-            None, if the token did not match, otherwise dict
-            including the keys `name`, `is_guest`, `device_id`, `token_id`,
-            `valid_until_ms`.
+            None, if the token did not match, otherwise a `TokenLookupResult`
         """
         return await self.db_pool.runInteraction(
             "get_user_by_access_token", self._query_for_auth, token
@@ -236,13 +269,13 @@ class RegistrationWorkerStore(SQLBaseStore):
             desc="get_renewal_token_for_user",
         )
 
-    async def get_users_expiring_soon(self) -> List[Dict[str, int]]:
+    async def get_users_expiring_soon(self) -> List[Dict[str, Any]]:
         """Selects users whose account will expire in the [now, now + renew_at] time
         window (see configuration for account_validity for information on what renew_at
         refers to).
 
         Returns:
-            A list of dictionaries mapping user ID to expiration time (in milliseconds).
+            A list of dictionaries, each with a user ID and expiration time (in milliseconds).
         """
 
         def select_users_txn(txn, now_ms, renew_at):
@@ -257,7 +290,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         return await self.db_pool.runInteraction(
             "get_users_expiring_soon",
             select_users_txn,
-            self.clock.time_msec(),
+            self._clock.time_msec(),
             self.config.account_validity.renew_at,
         )
 
@@ -327,19 +360,24 @@ class RegistrationWorkerStore(SQLBaseStore):
 
         await self.db_pool.runInteraction("set_server_admin", set_server_admin_txn)
 
-    def _query_for_auth(self, txn, token):
-        sql = (
-            "SELECT users.name, users.is_guest, users.shadow_banned, access_tokens.id as token_id,"
-            " access_tokens.device_id, access_tokens.valid_until_ms"
-            " FROM users"
-            " INNER JOIN access_tokens on users.name = access_tokens.user_id"
-            " WHERE token = ?"
-        )
+    def _query_for_auth(self, txn, token: str) -> Optional[TokenLookupResult]:
+        sql = """
+            SELECT users.name as user_id,
+                users.is_guest,
+                users.shadow_banned,
+                access_tokens.id as token_id,
+                access_tokens.device_id,
+                access_tokens.valid_until_ms,
+                access_tokens.user_id as token_owner
+            FROM users
+            INNER JOIN access_tokens on users.name = COALESCE(puppets_user_id, access_tokens.user_id)
+            WHERE token = ?
+        """
 
         txn.execute(sql, (token,))
         rows = self.db_pool.cursor_to_dict(txn)
         if rows:
-            return rows[0]
+            return TokenLookupResult(**rows[0])
 
         return None
 
@@ -803,7 +841,7 @@ class RegistrationWorkerStore(SQLBaseStore):
         await self.db_pool.runInteraction(
             "cull_expired_threepid_validation_tokens",
             cull_expired_threepid_validation_tokens_txn,
-            self.clock.time_msec(),
+            self._clock.time_msec(),
         )
 
     @wrap_as_background_process("account_validity_set_expiration_dates")
@@ -890,10 +928,10 @@ class RegistrationWorkerStore(SQLBaseStore):
 
 
 class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
-        self.clock = hs.get_clock()
+        self._clock = hs.get_clock()
         self.config = hs.config
 
         self.db_pool.updates.register_background_index_update(
@@ -1016,13 +1054,56 @@ class RegistrationBackgroundUpdateStore(RegistrationWorkerStore):
 
         return 1
 
+    async def set_user_deactivated_status(
+        self, user_id: str, deactivated: bool
+    ) -> None:
+        """Set the `deactivated` property for the provided user to the provided value.
+
+        Args:
+            user_id: The ID of the user to set the status for.
+            deactivated: The value to set for `deactivated`.
+        """
 
-class RegistrationStore(RegistrationBackgroundUpdateStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
+        await self.db_pool.runInteraction(
+            "set_user_deactivated_status",
+            self.set_user_deactivated_status_txn,
+            user_id,
+            deactivated,
+        )
+
+    def set_user_deactivated_status_txn(self, txn, user_id: str, deactivated: bool):
+        self.db_pool.simple_update_one_txn(
+            txn=txn,
+            table="users",
+            keyvalues={"name": user_id},
+            updatevalues={"deactivated": 1 if deactivated else 0},
+        )
+        self._invalidate_cache_and_stream(
+            txn, self.get_user_deactivated_status, (user_id,)
+        )
+        txn.call_after(self.is_guest.invalidate, (user_id,))
+
+    @cached()
+    async def is_guest(self, user_id: str) -> bool:
+        res = await self.db_pool.simple_select_one_onecol(
+            table="users",
+            keyvalues={"name": user_id},
+            retcol="is_guest",
+            allow_none=True,
+            desc="is_guest",
+        )
+
+        return res if res else False
+
+
+class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
+    def __init__(self, database: DatabasePool, db_conn: Connection, hs: "HomeServer"):
         super().__init__(database, db_conn, hs)
 
         self._ignore_unknown_session_error = hs.config.request_token_inhibit_3pid_errors
 
+        self._access_tokens_id_gen = IdGenerator(db_conn, "access_tokens", "id")
+
     async def add_access_token_to_user(
         self,
         user_id: str,
@@ -1138,19 +1219,19 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
     def _register_user(
         self,
         txn,
-        user_id,
-        password_hash,
-        was_guest,
-        make_guest,
-        appservice_id,
-        create_profile_with_displayname,
-        admin,
-        user_type,
-        shadow_banned,
+        user_id: str,
+        password_hash: Optional[str],
+        was_guest: bool,
+        make_guest: bool,
+        appservice_id: Optional[str],
+        create_profile_with_displayname: Optional[str],
+        admin: bool,
+        user_type: Optional[str],
+        shadow_banned: bool,
     ):
         user_id_obj = UserID.from_string(user_id)
 
-        now = int(self.clock.time())
+        now = int(self._clock.time())
 
         try:
             if was_guest:
@@ -1374,18 +1455,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
 
         await self.db_pool.runInteraction("delete_access_token", f)
 
-    @cached()
-    async def is_guest(self, user_id: str) -> bool:
-        res = await self.db_pool.simple_select_one_onecol(
-            table="users",
-            keyvalues={"name": user_id},
-            retcol="is_guest",
-            allow_none=True,
-            desc="is_guest",
-        )
-
-        return res if res else False
-
     async def add_user_pending_deactivation(self, user_id: str) -> None:
         """
         Adds a user to the table of users who need to be parted from all the rooms they're
@@ -1479,7 +1548,7 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
                 txn,
                 table="threepid_validation_session",
                 keyvalues={"session_id": session_id},
-                updatevalues={"validated_at": self.clock.time_msec()},
+                updatevalues={"validated_at": self._clock.time_msec()},
             )
 
             return next_link
@@ -1547,35 +1616,6 @@ class RegistrationStore(RegistrationBackgroundUpdateStore):
             start_or_continue_validation_session_txn,
         )
 
-    async def set_user_deactivated_status(
-        self, user_id: str, deactivated: bool
-    ) -> None:
-        """Set the `deactivated` property for the provided user to the provided value.
-
-        Args:
-            user_id: The ID of the user to set the status for.
-            deactivated: The value to set for `deactivated`.
-        """
-
-        await self.db_pool.runInteraction(
-            "set_user_deactivated_status",
-            self.set_user_deactivated_status_txn,
-            user_id,
-            deactivated,
-        )
-
-    def set_user_deactivated_status_txn(self, txn, user_id, deactivated):
-        self.db_pool.simple_update_one_txn(
-            txn=txn,
-            table="users",
-            keyvalues={"name": user_id},
-            updatevalues={"deactivated": 1 if deactivated else 0},
-        )
-        self._invalidate_cache_and_stream(
-            txn, self.get_user_deactivated_status, (user_id,)
-        )
-        txn.call_after(self.is_guest.invalidate, (user_id,))
-
 
 def find_max_generated_user_id_localpart(cur: Cursor) -> int:
     """
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index e83d961c20..dc0c4b5499 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1411,6 +1411,65 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
             desc="add_event_report",
         )
 
+    async def get_event_report(self, report_id: int) -> Optional[Dict[str, Any]]:
+        """Retrieve an event report
+
+        Args:
+            report_id: ID of reported event in database
+        Returns:
+            event_report: json list of information from event report
+        """
+
+        def _get_event_report_txn(txn, report_id):
+
+            sql = """
+                SELECT
+                    er.id,
+                    er.received_ts,
+                    er.room_id,
+                    er.event_id,
+                    er.user_id,
+                    er.content,
+                    events.sender,
+                    room_stats_state.canonical_alias,
+                    room_stats_state.name,
+                    event_json.json AS event_json
+                FROM event_reports AS er
+                LEFT JOIN events
+                    ON events.event_id = er.event_id
+                JOIN event_json
+                    ON event_json.event_id = er.event_id
+                JOIN room_stats_state
+                    ON room_stats_state.room_id = er.room_id
+                WHERE er.id = ?
+            """
+
+            txn.execute(sql, [report_id])
+            row = txn.fetchone()
+
+            if not row:
+                return None
+
+            event_report = {
+                "id": row[0],
+                "received_ts": row[1],
+                "room_id": row[2],
+                "event_id": row[3],
+                "user_id": row[4],
+                "score": db_to_json(row[5]).get("score"),
+                "reason": db_to_json(row[5]).get("reason"),
+                "sender": row[6],
+                "canonical_alias": row[7],
+                "name": row[8],
+                "event_json": db_to_json(row[9]),
+            }
+
+            return event_report
+
+        return await self.db_pool.runInteraction(
+            "get_event_report", _get_event_report_txn, report_id
+        )
+
     async def get_event_reports_paginate(
         self,
         start: int,
@@ -1468,18 +1527,15 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                     er.room_id,
                     er.event_id,
                     er.user_id,
-                    er.reason,
                     er.content,
                     events.sender,
-                    room_aliases.room_alias,
-                    event_json.json AS event_json
+                    room_stats_state.canonical_alias,
+                    room_stats_state.name
                 FROM event_reports AS er
-                LEFT JOIN room_aliases
-                    ON room_aliases.room_id = er.room_id
-                JOIN events
+                LEFT JOIN events
                     ON events.event_id = er.event_id
-                JOIN event_json
-                    ON event_json.event_id = er.event_id
+                JOIN room_stats_state
+                    ON room_stats_state.room_id = er.room_id
                 {where_clause}
                 ORDER BY er.received_ts {order}
                 LIMIT ?
@@ -1490,15 +1546,29 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
 
             args += [limit, start]
             txn.execute(sql, args)
-            event_reports = self.db_pool.cursor_to_dict(txn)
-
-            if count > 0:
-                for row in event_reports:
-                    try:
-                        row["content"] = db_to_json(row["content"])
-                        row["event_json"] = db_to_json(row["event_json"])
-                    except Exception:
-                        continue
+
+            event_reports = []
+            for row in txn:
+                try:
+                    s = db_to_json(row[5]).get("score")
+                    r = db_to_json(row[5]).get("reason")
+                except Exception:
+                    logger.error("Unable to parse json from event_reports: %s", row[0])
+                    continue
+                event_reports.append(
+                    {
+                        "id": row[0],
+                        "received_ts": row[1],
+                        "room_id": row[2],
+                        "event_id": row[3],
+                        "user_id": row[4],
+                        "score": s,
+                        "reason": r,
+                        "sender": row[6],
+                        "canonical_alias": row[7],
+                        "name": row[8],
+                    }
+                )
 
             return event_reports, count
 
diff --git a/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql
new file mode 100644
index 0000000000..00a9431a97
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/22puppet_token.sql
@@ -0,0 +1,17 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Whether the access token is an admin token for controlling another user.
+ALTER TABLE access_tokens ADD COLUMN puppets_user_id TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
new file mode 100644
index 0000000000..a2842687f1
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/58/22users_have_local_media.sql
@@ -0,0 +1,2 @@
+INSERT INTO background_updates (update_name, progress_json) VALUES
+  ('users_have_local_media', '{}');
\ No newline at end of file
diff --git a/synapse/types.py b/synapse/types.py
index 5bde67cc07..66bb5bac8d 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -29,6 +29,7 @@ from typing import (
     Tuple,
     Type,
     TypeVar,
+    Union,
 )
 
 import attr
@@ -38,6 +39,7 @@ from unpaddedbase64 import decode_base64
 from synapse.api.errors import Codes, SynapseError
 
 if TYPE_CHECKING:
+    from synapse.appservice.api import ApplicationService
     from synapse.storage.databases.main import DataStore
 
 # define a version of typing.Collection that works on python 3.5
@@ -74,6 +76,7 @@ class Requester(
             "shadow_banned",
             "device_id",
             "app_service",
+            "authenticated_entity",
         ],
     )
 ):
@@ -104,6 +107,7 @@ class Requester(
             "shadow_banned": self.shadow_banned,
             "device_id": self.device_id,
             "app_server_id": self.app_service.id if self.app_service else None,
+            "authenticated_entity": self.authenticated_entity,
         }
 
     @staticmethod
@@ -129,16 +133,18 @@ class Requester(
             shadow_banned=input["shadow_banned"],
             device_id=input["device_id"],
             app_service=appservice,
+            authenticated_entity=input["authenticated_entity"],
         )
 
 
 def create_requester(
-    user_id,
-    access_token_id=None,
-    is_guest=False,
-    shadow_banned=False,
-    device_id=None,
-    app_service=None,
+    user_id: Union[str, "UserID"],
+    access_token_id: Optional[int] = None,
+    is_guest: Optional[bool] = False,
+    shadow_banned: Optional[bool] = False,
+    device_id: Optional[str] = None,
+    app_service: Optional["ApplicationService"] = None,
+    authenticated_entity: Optional[str] = None,
 ):
     """
     Create a new ``Requester`` object
@@ -151,14 +157,27 @@ def create_requester(
         shadow_banned (bool):  True if the user making this request is shadow-banned.
         device_id (str|None):  device_id which was set at authentication time
         app_service (ApplicationService|None):  the AS requesting on behalf of the user
+        authenticated_entity: The entity that authenticated when making the request.
+            This is different to the user_id when an admin user or the server is
+            "puppeting" the user.
 
     Returns:
         Requester
     """
     if not isinstance(user_id, UserID):
         user_id = UserID.from_string(user_id)
+
+    if authenticated_entity is None:
+        authenticated_entity = user_id.to_string()
+
     return Requester(
-        user_id, access_token_id, is_guest, shadow_banned, device_id, app_service
+        user_id,
+        access_token_id,
+        is_guest,
+        shadow_banned,
+        device_id,
+        app_service,
+        authenticated_entity,
     )
 
 
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index 5d7fffee66..a924140cdf 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -13,10 +13,23 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import enum
 import functools
 import inspect
 import logging
-from typing import Any, Callable, Generic, Optional, Tuple, TypeVar, Union, cast
+from typing import (
+    Any,
+    Callable,
+    Generic,
+    Iterable,
+    Mapping,
+    Optional,
+    Sequence,
+    Tuple,
+    TypeVar,
+    Union,
+    cast,
+)
 from weakref import WeakValueDictionary
 
 from twisted.internet import defer
@@ -24,6 +37,7 @@ from twisted.internet import defer
 from synapse.logging.context import make_deferred_yieldable, preserve_fn
 from synapse.util import unwrapFirstError
 from synapse.util.caches.deferred_cache import DeferredCache
+from synapse.util.caches.lrucache import LruCache
 
 logger = logging.getLogger(__name__)
 
@@ -48,7 +62,7 @@ class _CachedFunction(Generic[F]):
 
 
 class _CacheDescriptorBase:
-    def __init__(self, orig: _CachedFunction, num_args, cache_context=False):
+    def __init__(self, orig: Callable[..., Any], num_args, cache_context=False):
         self.orig = orig
 
         arg_spec = inspect.getfullargspec(orig)
@@ -97,8 +111,107 @@ class _CacheDescriptorBase:
 
         self.add_cache_context = cache_context
 
+        self.cache_key_builder = get_cache_key_builder(
+            self.arg_names, self.arg_defaults
+        )
+
+
+class _LruCachedFunction(Generic[F]):
+    cache = None  # type: LruCache[CacheKey, Any]
+    __call__ = None  # type: F
+
+
+def lru_cache(
+    max_entries: int = 1000, cache_context: bool = False,
+) -> Callable[[F], _LruCachedFunction[F]]:
+    """A method decorator that applies a memoizing cache around the function.
+
+    This is more-or-less a drop-in equivalent to functools.lru_cache, although note
+    that the signature is slightly different.
+
+    The main differences with functools.lru_cache are:
+        (a) the size of the cache can be controlled via the cache_factor mechanism
+        (b) the wrapped function can request a "cache_context" which provides a
+            callback mechanism to indicate that the result is no longer valid
+        (c) prometheus metrics are exposed automatically.
+
+    The function should take zero or more arguments, which are used as the key for the
+    cache. Single-argument functions use that argument as the cache key; otherwise the
+    arguments are built into a tuple.
+
+    Cached functions can be "chained" (i.e. a cached function can call other cached
+    functions and get appropriately invalidated when they called caches are
+    invalidated) by adding a special "cache_context" argument to the function
+    and passing that as a kwarg to all caches called. For example:
+
+        @lru_cache(cache_context=True)
+        def foo(self, key, cache_context):
+            r1 = self.bar1(key, on_invalidate=cache_context.invalidate)
+            r2 = self.bar2(key, on_invalidate=cache_context.invalidate)
+            return r1 + r2
+
+    The wrapped function also has a 'cache' property which offers direct access to the
+    underlying LruCache.
+    """
+
+    def func(orig: F) -> _LruCachedFunction[F]:
+        desc = LruCacheDescriptor(
+            orig, max_entries=max_entries, cache_context=cache_context,
+        )
+        return cast(_LruCachedFunction[F], desc)
+
+    return func
+
+
+class LruCacheDescriptor(_CacheDescriptorBase):
+    """Helper for @lru_cache"""
+
+    class _Sentinel(enum.Enum):
+        sentinel = object()
+
+    def __init__(
+        self, orig, max_entries: int = 1000, cache_context: bool = False,
+    ):
+        super().__init__(orig, num_args=None, cache_context=cache_context)
+        self.max_entries = max_entries
+
+    def __get__(self, obj, owner):
+        cache = LruCache(
+            cache_name=self.orig.__name__, max_size=self.max_entries,
+        )  # type: LruCache[CacheKey, Any]
+
+        get_cache_key = self.cache_key_builder
+        sentinel = LruCacheDescriptor._Sentinel.sentinel
+
+        @functools.wraps(self.orig)
+        def _wrapped(*args, **kwargs):
+            invalidate_callback = kwargs.pop("on_invalidate", None)
+            callbacks = (invalidate_callback,) if invalidate_callback else ()
+
+            cache_key = get_cache_key(args, kwargs)
 
-class CacheDescriptor(_CacheDescriptorBase):
+            ret = cache.get(cache_key, default=sentinel, callbacks=callbacks)
+            if ret != sentinel:
+                return ret
+
+            # Add our own `cache_context` to argument list if the wrapped function
+            # has asked for one
+            if self.add_cache_context:
+                kwargs["cache_context"] = _CacheContext.get_instance(cache, cache_key)
+
+            ret2 = self.orig(obj, *args, **kwargs)
+            cache.set(cache_key, ret2, callbacks=callbacks)
+
+            return ret2
+
+        wrapped = cast(_CachedFunction, _wrapped)
+        wrapped.cache = cache
+        obj.__dict__[self.orig.__name__] = wrapped
+
+        return wrapped
+
+
+class DeferredCacheDescriptor(_CacheDescriptorBase):
     """ A method decorator that applies a memoizing cache around the function.
 
     This caches deferreds, rather than the results themselves. Deferreds that
@@ -141,7 +254,6 @@ class CacheDescriptor(_CacheDescriptorBase):
         cache_context=False,
         iterable=False,
     ):
-
         super().__init__(orig, num_args=num_args, cache_context=cache_context)
 
         self.max_entries = max_entries
@@ -157,41 +269,7 @@ class CacheDescriptor(_CacheDescriptorBase):
             iterable=self.iterable,
         )  # type: DeferredCache[CacheKey, Any]
 
-        def get_cache_key_gen(args, kwargs):
-            """Given some args/kwargs return a generator that resolves into
-            the cache_key.
-
-            We loop through each arg name, looking up if its in the `kwargs`,
-            otherwise using the next argument in `args`. If there are no more
-            args then we try looking the arg name up in the defaults
-            """
-            pos = 0
-            for nm in self.arg_names:
-                if nm in kwargs:
-                    yield kwargs[nm]
-                elif pos < len(args):
-                    yield args[pos]
-                    pos += 1
-                else:
-                    yield self.arg_defaults[nm]
-
-        # By default our cache key is a tuple, but if there is only one item
-        # then don't bother wrapping in a tuple.  This is to save memory.
-        if self.num_args == 1:
-            nm = self.arg_names[0]
-
-            def get_cache_key(args, kwargs):
-                if nm in kwargs:
-                    return kwargs[nm]
-                elif len(args):
-                    return args[0]
-                else:
-                    return self.arg_defaults[nm]
-
-        else:
-
-            def get_cache_key(args, kwargs):
-                return tuple(get_cache_key_gen(args, kwargs))
+        get_cache_key = self.cache_key_builder
 
         @functools.wraps(self.orig)
         def _wrapped(*args, **kwargs):
@@ -223,7 +301,6 @@ class CacheDescriptor(_CacheDescriptorBase):
             wrapped.prefill = lambda key, val: cache.prefill(key[0], val)
         else:
             wrapped.invalidate = cache.invalidate
-            wrapped.invalidate_all = cache.invalidate_all
             wrapped.invalidate_many = cache.invalidate_many
             wrapped.prefill = cache.prefill
 
@@ -236,7 +313,7 @@ class CacheDescriptor(_CacheDescriptorBase):
         return wrapped
 
 
-class CacheListDescriptor(_CacheDescriptorBase):
+class DeferredCacheListDescriptor(_CacheDescriptorBase):
     """Wraps an existing cache to support bulk fetching of keys.
 
     Given a list of keys it looks in the cache to find any hits, then passes
@@ -382,11 +459,13 @@ class _CacheContext:
     on a lower level.
     """
 
+    Cache = Union[DeferredCache, LruCache]
+
     _cache_context_objects = (
         WeakValueDictionary()
-    )  # type: WeakValueDictionary[Tuple[DeferredCache, CacheKey], _CacheContext]
+    )  # type: WeakValueDictionary[Tuple[_CacheContext.Cache, CacheKey], _CacheContext]
 
-    def __init__(self, cache, cache_key):  # type: (DeferredCache, CacheKey) -> None
+    def __init__(self, cache: "_CacheContext.Cache", cache_key: CacheKey) -> None:
         self._cache = cache
         self._cache_key = cache_key
 
@@ -396,8 +475,8 @@ class _CacheContext:
 
     @classmethod
     def get_instance(
-        cls, cache, cache_key
-    ):  # type: (DeferredCache, CacheKey) -> _CacheContext
+        cls, cache: "_CacheContext.Cache", cache_key: CacheKey
+    ) -> "_CacheContext":
         """Returns an instance constructed with the given arguments.
 
         A new instance is only created if none already exists.
@@ -418,7 +497,7 @@ def cached(
     cache_context: bool = False,
     iterable: bool = False,
 ) -> Callable[[F], _CachedFunction[F]]:
-    func = lambda orig: CacheDescriptor(
+    func = lambda orig: DeferredCacheDescriptor(
         orig,
         max_entries=max_entries,
         num_args=num_args,
@@ -460,7 +539,7 @@ def cachedList(
             def batch_do_something(self, first_arg, second_args):
                 ...
     """
-    func = lambda orig: CacheListDescriptor(
+    func = lambda orig: DeferredCacheListDescriptor(
         orig,
         cached_method_name=cached_method_name,
         list_name=list_name,
@@ -468,3 +547,65 @@ def cachedList(
     )
 
     return cast(Callable[[F], _CachedFunction[F]], func)
+
+
+def get_cache_key_builder(
+    param_names: Sequence[str], param_defaults: Mapping[str, Any]
+) -> Callable[[Sequence[Any], Mapping[str, Any]], CacheKey]:
+    """Construct a function which will build cache keys suitable for a cached function
+
+    Args:
+        param_names: list of formal parameter names for the cached function
+        param_defaults: a mapping from parameter name to default value for that param
+
+    Returns:
+        A function which will take an (args, kwargs) pair and return a cache key
+    """
+
+    # By default our cache key is a tuple, but if there is only one item
+    # then don't bother wrapping in a tuple.  This is to save memory.
+
+    if len(param_names) == 1:
+        nm = param_names[0]
+
+        def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
+            if nm in kwargs:
+                return kwargs[nm]
+            elif len(args):
+                return args[0]
+            else:
+                return param_defaults[nm]
+
+    else:
+
+        def get_cache_key(args: Sequence[Any], kwargs: Mapping[str, Any]) -> CacheKey:
+            return tuple(_get_cache_key_gen(param_names, param_defaults, args, kwargs))
+
+    return get_cache_key
+
+
+def _get_cache_key_gen(
+    param_names: Iterable[str],
+    param_defaults: Mapping[str, Any],
+    args: Sequence[Any],
+    kwargs: Mapping[str, Any],
+) -> Iterable[Any]:
+    """Given some args/kwargs return a generator that resolves into
+    the cache_key.
+
+    This is essentially the same operation as `inspect.getcallargs`, but optimised so
+    that we don't need to inspect the target function for each call.
+    """
+
+    # We loop through each arg name, looking up if its in the `kwargs`,
+    # otherwise using the next argument in `args`. If there are no more
+    # args then we try looking the arg name up in the defaults.
+    pos = 0
+    for nm in param_names:
+        if nm in kwargs:
+            yield kwargs[nm]
+        elif pos < len(args):
+            yield args[pos]
+            pos += 1
+        else:
+            yield param_defaults[nm]
diff --git a/synapse/util/retryutils.py b/synapse/util/retryutils.py
index a5cc9d0551..4ab379e429 100644
--- a/synapse/util/retryutils.py
+++ b/synapse/util/retryutils.py
@@ -110,7 +110,7 @@ async def get_retry_limiter(destination, clock, store, ignore_backoff=False, **k
         failure_ts,
         retry_interval,
         backoff_on_failure=backoff_on_failure,
-        **kwargs
+        **kwargs,
     )