Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py                                 |   6
-rw-r--r--  synapse/api/auth.py                                 |  78
-rw-r--r--  synapse/api/auth_blocking.py                        |   9
-rw-r--r--  synapse/api/constants.py                            |   3
-rw-r--r--  synapse/app/_base.py                                |  30
-rw-r--r--  synapse/app/admin_cmd.py                            |   8
-rw-r--r--  synapse/app/generic_worker.py                       |  12
-rw-r--r--  synapse/app/homeserver.py                           |  42
-rw-r--r--  synapse/config/database.py                          |   1
-rw-r--r--  synapse/config/logger.py                            |   3
-rw-r--r--  synapse/config/server.py                            |   8
-rw-r--r--  synapse/event_auth.py                               |   8
-rw-r--r--  synapse/events/spamcheck.py                         |   5
-rw-r--r--  synapse/federation/federation_client.py             | 118
-rw-r--r--  synapse/federation/sender/__init__.py               | 145
-rw-r--r--  synapse/federation/sender/per_destination_queue.py  |  15
-rw-r--r--  synapse/handlers/directory.py                       |  59
-rw-r--r--  synapse/handlers/identity.py                        |   9
-rw-r--r--  synapse/handlers/message.py                         |  24
-rw-r--r--  synapse/handlers/oidc.py                            |  22
-rw-r--r--  synapse/handlers/presence.py                        | 192
-rw-r--r--  synapse/handlers/room_member.py                     |   2
-rw-r--r--  synapse/handlers/ui_auth/checkers.py                |  35
-rw-r--r--  synapse/http/site.py                                |  69
-rw-r--r--  synapse/python_dependencies.py                      |   9
-rw-r--r--  synapse/rest/admin/rooms.py                         | 134
-rw-r--r--  synapse/rest/admin/users.py                         |   3
-rw-r--r--  synapse/rest/consent/consent_resource.py            |  10
-rw-r--r--  synapse/rest/media/v1/filepath.py                   |   2
-rw-r--r--  synapse/rest/media/v1/upload_resource.py            |   2
-rw-r--r--  synapse/secrets.py                                  |  44
-rw-r--r--  synapse/server.py                                   |  13
-rw-r--r--  synapse/storage/_base.py                            |   2
-rw-r--r--  synapse/storage/database.py                         |  19
-rw-r--r--  synapse/storage/databases/main/transactions.py      |  28
-rw-r--r--  synapse/util/caches/lrucache.py                     |  76
-rw-r--r--  synapse/util/caches/response_cache.py               |   2
37 files changed, 691 insertions(+), 556 deletions(-)
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 837e938f56..319c52be2c 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -21,8 +21,8 @@ import os
 import sys
 
 # Check that we're not running on an unsupported Python version.
-if sys.version_info < (3, 5):
-    print("Synapse requires Python 3.5 or above.")
+if sys.version_info < (3, 6):
+    print("Synapse requires Python 3.6 or above.")
     sys.exit(1)
 
 # Twisted and canonicaljson will fail to import when this file is executed to
@@ -47,7 +47,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.32.2"
+__version__ = "1.33.0rc2"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2d845d0d5c..efc926d094 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -12,14 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
 
 import pymacaroons
 from netaddr import IPAddress
 
 from twisted.web.server import Request
 
-import synapse.types
 from synapse import event_auth
 from synapse.api.auth_blocking import AuthBlocking
 from synapse.api.constants import EventTypes, HistoryVisibility, Membership
@@ -36,11 +35,14 @@ from synapse.http import get_request_user_agent
 from synapse.http.site import SynapseRequest
 from synapse.logging import opentracing as opentracing
 from synapse.storage.databases.main.registration import TokenLookupResult
-from synapse.types import StateMap, UserID
+from synapse.types import Requester, StateMap, UserID, create_requester
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
 from synapse.util.metrics import Measure
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -68,7 +70,7 @@ class Auth:
     The latter should be moved to synapse.handlers.event_auth.EventAuthHandler.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
@@ -88,13 +90,13 @@ class Auth:
 
     async def check_from_context(
         self, room_version: str, event, context, do_sig_check=True
-    ):
+    ) -> None:
         prev_state_ids = await context.get_prev_state_ids()
         auth_events_ids = self.compute_auth_events(
             event, prev_state_ids, for_verification=True
         )
-        auth_events = await self.store.get_events(auth_events_ids)
-        auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+        auth_events_by_id = await self.store.get_events(auth_events_ids)
+        auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}
 
         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
         event_auth.check(
@@ -151,17 +153,11 @@ class Auth:
 
         raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
 
-    async def check_host_in_room(self, room_id, host):
+    async def check_host_in_room(self, room_id: str, host: str) -> bool:
         with Measure(self.clock, "check_host_in_room"):
-            latest_event_ids = await self.store.is_host_joined(room_id, host)
-            return latest_event_ids
-
-    def can_federate(self, event, auth_events):
-        creation_event = auth_events.get((EventTypes.Create, ""))
+            return await self.store.is_host_joined(room_id, host)
 
-        return creation_event.content.get("m.federate", True) is True
-
-    def get_public_keys(self, invite_event):
+    def get_public_keys(self, invite_event: EventBase) -> List[Dict[str, Any]]:
         return event_auth.get_public_keys(invite_event)
 
     async def get_user_by_req(
@@ -170,7 +166,7 @@ class Auth:
         allow_guest: bool = False,
         rights: str = "access",
         allow_expired: bool = False,
-    ) -> synapse.types.Requester:
+    ) -> Requester:
         """Get a registered user's ID.
 
         Args:
@@ -196,7 +192,7 @@ class Auth:
             access_token = self.get_access_token_from_request(request)
 
             user_id, app_service = await self._get_appservice_user_id(request)
-            if user_id:
+            if user_id and app_service:
                 if ip_addr and self._track_appservice_user_ips:
                     await self.store.insert_client_ip(
                         user_id=user_id,
@@ -206,9 +202,7 @@ class Auth:
                         device_id="dummy-device",  # stubbed
                     )
 
-                requester = synapse.types.create_requester(
-                    user_id, app_service=app_service
-                )
+                requester = create_requester(user_id, app_service=app_service)
 
                 request.requester = user_id
                 opentracing.set_tag("authenticated_entity", user_id)
@@ -251,7 +245,7 @@ class Auth:
                     errcode=Codes.GUEST_ACCESS_FORBIDDEN,
                 )
 
-            requester = synapse.types.create_requester(
+            requester = create_requester(
                 user_info.user_id,
                 token_id,
                 is_guest,
@@ -271,7 +265,9 @@ class Auth:
         except KeyError:
             raise MissingClientTokenError()
 
-    async def _get_appservice_user_id(self, request):
+    async def _get_appservice_user_id(
+        self, request: Request
+    ) -> Tuple[Optional[str], Optional[ApplicationService]]:
         app_service = self.store.get_app_service_by_token(
             self.get_access_token_from_request(request)
         )
@@ -283,6 +279,9 @@ class Auth:
             if ip_address not in app_service.ip_range_whitelist:
                 return None, None
 
+        # This will always be set by the time Twisted calls us.
+        assert request.args is not None
+
         if b"user_id" not in request.args:
             return app_service.sender, app_service
 
@@ -387,7 +386,9 @@ class Auth:
             logger.warning("Invalid macaroon in auth: %s %s", type(e), e)
             raise InvalidClientTokenError("Invalid macaroon passed.")
 
-    def _parse_and_validate_macaroon(self, token, rights="access"):
+    def _parse_and_validate_macaroon(
+        self, token: str, rights: str = "access"
+    ) -> Tuple[str, bool]:
         """Takes a macaroon and tries to parse and validate it. This is cached
         if and only if rights == access and there isn't an expiry.
 
@@ -432,15 +433,16 @@ class Auth:
 
         return user_id, guest
 
-    def validate_macaroon(self, macaroon, type_string, user_id):
+    def validate_macaroon(
+        self, macaroon: pymacaroons.Macaroon, type_string: str, user_id: str
+    ) -> None:
         """
         validate that a Macaroon is understood by and was signed by this server.
 
         Args:
-            macaroon(pymacaroons.Macaroon): The macaroon to validate
-            type_string(str): The kind of token required (e.g. "access",
-                              "delete_pusher")
-            user_id (str): The user_id required
+            macaroon: The macaroon to validate
+            type_string: The kind of token required (e.g. "access", "delete_pusher")
+            user_id: The user_id required
         """
         v = pymacaroons.Verifier()
 
@@ -465,9 +467,7 @@ class Auth:
         if not service:
             logger.warning("Unrecognised appservice access token.")
             raise InvalidClientTokenError()
-        request.requester = synapse.types.create_requester(
-            service.sender, app_service=service
-        )
+        request.requester = create_requester(service.sender, app_service=service)
         return service
 
     async def is_server_admin(self, user: UserID) -> bool:
@@ -519,7 +519,7 @@ class Auth:
 
         return auth_ids
 
-    async def check_can_change_room_list(self, room_id: str, user: UserID):
+    async def check_can_change_room_list(self, room_id: str, user: UserID) -> bool:
         """Determine whether the user is allowed to edit the room's entry in the
         published room list.
 
@@ -554,11 +554,11 @@ class Auth:
         return user_level >= send_level
 
     @staticmethod
-    def has_access_token(request: Request):
+    def has_access_token(request: Request) -> bool:
         """Checks if the request has an access_token.
 
         Returns:
-            bool: False if no access_token was given, True otherwise.
+            False if no access_token was given, True otherwise.
         """
         # This will always be set by the time Twisted calls us.
         assert request.args is not None
@@ -568,13 +568,13 @@ class Auth:
         return bool(query_params) or bool(auth_headers)
 
     @staticmethod
-    def get_access_token_from_request(request: Request):
+    def get_access_token_from_request(request: Request) -> str:
         """Extracts the access_token from the request.
 
         Args:
             request: The http request.
         Returns:
-            unicode: The access_token
+            The access_token
         Raises:
             MissingClientTokenError: If there isn't a single access_token in the
                 request
@@ -649,5 +649,5 @@ class Auth:
                 % (user_id, room_id),
             )
 
-    def check_auth_blocking(self, *args, **kwargs):
-        return self._auth_blocking.check_auth_blocking(*args, **kwargs)
+    async def check_auth_blocking(self, *args, **kwargs) -> None:
+        await self._auth_blocking.check_auth_blocking(*args, **kwargs)
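
As background for the `validate_macaroon` typing change above: a minimal sketch of how a pymacaroons Verifier checks the caveats this method deals with. The key and user ID are made up; the caveat strings mirror the ones Synapse attaches when minting tokens.

    import pymacaroons

    def check_token(macaroon: pymacaroons.Macaroon, type_string: str, user_id: str, key: str) -> None:
        # Each satisfy_exact() whitelists one first-party caveat; verify()
        # raises MacaroonVerificationFailedException if any caveat is unmet.
        v = pymacaroons.Verifier()
        v.satisfy_exact("gen = 1")
        v.satisfy_exact("type = %s" % (type_string,))
        v.satisfy_exact("user_id = %s" % (user_id,))
        v.satisfy_general(lambda caveat: caveat.startswith("time < "))
        v.verify(macaroon, key)
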
diff --git a/synapse/api/auth_blocking.py b/synapse/api/auth_blocking.py
index a8df60cb89..e6bced93d5 100644
--- a/synapse/api/auth_blocking.py
+++ b/synapse/api/auth_blocking.py
@@ -13,18 +13,21 @@
 # limitations under the License.
 
 import logging
-from typing import Optional
+from typing import TYPE_CHECKING, Optional
 
 from synapse.api.constants import LimitBlockingTypes, UserTypes
 from synapse.api.errors import Codes, ResourceLimitError
 from synapse.config.server import is_threepid_reserved
 from synapse.types import Requester
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class AuthBlocking:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
 
         self._server_notices_mxid = hs.config.server_notices_mxid
@@ -43,7 +46,7 @@ class AuthBlocking:
         threepid: Optional[dict] = None,
         user_type: Optional[str] = None,
         requester: Optional[Requester] = None,
-    ):
+    ) -> None:
         """Checks if the user should be rejected for some external reason,
         such as monthly active user limiting or global disable flag
 
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 31a59bceec..936b6534b4 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -17,6 +17,9 @@
 
 """Contains constants from the specification."""
 
+# the max size of a (canonical-json-encoded) event
+MAX_PDU_SIZE = 65536
+
 # the "depth" field on events is limited to 2**63 - 1
 MAX_DEPTH = 2 ** 63 - 1
 
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 2113c4f370..638e01c1b2 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -30,9 +30,10 @@ from twisted.internet import defer, error, reactor
 from twisted.protocols.tls import TLSMemoryBIOFactory
 
 import synapse
+from synapse.api.constants import MAX_PDU_SIZE
 from synapse.app import check_bind_error
 from synapse.app.phone_stats_home import start_phone_stats_home
-from synapse.config.server import ListenerConfig
+from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.logging.context import PreserveLoggingContext
 from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -288,7 +289,7 @@ def refresh_certificate(hs):
         logger.info("Context factories updated.")
 
 
-async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer"):
     """
     Start a Synapse server or worker.
 
@@ -300,7 +301,6 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
 
     Args:
         hs: homeserver instance
-        listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
     # Set up the SIGHUP machinery.
     if hasattr(signal, "SIGHUP"):
@@ -336,7 +336,7 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
     synapse.logging.opentracing.init_tracer(hs)  # type: ignore[attr-defined] # noqa
 
     # It is now safe to start your Synapse.
-    hs.start_listening(listeners)
+    hs.start_listening()
     hs.get_datastore().db_pool.start_profiling()
     hs.get_pusherpool().start()
 
@@ -530,3 +530,25 @@ def sdnotify(state):
         # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
         # unless systemd is expecting us to notify it.
         logger.warning("Unable to send notification to systemd: %s", e)
+
+
+def max_request_body_size(config: HomeServerConfig) -> int:
+    """Get a suitable maximum size for incoming HTTP requests"""
+
+    # Other than media uploads, the biggest request we expect to see is a fully-loaded
+    # /federation/v1/send request.
+    #
+    # The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are
+    # limited to 65536 bytes (possibly slightly more if the sender didn't use canonical
+    # json encoding); there is no specced limit to EDUs (see
+    # https://github.com/matrix-org/matrix-doc/issues/3121).
+    #
+    # in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M)
+    #
+    max_request_size = 200 * MAX_PDU_SIZE
+
+    # if we have a media repo enabled, we may need to allow larger uploads than that
+    if config.media.can_load_media_repo:
+        max_request_size = max(max_request_size, config.media.max_upload_size)
+
+    return max_request_size
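
For reference, the arithmetic behind the "about 12.5M" comment, checked in an interpreter (not part of the change):

    >>> 200 * 65536            # 200 * MAX_PDU_SIZE
    13107200
    >>> 13107200 / (1024 * 1024)
    12.5
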
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index eb256db749..68ae19c977 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -70,12 +70,6 @@ class AdminCmdSlavedStore(
 class AdminCmdServer(HomeServer):
     DATASTORE_CLASS = AdminCmdSlavedStore
 
-    def _listen_http(self, listener_config):
-        pass
-
-    def start_listening(self, listeners):
-        pass
-
 
 async def export_data_command(hs, args):
     """Export data for a user.
@@ -232,7 +226,7 @@ def start(config_options):
 
     async def run():
         with LoggingContext("command"):
-            _base.start(ss, [])
+            _base.start(ss)
             await args.func(ss, args)
 
     _base.start_worker_reactor(
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index d463ca3d87..986c8e3e23 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import sys
-from typing import Dict, Iterable, Optional
+from typing import Dict, Optional
 
 from twisted.internet import address
 from twisted.web.resource import IResource
@@ -32,7 +32,7 @@ from synapse.api.urls import (
     SERVER_KEY_V2_PREFIX,
 )
 from synapse.app import _base
-from synapse.app._base import register_start
+from synapse.app._base import max_request_body_size, register_start
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
@@ -367,14 +367,16 @@ class GenericWorkerServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
+                max_request_body_size=max_request_body_size(self.config),
+                reactor=self.get_reactor(),
             ),
             reactor=self.get_reactor(),
         )
 
         logger.info("Synapse worker now listening on port %d", port)
 
-    def start_listening(self, listeners: Iterable[ListenerConfig]):
-        for listener in listeners:
+    def start_listening(self):
+        for listener in self.config.worker_listeners:
             if listener.type == "http":
                 self._listen_http(listener)
             elif listener.type == "manhole":
@@ -468,7 +470,7 @@ def start(config_options):
     # streams. Will no-op if no streams can be written to by this worker.
     hs.get_replication_streamer()
 
-    register_start(_base.start, hs, config.worker_listeners)
+    register_start(_base.start, hs)
 
     _base.start_worker_reactor("synapse-generic-worker", config)
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 11933c699f..510d7f8ad4 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -17,7 +17,7 @@
 import logging
 import os
 import sys
-from typing import Iterable, Iterator
+from typing import Iterator
 
 from twisted.internet import reactor
 from twisted.web.resource import EncodingResourceWrapper, IResource
@@ -36,7 +36,13 @@ from synapse.api.urls import (
     WEB_CLIENT_PREFIX,
 )
 from synapse.app import _base
-from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
+from synapse.app._base import (
+    listen_ssl,
+    listen_tcp,
+    max_request_body_size,
+    quit_with_error,
+    register_start,
+)
 from synapse.config._base import ConfigError
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.config.homeserver import HomeServerConfig
@@ -126,19 +132,21 @@ class SynapseHomeServer(HomeServer):
         else:
             root_resource = OptionsResource()
 
-        root_resource = create_resource_tree(resources, root_resource)
+        site = SynapseSite(
+            "synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
+            site_tag,
+            listener_config,
+            create_resource_tree(resources, root_resource),
+            self.version_string,
+            max_request_body_size=max_request_body_size(self.config),
+            reactor=self.get_reactor(),
+        )
 
         if tls:
             ports = listen_ssl(
                 bind_addresses,
                 port,
-                SynapseSite(
-                    "synapse.access.https.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                    self.version_string,
-                ),
+                site,
                 self.tls_server_context_factory,
                 reactor=self.get_reactor(),
             )
@@ -148,13 +156,7 @@ class SynapseHomeServer(HomeServer):
             ports = listen_tcp(
                 bind_addresses,
                 port,
-                SynapseSite(
-                    "synapse.access.http.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                    self.version_string,
-                ),
+                site,
                 reactor=self.get_reactor(),
             )
             logger.info("Synapse now listening on TCP port %d", port)
@@ -273,14 +275,14 @@ class SynapseHomeServer(HomeServer):
 
         return resources
 
-    def start_listening(self, listeners: Iterable[ListenerConfig]):
+    def start_listening(self):
         if self.config.redis_enabled:
             # If redis is enabled we connect via the replication command handler
             # in the same way as the workers (since we're effectively a client
             # rather than a server).
             self.get_tcp_replication().start_replication(self)
 
-        for listener in listeners:
+        for listener in self.config.server.listeners:
             if listener.type == "http":
                 self._listening_services.extend(
                     self._listener_http(self.config, listener)
@@ -413,7 +415,7 @@ def setup(config_options):
             # Loading the provider metadata also ensures the provider config is valid.
             await oidc.load_metadata()
 
-        await _base.start(hs, config.listeners)
+        await _base.start(hs)
 
         hs.get_datastore().db_pool.updates.start_doing_background_updates()
 
diff --git a/synapse/config/database.py b/synapse/config/database.py
index 79a02706b4..c76ef1e1de 100644
--- a/synapse/config/database.py
+++ b/synapse/config/database.py
@@ -58,6 +58,7 @@ DEFAULT_CONFIG = """\
 #    password: secretpassword
 #    database: synapse
 #    host: localhost
+#    port: 5432
 #    cp_min: 5
 #    cp_max: 10
 #
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index b174e0df6d..813076dfe2 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -31,7 +31,6 @@ from twisted.logger import (
 )
 
 import synapse
-from synapse.app import _base as appbase
 from synapse.logging._structured import setup_structured_logging
 from synapse.logging.context import LoggingContextFilter
 from synapse.logging.filter import MetadataFilter
@@ -318,6 +317,8 @@ def setup_logging(
     # Perform one-time logging configuration.
     _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
     # Add a SIGHUP handler to reload the logging configuration, if one is available.
+    from synapse.app import _base as appbase
+
     appbase.register_sighup(_reload_logging_config, log_config_path)
 
     # Log immediately so we can grep backwards.
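
Deferring the `synapse.app._base` import into the function body is the usual fix for an import cycle; a generic sketch of the idiom (module names are illustrative):

    # module_a.py
    def do_work():
        # Imported at call time rather than module load time: by the time
        # do_work() runs, both modules are fully initialised, so module_a
        # and module_b may depend on each other without a load-time cycle.
        from module_b import helper
        return helper()
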
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 02b86b11a5..21ca7b33e3 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -235,7 +235,11 @@ class ServerConfig(Config):
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
+
         self.public_baseurl = config.get("public_baseurl")
+        if self.public_baseurl is not None:
+            if self.public_baseurl[-1] != "/":
+                self.public_baseurl += "/"
 
         # Whether to enable user presence.
         presence_config = config.get("presence") or {}
@@ -407,10 +411,6 @@ class ServerConfig(Config):
             config_path=("federation_ip_range_blacklist",),
         )
 
-        if self.public_baseurl is not None:
-            if self.public_baseurl[-1] != "/":
-                self.public_baseurl += "/"
-
         # (undocumented) option for torturing the worker-mode replication a bit,
         # for testing. The value defines the number of milliseconds to pause before
         # sending out any replication updates.
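
Moving the trailing-slash normalisation up means everything later in `read_config` that joins paths onto `public_baseurl` sees a canonical value. A standalone illustration of the invariant:

    public_baseurl = "https://matrix.example.com"
    if public_baseurl[-1] != "/":
        public_baseurl += "/"
    # Path joins are now unambiguous:
    assert public_baseurl + "_matrix/client" == "https://matrix.example.com/_matrix/client"
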
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index c831d9f73c..70c556566e 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -14,14 +14,14 @@
 # limitations under the License.
 
 import logging
-from typing import List, Optional, Set, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple
 
 from canonicaljson import encode_canonical_json
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import SignatureVerifyException, verify_signed_json
 from unpaddedbase64 import decode_base64
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import MAX_PDU_SIZE, EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, EventSizeError, SynapseError
 from synapse.api.room_versions import (
     KNOWN_ROOM_VERSIONS,
@@ -205,7 +205,7 @@ def _check_size_limits(event: EventBase) -> None:
         too_big("type")
     if len(event.event_id) > 255:
         too_big("event_id")
-    if len(encode_canonical_json(event.get_pdu_json())) > 65536:
+    if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
         too_big("event")
 
 
@@ -688,7 +688,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase
     return False
 
 
-def get_public_keys(invite_event):
+def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]:
     public_keys = []
     if "public_key" in invite_event.content:
         o = {"public_key": invite_event.content["public_key"]}
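
A standalone sketch of the size check that now uses the shared constant (the event dict here is made up):

    from canonicaljson import encode_canonical_json

    MAX_PDU_SIZE = 65536

    pdu_json = {"type": "m.room.message", "content": {"body": "hi"}}
    # encode_canonical_json returns bytes, so len() measures the wire size.
    if len(encode_canonical_json(pdu_json)) > MAX_PDU_SIZE:
        raise ValueError("event too large")
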
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index 7118d5f52d..d5fa195094 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -20,6 +20,7 @@ from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple,
 from synapse.rest.media.v1._base import FileInfo
 from synapse.rest.media.v1.media_storage import ReadableFileWrapper
 from synapse.spam_checker_api import RegistrationBehaviour
+from synapse.types import RoomAlias
 from synapse.util.async_helpers import maybe_awaitable
 
 if TYPE_CHECKING:
@@ -113,7 +114,9 @@ class SpamChecker:
 
         return True
 
-    async def user_may_create_room_alias(self, userid: str, room_alias: str) -> bool:
+    async def user_may_create_room_alias(
+        self, userid: str, room_alias: RoomAlias
+    ) -> bool:
         """Checks if a given user may create a room alias
 
         If this method returns false, the association request will be rejected.
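
Spam-checker modules now receive a parsed RoomAlias rather than a raw string; the string form remains recoverable, e.g.:

    from synapse.types import RoomAlias

    alias = RoomAlias.from_string("#lounge:example.com")
    alias.localpart    # "lounge"
    alias.domain       # "example.com"
    alias.to_string()  # "#lounge:example.com"
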
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f93335edaa..a5b6a61195 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -451,6 +451,28 @@ class FederationClient(FederationBase):
 
         return signed_auth
 
+    def _is_unknown_endpoint(
+        self, e: HttpResponseException, synapse_error: Optional[SynapseError] = None
+    ) -> bool:
+        """
+        Returns true if the response was due to an endpoint being unimplemented.
+
+        Args:
+            e: The error response received from the remote server.
+            synapse_error: The above error converted to a SynapseError. This is
+                automatically generated if not provided.
+
+        """
+        if synapse_error is None:
+            synapse_error = e.to_synapse_error()
+        # There is no good way to detect an "unknown" endpoint.
+        #
+        # Dendrite returns a 404 (with no body); synapse returns a 400
+        # with M_UNRECOGNISED.
+        return e.code == 404 or (
+            e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
+        )
+
     async def _try_destination_list(
         self,
         description: str,
@@ -468,9 +490,9 @@ class FederationClient(FederationBase):
             callback:  Function to run for each server. Passed a single
                 argument: the server_name to try.
 
-                If the callback raises a CodeMessageException with a 300/400 code,
-                attempts to perform the operation stop immediately and the exception is
-                reraised.
+                If the callback raises a CodeMessageException with a 300/400 code or
+                an UnsupportedRoomVersionError, attempts to perform the operation
+                stop immediately and the exception is reraised.
 
                 Otherwise, if the callback raises an Exception the error is logged and the
                 next server tried. Normally the stacktrace is logged but this is
@@ -492,8 +514,7 @@ class FederationClient(FederationBase):
                 continue
 
             try:
-                res = await callback(destination)
-                return res
+                return await callback(destination)
             except InvalidResponseError as e:
                 logger.warning("Failed to %s via %s: %s", description, destination, e)
             except UnsupportedRoomVersionError:
@@ -502,17 +523,15 @@ class FederationClient(FederationBase):
                 synapse_error = e.to_synapse_error()
                 failover = False
 
+                # Failover on an internal server error, or if the destination
+                # doesn't implement the endpoint for some reason.
                 if 500 <= e.code < 600:
                     failover = True
 
-                elif failover_on_unknown_endpoint:
-                    # there is no good way to detect an "unknown" endpoint. Dendrite
-                    # returns a 404 (with no body); synapse returns a 400
-                    # with M_UNRECOGNISED.
-                    if e.code == 404 or (
-                        e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
-                    ):
-                        failover = True
+                elif failover_on_unknown_endpoint and self._is_unknown_endpoint(
+                    e, synapse_error
+                ):
+                    failover = True
 
                 if not failover:
                     raise synapse_error from e
@@ -570,9 +589,8 @@ class FederationClient(FederationBase):
             UnsupportedRoomVersionError: if remote responds with
                 a room version we don't understand.
 
-            SynapseError: if the chosen remote server returns a 300/400 code.
-
-            RuntimeError: if no servers were reachable.
+            SynapseError: if the chosen remote server returns a 300/400 code, or
+                no servers successfully handle the request.
         """
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
@@ -642,9 +660,8 @@ class FederationClient(FederationBase):
             ``auth_chain``.
 
         Raises:
-            SynapseError: if the chosen remote server returns a 300/400 code.
-
-            RuntimeError: if no servers were reachable.
+            SynapseError: if the chosen remote server returns a 300/400 code, or
+                no servers successfully handle the request.
         """
 
         async def send_request(destination) -> Dict[str, Any]:
@@ -673,7 +690,7 @@ class FederationClient(FederationBase):
             if create_event is None:
                 # If the state doesn't have a create event then the room is
                 # invalid, and it would fail auth checks anyway.
-                raise SynapseError(400, "No create event in state")
+                raise InvalidResponseError("No create event in state")
 
             # the room version should be sane.
             create_room_version = create_event.content.get(
@@ -746,16 +763,11 @@ class FederationClient(FederationBase):
                 content=pdu.get_pdu_json(time_now),
             )
         except HttpResponseException as e:
-            if e.code in [400, 404]:
-                err = e.to_synapse_error()
-
-                # If we receive an error response that isn't a generic error, or an
-                # unrecognised endpoint error, we  assume that the remote understands
-                # the v2 invite API and this is a legitimate error.
-                if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
-                    raise err
-            else:
-                raise e.to_synapse_error()
+            # If an error is received that is due to an unrecognised endpoint,
+            # fall back to the v1 endpoint. Otherwise consider it a legitimate
+            # error and raise.
+            if not self._is_unknown_endpoint(e):
+                raise
 
         logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
 
@@ -802,6 +814,11 @@ class FederationClient(FederationBase):
 
         Returns:
             The event as a dict as returned by the remote server
+
+        Raises:
+            SynapseError: if the remote server returns an error or if the server
+                only supports the v1 endpoint and a room version other than "1"
+                or "2" is requested.
         """
         time_now = self._clock.time_msec()
 
@@ -817,28 +834,19 @@ class FederationClient(FederationBase):
                 },
             )
         except HttpResponseException as e:
-            if e.code in [400, 404]:
-                err = e.to_synapse_error()
-
-                # If we receive an error response that isn't a generic error, we
-                # assume that the remote understands the v2 invite API and this
-                # is a legitimate error.
-                if err.errcode != Codes.UNKNOWN:
-                    raise err
-
-                # Otherwise, we assume that the remote server doesn't understand
-                # the v2 invite API. That's ok provided the room uses old-style event
-                # IDs.
+            # If an error is received that is due to an unrecognised endpoint,
+            # fall back to the v1 endpoint if the room uses old-style event IDs.
+            # Otherwise consider it a legitimate error and raise.
+            err = e.to_synapse_error()
+            if self._is_unknown_endpoint(e, err):
                 if room_version.event_format != EventFormatVersions.V1:
                     raise SynapseError(
                         400,
                         "User's homeserver does not support this room version",
                         Codes.UNSUPPORTED_ROOM_VERSION,
                     )
-            elif e.code in (403, 429):
-                raise e.to_synapse_error()
             else:
-                raise
+                raise err
 
         # Didn't work, try v1 API.
         # Note the v1 API returns a tuple of `(200, content)`
@@ -865,9 +873,8 @@ class FederationClient(FederationBase):
             pdu: event to be sent
 
         Raises:
-            SynapseError if the chosen remote server returns a 300/400 code.
-
-            RuntimeError if no servers were reachable.
+            SynapseError: if the chosen remote server returns a 300/400 code, or
+                no servers successfully handle the request.
         """
 
         async def send_request(destination: str) -> None:
@@ -889,16 +896,11 @@ class FederationClient(FederationBase):
                 content=pdu.get_pdu_json(time_now),
             )
         except HttpResponseException as e:
-            if e.code in [400, 404]:
-                err = e.to_synapse_error()
-
-                # If we receive an error response that isn't a generic error, or an
-                # unrecognised endpoint error, we  assume that the remote understands
-                # the v2 invite API and this is a legitimate error.
-                if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
-                    raise err
-            else:
-                raise e.to_synapse_error()
+            # If an error is received that is due to an unrecognised endpoint,
+            # fall back to the v1 endpoint. Otherwise consider it a legitimate
+            # error and raise.
+            if not self._is_unknown_endpoint(e):
+                raise
 
         logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
 
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 022bbf7dad..deb40f4610 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,26 +14,19 @@
 
 import abc
 import logging
-from typing import (
-    TYPE_CHECKING,
-    Collection,
-    Dict,
-    Hashable,
-    Iterable,
-    List,
-    Optional,
-    Set,
-    Tuple,
-)
+from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
 
 from prometheus_client import Counter
 
+from twisted.internet import defer
+
 import synapse.metrics
 from synapse.api.presence import UserPresenceState
 from synapse.events import EventBase
 from synapse.federation.sender.per_destination_queue import PerDestinationQueue
 from synapse.federation.sender.transaction_manager import TransactionManager
 from synapse.federation.units import Edu
+from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics import (
     LaterGauge,
     event_processing_loop_counter,
@@ -262,27 +255,15 @@ class FederationSender(AbstractFederationSender):
                 if not events and next_token >= self._last_poked_id:
                     break
 
-                async def get_destinations_for_event(
-                    event: EventBase,
-                ) -> Collection[str]:
-                    """Computes the destinations to which this event must be sent.
-
-                    This returns an empty tuple when there are no destinations to send to,
-                    or if this event is not from this homeserver and it is not sending
-                    it on behalf of another server.
-
-                    Will also filter out destinations which this sender is not responsible for,
-                    if multiple federation senders exist.
-                    """
-
+                async def handle_event(event: EventBase) -> None:
                     # Only send events for this server.
                     send_on_behalf_of = event.internal_metadata.get_send_on_behalf_of()
                     is_mine = self.is_mine_id(event.sender)
                     if not is_mine and send_on_behalf_of is None:
-                        return ()
+                        return
 
                     if not event.internal_metadata.should_proactively_send():
-                        return ()
+                        return
 
                     destinations = None  # type: Optional[Set[str]]
                     if not event.prev_event_ids():
@@ -317,7 +298,7 @@ class FederationSender(AbstractFederationSender):
                                 "Failed to calculate hosts in room for event: %s",
                                 event.event_id,
                             )
-                            return ()
+                            return
 
                     destinations = {
                         d
@@ -327,15 +308,17 @@ class FederationSender(AbstractFederationSender):
                         )
                     }
 
-                    destinations.discard(self.server_name)
-
                     if send_on_behalf_of is not None:
                         # If we are sending the event on behalf of another server
                         # then it already has the event and there is no reason to
                         # send the event to it.
                         destinations.discard(send_on_behalf_of)
 
+                    logger.debug("Sending %s to %r", event, destinations)
+
                     if destinations:
+                        await self._send_pdu(event, destinations)
+
                         now = self.clock.time_msec()
                         ts = await self.store.get_received_ts(event.event_id)
 
@@ -343,29 +326,24 @@ class FederationSender(AbstractFederationSender):
                             "federation_sender"
                         ).observe((now - ts) / 1000)
 
-                        return destinations
-                    return ()
-
-                async def get_federatable_events_and_destinations(
-                    events: Iterable[EventBase],
-                ) -> List[Tuple[EventBase, Collection[str]]]:
-                    with Measure(self.clock, "get_destinations_for_events"):
-                        # Fetch federation destinations per event,
-                        # skip if get_destinations_for_event returns an empty collection,
-                        # return list of event->destinations pairs.
-                        return [
-                            (event, dests)
-                            for (event, dests) in [
-                                (event, await get_destinations_for_event(event))
-                                for event in events
-                            ]
-                            if dests
-                        ]
-
-                events_and_dests = await get_federatable_events_and_destinations(events)
-
-                # Send corresponding events to each destination queue
-                await self._distribute_events(events_and_dests)
+                async def handle_room_events(events: Iterable[EventBase]) -> None:
+                    with Measure(self.clock, "handle_room_events"):
+                        for event in events:
+                            await handle_event(event)
+
+                events_by_room = {}  # type: Dict[str, List[EventBase]]
+                for event in events:
+                    events_by_room.setdefault(event.room_id, []).append(event)
+
+                await make_deferred_yieldable(
+                    defer.gatherResults(
+                        [
+                            run_in_background(handle_room_events, evs)
+                            for evs in events_by_room.values()
+                        ],
+                        consumeErrors=True,
+                    )
+                )
 
                 await self.store.update_federation_out_pos("events", next_token)
 
@@ -383,7 +361,7 @@ class FederationSender(AbstractFederationSender):
                     events_processed_counter.inc(len(events))
 
                     event_processing_loop_room_count.labels("federation_sender").inc(
-                        len({event.room_id for event in events})
+                        len(events_by_room)
                     )
 
                 event_processing_loop_counter.labels("federation_sender").inc()
@@ -395,53 +373,34 @@ class FederationSender(AbstractFederationSender):
         finally:
             self._is_processing = False
 
-    async def _distribute_events(
-        self,
-        events_and_dests: Iterable[Tuple[EventBase, Collection[str]]],
-    ) -> None:
-        """Distribute events to the respective per_destination queues.
-
-        Also persists last-seen per-room stream_ordering to 'destination_rooms'.
-
-        Args:
-            events_and_dests: A list of tuples, which are (event: EventBase, destinations: Collection[str]).
-                              Every event is paired with its intended destinations (in federation).
-        """
-        # Tuples of room_id + destination to their max-seen stream_ordering
-        room_with_dest_stream_ordering = {}  # type: Dict[Tuple[str, str], int]
-
-        # List of events to send to each destination
-        events_by_dest = {}  # type: Dict[str, List[EventBase]]
+    async def _send_pdu(self, pdu: EventBase, destinations: Iterable[str]) -> None:
+        # We loop through all destinations to see whether we already have
+        # a transaction in progress. If we do, stick it in the pending_pdus
+        # table and we'll get back to it later.
 
-        # For each event-destinations pair...
-        for event, destinations in events_and_dests:
+        destinations = set(destinations)
+        destinations.discard(self.server_name)
+        logger.debug("Sending to: %s", str(destinations))
 
-            # (we got this from the database, it's filled)
-            assert event.internal_metadata.stream_ordering
-
-            sent_pdus_destination_dist_total.inc(len(destinations))
-            sent_pdus_destination_dist_count.inc()
+        if not destinations:
+            return
 
-            # ...iterate over those destinations..
-            for destination in destinations:
-                # ...update their stream-ordering...
-                room_with_dest_stream_ordering[(event.room_id, destination)] = max(
-                    event.internal_metadata.stream_ordering,
-                    room_with_dest_stream_ordering.get((event.room_id, destination), 0),
-                )
+        sent_pdus_destination_dist_total.inc(len(destinations))
+        sent_pdus_destination_dist_count.inc()
 
-                # ...and add the event to each destination queue.
-                events_by_dest.setdefault(destination, []).append(event)
+        assert pdu.internal_metadata.stream_ordering
 
-        # Bulk-store destination_rooms stream_ids
-        await self.store.bulk_store_destination_rooms_entries(
-            room_with_dest_stream_ordering
+        # track the fact that we have a PDU for these destinations,
+        # to allow us to perform catch-up later on if the remote is unreachable
+        # for a while.
+        await self.store.store_destination_rooms_entries(
+            destinations,
+            pdu.room_id,
+            pdu.internal_metadata.stream_ordering,
         )
 
-        for destination, pdus in events_by_dest.items():
-            logger.debug("Sending %d pdus to %s", len(pdus), destination)
-
-            self._get_per_destination_queue(destination).send_pdus(pdus)
+        for destination in destinations:
+            self._get_per_destination_queue(destination).send_pdu(pdu)
 
     async def send_read_receipt(self, receipt: ReadReceipt) -> None:
         """Send a RR to any other servers in the room
diff --git a/synapse/federation/sender/per_destination_queue.py b/synapse/federation/sender/per_destination_queue.py
index 3bb66bce32..3b053ebcfb 100644
--- a/synapse/federation/sender/per_destination_queue.py
+++ b/synapse/federation/sender/per_destination_queue.py
@@ -154,22 +154,19 @@ class PerDestinationQueue:
             + len(self._pending_edus_keyed)
         )
 
-    def send_pdus(self, pdus: Iterable[EventBase]) -> None:
-        """Add PDUs to the queue, and start the transmission loop if necessary
+    def send_pdu(self, pdu: EventBase) -> None:
+        """Add a PDU to the queue, and start the transmission loop if necessary
 
         Args:
-            pdus: pdus to send
+            pdu: pdu to send
         """
         if not self._catching_up or self._last_successful_stream_ordering is None:
             # only enqueue the PDU if we are not catching up (False) or do not
             # yet know if we have anything to catch up (None)
-            self._pending_pdus.extend(pdus)
+            self._pending_pdus.append(pdu)
         else:
-            self._catchup_last_skipped = max(
-                pdu.internal_metadata.stream_ordering
-                for pdu in pdus
-                if pdu.internal_metadata.stream_ordering is not None
-            )
+            assert pdu.internal_metadata.stream_ordering
+            self._catchup_last_skipped = pdu.internal_metadata.stream_ordering
 
         self.attempt_new_transaction()
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 90932316f3..de1b14cde3 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -14,7 +14,7 @@
 
 import logging
 import string
-from typing import Iterable, List, Optional
+from typing import TYPE_CHECKING, Iterable, List, Optional
 
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
 from synapse.api.errors import (
@@ -27,15 +27,19 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.appservice import ApplicationService
-from synapse.types import Requester, RoomAlias, UserID, get_domain_from_id
+from synapse.storage.databases.main.directory import RoomAliasMapping
+from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class DirectoryHandler(BaseHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.state = hs.get_state_handler()
@@ -60,7 +64,7 @@ class DirectoryHandler(BaseHandler):
         room_id: str,
         servers: Optional[Iterable[str]] = None,
         creator: Optional[str] = None,
-    ):
+    ) -> None:
         # general association creation for both human users and app services
 
         for wchar in string.whitespace:
@@ -104,8 +108,9 @@ class DirectoryHandler(BaseHandler):
         """
 
         user_id = requester.user.to_string()
+        room_alias_str = room_alias.to_string()
 
-        if len(room_alias.to_string()) > MAX_ALIAS_LENGTH:
+        if len(room_alias_str) > MAX_ALIAS_LENGTH:
             raise SynapseError(
                 400,
                 "Can't create aliases longer than %s characters" % MAX_ALIAS_LENGTH,
@@ -114,7 +119,7 @@ class DirectoryHandler(BaseHandler):
 
         service = requester.app_service
         if service:
-            if not service.is_interested_in_alias(room_alias.to_string()):
+            if not service.is_interested_in_alias(room_alias_str):
                 raise SynapseError(
                     400,
                     "This application service has not reserved this kind of alias.",
@@ -138,7 +143,7 @@ class DirectoryHandler(BaseHandler):
                 raise AuthError(403, "This user is not permitted to create this alias")
 
             if not self.config.is_alias_creation_allowed(
-                user_id, room_id, room_alias.to_string()
+                user_id, room_id, room_alias_str
             ):
                 # Lets just return a generic message, as there may be all sorts of
                 # reasons why we said no. TODO: Allow configurable error messages
@@ -211,7 +216,7 @@ class DirectoryHandler(BaseHandler):
 
     async def delete_appservice_association(
         self, service: ApplicationService, room_alias: RoomAlias
-    ):
+    ) -> None:
         if not service.is_interested_in_alias(room_alias.to_string()):
             raise SynapseError(
                 400,
@@ -220,7 +225,7 @@ class DirectoryHandler(BaseHandler):
             )
         await self._delete_association(room_alias)
 
-    async def _delete_association(self, room_alias: RoomAlias):
+    async def _delete_association(self, room_alias: RoomAlias) -> str:
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
 
@@ -228,17 +233,19 @@ class DirectoryHandler(BaseHandler):
 
         return room_id
 
-    async def get_association(self, room_alias: RoomAlias):
+    async def get_association(self, room_alias: RoomAlias) -> JsonDict:
         room_id = None
         if self.hs.is_mine(room_alias):
-            result = await self.get_association_from_room_alias(room_alias)
+            result = await self.get_association_from_room_alias(
+                room_alias
+            )  # type: Optional[RoomAliasMapping]
 
             if result:
                 room_id = result.room_id
                 servers = result.servers
         else:
             try:
-                result = await self.federation.make_query(
+                fed_result = await self.federation.make_query(
                     destination=room_alias.domain,
                     query_type="directory",
                     args={"room_alias": room_alias.to_string()},
@@ -248,13 +255,13 @@ class DirectoryHandler(BaseHandler):
             except CodeMessageException as e:
                 logging.warning("Error retrieving alias")
                 if e.code == 404:
-                    result = None
+                    fed_result = None
                 else:
                     raise
 
-            if result and "room_id" in result and "servers" in result:
-                room_id = result["room_id"]
-                servers = result["servers"]
+            if fed_result and "room_id" in fed_result and "servers" in fed_result:
+                room_id = fed_result["room_id"]
+                servers = fed_result["servers"]
 
         if not room_id:
             raise SynapseError(
@@ -275,7 +282,7 @@ class DirectoryHandler(BaseHandler):
 
         return {"room_id": room_id, "servers": servers}
 
-    async def on_directory_query(self, args):
+    async def on_directory_query(self, args: JsonDict) -> JsonDict:
         room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room Alias is not hosted on this homeserver")
@@ -293,7 +300,7 @@ class DirectoryHandler(BaseHandler):
 
     async def _update_canonical_alias(
         self, requester: Requester, user_id: str, room_id: str, room_alias: RoomAlias
-    ):
+    ) -> None:
         """
         Send an updated canonical alias event if the removed alias was set as
         the canonical alias or listed in the alt_aliases field.
@@ -344,7 +351,9 @@ class DirectoryHandler(BaseHandler):
                 ratelimit=False,
             )
 
-    async def get_association_from_room_alias(self, room_alias: RoomAlias):
+    async def get_association_from_room_alias(
+        self, room_alias: RoomAlias
+    ) -> Optional[RoomAliasMapping]:
         result = await self.store.get_association_from_room_alias(room_alias)
         if not result:
             # Query AS to see if it exists
@@ -372,7 +381,7 @@ class DirectoryHandler(BaseHandler):
         # either no interested services, or no service with an exclusive lock
         return True
 
-    async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
+    async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
         """Determine whether a user can delete an alias.
 
         One of the following must be true:
@@ -394,14 +403,13 @@ class DirectoryHandler(BaseHandler):
         if not room_id:
             return False
 
-        res = await self.auth.check_can_change_room_list(
+        return await self.auth.check_can_change_room_list(
             room_id, UserID.from_string(user_id)
         )
-        return res
 
     async def edit_published_room_list(
         self, requester: Requester, room_id: str, visibility: str
-    ):
+    ) -> None:
         """Edit the entry of the room in the published room list.
 
         requester
@@ -469,7 +477,7 @@ class DirectoryHandler(BaseHandler):
 
     async def edit_published_appservice_room_list(
         self, appservice_id: str, network_id: str, room_id: str, visibility: str
-    ):
+    ) -> None:
         """Add or remove a room from the appservice/network specific public
         room list.
 
@@ -499,5 +507,4 @@ class DirectoryHandler(BaseHandler):
                 room_id, requester.user.to_string()
             )
 
-        aliases = await self.store.get_aliases_for_room(room_id)
-        return aliases
+        return await self.store.get_aliases_for_room(room_id)
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 0b3b1fadb5..33d16fbf9c 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -17,7 +17,7 @@
 """Utilities for interacting with Identity Servers"""
 import logging
 import urllib.parse
-from typing import Awaitable, Callable, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Awaitable, Callable, Dict, List, Optional, Tuple
 
 from synapse.api.errors import (
     CodeMessageException,
@@ -41,13 +41,16 @@ from synapse.util.stringutils import (
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 id_server_scheme = "https://"
 
 
 class IdentityHandler(BaseHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         # An HTTP client for contacting trusted URLs.
@@ -80,7 +83,7 @@ class IdentityHandler(BaseHandler):
         request: SynapseRequest,
         medium: str,
         address: str,
-    ):
+    ) -> None:
         """Used to ratelimit requests to `/requestToken` by IP and address.
 
         Args:
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ec8eb21674..49f8aa25ea 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import random
-from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -66,7 +66,7 @@ logger = logging.getLogger(__name__)
 class MessageHandler:
     """Contains some read only APIs to get state about a room"""
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.auth = hs.get_auth()
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
@@ -91,7 +91,7 @@ class MessageHandler:
         room_id: str,
         event_type: str,
         state_key: str,
-    ) -> dict:
+    ) -> Optional[EventBase]:
         """Get data from a room.
 
         Args:
@@ -115,6 +115,10 @@ class MessageHandler:
             data = await self.state.get_current_state(room_id, event_type, state_key)
         elif membership == Membership.LEAVE:
             key = (event_type, state_key)
+            # If the membership is not JOIN, then the event ID should exist.
+            assert (
+                membership_event_id is not None
+            ), "check_user_in_room_or_world_readable returned invalid data"
             room_state = await self.state_store.get_state_for_events(
                 [membership_event_id], StateFilter.from_types([key])
             )
@@ -186,10 +190,12 @@ class MessageHandler:
 
             event = last_events[0]
             if visible_events:
-                room_state = await self.state_store.get_state_for_events(
+                room_state_events = await self.state_store.get_state_for_events(
                     [event.event_id], state_filter=state_filter
                 )
-                room_state = room_state[event.event_id]
+                room_state = room_state_events[
+                    event.event_id
+                ]  # type: Mapping[Any, EventBase]
             else:
                 raise AuthError(
                     403,
@@ -210,10 +216,14 @@ class MessageHandler:
                 )
                 room_state = await self.store.get_events(state_ids.values())
             elif membership == Membership.LEAVE:
-                room_state = await self.state_store.get_state_for_events(
+                # If the membership is not JOIN, then the event ID should exist.
+                assert (
+                    membership_event_id is not None
+                ), "check_user_in_room_or_world_readable returned invalid data"
+                room_state_events = await self.state_store.get_state_for_events(
                     [membership_event_id], state_filter=state_filter
                 )
-                room_state = room_state[membership_event_id]
+                room_state = room_state_events[membership_event_id]
 
         now = self.clock.time_msec()
         events = await self._event_serializer.serialize_events(
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 1c4a43be0a..ee6e41c0e4 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -15,7 +15,7 @@
 import inspect
 import logging
 from typing import TYPE_CHECKING, Dict, Generic, List, Optional, TypeVar, Union
-from urllib.parse import urlencode
+from urllib.parse import urlencode, urlparse
 
 import attr
 import pymacaroons
@@ -68,8 +68,8 @@ logger = logging.getLogger(__name__)
 #
 # Here we have the names of the cookies, and the options we use to set them.
 _SESSION_COOKIES = [
-    (b"oidc_session", b"Path=/_synapse/client/oidc; HttpOnly; Secure; SameSite=None"),
-    (b"oidc_session_no_samesite", b"Path=/_synapse/client/oidc; HttpOnly"),
+    (b"oidc_session", b"HttpOnly; Secure; SameSite=None"),
+    (b"oidc_session_no_samesite", b"HttpOnly"),
 ]
 
 #: A token exchanged from the token endpoint, as per RFC6749 sec 5.1. and
@@ -279,6 +279,13 @@ class OidcProvider:
         self._config = provider
         self._callback_url = hs.config.oidc_callback_url  # type: str
 
+        # Calculate the prefix for OIDC callback paths based on the public_baseurl.
+        # We'll insert this into the Path= parameter of any session cookies we set.
+        public_baseurl_path = urlparse(hs.config.server.public_baseurl).path
+        self._callback_path_prefix = (
+            public_baseurl_path.encode("utf-8") + b"_synapse/client/oidc"
+        )
+
         self._oidc_attribute_requirements = provider.attribute_requirements
         self._scopes = provider.scopes
         self._user_profile_method = provider.user_profile_method
@@ -779,8 +786,13 @@ class OidcProvider:
 
         for cookie_name, options in _SESSION_COOKIES:
             request.cookies.append(
-                b"%s=%s; Max-Age=3600; %s"
-                % (cookie_name, cookie.encode("utf-8"), options)
+                b"%s=%s; Max-Age=3600; Path=%s; %s"
+                % (
+                    cookie_name,
+                    cookie.encode("utf-8"),
+                    self._callback_path_prefix,
+                    options,
+                )
             )
 
         metadata = await self.load_metadata()
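A quick standalone sketch of the Path= computation above. It assumes, as Synapse's config normalisation does, that public_baseurl always ends with a trailing slash; the helper name is invented for illustration.

    from urllib.parse import urlparse

    def callback_path_prefix(public_baseurl: str) -> bytes:
        # e.g. "https://example.com/matrix/" -> "/matrix/"
        path = urlparse(public_baseurl).path
        # Bare concatenation is safe because the path ends with "/".
        return path.encode("utf-8") + b"_synapse/client/oidc"

    assert callback_path_prefix("https://example.com/") == b"/_synapse/client/oidc"
    assert callback_path_prefix("https://example.com/matrix/") == b"/matrix/_synapse/client/oidc"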
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 9938be3821..ebbc234334 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -28,6 +28,7 @@ from bisect import bisect
 from contextlib import contextmanager
 from typing import (
     TYPE_CHECKING,
+    Callable,
     Collection,
     Dict,
     FrozenSet,
@@ -58,7 +59,6 @@ from synapse.replication.http.presence import (
 from synapse.replication.http.streams import ReplicationGetStreamUpdates
 from synapse.replication.tcp.commands import ClearUserSyncsCommand
 from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
-from synapse.state import StateHandler
 from synapse.storage.databases.main import DataStore
 from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
@@ -233,23 +233,23 @@ class BasePresenceHandler(abc.ABC):
         """
 
     async def update_external_syncs_row(
-        self, process_id, user_id, is_syncing, sync_time_msec
-    ):
+        self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+    ) -> None:
         """Update the syncing users for an external process as a delta.
 
         This is a no-op when presence is handled by a different worker.
 
         Args:
-            process_id (str): An identifier for the process the users are
+            process_id: An identifier for the process the users are
                 syncing against. This allows synapse to process updates
                as users start and stop syncing against a given process.
-            user_id (str): The user who has started or stopped syncing
-            is_syncing (bool): Whether or not the user is now syncing
-            sync_time_msec(int): Time in ms when the user was last syncing
+            user_id: The user who has started or stopped syncing
+            is_syncing: Whether or not the user is now syncing
+            sync_time_msec: Time in ms when the user was last syncing
         """
         pass
 
-    async def update_external_syncs_clear(self, process_id):
+    async def update_external_syncs_clear(self, process_id: str) -> None:
         """Marks all users that had been marked as syncing by a given process
         as offline.
 
@@ -291,7 +291,6 @@ class BasePresenceHandler(abc.ABC):
             self.store,
             self.presence_router,
             states,
-            self.state,
         )
 
         for destinations, states in hosts_and_states:
@@ -306,7 +305,7 @@ class _NullContextManager(ContextManager[None]):
 
 
 class WorkerPresenceHandler(BasePresenceHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.hs = hs
 
@@ -329,7 +328,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
 
         # user_id -> last_sync_ms. Lists the users that have stopped syncing but
         # we haven't notified the presence writer of that yet
-        self.users_going_offline = {}
+        self.users_going_offline = {}  # type: Dict[str, int]
 
         self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
         self._set_state_client = ReplicationPresenceSetState.make_client(hs)
@@ -348,24 +347,21 @@ class WorkerPresenceHandler(BasePresenceHandler):
             self._on_shutdown,
         )
 
-    def _on_shutdown(self):
+    def _on_shutdown(self) -> None:
         if self._presence_enabled:
             self.hs.get_tcp_replication().send_command(
                 ClearUserSyncsCommand(self.instance_id)
             )
 
-    def send_user_sync(self, user_id, is_syncing, last_sync_ms):
+    def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
         if self._presence_enabled:
             self.hs.get_tcp_replication().send_user_sync(
                 self.instance_id, user_id, is_syncing, last_sync_ms
             )
 
-    def mark_as_coming_online(self, user_id):
+    def mark_as_coming_online(self, user_id: str) -> None:
         """A user has started syncing. Send a UserSync to the presence writer,
         unless they had recently stopped syncing.
-
-        Args:
-            user_id (str)
         """
         going_offline = self.users_going_offline.pop(user_id, None)
         if not going_offline:
@@ -373,18 +369,15 @@ class WorkerPresenceHandler(BasePresenceHandler):
             # were offline
             self.send_user_sync(user_id, True, self.clock.time_msec())
 
-    def mark_as_going_offline(self, user_id):
+    def mark_as_going_offline(self, user_id: str) -> None:
         """A user has stopped syncing. We wait before notifying the presence
        writer as it's likely they'll come back soon. This allows us to avoid
         sending a stopped syncing immediately followed by a started syncing
         notification to the presence writer
-
-        Args:
-            user_id (str)
         """
         self.users_going_offline[user_id] = self.clock.time_msec()
 
-    def send_stop_syncing(self):
+    def send_stop_syncing(self) -> None:
         """Check if there are any users who have stopped syncing a while ago and
        haven't come back yet. If there are, poke the presence writer about them.
         """
@@ -432,7 +425,9 @@ class WorkerPresenceHandler(BasePresenceHandler):
 
         return _user_syncing()
 
-    async def notify_from_replication(self, states, stream_id):
+    async def notify_from_replication(
+        self, states: List[UserPresenceState], stream_id: int
+    ) -> None:
         parties = await get_interested_parties(self.store, self.presence_router, states)
         room_ids_to_states, users_to_states = parties
 
@@ -480,7 +475,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
             if count > 0
         ]
 
-    async def set_state(self, target_user, state, ignore_status_msg=False):
+    async def set_state(
+        self,
+        target_user: UserID,
+        state: JsonDict,
+        ignore_status_msg: bool = False,
+    ) -> None:
         """Set the presence state of the user."""
         presence = state["presence"]
 
@@ -510,7 +510,7 @@ class WorkerPresenceHandler(BasePresenceHandler):
             ignore_status_msg=ignore_status_msg,
         )
 
-    async def bump_presence_active_time(self, user):
+    async def bump_presence_active_time(self, user: UserID) -> None:
         """We've seen the user do something that indicates they're interacting
         with the app.
         """
@@ -594,8 +594,8 @@ class PresenceHandler(BasePresenceHandler):
         # we assume that all the sync requests on that process have stopped.
         # Stored as a dict from process_id to set of user_id, and a dict of
         # process_id to millisecond timestamp last updated.
-        self.external_process_to_current_syncs = {}  # type: Dict[int, Set[str]]
-        self.external_process_last_updated_ms = {}  # type: Dict[int, int]
+        self.external_process_to_current_syncs = {}  # type: Dict[str, Set[str]]
+        self.external_process_last_updated_ms = {}  # type: Dict[str, int]
 
         self.external_sync_linearizer = Linearizer(name="external_sync_linearizer")
 
@@ -635,7 +635,7 @@ class PresenceHandler(BasePresenceHandler):
         self._event_pos = self.store.get_current_events_token()
         self._event_processing = False
 
-    async def _on_shutdown(self):
+    async def _on_shutdown(self) -> None:
         """Gets called when shutting down. This lets us persist any updates that
        we haven't yet persisted, e.g. updates that only change some internal
         timers. This allows changes to persist across startup without having to
@@ -664,7 +664,7 @@ class PresenceHandler(BasePresenceHandler):
             )
         logger.info("Finished _on_shutdown")
 
-    async def _persist_unpersisted_changes(self):
+    async def _persist_unpersisted_changes(self) -> None:
         """We periodically persist the unpersisted changes, as otherwise they
         may stack up and slow down shutdown times.
         """
@@ -757,7 +757,6 @@ class PresenceHandler(BasePresenceHandler):
                     self.store,
                     self.presence_router,
                     list(to_federation_ping.values()),
-                    self.state,
                 )
 
                 for destinations, states in hosts_and_states:
@@ -765,7 +764,7 @@ class PresenceHandler(BasePresenceHandler):
                         states, destinations
                     )
 
-    async def _handle_timeouts(self):
+    async def _handle_timeouts(self) -> None:
         """Checks the presence of users that have timed out and updates as
         appropriate.
         """
@@ -817,7 +816,7 @@ class PresenceHandler(BasePresenceHandler):
 
         return await self._update_states(changes)
 
-    async def bump_presence_active_time(self, user):
+    async def bump_presence_active_time(self, user: UserID) -> None:
         """We've seen the user do something that indicates they're interacting
         with the app.
         """
@@ -914,17 +913,17 @@ class PresenceHandler(BasePresenceHandler):
         return []
 
     async def update_external_syncs_row(
-        self, process_id, user_id, is_syncing, sync_time_msec
-    ):
+        self, process_id: str, user_id: str, is_syncing: bool, sync_time_msec: int
+    ) -> None:
         """Update the syncing users for an external process as a delta.
 
         Args:
-            process_id (str): An identifier for the process the users are
+            process_id: An identifier for the process the users are
                 syncing against. This allows synapse to process updates
                as users start and stop syncing against a given process.
-            user_id (str): The user who has started or stopped syncing
-            is_syncing (bool): Whether or not the user is now syncing
-            sync_time_msec(int): Time in ms when the user was last syncing
+            user_id: The user who has started or stopped syncing
+            is_syncing: Whether or not the user is now syncing
+            sync_time_msec: Time in ms when the user was last syncing
         """
         with (await self.external_sync_linearizer.queue(process_id)):
             prev_state = await self.current_state_for_user(user_id)
@@ -961,7 +960,7 @@ class PresenceHandler(BasePresenceHandler):
 
             self.external_process_last_updated_ms[process_id] = self.clock.time_msec()
 
-    async def update_external_syncs_clear(self, process_id):
+    async def update_external_syncs_clear(self, process_id: str) -> None:
         """Marks all users that had been marked as syncing by a given process
         as offline.
 
@@ -982,12 +981,12 @@ class PresenceHandler(BasePresenceHandler):
             )
             self.external_process_last_updated_ms.pop(process_id, None)
 
-    async def current_state_for_user(self, user_id):
+    async def current_state_for_user(self, user_id: str) -> UserPresenceState:
         """Get the current presence state for a user."""
         res = await self.current_state_for_users([user_id])
         return res[user_id]
 
-    async def _persist_and_notify(self, states):
+    async def _persist_and_notify(self, states: List[UserPresenceState]) -> None:
         """Persist states in the database, poke the notifier and send to
         interested remote servers
         """
@@ -1008,7 +1007,7 @@ class PresenceHandler(BasePresenceHandler):
         # stream (which is updated by `store.update_presence`).
         await self.maybe_send_presence_to_interested_destinations(states)
 
-    async def incoming_presence(self, origin, content):
+    async def incoming_presence(self, origin: str, content: JsonDict) -> None:
         """Called when we receive a `m.presence` EDU from a remote server."""
         if not self._presence_enabled:
             return
@@ -1058,7 +1057,9 @@ class PresenceHandler(BasePresenceHandler):
             federation_presence_counter.inc(len(updates))
             await self._update_states(updates)
 
-    async def set_state(self, target_user, state, ignore_status_msg=False):
+    async def set_state(
+        self, target_user: UserID, state: JsonDict, ignore_status_msg: bool = False
+    ) -> None:
         """Set the presence state of the user."""
         status_msg = state.get("status_msg", None)
         presence = state["presence"]
@@ -1092,7 +1093,7 @@ class PresenceHandler(BasePresenceHandler):
 
         await self._update_states([prev_state.copy_and_replace(**new_fields)])
 
-    async def is_visible(self, observed_user, observer_user):
+    async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
         """Returns whether a user can see another user's presence."""
         observer_room_ids = await self.store.get_rooms_for_user(
             observer_user.to_string()
@@ -1147,7 +1148,7 @@ class PresenceHandler(BasePresenceHandler):
         )
         return rows
 
-    def notify_new_event(self):
+    def notify_new_event(self) -> None:
         """Called when new events have happened. Handles users and servers
        joining rooms that need to be sent presence.
         """
@@ -1166,7 +1167,7 @@ class PresenceHandler(BasePresenceHandler):
 
         run_as_background_process("presence.notify_new_event", _process_presence)
 
-    async def _unsafe_process(self):
+    async def _unsafe_process(self) -> None:
         # Loop round handling deltas until we're up to date
         while True:
             with Measure(self.clock, "presence_delta"):
@@ -1191,7 +1192,7 @@ class PresenceHandler(BasePresenceHandler):
                     max_pos
                 )
 
-    async def _handle_state_delta(self, deltas):
+    async def _handle_state_delta(self, deltas: List[JsonDict]) -> None:
         """Process current state deltas to find new joins that need to be
         handled.
         """
@@ -1314,7 +1315,7 @@ class PresenceHandler(BasePresenceHandler):
             return [remote_host], states
 
 
-def should_notify(old_state, new_state):
+def should_notify(old_state: UserPresenceState, new_state: UserPresenceState) -> bool:
     """Decides if a presence state change should be sent to interested parties."""
     if old_state == new_state:
         return False
@@ -1350,7 +1351,9 @@ def should_notify(old_state, new_state):
     return False
 
 
-def format_user_presence_state(state, now, include_user_id=True):
+def format_user_presence_state(
+    state: UserPresenceState, now: int, include_user_id: bool = True
+) -> JsonDict:
     """Convert UserPresenceState to a format that can be sent down to clients
     and to other servers.
 
@@ -1384,16 +1387,15 @@ class PresenceEventSource:
         self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
-        self.state = hs.get_state_handler()
 
     @log_function
     async def get_new_events(
         self,
-        user,
-        from_key,
-        room_ids=None,
-        include_offline=True,
-        explicit_room_id=None,
+        user: UserID,
+        from_key: Optional[int],
+        room_ids: Optional[List[str]] = None,
+        include_offline: bool = True,
+        explicit_room_id: Optional[str] = None,
         **kwargs,
     ) -> Tuple[List[UserPresenceState], int]:
         # The process for getting presence events are:
@@ -1598,7 +1600,7 @@ class PresenceEventSource:
             if update.state != PresenceState.OFFLINE
         ]
 
-    def get_current_key(self):
+    def get_current_key(self) -> int:
         return self.store.get_current_presence_token()
 
     @cached(num_args=2, cache_context=True)
@@ -1658,15 +1660,20 @@ class PresenceEventSource:
         return users_interested_in
 
 
-def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
+def handle_timeouts(
+    user_states: List[UserPresenceState],
+    is_mine_fn: Callable[[str], bool],
+    syncing_user_ids: Set[str],
+    now: int,
+) -> List[UserPresenceState]:
     """Checks the presence of users that have timed out and updates as
     appropriate.
 
     Args:
-        user_states(list): List of UserPresenceState's to check.
-        is_mine_fn (fn): Function that returns if a user_id is ours
-        syncing_user_ids (set): Set of user_ids with active syncs.
-        now (int): Current time in ms.
+        user_states: List of UserPresenceState objects to check.
+        is_mine_fn: Function that returns whether a user_id is ours
+        syncing_user_ids: Set of user_ids with active syncs.
+        now: Current time in ms.
 
     Returns:
         List of UserPresenceState updates
@@ -1683,14 +1690,16 @@ def handle_timeouts(user_states, is_mine_fn, syncing_user_ids, now):
     return list(changes.values())
 
 
-def handle_timeout(state, is_mine, syncing_user_ids, now):
+def handle_timeout(
+    state: UserPresenceState, is_mine: bool, syncing_user_ids: Set[str], now: int
+) -> Optional[UserPresenceState]:
     """Checks the presence of the user to see if any of the timers have elapsed
 
     Args:
-        state (UserPresenceState)
-        is_mine (bool): Whether the user is ours
-        syncing_user_ids (set): Set of user_ids with active syncs.
-        now (int): Current time in ms.
+        state
+        is_mine: Whether the user is ours
+        syncing_user_ids: Set of user_ids with active syncs.
+        now: Current time in ms.
 
     Returns:
         A UserPresenceState update or None if no update.
@@ -1742,23 +1751,29 @@ def handle_timeout(state, is_mine, syncing_user_ids, now):
     return state if changed else None
 
 
-def handle_update(prev_state, new_state, is_mine, wheel_timer, now):
+def handle_update(
+    prev_state: UserPresenceState,
+    new_state: UserPresenceState,
+    is_mine: bool,
+    wheel_timer: WheelTimer,
+    now: int,
+) -> Tuple[UserPresenceState, bool, bool]:
     """Given a presence update:
         1. Add any appropriate timers.
         2. Check if we should notify anyone.
 
     Args:
-        prev_state (UserPresenceState)
-        new_state (UserPresenceState)
-        is_mine (bool): Whether the user is ours
-        wheel_timer (WheelTimer)
-        now (int): Time now in ms
+        prev_state
+        new_state
+        is_mine: Whether the user is ours
+        wheel_timer
+        now: Time now in ms
 
     Returns:
         3-tuple: `(new_state, persist_and_notify, federation_ping)` where:
             - new_state: is the state to actually persist
-            - persist_and_notify (bool): whether to persist and notify people
-            - federation_ping (bool): whether we should send a ping over federation
+            - persist_and_notify: whether to persist and notify people
+            - federation_ping: whether we should send a ping over federation
     """
     user_id = new_state.user_id
 
@@ -1853,7 +1868,6 @@ async def get_interested_remotes(
     store: DataStore,
     presence_router: PresenceRouter,
     states: List[UserPresenceState],
-    state_handler: StateHandler,
 ) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
     """Given a list of presence states figure out which remote servers
     should be sent which.
@@ -1864,7 +1878,6 @@ async def get_interested_remotes(
         store: The homeserver's data store.
         presence_router: A module for augmenting the destinations for presence updates.
         states: A list of incoming user presence updates.
-        state_handler:
 
     Returns:
         A list of 2-tuples of destinations and states, where for
@@ -1881,7 +1894,8 @@ async def get_interested_remotes(
     )
 
     for room_id, states in room_ids_to_states.items():
-        hosts = await state_handler.get_current_hosts_in_room(room_id)
+        user_ids = await store.get_users_in_room(room_id)
+        hosts = {get_domain_from_id(user_id) for user_id in user_ids}
         hosts_and_states.append((hosts, states))
 
     for user_id, states in users_to_states.items():
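An illustration of the replacement logic above: the destination hosts for a room are now derived from the domains of the room's members via get_domain_from_id, rather than asking the state handler. The example values are hypothetical.

    from synapse.types import get_domain_from_id

    user_ids = ["@alice:example.com", "@bob:example.com", "@carol:other.org"]
    hosts = {get_domain_from_id(user_id) for user_id in user_ids}
    assert hosts == {"example.com", "other.org"}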
@@ -2031,18 +2045,40 @@ class PresenceFederationQueue:
             )
             return result["updates"], result["upto_token"], result["limited"]
 
+        # If the from_token is the current token then there's nothing to return
+        # and we can trivially no-op.
+        if from_token == self._next_id - 1:
+            return [], upto_token, False
+
         # We can find the correct position in the queue by noting that there is
         # exactly one entry per stream ID, and that the last entry has an ID of
         # `self._next_id - 1`, so we can count backwards from the end.
         #
+        # Since we are returning all states in the range `from_token < stream_id
+        # <= upto_token` we look for the index with a `stream_id` of `from_token
+        # + 1`.
+        #
         # Since the start of the queue is periodically truncated we need to
         # handle the case where `from_token` stream ID has already been dropped.
-        start_idx = max(from_token - self._next_id, -len(self._queue))
+        start_idx = max(from_token + 1 - self._next_id, -len(self._queue))
 
         to_send = []  # type: List[Tuple[int, Tuple[str, str]]]
         limited = False
         new_id = upto_token
         for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+            if stream_id <= from_token:
+                # Paranoia check that we are actually only sending states that
+                # are have stream_id strictly greater than from_token. We should
+                # never hit this.
+                logger.warning(
+                    "Tried returning presence federation stream ID: %d less than from_token: %d (next_id: %d, len: %d)",
+                    stream_id,
+                    from_token,
+                    self._next_id,
+                    len(self._queue),
+                )
+                continue
+
             if stream_id > upto_token:
                 break
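The off-by-one fix above is easiest to see with concrete numbers; a quick worked example in plain Python (not Synapse code):

    # Suppose the queue currently holds entries with stream IDs 5..9, so
    # _next_id is 10 and the last entry sits at index -1.
    _next_id = 10
    queue_stream_ids = [5, 6, 7, 8, 9]

    from_token = 6
    # We want entries with from_token < stream_id, i.e. starting at stream ID 7:
    start_idx = max(from_token + 1 - _next_id, -len(queue_stream_ids))
    assert start_idx == -3
    assert queue_stream_ids[start_idx:] == [7, 8, 9]

    # The previous expression, from_token - _next_id, gave -4 here and would
    # have re-sent stream ID 6, which the requester already has.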
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 2c5bada1d8..20700fc5a8 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -1044,7 +1044,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
 
 class RoomMemberMasterHandler(RoomMemberHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.distributor = hs.get_distributor()
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 0eeb7c03f2..5414ce77d8 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -13,7 +13,7 @@
 # limitations under the License.
 
 import logging
-from typing import Any
+from typing import TYPE_CHECKING, Any
 
 from twisted.web.client import PartialDownloadError
 
@@ -22,13 +22,16 @@ from synapse.api.errors import Codes, LoginError, SynapseError
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.util import json_decoder
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class UserInteractiveAuthChecker:
     """Abstract base class for an interactive auth checker"""
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         pass
 
     def is_enabled(self) -> bool:
@@ -57,10 +60,10 @@ class UserInteractiveAuthChecker:
 class DummyAuthChecker(UserInteractiveAuthChecker):
     AUTH_TYPE = LoginType.DUMMY
 
-    def is_enabled(self):
+    def is_enabled(self) -> bool:
         return True
 
-    async def check_auth(self, authdict, clientip):
+    async def check_auth(self, authdict: dict, clientip: str) -> Any:
         return True
 
 
@@ -70,24 +73,24 @@ class TermsAuthChecker(UserInteractiveAuthChecker):
     def is_enabled(self):
         return True
 
-    async def check_auth(self, authdict, clientip):
+    async def check_auth(self, authdict: dict, clientip: str) -> Any:
         return True
 
 
 class RecaptchaAuthChecker(UserInteractiveAuthChecker):
     AUTH_TYPE = LoginType.RECAPTCHA
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self._enabled = bool(hs.config.recaptcha_private_key)
         self._http_client = hs.get_proxied_http_client()
         self._url = hs.config.recaptcha_siteverify_api
         self._secret = hs.config.recaptcha_private_key
 
-    def is_enabled(self):
+    def is_enabled(self) -> bool:
         return self._enabled
 
-    async def check_auth(self, authdict, clientip):
+    async def check_auth(self, authdict: dict, clientip: str) -> Any:
         try:
             user_response = authdict["response"]
         except KeyError:
@@ -132,11 +135,11 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
 
 
 class _BaseThreepidAuthChecker:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.store = hs.get_datastore()
 
-    async def _check_threepid(self, medium, authdict):
+    async def _check_threepid(self, medium: str, authdict: dict) -> dict:
         if "threepid_creds" not in authdict:
             raise LoginError(400, "Missing threepid_creds", Codes.MISSING_PARAM)
 
@@ -206,31 +209,31 @@ class _BaseThreepidAuthChecker:
 class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
     AUTH_TYPE = LoginType.EMAIL_IDENTITY
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         UserInteractiveAuthChecker.__init__(self, hs)
         _BaseThreepidAuthChecker.__init__(self, hs)
 
-    def is_enabled(self):
+    def is_enabled(self) -> bool:
         return self.hs.config.threepid_behaviour_email in (
             ThreepidBehaviour.REMOTE,
             ThreepidBehaviour.LOCAL,
         )
 
-    async def check_auth(self, authdict, clientip):
+    async def check_auth(self, authdict: dict, clientip: str) -> Any:
         return await self._check_threepid("email", authdict)
 
 
 class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
     AUTH_TYPE = LoginType.MSISDN
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         UserInteractiveAuthChecker.__init__(self, hs)
         _BaseThreepidAuthChecker.__init__(self, hs)
 
-    def is_enabled(self):
+    def is_enabled(self) -> bool:
         return bool(self.hs.config.account_threepid_delegate_msisdn)
 
-    async def check_auth(self, authdict, clientip):
+    async def check_auth(self, authdict: dict, clientip: str) -> Any:
         return await self._check_threepid("msisdn", authdict)
 
 
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 32b5e19c09..671fd3fbcc 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,13 +14,14 @@
 import contextlib
 import logging
 import time
-from typing import Optional, Tuple, Type, Union
+from typing import Optional, Tuple, Union
 
 import attr
 from zope.interface import implementer
 
-from twisted.internet.interfaces import IAddress
+from twisted.internet.interfaces import IAddress, IReactorTime
 from twisted.python.failure import Failure
+from twisted.web.resource import IResource
 from twisted.web.server import Request, Site
 
 from synapse.config.server import ListenerConfig
@@ -49,6 +50,7 @@ class SynapseRequest(Request):
      * Redaction of access_token query-params in __repr__
      * Logging at start and end
      * Metrics to record CPU, wallclock and DB time by endpoint.
+     * A limit on the size of requests which will be accepted
 
     It also provides a method `processing`, which returns a context manager. If this
     method is called, the request won't be logged until the context manager is closed;
@@ -59,8 +61,9 @@ class SynapseRequest(Request):
         logcontext: the log context for this request
     """
 
-    def __init__(self, channel, *args, **kw):
+    def __init__(self, channel, *args, max_request_body_size=1024, **kw):
         Request.__init__(self, channel, *args, **kw)
+        self._max_request_body_size = max_request_body_size
         self.site = channel.site  # type: SynapseSite
         self._channel = channel  # this is used by the tests
         self.start_time = 0.0
@@ -97,6 +100,18 @@ class SynapseRequest(Request):
             self.site.site_tag,
         )
 
+    def handleContentChunk(self, data):
+        # we should have a `content` by now.
+        assert self.content, "handleContentChunk() called before gotLength()"
+        if self.content.tell() + len(data) > self._max_request_body_size:
+            logger.warning(
+                "Aborting connection from %s because the request exceeds maximum size",
+                self.client,
+            )
+            self.transport.abortConnection()
+            return
+        super().handleContentChunk(data)
+
     @property
     def requester(self) -> Optional[Union[Requester, str]]:
         return self._requester
@@ -485,29 +500,55 @@ class _XForwardedForAddress:
 
 class SynapseSite(Site):
     """
-    Subclass of a twisted http Site that does access logging with python's
-    standard logging
+    Synapse-specific twisted http Site
+
+    This does two main things.
+
+    First, it replaces the requestFactory in use so that we build SynapseRequests
+    instead of regular t.w.server.Requests. All of the constructor params are really
+    just parameters for SynapseRequest.
+
+    Second, it inhibits the log() method called by Request.finish, since SynapseRequest
+    does its own logging.
     """
 
     def __init__(
         self,
-        logger_name,
-        site_tag,
+        logger_name: str,
+        site_tag: str,
         config: ListenerConfig,
-        resource,
+        resource: IResource,
         server_version_string,
-        *args,
-        **kwargs,
+        max_request_body_size: int,
+        reactor: IReactorTime,
     ):
-        Site.__init__(self, resource, *args, **kwargs)
+        """
+
+        Args:
+            logger_name:  The name of the logger to use for access logs.
+            site_tag:  A tag to use for this site - mostly in access logs.
+            config:  Configuration for the HTTP listener corresponding to this site
+            resource:  The base of the resource tree to be used for serving requests on
+                this site
+            server_version_string: A string to present for the Server header
+            max_request_body_size: Maximum request body length to allow before
+                dropping the connection
+            reactor: reactor to be used to manage connection timeouts
+        """
+        Site.__init__(self, resource, reactor=reactor)
 
         self.site_tag = site_tag
 
         assert config.http_options is not None
         proxied = config.http_options.x_forwarded
-        self.requestFactory = (
-            XForwardedForRequest if proxied else SynapseRequest
-        )  # type: Type[Request]
+        request_class = XForwardedForRequest if proxied else SynapseRequest
+
+        def request_factory(channel, queued) -> Request:
+            return request_class(
+                channel, max_request_body_size=max_request_body_size, queued=queued
+            )
+
+        self.requestFactory = request_factory  # type: ignore
         self.access_logger = logging.getLogger(logger_name)
         self.server_version_string = server_version_string.encode("ascii")
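A simplified, self-contained model of the body-size guard in handleContentChunk above: it buffers into a BytesIO and records an abort flag where SynapseRequest instead calls transport.abortConnection(). Class and method names here are invented for illustration.

    import io

    class BodyLimitedBuffer:
        """Toy model of SynapseRequest's request-body size check."""

        def __init__(self, max_request_body_size: int = 1024):
            self.content = io.BytesIO()
            self.max_request_body_size = max_request_body_size
            self.aborted = False

        def handle_content_chunk(self, data: bytes) -> None:
            # Reject once the buffered body plus this chunk would exceed the limit.
            if self.content.tell() + len(data) > self.max_request_body_size:
                self.aborted = True  # the real code aborts the TCP connection here
                return
            self.content.write(data)

    buf = BodyLimitedBuffer(max_request_body_size=10)
    buf.handle_content_chunk(b"0123456789")  # exactly at the limit: accepted
    buf.handle_content_chunk(b"x")           # would exceed it: connection dropped
    assert buf.aborted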
 
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 6ad5cf582c..d58eeeaa74 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -85,7 +85,7 @@ REQUIREMENTS = [
     "typing-extensions>=3.7.4",
     # We enforce that we have a `cryptography` version that bundles an `openssl`
     # with the latest security patches.
-    "cryptography>=3.4.7;python_version>='3.6'",
+    "cryptography>=3.4.7",
 ]
 
 CONDITIONAL_REQUIREMENTS = {
@@ -100,14 +100,9 @@ CONDITIONAL_REQUIREMENTS = {
     # that use the protocol, such as Let's Encrypt.
     "acme": [
         "txacme>=0.9.2",
-        # txacme depends on eliot. Eliot 1.8.0 is incompatible with
-        # python 3.5.2, as per https://github.com/itamarst/eliot/issues/418
-        "eliot<1.8.0;python_version<'3.5.3'",
     ],
     "saml2": [
-        # pysaml2 6.4.0 is incompatible with Python 3.5 (see https://github.com/IdentityPython/pysaml2/issues/749)
-        "pysaml2>=4.5.0,<6.4.0;python_version<'3.6'",
-        "pysaml2>=4.5.0;python_version>='3.6'",
+        "pysaml2>=4.5.0",
     ],
     "oidc": ["authlib>=0.14.0"],
     # systemd-python is necessary for logging to the systemd journal via
diff --git a/synapse/rest/admin/rooms.py b/synapse/rest/admin/rooms.py
index d0cf121743..f289ffe3d0 100644
--- a/synapse/rest/admin/rooms.py
+++ b/synapse/rest/admin/rooms.py
@@ -37,9 +37,11 @@ from synapse.types import JsonDict, RoomAlias, RoomID, UserID, create_requester
 from synapse.util import json_decoder
 
 if TYPE_CHECKING:
+    from synapse.api.auth import Auth
+    from synapse.handlers.pagination import PaginationHandler
+    from synapse.handlers.room import RoomShutdownHandler
     from synapse.server import HomeServer
 
-
 logger = logging.getLogger(__name__)
 
 
@@ -146,50 +148,14 @@ class DeleteRoomRestServlet(RestServlet):
     async def on_POST(
         self, request: SynapseRequest, room_id: str
     ) -> Tuple[int, JsonDict]:
-        requester = await self.auth.get_user_by_req(request)
-        await assert_user_is_admin(self.auth, requester.user)
-
-        content = parse_json_object_from_request(request)
-
-        block = content.get("block", False)
-        if not isinstance(block, bool):
-            raise SynapseError(
-                HTTPStatus.BAD_REQUEST,
-                "Param 'block' must be a boolean, if given",
-                Codes.BAD_JSON,
-            )
-
-        purge = content.get("purge", True)
-        if not isinstance(purge, bool):
-            raise SynapseError(
-                HTTPStatus.BAD_REQUEST,
-                "Param 'purge' must be a boolean, if given",
-                Codes.BAD_JSON,
-            )
-
-        force_purge = content.get("force_purge", False)
-        if not isinstance(force_purge, bool):
-            raise SynapseError(
-                HTTPStatus.BAD_REQUEST,
-                "Param 'force_purge' must be a boolean, if given",
-                Codes.BAD_JSON,
-            )
-
-        ret = await self.room_shutdown_handler.shutdown_room(
-            room_id=room_id,
-            new_room_user_id=content.get("new_room_user_id"),
-            new_room_name=content.get("room_name"),
-            message=content.get("message"),
-            requester_user_id=requester.user.to_string(),
-            block=block,
+        return await _delete_room(
+            request,
+            room_id,
+            self.auth,
+            self.room_shutdown_handler,
+            self.pagination_handler,
         )
 
-        # Purge room
-        if purge:
-            await self.pagination_handler.purge_room(room_id, force=force_purge)
-
-        return (200, ret)
-
 
 class ListRoomRestServlet(RestServlet):
     """
@@ -282,7 +248,22 @@ class ListRoomRestServlet(RestServlet):
 
 
 class RoomRestServlet(RestServlet):
-    """Get room details.
+    """Manage a room.
+
+    On GET: Get details of a room.
+
+    On DELETE: Delete a room from the server.
+
+    This combines and improves upon the separate shutdown-room and purge-room APIs.
+
+    Shuts down a room by removing all local users from the room.
+    Blocking all future invites and joins to the room is optional.
+
+    If desired, any local aliases will be repointed to a new room
+    created by `new_room_user_id`, and kicked users will be auto-
+    joined to the new room.
+
+    If 'purge' is true, it will remove all traces of a room from the database.
 
     TODO: Add on_POST to allow room creation without joining the room
     """
@@ -293,6 +274,8 @@ class RoomRestServlet(RestServlet):
         self.hs = hs
         self.auth = hs.get_auth()
         self.store = hs.get_datastore()
+        self.room_shutdown_handler = hs.get_room_shutdown_handler()
+        self.pagination_handler = hs.get_pagination_handler()
 
     async def on_GET(
         self, request: SynapseRequest, room_id: str
@@ -308,6 +291,17 @@ class RoomRestServlet(RestServlet):
 
         return (200, ret)
 
+    async def on_DELETE(
+        self, request: SynapseRequest, room_id: str
+    ) -> Tuple[int, JsonDict]:
+        return await _delete_room(
+            request,
+            room_id,
+            self.auth,
+            self.room_shutdown_handler,
+            self.pagination_handler,
+        )
+
 
 class RoomMembersRestServlet(RestServlet):
     """
@@ -694,3 +688,55 @@ class RoomEventContextServlet(RestServlet):
         )
 
         return 200, results
+
+
+async def _delete_room(
+    request: SynapseRequest,
+    room_id: str,
+    auth: "Auth",
+    room_shutdown_handler: "RoomShutdownHandler",
+    pagination_handler: "PaginationHandler",
+) -> Tuple[int, JsonDict]:
+    requester = await auth.get_user_by_req(request)
+    await assert_user_is_admin(auth, requester.user)
+
+    content = parse_json_object_from_request(request)
+
+    block = content.get("block", False)
+    if not isinstance(block, bool):
+        raise SynapseError(
+            HTTPStatus.BAD_REQUEST,
+            "Param 'block' must be a boolean, if given",
+            Codes.BAD_JSON,
+        )
+
+    purge = content.get("purge", True)
+    if not isinstance(purge, bool):
+        raise SynapseError(
+            HTTPStatus.BAD_REQUEST,
+            "Param 'purge' must be a boolean, if given",
+            Codes.BAD_JSON,
+        )
+
+    force_purge = content.get("force_purge", False)
+    if not isinstance(force_purge, bool):
+        raise SynapseError(
+            HTTPStatus.BAD_REQUEST,
+            "Param 'force_purge' must be a boolean, if given",
+            Codes.BAD_JSON,
+        )
+
+    ret = await room_shutdown_handler.shutdown_room(
+        room_id=room_id,
+        new_room_user_id=content.get("new_room_user_id"),
+        new_room_name=content.get("room_name"),
+        message=content.get("message"),
+        requester_user_id=requester.user.to_string(),
+        block=block,
+    )
+
+    # Purge room
+    if purge:
+        await pagination_handler.purge_room(room_id, force=force_purge)
+
+    return (200, ret)
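For reference, a hypothetical client-side sketch of the shared delete-room flow. The body fields match the validation in _delete_room above; the admin URL path and token are assumptions for illustration only.

    import json
    import urllib.request

    body = {
        "new_room_user_id": "@admin:example.com",   # optional: owner of the replacement room
        "room_name": "Content Violation",           # optional
        "message": "This room has been shut down",  # optional
        "block": True,         # must be a bool if given; defaults to False
        "purge": True,         # must be a bool if given; defaults to True
        "force_purge": False,  # must be a bool if given; defaults to False
    }
    req = urllib.request.Request(
        # Assumed admin endpoint; the room ID is URL-encoded.
        "https://example.com/_synapse/admin/v1/rooms/%21room%3Aexample.com",
        data=json.dumps(body).encode("utf-8"),
        method="DELETE",
        headers={"Authorization": "Bearer <admin_access_token>"},
    )
    # urllib.request.urlopen(req)  # requires a server admin's access token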
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index edda7861fa..8c9d21d3ea 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -14,6 +14,7 @@
 import hashlib
 import hmac
 import logging
+import secrets
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
@@ -375,7 +376,7 @@ class UserRegisterServlet(RestServlet):
         """
         self._clear_old_nonces()
 
-        nonce = self.hs.get_secrets().token_hex(64)
+        nonce = secrets.token_hex(64)
         self.nonces[nonce] = int(self.reactor.seconds())
         return 200, {"nonce": nonce}
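With the injectable Secrets wrapper gone, the standard-library module is used directly; secrets.token_hex(n) returns a hex string of n random bytes, i.e. 2*n characters.

    import secrets

    nonce = secrets.token_hex(64)
    assert len(nonce) == 128
    assert all(c in "0123456789abcdef" for c in nonce)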
 
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index c4550d3cf0..b19cd8afc5 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -32,14 +32,6 @@ TEMPLATE_LANGUAGE = "en"
 
 logger = logging.getLogger(__name__)
 
-# use hmac.compare_digest if we have it (python 2.7.7), else just use equality
-if hasattr(hmac, "compare_digest"):
-    compare_digest = hmac.compare_digest
-else:
-
-    def compare_digest(a, b):
-        return a == b
-
 
 class ConsentResource(DirectServeHtmlResource):
     """A twisted Resource to display a privacy policy and gather consent to it
@@ -209,5 +201,5 @@ class ConsentResource(DirectServeHtmlResource):
             .encode("ascii")
         )
 
-        if not compare_digest(want_mac, userhmac):
+        if not hmac.compare_digest(want_mac, userhmac):
             raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect")
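hmac.compare_digest is available in every Python version Synapse now supports, so the fallback is gone. It compares in constant time, avoiding the timing side channel that a naive == on secret MACs can leak. A small sketch with made-up values:

    import hashlib
    import hmac

    key = b"registration-shared-secret"
    want_mac = hmac.new(key, b"@user:example.com", hashlib.sha256).hexdigest().encode("ascii")

    assert hmac.compare_digest(want_mac, want_mac)
    assert not hmac.compare_digest(want_mac, b"0" * len(want_mac))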
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 4088e7a059..09531ebf54 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -21,7 +21,7 @@ from typing import Callable, List
 NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
 
 
-def _wrap_in_base_path(func: "Callable[..., str]") -> "Callable[..., str]":
+def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]:
     """Takes a function that returns a relative path and turns it into an
     absolute path based on the location of the primary media store
     """
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index 80f017a4dd..024a105bf2 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -51,8 +51,6 @@ class UploadResource(DirectServeJsonResource):
 
     async def _async_render_POST(self, request: SynapseRequest) -> None:
         requester = await self.auth.get_user_by_req(request)
-        # TODO: The checks here are a bit late. The content will have
-        # already been uploaded to a tmp file at this point
         content_length = request.getHeader("Content-Length")
         if content_length is None:
             raise SynapseError(msg="Request must specify a Content-Length", code=400)
diff --git a/synapse/secrets.py b/synapse/secrets.py
deleted file mode 100644
index bf829251fd..0000000000
--- a/synapse/secrets.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Injectable secrets module for Synapse.
-
-See https://docs.python.org/3/library/secrets.html#module-secrets for the API
-used in Python 3.6, and the API emulated in Python 2.7.
-"""
-import sys
-
-# secrets is available since python 3.6
-if sys.version_info[0:2] >= (3, 6):
-    import secrets
-
-    class Secrets:
-        def token_bytes(self, nbytes: int = 32) -> bytes:
-            return secrets.token_bytes(nbytes)
-
-        def token_hex(self, nbytes: int = 32) -> str:
-            return secrets.token_hex(nbytes)
-
-
-else:
-    import binascii
-    import os
-
-    class Secrets:
-        def token_bytes(self, nbytes: int = 32) -> bytes:
-            return os.urandom(nbytes)
-
-        def token_hex(self, nbytes: int = 32) -> str:
-            return binascii.hexlify(self.token_bytes(nbytes)).decode("ascii")
diff --git a/synapse/server.py b/synapse/server.py
index 8c147be2b3..2337d2d9b4 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -126,7 +126,6 @@ from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
 )
-from synapse.secrets import Secrets
 from synapse.server_notices.server_notices_manager import ServerNoticesManager
 from synapse.server_notices.server_notices_sender import ServerNoticesSender
 from synapse.server_notices.worker_server_notices_sender import (
@@ -287,6 +286,14 @@ class HomeServer(metaclass=abc.ABCMeta):
         if self.config.run_background_tasks:
             self.setup_background_tasks()
 
+    def start_listening(self) -> None:
+        """Start the HTTP, manhole, metrics, etc listeners
+
+        Does nothing in this base class; overridden in derived classes to start the
+        appropriate listeners.
+        """
+        pass
+
     def setup_background_tasks(self) -> None:
         """
         Some handlers have side effects on instantiation (like registering
@@ -634,10 +641,6 @@ class HomeServer(metaclass=abc.ABCMeta):
         return GroupAttestionRenewer(self)
 
     @cache_in_self
-    def get_secrets(self) -> Secrets:
-        return Secrets()
-
-    @cache_in_self
     def get_stats_handler(self) -> StatsHandler:
         return StatsHandler(self)
 
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index d472676acf..6b68d8720c 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -114,7 +114,7 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
         db_content = db_content.tobytes()
 
     # Decode it to a Unicode string before feeding it to the JSON decoder, since
-    # Python 3.5 does not support deserializing bytes.
+    # it only accepts strings.
     if isinstance(db_content, (bytes, bytearray)):
         db_content = db_content.decode("utf8")
 
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 9452368bf0..a761ad603b 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -171,10 +171,7 @@ class LoggingDatabaseConnection:
 
 
 # The type of entry which goes on our after_callbacks and exception_callbacks lists.
-#
-# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so
-# that mypy sees the type but the runtime python doesn't.
-_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]]
+_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]]
 
 
 R = TypeVar("R")
@@ -221,7 +218,7 @@ class LoggingTransaction:
         self.after_callbacks = after_callbacks
         self.exception_callbacks = exception_callbacks
 
-    def call_after(self, callback: "Callable[..., None]", *args: Any, **kwargs: Any):
+    def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any):
         """Call the given callback on the main twisted thread after the
         transaction has finished. Used to invalidate the caches on the
         correct thread.
@@ -233,7 +230,7 @@ class LoggingTransaction:
         self.after_callbacks.append((callback, args, kwargs))
 
     def call_on_exception(
-        self, callback: "Callable[..., None]", *args: Any, **kwargs: Any
+        self, callback: Callable[..., None], *args: Any, **kwargs: Any
     ):
         # if self.exception_callbacks is None, that means that whatever constructed the
         # LoggingTransaction isn't expecting there to be any callbacks; assert that
@@ -485,7 +482,7 @@ class DatabasePool:
         desc: str,
         after_callbacks: List[_CallbackListEntry],
         exception_callbacks: List[_CallbackListEntry],
-        func: "Callable[..., R]",
+        func: Callable[..., R],
         *args: Any,
         **kwargs: Any,
     ) -> R:
@@ -618,7 +615,7 @@ class DatabasePool:
     async def runInteraction(
         self,
         desc: str,
-        func: "Callable[..., R]",
+        func: Callable[..., R],
         *args: Any,
         db_autocommit: bool = False,
         **kwargs: Any,
@@ -678,7 +675,7 @@ class DatabasePool:
 
     async def runWithConnection(
         self,
-        func: "Callable[..., R]",
+        func: Callable[..., R],
         *args: Any,
         db_autocommit: bool = False,
         **kwargs: Any,
@@ -718,7 +715,9 @@ class DatabasePool:
             # pool).
             assert not self.engine.in_transaction(conn)
 
-            with LoggingContext("runWithConnection", parent_context) as context:
+            with LoggingContext(
+                str(curr_context), parent_context=parent_context
+            ) as context:
                 sched_duration_sec = monotonic_time() - start_time
                 sql_scheduling_timer.observe(sched_duration_sec)
                 context.add_database_scheduled(sched_duration_sec)
diff --git a/synapse/storage/databases/main/transactions.py b/synapse/storage/databases/main/transactions.py
index b28ca61f80..82335e7a9d 100644
--- a/synapse/storage/databases/main/transactions.py
+++ b/synapse/storage/databases/main/transactions.py
@@ -14,7 +14,7 @@
 
 import logging
 from collections import namedtuple
-from typing import Dict, List, Optional, Tuple
+from typing import Iterable, List, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
 
@@ -295,33 +295,37 @@ class TransactionStore(TransactionWorkerStore):
                 },
             )
 
-    async def bulk_store_destination_rooms_entries(
-        self, room_and_destination_to_ordering: Dict[Tuple[str, str], int]
-    ):
+    async def store_destination_rooms_entries(
+        self,
+        destinations: Iterable[str],
+        room_id: str,
+        stream_ordering: int,
+    ) -> None:
         """
-        Updates or creates `destination_rooms` entries for a number of events.
+        Updates or creates `destination_rooms` entries in batch for a single event.
 
         Args:
-            room_and_destination_to_ordering: A mapping of (room, destination) -> stream_id
+            destinations: list of destinations
+            room_id: the room_id of the event
+            stream_ordering: the stream_ordering of the event
         """
 
         await self.db_pool.simple_upsert_many(
             table="destinations",
             key_names=("destination",),
-            key_values={(d,) for _, d in room_and_destination_to_ordering.keys()},
+            key_values=[(d,) for d in destinations],
             value_names=[],
             value_values=[],
             desc="store_destination_rooms_entries_dests",
         )
 
+        rows = [(destination, room_id) for destination in destinations]
         await self.db_pool.simple_upsert_many(
             table="destination_rooms",
-            key_names=("room_id", "destination"),
-            key_values=list(room_and_destination_to_ordering.keys()),
+            key_names=("destination", "room_id"),
+            key_values=rows,
             value_names=["stream_ordering"],
-            value_values=[
-                (stream_id,) for stream_id in room_and_destination_to_ordering.values()
-            ],
+            value_values=[(stream_ordering,)] * len(rows),
             desc="store_destination_rooms_entries_rooms",
         )
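How the rows for the two upserts above line up, with made-up values: one key tuple per destination, and a matching (stream_ordering,) value tuple per row.

    destinations = ["a.example", "b.example"]
    room_id = "!r:example.com"
    stream_ordering = 42

    dest_keys = [(d,) for d in destinations]
    # dest_keys == [("a.example",), ("b.example",)]

    rows = [(destination, room_id) for destination in destinations]
    # rows == [("a.example", "!r:example.com"), ("b.example", "!r:example.com")]

    value_values = [(stream_ordering,)] * len(rows)
    # value_values == [(42,), (42,)] -- one value tuple per key row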
 
diff --git a/synapse/util/caches/lrucache.py b/synapse/util/caches/lrucache.py
index 5c8ef444fa..0cbd6932f6 100644
--- a/synapse/util/caches/lrucache.py
+++ b/synapse/util/caches/lrucache.py
@@ -17,8 +17,10 @@ from functools import wraps
 from typing import (
     Any,
     Callable,
+    Collection,
     Generic,
     Iterable,
+    List,
     Optional,
     Type,
     TypeVar,
@@ -83,13 +85,30 @@ class _Node:
     __slots__ = ["prev_node", "next_node", "key", "value", "callbacks", "memory"]
 
     def __init__(
-        self, prev_node, next_node, key, value, callbacks: Optional[set] = None
+        self,
+        prev_node,
+        next_node,
+        key,
+        value,
+        callbacks: Collection[Callable[[], None]] = (),
     ):
         self.prev_node = prev_node
         self.next_node = next_node
         self.key = key
         self.value = value
-        self.callbacks = callbacks or set()
+
+        # Callbacks to run when the node gets deleted. We store these in a
+        # list rather than a set to keep memory usage down (and since we
+        # expect few entries per node, the cost of checking a list for
+        # duplicates instead of using a set is negligible).
+        #
+        # Note that we store this as an optional list to keep the memory
+        # footprint down: storing `None` is free, as it's a singleton, while
+        # an empty list costs 56 bytes (and an empty set 216 bytes, if we did
+        # the naive thing and used sets).
+        self.callbacks = None  # type: Optional[List[Callable[[], None]]]
+
+        self.add_callbacks(callbacks)
 
         self.memory = 0
         if TRACK_MEMORY_USAGE:
@@ -101,6 +120,32 @@ class _Node:
             )
             self.memory += _get_size_of(self.memory, recurse=False)
 
+    def add_callbacks(self, callbacks: Collection[Callable[[], None]]) -> None:
+        """Add to stored list of callbacks, removing duplicates."""
+
+        if not callbacks:
+            return
+
+        if not self.callbacks:
+            self.callbacks = []
+
+        for callback in callbacks:
+            if callback not in self.callbacks:
+                self.callbacks.append(callback)
+
+    def run_and_clear_callbacks(self) -> None:
+        """Run all callbacks and clear the stored list of callbacks. Used when
+        the node is being deleted.
+        """
+
+        if not self.callbacks:
+            return
+
+        for callback in self.callbacks:
+            callback()
+
+        self.callbacks = None
+
 
 class LruCache(Generic[KT, VT]):
     """
@@ -213,10 +258,10 @@ class LruCache(Generic[KT, VT]):
 
         self.len = synchronized(cache_len)
 
-        def add_node(key, value, callbacks: Optional[set] = None):
+        def add_node(key, value, callbacks: Collection[Callable[[], None]] = ()):
             prev_node = list_root
             next_node = prev_node.next_node
-            node = _Node(prev_node, next_node, key, value, callbacks or set())
+            node = _Node(prev_node, next_node, key, value, callbacks)
             prev_node.next_node = node
             next_node.prev_node = node
             cache[key] = node
@@ -250,9 +295,7 @@ class LruCache(Generic[KT, VT]):
                 deleted_len = size_callback(node.value)
                 cached_cache_len[0] -= deleted_len
 
-            for cb in node.callbacks:
-                cb()
-            node.callbacks.clear()
+            node.run_and_clear_callbacks()
 
             if TRACK_MEMORY_USAGE and metrics:
                 metrics.dec_memory_usage(node.memory)
@@ -263,7 +306,7 @@ class LruCache(Generic[KT, VT]):
         def cache_get(
             key: KT,
             default: Literal[None] = None,
-            callbacks: Iterable[Callable[[], None]] = ...,
+            callbacks: Collection[Callable[[], None]] = ...,
             update_metrics: bool = ...,
         ) -> Optional[VT]:
             ...
@@ -272,7 +315,7 @@ class LruCache(Generic[KT, VT]):
         def cache_get(
             key: KT,
             default: T,
-            callbacks: Iterable[Callable[[], None]] = ...,
+            callbacks: Collection[Callable[[], None]] = ...,
             update_metrics: bool = ...,
         ) -> Union[T, VT]:
             ...
@@ -281,13 +324,13 @@ class LruCache(Generic[KT, VT]):
         def cache_get(
             key: KT,
             default: Optional[T] = None,
-            callbacks: Iterable[Callable[[], None]] = (),
+            callbacks: Collection[Callable[[], None]] = (),
             update_metrics: bool = True,
         ):
             node = cache.get(key, None)
             if node is not None:
                 move_node_to_front(node)
-                node.callbacks.update(callbacks)
+                node.add_callbacks(callbacks)
                 if update_metrics and metrics:
                     metrics.inc_hits()
                 return node.value
@@ -303,10 +346,8 @@ class LruCache(Generic[KT, VT]):
                 # We sometimes store large objects, e.g. dicts, which cause
                 # the inequality check to take a long time. So let's only do
                 # the check if we have some callbacks to call.
-                if node.callbacks and value != node.value:
-                    for cb in node.callbacks:
-                        cb()
-                    node.callbacks.clear()
+                if value != node.value:
+                    node.run_and_clear_callbacks()
 
                 # We don't bother to protect this by value != node.value as
                 # generally size_callback will be cheap compared with equality
@@ -316,7 +357,7 @@ class LruCache(Generic[KT, VT]):
                     cached_cache_len[0] -= size_callback(node.value)
                     cached_cache_len[0] += size_callback(value)
 
-                node.callbacks.update(callbacks)
+                node.add_callbacks(callbacks)
 
                 move_node_to_front(node)
                 node.value = value
@@ -369,8 +410,7 @@ class LruCache(Generic[KT, VT]):
             list_root.next_node = list_root
             list_root.prev_node = list_root
             for node in cache.values():
-                for cb in node.callbacks:
-                    cb()
+                node.run_and_clear_callbacks()
             cache.clear()
             if size_callback:
                 cached_cache_len[0] = 0
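To illustrate the callback semantics this refactoring preserves, here is a
hypothetical usage sketch of LruCache (constructor arguments beyond max_size
are omitted and may differ between Synapse versions): duplicates are stored
once via add_callbacks, and the callbacks fire exactly once when the entry is
overwritten with a different value.

    from synapse.util.caches.lrucache import LruCache

    cache = LruCache(max_size=2)

    def on_invalidated():
        print("entry for 'a' was invalidated")

    # Registering the same callback twice stores it once (add_callbacks
    # skips duplicates).
    cache.set("a", 1, callbacks=[on_invalidated])
    cache.set("a", 1, callbacks=[on_invalidated])

    # Overwriting with a different value runs and clears the callbacks:
    # prints "entry for 'a' was invalidated" exactly once.
    cache.set("a", 2)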
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 2529845c9e..25ea1bcc91 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -110,7 +110,7 @@ class ResponseCache(Generic[T]):
         return result.observe()
 
     def wrap(
-        self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
+        self, key: T, callback: Callable[..., Any], *args: Any, **kwargs: Any
     ) -> defer.Deferred:
         """Wrap together a *get* and *set* call, taking care of logcontexts