summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/_scripts/register_new_matrix_user.py2
-rw-r--r--synapse/api/auth.py4
-rw-r--r--synapse/api/constants.py5
-rw-r--r--synapse/api/errors.py10
-rw-r--r--synapse/api/filtering.py4
-rw-r--r--synapse/api/urls.py3
-rw-r--r--synapse/app/_base.py8
-rw-r--r--synapse/app/generic_worker.py41
-rw-r--r--synapse/app/homeserver.py56
-rw-r--r--synapse/appservice/__init__.py4
-rw-r--r--synapse/appservice/api.py3
-rw-r--r--synapse/config/_base.py6
-rw-r--r--synapse/config/appservice.py13
-rw-r--r--synapse/config/cache.py20
-rw-r--r--synapse/config/homeserver.py2
-rw-r--r--synapse/config/oidc_config.py2
-rw-r--r--synapse/config/registration.py106
-rw-r--r--synapse/config/repository.py6
-rw-r--r--synapse/config/room.py80
-rw-r--r--synapse/config/saml2_config.py4
-rw-r--r--synapse/config/server.py237
-rw-r--r--synapse/config/tls.py4
-rw-r--r--synapse/config/workers.py24
-rw-r--r--synapse/crypto/keyring.py6
-rw-r--r--synapse/events/__init__.py4
-rw-r--r--synapse/events/snapshot.py4
-rw-r--r--synapse/events/utils.py4
-rw-r--r--synapse/events/validator.py12
-rw-r--r--synapse/federation/federation_base.py8
-rw-r--r--synapse/federation/federation_server.py48
-rw-r--r--synapse/federation/send_queue.py8
-rw-r--r--synapse/federation/sender/__init__.py14
-rw-r--r--synapse/federation/transport/client.py3
-rw-r--r--synapse/groups/groups_server.py4
-rw-r--r--synapse/handlers/appservice.py10
-rw-r--r--synapse/handlers/auth.py2
-rw-r--r--synapse/handlers/cas_handler.py3
-rw-r--r--synapse/handlers/device.py15
-rw-r--r--synapse/handlers/devicemessage.py25
-rw-r--r--synapse/handlers/directory.py68
-rw-r--r--synapse/handlers/e2e_keys.py14
-rw-r--r--synapse/handlers/e2e_room_keys.py7
-rw-r--r--synapse/handlers/federation.py44
-rw-r--r--synapse/handlers/groups_local.py4
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/pagination.py28
-rw-r--r--synapse/handlers/presence.py43
-rw-r--r--synapse/handlers/profile.py8
-rw-r--r--synapse/handlers/register.py230
-rw-r--r--synapse/handlers/room.py82
-rw-r--r--synapse/handlers/room_list.py4
-rw-r--r--synapse/handlers/room_member.py7
-rw-r--r--synapse/handlers/sync.py34
-rw-r--r--synapse/handlers/typing.py69
-rw-r--r--synapse/handlers/user_directory.py6
-rw-r--r--synapse/http/client.py8
-rw-r--r--synapse/http/federation/matrix_federation_agent.py10
-rw-r--r--synapse/http/federation/well_known_resolver.py17
-rw-r--r--synapse/http/matrixfederationclient.py21
-rw-r--r--synapse/http/server.py4
-rw-r--r--synapse/http/site.py6
-rw-r--r--synapse/logging/formatter.py3
-rw-r--r--synapse/logging/opentracing.py39
-rw-r--r--synapse/metrics/__init__.py12
-rw-r--r--synapse/metrics/_exposition.py5
-rw-r--r--synapse/push/bulk_push_rule_evaluator.py14
-rw-r--r--synapse/push/httppusher.py2
-rw-r--r--synapse/push/mailer.py3
-rw-r--r--synapse/push/push_rule_evaluator.py8
-rw-r--r--synapse/push/pusherpool.py4
-rw-r--r--synapse/python_dependencies.py13
-rw-r--r--synapse/replication/http/_base.py6
-rw-r--r--synapse/replication/tcp/commands.py4
-rw-r--r--synapse/replication/tcp/handler.py30
-rw-r--r--synapse/replication/tcp/streams/_base.py29
-rw-r--r--synapse/rest/admin/rooms.py4
-rw-r--r--synapse/rest/admin/users.py20
-rw-r--r--synapse/rest/client/v1/login.py17
-rw-r--r--synapse/rest/client/v1/presence.py8
-rw-r--r--synapse/rest/client/v1/room.py3
-rw-r--r--synapse/rest/client/v2_alpha/account.py5
-rw-r--r--synapse/rest/client/v2_alpha/register.py11
-rw-r--r--synapse/rest/client/v2_alpha/report_event.py10
-rw-r--r--synapse/rest/consent/consent_resource.py5
-rw-r--r--synapse/rest/media/v1/_base.py3
-rw-r--r--synapse/rest/media/v1/media_repository.py12
-rw-r--r--synapse/rest/media/v1/media_storage.py6
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py27
-rw-r--r--synapse/server_notices/consent_server_notices.py6
-rw-r--r--synapse/server_notices/resource_limits_server_notices.py4
-rw-r--r--synapse/state/__init__.py28
-rw-r--r--synapse/state/v1.py26
-rw-r--r--synapse/state/v2.py77
-rw-r--r--synapse/static/client/login/index.html26
-rw-r--r--synapse/static/client/login/js/login.js133
-rw-r--r--synapse/static/client/login/style.css34
-rw-r--r--synapse/storage/data_stores/main/client_ips.py6
-rw-r--r--synapse/storage/data_stores/main/devices.py8
-rw-r--r--synapse/storage/data_stores/main/end_to_end_keys.py6
-rw-r--r--synapse/storage/data_stores/main/event_federation.py3
-rw-r--r--synapse/storage/data_stores/main/event_push_actions.py4
-rw-r--r--synapse/storage/data_stores/main/events.py30
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py4
-rw-r--r--synapse/storage/data_stores/main/events_worker.py41
-rw-r--r--synapse/storage/data_stores/main/media_repository.py9
-rw-r--r--synapse/storage/data_stores/main/presence.py41
-rw-r--r--synapse/storage/data_stores/main/push_rule.py56
-rw-r--r--synapse/storage/data_stores/main/receipts.py91
-rw-r--r--synapse/storage/data_stores/main/registration.py4
-rw-r--r--synapse/storage/data_stores/main/room.py42
-rw-r--r--synapse/storage/data_stores/main/roommember.py10
-rw-r--r--synapse/storage/data_stores/main/schema/delta/30/as_users.py2
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres18
-rw-r--r--synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite18
-rw-r--r--synapse/storage/data_stores/main/search.py4
-rw-r--r--synapse/storage/data_stores/main/stream.py2
-rw-r--r--synapse/storage/data_stores/main/tags.py2
-rw-r--r--synapse/storage/data_stores/main/ui_auth.py2
-rw-r--r--synapse/storage/data_stores/state/bg_updates.py6
-rw-r--r--synapse/storage/data_stores/state/store.py19
-rw-r--r--synapse/storage/database.py16
-rw-r--r--synapse/storage/persist_events.py9
-rw-r--r--synapse/storage/state.py38
-rw-r--r--synapse/util/async_helpers.py4
-rw-r--r--synapse/util/caches/descriptors.py4
-rw-r--r--synapse/util/caches/expiringcache.py6
-rw-r--r--synapse/util/caches/stream_change_cache.py4
-rw-r--r--synapse/util/caches/treecache.py4
-rw-r--r--synapse/util/file_consumer.py2
-rw-r--r--synapse/util/frozenutils.py6
-rw-r--r--synapse/util/wheel_timer.py2
-rw-r--r--synapse/visibility.py23
133 files changed, 1687 insertions, 1040 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index c371e8f3c4..f5cd8271a6 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.15.2"
+__version__ = "1.16.0rc1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py
index d528450c78..55cce2db22 100644
--- a/synapse/_scripts/register_new_matrix_user.py
+++ b/synapse/_scripts/register_new_matrix_user.py
@@ -23,8 +23,6 @@ import hmac
 import logging
 import sys
 
-from six.moves import input
-
 import requests as _requests
 import yaml
 
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 06ade25674..06ba6604f3 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -16,8 +16,6 @@
 import logging
 from typing import Optional
 
-from six import itervalues
-
 import pymacaroons
 from netaddr import IPAddress
 
@@ -90,7 +88,7 @@ class Auth(object):
             event, prev_state_ids, for_verification=True
         )
         auth_events = yield self.store.get_events(auth_events_ids)
-        auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
+        auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
 
         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
         event_auth.check(
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 5ec4a77ccd..6a6d32c302 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -150,3 +150,8 @@ class EventContentFields(object):
     # Timestamp to delete the event after
     # cf https://github.com/matrix-org/matrix-doc/pull/2228
     SELF_DESTRUCT_AFTER = "org.matrix.self_destruct_after"
+
+
+class RoomEncryptionAlgorithms(object):
+    MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
+    DEFAULT = MEGOLM_V1_AES_SHA2
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index d54dfb385d..5305038c21 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -17,11 +17,9 @@
 """Contains exceptions and error codes."""
 
 import logging
+from http import HTTPStatus
 from typing import Dict, List
 
-from six import iteritems
-from six.moves import http_client
-
 from canonicaljson import json
 
 from twisted.web import http
@@ -174,7 +172,7 @@ class ConsentNotGivenError(SynapseError):
             consent_url (str): The URL where the user can give their consent
         """
         super(ConsentNotGivenError, self).__init__(
-            code=http_client.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
+            code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
         )
         self._consent_uri = consent_uri
 
@@ -194,7 +192,7 @@ class UserDeactivatedError(SynapseError):
             msg (str): The human-readable error message
         """
         super(UserDeactivatedError, self).__init__(
-            code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
+            code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
         )
 
 
@@ -497,7 +495,7 @@ def cs_error(msg, code=Codes.UNKNOWN, **kwargs):
         A dict representing the error response JSON.
     """
     err = {"error": msg, "errcode": code}
-    for key, value in iteritems(kwargs):
+    for key, value in kwargs.items():
         err[key] = value
     return err
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 8b64d0a285..f988f62a1e 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -17,8 +17,6 @@
 # limitations under the License.
 from typing import List
 
-from six import text_type
-
 import jsonschema
 from canonicaljson import json
 from jsonschema import FormatChecker
@@ -313,7 +311,7 @@ class Filter(object):
 
             content = event.get("content", {})
             # check if there is a string url field in the content for filtering purposes
-            contains_url = isinstance(content.get("url"), text_type)
+            contains_url = isinstance(content.get("url"), str)
             labels = content.get(EventContentFields.LABELS, [])
 
         return self.check_fields(room_id, sender, ev_type, labels, contains_url)
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index f34434bd67..bd03ebca5a 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -17,8 +17,7 @@
 """Contains the URL paths to prefix various aspects of the server with. """
 import hmac
 from hashlib import sha256
-
-from six.moves.urllib.parse import urlencode
+from urllib.parse import urlencode
 
 from synapse.config import ConfigError
 
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index dedff81af3..373a80a4a7 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -20,6 +20,7 @@ import signal
 import socket
 import sys
 import traceback
+from typing import Iterable
 
 from daemonize import Daemonize
 from typing_extensions import NoReturn
@@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
 
 import synapse
 from synapse.app import check_bind_error
+from synapse.config.server import ListenerConfig
 from synapse.crypto import context_factory
 from synapse.logging.context import PreserveLoggingContext
 from synapse.util.async_helpers import Linearizer
@@ -234,7 +236,7 @@ def refresh_certificate(hs):
         logger.info("Context factories updated.")
 
 
-def start(hs, listeners=None):
+def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
     """
     Start a Synapse server or worker.
 
@@ -245,8 +247,8 @@ def start(hs, listeners=None):
     notify systemd.
 
     Args:
-        hs (synapse.server.HomeServer)
-        listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
+        hs: homeserver instance
+        listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
     try:
         # Set up the SIGHUP machinery.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index f3ec2a34ec..27a3fc9ed6 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -37,6 +37,7 @@ from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.config.server import ListenerConfig
 from synapse.federation import send_queue
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.handlers.presence import (
@@ -514,13 +515,18 @@ class GenericWorkerSlavedStore(
 class GenericWorkerServer(HomeServer):
     DATASTORE_CLASS = GenericWorkerSlavedStore
 
-    def _listen_http(self, listener_config):
-        port = listener_config["port"]
-        bind_addresses = listener_config["bind_addresses"]
-        site_tag = listener_config.get("tag", port)
+    def _listen_http(self, listener_config: ListenerConfig):
+        port = listener_config.port
+        bind_addresses = listener_config.bind_addresses
+
+        assert listener_config.http_options is not None
+
+        site_tag = listener_config.http_options.tag
+        if site_tag is None:
+            site_tag = port
         resources = {}
-        for res in listener_config["resources"]:
-            for name in res["names"]:
+        for res in listener_config.http_options.resources:
+            for name in res.names:
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
@@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer):
                             " repository is disabled. Ignoring."
                         )
 
-                if name == "openid" and "federation" not in res["names"]:
+                if name == "openid" and "federation" not in res.names:
                     # Only load the openid resource separately if federation resource
                     # is not specified since federation resource includes openid
                     # resource.
@@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer):
 
         logger.info("Synapse worker now listening on port %d", port)
 
-    def start_listening(self, listeners):
+    def start_listening(self, listeners: Iterable[ListenerConfig]):
         for listener in listeners:
-            if listener["type"] == "http":
+            if listener.type == "http":
                 self._listen_http(listener)
-            elif listener["type"] == "manhole":
+            elif listener.type == "manhole":
                 _base.listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     manhole(
                         username="matrix", password="rabbithole", globals={"hs": self}
                     ),
                 )
-            elif listener["type"] == "metrics":
+            elif listener.type == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warning(
                         (
@@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer):
                         )
                     )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
+                    _base.listen_metrics(listener.bind_addresses, listener.port)
             else:
-                logger.warning("Unrecognized listener type: %s", listener["type"])
+                logger.warning("Unsupported listener type: %s", listener.type)
 
         self.get_tcp_replication().start_replication(self)
 
@@ -738,6 +744,11 @@ class GenericWorkerReplicationHandler(ReplicationDataHandler):
         except Exception:
             logger.exception("Error processing replication")
 
+    async def on_position(self, stream_name: str, instance_name: str, token: int):
+        await super().on_position(stream_name, instance_name, token)
+        # Also call on_rdata to ensure that stream positions are properly reset.
+        await self.on_rdata(stream_name, instance_name, token, [])
+
     def stop_pusher(self, user_id, app_id, pushkey):
         if not self.notify_pushers:
             return
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 41994dc14b..09291d86ad 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -23,8 +23,7 @@ import math
 import os
 import resource
 import sys
-
-from six import iteritems
+from typing import Iterable
 
 from prometheus_client import Gauge
 
@@ -50,6 +49,7 @@ from synapse.app import _base
 from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.server import ListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import (
@@ -90,24 +90,24 @@ def gz_wrap(r):
 class SynapseHomeServer(HomeServer):
     DATASTORE_CLASS = DataStore
 
-    def _listener_http(self, config, listener_config):
-        port = listener_config["port"]
-        bind_addresses = listener_config["bind_addresses"]
-        tls = listener_config.get("tls", False)
-        site_tag = listener_config.get("tag", port)
+    def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
+        port = listener_config.port
+        bind_addresses = listener_config.bind_addresses
+        tls = listener_config.tls
+        site_tag = listener_config.http_options.tag
+        if site_tag is None:
+            site_tag = port
 
         resources = {}
-        for res in listener_config["resources"]:
-            for name in res["names"]:
-                if name == "openid" and "federation" in res["names"]:
+        for res in listener_config.http_options.resources:
+            for name in res.names:
+                if name == "openid" and "federation" in res.names:
                     # Skip loading openid resource if federation is defined
                     # since federation resource will include openid
                     continue
-                resources.update(
-                    self._configure_named_resource(name, res.get("compress", False))
-                )
+                resources.update(self._configure_named_resource(name, res.compress))
 
-        additional_resources = listener_config.get("additional_resources", {})
+        additional_resources = listener_config.http_options.additional_resources
         logger.debug("Configuring additional resources: %r", additional_resources)
         module_api = ModuleApi(self, self.get_auth_handler())
         for path, resmodule in additional_resources.items():
@@ -279,7 +279,7 @@ class SynapseHomeServer(HomeServer):
 
         return resources
 
-    def start_listening(self, listeners):
+    def start_listening(self, listeners: Iterable[ListenerConfig]):
         config = self.get_config()
 
         if config.redis_enabled:
@@ -289,25 +289,25 @@ class SynapseHomeServer(HomeServer):
             self.get_tcp_replication().start_replication(self)
 
         for listener in listeners:
-            if listener["type"] == "http":
+            if listener.type == "http":
                 self._listening_services.extend(self._listener_http(config, listener))
-            elif listener["type"] == "manhole":
+            elif listener.type == "manhole":
                 listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     manhole(
                         username="matrix", password="rabbithole", globals={"hs": self}
                     ),
                 )
-            elif listener["type"] == "replication":
+            elif listener.type == "replication":
                 services = listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     ReplicationStreamProtocolFactory(self),
                 )
                 for s in services:
                     reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
-            elif listener["type"] == "metrics":
+            elif listener.type == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warning(
                         (
@@ -316,9 +316,11 @@ class SynapseHomeServer(HomeServer):
                         )
                     )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
+                    _base.listen_metrics(listener.bind_addresses, listener.port)
             else:
-                logger.warning("Unrecognized listener type: %s", listener["type"])
+                # this shouldn't happen, as the listener type should have been checked
+                # during parsing
+                logger.warning("Unrecognized listener type: %s", listener.type)
 
 
 # Gauges to expose monthly active user control metrics
@@ -526,7 +528,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
     stats["total_nonbridged_users"] = total_nonbridged_users
 
     daily_user_type_results = yield hs.get_datastore().count_daily_user_type()
-    for name, count in iteritems(daily_user_type_results):
+    for name, count in daily_user_type_results.items():
         stats["daily_user_type_" + name] = count
 
     room_count = yield hs.get_datastore().get_room_count()
@@ -538,7 +540,7 @@ def phone_stats_home(hs, stats, stats_process=_stats_process):
     stats["daily_messages"] = yield hs.get_datastore().count_daily_messages()
 
     r30_results = yield hs.get_datastore().count_r30_users()
-    for name, count in iteritems(r30_results):
+    for name, count in r30_results.items():
         stats["r30_users_" + name] = count
 
     daily_sent_messages = yield hs.get_datastore().count_daily_sent_messages()
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 1b13e84425..0323256472 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -15,8 +15,6 @@
 import logging
 import re
 
-from six import string_types
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
@@ -156,7 +154,7 @@ class ApplicationService(object):
                         )
 
                 regex = regex_obj.get("regex")
-                if isinstance(regex, string_types):
+                if isinstance(regex, str):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
                 else:
                     raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 57174da021..da9a5e86d4 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -13,8 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-
-from six.moves import urllib
+import urllib
 
 from prometheus_client import Counter
 
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 30d1050a91..1391e5fc43 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -22,8 +22,6 @@ from collections import OrderedDict
 from textwrap import dedent
 from typing import Any, MutableMapping, Optional
 
-from six import integer_types
-
 import yaml
 
 
@@ -117,7 +115,7 @@ class Config(object):
 
     @staticmethod
     def parse_size(value):
-        if isinstance(value, integer_types):
+        if isinstance(value, int):
             return value
         sizes = {"K": 1024, "M": 1024 * 1024}
         size = 1
@@ -129,7 +127,7 @@ class Config(object):
 
     @staticmethod
     def parse_duration(value):
-        if isinstance(value, integer_types):
+        if isinstance(value, int):
             return value
         second = 1000
         minute = 60 * second
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index ca43e96bd1..8ed3e24258 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -14,9 +14,7 @@
 
 import logging
 from typing import Dict
-
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from urllib import parse as urlparse
 
 import yaml
 from netaddr import IPSet
@@ -98,17 +96,14 @@ def load_appservices(hostname, config_files):
 def _load_appservice(hostname, as_info, config_filename):
     required_string_fields = ["id", "as_token", "hs_token", "sender_localpart"]
     for field in required_string_fields:
-        if not isinstance(as_info.get(field), string_types):
+        if not isinstance(as_info.get(field), str):
             raise KeyError(
                 "Required string field: '%s' (%s)" % (field, config_filename)
             )
 
     # 'url' must either be a string or explicitly null, not missing
     # to avoid accidentally turning off push for ASes.
-    if (
-        not isinstance(as_info.get("url"), string_types)
-        and as_info.get("url", "") is not None
-    ):
+    if not isinstance(as_info.get("url"), str) and as_info.get("url", "") is not None:
         raise KeyError(
             "Required string field or explicit null: 'url' (%s)" % (config_filename,)
         )
@@ -138,7 +133,7 @@ def _load_appservice(hostname, as_info, config_filename):
                         ns,
                         regex_obj,
                     )
-                if not isinstance(regex_obj.get("regex"), string_types):
+                if not isinstance(regex_obj.get("regex"), str):
                     raise ValueError("Missing/bad type 'regex' key in %s", regex_obj)
                 if not isinstance(regex_obj.get("exclusive"), bool):
                     raise ValueError(
diff --git a/synapse/config/cache.py b/synapse/config/cache.py
index 0672538796..aff5b21ab2 100644
--- a/synapse/config/cache.py
+++ b/synapse/config/cache.py
@@ -15,6 +15,7 @@
 
 import os
 import re
+import threading
 from typing import Callable, Dict
 
 from ._base import Config, ConfigError
@@ -25,6 +26,9 @@ _CACHE_PREFIX = "SYNAPSE_CACHE_FACTOR"
 # Map from canonicalised cache name to cache.
 _CACHES = {}
 
+# a lock on the contents of _CACHES
+_CACHES_LOCK = threading.Lock()
+
 _DEFAULT_FACTOR_SIZE = 0.5
 _DEFAULT_EVENT_CACHE_SIZE = "10K"
 
@@ -66,7 +70,10 @@ def add_resizable_cache(cache_name: str, cache_resize_callback: Callable):
     # Some caches have '*' in them which we strip out.
     cache_name = _canonicalise_cache_name(cache_name)
 
-    _CACHES[cache_name] = cache_resize_callback
+    # sometimes caches are initialised from background threads, so we need to make
+    # sure we don't conflict with another thread running a resize operation
+    with _CACHES_LOCK:
+        _CACHES[cache_name] = cache_resize_callback
 
     # Ensure all loaded caches are sized appropriately
     #
@@ -87,7 +94,8 @@ class CacheConfig(Config):
             os.environ.get(_CACHE_PREFIX, _DEFAULT_FACTOR_SIZE)
         )
         properties.resize_all_caches_func = None
-        _CACHES.clear()
+        with _CACHES_LOCK:
+            _CACHES.clear()
 
     def generate_config_section(self, **kwargs):
         return """\
@@ -193,6 +201,8 @@ class CacheConfig(Config):
         For each cache, run the mapped callback function with either
         a specific cache factor or the default, global one.
         """
-        for cache_name, callback in _CACHES.items():
-            new_factor = self.cache_factors.get(cache_name, self.global_factor)
-            callback(new_factor)
+        # block other threads from modifying _CACHES while we iterate it.
+        with _CACHES_LOCK:
+            for cache_name, callback in _CACHES.items():
+                new_factor = self.cache_factors.get(cache_name, self.global_factor)
+                callback(new_factor)
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 2c7b3a699f..264c274c52 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -36,6 +36,7 @@ from .ratelimiting import RatelimitConfig
 from .redis import RedisConfig
 from .registration import RegistrationConfig
 from .repository import ContentRepositoryConfig
+from .room import RoomConfig
 from .room_directory import RoomDirectoryConfig
 from .saml2_config import SAML2Config
 from .server import ServerConfig
@@ -79,6 +80,7 @@ class HomeServerConfig(RootConfig):
         PasswordAuthProviderConfig,
         PushConfig,
         SpamCheckerConfig,
+        RoomConfig,
         GroupsConfig,
         UserDirectoryConfig,
         ConsentConfig,
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index e24dd637bc..e0939bce84 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -89,7 +89,7 @@ class OIDCConfig(Config):
         # use an OpenID Connect Provider for authentication, instead of its internal
         # password database.
         #
-        # See https://github.com/matrix-org/synapse/blob/master/openid.md.
+        # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md.
         #
         oidc_config:
           # Uncomment the following to enable authorization against an OpenID Connect
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index fecced2d57..6badf4e75d 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -18,8 +18,9 @@ from distutils.util import strtobool
 
 import pkg_resources
 
+from synapse.api.constants import RoomCreationPreset
 from synapse.config._base import Config, ConfigError
-from synapse.types import RoomAlias
+from synapse.types import RoomAlias, UserID
 from synapse.util.stringutils import random_string_with_symbols
 
 
@@ -127,7 +128,50 @@ class RegistrationConfig(Config):
         for room_alias in self.auto_join_rooms:
             if not RoomAlias.is_valid(room_alias):
                 raise ConfigError("Invalid auto_join_rooms entry %s" % (room_alias,))
+
+        # Options for creating auto-join rooms if they do not exist yet.
         self.autocreate_auto_join_rooms = config.get("autocreate_auto_join_rooms", True)
+        self.autocreate_auto_join_rooms_federated = config.get(
+            "autocreate_auto_join_rooms_federated", True
+        )
+        self.autocreate_auto_join_room_preset = (
+            config.get("autocreate_auto_join_room_preset")
+            or RoomCreationPreset.PUBLIC_CHAT
+        )
+        self.auto_join_room_requires_invite = self.autocreate_auto_join_room_preset in {
+            RoomCreationPreset.PRIVATE_CHAT,
+            RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+        }
+
+        # Pull the creater/inviter from the configuration, this gets used to
+        # send invites for invite-only rooms.
+        mxid_localpart = config.get("auto_join_mxid_localpart")
+        self.auto_join_user_id = None
+        if mxid_localpart:
+            # Convert the localpart to a full mxid.
+            self.auto_join_user_id = UserID(
+                mxid_localpart, self.server_name
+            ).to_string()
+
+        if self.autocreate_auto_join_rooms:
+            # Ensure the preset is a known value.
+            if self.autocreate_auto_join_room_preset not in {
+                RoomCreationPreset.PUBLIC_CHAT,
+                RoomCreationPreset.PRIVATE_CHAT,
+                RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+            }:
+                raise ConfigError("Invalid value for autocreate_auto_join_room_preset")
+            # If the preset requires invitations to be sent, ensure there's a
+            # configured user to send them from.
+            if self.auto_join_room_requires_invite:
+                if not mxid_localpart:
+                    raise ConfigError(
+                        "The configuration option `auto_join_mxid_localpart` is required if "
+                        "`autocreate_auto_join_room_preset` is set to private_chat or trusted_private_chat, such that "
+                        "Synapse knows who to send invitations from. Please "
+                        "configure `auto_join_mxid_localpart`."
+                    )
+
         self.auto_join_rooms_for_guests = config.get("auto_join_rooms_for_guests", True)
 
         self.enable_set_displayname = config.get("enable_set_displayname", True)
