summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/_scripts/register_new_matrix_user.py2
-rw-r--r--synapse/api/errors.py7
-rw-r--r--synapse/api/filtering.py4
-rw-r--r--synapse/api/urls.py3
-rw-r--r--synapse/app/_base.py8
-rw-r--r--synapse/app/generic_worker.py36
-rw-r--r--synapse/app/homeserver.py50
-rw-r--r--synapse/appservice/__init__.py4
-rw-r--r--synapse/appservice/api.py3
-rw-r--r--synapse/config/_base.py6
-rw-r--r--synapse/config/appservice.py13
-rw-r--r--synapse/config/oidc_config.py2
-rw-r--r--synapse/config/server.py235
-rw-r--r--synapse/config/tls.py4
-rw-r--r--synapse/config/workers.py24
-rw-r--r--synapse/crypto/keyring.py6
-rw-r--r--synapse/events/utils.py4
-rw-r--r--synapse/events/validator.py12
-rw-r--r--synapse/federation/federation_base.py4
-rw-r--r--synapse/federation/federation_server.py4
-rw-r--r--synapse/federation/transport/client.py3
-rw-r--r--synapse/groups/groups_server.py4
-rw-r--r--synapse/handlers/cas_handler.py3
-rw-r--r--synapse/handlers/device.py1
-rw-r--r--synapse/handlers/devicemessage.py25
-rw-r--r--synapse/handlers/e2e_room_keys.py1
-rw-r--r--synapse/handlers/federation.py9
-rw-r--r--synapse/handlers/message.py4
-rw-r--r--synapse/handlers/pagination.py24
-rw-r--r--synapse/handlers/presence.py29
-rw-r--r--synapse/handlers/profile.py8
-rw-r--r--synapse/handlers/room.py4
-rw-r--r--synapse/handlers/room_member.py7
-rw-r--r--synapse/handlers/typing.py40
-rw-r--r--synapse/http/client.py8
-rw-r--r--synapse/http/federation/matrix_federation_agent.py10
-rw-r--r--synapse/http/federation/well_known_resolver.py17
-rw-r--r--synapse/http/matrixfederationclient.py21
-rw-r--r--synapse/http/server.py4
-rw-r--r--synapse/http/site.py6
-rw-r--r--synapse/logging/formatter.py3
-rw-r--r--synapse/module_api/__init__.py12
-rw-r--r--synapse/push/mailer.py3
-rw-r--r--synapse/push/push_rule_evaluator.py8
-rw-r--r--synapse/push/pusherpool.py4
-rw-r--r--synapse/python_dependencies.py6
-rw-r--r--synapse/replication/http/_base.py6
-rw-r--r--synapse/replication/tcp/streams/_base.py29
-rw-r--r--synapse/rest/admin/users.py20
-rw-r--r--synapse/rest/client/v1/login.py12
-rw-r--r--synapse/rest/client/v1/presence.py4
-rw-r--r--synapse/rest/client/v1/room.py3
-rw-r--r--synapse/rest/client/v2_alpha/account.py11
-rw-r--r--synapse/rest/client/v2_alpha/register.py11
-rw-r--r--synapse/rest/client/v2_alpha/report_event.py10
-rw-r--r--synapse/rest/consent/consent_resource.py5
-rw-r--r--synapse/rest/media/v1/_base.py3
-rw-r--r--synapse/rest/media/v1/media_storage.py6
-rw-r--r--synapse/rest/media/v1/preview_url_resource.py9
-rw-r--r--synapse/server_notices/consent_server_notices.py4
-rw-r--r--synapse/storage/data_stores/main/event_federation.py3
-rw-r--r--synapse/storage/data_stores/main/events.py10
-rw-r--r--synapse/storage/data_stores/main/events_bg_updates.py4
-rw-r--r--synapse/storage/data_stores/main/events_worker.py41
-rw-r--r--synapse/storage/data_stores/main/presence.py41
-rw-r--r--synapse/storage/data_stores/main/push_rule.py56
-rw-r--r--synapse/storage/data_stores/main/receipts.py91
-rw-r--r--synapse/storage/data_stores/main/schema/delta/30/as_users.py2
-rw-r--r--synapse/storage/data_stores/main/search.py4
-rw-r--r--synapse/storage/data_stores/main/stream.py2
-rw-r--r--synapse/storage/data_stores/main/tags.py2
-rw-r--r--synapse/storage/data_stores/state/store.py2
-rw-r--r--synapse/storage/database.py3
-rw-r--r--synapse/storage/persist_events.py2
-rw-r--r--synapse/util/async_helpers.py4
-rw-r--r--synapse/util/caches/stream_change_cache.py4
-rw-r--r--synapse/util/file_consumer.py2
-rw-r--r--synapse/util/frozenutils.py6
-rw-r--r--synapse/util/wheel_timer.py2
-rw-r--r--synapse/visibility.py2
81 files changed, 638 insertions, 475 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 1d9d85a727..4d39996a2e 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.15.0"
+__version__ = "1.15.1"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/_scripts/register_new_matrix_user.py b/synapse/_scripts/register_new_matrix_user.py
index d528450c78..55cce2db22 100644
--- a/synapse/_scripts/register_new_matrix_user.py
+++ b/synapse/_scripts/register_new_matrix_user.py
@@ -23,8 +23,6 @@ import hmac
 import logging
 import sys
 
-from six.moves import input
-
 import requests as _requests
 import yaml
 
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index a07a54580d..5305038c21 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -17,10 +17,9 @@
 """Contains exceptions and error codes."""
 
 import logging
+from http import HTTPStatus
 from typing import Dict, List
 
-from six.moves import http_client
-
 from canonicaljson import json
 
 from twisted.web import http
@@ -173,7 +172,7 @@ class ConsentNotGivenError(SynapseError):
             consent_url (str): The URL where the user can give their consent
         """
         super(ConsentNotGivenError, self).__init__(
-            code=http_client.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
+            code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.CONSENT_NOT_GIVEN
         )
         self._consent_uri = consent_uri
 
@@ -193,7 +192,7 @@ class UserDeactivatedError(SynapseError):
             msg (str): The human-readable error message
         """
         super(UserDeactivatedError, self).__init__(
-            code=http_client.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
+            code=HTTPStatus.FORBIDDEN, msg=msg, errcode=Codes.USER_DEACTIVATED
         )
 
 
diff --git a/synapse/api/filtering.py b/synapse/api/filtering.py
index 8b64d0a285..f988f62a1e 100644
--- a/synapse/api/filtering.py
+++ b/synapse/api/filtering.py
@@ -17,8 +17,6 @@
 # limitations under the License.
 from typing import List
 
-from six import text_type
-
 import jsonschema
 from canonicaljson import json
 from jsonschema import FormatChecker
@@ -313,7 +311,7 @@ class Filter(object):
 
             content = event.get("content", {})
             # check if there is a string url field in the content for filtering purposes
-            contains_url = isinstance(content.get("url"), text_type)
+            contains_url = isinstance(content.get("url"), str)
             labels = content.get(EventContentFields.LABELS, [])
 
         return self.check_fields(room_id, sender, ev_type, labels, contains_url)
diff --git a/synapse/api/urls.py b/synapse/api/urls.py
index f34434bd67..bd03ebca5a 100644
--- a/synapse/api/urls.py
+++ b/synapse/api/urls.py
@@ -17,8 +17,7 @@
 """Contains the URL paths to prefix various aspects of the server with. """
 import hmac
 from hashlib import sha256
-
-from six.moves.urllib.parse import urlencode
+from urllib.parse import urlencode
 
 from synapse.config import ConfigError
 
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index dedff81af3..373a80a4a7 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -20,6 +20,7 @@ import signal
 import socket
 import sys
 import traceback
+from typing import Iterable
 
 from daemonize import Daemonize
 from typing_extensions import NoReturn
@@ -29,6 +30,7 @@ from twisted.protocols.tls import TLSMemoryBIOFactory
 
 import synapse
 from synapse.app import check_bind_error
+from synapse.config.server import ListenerConfig
 from synapse.crypto import context_factory
 from synapse.logging.context import PreserveLoggingContext
 from synapse.util.async_helpers import Linearizer
@@ -234,7 +236,7 @@ def refresh_certificate(hs):
         logger.info("Context factories updated.")
 
 