@@ -357,7 +401,11 @@ class RegistrationConfig(Config):
         #enable_3pid_changes: false
 
         # Users who register on this homeserver will automatically be joined
-        # to these rooms
+        # to these rooms.
+        #
+        # By default, any room aliases included in this list will be created
+        # as a publicly joinable room when the first user registers for the
+        # homeserver. This behaviour can be customised with the settings below.
         #
         #auto_join_rooms:
         #  - "#example:example.com"
@@ -365,10 +413,62 @@ class RegistrationConfig(Config):
         # Where auto_join_rooms are specified, setting this flag ensures that the
         # the rooms exist by creating them when the first user on the
         # homeserver registers.
+        #
+        # By default the auto-created rooms are publicly joinable from any federated
+        # server. Use the autocreate_auto_join_rooms_federated and
+        # autocreate_auto_join_room_preset settings below to customise this behaviour.
+        #
         # Setting to false means that if the rooms are not manually created,
         # users cannot be auto-joined since they do not exist.
         #
-        #autocreate_auto_join_rooms: true
+        # Defaults to true. Uncomment the following line to disable automatically
+        # creating auto-join rooms.
+        #
+        #autocreate_auto_join_rooms: false
+
+        # Whether the auto_join_rooms that are auto-created are available via
+        # federation. Only has an effect if autocreate_auto_join_rooms is true.
+        #
+        # Note that whether a room is federated cannot be modified after
+        # creation.
+        #
+        # Defaults to true: the room will be joinable from other servers.
+        # Uncomment the following to prevent users from other homeservers from
+        # joining these rooms.
+        #
+        #autocreate_auto_join_rooms_federated: false
+
+        # The room preset to use when auto-creating one of auto_join_rooms. Only has an
+        # effect if autocreate_auto_join_rooms is true.
+        #
+        # This can be one of "public_chat", "private_chat", or "trusted_private_chat".
+        # If a value of "private_chat" or "trusted_private_chat" is used then
+        # auto_join_mxid_localpart must also be configured.
+        #
+        # Defaults to "public_chat", meaning that the room is joinable by anyone, including
+        # federated servers if autocreate_auto_join_rooms_federated is true (the default).
+        # Uncomment the following to require an invitation to join these rooms.
+        #
+        #autocreate_auto_join_room_preset: private_chat
+
+        # The local part of the user id which is used to create auto_join_rooms if
+        # autocreate_auto_join_rooms is true. If this is not provided then the
+        # initial user account that registers will be used to create the rooms.
+        #
+        # The user id is also used to invite new users to any auto-join rooms which
+        # are set to invite-only.
+        #
+        # It *must* be configured if autocreate_auto_join_room_preset is set to
+        # "private_chat" or "trusted_private_chat".
+        #
+        # Note that this must be specified in order for new users to be correctly
+        # invited to any auto-join rooms which have been set to invite-only (either
+        # at the time of creation or subsequently).
+        #
+        # Note that, if the room already exists, this user must be joined and
+        # have the appropriate permissions to invite new members.
+        #
+        #auto_join_mxid_localpart: system
 
         # When auto_join_rooms is specified, setting this flag to false prevents
         # guest accounts from being automatically joined to the rooms.
diff --git a/synapse/config/repository.py b/synapse/config/repository.py
index b751d02d37..01009f3924 100644
--- a/synapse/config/repository.py
+++ b/synapse/config/repository.py
@@ -94,6 +94,12 @@ class ContentRepositoryConfig(Config):
         else:
             self.can_load_media_repo = True
 
+        # Whether this instance should be the one to run the background jobs to
+        # e.g clean up old URL previews.
+        self.media_instance_running_background_jobs = config.get(
+            "media_instance_running_background_jobs",
+        )
+
         self.max_upload_size = self.parse_size(config.get("max_upload_size", "10M"))
         self.max_image_pixels = self.parse_size(config.get("max_image_pixels", "32M"))
         self.max_spider_size = self.parse_size(config.get("max_spider_size", "10M"))
diff --git a/synapse/config/room.py b/synapse/config/room.py
new file mode 100644
index 0000000000..6aa4de0672
--- /dev/null
+++ b/synapse/config/room.py
@@ -0,0 +1,80 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from synapse.api.constants import RoomCreationPreset
+
+from ._base import Config, ConfigError
+
+logger = logging.Logger(__name__)
+
+
+class RoomDefaultEncryptionTypes(object):
+    """Possible values for the encryption_enabled_by_default_for_room_type config option"""
+
+    ALL = "all"
+    INVITE = "invite"
+    OFF = "off"
+
+
+class RoomConfig(Config):
+    section = "room"
+
+    def read_config(self, config, **kwargs):
+        # Whether new, locally-created rooms should have encryption enabled
+        encryption_for_room_type = config.get(
+            "encryption_enabled_by_default_for_room_type",
+            RoomDefaultEncryptionTypes.OFF,
+        )
+        if encryption_for_room_type == RoomDefaultEncryptionTypes.ALL:
+            self.encryption_enabled_by_default_for_room_presets = [
+                RoomCreationPreset.PRIVATE_CHAT,
+                RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+                RoomCreationPreset.PUBLIC_CHAT,
+            ]
+        elif encryption_for_room_type == RoomDefaultEncryptionTypes.INVITE:
+            self.encryption_enabled_by_default_for_room_presets = [
+                RoomCreationPreset.PRIVATE_CHAT,
+                RoomCreationPreset.TRUSTED_PRIVATE_CHAT,
+            ]
+        elif encryption_for_room_type == RoomDefaultEncryptionTypes.OFF:
+            self.encryption_enabled_by_default_for_room_presets = []
+        else:
+            raise ConfigError(
+                "Invalid value for encryption_enabled_by_default_for_room_type"
+            )
+
+    def generate_config_section(self, **kwargs):
+        return """\
+        ## Rooms ##
+
+        # Controls whether locally-created rooms should be end-to-end encrypted by
+        # default.
+        #
+        # Possible options are "all", "invite", and "off". They are defined as:
+        #
+        # * "all": any locally-created room
+        # * "invite": any room created with the "private_chat" or "trusted_private_chat"
+        #             room creation presets
+        # * "off": this option will take no effect
+        #
+        # The default value is "off".
+        #
+        # Note that this option will only affect rooms created after it is set. It
+        # will also not affect rooms created by other servers.
+        #
+        #encryption_enabled_by_default_for_room_type: invite
+        """
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2_config.py
index d0a19751e8..293643b2de 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2_config.py
@@ -160,7 +160,7 @@ class SAML2Config(Config):
 
         # session lifetime: in milliseconds
         self.saml2_session_lifetime = self.parse_duration(
-            saml2_config.get("saml_session_lifetime", "5m")
+            saml2_config.get("saml_session_lifetime", "15m")
         )
 
         template_dir = saml2_config.get("template_dir")
@@ -286,7 +286,7 @@ class SAML2Config(Config):
 
           # The lifetime of a SAML session. This defines how long a user has to
           # complete the authentication process, if allow_unsolicited is unset.
-          # The default is 5 minutes.
+          # The default is 15 minutes.
           #
           #saml_session_lifetime: 5m
 
diff --git a/synapse/config/server.py b/synapse/config/server.py
index f57eefc99c..8204664883 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -19,7 +19,7 @@ import logging
 import os.path
 import re
 from textwrap import indent
-from typing import Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
 import attr
 import yaml
@@ -57,6 +57,64 @@ on how to configure the new listener.
 --------------------------------------------------------------------------------"""
 
 
+KNOWN_LISTENER_TYPES = {
+    "http",
+    "metrics",
+    "manhole",
+    "replication",
+}
+
+KNOWN_RESOURCES = {
+    "client",
+    "consent",
+    "federation",
+    "keys",
+    "media",
+    "metrics",
+    "openid",
+    "replication",
+    "static",
+    "webclient",
+}
+
+
+@attr.s(frozen=True)
+class HttpResourceConfig:
+    names = attr.ib(
+        type=List[str],
+        factory=list,
+        validator=attr.validators.deep_iterable(attr.validators.in_(KNOWN_RESOURCES)),  # type: ignore
+    )
+    compress = attr.ib(
+        type=bool,
+        default=False,
+        validator=attr.validators.optional(attr.validators.instance_of(bool)),  # type: ignore[arg-type]
+    )
+
+
+@attr.s(frozen=True)
+class HttpListenerConfig:
+    """Object describing the http-specific parts of the config of a listener"""
+
+    x_forwarded = attr.ib(type=bool, default=False)
+    resources = attr.ib(type=List[HttpResourceConfig], factory=list)
+    additional_resources = attr.ib(type=Dict[str, dict], factory=dict)
+    tag = attr.ib(type=str, default=None)
+
+
+@attr.s(frozen=True)
+class ListenerConfig:
+    """Object describing the configuration of a single listener."""
+
+    port = attr.ib(type=int, validator=attr.validators.instance_of(int))
+    bind_addresses = attr.ib(type=List[str])
+    type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES))
+    tls = attr.ib(type=bool, default=False)
+
+    # http_options is only populated if type=http
+    http_options = attr.ib(type=Optional[HttpListenerConfig], default=None)
+
+
 class ServerConfig(Config):
     section = "server"
 
@@ -379,38 +437,21 @@ class ServerConfig(Config):
                 }
             ]
 
-        self.listeners = []  # type: List[dict]
-        for listener in config.get("listeners", []):
-            if not isinstance(listener.get("port", None), int):
-                raise ConfigError(
-                    "Listener configuration is lacking a valid 'port' option"
-                )
+        self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
 
-            if listener.setdefault("tls", False):
-                # no_tls is not really supported any more, but let's grandfather it in
-                # here.
-                if config.get("no_tls", False):
+        # no_tls is not really supported any more, but let's grandfather it in
+        # here.
+        if config.get("no_tls", False):
+            l2 = []
+            for listener in self.listeners:
+                if listener.tls:
                     logger.info(
-                        "Ignoring TLS-enabled listener on port %i due to no_tls"
+                        "Ignoring TLS-enabled listener on port %i due to no_tls",
+                        listener.port,
                     )
-                    continue
-
-            bind_address = listener.pop("bind_address", None)
-            bind_addresses = listener.setdefault("bind_addresses", [])
-
-            # if bind_address was specified, add it to the list of addresses
-            if bind_address:
-                bind_addresses.append(bind_address)
-
-            # if we still have an empty list of addresses, use the default list
-            if not bind_addresses:
-                if listener["type"] == "metrics":
-                    # the metrics listener doesn't support IPv6
-                    bind_addresses.append("0.0.0.0")
                 else:
-                    bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
-
-            self.listeners.append(listener)
+                    l2.append(listener)
+            self.listeners = l2
 
         if not self.web_client_location:
             _warn_if_webclient_configured(self.listeners)
@@ -446,43 +487,41 @@ class ServerConfig(Config):
             bind_host = config.get("bind_host", "")
             gzip_responses = config.get("gzip_responses", True)
 
+            http_options = HttpListenerConfig(
+                resources=[
+                    HttpResourceConfig(names=["client"], compress=gzip_responses),
+                    HttpResourceConfig(names=["federation"]),
+                ],
+            )
+
             self.listeners.append(
-                {
-                    "port": bind_port,
-                    "bind_addresses": [bind_host],
-                    "tls": True,
-                    "type": "http",
-                    "resources": [
-                        {"names": ["client"], "compress": gzip_responses},
-                        {"names": ["federation"], "compress": False},
-                    ],
-                }
+                ListenerConfig(
+                    port=bind_port,
+                    bind_addresses=[bind_host],
+                    tls=True,
+                    type="http",
+                    http_options=http_options,
+                )
             )
 
             unsecure_port = config.get("unsecure_port", bind_port - 400)
             if unsecure_port:
                 self.listeners.append(
-                    {
-                        "port": unsecure_port,
-                        "bind_addresses": [bind_host],
-                        "tls": False,
-                        "type": "http",
-                        "resources": [
-                            {"names": ["client"], "compress": gzip_responses},
-                            {"names": ["federation"], "compress": False},
-                        ],
-                    }
+                    ListenerConfig(
+                        port=unsecure_port,
+                        bind_addresses=[bind_host],
+                        tls=False,
+                        type="http",
+                        http_options=http_options,
+                    )
                 )
 
         manhole = config.get("manhole")
         if manhole:
             self.listeners.append(
-                {
-                    "port": manhole,
-                    "bind_addresses": ["127.0.0.1"],
-                    "type": "manhole",
-                    "tls": False,
-                }
+                ListenerConfig(
+                    port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
+                )
             )
 
         metrics_port = config.get("metrics_port")
@@ -490,13 +529,14 @@ class ServerConfig(Config):
             logger.warning(METRICS_PORT_WARNING)
 
             self.listeners.append(
-                {
-                    "port": metrics_port,
-                    "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],
-                    "tls": False,
-                    "type": "http",
-                    "resources": [{"names": ["metrics"], "compress": False}],
-                }
+                ListenerConfig(
+                    port=metrics_port,
+                    bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")],
+                    type="http",
+                    http_options=HttpListenerConfig(
+                        resources=[HttpResourceConfig(names=["metrics"])]
+                    ),
+                )
             )
 
         _check_resource_config(self.listeners)
@@ -522,7 +562,7 @@ class ServerConfig(Config):
         )
 
     def has_tls_listener(self) -> bool:
-        return any(listener["tls"] for listener in self.listeners)
+        return any(listener.tls for listener in self.listeners)
 
     def generate_config_section(
         self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
@@ -856,7 +896,7 @@ class ServerConfig(Config):
         # number of monthly active users.
         #
         # 'limit_usage_by_mau' disables/enables monthly active user blocking. When
-        # anabled and a limit is reached the server returns a 'ResourceLimitError'
+        # enabled and a limit is reached the server returns a 'ResourceLimitError'
         # with error type Codes.RESOURCE_LIMIT_EXCEEDED
         #
         # 'max_mau_value' is the hard limit of monthly active users above which
@@ -1081,6 +1121,44 @@ def read_gc_thresholds(thresholds):
         )
 
 
+def parse_listener_def(listener: Any) -> ListenerConfig:
+    """parse a listener config from the config file"""
+    listener_type = listener["type"]
+
+    port = listener.get("port")
+    if not isinstance(port, int):
+        raise ConfigError("Listener configuration is lacking a valid 'port' option")
+
+    tls = listener.get("tls", False)
+
+    bind_addresses = listener.get("bind_addresses", [])
+    bind_address = listener.get("bind_address")
+    # if bind_address was specified, add it to the list of addresses
+    if bind_address:
+        bind_addresses.append(bind_address)
+
+    # if we still have an empty list of addresses, use the default list
+    if not bind_addresses:
+        if listener_type == "metrics":
+            # the metrics listener doesn't support IPv6
+            bind_addresses.append("0.0.0.0")
+        else:
+            bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
+
+    http_config = None
+    if listener_type == "http":
+        http_config = HttpListenerConfig(
+            x_forwarded=listener.get("x_forwarded", False),
+            resources=[
+                HttpResourceConfig(**res) for res in listener.get("resources", [])
+            ],
+            additional_resources=listener.get("additional_resources", {}),
+            tag=listener.get("tag"),
+        )
+
+    return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
+
+
 NO_MORE_WEB_CLIENT_WARNING = """
 Synapse no longer includes a web client. To enable a web client, configure
 web_client_location. To remove this warning, remove 'webclient' from the 'listeners'
@@ -1088,40 +1166,27 @@ configuration.
 """
 
 
-def _warn_if_webclient_configured(listeners):
+def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None:
     for listener in listeners:
-        for res in listener.get("resources", []):
-            for name in res.get("names", []):
+        if not listener.http_options:
+            continue
+        for res in listener.http_options.resources:
+            for name in res.names:
                 if name == "webclient":
                     logger.warning(NO_MORE_WEB_CLIENT_WARNING)
                     return
 
 
-KNOWN_RESOURCES = (
-    "client",
-    "consent",
-    "federation",
-    "keys",
-    "media",
-    "metrics",
-    "openid",
-    "replication",
-    "static",
-    "webclient",
-)
-
-
-def _check_resource_config(listeners):
+def _check_resource_config(listeners: Iterable[ListenerConfig]) -> None:
     resource_names = {
         res_name
         for listener in listeners
-        for res in listener.get("resources", [])
-        for res_name in res.get("names", [])
+        if listener.http_options
+        for res in listener.http_options.resources
+        for res_name in res.names
     }
 
     for resource in resource_names:
-        if resource not in KNOWN_RESOURCES:
-            raise ConfigError("Unknown listener resource '%s'" % (resource,))
         if resource == "consent":
             try:
                 check_requirements("resources.consent")
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index a65538562b..e368ea564d 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -20,8 +20,6 @@ from datetime import datetime
 from hashlib import sha256
 from typing import List
 
-import six
-
 from unpaddedbase64 import encode_base64
 
 from OpenSSL import SSL, crypto
@@ -59,7 +57,7 @@ class TlsConfig(Config):
             logger.warning(ACME_SUPPORT_ENABLED_WARN)
 
         # hyperlink complains on py2 if this is not a Unicode
-        self.acme_url = six.text_type(
+        self.acme_url = str(
             acme_config.get("url", "https://acme-v01.api.letsencrypt.org/directory")
         )
         self.acme_port = acme_config.get("port", 80)
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ed06b91a54..dbc661630c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -16,6 +16,7 @@
 import attr
 
 from ._base import Config, ConfigError
+from .server import ListenerConfig, parse_listener_def
 
 
 @attr.s
@@ -52,7 +53,9 @@ class WorkerConfig(Config):
         if self.worker_app == "synapse.app.homeserver":
             self.worker_app = None
 
-        self.worker_listeners = config.get("worker_listeners", [])
+        self.worker_listeners = [
+            parse_listener_def(x) for x in config.get("worker_listeners", [])
+        ]
         self.worker_daemonize = config.get("worker_daemonize")
         self.worker_pid_file = config.get("worker_pid_file")
         self.worker_log_config = config.get("worker_log_config")
@@ -75,24 +78,11 @@ class WorkerConfig(Config):
         manhole = config.get("worker_manhole")
         if manhole:
             self.worker_listeners.append(
-                {
-                    "port": manhole,
-                    "bind_addresses": ["127.0.0.1"],
-                    "type": "manhole",
-                    "tls": False,
-                }
+                ListenerConfig(
+                    port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
+                )
             )
 
-        if self.worker_listeners:
-            for listener in self.worker_listeners:
-                bind_address = listener.pop("bind_address", None)
-                bind_addresses = listener.setdefault("bind_addresses", [])
-
-                if bind_address:
-                    bind_addresses.append(bind_address)
-                elif not bind_addresses:
-                    bind_addresses.append("")
-
         # A map from instance name to host/port of their HTTP replication endpoint.
         instance_map = config.get("instance_map") or {}
         self.instance_map = {
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index a9f4025bfe..dbfc3e8972 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -15,11 +15,9 @@
 # limitations under the License.
 
 import logging
+import urllib
 from collections import defaultdict
 
-import six
-from six.moves import urllib
-
 import attr
 from signedjson.key import (
     decode_verify_key_bytes,
@@ -661,7 +659,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
         for response in query_response["server_keys"]:
             # do this first, so that we can give useful errors thereafter
             server_name = response.get("server_name")
-            if not isinstance(server_name, six.string_types):
+            if not isinstance(server_name, str):
                 raise KeyLookupError(
                     "Malformed response from key notary server %s: invalid server_name"
                     % (perspective_name,)
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 533ba327f5..cc5deca75b 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -20,8 +20,6 @@ import os
 from distutils.util import strtobool
 from typing import Dict, Optional, Type
 
-import six
-
 from unpaddedbase64 import encode_base64
 
 from synapse.api.room_versions import EventFormatVersions, RoomVersion, RoomVersions
@@ -290,7 +288,7 @@ class EventBase(metaclass=abc.ABCMeta):
         return list(self._dict.items())
 
     def keys(self):
-        return six.iterkeys(self._dict)
+        return self._dict.keys()
 
     def prev_event_ids(self):
         """Returns the list of prev event IDs. The order matches the order
diff --git a/synapse/events/snapshot.py b/synapse/events/snapshot.py
index 7c5f620d09..f94cdcbaba 100644
--- a/synapse/events/snapshot.py
+++ b/synapse/events/snapshot.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 from typing import Optional, Union
 
-from six import iteritems
-
 import attr
 from frozendict import frozendict
 
@@ -341,7 +339,7 @@ def _encode_state_dict(state_dict):
     if state_dict is None:
         return None
 
-    return [(etype, state_key, v) for (etype, state_key), v in iteritems(state_dict)]
+    return [(etype, state_key, v) for (etype, state_key), v in state_dict.items()]
 
 
 def _decode_state_dict(input):
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index dd340be9a7..f6b507977f 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -16,8 +16,6 @@ import collections
 import re
 from typing import Any, Mapping, Union
 
-from six import string_types
-
 from frozendict import frozendict
 
 from twisted.internet import defer
@@ -318,7 +316,7 @@ def serialize_event(
 
     if only_event_fields:
         if not isinstance(only_event_fields, list) or not all(
-            isinstance(f, string_types) for f in only_event_fields
+            isinstance(f, str) for f in only_event_fields
         ):
             raise TypeError("only_event_fields must be a list of strings")
         d = only_fields(d, only_event_fields)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index b001c64bb4..588d222f36 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import integer_types, string_types
-
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import EventFormatVersions
@@ -53,7 +51,7 @@ class EventValidator(object):
         event_strings = ["origin"]
 
         for s in event_strings:
-            if not isinstance(getattr(event, s), string_types):
+            if not isinstance(getattr(event, s), str):
                 raise SynapseError(400, "'%s' not a string type" % (s,))
 
         # Depending on the room version, ensure the data is spec compliant JSON.
@@ -90,7 +88,7 @@ class EventValidator(object):
         max_lifetime = event.content.get("max_lifetime")
 
         if min_lifetime is not None:
-            if not isinstance(min_lifetime, integer_types):
+            if not isinstance(min_lifetime, int):
                 raise SynapseError(
                     code=400,
                     msg="'min_lifetime' must be an integer",
@@ -124,7 +122,7 @@ class EventValidator(object):
                 )
 
         if max_lifetime is not None:
-            if not isinstance(max_lifetime, integer_types):
+            if not isinstance(max_lifetime, int):
                 raise SynapseError(
                     code=400,
                     msg="'max_lifetime' must be an integer",
@@ -183,7 +181,7 @@ class EventValidator(object):
             strings.append("state_key")
 
         for s in strings:
-            if not isinstance(getattr(event, s), string_types):
+            if not isinstance(getattr(event, s), str):
                 raise SynapseError(400, "Not '%s' a string type" % (s,))
 
         RoomID.from_string(event.room_id)
@@ -223,7 +221,7 @@ class EventValidator(object):
         for s in keys:
             if s not in d:
                 raise SynapseError(400, "'%s' not in content" % (s,))
-            if not isinstance(d[s], string_types):
+            if not isinstance(d[s], str):
                 raise SynapseError(400, "'%s' not a string type" % (s,))
 
     def _ensure_state_event(self, event):
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index c0012c6872..420df2385f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -17,8 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Iterable, List
 
-import six
-
 from twisted.internet import defer
 from twisted.internet.defer import Deferred, DeferredList
 from twisted.python.failure import Failure
@@ -93,8 +91,8 @@ class FederationBase(object):
                     # *actual* redacted copy to be on the safe side.)
                     redacted_event = prune_event(pdu)
                     if set(redacted_event.keys()) == set(pdu.keys()) and set(
-                        six.iterkeys(redacted_event.content)
-                    ) == set(six.iterkeys(pdu.content)):
+                        redacted_event.content.keys()
+                    ) == set(pdu.content.keys()):
                         logger.info(
                             "Event %s seems to have been redacted; using our redacted "
                             "copy",
@@ -294,7 +292,7 @@ def event_from_pdu_json(
     assert_params_in_dict(pdu_json, ("type", "depth"))
 
     depth = pdu_json["depth"]
-    if not isinstance(depth, six.integer_types):
+    if not isinstance(depth, int):
         raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)
 
     if depth < 0:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 32a8a2ee46..e704cf2f44 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -17,11 +17,8 @@
 import logging
 from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
 
-import six
-from six import iteritems
-
 from canonicaljson import json
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 from twisted.internet.abstract import isIPAddress
@@ -73,6 +70,10 @@ received_queries_counter = Counter(
     "synapse_federation_server_received_queries", "", ["type"]
 )
 
+pdu_process_time = Histogram(
+    "synapse_federation_server_pdu_process_time", "Time taken to process an event",
+)
+
 
 class FederationServer(FederationBase):
     def __init__(self, hs):
@@ -274,21 +275,22 @@ class FederationServer(FederationBase):
 
             for pdu in pdus_by_room[room_id]:
                 event_id = pdu.event_id
-                with nested_logging_context(event_id):
-                    try:
-                        await self._handle_received_pdu(origin, pdu)
-                        pdu_results[event_id] = {}
-                    except FederationError as e:
-                        logger.warning("Error handling PDU %s: %s", event_id, e)
-                        pdu_results[event_id] = {"error": str(e)}
-                    except Exception as e:
-                        f = failure.Failure()
-                        pdu_results[event_id] = {"error": str(e)}
-                        logger.error(
-                            "Failed to handle PDU %s",
-                            event_id,
-                            exc_info=(f.type, f.value, f.getTracebackObject()),
-                        )
+                with pdu_process_time.time():
+                    with nested_logging_context(event_id):
+                        try:
+                            await self._handle_received_pdu(origin, pdu)
+                            pdu_results[event_id] = {}
+                        except FederationError as e:
+                            logger.warning("Error handling PDU %s: %s", event_id, e)
+                            pdu_results[event_id] = {"error": str(e)}
+                        except Exception as e:
+                            f = failure.Failure()
+                            pdu_results[event_id] = {"error": str(e)}
+                            logger.error(
+                                "Failed to handle PDU %s",
+                                event_id,
+                                exc_info=(f.type, f.value, f.getTracebackObject()),
+                            )
 
         await concurrently_execute(
             process_pdus_for_room, pdus_by_room.keys(), TRANSACTION_CONCURRENCY_LIMIT
@@ -534,9 +536,9 @@ class FederationServer(FederationBase):
             ",".join(
                 (
                     "%s for %s:%s" % (key_id, user_id, device_id)
-                    for user_id, user_keys in iteritems(json_result)
-                    for device_id, device_keys in iteritems(user_keys)
-                    for key_id, _ in iteritems(device_keys)
+                    for user_id, user_keys in json_result.items()
+                    for device_id, device_keys in user_keys.items()
+                    for key_id, _ in device_keys.items()
                 )
             ),
         )
@@ -752,7 +754,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
 
 
 def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
-    if not isinstance(acl_entry, six.string_types):
+    if not isinstance(acl_entry, str):
         logger.warning(
             "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
         )
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 52f4f54215..6bbd762681 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -33,8 +33,6 @@ import logging
 from collections import namedtuple
 from typing import Dict, List, Tuple, Type
 
-from six import iteritems
-
 from sortedcontainers import SortedDict
 
 from twisted.internet import defer
@@ -327,7 +325,7 @@ class FederationRemoteSendQueue(object):
         # stream position.
         keyed_edus = {v: k for k, v in self.keyed_edu_changed.items()[i:j]}
 
-        for ((destination, edu_key), pos) in iteritems(keyed_edus):
+        for ((destination, edu_key), pos) in keyed_edus.items():
             rows.append(
                 (
                     pos,
@@ -530,10 +528,10 @@ def process_rows_for_federation(transaction_queue, rows):
             states=[state], destinations=destinations
         )
 
-    for destination, edu_map in iteritems(buff.keyed_edus):
+    for destination, edu_map in buff.keyed_edus.items():
         for key, edu in edu_map.items():
             transaction_queue.send_edu(edu, key)
 
-    for destination, edu_list in iteritems(buff.edus):
+    for destination, edu_list in buff.edus.items():
         for edu in edu_list:
             transaction_queue.send_edu(edu, None)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index d473576902..464d7a41de 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -16,8 +16,6 @@
 import logging
 from typing import Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
-from six import itervalues
-
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -203,7 +201,15 @@ class FederationSender(object):
 
                     logger.debug("Sending %s to %r", event, destinations)
 
-                    self._send_pdu(event, destinations)
+                    if destinations:
+                        self._send_pdu(event, destinations)
+
+                        now = self.clock.time_msec()
+                        ts = await self.store.get_received_ts(event.event_id)
+
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "federation_sender"
+                        ).observe((now - ts) / 1000)
 
                 async def handle_room_events(events: Iterable[EventBase]) -> None:
                     with Measure(self.clock, "handle_room_events"):
@@ -218,7 +224,7 @@ class FederationSender(object):
                     defer.gatherResults(
                         [
                             run_in_background(handle_room_events, evs)
-                            for evs in itervalues(events_by_room)
+                            for evs in events_by_room.values()
                         ],
                         consumeErrors=True,
                     )
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 060bf07197..9f99311419 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -15,10 +15,9 @@
 # limitations under the License.
 
 import logging
+import urllib
 from typing import Any, Dict, Optional
 
-from six.moves import urllib
-
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 8a9de913b3..8db8ab1b7b 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -17,8 +17,6 @@
 
 import logging
 
-from six import string_types
-
 from synapse.api.errors import Codes, SynapseError
 from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
@@ -513,7 +511,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
         for keyname in ("name", "avatar_url", "short_description", "long_description"):
             if keyname in content:
                 value = content[keyname]
-                if not isinstance(value, string_types):
+                if not isinstance(value, str):
                     raise SynapseError(400, "%r value is not a string" % (keyname,))
                 profile[keyname] = value
 
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index fe62f78e67..904c96eeec 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import itervalues
-
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -116,6 +114,12 @@ class ApplicationServicesHandler(object):
                         for service in services:
                             self.scheduler.submit_event_for_as(service, event)
 
+                        now = self.clock.time_msec()
+                        ts = yield self.store.get_received_ts(event.event_id)
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "appservice_sender"
+                        ).observe((now - ts) / 1000)
+
                     @defer.inlineCallbacks
                     def handle_room_events(events):
                         for event in events:
@@ -125,7 +129,7 @@ class ApplicationServicesHandler(object):
                         defer.gatherResults(
                             [
                                 run_in_background(handle_room_events, evs)
-                                for evs in itervalues(events_by_room)
+                                for evs in events_by_room.values()
                             ],
                             consumeErrors=True,
                         )
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index bb3b43d5ae..c3f86e7414 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -297,7 +297,7 @@ class AuthHandler(BaseHandler):
 
         # Convert the URI and method to strings.
         uri = request.uri.decode("utf-8")
-        method = request.uri.decode("utf-8")
+        method = request.method.decode("utf-8")
 
         # If there's no session ID, create a new session.
         if not sid:
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 64aaa1335c..76f213723a 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -14,11 +14,10 @@
 # limitations under the License.
 
 import logging
+import urllib
 import xml.etree.ElementTree as ET
 from typing import Dict, Optional, Tuple
 
-from six.moves import urllib
-
 from twisted.web.client import PartialDownloadError
 
 from synapse.api.errors import Codes, LoginError
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 230d170258..31346b56c3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Any, Dict, Optional
 
-from six import iteritems, itervalues
-
 from twisted.internet import defer
 
 from synapse.api import errors
@@ -159,7 +157,7 @@ class DeviceWorkerHandler(BaseHandler):
             # The user may have left the room
             # TODO: Check if they actually did or if we were just invited.
             if room_id not in room_ids:
-                for key, event_id in iteritems(current_state_ids):
+                for key, event_id in current_state_ids.items():
                     etype, state_key = key
                     if etype != EventTypes.Member:
                         continue
@@ -182,7 +180,7 @@ class DeviceWorkerHandler(BaseHandler):
                 log_kv(
                     {"event": "encountered empty previous state", "room_id": room_id}
                 )
-                for key, event_id in iteritems(current_state_ids):
+                for key, event_id in current_state_ids.items():
                     etype, state_key = key
                     if etype != EventTypes.Member:
                         continue
@@ -198,10 +196,10 @@ class DeviceWorkerHandler(BaseHandler):
 
             # Check if we've joined the room? If so we just blindly add all the users to
             # the "possibly changed" users.
-            for state_dict in itervalues(prev_state_ids):
+            for state_dict in prev_state_ids.values():
                 member_event = state_dict.get((EventTypes.Member, user_id), None)
                 if not member_event or member_event != current_member_id:
-                    for key, event_id in iteritems(current_state_ids):
+                    for key, event_id in current_state_ids.items():
                         etype, state_key = key
                         if etype != EventTypes.Member:
                             continue
@@ -211,14 +209,14 @@ class DeviceWorkerHandler(BaseHandler):
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
-            for key, event_id in iteritems(current_state_ids):
+            for key, event_id in current_state_ids.items():
                 etype, state_key = key
                 if etype != EventTypes.Member:
                     continue
 
                 # check if this member has changed since any of the extremities
                 # at the stream_ordering, and add them to the list if so.
-                for state_dict in itervalues(prev_state_ids):
+                for state_dict in prev_state_ids.values():
                     prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
                         if state_key != user_id:
@@ -693,6 +691,7 @@ class DeviceListUpdater(object):
 
         return False
 
+    @trace
     @defer.inlineCallbacks
     def _maybe_retry_device_resync(self):
         """Retry to resync device lists that are out of sync, except if another retry is
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 05c4b3eec0..610b08d00b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,8 +18,6 @@ from typing import Any, Dict
 
 from canonicaljson import json
 
-from twisted.internet import defer
-
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
@@ -51,8 +49,7 @@ class DeviceMessageHandler(object):
 
         self._device_list_updater = hs.get_device_handler().device_list_updater
 
-    @defer.inlineCallbacks
-    def on_direct_to_device_edu(self, origin, content):
+    async def on_direct_to_device_edu(self, origin, content):
         local_messages = {}
         sender_user_id = content["sender"]
         if origin != get_domain_from_id(sender_user_id):
@@ -82,11 +79,11 @@ class DeviceMessageHandler(object):
             }
             local_messages[user_id] = messages_by_device
 
-            yield self._check_for_unknown_devices(
+            await self._check_for_unknown_devices(
                 message_type, sender_user_id, by_device
             )
 
-        stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
+        stream_id = await self.store.add_messages_from_remote_to_device_inbox(
             origin, message_id, local_messages
         )
 
@@ -94,14 +91,13 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
-    @defer.inlineCallbacks
-    def _check_for_unknown_devices(
+    async def _check_for_unknown_devices(
         self,
         message_type: str,
         sender_user_id: str,
         by_device: Dict[str, Dict[str, Any]],
     ):
-        """Checks inbound device messages for unkown remote devices, and if
+        """Checks inbound device messages for unknown remote devices, and if
         found marks the remote cache for the user as stale.
         """
 
@@ -115,7 +111,7 @@ class DeviceMessageHandler(object):
             requesting_device_ids.add(device_id)
 
         # Check if we are tracking the devices of the remote user.
-        room_ids = yield self.store.get_rooms_for_user(sender_user_id)
+        room_ids = await self.store.get_rooms_for_user(sender_user_id)
         if not room_ids:
             logger.info(
                 "Received device message from remote device we don't"
@@ -127,7 +123,7 @@ class DeviceMessageHandler(object):
 
         # If we are tracking check that we know about the sending
         # devices.
-        cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
+        cached_devices = await self.store.get_cached_devices_for_user(sender_user_id)
 
         unknown_devices = requesting_device_ids - set(cached_devices)
         if unknown_devices:
@@ -136,15 +132,14 @@ class DeviceMessageHandler(object):
                 sender_user_id,
                 unknown_devices,
             )
-            yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
+            await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
 
             # Immediately attempt a resync in the background
             run_in_background(
                 self._device_list_updater.user_device_resync, sender_user_id
             )
 
-    @defer.inlineCallbacks
-    def send_device_message(self, sender_user_id, message_type, messages):
+    async def send_device_message(self, sender_user_id, message_type, messages):
         set_tag("number_of_messages", len(messages))
         set_tag("sender", sender_user_id)
         local_messages = {}
@@ -183,7 +178,7 @@ class DeviceMessageHandler(object):
                 }
 
         log_kv({"local_messages": local_messages})