-def start(hs, listeners=None):
+def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
     """
     Start a Synapse server or worker.
 
@@ -245,8 +247,8 @@ def start(hs, listeners=None):
     notify systemd.
 
     Args:
-        hs (synapse.server.HomeServer)
-        listeners (list[dict]): Listener configuration ('listeners' in homeserver.yaml)
+        hs: homeserver instance
+        listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
     try:
         # Set up the SIGHUP machinery.
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 53c488d211..27a3fc9ed6 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -37,6 +37,7 @@ from synapse.app import _base
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
+from synapse.config.server import ListenerConfig
 from synapse.federation import send_queue
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.handlers.presence import (
@@ -514,13 +515,18 @@ class GenericWorkerSlavedStore(
 class GenericWorkerServer(HomeServer):
     DATASTORE_CLASS = GenericWorkerSlavedStore
 
-    def _listen_http(self, listener_config):
-        port = listener_config["port"]
-        bind_addresses = listener_config["bind_addresses"]
-        site_tag = listener_config.get("tag", port)
+    def _listen_http(self, listener_config: ListenerConfig):
+        port = listener_config.port
+        bind_addresses = listener_config.bind_addresses
+
+        assert listener_config.http_options is not None
+
+        site_tag = listener_config.http_options.tag
+        if site_tag is None:
+            site_tag = port
         resources = {}
-        for res in listener_config["resources"]:
-            for name in res["names"]:
+        for res in listener_config.http_options.resources:
+            for name in res.names:
                 if name == "metrics":
                     resources[METRICS_PREFIX] = MetricsResource(RegistryProxy)
                 elif name == "client":
@@ -590,7 +596,7 @@ class GenericWorkerServer(HomeServer):
                             " repository is disabled. Ignoring."
                         )
 
-                if name == "openid" and "federation" not in res["names"]:
+                if name == "openid" and "federation" not in res.names:
                     # Only load the openid resource separately if federation resource
                     # is not specified since federation resource includes openid
                     # resource.
@@ -625,19 +631,19 @@ class GenericWorkerServer(HomeServer):
 
         logger.info("Synapse worker now listening on port %d", port)
 
-    def start_listening(self, listeners):
+    def start_listening(self, listeners: Iterable[ListenerConfig]):
         for listener in listeners:
-            if listener["type"] == "http":
+            if listener.type == "http":
                 self._listen_http(listener)
-            elif listener["type"] == "manhole":
+            elif listener.type == "manhole":
                 _base.listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     manhole(
                         username="matrix", password="rabbithole", globals={"hs": self}
                     ),
                 )
-            elif listener["type"] == "metrics":
+            elif listener.type == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warning(
                         (
@@ -646,9 +652,9 @@ class GenericWorkerServer(HomeServer):
                         )
                     )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
+                    _base.listen_metrics(listener.bind_addresses, listener.port)
             else:
-                logger.warning("Unrecognized listener type: %s", listener["type"])
+                logger.warning("Unsupported listener type: %s", listener.type)
 
         self.get_tcp_replication().start_replication(self)
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 93bc45208e..299134d00f 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -23,6 +23,7 @@ import math
 import os
 import resource
 import sys
+from typing import Iterable
 
 from prometheus_client import Gauge
 
@@ -48,6 +49,7 @@ from synapse.app import _base
 from synapse.app._base import listen_ssl, listen_tcp, quit_with_error
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
+from synapse.config.server import ListenerConfig
 from synapse.federation.transport.server import TransportLayerServer
 from synapse.http.additional_resource import AdditionalResource
 from synapse.http.server import (
@@ -87,24 +89,24 @@ def gz_wrap(r):
 class SynapseHomeServer(HomeServer):
     DATASTORE_CLASS = DataStore
 
-    def _listener_http(self, config, listener_config):
-        port = listener_config["port"]
-        bind_addresses = listener_config["bind_addresses"]
-        tls = listener_config.get("tls", False)
-        site_tag = listener_config.get("tag", port)
+    def _listener_http(self, config: HomeServerConfig, listener_config: ListenerConfig):
+        port = listener_config.port
+        bind_addresses = listener_config.bind_addresses
+        tls = listener_config.tls
+        site_tag = listener_config.http_options.tag
+        if site_tag is None:
+            site_tag = port
 
         resources = {}
-        for res in listener_config["resources"]:
-            for name in res["names"]:
-                if name == "openid" and "federation" in res["names"]:
+        for res in listener_config.http_options.resources:
+            for name in res.names:
+                if name == "openid" and "federation" in res.names:
                     # Skip loading openid resource if federation is defined
                     # since federation resource will include openid
                     continue
-                resources.update(
-                    self._configure_named_resource(name, res.get("compress", False))
-                )
+                resources.update(self._configure_named_resource(name, res.compress))
 
-        additional_resources = listener_config.get("additional_resources", {})
+        additional_resources = listener_config.http_options.additional_resources
         logger.debug("Configuring additional resources: %r", additional_resources)
         module_api = ModuleApi(self, self.get_auth_handler())
         for path, resmodule in additional_resources.items():
@@ -276,7 +278,7 @@ class SynapseHomeServer(HomeServer):
 
         return resources
 
-    def start_listening(self, listeners):
+    def start_listening(self, listeners: Iterable[ListenerConfig]):
         config = self.get_config()
 
         if config.redis_enabled:
@@ -286,25 +288,25 @@ class SynapseHomeServer(HomeServer):
             self.get_tcp_replication().start_replication(self)
 
         for listener in listeners:
-            if listener["type"] == "http":
+            if listener.type == "http":
                 self._listening_services.extend(self._listener_http(config, listener))
-            elif listener["type"] == "manhole":
+            elif listener.type == "manhole":
                 listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     manhole(
                         username="matrix", password="rabbithole", globals={"hs": self}
                     ),
                 )
-            elif listener["type"] == "replication":
+            elif listener.type == "replication":
                 services = listen_tcp(
-                    listener["bind_addresses"],
-                    listener["port"],
+                    listener.bind_addresses,
+                    listener.port,
                     ReplicationStreamProtocolFactory(self),
                 )
                 for s in services:
                     reactor.addSystemEventTrigger("before", "shutdown", s.stopListening)
-            elif listener["type"] == "metrics":
+            elif listener.type == "metrics":
                 if not self.get_config().enable_metrics:
                     logger.warning(
                         (
@@ -313,9 +315,11 @@ class SynapseHomeServer(HomeServer):
                         )
                     )
                 else:
-                    _base.listen_metrics(listener["bind_addresses"], listener["port"])
+                    _base.listen_metrics(listener.bind_addresses, listener.port)
             else:
-                logger.warning("Unrecognized listener type: %s", listener["type"])
+                # this shouldn't happen, as the listener type should have been checked
+                # during parsing
+                logger.warning("Unrecognized listener type: %s", listener.type)
 
 
 # Gauges to expose monthly active user control metrics
diff --git a/synapse/appservice/__init__.py b/synapse/appservice/__init__.py
index 1b13e84425..0323256472 100644
--- a/synapse/appservice/__init__.py
+++ b/synapse/appservice/__init__.py
@@ -15,8 +15,6 @@
 import logging
 import re
 
-from six import string_types
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
@@ -156,7 +154,7 @@ class ApplicationService(object):
                         )
 
                 regex = regex_obj.get("regex")
-                if isinstance(regex, string_types):
+                if isinstance(regex, str):
                     regex_obj["regex"] = re.compile(regex)  # Pre-compile regex
                 else:
                     raise ValueError("Expected string for 'regex' in ns '%s'" % ns)
diff --git a/synapse/appservice/api.py b/synapse/appservice/api.py
index 57174da021..da9a5e86d4 100644
--- a/synapse/appservice/api.py
+++ b/synapse/appservice/api.py
@@ -13,8 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-
-from six.moves import urllib
+import urllib
 
 from prometheus_client import Counter
 
diff --git a/synapse/config/_base.py b/synapse/config/_base.py
index 30d1050a91..1391e5fc43 100644
--- a/synapse/config/_base.py
+++ b/synapse/config/_base.py
@@ -22,8 +22,6 @@ from collections import OrderedDict
 from textwrap import dedent
 from typing import Any, MutableMapping, Optional
 
-from six import integer_types
-
 import yaml
 
 
@@ -117,7 +115,7 @@ class Config(object):
 
     @staticmethod
     def parse_size(value):
-        if isinstance(value, integer_types):
+        if isinstance(value, int):
             return value
         sizes = {"K": 1024, "M": 1024 * 1024}
         size = 1
@@ -129,7 +127,7 @@ class Config(object):
 
     @staticmethod
     def parse_duration(value):
-        if isinstance(value, integer_types):
+        if isinstance(value, int):
             return value
         second = 1000
         minute = 60 * second
diff --git a/synapse/config/appservice.py b/synapse/config/appservice.py
index ca43e96bd1..8ed3e24258 100644
--- a/synapse/config/appservice.py
+++ b/synapse/config/appservice.py
@@ -14,9 +14,7 @@
 
 import logging
 from typing import Dict
-
-from six import string_types
-from six.moves.urllib import parse as urlparse
+from urllib import parse as urlparse
 
 import yaml
 from netaddr import IPSet
@@ -98,17 +96,14 @@ def load_appservices(hostname, config_files):
 def _load_appservice(hostname, as_info, config_filename):
     required_string_fields = ["id", "as_token", "hs_token", "sender_localpart"]
     for field in required_string_fields:
-        if not isinstance(as_info.get(field), string_types):
+        if not isinstance(as_info.get(field), str):
             raise KeyError(
                 "Required string field: '%s' (%s)" % (field, config_filename)
             )
 
     # 'url' must either be a string or explicitly null, not missing
     # to avoid accidentally turning off push for ASes.
-    if (
-        not isinstance(as_info.get("url"), string_types)
-        and as_info.get("url", "") is not None
-    ):
+    if not isinstance(as_info.get("url"), str) and as_info.get("url", "") is not None:
         raise KeyError(
             "Required string field or explicit null: 'url' (%s)" % (config_filename,)
         )
@@ -138,7 +133,7 @@ def _load_appservice(hostname, as_info, config_filename):
                         ns,
                         regex_obj,
                     )
-                if not isinstance(regex_obj.get("regex"), string_types):
+                if not isinstance(regex_obj.get("regex"), str):
                     raise ValueError("Missing/bad type 'regex' key in %s", regex_obj)
                 if not isinstance(regex_obj.get("exclusive"), bool):
                     raise ValueError(
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc_config.py
index e24dd637bc..e0939bce84 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc_config.py
@@ -89,7 +89,7 @@ class OIDCConfig(Config):
         # use an OpenID Connect Provider for authentication, instead of its internal
         # password database.
         #
-        # See https://github.com/matrix-org/synapse/blob/master/openid.md.
+        # See https://github.com/matrix-org/synapse/blob/master/docs/openid.md.
         #
         oidc_config:
           # Uncomment the following to enable authorization against an OpenID Connect
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 73226e63d5..8204664883 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -19,7 +19,7 @@ import logging
 import os.path
 import re
 from textwrap import indent
-from typing import Dict, List, Optional
+from typing import Any, Dict, Iterable, List, Optional
 
 import attr
 import yaml
@@ -57,6 +57,64 @@ on how to configure the new listener.
 --------------------------------------------------------------------------------"""
 
 
+KNOWN_LISTENER_TYPES = {
+    "http",
+    "metrics",
+    "manhole",
+    "replication",
+}
+
+KNOWN_RESOURCES = {
+    "client",
+    "consent",
+    "federation",
+    "keys",
+    "media",
+    "metrics",
+    "openid",
+    "replication",
+    "static",
+    "webclient",
+}
+
+
+@attr.s(frozen=True)
+class HttpResourceConfig:
+    names = attr.ib(
+        type=List[str],
+        factory=list,
+        validator=attr.validators.deep_iterable(attr.validators.in_(KNOWN_RESOURCES)),  # type: ignore
+    )
+    compress = attr.ib(
+        type=bool,
+        default=False,
+        validator=attr.validators.optional(attr.validators.instance_of(bool)),  # type: ignore[arg-type]
+    )
+
+
+@attr.s(frozen=True)
+class HttpListenerConfig:
+    """Object describing the http-specific parts of the config of a listener"""
+
+    x_forwarded = attr.ib(type=bool, default=False)
+    resources = attr.ib(type=List[HttpResourceConfig], factory=list)
+    additional_resources = attr.ib(type=Dict[str, dict], factory=dict)
+    tag = attr.ib(type=str, default=None)
+
+
+@attr.s(frozen=True)
+class ListenerConfig:
+    """Object describing the configuration of a single listener."""
+
+    port = attr.ib(type=int, validator=attr.validators.instance_of(int))
+    bind_addresses = attr.ib(type=List[str])
+    type = attr.ib(type=str, validator=attr.validators.in_(KNOWN_LISTENER_TYPES))
+    tls = attr.ib(type=bool, default=False)
+
+    # http_options is only populated if type=http
+    http_options = attr.ib(type=Optional[HttpListenerConfig], default=None)
+
+
 class ServerConfig(Config):
     section = "server"
 
@@ -379,38 +437,21 @@ class ServerConfig(Config):
                 }
             ]
 
-        self.listeners = []  # type: List[dict]
-        for listener in config.get("listeners", []):
-            if not isinstance(listener.get("port", None), int):
-                raise ConfigError(
-                    "Listener configuration is lacking a valid 'port' option"
-                )
+        self.listeners = [parse_listener_def(x) for x in config.get("listeners", [])]
 
-            if listener.setdefault("tls", False):
-                # no_tls is not really supported any more, but let's grandfather it in
-                # here.
-                if config.get("no_tls", False):
+        # no_tls is not really supported any more, but let's grandfather it in
+        # here.
+        if config.get("no_tls", False):
+            l2 = []
+            for listener in self.listeners:
+                if listener.tls:
                     logger.info(
-                        "Ignoring TLS-enabled listener on port %i due to no_tls"
+                        "Ignoring TLS-enabled listener on port %i due to no_tls",
+                        listener.port,
                     )
-                    continue
-
-            bind_address = listener.pop("bind_address", None)
-            bind_addresses = listener.setdefault("bind_addresses", [])
-
-            # if bind_address was specified, add it to the list of addresses
-            if bind_address:
-                bind_addresses.append(bind_address)
-
-            # if we still have an empty list of addresses, use the default list
-            if not bind_addresses:
-                if listener["type"] == "metrics":
-                    # the metrics listener doesn't support IPv6
-                    bind_addresses.append("0.0.0.0")
                 else:
-                    bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
-
-            self.listeners.append(listener)
+                    l2.append(listener)
+            self.listeners = l2
 
         if not self.web_client_location:
             _warn_if_webclient_configured(self.listeners)
@@ -446,43 +487,41 @@ class ServerConfig(Config):
             bind_host = config.get("bind_host", "")
             gzip_responses = config.get("gzip_responses", True)
 
+            http_options = HttpListenerConfig(
+                resources=[
+                    HttpResourceConfig(names=["client"], compress=gzip_responses),
+                    HttpResourceConfig(names=["federation"]),
+                ],
+            )
+
             self.listeners.append(
-                {
-                    "port": bind_port,
-                    "bind_addresses": [bind_host],
-                    "tls": True,
-                    "type": "http",
-                    "resources": [
-                        {"names": ["client"], "compress": gzip_responses},
-                        {"names": ["federation"], "compress": False},
-                    ],
-                }
+                ListenerConfig(
+                    port=bind_port,
+                    bind_addresses=[bind_host],
+                    tls=True,
+                    type="http",
+                    http_options=http_options,
+                )
             )
 
             unsecure_port = config.get("unsecure_port", bind_port - 400)
             if unsecure_port:
                 self.listeners.append(
-                    {
-                        "port": unsecure_port,
-                        "bind_addresses": [bind_host],
-                        "tls": False,
-                        "type": "http",
-                        "resources": [
-                            {"names": ["client"], "compress": gzip_responses},
-                            {"names": ["federation"], "compress": False},
-                        ],
-                    }
+                    ListenerConfig(
+                        port=unsecure_port,
+                        bind_addresses=[bind_host],
+                        tls=False,
+                        type="http",
+                        http_options=http_options,
+                    )
                 )
 
         manhole = config.get("manhole")
         if manhole:
             self.listeners.append(
-                {
-                    "port": manhole,
-                    "bind_addresses": ["127.0.0.1"],
-                    "type": "manhole",
-                    "tls": False,
-                }
+                ListenerConfig(
+                    port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
+                )
             )
 
         metrics_port = config.get("metrics_port")
@@ -490,13 +529,14 @@ class ServerConfig(Config):
             logger.warning(METRICS_PORT_WARNING)
 
             self.listeners.append(
-                {
-                    "port": metrics_port,
-                    "bind_addresses": [config.get("metrics_bind_host", "127.0.0.1")],
-                    "tls": False,
-                    "type": "http",
-                    "resources": [{"names": ["metrics"], "compress": False}],
-                }
+                ListenerConfig(
+                    port=metrics_port,
+                    bind_addresses=[config.get("metrics_bind_host", "127.0.0.1")],
+                    type="http",
+                    http_options=HttpListenerConfig(
+                        resources=[HttpResourceConfig(names=["metrics"])]
+                    ),
+                )
             )
 
         _check_resource_config(self.listeners)
@@ -522,7 +562,7 @@ class ServerConfig(Config):
         )
 
     def has_tls_listener(self) -> bool:
-        return any(listener["tls"] for listener in self.listeners)
+        return any(listener.tls for listener in self.listeners)
 
     def generate_config_section(
         self, server_name, data_dir_path, open_private_ports, listeners, **kwargs
@@ -1081,6 +1121,44 @@ def read_gc_thresholds(thresholds):
         )
 
 
+def parse_listener_def(listener: Any) -> ListenerConfig:
+    """parse a listener config from the config file"""
+    listener_type = listener["type"]
+
+    port = listener.get("port")
+    if not isinstance(port, int):
+        raise ConfigError("Listener configuration is lacking a valid 'port' option")
+
+    tls = listener.get("tls", False)
+
+    bind_addresses = listener.get("bind_addresses", [])
+    bind_address = listener.get("bind_address")
+    # if bind_address was specified, add it to the list of addresses
+    if bind_address:
+        bind_addresses.append(bind_address)
+
+    # if we still have an empty list of addresses, use the default list
+    if not bind_addresses:
+        if listener_type == "metrics":
+            # the metrics listener doesn't support IPv6
+            bind_addresses.append("0.0.0.0")
+        else:
+            bind_addresses.extend(DEFAULT_BIND_ADDRESSES)
+
+    http_config = None
+    if listener_type == "http":
+        http_config = HttpListenerConfig(
+            x_forwarded=listener.get("x_forwarded", False),
+            resources=[
+                HttpResourceConfig(**res) for res in listener.get("resources", [])
+            ],
+            additional_resources=listener.get("additional_resources", {}),
+            tag=listener.get("tag"),
+        )
+
+    return ListenerConfig(port, bind_addresses, listener_type, tls, http_config)
+
+
 NO_MORE_WEB_CLIENT_WARNING = """
 Synapse no longer includes a web client. To enable a web client, configure
 web_client_location. To remove this warning, remove 'webclient' from the 'listeners'
@@ -1088,40 +1166,27 @@ configuration.
 """
 
 
-def _warn_if_webclient_configured(listeners):
+def _warn_if_webclient_configured(listeners: Iterable[ListenerConfig]) -> None:
     for listener in listeners:
-        for res in listener.get("resources", []):
-            for name in res.get("names", []):
+        if not listener.http_options:
+            continue
+        for res in listener.http_options.resources:
+            for name in res.names:
                 if name == "webclient":
                     logger.warning(NO_MORE_WEB_CLIENT_WARNING)
                     return
 
 
-KNOWN_RESOURCES = (
-    "client",
-    "consent",
-    "federation",
-    "keys",
-    "media",
-    "metrics",
-    "openid",
-    "replication",
-    "static",
-    "webclient",
-)
-
-
-def _check_resource_config(listeners):
+def _check_resource_config(listeners: Iterable[ListenerConfig]) -> None:
     resource_names = {
         res_name
         for listener in listeners
-        for res in listener.get("resources", [])
-        for res_name in res.get("names", [])
+        if listener.http_options
+        for res in listener.http_options.resources
+        for res_name in res.names
     }
 
     for resource in resource_names:
-        if resource not in KNOWN_RESOURCES:
-            raise ConfigError("Unknown listener resource '%s'" % (resource,))
         if resource == "consent":
             try:
                 check_requirements("resources.consent")
diff --git a/synapse/config/tls.py b/synapse/config/tls.py
index a65538562b..e368ea564d 100644
--- a/synapse/config/tls.py
+++ b/synapse/config/tls.py
@@ -20,8 +20,6 @@ from datetime import datetime
 from hashlib import sha256
 from typing import List
 
-import six
-
 from unpaddedbase64 import encode_base64
 
 from OpenSSL import SSL, crypto
@@ -59,7 +57,7 @@ class TlsConfig(Config):
             logger.warning(ACME_SUPPORT_ENABLED_WARN)
 
         # hyperlink complains on py2 if this is not a Unicode