-        stream_id = yield self.store.add_messages_to_device_inbox(
+        stream_id = await self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index f2f16b1e43..79a2df6201 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,8 +17,6 @@ import logging
 import string
 from typing import Iterable, List, Optional
 
-from twisted.internet import defer
-
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
 from synapse.api.errors import (
     AuthError,
@@ -55,8 +53,7 @@ class DirectoryHandler(BaseHandler):
 
         self.spam_checker = hs.get_spam_checker()
 
-    @defer.inlineCallbacks
-    def _create_association(
+    async def _create_association(
         self,
         room_alias: RoomAlias,
         room_id: str,
@@ -76,13 +73,13 @@ class DirectoryHandler(BaseHandler):
         # TODO(erikj): Add transactions.
         # TODO(erikj): Check if there is a current association.
         if not servers:
-            users = yield self.state.get_current_users_in_room(room_id)
+            users = await self.state.get_current_users_in_room(room_id)
             servers = {get_domain_from_id(u) for u in users}
 
         if not servers:
             raise SynapseError(400, "Failed to get server list")
 
-        yield self.store.create_room_alias_association(
+        await self.store.create_room_alias_association(
             room_alias, room_id, servers, creator=creator
         )
 
@@ -93,7 +90,7 @@ class DirectoryHandler(BaseHandler):
         room_id: str,
         servers: Optional[List[str]] = None,
         check_membership: bool = True,
-    ):
+    ) -> None:
         """Attempt to create a new alias
 
         Args:
@@ -103,9 +100,6 @@ class DirectoryHandler(BaseHandler):
             servers: Iterable of servers that others servers should try and join via
             check_membership: Whether to check if the user is in the room
                 before the alias can be set (if the server's config requires it).
-
-        Returns:
-            Deferred
         """
 
         user_id = requester.user.to_string()
@@ -148,7 +142,7 @@ class DirectoryHandler(BaseHandler):
                 # per alias creation rule?
                 raise SynapseError(403, "Not allowed to create alias")
 
-            can_create = await self.can_modify_alias(room_alias, user_id=user_id)
+            can_create = self.can_modify_alias(room_alias, user_id=user_id)
             if not can_create:
                 raise AuthError(
                     400,
@@ -158,7 +152,9 @@ class DirectoryHandler(BaseHandler):
 
         await self._create_association(room_alias, room_id, servers, creator=user_id)
 
-    async def delete_association(self, requester: Requester, room_alias: RoomAlias):
+    async def delete_association(
+        self, requester: Requester, room_alias: RoomAlias
+    ) -> str:
         """Remove an alias from the directory
 
         (this is only meant for human users; AS users should call
@@ -169,7 +165,7 @@ class DirectoryHandler(BaseHandler):
             room_alias
 
         Returns:
-            Deferred[unicode]: room id that the alias used to point to
+            room id that the alias used to point to
 
         Raises:
             NotFoundError: if the alias doesn't exist
@@ -191,7 +187,7 @@ class DirectoryHandler(BaseHandler):
         if not can_delete:
             raise AuthError(403, "You don't have permission to delete the alias.")
 
-        can_delete = await self.can_modify_alias(room_alias, user_id=user_id)
+        can_delete = self.can_modify_alias(room_alias, user_id=user_id)
         if not can_delete:
             raise SynapseError(
                 400,
@@ -208,8 +204,7 @@ class DirectoryHandler(BaseHandler):
 
         return room_id
 
-    @defer.inlineCallbacks
-    def delete_appservice_association(
+    async def delete_appservice_association(
         self, service: ApplicationService, room_alias: RoomAlias
     ):
         if not service.is_interested_in_alias(room_alias.to_string()):
@@ -218,29 +213,27 @@ class DirectoryHandler(BaseHandler):
                 "This application service has not reserved this kind of alias",
                 errcode=Codes.EXCLUSIVE,
             )
-        yield self._delete_association(room_alias)
+        await self._delete_association(room_alias)
 
-    @defer.inlineCallbacks
-    def _delete_association(self, room_alias: RoomAlias):
+    async def _delete_association(self, room_alias: RoomAlias):
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
 
-        room_id = yield self.store.delete_room_alias(room_alias)
+        room_id = await self.store.delete_room_alias(room_alias)
 
         return room_id
 
-    @defer.inlineCallbacks
-    def get_association(self, room_alias: RoomAlias):
+    async def get_association(self, room_alias: RoomAlias):
         room_id = None
         if self.hs.is_mine(room_alias):
-            result = yield self.get_association_from_room_alias(room_alias)
+            result = await self.get_association_from_room_alias(room_alias)
 
             if result:
                 room_id = result.room_id
                 servers = result.servers
         else:
             try:
-                result = yield self.federation.make_query(
+                result = await self.federation.make_query(
                     destination=room_alias.domain,
                     query_type="directory",
                     args={"room_alias": room_alias.to_string()},
@@ -265,7 +258,7 @@ class DirectoryHandler(BaseHandler):
                 Codes.NOT_FOUND,
             )
 
-        users = yield self.state.get_current_users_in_room(room_id)
+        users = await self.state.get_current_users_in_room(room_id)
         extra_servers = {get_domain_from_id(u) for u in users}
         servers = set(extra_servers) | set(servers)
 
@@ -277,13 +270,12 @@ class DirectoryHandler(BaseHandler):
 
         return {"room_id": room_id, "servers": servers}
 
-    @defer.inlineCallbacks
-    def on_directory_query(self, args):
+    async def on_directory_query(self, args):
         room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room Alias is not hosted on this homeserver")
 
-        result = yield self.get_association_from_room_alias(room_alias)
+        result = await self.get_association_from_room_alias(room_alias)
 
         if result is not None:
             return {"room_id": result.room_id, "servers": result.servers}
@@ -344,16 +336,15 @@ class DirectoryHandler(BaseHandler):
                 ratelimit=False,
             )
 
-    @defer.inlineCallbacks
-    def get_association_from_room_alias(self, room_alias: RoomAlias):
-        result = yield self.store.get_association_from_room_alias(room_alias)
+    async def get_association_from_room_alias(self, room_alias: RoomAlias):
+        result = await self.store.get_association_from_room_alias(room_alias)
         if not result:
             # Query AS to see if it exists
             as_handler = self.appservice_handler
-            result = yield as_handler.query_room_alias_exists(room_alias)
+            result = await as_handler.query_room_alias_exists(room_alias)
         return result
 
-    def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None):
+    def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None) -> bool:
         # Any application service "interested" in an alias they are regexing on
         # can modify the alias.
         # Users can only modify the alias if ALL the interested services have
@@ -366,12 +357,12 @@ class DirectoryHandler(BaseHandler):
         for service in interested_services:
             if user_id == service.sender:
                 # this user IS the app service so they can do whatever they like
-                return defer.succeed(True)
+                return True
             elif service.is_exclusive_alias(alias.to_string()):
                 # another service has an exclusive lock on this alias.
-                return defer.succeed(False)
+                return False
         # either no interested services, or no service with an exclusive lock
-        return defer.succeed(True)
+        return True
 
     async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
         """Determine whether a user can delete an alias.
@@ -459,8 +450,7 @@ class DirectoryHandler(BaseHandler):
 
         await self.store.set_room_is_public(room_id, making_public)
 
-    @defer.inlineCallbacks
-    def edit_published_appservice_room_list(
+    async def edit_published_appservice_room_list(
         self, appservice_id: str, network_id: str, room_id: str, visibility: str
     ):
         """Add or remove a room from the appservice/network specific public
@@ -475,7 +465,7 @@ class DirectoryHandler(BaseHandler):
         if visibility not in ["public", "private"]:
             raise SynapseError(400, "Invalid visibility setting")
 
-        yield self.store.set_room_is_public_appservice(
+        await self.store.set_room_is_public_appservice(
             room_id, appservice_id, network_id, visibility == "public"
         )
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 774a252619..a7e60cbc26 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -17,8 +17,6 @@
 
 import logging
 
-from six import iteritems
-
 import attr
 from canonicaljson import encode_canonical_json, json
 from signedjson.key import decode_verify_key_bytes
@@ -135,7 +133,7 @@ class E2eKeysHandler(object):
         remote_queries_not_in_cache = {}
         if remote_queries:
             query_list = []
-            for user_id, device_ids in iteritems(remote_queries):
+            for user_id, device_ids in remote_queries.items():
                 if device_ids:
                     query_list.extend((user_id, device_id) for device_id in device_ids)
                 else:
@@ -145,9 +143,9 @@ class E2eKeysHandler(object):
                 user_ids_not_in_cache,
                 remote_results,
             ) = yield self.store.get_user_devices_from_cache(query_list)
-            for user_id, devices in iteritems(remote_results):
+            for user_id, devices in remote_results.items():
                 user_devices = results.setdefault(user_id, {})
-                for device_id, device in iteritems(devices):
+                for device_id, device in devices.items():
                     keys = device.get("keys", None)
                     device_display_name = device.get("device_display_name", None)
                     if keys:
@@ -446,9 +444,9 @@ class E2eKeysHandler(object):
             ",".join(
                 (
                     "%s for %s:%s" % (key_id, user_id, device_id)
-                    for user_id, user_keys in iteritems(json_result)
-                    for device_id, device_keys in iteritems(user_keys)
-                    for key_id, _ in iteritems(device_keys)
+                    for user_id, user_keys in json_result.items()
+                    for device_id, device_keys in user_keys.items()
+                    for key_id, _ in device_keys.items()
                 )
             ),
         )
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 9abaf13b8f..f55470a707 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six import iteritems
-
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -205,8 +203,8 @@ class E2eRoomKeysHandler(object):
             )
             to_insert = []  # batch the inserts together
             changed = False  # if anything has changed, we need to update the etag
-            for room_id, room in iteritems(room_keys["rooms"]):
-                for session_id, room_key in iteritems(room["sessions"]):
+            for room_id, room in room_keys["rooms"].items():
+                for session_id, room_key in room["sessions"].items():
                     if not isinstance(room_key["is_verified"], bool):
                         msg = (
                             "is_verified must be a boolean in keys for session %s in"
@@ -351,6 +349,7 @@ class E2eRoomKeysHandler(object):
                     raise
 
             res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
+            res["etag"] = str(res["etag"])
             return res
 
     @trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d0b62f4cf2..4dbd8e1d98 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,12 +19,9 @@
 
 import itertools
 import logging
+from http import HTTPStatus
 from typing import Dict, Iterable, List, Optional, Sequence, Tuple
 
-import six
-from six import iteritems, itervalues
-from six.moves import http_client, zip
-
 import attr
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
@@ -33,7 +30,12 @@ from unpaddedbase64 import decode_base64
 from twisted.internet import defer
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+    EventTypes,
+    Membership,
+    RejectedReason,
+    RoomEncryptionAlgorithms,
+)
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -374,6 +376,7 @@ class FederationHandler(BaseHandler):
 
                     room_version = await self.store.get_room_version_id(room_id)
                     state_map = await resolve_events_with_store(
+                        self.clock,
                         room_id,
                         room_version,
                         state_maps,
@@ -393,7 +396,7 @@ class FederationHandler(BaseHandler):
                     )
                     event_map.update(evs)
 
-                    state = [event_map[e] for e in six.itervalues(state_map)]
+                    state = [event_map[e] for e in state_map.values()]
                 except Exception:
                     logger.warning(
                         "[%s %s] Error attempting to resolve state at missing "
@@ -742,7 +745,10 @@ class FederationHandler(BaseHandler):
                 if device:
                     keys = device.get("keys", {}).get("keys", {})
 
-                    if event.content.get("algorithm") == "m.megolm.v1.aes-sha2":
+                    if (
+                        event.content.get("algorithm")
+                        == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
+                    ):
                         # For this algorithm we expect a curve25519 key.
                         key_name = "curve25519:%s" % (device_id,)
                         current_keys = [keys.get(key_name)]
@@ -1001,7 +1007,7 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in iteritems(state)
+                for (e_type, state_key), event in state.items()
                 if e_type == EventTypes.Member and event.membership == Membership.JOIN
             ]
 
@@ -1091,16 +1097,16 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = await self.store.get_events(
-            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
+            [e_id for ids in states.values() for e_id in ids.values()],
             get_prev_content=False,
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in iteritems(state_dict)
+                for k, e_id in state_dict.items()
                 if e_id in state_map
             }
-            for key, state_dict in iteritems(states)
+            for key, state_dict in states.items()
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -1188,7 +1194,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.prev_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
 
         if len(ev.auth_event_ids()) > 10:
             logger.warning(
@@ -1196,7 +1202,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.auth_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
 
     async def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -1539,7 +1545,7 @@ class FederationHandler(BaseHandler):
 
         # block any attempts to invite the server notices mxid
         if event.state_key == self._server_notices_mxid:
-            raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+            raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
         # keep a record of the room version, if we don't yet know it.
         # (this may get overwritten if we later get a different room version in a
@@ -1725,7 +1731,7 @@ class FederationHandler(BaseHandler):
         state_groups = await self.state_store.get_state_groups(room_id, [event_id])
 
         if state_groups:
-            _, state = list(iteritems(state_groups)).pop()
+            _, state = list(state_groups.items()).pop()
             results = {(e.type, e.state_key): e for e in state}
 
             if event.is_state():
@@ -2088,7 +2094,7 @@ class FederationHandler(BaseHandler):
                     room_version, state_sets, event
                 )
                 current_state_ids = {
-                    k: e.event_id for k, e in iteritems(current_state_ids)
+                    k: e.event_id for k, e in current_state_ids.items()
                 }
             else:
                 current_state_ids = await self.state_handler.get_current_state_ids(
@@ -2104,7 +2110,7 @@ class FederationHandler(BaseHandler):
             # Now check if event pass auth against said current state
             auth_types = auth_types_for_event(event)
             current_state_ids = [
-                e for k, e in iteritems(current_state_ids) if k in auth_types
+                e for k, e in current_state_ids.items() if k in auth_types
             ]
 
             current_auth_events = await self.store.get_events(current_state_ids)
@@ -2420,7 +2426,7 @@ class FederationHandler(BaseHandler):
         else:
             event_key = None
         state_updates = {
-            k: a.event_id for k, a in iteritems(auth_events) if k != event_key
+            k: a.event_id for k, a in auth_events.items() if k != event_key
         }
 
         current_state_ids = await context.get_current_state_ids()
@@ -2431,7 +2437,7 @@ class FederationHandler(BaseHandler):
         prev_state_ids = await context.get_prev_state_ids()
         prev_state_ids = dict(prev_state_ids)
 
-        prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
+        prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
 
         # create a new state group as a delta from the existing one.
         prev_group = context.state_group
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index ebe8d25bd8..7cb106e365 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six import iteritems
-
 from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.types import get_domain_from_id
 
@@ -227,7 +225,7 @@ class GroupsLocalWorkerHandler(object):
 
         results = {}
         failed_results = []
-        for destination, dest_user_ids in iteritems(destinations):
+        for destination, dest_user_ids in destinations.items():
             try:
                 r = await self.transport_client.bulk_get_publicised_groups(
                     destination, list(dest_user_ids)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 649ca1f08a..665ad19b5d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Optional, Tuple
 
-from six import iteritems, itervalues, string_types
-
 from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
@@ -246,7 +244,7 @@ class MessageHandler(object):
                 "avatar_url": profile.avatar_url,
                 "display_name": profile.display_name,
             }
-            for user_id, profile in iteritems(users_with_profile)
+            for user_id, profile in users_with_profile.items()
         }
 
     def maybe_schedule_expiry(self, event):
@@ -715,7 +713,7 @@ class EventCreationHandler(object):
 
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
-                if not isinstance(spam_error, string_types):
+                if not isinstance(spam_error, str):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
@@ -881,7 +879,9 @@ class EventCreationHandler(object):
         """
         room_alias = RoomAlias.from_string(room_alias_str)
         try:
-            mapping = yield directory_handler.get_association(room_alias)
+            mapping = yield defer.ensureDeferred(
+                directory_handler.get_association(room_alias)
+            )
         except SynapseError as e:
             # Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
             if e.errcode == Codes.NOT_FOUND:
@@ -988,7 +988,7 @@ class EventCreationHandler(object):
 
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(current_state_ids)
+                    for k, e_id in current_state_ids.items()
                     if k[0] in self.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -1002,7 +1002,7 @@ class EventCreationHandler(object):
                         "content": e.content,
                         "sender": e.sender,
                     }
-                    for e in itervalues(state_to_include)
+                    for e in state_to_include.values()
                 ]
 
                 invitee = UserID.from_string(event.state_key)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d7442c62a7..da06582d4b 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,9 +15,6 @@
 # limitations under the License.
 import logging
 
-from six import iteritems
-
-from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
@@ -99,8 +96,7 @@ class PaginationHandler(object):
                     job["longest_max_lifetime"],
                 )
 
-    @defer.inlineCallbacks
-    def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+    async def purge_history_for_rooms_in_range(self, min_ms, max_ms):
         """Purge outdated events from rooms within the given retention range.
 
         If a default retention policy is defined in the server's configuration and its
@@ -139,13 +135,13 @@ class PaginationHandler(object):
             include_null,
         )
 
-        rooms = yield self.store.get_rooms_for_retention_period_in_range(
+        rooms = await self.store.get_rooms_for_retention_period_in_range(
             min_ms, max_ms, include_null
         )
 
         logger.debug("[purge] Rooms to purge: %s", rooms)
 
-        for room_id, retention_policy in iteritems(rooms):
+        for room_id, retention_policy in rooms.items():
             logger.info("[purge] Attempting to purge messages in room %s", room_id)
 
             if room_id in self._purges_in_progress_by_room:
@@ -167,9 +163,9 @@ class PaginationHandler(object):
             # Figure out what token we should start purging at.
             ts = self.clock.time_msec() - max_lifetime
 
-            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+            stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = yield self.store.get_room_event_before_stream_ordering(
+            r = await self.store.get_room_event_before_stream_ordering(
                 room_id, stream_ordering,
             )
             if not r:
@@ -229,8 +225,7 @@ class PaginationHandler(object):
         )
         return purge_id
 
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token, delete_local_events):
+    async def _purge_history(self, purge_id, room_id, token, delete_local_events):
         """Carry out a history purge on a room.
 
         Args:
@@ -239,14 +234,11 @@ class PaginationHandler(object):
             token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
-
-        Returns:
-            Deferred
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.storage.purge_events.purge_history(
+            with await self.pagination_lock.write(room_id):
+                await self.storage.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
             logger.info("[purge] complete")
@@ -284,9 +276,7 @@ class PaginationHandler(object):
             await self.store.get_room_version_id(room_id)
 
             # first check that we have no users in this room
-            joined = await defer.maybeDeferred(
-                self.store.is_host_joined, room_id, self._server_name
-            )
+            joined = await self.store.is_host_joined(room_id, self._server_name)
 
             if joined:
                 raise SynapseError(400, "Users are still joined to this room")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3594f3b00f..d2f25ae12a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,9 +25,7 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import Dict, Iterable, List, Set
-
-from six import iteritems, itervalues
+from typing import Dict, Iterable, List, Set, Tuple
 
 from prometheus_client import Counter
 from typing_extensions import ContextManager
@@ -170,14 +168,14 @@ class BasePresenceHandler(abc.ABC):
             for user_id in user_ids
         }
 
-        missing = [user_id for user_id, state in iteritems(states) if not state]
+        missing = [user_id for user_id, state in states.items() if not state]
         if missing:
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = await self.store.get_presence_for_users(missing)
             states.update(res)
 
-            missing = [user_id for user_id, state in iteritems(states) if not state]
+            missing = [user_id for user_id, state in states.items() if not state]
             if missing:
                 new = {
                     user_id: UserPresenceState.default(user_id) for user_id in missing
@@ -632,7 +630,7 @@ class PresenceHandler(BasePresenceHandler):
             await self._update_states(
                 [
                     prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
-                    for prev_state in itervalues(prev_states)
+                    for prev_state in prev_states.values()
                 ]
             )
             self.external_process_last_updated_ms.pop(process_id, None)
@@ -775,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
 
         return False
 
-    async def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
         """
         Gets a list of presence update rows from between the given stream ids.
         Each row has:
@@ -787,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
         - last_user_sync_ts(int)
         - status_msg(int)
         - currently_active(int)
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
+
         # TODO(markjh): replicate the unpersisted changes.
         # This could use the in-memory stores for recent changes.
-        rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
+        rows = await self.store.get_all_presence_updates(
+            instance_name, last_id, current_id, limit
+        )
         return rows
 
     def notify_new_event(self):
@@ -1087,7 +1108,7 @@ class PresenceEventSource(object):
             return (list(updates.values()), max_token)
         else:
             return (
-                [s for s in itervalues(updates) if s.state != PresenceState.OFFLINE],
+                [s for s in updates.values() if s.state != PresenceState.OFFLINE],
                 max_token,
             )
 
@@ -1323,11 +1344,11 @@ def get_interested_remotes(store, states, state_handler):
     # hosts in those rooms.
     room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
 
-    for room_id, states in iteritems(room_ids_to_states):
+    for room_id, states in room_ids_to_states.items():
         hosts = yield state_handler.get_current_hosts_in_room(room_id)
         hosts_and_states.append((hosts, states))
 
-    for user_id, states in iteritems(users_to_states):
+    for user_id, states in users_to_states.items():
         host = get_domain_from_id(user_id)
         hosts_and_states.append(([host], states))
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 302efc1b9a..4b1e3073a8 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import raise_from
-
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler):
                 )
                 return result
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
@@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
@@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 51979ea43e..78c3772ac1 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -17,7 +17,7 @@
 import logging
 
 from synapse import types
-from synapse.api.constants import MAX_USERID_LENGTH, LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType
 from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
 from synapse.config.server import is_threepid_reserved
 from synapse.http.servlet import assert_params_in_dict
@@ -26,7 +26,8 @@ from synapse.replication.http.register import (
     ReplicationPostRegisterActionsServlet,
     ReplicationRegisterServlet,
 )
-from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.storage.state import StateFilter
+from synapse.types import RoomAlias, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
 
 from ._base import BaseHandler
@@ -270,51 +271,157 @@ class RegistrationHandler(BaseHandler):
 
         return user_id
 
-    async def _auto_join_rooms(self, user_id):
-        """Automatically joins users to auto join rooms - creating the room in the first place
-        if the user is the first to be created.
+    async def _create_and_join_rooms(self, user_id: str):
+        """
+        Create the auto-join rooms and join or invite the user to them.
+
+        This should only be called when the first "real" user registers.
 
         Args:
-            user_id(str): The user to join
+            user_id: The user to join
         """
-        # auto-join the user to any rooms we're supposed to dump them into
-        fake_requester = create_requester(user_id)
+        # Getting the handlers during init gives a dependency loop.
+        room_creation_handler = self.hs.get_room_creation_handler()
+        room_member_handler = self.hs.get_room_member_handler()
 
-        # try to create the room if we're the first real user on the server. Note
-        # that an auto-generated support or bot user is not a real user and will never be
-        # the user to create the room
-        should_auto_create_rooms = False
-        is_real_user = await self.store.is_real_user(user_id)
-        if self.hs.config.autocreate_auto_join_rooms and is_real_user:
-            count = await self.store.count_real_users()
-            should_auto_create_rooms = count == 1
-        for r in self.hs.config.auto_join_rooms:
+        # Generate a stub for how the rooms will be configured.
+        stub_config = {
+            "preset": self.hs.config.registration.autocreate_auto_join_room_preset,
+        }
+
+        # If the configuration providers a user ID to create rooms with, use
+        # that instead of the first user registered.
+        requires_join = False
+        if self.hs.config.registration.auto_join_user_id:
+            fake_requester = create_requester(
+                self.hs.config.registration.auto_join_user_id
+            )
+
+            # If the room requires an invite, add the user to the list of invites.
+            if self.hs.config.registration.auto_join_room_requires_invite:
+                stub_config["invite"] = [user_id]
+
+            # If the room is being created by a different user, the first user
+            # registered needs to join it. Note that in the case of an invitation
+            # being necessary this will occur after the invite was sent.
+            requires_join = True
+        else:
+            fake_requester = create_requester(user_id)
+
+        # Choose whether to federate the new room.
+        if not self.hs.config.registration.autocreate_auto_join_rooms_federated:
+            stub_config["creation_content"] = {"m.federate": False}
+
+        for r in self.hs.config.registration.auto_join_rooms:
             logger.info("Auto-joining %s to %s", user_id, r)
+
             try:
-                if should_auto_create_rooms:
-                    room_alias = RoomAlias.from_string(r)
-                    if self.hs.hostname != room_alias.domain:
-                        logger.warning(
-                            "Cannot create room alias %s, "
-                            "it does not match server domain",
-                            r,
-                        )
-                    else:
-                        # create room expects the localpart of the room alias
-                        room_alias_localpart = room_alias.localpart
-
-                        # getting the RoomCreationHandler during init gives a dependency
-                        # loop
-                        await self.hs.get_room_creation_handler().create_room(
-                            fake_requester,
-                            config={
-                                "preset": "public_chat",
-                                "room_alias_name": room_alias_localpart,
-                            },
+                room_alias = RoomAlias.from_string(r)
+
+                if self.hs.hostname != room_alias.domain:
+                    logger.warning(
+                        "Cannot create room alias %s, "
+                        "it does not match server domain",
+                        r,
+                    )
+                else:
+                    # A shallow copy is OK here since the only key that is
+                    # modified is room_alias_name.
+                    config = stub_config.copy()
+                    # create room expects the localpart of the room alias
+                    config["room_alias_name"] = room_alias.localpart
+
+                    info, _ = await room_creation_handler.create_room(
+                        fake_requester, config=config, ratelimit=False,
+                    )
+
+                    # If the room does not require an invite, but another user
+                    # created it, then ensure the first user joins it.
+                    if requires_join:
+                        await room_member_handler.update_membership(
+                            requester=create_requester(user_id),
+                            target=UserID.from_string(user_id),
+                            room_id=info["room_id"],
+                            # Since it was just created, there are no remote hosts.
+                            remote_room_hosts=[],
+                            action="join",
                             ratelimit=False,
                         )
+
+            except ConsentNotGivenError as e:
+                # Technically not necessary to pull out this error though
+                # moving away from bare excepts is a good thing to do.
+                logger.error("Failed to join new user to %r: %r", r, e)
+            except Exception as e:
+                logger.error("Failed to join new user to %r: %r", r, e)
+
+    async def _join_rooms(self, user_id: str):
+        """
+        Join or invite the user to the auto-join rooms.
+
+        Args:
+            user_id: The user to join
+        """
+        room_member_handler = self.hs.get_room_member_handler()
+
+        for r in self.hs.config.registration.auto_join_rooms:
+            logger.info("Auto-joining %s to %s", user_id, r)
+
+            try:
+                room_alias = RoomAlias.from_string(r)
+
+                if RoomAlias.is_valid(r):
+                    (
+                        room_id,
+                        remote_room_hosts,
+                    ) = await room_member_handler.lookup_room_alias(room_alias)
+                    room_id = room_id.to_string()
                 else:
-                    await self._join_user_to_room(fake_requester, r)
+                    raise SynapseError(
+                        400, "%s was not legal room ID or room alias" % (r,)
+                    )
+
+                # Calculate whether the room requires an invite or can be
+                # joined directly. Note that unless a join rule of public exists,
+                # it is treated as requiring an invite.
+                requires_invite = True
+
+                state = await self.store.get_filtered_current_state_ids(
+                    room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
+                )
+
+                event_id = state.get((EventTypes.JoinRules, ""))
+                if event_id:
+                    join_rules_event = await self.store.get_event(
+                        event_id, allow_none=True
+                    )
+                    if join_rules_event:
+                        join_rule = join_rules_event.content.get("join_rule", None)
+                        requires_invite = join_rule and join_rule != JoinRules.PUBLIC
+
+                # Send the invite, if necessary.
+                if requires_invite:
+                    await room_member_handler.update_membership(
+                        requester=create_requester(
+                            self.hs.config.registration.auto_join_user_id
+                        ),
+                        target=UserID.from_string(user_id),
+                        room_id=room_id,
+                        remote_room_hosts=remote_room_hosts,
+                        action="invite",
+                        ratelimit=False,
+                    )
+
+                # Send the join.
+                await room_member_handler.update_membership(
+                    requester=create_requester(user_id),
+                    target=UserID.from_string(user_id),
+                    room_id=room_id,
+                    remote_room_hosts=remote_room_hosts,
+                    action="join",
+                    ratelimit=False,
+                )
+
             except ConsentNotGivenError as e:
                 # Technically not necessary to pull out this error though
                 # moving away from bare excepts is a good thing to do.
@@ -322,6 +429,29 @@ class RegistrationHandler(BaseHandler):
             except Exception as e:
                 logger.error("Failed to join new user to %r: %r", r, e)
 
+    async def _auto_join_rooms(self, user_id: str):
+        """Automatically joins users to auto join rooms - creating the room in the first place
+        if the user is the first to be created.
+
+        Args:
+            user_id: The user to join
+        """
+        # auto-join the user to any rooms we're supposed to dump them into
+
+        # try to create the room if we're the first real user on the server. Note
+        # that an auto-generated support or bot user is not a real user and will never be
+        # the user to create the room
+        should_auto_create_rooms = False
+        is_real_user = await self.store.is_real_user(user_id)
+        if self.hs.config.registration.autocreate_auto_join_rooms and is_real_user:
+            count = await self.store.count_real_users()
+            should_auto_create_rooms = count == 1
+
+        if should_auto_create_rooms:
+            await self._create_and_join_rooms(user_id)
+        else:
+            await self._join_rooms(user_id)
+
     async def post_consent_actions(self, user_id):
         """A series of registration actions that can only be carried out once consent
         has been granted
@@ -392,30 +522,6 @@ class RegistrationHandler(BaseHandler):
         self._next_generated_user_id += 1
         return str(id)
 
-    async def _join_user_to_room(self, requester, room_identifier):
-        room_member_handler = self.hs.get_room_member_handler()
-        if RoomID.is_valid(room_identifier):
-            room_id = room_identifier
-        elif RoomAlias.is_valid(room_identifier):
-            room_alias = RoomAlias.from_string(room_identifier)
-            room_id, remote_room_hosts = await room_member_handler.lookup_room_alias(
-                room_alias
-            )
-            room_id = room_id.to_string()
-        else:
-            raise SynapseError(
-                400, "%s was not legal room ID or room alias" % (room_identifier,)
-            )
-
-        await room_member_handler.update_membership(
-            requester=requester,
-            target=requester.user,
-            room_id=room_id,
-            remote_room_hosts=remote_room_hosts,
-            action="join",
-            ratelimit=False,
-        )
-
     def check_registration_ratelimit(self, address):
         """A simple helper method to check whether the registration rate limit has been hit
         for a given IP address
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 61db3ccc43..950a84acd0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,9 +24,12 @@ import string
 from collections import OrderedDict
 from typing import Tuple
 
-from six import iteritems, string_types
-
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.constants import (
+    EventTypes,
+    JoinRules,
+    RoomCreationPreset,
+    RoomEncryptionAlgorithms,
+)
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events.utils import copy_power_levels_contents
@@ -56,31 +59,6 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000
 
 
 class RoomCreationHandler(BaseHandler):
-
-    PRESETS_DICT = {
-        RoomCreationPreset.PRIVATE_CHAT: {
-            "join_rules": JoinRules.INVITE,
-            "history_visibility": "shared",
-            "original_invitees_have_ops": False,
-            "guest_can_join": True,
-            "power_level_content_override": {"invite": 0},
-        },
-        RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
-            "join_rules": JoinRules.INVITE,
-            "history_visibility": "shared",
-            "original_invitees_have_ops": True,
-            "guest_can_join": True,
-            "power_level_content_override": {"invite": 0},
-        },
-        RoomCreationPreset.PUBLIC_CHAT: {
-            "join_rules": JoinRules.PUBLIC,
-            "history_visibility": "shared",
-            "original_invitees_have_ops": False,
-            "guest_can_join": False,
-            "power_level_content_override": {},
-        },
-    }
-
     def __init__(self, hs):
         super(RoomCreationHandler, self).__init__(hs)
 
@@ -89,6 +67,39 @@ class RoomCreationHandler(BaseHandler):
         self.room_member_handler = hs.get_room_member_handler()
         self.config = hs.config
 
+        # Room state based off defined presets
+        self._presets_dict = {
+            RoomCreationPreset.PRIVATE_CHAT: {
+                "join_rules": JoinRules.INVITE,
+                "history_visibility": "shared",
+                "original_invitees_have_ops": False,
+                "guest_can_join": True,
+                "power_level_content_override": {"invite": 0},
+            },
+            RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
+                "join_rules": JoinRules.INVITE,
+                "history_visibility": "shared",
+                "original_invitees_have_ops": True,
+                "guest_can_join": True,
+                "power_level_content_override": {"invite": 0},
+            },
+            RoomCreationPreset.PUBLIC_CHAT: {
+                "join_rules": JoinRules.PUBLIC,
+                "history_visibility": "shared",
+                "original_invitees_have_ops": False,
+                "guest_can_join": False,
+                "power_level_content_override": {},
+            },
+        }
+
+        # Modify presets to selectively enable encryption by default per homeserver config
+        for preset_name, preset_config in self._presets_dict.items():
+            encrypted = (
+                preset_name
+                in self.config.encryption_enabled_by_default_for_room_presets
+            )
+            preset_config["encrypted"] = encrypted
+
         self._replication = hs.get_replication_data_handler()
 
         # linearizer to stop two upgrades happening at once
@@ -364,7 +375,7 @@ class RoomCreationHandler(BaseHandler):
         # map from event_id to BaseEvent
         old_room_state_events = await self.store.get_events(old_room_state_ids.values())
 
-        for k, old_event_id in iteritems(old_room_state_ids):
+        for k, old_event_id in old_room_state_ids.items():
             old_event = old_room_state_events.get(old_event_id)
             if old_event:
                 initial_state[k] = old_event.content
@@ -417,7 +428,7 @@ class RoomCreationHandler(BaseHandler):
         old_room_member_state_events = await self.store.get_events(
             old_room_member_state_ids.values()
         )
-        for k, old_event in iteritems(old_room_member_state_events):
+        for k, old_event in old_room_member_state_events.items():
             # Only transfer ban events
             if (
                 "membership" in old_event.content
@@ -582,7 +593,7 @@ class RoomCreationHandler(BaseHandler):
             "room_version", self.config.default_room_version.identifier
         )
 
-        if not isinstance(room_version_id, string_types):
+        if not isinstance(room_version_id, str):
             raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
 
         room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
@@ -798,7 +809,7 @@ class RoomCreationHandler(BaseHandler):
             )
             return last_stream_id
 
-        config = RoomCreationHandler.PRESETS_DICT[preset_config]
+        config = self._presets_dict[preset_config]
 
         creator_id = creator.user.to_string()
 
@@ -888,6 +899,13 @@ class RoomCreationHandler(BaseHandler):
                 etype=etype, state_key=state_key, content=content
             )
 
+        if config["encrypted"]:
+            last_sent_stream_id = await send(
+                etype=EventTypes.RoomEncryption,
+                state_key="",
+                content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
+            )
+
         return last_sent_stream_id
 
     async def _generate_room_id(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 4cbc02b0d0..5e05be6181 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,8 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Any, Dict, Optional
 
-from six import iteritems
-
 import msgpack
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -271,7 +269,7 @@ class RoomListHandler(BaseHandler):
         event_map = yield self.store.get_events(
             [
                 event_id
-                for key, event_id in iteritems(current_state_ids)
+                for key, event_id in current_state_ids.items()
                 if key[0]
                 in (
                     EventTypes.Create,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f7af982f0..27c479da9e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,10 +17,9 @@
 
 import abc
 import logging
+from http import HTTPStatus
 from typing import Dict, Iterable, List, Optional, Tuple
 
-from six.moves import http_client
-
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -361,7 +360,7 @@ class RoomMemberHandler(object):
         if effective_membership_state == Membership.INVITE:
             # block any attempts to invite the server notices mxid
             if target.to_string() == self._server_notices_mxid:
-                raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+                raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
             block_invite = False
 
@@ -444,7 +443,7 @@ class RoomMemberHandler(object):
                 is_blocked = await self._is_server_notice_room(room_id)
                 if is_blocked:
                     raise SynapseError(
-                        http_client.FORBIDDEN,
+                        HTTPStatus.FORBIDDEN,
                         "You cannot reject this invite",
                         errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
                     )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6bdb24baff..4c7524493e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,8 +18,6 @@ import itertools
 import logging
 from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
 
-from six import iteritems, itervalues
-
 import attr
 from prometheus_client import Counter
 
@@ -390,7 +388,7 @@ class SyncHandler(object):
                 # result returned by the event source is poor form (it might cache
                 # the object)
                 room_id = event["room_id"]
-                event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+                event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
             receipt_key = since_token.receipt_key if since_token else "0"
@@ -408,7 +406,7 @@ class SyncHandler(object):
             for event in receipts:
                 room_id = event["room_id"]
                 # exclude room id, as above
-                event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+                event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
         return now_token, ephemeral_by_room
@@ -454,7 +452,7 @@ class SyncHandler(object):
                     current_state_ids_map = await self.state.get_current_state_ids(
                         room_id
                     )
-                    current_state_ids = frozenset(itervalues(current_state_ids_map))
+                    current_state_ids = frozenset(current_state_ids_map.values())
 
                 recents = await filter_events_for_client(
                     self.storage,
@@ -509,7 +507,7 @@ class SyncHandler(object):
                     current_state_ids_map = await self.state.get_current_state_ids(
                         room_id
                     )
-                    current_state_ids = frozenset(itervalues(current_state_ids_map))
+                    current_state_ids = frozenset(current_state_ids_map.values())
 
                 loaded_recents = await filter_events_for_client(
                     self.storage,
@@ -909,7 +907,7 @@ class SyncHandler(object):
                     logger.debug("filtering state from %r...", state_ids)
                     state_ids = {
                         t: event_id
-                        for t, event_id in iteritems(state_ids)
+                        for t, event_id in state_ids.items()
                         if cache.get(t[1]) != event_id
                     }
                     logger.debug("...to %r", state_ids)
@@ -1430,7 +1428,7 @@ class SyncHandler(object):
         if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
-                    joined_sync.timeline.events, itervalues(joined_sync.state)
+                    joined_sync.timeline.events, joined_sync.state.values()
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
@@ -1505,7 +1503,7 @@ class SyncHandler(object):
         newly_left_rooms = []
         room_entries = []
         invited = []
-        for room_id, events in iteritems(mem_change_events_by_room_id):
+        for room_id, events in mem_change_events_by_room_id.items():
             logger.debug(
                 "Membership changes in %s: [%s]",
                 room_id,
@@ -1993,17 +1991,17 @@ def _calculate_state(
     event_id_to_key = {
         e: key
         for key, e in itertools.chain(
-            iteritems(timeline_contains),
-            iteritems(previous),
-            iteritems(timeline_start),
-            iteritems(current),
+            timeline_contains.items(),
+            previous.items(),
+            timeline_start.items(),
+            current.items(),
         )
     }
 
-    c_ids = set(itervalues(current))
-    ts_ids = set(itervalues(timeline_start))
-    p_ids = set(itervalues(previous))
-    tc_ids = set(itervalues(timeline_contains))
+    c_ids = set(current.values())
+    ts_ids = set(timeline_start.values())
+    p_ids = set(previous.values())
+    tc_ids = set(timeline_contains.values())
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -2017,7 +2015,7 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member
+            e for t, e in timeline_start.items() if t[0] == EventTypes.Member
         )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..6c7abaa578 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,9 +15,7 @@
 
 import logging
 from collections import namedtuple
-from typing import List
-
-from twisted.internet import defer
+from typing import List, Tuple
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.logging.context import run_in_background
@@ -115,8 +113,7 @@ class TypingHandler(object):
     def is_typing(self, member):
         return member.user_id in self._room_typing.get(member.room_id, [])
 
-    @defer.inlineCallbacks
-    def started_typing(self, target_user, auth_user, room_id, timeout):
+    async def started_typing(self, target_user, auth_user, room_id, timeout):
         target_user_id = target_user.to_string()
         auth_user_id = auth_user.to_string()
 
@@ -126,7 +123,7 @@ class TypingHandler(object):
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, target_user_id)
 
         logger.debug("%s has started typing in %s", target_user_id, room_id)
 
@@ -145,8 +142,7 @@ class TypingHandler(object):
 
         self._push_update(member=member, typing=True)
 
-    @defer.inlineCallbacks
-    def stopped_typing(self, target_user, auth_user, room_id):
+    async def stopped_typing(self, target_user, auth_user, room_id):
         target_user_id = target_user.to_string()
         auth_user_id = auth_user.to_string()
 
@@ -156,7 +152,7 @@ class TypingHandler(object):
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, target_user_id)
 
         logger.debug("%s has stopped typing in %s", target_user_id, room_id)
 
@@ -164,12 +160,11 @@ class TypingHandler(object):
 
         self._stopped_typing(member)
 
-    @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
         user_id = user.to_string()
         if self.is_mine_id(user_id):
             member = RoomMember(room_id=room_id, user_id=user_id)
-            yield self._stopped_typing(member)
+            self._stopped_typing(member)
 
     def _stopped_typing(self, member):
         if member.user_id not in self._room_typing.get(member.room_id, set()):
@@ -188,10 +183,9 @@ class TypingHandler(object):
 
         self._push_update_local(member=member, typing=typing)
 
-    @defer.inlineCallbacks
-    def _push_remote(self, member, typing):
+    async def _push_remote(self, member, typing):
         try:
-            users = yield self.state.get_current_users_in_room(member.room_id)
+            users = await self.state.get_current_users_in_room(member.room_id)
             self._member_last_federation_poke[member] = self.clock.time_msec()
 
             now = self.clock.time_msec()
@@ -215,8 +209,7 @@ class TypingHandler(object):
         except Exception:
             logger.exception("Error pushing typing notif to remotes")
 
-    @defer.inlineCallbacks
-    def _recv_edu(self, origin, content):
+    async def _recv_edu(self, origin, content):
         room_id = content["room_id"]
         user_id = content["user_id"]
 
@@ -231,7 +224,7 @@ class TypingHandler(object):
             )
             return
 
-        users = yield self.state.get_current_users_in_room(room_id)
+        users = await self.state.get_current_users_in_room(room_id)
         domains = {get_domain_from_id(u) for u in users}
 
         if self.server_name in domains:
@@ -259,14 +252,31 @@ class TypingHandler(object):
         )
 
     async def get_all_typing_updates(
-        self, last_id: int, current_id: int, limit: int
-    ) -> List[dict]:
-        """Get up to `limit` typing updates between the given tokens, earliest
-        updates first.
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for typing replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
             last_id
@@ -280,9 +290,16 @@ class TypingHandler(object):
             serial = self._room_serials[room_id]
             if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
-                rows.append((serial, room_id, list(typing)))
+                rows.append((serial, [room_id, list(typing)]))
         rows.sort()
-        return rows[:limit]
+
+        limited = False
+        if len(rows) > limit:
+            rows = rows[:limit]
+            current_id = rows[-1][0]
+            limited = True
+
+        return rows, current_id, limited
 
     def get_current_token(self):
         return self._latest_room_serial
@@ -306,7 +323,7 @@ class TypingNotificationEventSource(object):
             "content": {"user_ids": list(typing)},
         }
 
-    def get_new_events(self, from_key, room_ids, **kwargs):
+    async def get_new_events(self, from_key, room_ids, **kwargs):
         with Measure(self.clock, "typing.get_new_events"):
             from_key = int(from_key)
             handler = self.get_typing_handler()
@@ -320,7 +337,7 @@ class TypingNotificationEventSource(object):
 
                 events.append(self._make_event_for(room_id))
 
-            return defer.succeed((events, handler._latest_room_serial))
+            return (events, handler._latest_room_serial)
 
     def get_current_key(self):
         return self.get_typing_handler()._latest_room_serial
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 12423b909a..521b6d620d 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import iteritems, iterkeys
-
 import synapse.metrics
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
@@ -289,7 +287,7 @@ class UserDirectoryHandler(StateDeltasHandler):
         users_with_profile = await self.state.get_current_users_in_room(room_id)
 
         # Remove every user from the sharing tables for that room.
-        for user_id in iterkeys(users_with_profile):
+        for user_id in users_with_profile.keys():
             await self.store.remove_user_who_share_room(user_id, room_id)
 
         # Then, re-add them to the tables.
@@ -298,7 +296,7 @@ class UserDirectoryHandler(StateDeltasHandler):
         # which when ran over an entire room, will result in the same values
         # being added multiple times. The batching upserts shouldn't make this
         # too bad, though.
-        for user_id, profile in iteritems(users_with_profile):
+        for user_id, profile in users_with_profile.items():
             await self._handle_new_user(room_id, user_id, profile)
 
     async def _handle_new_user(self, room_id, user_id, profile):
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 3cef747a4d..8743e9839d 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -15,11 +15,9 @@
 # limitations under the License.
 
 import logging
+import urllib
 from io import BytesIO
 
-from six import raise_from, text_type
-from six.moves import urllib
-
 import treq
 from canonicaljson import encode_canonical_json, json
 from netaddr import IPAddress
@@ -577,7 +575,7 @@ class SimpleHttpClient(object):
             # This can happen e.g. because the body is too large.
             raise
         except Exception as e:
-            raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
+            raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e
 
         return (
             length,
@@ -638,7 +636,7 @@ def encode_urlencode_args(args):
 
 
 def encode_urlencode_arg(arg):
-    if isinstance(arg, text_type):
+    if isinstance(arg, str):
         return arg.encode("utf-8")
     elif isinstance(arg, list):
         return [encode_urlencode_arg(i) for i in arg]
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index f5f917f5ae..c5fc746f2f 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -48,6 +48,9 @@ class MatrixFederationAgent(object):
         tls_client_options_factory (FederationPolicyForHTTPS|None):
             factory to use for fetching client tls options, or none to disable TLS.
 
+        user_agent (bytes):
+            The user agent header to use for federation requests.
+
         _srv_resolver (SrvResolver|None):
             SRVResolver impl to use for looking up SRV records. None to use a default
             implementation.
@@ -61,6 +64,7 @@ class MatrixFederationAgent(object):
         self,
         reactor,
         tls_client_options_factory,
+        user_agent,
         _srv_resolver=None,
         _well_known_resolver=None,
     ):
@@ -78,6 +82,7 @@ class MatrixFederationAgent(object):
             ),
             pool=self._pool,
         )
+        self.user_agent = user_agent
 
         if _well_known_resolver is None:
             _well_known_resolver = WellKnownResolver(
@@ -87,6 +92,7 @@ class MatrixFederationAgent(object):
                     pool=self._pool,
                     contextFactory=tls_client_options_factory,
                 ),
+                user_agent=self.user_agent,
             )
 
         self._well_known_resolver = _well_known_resolver
@@ -149,7 +155,7 @@ class MatrixFederationAgent(object):
             parsed_uri = urllib.parse.urlparse(uri)
 
         # We need to make sure the host header is set to the netloc of the
-        # server.
+        # server and that a user-agent is provided.
         if headers is None:
             headers = Headers()
         else:
@@ -157,6 +163,8 @@ class MatrixFederationAgent(object):
 
         if not headers.hasHeader(b"host"):
             headers.addRawHeader(b"host", parsed_uri.netloc)
+        if not headers.hasHeader(b"user-agent"):
+            headers.addRawHeader(b"user-agent", self.user_agent)
 
         res = yield make_deferred_yieldable(
             self._agent.request(method, uri, headers, bodyProducer)
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 7ddfad286d..89a3b041ce 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -23,6 +23,7 @@ import attr
 from twisted.internet import defer
 from twisted.web.client import RedirectAgent, readBody
 from twisted.web.http import stringToDatetime
+from twisted.web.http_headers import Headers
 
 from synapse.logging.context import make_deferred_yieldable
 from synapse.util import Clock
@@ -78,7 +79,12 @@ class WellKnownResolver(object):
     """
 
     def __init__(
-        self, reactor, agent, well_known_cache=None, had_well_known_cache=None
+        self,
+        reactor,
+        agent,
+        user_agent,
+        well_known_cache=None,
+        had_well_known_cache=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
@@ -92,6 +98,7 @@ class WellKnownResolver(object):
         self._well_known_cache = well_known_cache
         self._had_valid_well_known_cache = had_well_known_cache
         self._well_known_agent = RedirectAgent(agent)
+        self.user_agent = user_agent
 
     @defer.inlineCallbacks
     def get_well_known(self, server_name):
@@ -227,6 +234,10 @@ class WellKnownResolver(object):
         uri = b"https://%s/.well-known/matrix/server" % (server_name,)
         uri_str = uri.decode("ascii")
 
+        headers = {
+            b"User-Agent": [self.user_agent],
+        }
+
         i = 0
         while True:
             i += 1
@@ -234,7 +245,9 @@ class WellKnownResolver(object):
             logger.info("Fetching %s", uri_str)
             try:
                 response = yield make_deferred_yieldable(
-                    self._well_known_agent.request(b"GET", uri)
+                    self._well_known_agent.request(
+                        b"GET", uri, headers=Headers(headers)
+                    )
                 )
                 body = yield make_deferred_yieldable(readBody(response))
 
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 2d47b9ea00..18f6a8fd29 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,11 +17,9 @@ import cgi
 import logging
 import random
 import sys
+import urllib
 from io import BytesIO
 
-from six import raise_from, string_types
-from six.moves import urllib
-
 import attr
 import treq
 from canonicaljson import encode_canonical_json
@@ -199,7 +197,14 @@ class MatrixFederationHttpClient(object):
 
         self.reactor = Reactor()
 
-        self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
+        user_agent = hs.version_string
+        if hs.config.user_agent_suffix:
+            user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix)
+        user_agent = user_agent.encode("ascii")
+
+        self.agent = MatrixFederationAgent(
+            self.reactor, tls_client_options_factory, user_agent
+        )
 
         # Use a BlacklistingAgentWrapper to prevent circumventing the IP
         # blacklist via IP literals in server names
@@ -432,10 +437,10 @@ class MatrixFederationHttpClient(object):
                     except TimeoutError as e:
                         raise RequestSendFailed(e, can_retry=True) from e
                     except DNSLookupError as e:
-                        raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
+                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                     except Exception as e:
                         logger.info("Failed to send request: %s", e)
-                        raise_from(RequestSendFailed(e, can_retry=True), e)
+                        raise RequestSendFailed(e, can_retry=True) from e
 
                     incoming_responses_counter.labels(
                         request.method, response.code
@@ -487,7 +492,7 @@ class MatrixFederationHttpClient(object):
                         # Retry if the error is a 429 (Too Many Requests),
                         # otherwise just raise a standard HttpResponseException
                         if response.code == 429:
-                            raise_from(RequestSendFailed(e, can_retry=True), e)
+                            raise RequestSendFailed(e, can_retry=True) from e
                         else:
                             raise e
 
@@ -998,7 +1003,7 @@ def encode_query_args(args):
 
     encoded_args = {}
     for k, vs in args.items():
-        if isinstance(vs, string_types):
+        if isinstance(vs, str):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2331a2a4b0..d192de7923 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -16,10 +16,10 @@
 
 import collections
 import html
-import http.client
 import logging
 import types
 import urllib
+from http import HTTPStatus
 from io import BytesIO
 from typing import Awaitable, Callable, TypeVar, Union
 
@@ -188,7 +188,7 @@ def return_html_error(
                 exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
-        code = http.HTTPStatus.INTERNAL_SERVER_ERROR
+        code = HTTPStatus.INTERNAL_SERVER_ERROR
         msg = "Internal server error"
 
         logger.error(
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 167293c46d..cbc37eac6e 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -19,6 +19,7 @@ from typing import Optional
 from twisted.python.failure import Failure
 from twisted.web.server import Request, Site
 
+from synapse.config.server import ListenerConfig
 from synapse.http import redact_uri
 from synapse.http.request_metrics import RequestMetrics, requests_counter
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -350,7 +351,7 @@ class SynapseSite(Site):
         self,
         logger_name,
         site_tag,
-        config,
+        config: ListenerConfig,
         resource,
         server_version_string,
         *args,
@@ -360,7 +361,8 @@ class SynapseSite(Site):
 
         self.site_tag = site_tag
 
-        proxied = config.get("x_forwarded", False)
+        assert config.http_options is not None
+        proxied = config.http_options.x_forwarded
         self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
         self.access_logger = logging.getLogger(logger_name)
         self.server_version_string = server_version_string.encode("ascii")
diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py
index fbf570c756..d736ad5b9b 100644
--- a/synapse/logging/formatter.py
+++ b/synapse/logging/formatter.py
@@ -16,8 +16,7 @@
 
 import logging
 import traceback
-
-from six import StringIO
+from io import StringIO
 
 
 class LogFormatter(logging.Formatter):
diff --git a/synapse/logging/opentracing.py b/synapse/logging/opentracing.py
index 5dddf57008..73bef5e5ca 100644
--- a/synapse/logging/opentracing.py
+++ b/synapse/logging/opentracing.py
@@ -171,8 +171,9 @@ import logging
 import re
 import types
 from functools import wraps
-from typing import TYPE_CHECKING, Dict
+from typing import TYPE_CHECKING, Dict, Optional, Type
 
+import attr
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -232,6 +233,30 @@ except ImportError:
     LogContextScopeManager = None  # type: ignore
 
 
+try:
+    from rust_python_jaeger_reporter import Reporter
+
+    @attr.s(slots=True, frozen=True)
+    class _WrappedRustReporter:
+        """Wrap the reporter to ensure `report_span` never throws.
+        """
+
+        _reporter = attr.ib(type=Reporter, default=attr.Factory(Reporter))
+
+        def set_process(self, *args, **kwargs):
+            return self._reporter.set_process(*args, **kwargs)
+
+        def report_span(self, span):
+            try:
+                return self._reporter.report_span(span)
+            except Exception:
+                logger.exception("Failed to report span")
+
+    RustReporter = _WrappedRustReporter  # type: Optional[Type[_WrappedRustReporter]]
+except ImportError:
+    RustReporter = None
+
+
 logger = logging.getLogger(__name__)
 
 
@@ -320,11 +345,19 @@ def init_tracer(hs: "HomeServer"):
 
     set_homeserver_whitelist(hs.config.opentracer_whitelist)
 
-    JaegerConfig(
+    config = JaegerConfig(
         config=hs.config.jaeger_config,
         service_name="{} {}".format(hs.config.server_name, hs.get_instance_name()),
         scope_manager=LogContextScopeManager(hs.config),
-    ).initialize_tracer()
+    )
+
+    # If we have the rust jaeger reporter available let's use that.
+    if RustReporter:
+        logger.info("Using rust_python_jaeger_reporter library")
+        tracer = config.create_tracer(RustReporter(), config.sampler)
+        opentracing.set_global_tracer(tracer)
+    else:
+        config.initialize_tracer()
 
 
 # Whitelisting
diff --git a/synapse/metrics/__init__.py b/synapse/metrics/__init__.py
index 9cf31f96b3..6035672698 100644
--- a/synapse/metrics/__init__.py
+++ b/synapse/metrics/__init__.py
@@ -22,8 +22,6 @@ import threading
 import time
 from typing import Callable, Dict, Iterable, Optional, Tuple, Union
 
-import six
-
 import attr
 from prometheus_client import Counter, Gauge, Histogram
 from prometheus_client.core import (
@@ -83,7 +81,7 @@ class LaterGauge(object):
             return
 
         if isinstance(calls, dict):
-            for k, v in six.iteritems(calls):
+            for k, v in calls.items():
                 g.add_metric(k, v)
         else:
             g.add_metric([], calls)
@@ -194,7 +192,7 @@ class InFlightGauge(object):
             gauge = GaugeMetricFamily(
                 "_".join([self.name, name]), "", labels=self.labels
             )
-            for key, metrics in six.iteritems(metrics_by_key):
+            for key, metrics in metrics_by_key.items():
                 gauge.add_metric(key, getattr(metrics, name))
             yield gauge
 
@@ -465,6 +463,12 @@ event_processing_last_ts = Gauge("synapse_event_processing_last_ts", "", ["name"
 # finished being processed.
 event_processing_lag = Gauge("synapse_event_processing_lag", "", ["name"])
 
+event_processing_lag_by_event = Histogram(
+    "synapse_event_processing_lag_by_event",
+    "Time between an event being persisted and it being queued up to be sent to the relevant remote servers",
+    ["name"],
+)
+
 # Build info of the running server.
 build_info = Gauge(
     "synapse_build_info", "Build information", ["pythonversion", "version", "osversion"]
diff --git a/synapse/metrics/_exposition.py b/synapse/metrics/_exposition.py
index ab7f948ed4..4304c60d56 100644
--- a/synapse/metrics/_exposition.py
+++ b/synapse/metrics/_exposition.py
@@ -208,6 +208,7 @@ class MetricsHandler(BaseHTTPRequestHandler):
             raise
         self.send_response(200)
         self.send_header("Content-Type", CONTENT_TYPE_LATEST)
+        self.send_header("Content-Length", str(len(output)))
         self.end_headers()
         self.wfile.write(output)
 
@@ -261,4 +262,6 @@ class MetricsResource(Resource):
 
     def render_GET(self, request):
         request.setHeader(b"Content-Type", CONTENT_TYPE_LATEST.encode("ascii"))
-        return generate_latest(self.registry)
+        response = generate_latest(self.registry)
+        request.setHeader(b"Content-Length", str(len(response)))
+        return response
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index e75d964ac8..43ffe6faf0 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -17,8 +17,6 @@
 import logging
 from collections import namedtuple
 
-from six import iteritems, itervalues
-
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -130,7 +128,7 @@ class BulkPushRuleEvaluator(object):
                 event, prev_state_ids, for_verification=False
             )
             auth_events = yield self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
+            auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
 
         sender_level = get_user_power_level(event.sender, auth_events)
 
@@ -162,7 +160,7 @@ class BulkPushRuleEvaluator(object):
 
         condition_cache = {}
 
-        for uid, rules in iteritems(rules_by_user):
+        for uid, rules in rules_by_user.items():
             if event.sender == uid:
                 continue
 
@@ -395,7 +393,7 @@ class RulesForRoom(object):
         # If the event is a join event then it will be in current state evnts
         # map but not in the DB, so we have to explicitly insert it.
         if event.type == EventTypes.Member:
-            for event_id in itervalues(member_event_ids):
+            for event_id in member_event_ids.values():
                 if event_id == event.event_id:
                     members[event_id] = (event.state_key, event.membership)
 
@@ -404,7 +402,7 @@ class RulesForRoom(object):
 
         interested_in_user_ids = {
             user_id
-            for user_id, membership in itervalues(members)
+            for user_id, membership in members.values()
             if membership == Membership.JOIN
         }
 
@@ -415,7 +413,7 @@ class RulesForRoom(object):
         )
 
         user_ids = {
-            uid for uid, have_pusher in iteritems(if_users_with_pushers) if have_pusher
+            uid for uid, have_pusher in if_users_with_pushers.items() if have_pusher
         }
 
         logger.debug("With pushers: %r", user_ids)
@@ -436,7 +434,7 @@ class RulesForRoom(object):
         )
 
         ret_rules_by_user.update(
-            item for item in iteritems(rules_by_user) if item[0] is not None
+            item for item in rules_by_user.items() if item[0] is not None
         )
 
         self.update_cache(sequence, members, ret_rules_by_user, state_group)
diff --git a/synapse/push/httppusher.py b/synapse/push/httppusher.py
index eaaa7afc91..ed60dbc1bf 100644
--- a/synapse/push/httppusher.py
+++ b/synapse/push/httppusher.py
@@ -129,6 +129,8 @@ class HttpPusher(object):
 
     @defer.inlineCallbacks
     def _update_badge(self):
+        # XXX as per https://github.com/matrix-org/matrix-doc/issues/2627, this seems
+        # to be largely redundant. perhaps we can remove it.
         badge = yield push_tools.get_badge_count(self.hs.get_datastore(), self.user_id)
         yield self._send_badge(badge)
 
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index d57a66a697..dda560b2c2 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -17,12 +17,11 @@ import email.mime.multipart
 import email.utils
 import logging
 import time
+import urllib
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
 from typing import Iterable, List, TypeVar
 
-from six.moves import urllib
-
 import bleach
 import jinja2
 
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 11032491af..8e0d3a416d 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -18,8 +18,6 @@ import logging
 import re
 from typing import Pattern
 
-from six import string_types
-
 from synapse.events import EventBase
 from synapse.types import UserID
 from synapse.util.caches import register_cache
@@ -131,7 +129,7 @@ class PushRuleEvaluatorForEvent(object):
         # XXX: optimisation: cache our pattern regexps
         if condition["key"] == "content.body":
             body = self._event.content.get("body", None)
-            if not body:
+            if not body or not isinstance(body, str):
                 return False
 
             return _glob_matches(pattern, body, word_boundary=True)
@@ -147,7 +145,7 @@ class PushRuleEvaluatorForEvent(object):
             return False
 
         body = self._event.content.get("body", None)
-        if not body:
+        if not body or not isinstance(body, str):
             return False
 
         # Similar to _glob_matches, but do not treat display_name as a glob.
@@ -244,7 +242,7 @@ def _flatten_dict(d, prefix=[], result=None):
     if result is None:
         result = {}
     for key, value in d.items():
-        if isinstance(value, string_types):
+        if isinstance(value, str):
             result[".".join(prefix + [key])] = value.lower()
         elif hasattr(value, "items"):
             _flatten_dict(value, prefix=(prefix + [key]), result=result)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 88d203aa44..f6a5458681 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -215,11 +215,9 @@ class PusherPool:
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
-            updated_receipts = yield self.store.get_all_updated_receipts(
+            users_affected = yield self.store.get_users_sent_receipts_between(
                 min_stream_id - 1, max_stream_id
             )
-            # This returns a tuple, user_id is at index 3
-            users_affected = {r[3] for r in updated_receipts}
 
             for u in users_affected:
                 if u in self.pushers:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 8b4312e5a3..b1cac901eb 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -66,11 +66,9 @@ REQUIREMENTS = [
     "pymacaroons>=0.13.0",
     "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
-    "six>=1.10",
     "prometheus_client>=0.0.18,<0.8.0",
-    # we use attr.s(slots), which arrived in 16.0.0
-    # Twisted 18.7.0 requires attrs>=17.4.0
-    "attrs>=17.4.0",
+    # we use attr.validators.deep_iterable, which arrived in 19.1.0
+    "attrs>=19.1.0",
     "netaddr>=0.7.18",
     "Jinja2>=2.9",
     "bleach>=1.4.3",
@@ -95,7 +93,12 @@ CONDITIONAL_REQUIREMENTS = {
     "oidc": ["authlib>=0.14.0"],
     "systemd": ["systemd-python>=231"],
     "url_preview": ["lxml>=3.5.0"],
-    "test": ["mock>=2.0", "parameterized"],
+    # Dependencies which are exclusively required by unit test code. This is
+    # NOT a list of all modules that are necessary to run the unit tests.
+    # Tests assume that all optional dependencies are installed.
+    #
+    # parameterized_class decorator was introduced in parameterized 0.7.0
+    "test": ["mock>=2.0", "parameterized>=0.7.0"],
     "sentry": ["sentry-sdk>=0.7.2"],
     "opentracing": ["jaeger-client>=4.0.0", "opentracing>=2.2.0"],
     "jwt": ["pyjwt>=1.6.4"],
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 793cef6c26..9caf1e80c1 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -16,12 +16,10 @@
 import abc
 import logging
 import re
+import urllib
 from inspect import signature
 from typing import Dict, List, Tuple
 
-from six import raise_from
-from six.moves import urllib
-
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -220,7 +218,7 @@ class ReplicationEndpoint(object):
                 # importantly, not stack traces everywhere)
                 raise e.to_synapse_error()
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to talk to master"), e)
+                raise SynapseError(502, "Failed to talk to master") from e
 
             return result
 
diff --git a/synapse/replication/tcp/commands.py b/synapse/replication/tcp/commands.py
index c04f622816..ea5937a20c 100644
--- a/synapse/replication/tcp/commands.py
+++ b/synapse/replication/tcp/commands.py
@@ -149,7 +149,7 @@ class RdataCommand(Command):
 
 
 class PositionCommand(Command):
-    """Sent by the server to tell the client the stream postition without
+    """Sent by the server to tell the client the stream position without
     needing to send an RDATA.
 
     Format::
@@ -188,7 +188,7 @@ class ErrorCommand(_SimpleCommand):
 
 
 class PingCommand(_SimpleCommand):
-    """Sent by either side as a keep alive. The data is arbitary (often timestamp)
+    """Sent by either side as a keep alive. The data is arbitrary (often timestamp)
     """
 
     NAME = "PING"
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index cbcf46f3ae..e6a2e2598b 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -112,8 +112,8 @@ class ReplicationCommandHandler:
             "replication_position", clock=self._clock
         )
 
-        # Map of stream to batched updates. See RdataCommand for info on how
-        # batching works.
+        # Map of stream name to batched updates. See RdataCommand for info on
+        # how batching works.
         self._pending_batches = {}  # type: Dict[str, List[Any]]
 
         # The factory used to create connections.
@@ -123,7 +123,8 @@ class ReplicationCommandHandler:
         # outgoing replication commands to.)
         self._connections = []  # type: List[AbstractConnection]
 
-        # For each connection, the incoming streams that are coming from that connection
+        # For each connection, the incoming stream names that are coming from
+        # that connection.
         self._streams_by_connection = {}  # type: Dict[AbstractConnection, Set[str]]
 
         LaterGauge(
@@ -310,7 +311,28 @@ class ReplicationCommandHandler:
                 # Check if this is the last of a batch of updates
                 rows = self._pending_batches.pop(stream_name, [])
                 rows.append(row)
-                await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
+
+                stream = self._streams.get(stream_name)
+                if not stream:
+                    logger.error("Got RDATA for unknown stream: %s", stream_name)
+                    return
+
+                # Find where we previously streamed up to.
+                current_token = stream.current_token(cmd.instance_name)
+
+                # Discard this data if this token is earlier than the current
+                # position. Note that streams can be reset (in which case you
+                # expect an earlier token), but that must be preceded by a
+                # POSITION command.
+                if cmd.token <= current_token:
+                    logger.debug(
+                        "Discarding RDATA from stream %s at position %s before previous position %s",
+                        stream_name,
+                        cmd.token,
+                        current_token,
+                    )
+                else:
+                    await self.on_rdata(stream_name, cmd.instance_name, cmd.token, rows)
 
     async def on_rdata(
         self, stream_name: str, instance_name: str, token: int, rows: list
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 4acefc8a96..f196eff072 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -264,7 +264,7 @@ class BackfillStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_current_backfill_token),
-            db_query_to_update_function(store.get_all_new_backfill_event_rows),
+            store.get_all_new_backfill_event_rows,
         )
 
 
@@ -291,9 +291,7 @@ class PresenceStream(Stream):
         if hs.config.worker_app is None:
             # on the master, query the presence handler
             presence_handler = hs.get_presence_handler()
-            update_function = db_query_to_update_function(
-                presence_handler.get_all_presence_updates
-            )
+            update_function = presence_handler.get_all_presence_updates
         else:
             # Query master process
             update_function = make_http_update_function(hs, self.NAME)
@@ -318,9 +316,7 @@ class TypingStream(Stream):
 
         if hs.config.worker_app is None:
             # on the master, query the typing handler
-            update_function = db_query_to_update_function(
-                typing_handler.get_all_typing_updates
-            )
+            update_function = typing_handler.get_all_typing_updates
         else:
             # Query master process
             update_function = make_http_update_function(hs, self.NAME)
@@ -352,7 +348,7 @@ class ReceiptsStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_max_receipt_stream_id),
-            db_query_to_update_function(store.get_all_updated_receipts),
+            store.get_all_updated_receipts,
         )
 
 
@@ -367,26 +363,17 @@ class PushRulesStream(Stream):
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
+
         super(PushRulesStream, self).__init__(
-            hs.get_instance_name(), self._current_token, self._update_function
+            hs.get_instance_name(),
+            self._current_token,
+            self.store.get_all_push_rule_updates,
         )
 
     def _current_token(self, instance_name: str) -> int:
         push_rules_token, _ = self.store.get_push_rules_stream_token()
         return push_rules_token
 
-    async def _update_function(
-        self, instance_name: str, from_token: Token, to_token: Token, limit: int
-    ):
-        rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
-
-        limited = False
-        if len(rows) == limit:
-            to_token = rows[-1][0]
-            limited = True
-
-        return [(row[0], (row[2],)) for row in rows], to_token, limited
-
 
 class PushersStream(Stream):
     """A user has added/changed/removed a pusher
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index 8173baef8f..e07c32118d 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -15,7 +15,7 @@
 import logging
 from typing import List, Optional
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, JoinRules, Membership, RoomCreationPreset
 from synapse.api.errors import Codes, NotFoundError, SynapseError
 from synapse.http.servlet import (
     RestServlet,
@@ -77,7 +77,7 @@ class ShutdownRoomRestServlet(RestServlet):
         info, stream_id = await self._room_creation_handler.create_room(
             room_creator_requester,
             config={
-                "preset": "public_chat",
+                "preset": RoomCreationPreset.PUBLIC_CHAT,
                 "name": room_name,
                 "power_level_content_override": {"users_default": -10},
             },
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index fefc8f71fa..e4330c39d6 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -16,9 +16,7 @@ import hashlib
 import hmac
 import logging
 import re
-
-from six import text_type
-from six.moves import http_client
+from http import HTTPStatus
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, NotFoundError, SynapseError
@@ -215,10 +213,7 @@ class UserRestServletV2(RestServlet):
                     await self.store.set_server_admin(target_user, set_admin_to)
 
             if "password" in body:
-                if (
-                    not isinstance(body["password"], text_type)
-                    or len(body["password"]) > 512
-                ):
+                if not isinstance(body["password"], str) or len(body["password"]) > 512:
                     raise SynapseError(400, "Invalid password")
                 else:
                     new_password = body["password"]
@@ -252,7 +247,7 @@ class UserRestServletV2(RestServlet):
             password = body.get("password")
             password_hash = None
             if password is not None:
-                if not isinstance(password, text_type) or len(password) > 512:
+                if not isinstance(password, str) or len(password) > 512:
                     raise SynapseError(400, "Invalid password")
                 password_hash = await self.auth_handler.hash(password)
 
@@ -370,10 +365,7 @@ class UserRegisterServlet(RestServlet):
                 400, "username must be specified", errcode=Codes.BAD_JSON
             )
         else:
-            if (
-                not isinstance(body["username"], text_type)
-                or len(body["username"]) > 512
-            ):
+            if not isinstance(body["username"], str) or len(body["username"]) > 512:
                 raise SynapseError(400, "Invalid username")
 
             username = body["username"].encode("utf-8")
@@ -386,7 +378,7 @@ class UserRegisterServlet(RestServlet):
             )
         else:
             password = body["password"]
-            if not isinstance(password, text_type) or len(password) > 512:
+            if not isinstance(password, str) or len(password) > 512:
                 raise SynapseError(400, "Invalid password")
 
             password_bytes = password.encode("utf-8")
@@ -477,7 +469,7 @@ class DeactivateAccountRestServlet(RestServlet):
         erase = body.get("erase", False)
         if not isinstance(erase, bool):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'erase' must be a boolean, if given",
                 Codes.BAD_JSON,
             )
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index dceb2792fa..bf0f9bd077 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -60,10 +60,18 @@ def login_id_thirdparty_from_phone(identifier):
 
     Returns: Login identifier dict of type 'm.id.threepid'
     """
-    if "country" not in identifier or "number" not in identifier:
+    if "country" not in identifier or (
+        # The specification requires a "phone" field, while Synapse used to require a "number"
+        # field. Accept both for backwards compatibility.
+        "phone" not in identifier
+        and "number" not in identifier
+    ):
         raise SynapseError(400, "Invalid phone-type identifier")
 
-    msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"])
+    # Accept both "phone" and "number" as valid keys in m.id.phone
+    phone_number = identifier.get("phone", identifier["number"])
+
+    msisdn = phone_number_to_msisdn(identifier["country"], phone_number)
 
     return {"type": "m.id.thirdparty", "medium": "msisdn", "address": msisdn}
 
@@ -73,7 +81,8 @@ class LoginRestServlet(RestServlet):
     CAS_TYPE = "m.login.cas"
     SSO_TYPE = "m.login.sso"
     TOKEN_TYPE = "m.login.token"
-    JWT_TYPE = "m.login.jwt"
+    JWT_TYPE = "org.matrix.login.jwt"
+    JWT_TYPE_DEPRECATED = "m.login.jwt"
 
     def __init__(self, hs):
         super(LoginRestServlet, self).__init__()
@@ -108,6 +117,7 @@ class LoginRestServlet(RestServlet):
         flows = []
         if self.jwt_enabled:
             flows.append({"type": LoginRestServlet.JWT_TYPE})
+            flows.append({"type": LoginRestServlet.JWT_TYPE_DEPRECATED})
 
         if self.cas_enabled:
             # we advertise CAS for backwards compat, though MSC1721 renamed it
@@ -141,6 +151,7 @@ class LoginRestServlet(RestServlet):
         try:
             if self.jwt_enabled and (
                 login_submission["type"] == LoginRestServlet.JWT_TYPE
+                or login_submission["type"] == LoginRestServlet.JWT_TYPE_DEPRECATED
             ):
                 result = await self.do_jwt_login(login_submission)
             elif login_submission["type"] == LoginRestServlet.TOKEN_TYPE:
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index eec16f8ad8..970fdd5834 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -17,8 +17,6 @@
 """
 import logging
 
-from six import string_types
-
 from synapse.api.errors import AuthError, SynapseError
 from synapse.handlers.presence import format_user_presence_state
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
@@ -51,7 +49,9 @@ class PresenceStatusRestServlet(RestServlet):
                 raise AuthError(403, "You are not allowed to see their presence.")
 
         state = await self.presence_handler.get_state(target_user=user)
-        state = format_user_presence_state(state, self.clock.time_msec())
+        state = format_user_presence_state(
+            state, self.clock.time_msec(), include_user_id=False
+        )
 
         return 200, state
 
@@ -71,7 +71,7 @@ class PresenceStatusRestServlet(RestServlet):
 
             if "status_msg" in content:
                 state["status_msg"] = content.pop("status_msg")
-                if not isinstance(state["status_msg"], string_types):
+                if not isinstance(state["status_msg"], str):
                     raise SynapseError(400, "status_msg must be a string.")
 
             if content:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 105e0cf4d2..46811abbfa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -18,8 +18,7 @@
 import logging
 import re
 from typing import List, Optional
-
-from six.moves.urllib import parse as urlparse
+from urllib import parse as urlparse
 
 from canonicaljson import json
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index b58a77826f..182a308eef 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -15,8 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-
-from six.moves import http_client
+from http import HTTPStatus
 
 from synapse.api.constants import LoginType
 from synapse.api.errors import Codes, SynapseError, ThreepidValidationError
@@ -320,7 +319,7 @@ class DeactivateAccountRestServlet(RestServlet):
         erase = body.get("erase", False)
         if not isinstance(erase, bool):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'erase' must be a boolean, if given",
                 Codes.BAD_JSON,
             )
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index c8d2de7b54..56a451c42f 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -18,8 +18,6 @@ import hmac
 import logging
 from typing import List, Union
 
-from six import string_types
-
 import synapse
 import synapse.api.auth
 import synapse.types
@@ -411,7 +409,7 @@ class RegisterRestServlet(RestServlet):
         # in sessions. Pull out the username/password provided to us.
         if "password" in body:
             password = body.pop("password")
-            if not isinstance(password, string_types) or len(password) > 512:
+            if not isinstance(password, str) or len(password) > 512:
                 raise SynapseError(400, "Invalid password")
             self.password_policy_handler.validate_password(password)
 
@@ -423,10 +421,7 @@ class RegisterRestServlet(RestServlet):
 
         desired_username = None
         if "username" in body:
-            if (
-                not isinstance(body["username"], string_types)
-                or len(body["username"]) > 512
-            ):
+            if not isinstance(body["username"], str) or len(body["username"]) > 512:
                 raise SynapseError(400, "Invalid username")
             desired_username = body["username"]
 
@@ -451,7 +446,7 @@ class RegisterRestServlet(RestServlet):
 
             access_token = self.auth.get_access_token_from_request(request)
 
-            if isinstance(desired_username, string_types):
+            if isinstance(desired_username, str):
                 result = await self._do_appservice_registration(
                     desired_username, access_token, body
                 )
diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py
index f067b5edac..e15927c4ea 100644
--- a/synapse/rest/client/v2_alpha/report_event.py
+++ b/synapse/rest/client/v2_alpha/report_event.py
@@ -14,9 +14,7 @@
 # limitations under the License.
 
 import logging
-
-from six import string_types
-from six.moves import http_client
+from http import HTTPStatus
 
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.servlet import (
@@ -47,15 +45,15 @@ class ReportEventRestServlet(RestServlet):
         body = parse_json_object_from_request(request)
         assert_params_in_dict(body, ("reason", "score"))
 
-        if not isinstance(body["reason"], string_types):
+        if not isinstance(body["reason"], str):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'reason' must be a string",
                 Codes.BAD_JSON,
             )
         if not isinstance(body["score"], int):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'score' must be an integer",
                 Codes.BAD_JSON,
             )
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index 4a20282d1b..0a890c98cb 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -16,10 +16,9 @@
 import hmac
 import logging
 from hashlib import sha256
+from http import HTTPStatus
 from os import path
 
-from six.moves import http_client
-
 import jinja2
 from jinja2 import TemplateNotFound
 
@@ -219,4 +218,4 @@ class ConsentResource(DirectServeResource):
         )
 
         if not compare_digest(want_mac, userhmac):
-            raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect")
+            raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect")
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 3689777266..595849f9d5 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -16,8 +16,7 @@
 
 import logging
 import os
-
-from six.moves import urllib
+import urllib
 
 from twisted.internet import defer
 from twisted.protocols.basic import FileSender
diff --git a/synapse/rest/media/v1/media_repository.py b/synapse/rest/media/v1/media_repository.py
index fd10d42f2f..45628c07b4 100644
--- a/synapse/rest/media/v1/media_repository.py
+++ b/synapse/rest/media/v1/media_repository.py
@@ -20,8 +20,6 @@ import os
 import shutil
 from typing import Dict, Tuple
 
-from six import iteritems
-
 import twisted.internet.error
 import twisted.web.http
 from twisted.web.resource import Resource
@@ -340,7 +338,7 @@ class MediaRepository(object):
 
         with self.media_storage.store_into_file(file_info) as (f, fname, finish):
             request_path = "/".join(
-                ("/_matrix/media/v1/download", server_name, media_id)
+                ("/_matrix/media/r0/download", server_name, media_id)
             )
             try:
                 length, headers = await self.client.get_file(
@@ -606,7 +604,7 @@ class MediaRepository(object):
                 thumbnails[(t_width, t_height, r_type)] = r_method
 
         # Now we generate the thumbnails for each dimension, store it
-        for (t_width, t_height, t_type), t_method in iteritems(thumbnails):
+        for (t_width, t_height, t_type), t_method in thumbnails.items():
             # Generate the thumbnail
             if t_method == "crop":
                 t_byte_source = await defer_to_thread(
@@ -705,7 +703,7 @@ class MediaRepositoryResource(Resource):
     Uploads are POSTed to a resource which returns a token which is used to GET
     the download::
 
-        => POST /_matrix/media/v1/upload HTTP/1.1
+        => POST /_matrix/media/r0/upload HTTP/1.1
            Content-Type: <media-type>
            Content-Length: <content-length>
 
@@ -716,7 +714,7 @@ class MediaRepositoryResource(Resource):
 
            { "content_uri": "mxc://<server-name>/<media-id>" }
 
-        => GET /_matrix/media/v1/download/<server-name>/<media-id> HTTP/1.1
+        => GET /_matrix/media/r0/download/<server-name>/<media-id> HTTP/1.1
 
         <= HTTP/1.1 200 OK
            Content-Type: <media-type>
@@ -727,7 +725,7 @@ class MediaRepositoryResource(Resource):
     Clients can get thumbnails by supplying a desired width and height and
     thumbnailing method::
 
-        => GET /_matrix/media/v1/thumbnail/<server_name>
+        => GET /_matrix/media/r0/thumbnail/<server_name>
                 /<media-id>?width=<w>&height=<h>&method=<m> HTTP/1.1
 
         <= HTTP/1.1 200 OK
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 683a79c966..79cb0dddbe 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -17,9 +17,6 @@ import contextlib
 import logging
 import os
 import shutil
-import sys
-
-import six
 
 from twisted.internet import defer
 from twisted.protocols.basic import FileSender
@@ -117,12 +114,11 @@ class MediaStorage(object):
             with open(fname, "wb") as f:
                 yield f, fname, finish
         except Exception:
-            t, v, tb = sys.exc_info()
             try:
                 os.remove(fname)
             except Exception:
                 pass
-            six.reraise(t, v, tb)
+            raise
 
         if not finished_called:
             raise Exception("Finished callback not called")
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index f206605727..b4645cd608 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -24,10 +24,7 @@ import shutil
 import sys
 import traceback
 from typing import Dict, Optional
-
-import six
-from six import string_types
-from six.moves import urllib_parse as urlparse
+from urllib import parse as urlparse
 
 from canonicaljson import json
 
@@ -85,6 +82,15 @@ class PreviewUrlResource(DirectServeResource):
         self.primary_base_path = media_repo.primary_base_path
         self.media_storage = media_storage
 
+        # We run the background jobs if we're the instance specified (or no
+        # instance is specified, where we assume there is only one instance
+        # serving media).
+        instance_running_jobs = hs.config.media.media_instance_running_background_jobs
+        self._worker_run_media_background_jobs = (
+            instance_running_jobs is None
+            or instance_running_jobs == hs.get_instance_name()
+        )
+
         self.url_preview_url_blacklist = hs.config.url_preview_url_blacklist
         self.url_preview_accept_language = hs.config.url_preview_accept_language
 
@@ -97,9 +103,10 @@ class PreviewUrlResource(DirectServeResource):
             expiry_ms=60 * 60 * 1000,
         )
 
-        self._cleaner_loop = self.clock.looping_call(
-            self._start_expire_url_cache_data, 10 * 1000
-        )
+        if self._worker_run_media_background_jobs:
+            self._cleaner_loop = self.clock.looping_call(
+                self._start_expire_url_cache_data, 10 * 1000
+            )
 
     def render_OPTIONS(self, request):
         request.setHeader(b"Allow", b"OPTIONS, GET")
@@ -188,7 +195,7 @@ class PreviewUrlResource(DirectServeResource):
             # It may be stored as text in the database, not as bytes (such as
             # PostgreSQL). If so, encode it back before handing it on.
             og = cache_result["og"]
-            if isinstance(og, six.text_type):
+            if isinstance(og, str):
                 og = og.encode("utf8")
             return og
 
@@ -400,6 +407,8 @@ class PreviewUrlResource(DirectServeResource):
         """
         # TODO: Delete from backup media store
 
+        assert self._worker_run_media_background_jobs
+
         now = self.clock.time_msec()
 
         logger.debug("Running url preview cache expiry")
@@ -631,7 +640,7 @@ def _iterate_over_text(tree, *tags_to_ignore):
         if el is None:
             return
 
-        if isinstance(el, string_types):
+        if isinstance(el, str):
             yield el
         elif el.tag not in tags_to_ignore:
             # el.text is the text before the first child, so we can immediately
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index 3bf330da49..3bfc8d7278 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 import logging
 
-from six import iteritems, string_types
-
 from synapse.api.errors import SynapseError
 from synapse.api.urls import ConsentURIBuilder
 from synapse.config import ConfigError
@@ -118,10 +116,10 @@ def copy_with_str_subst(x, substitutions):
     Returns:
         copy of x
     """
-    if isinstance(x, string_types):
+    if isinstance(x, str):
         return x % substitutions
     if isinstance(x, dict):
-        return {k: copy_with_str_subst(v, substitutions) for (k, v) in iteritems(x)}
+        return {k: copy_with_str_subst(v, substitutions) for (k, v) in x.items()}
     if isinstance(x, (list, tuple)):
         return [copy_with_str_subst(y) for y in x]
 
diff --git a/synapse/server_notices/resource_limits_server_notices.py b/synapse/server_notices/resource_limits_server_notices.py
index 73f2cedb5c..4404ceff93 100644
--- a/synapse/server_notices/resource_limits_server_notices.py
+++ b/synapse/server_notices/resource_limits_server_notices.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 import logging
 
-from six import iteritems
-
 from synapse.api.constants import (
     EventTypes,
     LimitBlockingTypes,
@@ -214,7 +212,7 @@ class ResourceLimitsServerNotices(object):
             referenced_events = list(pinned_state_event.content.get("pinned", []))
 
         events = await self._store.get_events(referenced_events)
-        for event_id, event in iteritems(events):
+        for event_id, event in events.items():
             if event.type != EventTypes.Message:
                 continue
             if event.content.get("msgtype") == ServerNoticeMsgType:
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 2fa529fcd0..495d9f04c8 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -18,8 +18,6 @@ import logging
 from collections import namedtuple
 from typing import Dict, Iterable, List, Optional, Set
 
-from six import iteritems, itervalues
-
 import attr
 from frozendict import frozendict
 from prometheus_client import Histogram
@@ -34,6 +32,7 @@ from synapse.logging.utils import log_function
 from synapse.state import v1, v2
 from synapse.storage.data_stores.main.events_worker import EventRedactBehaviour
 from synapse.types import StateMap
+from synapse.util import Clock
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import Measure, measure_func
@@ -144,7 +143,7 @@ class StateHandler(object):
             list(state.values()), get_prev_content=False
         )
         state = {
-            key: state_map[e_id] for key, e_id in iteritems(state) if e_id in state_map
+            key: state_map[e_id] for key, e_id in state.items() if e_id in state_map
         }
 
         return state
@@ -416,6 +415,7 @@ class StateHandler(object):
 
         with Measure(self.clock, "state._resolve_events"):
             new_state = yield resolve_events_with_store(
+                self.clock,
                 event.room_id,
                 room_version,
                 state_set_ids,
@@ -423,7 +423,7 @@ class StateHandler(object):
                 state_res_store=StateResolutionStore(self.store),
             )
 
-        new_state = {key: state_map[ev_id] for key, ev_id in iteritems(new_state)}
+        new_state = {key: state_map[ev_id] for key, ev_id in new_state.items()}
 
         return new_state
 
@@ -505,8 +505,8 @@ class StateResolutionHandler(object):
             # resolve_events_with_store do it?
             new_state = {}
             conflicted_state = False
-            for st in itervalues(state_groups_ids):
-                for key, e_id in iteritems(st):
+            for st in state_groups_ids.values():
+                for key, e_id in st.items():
                     if key in new_state:
                         conflicted_state = True
                         break
@@ -518,9 +518,10 @@ class StateResolutionHandler(object):
                 logger.info("Resolving conflicted state for %r", room_id)
                 with Measure(self.clock, "state._resolve_events"):
                     new_state = yield resolve_events_with_store(
+                        self.clock,
                         room_id,
                         room_version,
-                        list(itervalues(state_groups_ids)),
+                        list(state_groups_ids.values()),
                         event_map=event_map,
                         state_res_store=state_res_store,
                     )
@@ -561,12 +562,12 @@ def _make_state_cache_entry(new_state, state_groups_ids):
     # not get persisted.
 
     # first look for exact matches
-    new_state_event_ids = set(itervalues(new_state))
-    for sg, state in iteritems(state_groups_ids):
+    new_state_event_ids = set(new_state.values())
+    for sg, state in state_groups_ids.items():
         if len(new_state_event_ids) != len(state):
             continue
 
-        old_state_event_ids = set(itervalues(state))
+        old_state_event_ids = set(state.values())
         if new_state_event_ids == old_state_event_ids:
             # got an exact match.
             return _StateCacheEntry(state=new_state, state_group=sg)
@@ -579,8 +580,8 @@ def _make_state_cache_entry(new_state, state_groups_ids):
     prev_group = None
     delta_ids = None
 
-    for old_group, old_state in iteritems(state_groups_ids):
-        n_delta_ids = {k: v for k, v in iteritems(new_state) if old_state.get(k) != v}
+    for old_group, old_state in state_groups_ids.items():
+        n_delta_ids = {k: v for k, v in new_state.items() if old_state.get(k) != v}
         if not delta_ids or len(n_delta_ids) < len(delta_ids):
             prev_group = old_group
             delta_ids = n_delta_ids
@@ -591,6 +592,7 @@ def _make_state_cache_entry(new_state, state_groups_ids):
 
 
 def resolve_events_with_store(
+    clock: Clock,
     room_id: str,
     room_version: str,
     state_sets: List[StateMap[str]],
@@ -627,7 +629,7 @@ def resolve_events_with_store(
         )
     else:
         return v2.resolve_events_with_store(
-            room_id, room_version, state_sets, event_map, state_res_store
+            clock, room_id, room_version, state_sets, event_map, state_res_store
         )
 
 
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index 9bf98d06f2..7b531a8337 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -17,8 +17,6 @@ import hashlib
 import logging
 from typing import Callable, Dict, List, Optional
 
-from six import iteritems, iterkeys, itervalues
-
 from twisted.internet import defer
 
 from synapse import event_auth
@@ -70,11 +68,11 @@ def resolve_events_with_store(
     unconflicted_state, conflicted_state = _seperate(state_sets)
 
     needed_events = {
-        event_id for event_ids in itervalues(conflicted_state) for event_id in event_ids
+        event_id for event_ids in conflicted_state.values() for event_id in event_ids
     }
     needed_event_count = len(needed_events)
     if event_map is not None:
-        needed_events -= set(iterkeys(event_map))
+        needed_events -= set(event_map.keys())
 
     logger.info(
         "Asking for %d/%d conflicted events", len(needed_events), needed_event_count
@@ -102,11 +100,11 @@ def resolve_events_with_store(
         unconflicted_state, conflicted_state, state_map
     )
 
-    new_needed_events = set(itervalues(auth_events))
+    new_needed_events = set(auth_events.values())
     new_needed_event_count = len(new_needed_events)
     new_needed_events -= needed_events
     if event_map is not None:
-        new_needed_events -= set(iterkeys(event_map))
+        new_needed_events -= set(event_map.keys())
 
     logger.info(
         "Asking for %d/%d auth events", len(new_needed_events), new_needed_event_count
@@ -152,7 +150,7 @@ def _seperate(state_sets):
     conflicted_state = {}
 
     for state_set in state_set_iterator:
-        for key, value in iteritems(state_set):
+        for key, value in state_set.items():
             # Check if there is an unconflicted entry for the state key.
             unconflicted_value = unconflicted_state.get(key)
             if unconflicted_value is None:
@@ -178,7 +176,7 @@ def _seperate(state_sets):
 
 def _create_auth_events_from_maps(unconflicted_state, conflicted_state, state_map):
     auth_events = {}
-    for event_ids in itervalues(conflicted_state):
+    for event_ids in conflicted_state.values():
         for event_id in event_ids:
             if event_id in state_map:
                 keys = event_auth.auth_types_for_event(state_map[event_id])
@@ -194,7 +192,7 @@ def _resolve_with_state(
     unconflicted_state_ids, conflicted_state_ids, auth_event_ids, state_map
 ):
     conflicted_state = {}
-    for key, event_ids in iteritems(conflicted_state_ids):
+    for key, event_ids in conflicted_state_ids.items():
         events = [state_map[ev_id] for ev_id in event_ids if ev_id in state_map]
         if len(events) > 1:
             conflicted_state[key] = events
@@ -203,7 +201,7 @@ def _resolve_with_state(
 
     auth_events = {
         key: state_map[ev_id]
-        for key, ev_id in iteritems(auth_event_ids)
+        for key, ev_id in auth_event_ids.items()
         if ev_id in state_map
     }
 
@@ -214,7 +212,7 @@ def _resolve_with_state(
         raise
 
     new_state = unconflicted_state_ids
-    for key, event in iteritems(resolved_state):
+    for key, event in resolved_state.items():
         new_state[key] = event.event_id
 
     return new_state
@@ -238,21 +236,21 @@ def _resolve_state_events(conflicted_state, auth_events):
 
     auth_events.update(resolved_state)
 
-    for key, events in iteritems(conflicted_state):
+    for key, events in conflicted_state.items():
         if key[0] == EventTypes.JoinRules:
             logger.debug("Resolving conflicted join rules %r", events)
             resolved_state[key] = _resolve_auth_events(events, auth_events)
 
     auth_events.update(resolved_state)
 
-    for key, events in iteritems(conflicted_state):
+    for key, events in conflicted_state.items():
         if key[0] == EventTypes.Member:
             logger.debug("Resolving conflicted member lists %r", events)
             resolved_state[key] = _resolve_auth_events(events, auth_events)
 
     auth_events.update(resolved_state)
 
-    for key, events in iteritems(conflicted_state):
+    for key, events in conflicted_state.items():
         if key not in resolved_state:
             logger.debug("Resolving conflicted state %r:%r", key, events)
             resolved_state[key] = _resolve_normal_events(events, auth_events)
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 18484e2fa6..bf6caa0946 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -18,8 +18,6 @@ import itertools
 import logging
 from typing import Dict, List, Optional
 
-from six import iteritems, itervalues
-
 from twisted.internet import defer
 
 import synapse.state
@@ -29,12 +27,20 @@ from synapse.api.errors import AuthError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.types import StateMap
+from synapse.util import Clock
 
 logger = logging.getLogger(__name__)
 
 
+# We want to yield to the reactor occasionally during state res when dealing
+# with large data sets, so that we don't exhaust the reactor. This is done by
+# yielding to reactor during loops every N iterations.
+_YIELD_AFTER_ITERATIONS = 100
+
+
 @defer.inlineCallbacks
 def resolve_events_with_store(
+    clock: Clock,
     room_id: str,
     room_version: str,
     state_sets: List[StateMap[str]],
@@ -44,13 +50,11 @@ def resolve_events_with_store(
     """Resolves the state using the v2 state resolution algorithm
 
     Args:
+        clock
         room_id: the room we are working in
-
         room_version: The room version
-
         state_sets: List of dicts of (type, state_key) -> event_id,
             which are the different state groups to resolve.
-
         event_map:
             a dict from event_id to event, for any events that we happen to
             have in flight (eg, those currently being persisted). This will be
@@ -87,7 +91,7 @@ def resolve_events_with_store(
 
     full_conflicted_set = set(
         itertools.chain(
-            itertools.chain.from_iterable(itervalues(conflicted_state)), auth_diff
+            itertools.chain.from_iterable(conflicted_state.values()), auth_diff
         )
     )
 
@@ -115,13 +119,14 @@ def resolve_events_with_store(
     )
 
     sorted_power_events = yield _reverse_topological_power_sort(
-        room_id, power_events, event_map, state_res_store, full_conflicted_set
+        clock, room_id, power_events, event_map, state_res_store, full_conflicted_set
     )
 
     logger.debug("sorted %d power events", len(sorted_power_events))
 
     # Now sequentially auth each one
     resolved_state = yield _iterative_auth_checks(
+        clock,
         room_id,
         room_version,
         sorted_power_events,
@@ -135,20 +140,22 @@ def resolve_events_with_store(
     # OK, so we've now resolved the power events. Now sort the remaining
     # events using the mainline of the resolved power level.
 
+    set_power_events = set(sorted_power_events)
     leftover_events = [
-        ev_id for ev_id in full_conflicted_set if ev_id not in sorted_power_events
+        ev_id for ev_id in full_conflicted_set if ev_id not in set_power_events
     ]
 
     logger.debug("sorting %d remaining events", len(leftover_events))
 
     pl = resolved_state.get((EventTypes.PowerLevels, ""), None)
     leftover_events = yield _mainline_sort(
-        room_id, leftover_events, pl, event_map, state_res_store
+        clock, room_id, leftover_events, pl, event_map, state_res_store
     )
 
     logger.debug("resolving remaining events")
 
     resolved_state = yield _iterative_auth_checks(
+        clock,
         room_id,
         room_version,
         leftover_events,
@@ -318,12 +325,13 @@ def _add_event_and_auth_chain_to_graph(
 
 @defer.inlineCallbacks
 def _reverse_topological_power_sort(
-    room_id, event_ids, event_map, state_res_store, auth_diff
+    clock, room_id, event_ids, event_map, state_res_store, auth_diff
 ):
     """Returns a list of the event_ids sorted by reverse topological ordering,
     and then by power level and origin_server_ts
 
     Args:
+        clock (Clock)
         room_id (str): the room we are working in
         event_ids (list[str]): The events to sort
         event_map (dict[str,FrozenEvent])
@@ -335,18 +343,28 @@ def _reverse_topological_power_sort(
     """
 
     graph = {}
-    for event_id in event_ids:
+    for idx, event_id in enumerate(event_ids, start=1):
         yield _add_event_and_auth_chain_to_graph(
             graph, room_id, event_id, event_map, state_res_store, auth_diff
         )
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     event_to_pl = {}
-    for event_id in graph:
+    for idx, event_id in enumerate(graph, start=1):
         pl = yield _get_power_level_for_sender(
             room_id, event_id, event_map, state_res_store
         )
         event_to_pl[event_id] = pl
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     def _get_power_order(event_id):
         ev = event_map[event_id]
         pl = event_to_pl[event_id]
@@ -362,12 +380,13 @@ def _reverse_topological_power_sort(
 
 @defer.inlineCallbacks
 def _iterative_auth_checks(
-    room_id, room_version, event_ids, base_state, event_map, state_res_store
+    clock, room_id, room_version, event_ids, base_state, event_map, state_res_store
 ):
     """Sequentially apply auth checks to each event in given list, updating the
     state as it goes along.
 
     Args:
+        clock (Clock)
         room_id (str)
         room_version (str)
         event_ids (list[str]): Ordered list of events to apply auth checks to
@@ -381,7 +400,7 @@ def _iterative_auth_checks(
     resolved_state = base_state.copy()
     room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
-    for event_id in event_ids:
+    for idx, event_id in enumerate(event_ids, start=1):
         event = event_map[event_id]
 
         auth_events = {}
@@ -419,17 +438,23 @@ def _iterative_auth_checks(
         except AuthError:
             pass
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     return resolved_state
 
 
 @defer.inlineCallbacks
 def _mainline_sort(
-    room_id, event_ids, resolved_power_event_id, event_map, state_res_store
+    clock, room_id, event_ids, resolved_power_event_id, event_map, state_res_store
 ):
     """Returns a sorted list of event_ids sorted by mainline ordering based on
     the given event resolved_power_event_id
 
     Args:
+        clock (Clock)
         room_id (str): room we're working in
         event_ids (list[str]): Events to sort
         resolved_power_event_id (str): The final resolved power level event ID
@@ -439,8 +464,14 @@ def _mainline_sort(
     Returns:
         Deferred[list[str]]: The sorted list
     """
+    if not event_ids:
+        # It's possible for there to be no event IDs here to sort, so we can
+        # skip calculating the mainline in that case.
+        return []
+
     mainline = []
     pl = resolved_power_event_id
+    idx = 0
     while pl:
         mainline.append(pl)
         pl_ev = yield _get_event(room_id, pl, event_map, state_res_store)
@@ -454,17 +485,29 @@ def _mainline_sort(
                 pl = aid
                 break
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx != 0 and idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
+        idx += 1
+
     mainline_map = {ev_id: i + 1 for i, ev_id in enumerate(reversed(mainline))}
 
     event_ids = list(event_ids)
 
     order_map = {}
-    for ev_id in event_ids:
+    for idx, ev_id in enumerate(event_ids, start=1):
         depth = yield _get_mainline_depth_for_event(
             event_map[ev_id], mainline_map, event_map, state_res_store
         )
         order_map[ev_id] = (depth, event_map[ev_id].origin_server_ts, ev_id)
 
+        # We yield occasionally when we're working with large data sets to
+        # ensure that we don't block the reactor loop for too long.
+        if idx % _YIELD_AFTER_ITERATIONS == 0:
+            yield clock.sleep(0)
+
     event_ids.sort(key=lambda ev_id: order_map[ev_id])
 
     return event_ids
@@ -572,7 +615,7 @@ def lexicographical_topological_sort(graph, key):
     # `(key(node), node)` so that sorting does the right thing
     zero_outdegree = []
 
-    for node, edges in iteritems(graph):
+    for node, edges in graph.items():
         if len(edges) == 0:
             zero_outdegree.append((key(node), node))
 
diff --git a/synapse/static/client/login/index.html b/synapse/static/client/login/index.html
index 6fefdaaff7..9e6daf38ac 100644
--- a/synapse/static/client/login/index.html
+++ b/synapse/static/client/login/index.html
@@ -1,24 +1,24 @@
 <!doctype html>
 <html>
 <head>
-<title> Login </title>
-<meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
-<link rel="stylesheet" href="style.css">
-<script src="js/jquery-3.4.1.min.js"></script>
-<script src="js/login.js"></script>
+    <meta http-equiv="Content-Type" content="text/html; charset=utf-8">
+    <title> Login </title>
+    <meta name='viewport' content='width=device-width, initial-scale=1, user-scalable=no, minimum-scale=1.0, maximum-scale=1.0'>
+    <link rel="stylesheet" href="style.css">
+    <script src="js/jquery-3.4.1.min.js"></script>
+    <script src="js/login.js"></script>
 </head>
 <body onload="matrixLogin.onLoad()">
-    <center>
-        <br/>
+    <div id="container">
         <h1 id="title"></h1>
 
-        <span id="feedback" style="color: #f00"></span>
+        <span id="feedback"></span>
 
         <div id="loading">
             <img src="spinner.gif" />
         </div>
 
-        <div id="sso_flow" class="login_flow" style="display:none">
+        <div id="sso_flow" class="login_flow" style="display: none;">
             Single-sign on:
             <form id="sso_form" action="/_matrix/client/r0/login/sso/redirect" method="get">
                 <input id="sso_redirect_url" type="hidden" name="redirectUrl" value=""/>
@@ -26,9 +26,9 @@
             </form>
         </div>
 
-        <div id="password_flow" class="login_flow" style="display:none">
+        <div id="password_flow" class="login_flow" style="display: none;">
             Password Authentication:
-            <form onsubmit="matrixLogin.password_login(); return false;">
+            <form onsubmit="matrixLogin.passwordLogin(); return false;">
                 <input id="user_id" size="32" type="text" placeholder="Matrix ID (e.g. bob)" autocapitalize="off" autocorrect="off" />
                 <br/>
                 <input id="password" size="32" type="password" placeholder="Password"/>
@@ -38,9 +38,9 @@
             </form>
         </div>
 
-        <div id="no_login_types" type="button" class="login_flow" style="display:none">
+        <div id="no_login_types" type="button" class="login_flow" style="display: none;">
             Log in currently unavailable.
         </div>
-    </center>
+    </div>
 </body>
 </html>
diff --git a/synapse/static/client/login/js/login.js b/synapse/static/client/login/js/login.js
index ba8048b23f..3678670ec7 100644
--- a/synapse/static/client/login/js/login.js
+++ b/synapse/static/client/login/js/login.js
@@ -5,11 +5,11 @@ window.matrixLogin = {
 };
 
 // Titles get updated through the process to give users feedback.
-var TITLE_PRE_AUTH = "Log in with one of the following methods";
-var TITLE_POST_AUTH = "Logging in...";
+const TITLE_PRE_AUTH = "Log in with one of the following methods";
+const TITLE_POST_AUTH = "Logging in...";
 
 // The cookie used to store the original query parameters when using SSO.
-var COOKIE_KEY = "synapse_login_fallback_qs";
+const COOKIE_KEY = "synapse_login_fallback_qs";
 
 /*
  * Submit a login request.
@@ -20,9 +20,9 @@ var COOKIE_KEY = "synapse_login_fallback_qs";
  *     login request, e.g. device_id.
  * callback: (Optional) Function to call on successful login.
  */
-var submitLogin = function(type, data, extra, callback) {
+function submitLogin(type, data, extra, callback) {
     console.log("Logging in with " + type);
-    set_title(TITLE_POST_AUTH);
+    setTitle(TITLE_POST_AUTH);
 
     // Add the login type.
     data.type = type;
@@ -41,12 +41,15 @@ var submitLogin = function(type, data, extra, callback) {
         }
         matrixLogin.onLogin(response);
     }).fail(errorFunc);
-};
+}
 
-var errorFunc = function(err) {
+/*
+ * Display an error to the user and show the login form again.
+ */
+function errorFunc(err) {
     // We want to show the error to the user rather than redirecting immediately to the
     // SSO portal (if SSO is the only login option), so we inhibit the redirect.
-    show_login(true);
+    showLogin(true);
 
     if (err.responseJSON && err.responseJSON.error) {
         setFeedbackString(err.responseJSON.error + " (" + err.responseJSON.errcode + ")");
@@ -54,27 +57,42 @@ var errorFunc = function(err) {
     else {
         setFeedbackString("Request failed: " + err.status);
     }
-};
+}
 
-var setFeedbackString = function(text) {
+/*
+ * Display an error to the user.
+ */
+function setFeedbackString(text) {
     $("#feedback").text(text);
-};
+}
 
-var show_login = function(inhibit_redirect) {
-    // Set the redirect to come back to this page, a login token will get added
-    // and handled after the redirect.
-    var this_page = window.location.origin + window.location.pathname;
-    $("#sso_redirect_url").val(this_page);
+/*
+ * (Maybe) Show the login forms.
+ *
+ * This actually does a few unrelated functions:
+ *
+ * * Configures the SSO redirect URL to come back to this page.
+ * * Configures and shows the SSO form, if the server supports SSO.
+ * * Otherwise, shows the password form.
+ */
+function showLogin(inhibitRedirect) {
+    setTitle(TITLE_PRE_AUTH);
 
-    // If inhibit_redirect is false, and SSO is the only supported login method,
+    // If inhibitRedirect is false, and SSO is the only supported login method,
     // we can redirect straight to the SSO page.
     if (matrixLogin.serverAcceptsSso) {
+        // Set the redirect to come back to this page, a login token will get
+        // added as a query parameter and handled after the redirect.
+        $("#sso_redirect_url").val(window.location.origin + window.location.pathname);
+
         // Before submitting SSO, set the current query parameters into a cookie
         // for retrieval later.
         var qs = parseQsFromUrl();
         setCookie(COOKIE_KEY, JSON.stringify(qs));
 
-        if (!inhibit_redirect && !matrixLogin.serverAcceptsPassword) {
+        // If password is not supported and redirects are allowed, then submit
+        // the form (redirecting to the SSO provider).
+        if (!inhibitRedirect && !matrixLogin.serverAcceptsPassword) {
             $("#sso_form").submit();
             return;
         }
@@ -87,30 +105,39 @@ var show_login = function(inhibit_redirect) {
         $("#password_flow").show();
     }
 
+    // If neither password or SSO are supported, show an error to the user.
     if (!matrixLogin.serverAcceptsPassword && !matrixLogin.serverAcceptsSso) {
         $("#no_login_types").show();
     }
 
-    set_title(TITLE_PRE_AUTH);
-
     $("#loading").hide();
-};
+}
 
-var show_spinner = function() {
+/*
+ * Hides the forms and shows a loading throbber.
+ */
+function showSpinner() {
     $("#password_flow").hide();
     $("#sso_flow").hide();
     $("#no_login_types").hide();
     $("#loading").show();
-};
+}
 
-var set_title = function(title) {
+/*
+ * Helper to show the page's main title.
+ */
+function setTitle(title) {
     $("#title").text(title);
-};
+}
 
-var fetch_info = function(cb) {
+/*
+ * Query the login endpoint for the homeserver's supported flows.
+ *
+ * This populates matrixLogin.serverAccepts* variables.
+ */
+function fetchLoginFlows(cb) {
     $.get(matrixLogin.endpoint, function(response) {
-        var serverAcceptsPassword = false;
-        for (var i=0; i<response.flows.length; i++) {
+        for (var i = 0; i < response.flows.length; i++) {
             var flow = response.flows[i];
             if ("m.login.sso" === flow.type) {
                 matrixLogin.serverAcceptsSso = true;
@@ -126,27 +153,41 @@ var fetch_info = function(cb) {
     }).fail(errorFunc);
 }
 
+/*
+ * Called on load to fetch login flows and attempt SSO login (if a token is available).
+ */
 matrixLogin.onLoad = function() {
-    fetch_info(function() {
-        if (!try_token()) {
-            show_login(false);
+    fetchLoginFlows(function() {
+        // (Maybe) attempt logging in via SSO if a token is available.
+        if (!tryTokenLogin()) {
+            showLogin(false);
         }
     });
 };
 
-matrixLogin.password_login = function() {
+/*
+ * Submit simple user & password login.
+ */
+matrixLogin.passwordLogin = function() {
     var user = $("#user_id").val();
     var pwd = $("#password").val();
 
     setFeedbackString("");
 
-    show_spinner();
+    showSpinner();
     submitLogin(
         "m.login.password",
         {user: user, password: pwd},
         parseQsFromUrl());
 };
 
+/*
+ * The onLogin function gets called after a succesful login.
+ *
+ * It is expected that implementations override this to be notified when the
+ * login is complete. The response to the login call is provided as the single
+ * parameter.
+ */
 matrixLogin.onLogin = function(response) {
     // clobber this function
     console.warn("onLogin - This function should be replaced to proceed.");
@@ -155,7 +196,7 @@ matrixLogin.onLogin = function(response) {
 /*
  * Process the query parameters from the current URL into an object.
  */
-var parseQsFromUrl = function() {
+function parseQsFromUrl() {
     var pos = window.location.href.indexOf("?");
     if (pos == -1) {
         return {};
@@ -174,12 +215,12 @@ var parseQsFromUrl = function() {
         result[key] = val;
     });
     return result;
-};
+}
 
 /*
  * Process the cookies and return an object.
  */
-var parseCookies = function() {
+function parseCookies() {
     var allCookies = document.cookie;
     var result = {};
     allCookies.split(";").forEach(function(part) {
@@ -196,32 +237,32 @@ var parseCookies = function() {
         result[key] = val;
     });
     return result;
-};
+}
 
 /*
  * Set a cookie that is valid for 1 hour.
  */
-var setCookie = function(key, value) {
+function setCookie(key, value) {
     // The maximum age is set in seconds.
     var maxAge = 60 * 60;
     // Set the cookie, this defaults to the current domain and path.
     document.cookie = key + "=" + encodeURIComponent(value) + ";max-age=" + maxAge + ";sameSite=lax";
-};
+}
 
 /*
  * Removes a cookie by key.
  */
-var deleteCookie = function(key) {
+function deleteCookie(key) {
     // Delete a cookie by setting the expiration to 0. (Note that the value
     // doesn't matter.)
     document.cookie = key + "=deleted;expires=0";
-};
+}
 
 /*
  * Submits the login token if one is found in the query parameters. Returns a
  * boolean of whether the login token was found or not.
  */
-var try_token = function() {
+function tryTokenLogin() {
     // Check if the login token is in the query parameters.
     var qs = parseQsFromUrl();
 
@@ -233,18 +274,18 @@ var try_token = function() {
     // Retrieve the original query parameters (from before the SSO redirect).
     // They are stored as JSON in a cookie.
     var cookies = parseCookies();
-    var original_query_params = JSON.parse(cookies[COOKIE_KEY] || "{}")
+    var originalQueryParams = JSON.parse(cookies[COOKIE_KEY] || "{}")
 
     // If the login is successful, delete the cookie.
-    var callback = function() {
+    function callback() {
         deleteCookie(COOKIE_KEY);
     }
 
     submitLogin(
         "m.login.token",
         {token: loginToken},
-        original_query_params,
+        originalQueryParams,
         callback);
 
     return true;
-};
+}
diff --git a/synapse/static/client/login/style.css b/synapse/static/client/login/style.css
index 1cce5ed950..83e4f6abc8 100644
--- a/synapse/static/client/login/style.css
+++ b/synapse/static/client/login/style.css
@@ -31,20 +31,44 @@ form {
     margin: 10px 0 0 0;
 }
 
+/*
+ * Add some padding to the viewport.
+ */
+#container {
+    padding: 10px;
+}
+/*
+ * Center all direct children of the main form.
+ */
+#container > * {
+    display: block;
+    margin-left: auto;
+    margin-right: auto;
+    text-align: center;
+}
+
+/*
+ * A wrapper around each login flow.
+ */
 .login_flow {
     width: 300px;
     text-align: left;
     padding: 10px;
     margin-bottom: 40px;
 
-    -webkit-border-radius: 10px;
-    -moz-border-radius: 10px;
     border-radius: 10px;
-
-    -webkit-box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15);
-    -moz-box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15);
     box-shadow: 0px 0px 20px 0px rgba(0,0,0,0.15);
 
     background-color: #f8f8f8;
     border: 1px #ccc solid;
 }
+
+/*
+ * Used to show error content.
+ */
+#feedback {
+    /* Red text. */
+    color: #ff0000;
+    /* A little space to not overlap the box-shadow. */
+    margin-bottom: 20px;
+}
diff --git a/synapse/storage/data_stores/main/client_ips.py b/synapse/storage/data_stores/main/client_ips.py
index 71f8d43a76..995d4764a9 100644
--- a/synapse/storage/data_stores/main/client_ips.py
+++ b/synapse/storage/data_stores/main/client_ips.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import iteritems
-
 from twisted.internet import defer
 
 from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -421,7 +419,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
         ):
             self.database_engine.lock_table(txn, "user_ips")
 
-        for entry in iteritems(to_update):
+        for entry in to_update.items():
             (user_id, access_token, ip), (user_agent, device_id, last_seen) = entry
 
             try:
@@ -530,7 +528,7 @@ class ClientIpStore(ClientIpBackgroundUpdateStore):
                 "user_agent": user_agent,
                 "last_seen": last_seen,
             }
-            for (access_token, ip), (user_agent, last_seen) in iteritems(results)
+            for (access_token, ip), (user_agent, last_seen) in results.items()
         ]
 
     @wrap_as_background_process("prune_old_user_ips")
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index fb9f798e29..0ff0542453 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -17,8 +17,6 @@
 import logging
 from typing import List, Optional, Set, Tuple
 
-from six import iteritems
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -208,7 +206,7 @@ class DeviceWorkerStore(SQLBaseStore):
         )
 
         # add the updated cross-signing keys to the results list
-        for user_id, result in iteritems(cross_signing_keys_by_user):
+        for user_id, result in cross_signing_keys_by_user.items():
             result["user_id"] = user_id
             # FIXME: switch to m.signing_key_update when MSC1756 is merged into the spec
             results.append(("org.matrix.signing_key_update", result))
@@ -269,7 +267,7 @@ class DeviceWorkerStore(SQLBaseStore):
         )
 
         results = []
-        for user_id, user_devices in iteritems(devices):
+        for user_id, user_devices in devices.items():
             # The prev_id for the first row is always the last row before
             # `from_stream_id`
             prev_id = yield self._get_last_device_update_for_remote_user(
@@ -493,7 +491,7 @@ class DeviceWorkerStore(SQLBaseStore):
         if devices:
             user_devices = devices[user_id]
             results = []
-            for device_id, device in iteritems(user_devices):
+            for device_id, device in user_devices.items():
                 result = {"device_id": device_id}
 
                 key_json = device.get("key_json", None)
diff --git a/synapse/storage/data_stores/main/end_to_end_keys.py b/synapse/storage/data_stores/main/end_to_end_keys.py
index 20698bfd16..1a0842d4b0 100644
--- a/synapse/storage/data_stores/main/end_to_end_keys.py
+++ b/synapse/storage/data_stores/main/end_to_end_keys.py
@@ -16,8 +16,6 @@
 # limitations under the License.
 from typing import Dict, List
 
-from six import iteritems
-
 from canonicaljson import encode_canonical_json, json
 
 from twisted.enterprise.adbapi import Connection
@@ -64,9 +62,9 @@ class EndToEndKeyWorkerStore(SQLBaseStore):
         # Build the result structure, un-jsonify the results, and add the
         # "unsigned" section
         rv = {}
-        for user_id, device_keys in iteritems(results):
+        for user_id, device_keys in results.items():
             rv[user_id] = {}
-            for device_id, device_info in iteritems(device_keys):
+            for device_id, device_info in device_keys.items():
                 r = db_to_json(device_info.pop("key_json"))
                 r["unsigned"] = {}
                 display_name = device_info["device_display_name"]
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 24ce8c4330..a6bb3221ff 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,10 +14,9 @@
 # limitations under the License.
 import itertools
 import logging
+from queue import Empty, PriorityQueue
 from typing import Dict, List, Optional, Set, Tuple
 
-from six.moves.queue import Empty, PriorityQueue
-
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
diff --git a/synapse/storage/data_stores/main/event_push_actions.py b/synapse/storage/data_stores/main/event_push_actions.py
index 0321274de2..bc9f4f08ea 100644
--- a/synapse/storage/data_stores/main/event_push_actions.py
+++ b/synapse/storage/data_stores/main/event_push_actions.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six import iteritems
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -455,7 +453,7 @@ class EventPushActionsWorkerStore(SQLBaseStore):
                 sql,
                 (
                     _gen_entry(user_id, actions)
-                    for user_id, actions in iteritems(user_id_actions)
+                    for user_id, actions in user_id_actions.items()
                 ),
             )
 
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index a6572571b4..cfd24d2f06 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple
 from functools import wraps
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
-from six import integer_types, iteritems, text_type
-from six.moves import range
-
 import attr
 from canonicaljson import json
 from prometheus_client import Counter
@@ -232,10 +229,10 @@ class PersistEventsStore:
 
                 event_counter.labels(event.type, origin_type, origin_entity).inc()
 
-            for room_id, new_state in iteritems(current_state_for_room):
+            for room_id, new_state in current_state_for_room.items():
                 self.store.get_current_state_ids.prefill((room_id,), new_state)
 
-            for room_id, latest_event_ids in iteritems(new_forward_extremeties):
+            for room_id, latest_event_ids in new_forward_extremeties.items():
                 self.store.get_latest_event_ids_in_room.prefill(
                     (room_id,), list(latest_event_ids)
                 )
@@ -461,7 +458,7 @@ class PersistEventsStore:
         state_delta_by_room: Dict[str, DeltaState],
         stream_id: int,
     ):
-        for room_id, delta_state in iteritems(state_delta_by_room):
+        for room_id, delta_state in state_delta_by_room.items():
             to_delete = delta_state.to_delete
             to_insert = delta_state.to_insert
 
@@ -545,7 +542,7 @@ class PersistEventsStore:
                     """,
                     [
                         (room_id, key[0], key[1], ev_id, ev_id)
-                        for key, ev_id in iteritems(to_insert)
+                        for key, ev_id in to_insert.items()
                     ],
                 )
 
@@ -642,7 +639,7 @@ class PersistEventsStore:
     def _update_forward_extremities_txn(
         self, txn, new_forward_extremities, max_stream_order
     ):
-        for room_id, new_extrem in iteritems(new_forward_extremities):
+        for room_id, new_extrem in new_forward_extremities.items():
             self.db.simple_delete_txn(
                 txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
             )
@@ -655,7 +652,7 @@ class PersistEventsStore:
             table="event_forward_extremities",
             values=[
                 {"event_id": ev_id, "room_id": room_id}
-                for room_id, new_extrem in iteritems(new_forward_extremities)
+                for room_id, new_extrem in new_forward_extremities.items()
                 for ev_id in new_extrem
             ],
         )
@@ -672,7 +669,7 @@ class PersistEventsStore:
                     "event_id": event_id,
                     "stream_ordering": max_stream_order,
                 }
-                for room_id, new_extrem in iteritems(new_forward_extremities)
+                for room_id, new_extrem in new_forward_extremities.items()
                 for event_id in new_extrem
             ],
         )
@@ -727,7 +724,7 @@ class PersistEventsStore:
                     event.depth, depth_updates.get(event.room_id, event.depth)
                 )
 
-        for room_id, depth in iteritems(depth_updates):
+        for room_id, depth in depth_updates.items():
             self._update_min_depth_for_room_txn(txn, room_id, depth)
 
     def _update_outliers_txn(self, txn, events_and_contexts):
@@ -893,8 +890,7 @@ class PersistEventsStore:
                     "received_ts": self._clock.time_msec(),
                     "sender": event.sender,
                     "contains_url": (
-                        "url" in event.content
-                        and isinstance(event.content["url"], text_type)
+                        "url" in event.content and isinstance(event.content["url"], str)
                     ),
                 }
                 for event, _ in events_and_contexts
@@ -1345,10 +1341,10 @@ class PersistEventsStore:
         ):
             if (
                 "min_lifetime" in event.content
-                and not isinstance(event.content.get("min_lifetime"), integer_types)
+                and not isinstance(event.content.get("min_lifetime"), int)
             ) or (
                 "max_lifetime" in event.content
-                and not isinstance(event.content.get("max_lifetime"), integer_types)
+                and not isinstance(event.content.get("max_lifetime"), int)
             ):
                 # Ignore the event if one of the value isn't an integer.
                 return
@@ -1497,11 +1493,11 @@ class PersistEventsStore:
             table="event_to_state_groups",
             values=[
                 {"state_group": state_group_id, "event_id": event_id}
-                for event_id, state_group_id in iteritems(state_groups)
+                for event_id, state_group_id in state_groups.items()
             ],
         )
 
-        for event_id, state_group_id in iteritems(state_groups):
+        for event_id, state_group_id in state_groups.items():
             txn.call_after(
                 self.store._get_state_group_for_event.prefill,
                 (event_id,),
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index f54c8b1ee0..62d28f44dc 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import text_type
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
                     contains_url = "url" in content
                     if contains_url:
-                        contains_url &= isinstance(content["url"], text_type)
+                        contains_url &= isinstance(content["url"], str)
                 except (KeyError, AttributeError):
                     # If the event is missing a necessary field then
                     # skip over it.
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 213d69100a..a48c7a96ca 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore):
             "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
         )
 
-    def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+    async def get_all_new_backfill_event_rows(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for backfill replication stream, including all new
+        backfilled events and events that have gone from being outliers to not.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_new_backfill_event_rows(txn):
             sql = (
@@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (-last_id, -current_id, limit))
-            new_event_updates = txn.fetchall()
+            new_event_updates = [(row[0], row[1:]) for row in txn]
 
+            limited = False
             if len(new_event_updates) == limit:
                 upper_bound = new_event_updates[-1][0]
+                limited = True
             else:
                 upper_bound = current_id
 
@@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore):
                 " ORDER BY event_stream_ordering DESC"
             )
             txn.execute(sql, (-last_id, -upper_bound))
-            new_event_updates.extend(txn.fetchall())
+            new_event_updates.extend((row[0], row[1:]) for row in txn)
 
-            return new_event_updates
+            if len(new_event_updates) >= limit:
+                upper_bound = new_event_updates[-1][0]
+                limited = True
 
-        return self.db.runInteraction(
+            return new_event_updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
         )
 
diff --git a/synapse/storage/data_stores/main/media_repository.py b/synapse/storage/data_stores/main/media_repository.py
index 8aecd414c2..15bc13cbd0 100644
--- a/synapse/storage/data_stores/main/media_repository.py
+++ b/synapse/storage/data_stores/main/media_repository.py
@@ -81,6 +81,15 @@ class MediaRepositoryStore(MediaRepositoryBackgroundUpdateStore):
             desc="store_local_media",
         )
 
+    def mark_local_media_as_safe(self, media_id: str):
+        """Mark a local media as safe from quarantining."""
+        return self.db.simple_update_one(
+            table="local_media_repository",
+            keyvalues={"media_id": media_id},
+            updatevalues={"safe_from_quarantine": True},
+            desc="mark_local_media_as_safe",
+        )
+
     def get_url_cache(self, url, ts):
         """Get the media_id and ts for a cached URL as of the given timestamp
         Returns:
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index dab31e0c2d..7574612619 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Tuple
+
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore):
             )
             txn.execute(sql + clause, [stream_id] + list(args))
 
-    def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for presence replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_presence_updates_txn(txn):
             sql = """
@@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(row[0], row[1:]) for row in txn]
+
+            upper_bound = current_id
+            limited = False
+            if len(updates) >= limit:
+                upper_bound = updates[-1][0]
+                limited = True
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_presence_updates", get_all_presence_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index ef8f40959f..f6e78ca590 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -16,7 +16,7 @@
 
 import abc
 import logging
-from typing import Union
+from typing import List, Tuple, Union
 
 from canonicaljson import json
 
@@ -348,23 +348,53 @@ class PushRulesWorkerStore(
             results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         return results
 
-    def get_all_push_rule_updates(self, last_id, current_id, limit):
-        """Get all the push rules changes that have happend on the server"""
+    async def get_all_push_rule_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for push_rules replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_push_rule_updates_txn(txn):
-            sql = (
-                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
-                " op, priority_class, priority, conditions, actions"
-                " FROM push_rules_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
+            sql = """
+                SELECT stream_id, user_id
+                FROM push_rules_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]
+
+            limited = False
+            upper_bound = current_id
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_push_rule_updates", get_all_push_rule_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index cebdcd409f..8f5505bd67 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -16,6 +16,7 @@
 
 import abc
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -24,6 +25,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore):
         }
         return results
 
-    def get_all_updated_receipts(self, last_id, current_id, limit=None):
+    def get_users_sent_receipts_between(self, last_id: int, current_id: int):
+        """Get all users who sent receipts between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            Deferred[List[str]]
+        """
+
         if last_id == current_id:
             return defer.succeed([])
 
-        def get_all_updated_receipts_txn(txn):
-            sql = (
-                "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
-                " FROM receipts_linearized"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-            )
-            args = [last_id, current_id]
-            if limit is not None:
-                sql += " LIMIT ?"
-                args.append(limit)
-            txn.execute(sql, args)
+        def _get_users_sent_receipts_between_txn(txn):
+            sql = """
+                SELECT DISTINCT user_id FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (last_id, current_id))
 
-            return [r[0:5] + (json.loads(r[5]),) for r in txn]
+            return [r[0] for r in txn]
 
         return self.db.runInteraction(
+            "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
+        )
+
+    async def get_all_updated_receipts(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for receipts replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_all_updated_receipts_txn(txn):
+            sql = """
+                SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+                FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, limit))
+
+            updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+
+            limited = False
+            upper_bound = current_id
+
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
         )
 
@@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
             room_id, None, update_metrics=False
         )
 
-        # first handle the Deferred case
-        if isinstance(res, defer.Deferred):
-            if res.called:
-                res = res.result
+        # first handle the ObservableDeferred case
+        if isinstance(res, ObservableDeferred):
+            if res.has_called():
+                res = res.get_result()
             else:
                 res = None
 
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index 9768981891..587d4b91c1 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -19,8 +19,6 @@ import logging
 import re
 from typing import Optional
 
-from six import iterkeys
-
 from twisted.internet import defer
 from twisted.internet.defer import Deferred
 
@@ -753,7 +751,7 @@ class RegistrationWorkerStore(SQLBaseStore):
                 last_send_attempt, validated_at
                 FROM threepid_validation_session WHERE %s
                 """ % (
-                " AND ".join("%s = ?" % k for k in iterkeys(keyvalues)),
+                " AND ".join("%s = ?" % k for k in keyvalues.keys()),
             )
 
             if validated is not None:
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index 46f643c6b9..13e366536a 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -626,36 +626,10 @@ class RoomWorkerStore(SQLBaseStore):
 
         def _quarantine_media_in_room_txn(txn):
             local_mxcs, remote_mxcs = self._get_media_mxcs_in_room_txn(txn, room_id)
-            total_media_quarantined = 0
-
-            # Now update all the tables to set the quarantined_by flag
-
-            txn.executemany(
-                """
-                UPDATE local_media_repository
-                SET quarantined_by = ?
-                WHERE media_id = ?
-            """,
-                ((quarantined_by, media_id) for media_id in local_mxcs),
-            )
-
-            txn.executemany(
-                """
-                    UPDATE remote_media_cache
-                    SET quarantined_by = ?
-                    WHERE media_origin = ? AND media_id = ?
-                """,
-                (
-                    (quarantined_by, origin, media_id)
-                    for origin, media_id in remote_mxcs
-                ),
+            return self._quarantine_media_txn(
+                txn, local_mxcs, remote_mxcs, quarantined_by
             )
 
-            total_media_quarantined += len(local_mxcs)
-            total_media_quarantined += len(remote_mxcs)
-
-            return total_media_quarantined
-
         return self.db.runInteraction(
             "quarantine_media_in_room", _quarantine_media_in_room_txn
         )
@@ -805,17 +779,17 @@ class RoomWorkerStore(SQLBaseStore):
         Returns:
             The total number of media items quarantined
         """
-        total_media_quarantined = 0
-
         # Update all the tables to set the quarantined_by flag
         txn.executemany(
             """
             UPDATE local_media_repository
             SET quarantined_by = ?
-            WHERE media_id = ?
+            WHERE media_id = ? AND safe_from_quarantine = ?
         """,
-            ((quarantined_by, media_id) for media_id in local_mxcs),
+            ((quarantined_by, media_id, False) for media_id in local_mxcs),
         )
+        # Note that a rowcount of -1 can be used to indicate no rows were affected.
+        total_media_quarantined = txn.rowcount if txn.rowcount > 0 else 0
 
         txn.executemany(
             """
@@ -825,9 +799,7 @@ class RoomWorkerStore(SQLBaseStore):
             """,
             ((quarantined_by, origin, media_id) for origin, media_id in remote_mxcs),
         )
-
-        total_media_quarantined += len(local_mxcs)
-        total_media_quarantined += len(remote_mxcs)
+        total_media_quarantined += txn.rowcount if txn.rowcount > 0 else 0
 
         return total_media_quarantined
 
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 137ebac833..44bab65eac 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Iterable, List, Set
 
-from six import iteritems, itervalues
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -544,7 +542,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
         users_in_room = {}
         member_event_ids = [
             e_id
-            for key, e_id in iteritems(current_state_ids)
+            for key, e_id in current_state_ids.items()
             if key[0] == EventTypes.Member
         ]
 
@@ -561,7 +559,7 @@ class RoomMemberWorkerStore(EventsWorkerStore):
                     users_in_room = dict(prev_res)
                     member_event_ids = [
                         e_id
-                        for key, e_id in iteritems(context.delta_ids)
+                        for key, e_id in context.delta_ids.items()
                         if key[0] == EventTypes.Member
                     ]
                     for etype, state_key in context.delta_ids:
@@ -1101,7 +1099,7 @@ class _JoinedHostsCache(object):
             if state_entry.state_group == self.state_group:
                 pass
             elif state_entry.prev_group == self.state_group:
-                for (typ, state_key), event_id in iteritems(state_entry.delta_ids):
+                for (typ, state_key), event_id in state_entry.delta_ids.items():
                     if typ != EventTypes.Member:
                         continue
 
@@ -1131,7 +1129,7 @@ class _JoinedHostsCache(object):
                 self.state_group = state_entry.state_group
             else:
                 self.state_group = object()
-            self._len = sum(len(v) for v in itervalues(self.hosts_to_joined_users))
+            self._len = sum(len(v) for v in self.hosts_to_joined_users.values())
         return frozenset(self.hosts_to_joined_users)
 
     def __len__(self):
diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
index 9b95411fb6..b42c02710a 100644
--- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py
+++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
@@ -13,8 +13,6 @@
 # limitations under the License.
 import logging
 
-from six.moves import range
-
 from synapse.config.appservice import load_appservices
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
new file mode 100644
index 0000000000..597f2ffd3d
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.postgres
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT FALSE;
diff --git a/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
new file mode 100644
index 0000000000..69db89ac0e
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/58/08_media_safe_from_quarantine.sql.sqlite
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- The local_media_repository should have files which do not get quarantined,
+-- e.g. files from sticker packs.
+ALTER TABLE local_media_repository ADD COLUMN safe_from_quarantine BOOLEAN NOT NULL DEFAULT 0;
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 13f49d8060..a8381dc577 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -17,8 +17,6 @@ import logging
 import re
 from collections import namedtuple
 
-from six import string_types
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                     # skip over it.
                     continue
 
-                if not isinstance(value, string_types):
+                if not isinstance(value, str):
                     # If the event body, name or topic isn't a string
                     # then skip over it
                     continue
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index e89f0bffb5..379d758b5d 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -40,8 +40,6 @@ import abc
 import logging
 from collections import namedtuple
 
-from six.moves import range
-
 from twisted.internet import defer
 
 from synapse.logging.context import make_deferred_yieldable, run_in_background
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 4219018302..f8c776be3f 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six.moves import range
-
 from canonicaljson import json
 
 from twisted.internet import defer
diff --git a/synapse/storage/data_stores/main/ui_auth.py b/synapse/storage/data_stores/main/ui_auth.py
index 1d8ee22fb1..ec2f38c373 100644
--- a/synapse/storage/data_stores/main/ui_auth.py
+++ b/synapse/storage/data_stores/main/ui_auth.py
@@ -186,7 +186,7 @@ class UIAuthWorkerStore(SQLBaseStore):
         # The clientdict gets stored as JSON.
         clientdict_json = json.dumps(clientdict)
 
-        self.db.simple_update_one(
+        await self.db.simple_update_one(
             table="ui_auth_sessions",
             keyvalues={"session_id": session_id},
             updatevalues={"clientdict": clientdict_json},
diff --git a/synapse/storage/data_stores/state/bg_updates.py b/synapse/storage/data_stores/state/bg_updates.py
index ff000bc9ec..be1fe97d79 100644
--- a/synapse/storage/data_stores/state/bg_updates.py
+++ b/synapse/storage/data_stores/state/bg_updates.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import iteritems
-
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore
@@ -280,7 +278,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
 
                         delta_state = {
                             key: value
-                            for key, value in iteritems(curr_state)
+                            for key, value in curr_state.items()
                             if prev_state.get(key, None) != value
                         }
 
@@ -316,7 +314,7 @@ class StateBackgroundUpdateStore(StateGroupBackgroundUpdateStore):
                                     "state_key": key[1],
                                     "event_id": state_id,
                                 }
-                                for key, state_id in iteritems(delta_state)
+                                for key, state_id in delta_state.items()
                             ],
                         )
 
diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py
index f3ad1e4369..5db9f20135 100644
--- a/synapse/storage/data_stores/state/store.py
+++ b/synapse/storage/data_stores/state/store.py
@@ -17,9 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Dict, Iterable, List, Set, Tuple
 
-from six import iteritems
-from six.moves import range
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
@@ -263,7 +260,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
         # And finally update the result dict, by filtering out any extra
         # stuff we pulled out of the database.
-        for group, group_state_dict in iteritems(group_to_state_dict):
+        for group, group_state_dict in group_to_state_dict.items():
             # We just replace any existing entries, as we will have loaded
             # everything we need from the database anyway.
             state[group] = state_filter.filter_state(group_state_dict)
@@ -341,11 +338,11 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
         else:
             non_member_types = non_member_filter.concrete_types()
 
-        for group, group_state_dict in iteritems(group_to_state_dict):
+        for group, group_state_dict in group_to_state_dict.items():
             state_dict_members = {}
             state_dict_non_members = {}
 
-            for k, v in iteritems(group_state_dict):
+            for k, v in group_state_dict.items():
                 if k[0] == EventTypes.Member:
                     state_dict_members[k] = v
                 else:
@@ -432,7 +429,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in iteritems(delta_ids)
+                        for key, state_id in delta_ids.items()
                     ],
                 )
             else:
@@ -447,7 +444,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                             "state_key": key[1],
                             "event_id": state_id,
                         }
-                        for key, state_id in iteritems(current_state_ids)
+                        for key, state_id in current_state_ids.items()
                     ],
                 )
 
@@ -458,7 +455,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
             current_member_state_ids = {
                 s: ev
-                for (s, ev) in iteritems(current_state_ids)
+                for (s, ev) in current_state_ids.items()
                 if s[0] == EventTypes.Member
             }
             txn.call_after(
@@ -470,7 +467,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
 
             current_non_member_state_ids = {
                 s: ev
-                for (s, ev) in iteritems(current_state_ids)
+                for (s, ev) in current_state_ids.items()
                 if s[0] != EventTypes.Member
             }
             txn.call_after(
@@ -555,7 +552,7 @@ class StateGroupDataStore(StateBackgroundUpdateStore, SQLBaseStore):
                         "state_key": key[1],
                         "event_id": state_id,
                     }
-                    for key, state_id in iteritems(curr_state)
+                    for key, state_id in curr_state.items()
                 ],
             )
 
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index b112ff3df2..3be20c866a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 import logging
 import time
+from sys import intern
 from time import monotonic as monotonic_time
 from typing import (
     Any,
@@ -29,9 +30,6 @@ from typing import (
     TypeVar,
 )
 
-from six import iteritems, iterkeys, itervalues
-from six.moves import intern, range
-
 from prometheus_client import Histogram
 
 from twisted.enterprise import adbapi
@@ -259,7 +257,7 @@ class PerformanceCounters(object):
 
     def interval(self, interval_duration_secs, limit=3):
         counters = []
-        for name, (count, cum_time) in iteritems(self.current_counters):
+        for name, (count, cum_time) in self.current_counters.items():
             prev_count, prev_time = self.previous_counters.get(name, (0, 0))
             counters.append(
                 (
@@ -1053,7 +1051,7 @@ class Database(object):
         sql = ("SELECT %(retcol)s FROM %(table)s") % {"retcol": retcol, "table": table}
 
         if keyvalues:
-            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+            sql += " WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
             txn.execute(sql, list(keyvalues.values()))
         else:
             txn.execute(sql)
@@ -1191,7 +1189,7 @@ class Database(object):
         clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
         clauses = [clause]
 
-        for key, value in iteritems(keyvalues):
+        for key, value in keyvalues.items():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -1212,7 +1210,7 @@ class Database(object):
     @staticmethod
     def simple_update_txn(txn, table, keyvalues, updatevalues):
         if keyvalues:
-            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in iterkeys(keyvalues))
+            where = "WHERE %s" % " AND ".join("%s = ?" % k for k in keyvalues.keys())
         else:
             where = ""
 
@@ -1351,7 +1349,7 @@ class Database(object):
         clause, values = make_in_list_sql_clause(txn.database_engine, column, iterable)
         clauses = [clause]
 
-        for key, value in iteritems(keyvalues):
+        for key, value in keyvalues.items():
             clauses.append("%s = ?" % (key,))
             values.append(value)
 
@@ -1388,7 +1386,7 @@ class Database(object):
         txn.close()
 
         if cache:
-            min_val = min(itervalues(cache))
+            min_val = min(cache.values())
         else:
             min_val = max_value
 
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index f159400a87..ec894a91cb 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -20,9 +20,6 @@ import logging
 from collections import deque, namedtuple
 from typing import Iterable, List, Optional, Set, Tuple
 
-from six import iteritems
-from six.moves import range
-
 from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
@@ -218,7 +215,7 @@ class EventsPersistenceStorage(object):
             partitioned.setdefault(event.room_id, []).append((event, ctx))
 
         deferreds = []
-        for room_id, evs_ctxs in iteritems(partitioned):
+        for room_id, evs_ctxs in partitioned.items():
             d = self._event_persist_queue.add_to_queue(
                 room_id, evs_ctxs, backfilled=backfilled
             )
@@ -319,7 +316,7 @@ class EventsPersistenceStorage(object):
                             (event, context)
                         )
 
-                    for room_id, ev_ctx_rm in iteritems(events_by_room):
+                    for room_id, ev_ctx_rm in events_by_room.items():
                         latest_event_ids = await self.main_store.get_latest_event_ids_in_room(
                             room_id
                         )
@@ -674,7 +671,7 @@ class EventsPersistenceStorage(object):
 
         to_insert = {
             key: ev_id
-            for key, ev_id in iteritems(current_state)
+            for key, ev_id in current_state.items()
             if ev_id != existing_state.get(key)
         }
 
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index c522c80922..dc568476f4 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -16,8 +16,6 @@
 import logging
 from typing import Iterable, List, TypeVar
 
-from six import iteritems, itervalues
-
 import attr
 
 from twisted.internet import defer
@@ -51,7 +49,7 @@ class StateFilter(object):
         # If `include_others` is set we canonicalise the filter by removing
         # wildcards from the types dictionary
         if self.include_others:
-            self.types = {k: v for k, v in iteritems(self.types) if v is not None}
+            self.types = {k: v for k, v in self.types.items() if v is not None}
 
     @staticmethod
     def all():
@@ -150,7 +148,7 @@ class StateFilter(object):
 
         has_non_member_wildcard = self.include_others or any(
             state_keys is None
-            for t, state_keys in iteritems(self.types)
+            for t, state_keys in self.types.items()
             if t != EventTypes.Member
         )
 
@@ -199,7 +197,7 @@ class StateFilter(object):
 
         # First we build up a lost of clauses for each type/state_key combo
         clauses = []
-        for etype, state_keys in iteritems(self.types):
+        for etype, state_keys in self.types.items():
             if state_keys is None:
                 clauses.append("(type = ?)")
                 where_args.append(etype)
@@ -251,7 +249,7 @@ class StateFilter(object):
             return dict(state_dict)
 
         filtered_state = {}
-        for k, v in iteritems(state_dict):
+        for k, v in state_dict.items():
             typ, state_key = k
             if typ in self.types:
                 state_keys = self.types[typ]
@@ -279,7 +277,7 @@ class StateFilter(object):
         """
 
         return self.include_others or any(
-            state_keys is None for state_keys in itervalues(self.types)
+            state_keys is None for state_keys in self.types.values()
         )
 
     def concrete_types(self):
@@ -292,7 +290,7 @@ class StateFilter(object):
         """
         return [
             (t, s)
-            for t, state_keys in iteritems(self.types)
+            for t, state_keys in self.types.items()
             if state_keys is not None
             for s in state_keys
         ]
@@ -324,7 +322,7 @@ class StateFilter(object):
             member_filter = StateFilter.none()
 
         non_member_filter = StateFilter(
-            types={k: v for k, v in iteritems(self.types) if k != EventTypes.Member},
+            types={k: v for k, v in self.types.items() if k != EventTypes.Member},
             include_others=self.include_others,
         )
 
@@ -366,7 +364,7 @@ class StateGroupStorage(object):
 
         event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
 
-        groups = set(itervalues(event_to_groups))
+        groups = set(event_to_groups.values())
         group_to_state = yield self.stores.state._get_state_for_groups(groups)
 
         return group_to_state
@@ -400,8 +398,8 @@ class StateGroupStorage(object):
         state_event_map = yield self.stores.main.get_events(
             [
                 ev_id
-                for group_ids in itervalues(group_to_ids)
-                for ev_id in itervalues(group_ids)
+                for group_ids in group_to_ids.values()
+                for ev_id in group_ids.values()
             ],
             get_prev_content=False,
         )
@@ -409,10 +407,10 @@ class StateGroupStorage(object):
         return {
             group: [
                 state_event_map[v]
-                for v in itervalues(event_id_map)
+                for v in event_id_map.values()
                 if v in state_event_map
             ]
-            for group, event_id_map in iteritems(group_to_ids)
+            for group, event_id_map in group_to_ids.items()
         }
 
     def _get_state_groups_from_groups(
@@ -444,23 +442,23 @@ class StateGroupStorage(object):
         """
         event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
 
-        groups = set(itervalues(event_to_groups))
+        groups = set(event_to_groups.values())
         group_to_state = yield self.stores.state._get_state_for_groups(
             groups, state_filter
         )
 
         state_event_map = yield self.stores.main.get_events(
-            [ev_id for sd in itervalues(group_to_state) for ev_id in itervalues(sd)],
+            [ev_id for sd in group_to_state.values() for ev_id in sd.values()],
             get_prev_content=False,
         )
 
         event_to_state = {
             event_id: {
                 k: state_event_map[v]
-                for k, v in iteritems(group_to_state[group])
+                for k, v in group_to_state[group].items()
                 if v in state_event_map
             }
-            for event_id, group in iteritems(event_to_groups)
+            for event_id, group in event_to_groups.items()
         }
 
         return {event: event_to_state[event] for event in event_ids}
@@ -481,14 +479,14 @@ class StateGroupStorage(object):
         """
         event_to_groups = yield self.stores.main._get_state_group_for_events(event_ids)
 
-        groups = set(itervalues(event_to_groups))
+        groups = set(event_to_groups.values())
         group_to_state = yield self.stores.state._get_state_for_groups(
             groups, state_filter
         )
 
         event_to_state = {
             event_id: group_to_state[group]
-            for event_id, group in iteritems(event_to_groups)
+            for event_id, group in event_to_groups.items()
         }
 
         return {event: event_to_state[event] for event in event_ids}
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f7af2bca7f..65abf0846e 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -19,8 +19,6 @@ import logging
 from contextlib import contextmanager
 from typing import Dict, Sequence, Set, Union
 
-from six.moves import range
-
 import attr
 
 from twisted.internet import defer
@@ -95,7 +93,7 @@ class ObservableDeferred(object):
 
         This returns a brand new deferred that is resolved when the underlying
         deferred is resolved. Interacting with the returned deferred does not
-        effect the underdlying deferred.
+        effect the underlying deferred.
         """
         if not self._result:
             d = defer.Deferred()
diff --git a/synapse/util/caches/descriptors.py b/synapse/util/caches/descriptors.py
index cd48262420..64f35fc288 100644
--- a/synapse/util/caches/descriptors.py
+++ b/synapse/util/caches/descriptors.py
@@ -21,8 +21,6 @@ import threading
 from typing import Any, Tuple, Union, cast
 from weakref import WeakValueDictionary
 
-from six import itervalues
-
 from prometheus_client import Gauge
 from typing_extensions import Protocol
 
@@ -281,7 +279,7 @@ class Cache(object):
     def invalidate_all(self):
         self.check_thread()
         self.cache.clear()
-        for entry in itervalues(self._pending_deferred_cache):
+        for entry in self._pending_deferred_cache.values():
             entry.invalidate()
         self._pending_deferred_cache.clear()
 
diff --git a/synapse/util/caches/expiringcache.py b/synapse/util/caches/expiringcache.py
index 2726b67b6d..89a3420f92 100644
--- a/synapse/util/caches/expiringcache.py
+++ b/synapse/util/caches/expiringcache.py
@@ -16,8 +16,6 @@
 import logging
 from collections import OrderedDict
 
-from six import iteritems, itervalues
-
 from synapse.config import cache as cache_config
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.util.caches import register_cache
@@ -150,7 +148,7 @@ class ExpiringCache(object):
 
         keys_to_delete = set()
 
-        for key, cache_entry in iteritems(self._cache):
+        for key, cache_entry in self._cache.items():
             if now - cache_entry.time > self._expiry_ms:
                 keys_to_delete.add(key)
 
@@ -170,7 +168,7 @@ class ExpiringCache(object):
 
     def __len__(self):
         if self.iterable:
-            return sum(len(entry.value) for entry in itervalues(self._cache))
+            return sum(len(entry.value) for entry in self._cache.values())
         else:
             return len(self._cache)
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2a161bf244..c541bf4579 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -17,8 +17,6 @@ import logging
 import math
 from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
 
-from six import integer_types
-
 from sortedcontainers import SortedDict
 
 from synapse.types import Collection
@@ -88,7 +86,7 @@ class StreamChangeCache:
     def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) in integer_types
+        assert isinstance(stream_pos, int)
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/caches/treecache.py b/synapse/util/caches/treecache.py
index 2ea4e4e911..ecd9948e79 100644
--- a/synapse/util/caches/treecache.py
+++ b/synapse/util/caches/treecache.py
@@ -1,7 +1,5 @@
 from typing import Dict
 
-from six import itervalues
-
 SENTINEL = object()
 
 
@@ -81,7 +79,7 @@ def iterate_tree_cache_entry(d):
     can contain dicts.
     """
     if isinstance(d, dict):
-        for value_d in itervalues(d):
+        for value_d in d.values():
             for value in iterate_tree_cache_entry(value_d):
                 yield value
     else:
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 8b17d1c8b8..6a3f6177b1 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six.moves import queue
+import queue
 
 from twisted.internet import threads
 
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 9815bb8667..eab78dd256 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import binary_type, text_type
-
 from canonicaljson import json
 from frozendict import frozendict
 
@@ -26,7 +24,7 @@ def freeze(o):
     if isinstance(o, frozendict):
         return o
 
-    if isinstance(o, (binary_type, text_type)):
+    if isinstance(o, (bytes, str)):
         return o
 
     try:
@@ -41,7 +39,7 @@ def unfreeze(o):
     if isinstance(o, (dict, frozendict)):
         return dict({k: unfreeze(v) for k, v in o.items()})
 
-    if isinstance(o, (binary_type, text_type)):
+    if isinstance(o, (bytes, str)):
         return o
 
     try:
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 9bf6a44f75..023beb5ede 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six.moves import range
-
 
 class _Entry(object):
     __slots__ = ["end_key", "queue"]
diff --git a/synapse/visibility.py b/synapse/visibility.py
index bab41182b9..3dfd4af26c 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -16,9 +16,6 @@
 import logging
 import operator
 
-from six import iteritems, itervalues
-from six.moves import map
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
@@ -298,7 +295,7 @@ def filter_events_for_server(
                 # membership states for the requesting server to determine
                 # if the server is either in the room or has been invited
                 # into the room.
-                for ev in itervalues(state):
+                for ev in state.values():
                     if ev.type != EventTypes.Member:
                         continue
                     try:
@@ -332,7 +329,7 @@ def filter_events_for_server(
     )
 
     visibility_ids = set()
-    for sids in itervalues(event_to_state_ids):
+    for sids in event_to_state_ids.values():
         hist = sids.get((EventTypes.RoomHistoryVisibility, ""))
         if hist:
             visibility_ids.add(hist)
@@ -345,7 +342,7 @@ def filter_events_for_server(
         event_map = yield storage.main.get_events(visibility_ids)
         all_open = all(
             e.content.get("history_visibility") in (None, "shared", "world_readable")
-            for e in itervalues(event_map)
+            for e in event_map.values()
         )
 
     if not check_history_visibility_only:
@@ -394,8 +391,8 @@ def filter_events_for_server(
     #
     event_id_to_state_key = {
         event_id: key
-        for key_to_eid in itervalues(event_to_state_ids)
-        for key, event_id in iteritems(key_to_eid)
+        for key_to_eid in event_to_state_ids.values()
+        for key, event_id in key_to_eid.items()
     }
 
     def include(typ, state_key):
@@ -409,20 +406,16 @@ def filter_events_for_server(
         return state_key[idx + 1 :] == server_name
 
     event_map = yield storage.main.get_events(
-        [
-            e_id
-            for e_id, key in iteritems(event_id_to_state_key)
-            if include(key[0], key[1])
-        ]
+        [e_id for e_id, key in event_id_to_state_key.items() if include(key[0], key[1])]
     )
 
     event_to_state = {
         e_id: {
             key: event_map[inner_e_id]
-            for key, inner_e_id in iteritems(key_to_eid)
+            for key, inner_e_id in key_to_eid.items()
             if inner_e_id in event_map
         }
-        for e_id, key_to_eid in iteritems(event_to_state_ids)
+        for e_id, key_to_eid in event_to_state_ids.items()
     }
 
     to_return = []