-        self.acme_url = six.text_type(
+        self.acme_url = str(
             acme_config.get("url", "https://acme-v01.api.letsencrypt.org/directory")
         )
         self.acme_port = acme_config.get("port", 80)
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index ed06b91a54..dbc661630c 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -16,6 +16,7 @@
 import attr
 
 from ._base import Config, ConfigError
+from .server import ListenerConfig, parse_listener_def
 
 
 @attr.s
@@ -52,7 +53,9 @@ class WorkerConfig(Config):
         if self.worker_app == "synapse.app.homeserver":
             self.worker_app = None
 
-        self.worker_listeners = config.get("worker_listeners", [])
+        self.worker_listeners = [
+            parse_listener_def(x) for x in config.get("worker_listeners", [])
+        ]
         self.worker_daemonize = config.get("worker_daemonize")
         self.worker_pid_file = config.get("worker_pid_file")
         self.worker_log_config = config.get("worker_log_config")
@@ -75,24 +78,11 @@ class WorkerConfig(Config):
         manhole = config.get("worker_manhole")
         if manhole:
             self.worker_listeners.append(
-                {
-                    "port": manhole,
-                    "bind_addresses": ["127.0.0.1"],
-                    "type": "manhole",
-                    "tls": False,
-                }
+                ListenerConfig(
+                    port=manhole, bind_addresses=["127.0.0.1"], type="manhole",
+                )
             )
 
-        if self.worker_listeners:
-            for listener in self.worker_listeners:
-                bind_address = listener.pop("bind_address", None)
-                bind_addresses = listener.setdefault("bind_addresses", [])
-
-                if bind_address:
-                    bind_addresses.append(bind_address)
-                elif not bind_addresses:
-                    bind_addresses.append("")
-
         # A map from instance name to host/port of their HTTP replication endpoint.
         instance_map = config.get("instance_map") or {}
         self.instance_map = {
diff --git a/synapse/crypto/keyring.py b/synapse/crypto/keyring.py
index a9f4025bfe..dbfc3e8972 100644
--- a/synapse/crypto/keyring.py
+++ b/synapse/crypto/keyring.py
@@ -15,11 +15,9 @@
 # limitations under the License.
 
 import logging
+import urllib
 from collections import defaultdict
 
-import six
-from six.moves import urllib
-
 import attr
 from signedjson.key import (
     decode_verify_key_bytes,
@@ -661,7 +659,7 @@ class PerspectivesKeyFetcher(BaseV2KeyFetcher):
         for response in query_response["server_keys"]:
             # do this first, so that we can give useful errors thereafter
             server_name = response.get("server_name")
-            if not isinstance(server_name, six.string_types):
+            if not isinstance(server_name, str):
                 raise KeyLookupError(
                     "Malformed response from key notary server %s: invalid server_name"
                     % (perspective_name,)
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index dd340be9a7..f6b507977f 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -16,8 +16,6 @@ import collections
 import re
 from typing import Any, Mapping, Union
 
-from six import string_types
-
 from frozendict import frozendict
 
 from twisted.internet import defer
@@ -318,7 +316,7 @@ def serialize_event(
 
     if only_event_fields:
         if not isinstance(only_event_fields, list) or not all(
-            isinstance(f, string_types) for f in only_event_fields
+            isinstance(f, str) for f in only_event_fields
         ):
             raise TypeError("only_event_fields must be a list of strings")
         d = only_fields(d, only_event_fields)
diff --git a/synapse/events/validator.py b/synapse/events/validator.py
index b001c64bb4..588d222f36 100644
--- a/synapse/events/validator.py
+++ b/synapse/events/validator.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import integer_types, string_types
-
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes, Membership
 from synapse.api.errors import Codes, SynapseError
 from synapse.api.room_versions import EventFormatVersions
@@ -53,7 +51,7 @@ class EventValidator(object):
         event_strings = ["origin"]
 
         for s in event_strings:
-            if not isinstance(getattr(event, s), string_types):
+            if not isinstance(getattr(event, s), str):
                 raise SynapseError(400, "'%s' not a string type" % (s,))
 
         # Depending on the room version, ensure the data is spec compliant JSON.
@@ -90,7 +88,7 @@ class EventValidator(object):
         max_lifetime = event.content.get("max_lifetime")
 
         if min_lifetime is not None:
-            if not isinstance(min_lifetime, integer_types):
+            if not isinstance(min_lifetime, int):
                 raise SynapseError(
                     code=400,
                     msg="'min_lifetime' must be an integer",
@@ -124,7 +122,7 @@ class EventValidator(object):
                 )
 
         if max_lifetime is not None:
-            if not isinstance(max_lifetime, integer_types):
+            if not isinstance(max_lifetime, int):
                 raise SynapseError(
                     code=400,
                     msg="'max_lifetime' must be an integer",
@@ -183,7 +181,7 @@ class EventValidator(object):
             strings.append("state_key")
 
         for s in strings:
-            if not isinstance(getattr(event, s), string_types):
+            if not isinstance(getattr(event, s), str):
                 raise SynapseError(400, "Not '%s' a string type" % (s,))
 
         RoomID.from_string(event.room_id)
@@ -223,7 +221,7 @@ class EventValidator(object):
         for s in keys:
             if s not in d:
                 raise SynapseError(400, "'%s' not in content" % (s,))
-            if not isinstance(d[s], string_types):
+            if not isinstance(d[s], str):
                 raise SynapseError(400, "'%s' not a string type" % (s,))
 
     def _ensure_state_event(self, event):
diff --git a/synapse/federation/federation_base.py b/synapse/federation/federation_base.py
index b2ab5bd6a4..420df2385f 100644
--- a/synapse/federation/federation_base.py
+++ b/synapse/federation/federation_base.py
@@ -17,8 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Iterable, List
 
-import six
-
 from twisted.internet import defer
 from twisted.internet.defer import Deferred, DeferredList
 from twisted.python.failure import Failure
@@ -294,7 +292,7 @@ def event_from_pdu_json(
     assert_params_in_dict(pdu_json, ("type", "depth"))
 
     depth = pdu_json["depth"]
-    if not isinstance(depth, six.integer_types):
+    if not isinstance(depth, int):
         raise SynapseError(400, "Depth %r not an intger" % (depth,), Codes.BAD_JSON)
 
     if depth < 0:
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 6920c23723..afe0a8238b 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Any, Callable, Dict, List, Match, Optional, Tuple, Union
 
-import six
-
 from canonicaljson import json
 from prometheus_client import Counter
 
@@ -751,7 +749,7 @@ def server_matches_acl_event(server_name: str, acl_event: EventBase) -> bool:
 
 
 def _acl_entry_matches(server_name: str, acl_entry: str) -> Match:
-    if not isinstance(acl_entry, six.string_types):
+    if not isinstance(acl_entry, str):
         logger.warning(
             "Ignoring non-str ACL entry '%s' (is %s)", acl_entry, type(acl_entry)
         )
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 060bf07197..9f99311419 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -15,10 +15,9 @@
 # limitations under the License.
 
 import logging
+import urllib
 from typing import Any, Dict, Optional
 
-from six.moves import urllib
-
 from twisted.internet import defer
 
 from synapse.api.constants import Membership
diff --git a/synapse/groups/groups_server.py b/synapse/groups/groups_server.py
index 8a9de913b3..8db8ab1b7b 100644
--- a/synapse/groups/groups_server.py
+++ b/synapse/groups/groups_server.py
@@ -17,8 +17,6 @@
 
 import logging
 
-from six import string_types
-
 from synapse.api.errors import Codes, SynapseError
 from synapse.types import GroupID, RoomID, UserID, get_domain_from_id
 from synapse.util.async_helpers import concurrently_execute
@@ -513,7 +511,7 @@ class GroupsServerHandler(GroupsServerWorkerHandler):
         for keyname in ("name", "avatar_url", "short_description", "long_description"):
             if keyname in content:
                 value = content[keyname]
-                if not isinstance(value, string_types):
+                if not isinstance(value, str):
                     raise SynapseError(400, "%r value is not a string" % (keyname,))
                 profile[keyname] = value
 
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 64aaa1335c..76f213723a 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -14,11 +14,10 @@
 # limitations under the License.
 
 import logging
+import urllib
 import xml.etree.ElementTree as ET
 from typing import Dict, Optional, Tuple
 
-from six.moves import urllib
-
 from twisted.web.client import PartialDownloadError
 
 from synapse.api.errors import Codes, LoginError
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 83f8fa1180..31346b56c3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -691,6 +691,7 @@ class DeviceListUpdater(object):
 
         return False
 
+    @trace
     @defer.inlineCallbacks
     def _maybe_retry_device_resync(self):
         """Retry to resync device lists that are out of sync, except if another retry is
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 05c4b3eec0..610b08d00b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,8 +18,6 @@ from typing import Any, Dict
 
 from canonicaljson import json
 
-from twisted.internet import defer
-
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
@@ -51,8 +49,7 @@ class DeviceMessageHandler(object):
 
         self._device_list_updater = hs.get_device_handler().device_list_updater
 
-    @defer.inlineCallbacks
-    def on_direct_to_device_edu(self, origin, content):
+    async def on_direct_to_device_edu(self, origin, content):
         local_messages = {}
         sender_user_id = content["sender"]
         if origin != get_domain_from_id(sender_user_id):
@@ -82,11 +79,11 @@ class DeviceMessageHandler(object):
             }
             local_messages[user_id] = messages_by_device
 
-            yield self._check_for_unknown_devices(
+            await self._check_for_unknown_devices(
                 message_type, sender_user_id, by_device
             )
 
-        stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
+        stream_id = await self.store.add_messages_from_remote_to_device_inbox(
             origin, message_id, local_messages
         )
 
@@ -94,14 +91,13 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
-    @defer.inlineCallbacks
-    def _check_for_unknown_devices(
+    async def _check_for_unknown_devices(
         self,
         message_type: str,
         sender_user_id: str,
         by_device: Dict[str, Dict[str, Any]],
     ):
-        """Checks inbound device messages for unkown remote devices, and if
+        """Checks inbound device messages for unknown remote devices, and if
         found marks the remote cache for the user as stale.
         """
 
@@ -115,7 +111,7 @@ class DeviceMessageHandler(object):
             requesting_device_ids.add(device_id)
 
         # Check if we are tracking the devices of the remote user.
-        room_ids = yield self.store.get_rooms_for_user(sender_user_id)
+        room_ids = await self.store.get_rooms_for_user(sender_user_id)
         if not room_ids:
             logger.info(
                 "Received device message from remote device we don't"
@@ -127,7 +123,7 @@ class DeviceMessageHandler(object):
 
         # If we are tracking check that we know about the sending
         # devices.
-        cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
+        cached_devices = await self.store.get_cached_devices_for_user(sender_user_id)
 
         unknown_devices = requesting_device_ids - set(cached_devices)
         if unknown_devices:
@@ -136,15 +132,14 @@ class DeviceMessageHandler(object):
                 sender_user_id,
                 unknown_devices,
             )
-            yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
+            await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
 
             # Immediately attempt a resync in the background
             run_in_background(
                 self._device_list_updater.user_device_resync, sender_user_id
             )
 
-    @defer.inlineCallbacks
-    def send_device_message(self, sender_user_id, message_type, messages):
+    async def send_device_message(self, sender_user_id, message_type, messages):
         set_tag("number_of_messages", len(messages))
         set_tag("sender", sender_user_id)
         local_messages = {}
@@ -183,7 +178,7 @@ class DeviceMessageHandler(object):
                 }
 
         log_kv({"local_messages": local_messages})
-        stream_id = yield self.store.add_messages_to_device_inbox(
+        stream_id = await self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
 
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 2efea801bc..f55470a707 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -349,6 +349,7 @@ class E2eRoomKeysHandler(object):
                     raise
 
             res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
+            res["etag"] = str(res["etag"])
             return res
 
     @trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d6038d9995..873f6bc39f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,10 +19,9 @@
 
 import itertools
 import logging
+from http import HTTPStatus
 from typing import Dict, Iterable, List, Optional, Sequence, Tuple
 
-from six.moves import http_client, zip
-
 import attr
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
@@ -1194,7 +1193,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.prev_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
 
         if len(ev.auth_event_ids()) > 10:
             logger.warning(
@@ -1202,7 +1201,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.auth_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
 
     async def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -1545,7 +1544,7 @@ class FederationHandler(BaseHandler):
 
         # block any attempts to invite the server notices mxid
         if event.state_key == self._server_notices_mxid:
-            raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+            raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
         # keep a record of the room version, if we don't yet know it.
         # (this may get overwritten if we later get a different room version in a
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 354da9a3b5..200127d291 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Optional, Tuple
 
-from six import string_types
-
 from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
@@ -715,7 +713,7 @@ class EventCreationHandler(object):
 
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
-                if not isinstance(spam_error, string_types):
+                if not isinstance(spam_error, str):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 7fbc229502..da06582d4b 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 import logging
 
-from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
@@ -97,8 +96,7 @@ class PaginationHandler(object):
                     job["longest_max_lifetime"],
                 )
 
-    @defer.inlineCallbacks
-    def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+    async def purge_history_for_rooms_in_range(self, min_ms, max_ms):
         """Purge outdated events from rooms within the given retention range.
 
         If a default retention policy is defined in the server's configuration and its
@@ -137,7 +135,7 @@ class PaginationHandler(object):
             include_null,
         )
 
-        rooms = yield self.store.get_rooms_for_retention_period_in_range(
+        rooms = await self.store.get_rooms_for_retention_period_in_range(
             min_ms, max_ms, include_null
         )
 
@@ -165,9 +163,9 @@ class PaginationHandler(object):
             # Figure out what token we should start purging at.
             ts = self.clock.time_msec() - max_lifetime
 
-            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+            stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = yield self.store.get_room_event_before_stream_ordering(
+            r = await self.store.get_room_event_before_stream_ordering(
                 room_id, stream_ordering,
             )
             if not r:
@@ -227,8 +225,7 @@ class PaginationHandler(object):
         )
         return purge_id
 
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token, delete_local_events):
+    async def _purge_history(self, purge_id, room_id, token, delete_local_events):
         """Carry out a history purge on a room.
 
         Args:
@@ -237,14 +234,11 @@ class PaginationHandler(object):
             token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
-
-        Returns:
-            Deferred
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.storage.purge_events.purge_history(
+            with await self.pagination_lock.write(room_id):
+                await self.storage.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
             logger.info("[purge] complete")
@@ -282,9 +276,7 @@ class PaginationHandler(object):
             await self.store.get_room_version_id(room_id)
 
             # first check that we have no users in this room
-            joined = await defer.maybeDeferred(
-                self.store.is_host_joined, room_id, self._server_name
-            )
+            joined = await self.store.is_host_joined(room_id, self._server_name)
 
             if joined:
                 raise SynapseError(400, "Users are still joined to this room")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 2e8914be14..d2f25ae12a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,7 +25,7 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import Dict, Iterable, List, Set
+from typing import Dict, Iterable, List, Set, Tuple
 
 from prometheus_client import Counter
 from typing_extensions import ContextManager
@@ -773,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
 
         return False
 
-    async def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
         """
         Gets a list of presence update rows from between the given stream ids.
         Each row has:
@@ -785,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
         - last_user_sync_ts(int)
         - status_msg(int)
         - currently_active(int)
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
+
         # TODO(markjh): replicate the unpersisted changes.
         # This could use the in-memory stores for recent changes.
-        rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
+        rows = await self.store.get_all_presence_updates(
+            instance_name, last_id, current_id, limit
+        )
         return rows
 
     def notify_new_event(self):
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 302efc1b9a..4b1e3073a8 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import raise_from
-
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler):
                 )
                 return result
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
@@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
@@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f7401373ca..950a84acd0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,8 +24,6 @@ import string
 from collections import OrderedDict
 from typing import Tuple
 
-from six import string_types
-
 from synapse.api.constants import (
     EventTypes,
     JoinRules,
@@ -595,7 +593,7 @@ class RoomCreationHandler(BaseHandler):
             "room_version", self.config.default_room_version.identifier
         )
 
-        if not isinstance(room_version_id, string_types):
+        if not isinstance(room_version_id, str):
             raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
 
         room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f7af982f0..27c479da9e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,10 +17,9 @@
 
 import abc
 import logging
+from http import HTTPStatus
 from typing import Dict, Iterable, List, Optional, Tuple
 
-from six.moves import http_client
-
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -361,7 +360,7 @@ class RoomMemberHandler(object):
         if effective_membership_state == Membership.INVITE:
             # block any attempts to invite the server notices mxid
             if target.to_string() == self._server_notices_mxid:
-                raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+                raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
             block_invite = False
 
@@ -444,7 +443,7 @@ class RoomMemberHandler(object):
                 is_blocked = await self._is_server_notice_room(room_id)
                 if is_blocked:
                     raise SynapseError(
-                        http_client.FORBIDDEN,
+                        HTTPStatus.FORBIDDEN,
                         "You cannot reject this invite",
                         errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
                     )
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..4330abb9f7 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,7 +15,7 @@
 
 import logging
 from collections import namedtuple
-from typing import List
+from typing import List, Tuple
 
 from twisted.internet import defer
 
@@ -259,14 +259,31 @@ class TypingHandler(object):
         )
 
     async def get_all_typing_updates(
-        self, last_id: int, current_id: int, limit: int
-    ) -> List[dict]:
-        """Get up to `limit` typing updates between the given tokens, earliest
-        updates first.
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for typing replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
             last_id
@@ -280,9 +297,16 @@ class TypingHandler(object):
             serial = self._room_serials[room_id]
             if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
-                rows.append((serial, room_id, list(typing)))
+                rows.append((serial, [room_id, list(typing)]))
         rows.sort()
-        return rows[:limit]
+
+        limited = False
+        if len(rows) > limit:
+            rows = rows[:limit]
+            current_id = rows[-1][0]
+            limited = True
+
+        return rows, current_id, limited
 
     def get_current_token(self):
         return self._latest_room_serial
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 3cef747a4d..8743e9839d 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -15,11 +15,9 @@
 # limitations under the License.
 
 import logging
+import urllib
 from io import BytesIO
 
-from six import raise_from, text_type
-from six.moves import urllib
-
 import treq
 from canonicaljson import encode_canonical_json, json
 from netaddr import IPAddress
@@ -577,7 +575,7 @@ class SimpleHttpClient(object):
             # This can happen e.g. because the body is too large.
             raise
         except Exception as e:
-            raise_from(SynapseError(502, ("Failed to download remote body: %s" % e)), e)
+            raise SynapseError(502, ("Failed to download remote body: %s" % e)) from e
 
         return (
             length,
@@ -638,7 +636,7 @@ def encode_urlencode_args(args):
 
 
 def encode_urlencode_arg(arg):
-    if isinstance(arg, text_type):
+    if isinstance(arg, str):
         return arg.encode("utf-8")
     elif isinstance(arg, list):
         return [encode_urlencode_arg(i) for i in arg]
diff --git a/synapse/http/federation/matrix_federation_agent.py b/synapse/http/federation/matrix_federation_agent.py
index f5f917f5ae..c5fc746f2f 100644
--- a/synapse/http/federation/matrix_federation_agent.py
+++ b/synapse/http/federation/matrix_federation_agent.py
@@ -48,6 +48,9 @@ class MatrixFederationAgent(object):
         tls_client_options_factory (FederationPolicyForHTTPS|None):
             factory to use for fetching client tls options, or none to disable TLS.
 
+        user_agent (bytes):
+            The user agent header to use for federation requests.
+
         _srv_resolver (SrvResolver|None):
             SRVResolver impl to use for looking up SRV records. None to use a default
             implementation.
@@ -61,6 +64,7 @@ class MatrixFederationAgent(object):
         self,
         reactor,
         tls_client_options_factory,
+        user_agent,
         _srv_resolver=None,
         _well_known_resolver=None,
     ):
@@ -78,6 +82,7 @@ class MatrixFederationAgent(object):
             ),
             pool=self._pool,
         )
+        self.user_agent = user_agent
 
         if _well_known_resolver is None:
             _well_known_resolver = WellKnownResolver(
@@ -87,6 +92,7 @@ class MatrixFederationAgent(object):
                     pool=self._pool,
                     contextFactory=tls_client_options_factory,
                 ),
+                user_agent=self.user_agent,
             )
 
         self._well_known_resolver = _well_known_resolver
@@ -149,7 +155,7 @@ class MatrixFederationAgent(object):
             parsed_uri = urllib.parse.urlparse(uri)
 
         # We need to make sure the host header is set to the netloc of the
-        # server.
+        # server and that a user-agent is provided.
         if headers is None:
             headers = Headers()
         else:
@@ -157,6 +163,8 @@ class MatrixFederationAgent(object):
 
         if not headers.hasHeader(b"host"):
             headers.addRawHeader(b"host", parsed_uri.netloc)
+        if not headers.hasHeader(b"user-agent"):
+            headers.addRawHeader(b"user-agent", self.user_agent)
 
         res = yield make_deferred_yieldable(
             self._agent.request(method, uri, headers, bodyProducer)
diff --git a/synapse/http/federation/well_known_resolver.py b/synapse/http/federation/well_known_resolver.py
index 7ddfad286d..89a3b041ce 100644
--- a/synapse/http/federation/well_known_resolver.py
+++ b/synapse/http/federation/well_known_resolver.py
@@ -23,6 +23,7 @@ import attr
 from twisted.internet import defer
 from twisted.web.client import RedirectAgent, readBody
 from twisted.web.http import stringToDatetime
+from twisted.web.http_headers import Headers
 
 from synapse.logging.context import make_deferred_yieldable
 from synapse.util import Clock
@@ -78,7 +79,12 @@ class WellKnownResolver(object):
     """
 
     def __init__(
-        self, reactor, agent, well_known_cache=None, had_well_known_cache=None
+        self,
+        reactor,
+        agent,
+        user_agent,
+        well_known_cache=None,
+        had_well_known_cache=None,
     ):
         self._reactor = reactor
         self._clock = Clock(reactor)
@@ -92,6 +98,7 @@ class WellKnownResolver(object):
         self._well_known_cache = well_known_cache
         self._had_valid_well_known_cache = had_well_known_cache
         self._well_known_agent = RedirectAgent(agent)
+        self.user_agent = user_agent
 
     @defer.inlineCallbacks
     def get_well_known(self, server_name):
@@ -227,6 +234,10 @@ class WellKnownResolver(object):
         uri = b"https://%s/.well-known/matrix/server" % (server_name,)
         uri_str = uri.decode("ascii")
 
+        headers = {
+            b"User-Agent": [self.user_agent],
+        }
+
         i = 0
         while True:
             i += 1
@@ -234,7 +245,9 @@ class WellKnownResolver(object):
             logger.info("Fetching %s", uri_str)
             try:
                 response = yield make_deferred_yieldable(
-                    self._well_known_agent.request(b"GET", uri)
+                    self._well_known_agent.request(
+                        b"GET", uri, headers=Headers(headers)
+                    )
                 )
                 body = yield make_deferred_yieldable(readBody(response))
 
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 2d47b9ea00..18f6a8fd29 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -17,11 +17,9 @@ import cgi
 import logging
 import random
 import sys
+import urllib
 from io import BytesIO
 
-from six import raise_from, string_types
-from six.moves import urllib
-
 import attr
 import treq
 from canonicaljson import encode_canonical_json
@@ -199,7 +197,14 @@ class MatrixFederationHttpClient(object):
 
         self.reactor = Reactor()
 
-        self.agent = MatrixFederationAgent(self.reactor, tls_client_options_factory)
+        user_agent = hs.version_string
+        if hs.config.user_agent_suffix:
+            user_agent = "%s %s" % (user_agent, hs.config.user_agent_suffix)
+        user_agent = user_agent.encode("ascii")
+
+        self.agent = MatrixFederationAgent(
+            self.reactor, tls_client_options_factory, user_agent
+        )
 
         # Use a BlacklistingAgentWrapper to prevent circumventing the IP
         # blacklist via IP literals in server names
@@ -432,10 +437,10 @@ class MatrixFederationHttpClient(object):
                     except TimeoutError as e:
                         raise RequestSendFailed(e, can_retry=True) from e
                     except DNSLookupError as e:
-                        raise_from(RequestSendFailed(e, can_retry=retry_on_dns_fail), e)
+                        raise RequestSendFailed(e, can_retry=retry_on_dns_fail) from e
                     except Exception as e:
                         logger.info("Failed to send request: %s", e)
-                        raise_from(RequestSendFailed(e, can_retry=True), e)
+                        raise RequestSendFailed(e, can_retry=True) from e
 
                     incoming_responses_counter.labels(
                         request.method, response.code
@@ -487,7 +492,7 @@ class MatrixFederationHttpClient(object):
                         # Retry if the error is a 429 (Too Many Requests),
                         # otherwise just raise a standard HttpResponseException
                         if response.code == 429:
-                            raise_from(RequestSendFailed(e, can_retry=True), e)
+                            raise RequestSendFailed(e, can_retry=True) from e
                         else:
                             raise e
 
@@ -998,7 +1003,7 @@ def encode_query_args(args):
 
     encoded_args = {}
     for k, vs in args.items():
-        if isinstance(vs, string_types):
+        if isinstance(vs, str):
             vs = [vs]
         encoded_args[k] = [v.encode("UTF-8") for v in vs]
 
diff --git a/synapse/http/server.py b/synapse/http/server.py
index 2487a72171..6aa1dc1f92 100644
--- a/synapse/http/server.py
+++ b/synapse/http/server.py
@@ -16,10 +16,10 @@
 
 import collections
 import html
-import http.client
 import logging
 import types
 import urllib
+from http import HTTPStatus
 from io import BytesIO
 from typing import Awaitable, Callable, TypeVar, Union
 
@@ -188,7 +188,7 @@ def return_html_error(
                 exc_info=(f.type, f.value, f.getTracebackObject()),
             )
     else:
-        code = http.HTTPStatus.INTERNAL_SERVER_ERROR
+        code = HTTPStatus.INTERNAL_SERVER_ERROR
         msg = "Internal server error"
 
         logger.error(
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 167293c46d..cbc37eac6e 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -19,6 +19,7 @@ from typing import Optional
 from twisted.python.failure import Failure
 from twisted.web.server import Request, Site
 
+from synapse.config.server import ListenerConfig
 from synapse.http import redact_uri
 from synapse.http.request_metrics import RequestMetrics, requests_counter
 from synapse.logging.context import LoggingContext, PreserveLoggingContext
@@ -350,7 +351,7 @@ class SynapseSite(Site):
         self,
         logger_name,
         site_tag,
-        config,
+        config: ListenerConfig,
         resource,
         server_version_string,
         *args,
@@ -360,7 +361,8 @@ class SynapseSite(Site):
 
         self.site_tag = site_tag
 
-        proxied = config.get("x_forwarded", False)
+        assert config.http_options is not None
+        proxied = config.http_options.x_forwarded
         self.requestFactory = XForwardedForRequest if proxied else SynapseRequest
         self.access_logger = logging.getLogger(logger_name)
         self.server_version_string = server_version_string.encode("ascii")
diff --git a/synapse/logging/formatter.py b/synapse/logging/formatter.py
index fbf570c756..d736ad5b9b 100644
--- a/synapse/logging/formatter.py
+++ b/synapse/logging/formatter.py
@@ -16,8 +16,7 @@
 
 import logging
 import traceback
-
-from six import StringIO
+from io import StringIO
 
 
 class LogFormatter(logging.Formatter):
diff --git a/synapse/module_api/__init__.py b/synapse/module_api/__init__.py
index ecdf1ad69f..a7849cefa5 100644
--- a/synapse/module_api/__init__.py
+++ b/synapse/module_api/__init__.py
@@ -126,7 +126,7 @@ class ModuleApi(object):
                 'errcode' property for more information on the reason for failure
 
         Returns:
-            Deferred[str]: user_id
+            defer.Deferred[str]: user_id
         """
         return defer.ensureDeferred(
             self._hs.get_registration_handler().register_user(
@@ -149,10 +149,12 @@ class ModuleApi(object):
         Returns:
             defer.Deferred[tuple[str, str]]: Tuple of device ID and access token
         """
-        return self._hs.get_registration_handler().register_device(
-            user_id=user_id,
-            device_id=device_id,
-            initial_display_name=initial_display_name,
+        return defer.ensureDeferred(
+            self._hs.get_registration_handler().register_device(
+                user_id=user_id,
+                device_id=device_id,
+                initial_display_name=initial_display_name,
+            )
         )
 
     def record_user_external_id(
diff --git a/synapse/push/mailer.py b/synapse/push/mailer.py
index d57a66a697..dda560b2c2 100644
--- a/synapse/push/mailer.py
+++ b/synapse/push/mailer.py
@@ -17,12 +17,11 @@ import email.mime.multipart
 import email.utils
 import logging
 import time
+import urllib
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
 from typing import Iterable, List, TypeVar
 
-from six.moves import urllib
-
 import bleach
 import jinja2
 
diff --git a/synapse/push/push_rule_evaluator.py b/synapse/push/push_rule_evaluator.py
index 11032491af..8e0d3a416d 100644
--- a/synapse/push/push_rule_evaluator.py
+++ b/synapse/push/push_rule_evaluator.py
@@ -18,8 +18,6 @@ import logging
 import re
 from typing import Pattern
 
-from six import string_types
-
 from synapse.events import EventBase
 from synapse.types import UserID
 from synapse.util.caches import register_cache
@@ -131,7 +129,7 @@ class PushRuleEvaluatorForEvent(object):
         # XXX: optimisation: cache our pattern regexps
         if condition["key"] == "content.body":
             body = self._event.content.get("body", None)
-            if not body:
+            if not body or not isinstance(body, str):
                 return False
 
             return _glob_matches(pattern, body, word_boundary=True)
@@ -147,7 +145,7 @@ class PushRuleEvaluatorForEvent(object):
             return False
 
         body = self._event.content.get("body", None)
-        if not body:
+        if not body or not isinstance(body, str):
             return False
 
         # Similar to _glob_matches, but do not treat display_name as a glob.
@@ -244,7 +242,7 @@ def _flatten_dict(d, prefix=[], result=None):
     if result is None:
         result = {}
     for key, value in d.items():
-        if isinstance(value, string_types):
+        if isinstance(value, str):
             result[".".join(prefix + [key])] = value.lower()
         elif hasattr(value, "items"):
             _flatten_dict(value, prefix=(prefix + [key]), result=result)
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 88d203aa44..f6a5458681 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -215,11 +215,9 @@ class PusherPool:
         try:
             # Need to subtract 1 from the minimum because the lower bound here
             # is not inclusive
-            updated_receipts = yield self.store.get_all_updated_receipts(
+            users_affected = yield self.store.get_users_sent_receipts_between(
                 min_stream_id - 1, max_stream_id
             )
-            # This returns a tuple, user_id is at index 3
-            users_affected = {r[3] for r in updated_receipts}
 
             for u in users_affected:
                 if u in self.pushers:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 8b4312e5a3..d655aba35c 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -66,11 +66,9 @@ REQUIREMENTS = [
     "pymacaroons>=0.13.0",
     "msgpack>=0.5.2",
     "phonenumbers>=8.2.0",
-    "six>=1.10",
     "prometheus_client>=0.0.18,<0.8.0",
-    # we use attr.s(slots), which arrived in 16.0.0
-    # Twisted 18.7.0 requires attrs>=17.4.0
-    "attrs>=17.4.0",
+    # we use attr.validators.deep_iterable, which arrived in 19.1.0
+    "attrs>=19.1.0",
     "netaddr>=0.7.18",
     "Jinja2>=2.9",
     "bleach>=1.4.3",
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index 793cef6c26..9caf1e80c1 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -16,12 +16,10 @@
 import abc
 import logging
 import re
+import urllib
 from inspect import signature
 from typing import Dict, List, Tuple
 
-from six import raise_from
-from six.moves import urllib
-
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -220,7 +218,7 @@ class ReplicationEndpoint(object):
                 # importantly, not stack traces everywhere)
                 raise e.to_synapse_error()
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to talk to master"), e)
+                raise SynapseError(502, "Failed to talk to master") from e
 
             return result
 
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 4acefc8a96..f196eff072 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -264,7 +264,7 @@ class BackfillStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_current_backfill_token),
-            db_query_to_update_function(store.get_all_new_backfill_event_rows),
+            store.get_all_new_backfill_event_rows,
         )
 
 
@@ -291,9 +291,7 @@ class PresenceStream(Stream):
         if hs.config.worker_app is None:
             # on the master, query the presence handler
             presence_handler = hs.get_presence_handler()
-            update_function = db_query_to_update_function(
-                presence_handler.get_all_presence_updates
-            )
+            update_function = presence_handler.get_all_presence_updates
         else:
             # Query master process
             update_function = make_http_update_function(hs, self.NAME)
@@ -318,9 +316,7 @@ class TypingStream(Stream):
 
         if hs.config.worker_app is None:
             # on the master, query the typing handler
-            update_function = db_query_to_update_function(
-                typing_handler.get_all_typing_updates
-            )
+            update_function = typing_handler.get_all_typing_updates
         else:
             # Query master process
             update_function = make_http_update_function(hs, self.NAME)
@@ -352,7 +348,7 @@ class ReceiptsStream(Stream):
         super().__init__(
             hs.get_instance_name(),
             current_token_without_instance(store.get_max_receipt_stream_id),
-            db_query_to_update_function(store.get_all_updated_receipts),
+            store.get_all_updated_receipts,
         )
 
 
@@ -367,26 +363,17 @@ class PushRulesStream(Stream):
 
     def __init__(self, hs):
         self.store = hs.get_datastore()
+
         super(PushRulesStream, self).__init__(
-            hs.get_instance_name(), self._current_token, self._update_function
+            hs.get_instance_name(),
+            self._current_token,
+            self.store.get_all_push_rule_updates,
         )
 
     def _current_token(self, instance_name: str) -> int:
         push_rules_token, _ = self.store.get_push_rules_stream_token()
         return push_rules_token
 
-    async def _update_function(
-        self, instance_name: str, from_token: Token, to_token: Token, limit: int
-    ):
-        rows = await self.store.get_all_push_rule_updates(from_token, to_token, limit)
-
-        limited = False
-        if len(rows) == limit:
-            to_token = rows[-1][0]
-            limited = True
-
-        return [(row[0], (row[2],)) for row in rows], to_token, limited
-
 
 class PushersStream(Stream):
     """A user has added/changed/removed a pusher
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index fefc8f71fa..e4330c39d6 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -16,9 +16,7 @@ import hashlib
 import hmac
 import logging
 import re
-
-from six import text_type
-from six.moves import http_client
+from http import HTTPStatus
 
 from synapse.api.constants import UserTypes
 from synapse.api.errors import Codes, NotFoundError, SynapseError
@@ -215,10 +213,7 @@ class UserRestServletV2(RestServlet):
                     await self.store.set_server_admin(target_user, set_admin_to)
 
             if "password" in body:
-                if (
-                    not isinstance(body["password"], text_type)
-                    or len(body["password"]) > 512
-                ):
+                if not isinstance(body["password"], str) or len(body["password"]) > 512:
                     raise SynapseError(400, "Invalid password")
                 else:
                     new_password = body["password"]
@@ -252,7 +247,7 @@ class UserRestServletV2(RestServlet):
             password = body.get("password")
             password_hash = None
             if password is not None:
-                if not isinstance(password, text_type) or len(password) > 512:
+                if not isinstance(password, str) or len(password) > 512:
                     raise SynapseError(400, "Invalid password")
                 password_hash = await self.auth_handler.hash(password)
 
@@ -370,10 +365,7 @@ class UserRegisterServlet(RestServlet):
                 400, "username must be specified", errcode=Codes.BAD_JSON
             )
         else:
-            if (
-                not isinstance(body["username"], text_type)
-                or len(body["username"]) > 512
-            ):
+            if not isinstance(body["username"], str) or len(body["username"]) > 512:
                 raise SynapseError(400, "Invalid username")
 
             username = body["username"].encode("utf-8")
@@ -386,7 +378,7 @@ class UserRegisterServlet(RestServlet):
             )
         else:
             password = body["password"]
-            if not isinstance(password, text_type) or len(password) > 512:
+            if not isinstance(password, str) or len(password) > 512:
                 raise SynapseError(400, "Invalid password")
 
             password_bytes = password.encode("utf-8")
@@ -477,7 +469,7 @@ class DeactivateAccountRestServlet(RestServlet):
         erase = body.get("erase", False)
         if not isinstance(erase, bool):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'erase' must be a boolean, if given",
                 Codes.BAD_JSON,
             )
diff --git a/synapse/rest/client/v1/login.py b/synapse/rest/client/v1/login.py
index dceb2792fa..c2c9a9c3aa 100644
--- a/synapse/rest/client/v1/login.py
+++ b/synapse/rest/client/v1/login.py
@@ -60,10 +60,18 @@ def login_id_thirdparty_from_phone(identifier):
 
     Returns: Login identifier dict of type 'm.id.threepid'
     """
-    if "country" not in identifier or "number" not in identifier:
+    if "country" not in identifier or (
+        # The specification requires a "phone" field, while Synapse used to require a "number"
+        # field. Accept both for backwards compatibility.
+        "phone" not in identifier
+        and "number" not in identifier
+    ):
         raise SynapseError(400, "Invalid phone-type identifier")
 
-    msisdn = phone_number_to_msisdn(identifier["country"], identifier["number"])
+    # Accept both "phone" and "number" as valid keys in m.id.phone
+    phone_number = identifier.get("phone", identifier["number"])
+
+    msisdn = phone_number_to_msisdn(identifier["country"], phone_number)
 
     return {"type": "m.id.thirdparty", "medium": "msisdn", "address": msisdn}
 
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index 7cf007d35e..970fdd5834 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -17,8 +17,6 @@
 """
 import logging
 
-from six import string_types
-
 from synapse.api.errors import AuthError, SynapseError
 from synapse.handlers.presence import format_user_presence_state
 from synapse.http.servlet import RestServlet, parse_json_object_from_request
@@ -73,7 +71,7 @@ class PresenceStatusRestServlet(RestServlet):
 
             if "status_msg" in content:
                 state["status_msg"] = content.pop("status_msg")
-                if not isinstance(state["status_msg"], string_types):
+                if not isinstance(state["status_msg"], str):
                     raise SynapseError(400, "status_msg must be a string.")
 
             if content:
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 105e0cf4d2..46811abbfa 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -18,8 +18,7 @@
 import logging
 import re
 from typing import List, Optional
-
-from six.moves.urllib import parse as urlparse
+from urllib import parse as urlparse
 
 from canonicaljson import json
 
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index d4f721b6b9..923bcb9f85 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -15,8 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-
-from six.moves import http_client
+from http import HTTPStatus
 
 from synapse.api.constants import LoginType
 from synapse.api.errors import Codes, SynapseError, ThreepidValidationError
@@ -321,7 +320,7 @@ class DeactivateAccountRestServlet(RestServlet):
         erase = body.get("erase", False)
         if not isinstance(erase, bool):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'erase' must be a boolean, if given",
                 Codes.BAD_JSON,
             )
@@ -682,7 +681,7 @@ class ThreepidRestServlet(RestServlet):
 
 
 class ThreepidAddRestServlet(RestServlet):
-    PATTERNS = client_patterns("/account/3pid/add$", releases=(), unstable=True)
+    PATTERNS = client_patterns("/account/3pid/add$")
 
     def __init__(self, hs):
         super(ThreepidAddRestServlet, self).__init__()
@@ -733,7 +732,7 @@ class ThreepidAddRestServlet(RestServlet):
 
 
 class ThreepidBindRestServlet(RestServlet):
-    PATTERNS = client_patterns("/account/3pid/bind$", releases=(), unstable=True)
+    PATTERNS = client_patterns("/account/3pid/bind$")
 
     def __init__(self, hs):
         super(ThreepidBindRestServlet, self).__init__()
@@ -762,7 +761,7 @@ class ThreepidBindRestServlet(RestServlet):
 
 
 class ThreepidUnbindRestServlet(RestServlet):
-    PATTERNS = client_patterns("/account/3pid/unbind$", releases=(), unstable=True)
+    PATTERNS = client_patterns("/account/3pid/unbind$")
 
     def __init__(self, hs):
         super(ThreepidUnbindRestServlet, self).__init__()
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index b9ffe86b2a..141a3f5fac 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -18,8 +18,6 @@ import hmac
 import logging
 from typing import List, Union
 
-from six import string_types
-
 import synapse
 import synapse.api.auth
 import synapse.types
@@ -413,7 +411,7 @@ class RegisterRestServlet(RestServlet):
         # in sessions. Pull out the username/password provided to us.
         if "password" in body:
             password = body.pop("password")
-            if not isinstance(password, string_types) or len(password) > 512:
+            if not isinstance(password, str) or len(password) > 512:
                 raise SynapseError(400, "Invalid password")
             self.password_policy_handler.validate_password(password)
 
@@ -425,10 +423,7 @@ class RegisterRestServlet(RestServlet):
 
         desired_username = None
         if "username" in body:
-            if (
-                not isinstance(body["username"], string_types)
-                or len(body["username"]) > 512
-            ):
+            if not isinstance(body["username"], str) or len(body["username"]) > 512:
                 raise SynapseError(400, "Invalid username")
             desired_username = body["username"]
 
@@ -453,7 +448,7 @@ class RegisterRestServlet(RestServlet):
 
             access_token = self.auth.get_access_token_from_request(request)
 
-            if isinstance(desired_username, string_types):
+            if isinstance(desired_username, str):
                 result = await self._do_appservice_registration(
                     desired_username, access_token, body
                 )
diff --git a/synapse/rest/client/v2_alpha/report_event.py b/synapse/rest/client/v2_alpha/report_event.py
index f067b5edac..e15927c4ea 100644
--- a/synapse/rest/client/v2_alpha/report_event.py
+++ b/synapse/rest/client/v2_alpha/report_event.py
@@ -14,9 +14,7 @@
 # limitations under the License.
 
 import logging
-
-from six import string_types
-from six.moves import http_client
+from http import HTTPStatus
 
 from synapse.api.errors import Codes, SynapseError
 from synapse.http.servlet import (
@@ -47,15 +45,15 @@ class ReportEventRestServlet(RestServlet):
         body = parse_json_object_from_request(request)
         assert_params_in_dict(body, ("reason", "score"))
 
-        if not isinstance(body["reason"], string_types):
+        if not isinstance(body["reason"], str):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'reason' must be a string",
                 Codes.BAD_JSON,
             )
         if not isinstance(body["score"], int):
             raise SynapseError(
-                http_client.BAD_REQUEST,
+                HTTPStatus.BAD_REQUEST,
                 "Param 'score' must be an integer",
                 Codes.BAD_JSON,
             )
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index 1ddf9997ff..049c16b236 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -16,10 +16,9 @@
 import hmac
 import logging
 from hashlib import sha256
+from http import HTTPStatus
 from os import path
 
-from six.moves import http_client
-
 import jinja2
 from jinja2 import TemplateNotFound
 
@@ -223,4 +222,4 @@ class ConsentResource(DirectServeResource):
         )
 
         if not compare_digest(want_mac, userhmac):
-            raise SynapseError(http_client.FORBIDDEN, "HMAC incorrect")
+            raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect")
diff --git a/synapse/rest/media/v1/_base.py b/synapse/rest/media/v1/_base.py
index 3689777266..595849f9d5 100644
--- a/synapse/rest/media/v1/_base.py
+++ b/synapse/rest/media/v1/_base.py
@@ -16,8 +16,7 @@
 
 import logging
 import os
-
-from six.moves import urllib
+import urllib
 
 from twisted.internet import defer
 from twisted.protocols.basic import FileSender
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 683a79c966..79cb0dddbe 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -17,9 +17,6 @@ import contextlib
 import logging
 import os
 import shutil
-import sys
-
-import six
 
 from twisted.internet import defer
 from twisted.protocols.basic import FileSender
@@ -117,12 +114,11 @@ class MediaStorage(object):
             with open(fname, "wb") as f:
                 yield f, fname, finish
         except Exception:
-            t, v, tb = sys.exc_info()
             try:
                 os.remove(fname)
             except Exception:
                 pass
-            six.reraise(t, v, tb)
+            raise
 
         if not finished_called:
             raise Exception("Finished callback not called")
diff --git a/synapse/rest/media/v1/preview_url_resource.py b/synapse/rest/media/v1/preview_url_resource.py
index f206605727..f67e0fb3ec 100644
--- a/synapse/rest/media/v1/preview_url_resource.py
+++ b/synapse/rest/media/v1/preview_url_resource.py
@@ -24,10 +24,7 @@ import shutil
 import sys
 import traceback
 from typing import Dict, Optional
-
-import six
-from six import string_types
-from six.moves import urllib_parse as urlparse
+from urllib import parse as urlparse
 
 from canonicaljson import json
 
@@ -188,7 +185,7 @@ class PreviewUrlResource(DirectServeResource):
             # It may be stored as text in the database, not as bytes (such as
             # PostgreSQL). If so, encode it back before handing it on.
             og = cache_result["og"]
-            if isinstance(og, six.text_type):
+            if isinstance(og, str):
                 og = og.encode("utf8")
             return og
 
@@ -631,7 +628,7 @@ def _iterate_over_text(tree, *tags_to_ignore):
         if el is None:
             return
 
-        if isinstance(el, string_types):
+        if isinstance(el, str):
             yield el
         elif el.tag not in tags_to_ignore:
             # el.text is the text before the first child, so we can immediately
diff --git a/synapse/server_notices/consent_server_notices.py b/synapse/server_notices/consent_server_notices.py
index e7e8b8e688..3bfc8d7278 100644
--- a/synapse/server_notices/consent_server_notices.py
+++ b/synapse/server_notices/consent_server_notices.py
@@ -14,8 +14,6 @@
 # limitations under the License.
 import logging
 
-from six import string_types
-
 from synapse.api.errors import SynapseError
 from synapse.api.urls import ConsentURIBuilder
 from synapse.config import ConfigError
@@ -118,7 +116,7 @@ def copy_with_str_subst(x, substitutions):
     Returns:
         copy of x
     """
-    if isinstance(x, string_types):
+    if isinstance(x, str):
         return x % substitutions
     if isinstance(x, dict):
         return {k: copy_with_str_subst(v, substitutions) for (k, v) in x.items()}
diff --git a/synapse/storage/data_stores/main/event_federation.py b/synapse/storage/data_stores/main/event_federation.py
index 24ce8c4330..a6bb3221ff 100644
--- a/synapse/storage/data_stores/main/event_federation.py
+++ b/synapse/storage/data_stores/main/event_federation.py
@@ -14,10 +14,9 @@
 # limitations under the License.
 import itertools
 import logging
+from queue import Empty, PriorityQueue
 from typing import Dict, List, Optional, Set, Tuple
 
-from six.moves.queue import Empty, PriorityQueue
-
 from twisted.internet import defer
 
 from synapse.api.errors import StoreError
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 8a13101f1d..cfd24d2f06 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -21,9 +21,6 @@ from collections import OrderedDict, namedtuple
 from functools import wraps
 from typing import TYPE_CHECKING, Dict, Iterable, List, Tuple
 
-from six import integer_types, text_type
-from six.moves import range
-
 import attr
 from canonicaljson import json
 from prometheus_client import Counter
@@ -893,8 +890,7 @@ class PersistEventsStore:
                     "received_ts": self._clock.time_msec(),
                     "sender": event.sender,
                     "contains_url": (
-                        "url" in event.content
-                        and isinstance(event.content["url"], text_type)
+                        "url" in event.content and isinstance(event.content["url"], str)
                     ),
                 }
                 for event, _ in events_and_contexts
@@ -1345,10 +1341,10 @@ class PersistEventsStore:
         ):
             if (
                 "min_lifetime" in event.content
-                and not isinstance(event.content.get("min_lifetime"), integer_types)
+                and not isinstance(event.content.get("min_lifetime"), int)
             ) or (
                 "max_lifetime" in event.content
-                and not isinstance(event.content.get("max_lifetime"), integer_types)
+                and not isinstance(event.content.get("max_lifetime"), int)
             ):
                 # Ignore the event if one of the value isn't an integer.
                 return
diff --git a/synapse/storage/data_stores/main/events_bg_updates.py b/synapse/storage/data_stores/main/events_bg_updates.py
index f54c8b1ee0..62d28f44dc 100644
--- a/synapse/storage/data_stores/main/events_bg_updates.py
+++ b/synapse/storage/data_stores/main/events_bg_updates.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import text_type
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -133,7 +131,7 @@ class EventsBackgroundUpdatesStore(SQLBaseStore):
 
                     contains_url = "url" in content
                     if contains_url:
-                        contains_url &= isinstance(content["url"], text_type)
+                        contains_url &= isinstance(content["url"], str)
                 except (KeyError, AttributeError):
                     # If the event is missing a necessary field then
                     # skip over it.
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 213d69100a..a48c7a96ca 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -1077,9 +1077,32 @@ class EventsWorkerStore(SQLBaseStore):
             "get_ex_outlier_stream_rows", get_ex_outlier_stream_rows_txn
         )
 
-    def get_all_new_backfill_event_rows(self, last_id, current_id, limit):
+    async def get_all_new_backfill_event_rows(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for backfill replication stream, including all new
+        backfilled events and events that have gone from being outliers to not.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_new_backfill_event_rows(txn):
             sql = (
@@ -1094,10 +1117,12 @@ class EventsWorkerStore(SQLBaseStore):
                 " LIMIT ?"
             )
             txn.execute(sql, (-last_id, -current_id, limit))
-            new_event_updates = txn.fetchall()
+            new_event_updates = [(row[0], row[1:]) for row in txn]
 
+            limited = False
             if len(new_event_updates) == limit:
                 upper_bound = new_event_updates[-1][0]
+                limited = True
             else:
                 upper_bound = current_id
 
@@ -1114,11 +1139,15 @@ class EventsWorkerStore(SQLBaseStore):
                 " ORDER BY event_stream_ordering DESC"
             )
             txn.execute(sql, (-last_id, -upper_bound))
-            new_event_updates.extend(txn.fetchall())
+            new_event_updates.extend((row[0], row[1:]) for row in txn)
 
-            return new_event_updates
+            if len(new_event_updates) >= limit:
+                upper_bound = new_event_updates[-1][0]
+                limited = True
 
-        return self.db.runInteraction(
+            return new_event_updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_new_backfill_event_rows", get_all_new_backfill_event_rows
         )
 
diff --git a/synapse/storage/data_stores/main/presence.py b/synapse/storage/data_stores/main/presence.py
index dab31e0c2d..7574612619 100644
--- a/synapse/storage/data_stores/main/presence.py
+++ b/synapse/storage/data_stores/main/presence.py
@@ -13,6 +13,8 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+from typing import List, Tuple
+
 from twisted.internet import defer
 
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
@@ -73,9 +75,32 @@ class PresenceStore(SQLBaseStore):
             )
             txn.execute(sql + clause, [stream_id] + list(args))
 
-    def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for presence replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_presence_updates_txn(txn):
             sql = """
@@ -89,9 +114,17 @@ class PresenceStore(SQLBaseStore):
                 LIMIT ?
             """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(row[0], row[1:]) for row in txn]
+
+            upper_bound = current_id
+            limited = False
+            if len(updates) >= limit:
+                upper_bound = updates[-1][0]
+                limited = True
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_presence_updates", get_all_presence_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/push_rule.py b/synapse/storage/data_stores/main/push_rule.py
index ef8f40959f..f6e78ca590 100644
--- a/synapse/storage/data_stores/main/push_rule.py
+++ b/synapse/storage/data_stores/main/push_rule.py
@@ -16,7 +16,7 @@
 
 import abc
 import logging
-from typing import Union
+from typing import List, Tuple, Union
 
 from canonicaljson import json
 
@@ -348,23 +348,53 @@ class PushRulesWorkerStore(
             results.setdefault(row["user_name"], {})[row["rule_id"]] = enabled
         return results
 
-    def get_all_push_rule_updates(self, last_id, current_id, limit):
-        """Get all the push rules changes that have happend on the server"""
+    async def get_all_push_rule_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, tuple]], int, bool]:
+        """Get updates for push_rules replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
         if last_id == current_id:
-            return defer.succeed([])
+            return [], current_id, False
 
         def get_all_push_rule_updates_txn(txn):
-            sql = (
-                "SELECT stream_id, event_stream_ordering, user_id, rule_id,"
-                " op, priority_class, priority, conditions, actions"
-                " FROM push_rules_stream"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC LIMIT ?"
-            )
+            sql = """
+                SELECT stream_id, user_id
+                FROM push_rules_stream
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
             txn.execute(sql, (last_id, current_id, limit))
-            return txn.fetchall()
+            updates = [(stream_id, (user_id,)) for stream_id, user_id in txn]
+
+            limited = False
+            upper_bound = current_id
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
 
-        return self.db.runInteraction(
+        return await self.db.runInteraction(
             "get_all_push_rule_updates", get_all_push_rule_updates_txn
         )
 
diff --git a/synapse/storage/data_stores/main/receipts.py b/synapse/storage/data_stores/main/receipts.py
index cebdcd409f..8f5505bd67 100644
--- a/synapse/storage/data_stores/main/receipts.py
+++ b/synapse/storage/data_stores/main/receipts.py
@@ -16,6 +16,7 @@
 
 import abc
 import logging
+from typing import List, Tuple
 
 from canonicaljson import json
 
@@ -24,6 +25,7 @@ from twisted.internet import defer
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
 from synapse.storage.database import Database
 from synapse.storage.util.id_generators import StreamIdGenerator
+from synapse.util.async_helpers import ObservableDeferred
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
@@ -266,26 +268,79 @@ class ReceiptsWorkerStore(SQLBaseStore):
         }
         return results
 
-    def get_all_updated_receipts(self, last_id, current_id, limit=None):
+    def get_users_sent_receipts_between(self, last_id: int, current_id: int):
+        """Get all users who sent receipts between `last_id` exclusive and
+        `current_id` inclusive.
+
+        Returns:
+            Deferred[List[str]]
+        """
+
         if last_id == current_id:
             return defer.succeed([])
 
-        def get_all_updated_receipts_txn(txn):
-            sql = (
-                "SELECT stream_id, room_id, receipt_type, user_id, event_id, data"
-                " FROM receipts_linearized"
-                " WHERE ? < stream_id AND stream_id <= ?"
-                " ORDER BY stream_id ASC"
-            )
-            args = [last_id, current_id]
-            if limit is not None:
-                sql += " LIMIT ?"
-                args.append(limit)
-            txn.execute(sql, args)
+        def _get_users_sent_receipts_between_txn(txn):
+            sql = """
+                SELECT DISTINCT user_id FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+            """
+            txn.execute(sql, (last_id, current_id))
 
-            return [r[0:5] + (json.loads(r[5]),) for r in txn]
+            return [r[0] for r in txn]
 
         return self.db.runInteraction(
+            "get_users_sent_receipts_between", _get_users_sent_receipts_between_txn
+        )
+
+    async def get_all_updated_receipts(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for receipts replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
+        """
+
+        if last_id == current_id:
+            return [], current_id, False
+
+        def get_all_updated_receipts_txn(txn):
+            sql = """
+                SELECT stream_id, room_id, receipt_type, user_id, event_id, data
+                FROM receipts_linearized
+                WHERE ? < stream_id AND stream_id <= ?
+                ORDER BY stream_id ASC
+                LIMIT ?
+            """
+            txn.execute(sql, (last_id, current_id, limit))
+
+            updates = [(r[0], r[1:5] + (json.loads(r[5]),)) for r in txn]
+
+            limited = False
+            upper_bound = current_id
+
+            if len(updates) == limit:
+                limited = True
+                upper_bound = updates[-1][0]
+
+            return updates, upper_bound, limited
+
+        return await self.db.runInteraction(
             "get_all_updated_receipts", get_all_updated_receipts_txn
         )
 
@@ -300,10 +355,10 @@ class ReceiptsWorkerStore(SQLBaseStore):
             room_id, None, update_metrics=False
         )
 
-        # first handle the Deferred case
-        if isinstance(res, defer.Deferred):
-            if res.called:
-                res = res.result
+        # first handle the ObservableDeferred case
+        if isinstance(res, ObservableDeferred):
+            if res.has_called():
+                res = res.get_result()
             else:
                 res = None
 
diff --git a/synapse/storage/data_stores/main/schema/delta/30/as_users.py b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
index 9b95411fb6..b42c02710a 100644
--- a/synapse/storage/data_stores/main/schema/delta/30/as_users.py
+++ b/synapse/storage/data_stores/main/schema/delta/30/as_users.py
@@ -13,8 +13,6 @@
 # limitations under the License.
 import logging
 
-from six.moves import range
-
 from synapse.config.appservice import load_appservices
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/data_stores/main/search.py b/synapse/storage/data_stores/main/search.py
index 13f49d8060..a8381dc577 100644
--- a/synapse/storage/data_stores/main/search.py
+++ b/synapse/storage/data_stores/main/search.py
@@ -17,8 +17,6 @@ import logging
 import re
 from collections import namedtuple
 
-from six import string_types
-
 from canonicaljson import json
 
 from twisted.internet import defer
@@ -180,7 +178,7 @@ class SearchBackgroundUpdateStore(SearchWorkerStore):
                     # skip over it.
                     continue
 
-                if not isinstance(value, string_types):
+                if not isinstance(value, str):
                     # If the event body, name or topic isn't a string
                     # then skip over it
                     continue
diff --git a/synapse/storage/data_stores/main/stream.py b/synapse/storage/data_stores/main/stream.py
index e89f0bffb5..379d758b5d 100644
--- a/synapse/storage/data_stores/main/stream.py
+++ b/synapse/storage/data_stores/main/stream.py
@@ -40,8 +40,6 @@ import abc
 import logging
 from collections import namedtuple
 
-from six.moves import range
-
 from twisted.internet import defer
 
 from synapse.logging.context import make_deferred_yieldable, run_in_background
diff --git a/synapse/storage/data_stores/main/tags.py b/synapse/storage/data_stores/main/tags.py
index 4219018302..f8c776be3f 100644
--- a/synapse/storage/data_stores/main/tags.py
+++ b/synapse/storage/data_stores/main/tags.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six.moves import range
-
 from canonicaljson import json
 
 from twisted.internet import defer
diff --git a/synapse/storage/data_stores/state/store.py b/synapse/storage/data_stores/state/store.py
index b720212e55..5db9f20135 100644
--- a/synapse/storage/data_stores/state/store.py
+++ b/synapse/storage/data_stores/state/store.py
@@ -17,8 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Dict, Iterable, List, Set, Tuple
 
-from six.moves import range
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 645a70934c..3be20c866a 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 import logging
 import time
+from sys import intern
 from time import monotonic as monotonic_time
 from typing import (
     Any,
@@ -29,8 +30,6 @@ from typing import (
     TypeVar,
 )
 
-from six.moves import intern, range
-
 from prometheus_client import Histogram
 
 from twisted.enterprise import adbapi
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 92dfd709bc..ec894a91cb 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -20,8 +20,6 @@ import logging
 from collections import deque, namedtuple
 from typing import Iterable, List, Optional, Set, Tuple
 
-from six.moves import range
-
 from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
diff --git a/synapse/util/async_helpers.py b/synapse/util/async_helpers.py
index f7af2bca7f..65abf0846e 100644
--- a/synapse/util/async_helpers.py
+++ b/synapse/util/async_helpers.py
@@ -19,8 +19,6 @@ import logging
 from contextlib import contextmanager
 from typing import Dict, Sequence, Set, Union
 
-from six.moves import range
-
 import attr
 
 from twisted.internet import defer
@@ -95,7 +93,7 @@ class ObservableDeferred(object):
 
         This returns a brand new deferred that is resolved when the underlying
         deferred is resolved. Interacting with the returned deferred does not
-        effect the underdlying deferred.
+        effect the underlying deferred.
         """
         if not self._result:
             d = defer.Deferred()
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 2a161bf244..c541bf4579 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -17,8 +17,6 @@ import logging
 import math
 from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
 
-from six import integer_types
-
 from sortedcontainers import SortedDict
 
 from synapse.types import Collection
@@ -88,7 +86,7 @@ class StreamChangeCache:
     def has_entity_changed(self, entity: EntityType, stream_pos: int) -> bool:
         """Returns True if the entity may have been updated since stream_pos
         """
-        assert type(stream_pos) in integer_types
+        assert isinstance(stream_pos, int)
 
         if stream_pos < self._earliest_known_stream_pos:
             self.metrics.inc_misses()
diff --git a/synapse/util/file_consumer.py b/synapse/util/file_consumer.py
index 8b17d1c8b8..6a3f6177b1 100644
--- a/synapse/util/file_consumer.py
+++ b/synapse/util/file_consumer.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six.moves import queue
+import queue
 
 from twisted.internet import threads
 
diff --git a/synapse/util/frozenutils.py b/synapse/util/frozenutils.py
index 9815bb8667..eab78dd256 100644
--- a/synapse/util/frozenutils.py
+++ b/synapse/util/frozenutils.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six import binary_type, text_type
-
 from canonicaljson import json
 from frozendict import frozendict
 
@@ -26,7 +24,7 @@ def freeze(o):
     if isinstance(o, frozendict):
         return o
 
-    if isinstance(o, (binary_type, text_type)):
+    if isinstance(o, (bytes, str)):
         return o
 
     try:
@@ -41,7 +39,7 @@ def unfreeze(o):
     if isinstance(o, (dict, frozendict)):
         return dict({k: unfreeze(v) for k, v in o.items()})
 
-    if isinstance(o, (binary_type, text_type)):
+    if isinstance(o, (bytes, str)):
         return o
 
     try:
diff --git a/synapse/util/wheel_timer.py b/synapse/util/wheel_timer.py
index 9bf6a44f75..023beb5ede 100644
--- a/synapse/util/wheel_timer.py
+++ b/synapse/util/wheel_timer.py
@@ -13,8 +13,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from six.moves import range
-
 
 class _Entry(object):
     __slots__ = ["end_key", "queue"]
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 780927cda1..3dfd4af26c 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -16,8 +16,6 @@
 import logging
 import operator
 
-from six.moves import map
-
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership