author    Andrew Morgan <andrew@amorgan.xyz>  2021-04-27 13:46:16 +0100
committer Andrew Morgan <andrew@amorgan.xyz>  2021-04-27 13:46:16 +0100
commit    9773abf3d8e1b2711e936ff150d8ae2f48f98be9 (patch)
tree      b8c98e96f1262bba37a7ec58bcc7373fdd99a751 /synapse
parent    Merge remote-tracking branch 'origin/release-v1.32.2' into matrix-org-hotfixes (diff)
parent    Remove various bits of compatibility code for Python <3.6 (#9879) (diff)
download  synapse-9773abf3d8e1b2711e936ff150d8ae2f48f98be9.tar.xz
Merge branch 'develop' of github.com:matrix-org/synapse into matrix-org-hotfixes
Diffstat (limited to 'synapse')
-rw-r--r--  synapse/__init__.py | 6
-rw-r--r--  synapse/api/auth.py | 85
-rw-r--r--  synapse/api/auth_blocking.py | 9
-rw-r--r--  synapse/api/constants.py | 3
-rw-r--r--  synapse/app/_base.py | 30
-rw-r--r--  synapse/app/admin_cmd.py | 8
-rw-r--r--  synapse/app/generic_worker.py | 43
-rw-r--r--  synapse/app/homeserver.py | 42
-rw-r--r--  synapse/config/_base.pyi | 22
-rw-r--r--  synapse/config/account_validity.py | 165
-rw-r--r--  synapse/config/consent.py (renamed from synapse/config/consent_config.py) | 0
-rw-r--r--  synapse/config/emailconfig.py | 2
-rw-r--r--  synapse/config/homeserver.py | 13
-rw-r--r--  synapse/config/jwt.py (renamed from synapse/config/jwt_config.py) | 0
-rw-r--r--  synapse/config/logger.py | 3
-rw-r--r--  synapse/config/oidc.py (renamed from synapse/config/oidc_config.py) | 11
-rw-r--r--  synapse/config/registration.py | 129
-rw-r--r--  synapse/config/saml2.py (renamed from synapse/config/saml2_config.py) | 7
-rw-r--r--  synapse/config/server.py | 8
-rw-r--r--  synapse/config/server_notices.py (renamed from synapse/config/server_notices_config.py) | 0
-rw-r--r--  synapse/config/workers.py | 27
-rw-r--r--  synapse/event_auth.py | 10
-rw-r--r--  synapse/events/spamcheck.py | 3
-rw-r--r--  synapse/federation/federation_client.py | 118
-rw-r--r--  synapse/federation/send_queue.py | 4
-rw-r--r--  synapse/federation/sender/__init__.py | 18
-rw-r--r--  synapse/handlers/account_validity.py | 101
-rw-r--r--  synapse/handlers/appservice.py | 4
-rw-r--r--  synapse/handlers/auth.py | 2
-rw-r--r--  synapse/handlers/cas.py (renamed from synapse/handlers/cas_handler.py) | 0
-rw-r--r--  synapse/handlers/deactivate_account.py | 4
-rw-r--r--  synapse/handlers/device.py | 23
-rw-r--r--  synapse/handlers/event_auth.py | 86
-rw-r--r--  synapse/handlers/federation.py | 46
-rw-r--r--  synapse/handlers/identity.py | 29
-rw-r--r--  synapse/handlers/oidc.py (renamed from synapse/handlers/oidc_handler.py) | 32
-rw-r--r--  synapse/handlers/presence.py | 312
-rw-r--r--  synapse/handlers/room_member.py | 62
-rw-r--r--  synapse/handlers/saml.py (renamed from synapse/handlers/saml_handler.py) | 0
-rw-r--r--  synapse/handlers/sso.py | 3
-rw-r--r--  synapse/handlers/sync.py | 13
-rw-r--r--  synapse/http/client.py | 15
-rw-r--r--  synapse/http/matrixfederationclient.py | 43
-rw-r--r--  synapse/http/site.py | 69
-rw-r--r--  synapse/logging/_remote.py | 4
-rw-r--r--  synapse/notifier.py | 9
-rw-r--r--  synapse/push/bulk_push_rule_evaluator.py | 161
-rw-r--r--  synapse/push/emailpusher.py | 9
-rw-r--r--  synapse/push/pusherpool.py | 8
-rw-r--r--  synapse/python_dependencies.py | 9
-rw-r--r--  synapse/replication/http/_base.py | 5
-rw-r--r--  synapse/replication/slave/storage/presence.py | 50
-rw-r--r--  synapse/replication/tcp/client.py | 7
-rw-r--r--  synapse/replication/tcp/handler.py | 18
-rw-r--r--  synapse/replication/tcp/protocol.py | 3
-rw-r--r--  synapse/replication/tcp/streams/__init__.py | 3
-rw-r--r--  synapse/replication/tcp/streams/_base.py | 41
-rw-r--r--  synapse/res/templates/account_previously_renewed.html | 1
-rw-r--r--  synapse/res/templates/account_renewed.html | 2
-rw-r--r--  synapse/rest/admin/users.py | 3
-rw-r--r--  synapse/rest/client/v1/presence.py | 7
-rw-r--r--  synapse/rest/client/v2_alpha/account.py | 8
-rw-r--r--  synapse/rest/client/v2_alpha/account_validity.py | 32
-rw-r--r--  synapse/rest/client/v2_alpha/register.py | 10
-rw-r--r--  synapse/rest/consent/consent_resource.py | 10
-rw-r--r--  synapse/rest/key/v2/remote_key_resource.py | 4
-rw-r--r--  synapse/rest/media/v1/filepath.py | 2
-rw-r--r--  synapse/rest/media/v1/upload_resource.py | 2
-rw-r--r--  synapse/rest/synapse/client/new_user_consent.py | 9
-rw-r--r--  synapse/secrets.py | 44
-rw-r--r--  synapse/server.py | 34
-rw-r--r--  synapse/state/__init__.py | 3
-rw-r--r--  synapse/state/v2.py | 3
-rw-r--r--  synapse/storage/_base.py | 6
-rw-r--r--  synapse/storage/database.py | 17
-rw-r--r--  synapse/storage/databases/main/__init__.py | 47
-rw-r--r--  synapse/storage/databases/main/devices.py | 23
-rw-r--r--  synapse/storage/databases/main/event_federation.py | 3
-rw-r--r--  synapse/storage/databases/main/events.py | 10
-rw-r--r--  synapse/storage/databases/main/events_worker.py | 13
-rw-r--r--  synapse/storage/databases/main/presence.py | 92
-rw-r--r--  synapse/storage/databases/main/registration.py | 62
-rw-r--r--  synapse/storage/databases/main/roommember.py | 173
-rw-r--r--  synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql | 18
-rw-r--r--  synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql | 18
-rw-r--r--  synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres | 20
-rw-r--r--  synapse/storage/databases/main/search.py | 3
-rw-r--r--  synapse/storage/databases/main/stream.py | 4
-rw-r--r--  synapse/storage/persist_events.py | 3
-rw-r--r--  synapse/storage/prepare_database.py | 3
-rw-r--r--  synapse/types.py | 31
-rw-r--r--  synapse/util/caches/response_cache.py | 2
-rw-r--r--  synapse/util/caches/stream_change_cache.py | 3
-rw-r--r--  synapse/util/iterutils.py | 3
-rw-r--r--  synapse/util/stringutils.py | 32
-rw-r--r--  synapse/util/threepids.py | 30
96 files changed, 1754 insertions(+), 973 deletions(-)
diff --git a/synapse/__init__.py b/synapse/__init__.py
index 5bfae24cbd..fbd49a93e1 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -21,8 +21,8 @@ import os
 import sys
 
 # Check that we're not running on an unsupported Python version.
-if sys.version_info < (3, 5):
-    print("Synapse requires Python 3.5 or above.")
+if sys.version_info < (3, 6):
+    print("Synapse requires Python 3.6 or above.")
     sys.exit(1)
 
 # Twisted and canonicaljson will fail to import when this file is executed to
@@ -47,7 +47,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.32.1"
+__version__ = "1.32.2"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 6c13f53957..efc926d094 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -12,14 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import List, Optional, Tuple
+from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple
 
 import pymacaroons
 from netaddr import IPAddress
 
 from twisted.web.server import Request
 
-import synapse.types
 from synapse import event_auth
 from synapse.api.auth_blocking import AuthBlocking
 from synapse.api.constants import EventTypes, HistoryVisibility, Membership
@@ -36,11 +35,14 @@ from synapse.http import get_request_user_agent
 from synapse.http.site import SynapseRequest
 from synapse.logging import opentracing as opentracing
 from synapse.storage.databases.main.registration import TokenLookupResult
-from synapse.types import StateMap, UserID
+from synapse.types import Requester, StateMap, UserID, create_requester
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
 from synapse.util.metrics import Measure
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
@@ -65,9 +67,10 @@ class Auth:
     """
     FIXME: This class contains a mix of functions for authenticating users
     of our client-server API and authenticating events added to room graphs.
+    The latter should be moved to synapse.handlers.event_auth.EventAuthHandler.
     """
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.hs = hs
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
@@ -79,19 +82,21 @@ class Auth:
 
         self._auth_blocking = AuthBlocking(self.hs)
 
-        self._account_validity = hs.config.account_validity
+        self._account_validity_enabled = (
+            hs.config.account_validity.account_validity_enabled
+        )
         self._track_appservice_user_ips = hs.config.track_appservice_user_ips
         self._macaroon_secret_key = hs.config.macaroon_secret_key
 
     async def check_from_context(
         self, room_version: str, event, context, do_sig_check=True
-    ):
+    ) -> None:
         prev_state_ids = await context.get_prev_state_ids()
         auth_events_ids = self.compute_auth_events(
             event, prev_state_ids, for_verification=True
         )
-        auth_events = await self.store.get_events(auth_events_ids)
-        auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
+        auth_events_by_id = await self.store.get_events(auth_events_ids)
+        auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}
 
         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
         event_auth.check(
@@ -148,17 +153,11 @@ class Auth:
 
         raise AuthError(403, "User %s not in room %s" % (user_id, room_id))
 
-    async def check_host_in_room(self, room_id, host):
+    async def check_host_in_room(self, room_id: str, host: str) -> bool:
         with Measure(self.clock, "check_host_in_room"):
-            latest_event_ids = await self.store.is_host_joined(room_id, host)
-            return latest_event_ids
-
-    def can_federate(self, event, auth_events):
-        creation_event = auth_events.get((EventTypes.Create, ""))
+            return await self.store.is_host_joined(room_id, host)
 
-        return creation_event.content.get("m.federate", True) is True
-
-    def get_public_keys(self, invite_event):
+    def get_public_keys(self, invite_event: EventBase) -> List[Dict[str, Any]]:
         return event_auth.get_public_keys(invite_event)
 
     async def get_user_by_req(
@@ -167,7 +166,7 @@ class Auth:
         allow_guest: bool = False,
         rights: str = "access",
         allow_expired: bool = False,
-    ) -> synapse.types.Requester:
+    ) -> Requester:
         """Get a registered user's ID.
 
         Args:
@@ -193,7 +192,7 @@ class Auth:
             access_token = self.get_access_token_from_request(request)
 
             user_id, app_service = await self._get_appservice_user_id(request)
-            if user_id:
+            if user_id and app_service:
                 if ip_addr and self._track_appservice_user_ips:
                     await self.store.insert_client_ip(
                         user_id=user_id,
@@ -203,9 +202,7 @@ class Auth:
                         device_id="dummy-device",  # stubbed
                     )
 
-                requester = synapse.types.create_requester(
-                    user_id, app_service=app_service
-                )
+                requester = create_requester(user_id, app_service=app_service)
 
                 request.requester = user_id
                 opentracing.set_tag("authenticated_entity", user_id)
@@ -222,7 +219,7 @@ class Auth:
             shadow_banned = user_info.shadow_banned
 
             # Deny the request if the user account has expired.
-            if self._account_validity.enabled and not allow_expired:
+            if self._account_validity_enabled and not allow_expired:
                 if await self.store.is_account_expired(
                     user_info.user_id, self.clock.time_msec()
                 ):
@@ -248,7 +245,7 @@ class Auth:
                     errcode=Codes.GUEST_ACCESS_FORBIDDEN,
                 )
 
-            requester = synapse.types.create_requester(
+            requester = create_requester(
                 user_info.user_id,
                 token_id,
                 is_guest,
@@ -268,7 +265,9 @@ class Auth:
         except KeyError:
             raise MissingClientTokenError()
 
-    async def _get_appservice_user_id(self, request):
+    async def _get_appservice_user_id(
+        self, request: Request
+    ) -> Tuple[Optional[str], Optional[ApplicationService]]:
         app_service = self.store.get_app_service_by_token(
             self.get_access_token_from_request(request)
         )
@@ -280,6 +279,9 @@ class Auth:
             if ip_address not in app_service.ip_range_whitelist:
                 return None, None
 
+        # This will always be set by the time Twisted calls us.
+        assert request.args is not None
+
         if b"user_id" not in request.args:
             return app_service.sender, app_service
 
@@ -384,7 +386,9 @@ class Auth:
             logger.warning("Invalid macaroon in auth: %s %s", type(e), e)
             raise InvalidClientTokenError("Invalid macaroon passed.")
 
-    def _parse_and_validate_macaroon(self, token, rights="access"):
+    def _parse_and_validate_macaroon(
+        self, token: str, rights: str = "access"
+    ) -> Tuple[str, bool]:
         """Takes a macaroon and tries to parse and validate it. This is cached
         if and only if rights == access and there isn't an expiry.
 
@@ -429,15 +433,16 @@ class Auth:
 
         return user_id, guest
 
-    def validate_macaroon(self, macaroon, type_string, user_id):
+    def validate_macaroon(
+        self, macaroon: pymacaroons.Macaroon, type_string: str, user_id: str
+    ) -> None:
         """
         validate that a Macaroon is understood by and was signed by this server.
 
         Args:
-            macaroon(pymacaroons.Macaroon): The macaroon to validate
-            type_string(str): The kind of token required (e.g. "access",
-                              "delete_pusher")
-            user_id (str): The user_id required
+            macaroon: The macaroon to validate
+            type_string: The kind of token required (e.g. "access", "delete_pusher")
+            user_id: The user_id required
         """
         v = pymacaroons.Verifier()
 
@@ -462,9 +467,7 @@ class Auth:
         if not service:
             logger.warning("Unrecognised appservice access token.")
             raise InvalidClientTokenError()
-        request.requester = synapse.types.create_requester(
-            service.sender, app_service=service
-        )
+        request.requester = create_requester(service.sender, app_service=service)
         return service
 
     async def is_server_admin(self, user: UserID) -> bool:
@@ -516,7 +519,7 @@ class Auth:
 
         return auth_ids
 
-    async def check_can_change_room_list(self, room_id: str, user: UserID):
+    async def check_can_change_room_list(self, room_id: str, user: UserID) -> bool:
         """Determine whether the user is allowed to edit the room's entry in the
         published room list.
 
@@ -551,11 +554,11 @@ class Auth:
         return user_level >= send_level
 
     @staticmethod
-    def has_access_token(request: Request):
+    def has_access_token(request: Request) -> bool:
         """Checks if the request has an access_token.
 
         Returns:
-            bool: False if no access_token was given, True otherwise.
+            False if no access_token was given, True otherwise.
         """
         # This will always be set by the time Twisted calls us.
         assert request.args is not None
@@ -565,13 +568,13 @@ class Auth:
         return bool(query_params) or bool(auth_headers)
 
     @staticmethod
-    def get_access_token_from_request(request: Request):
+    def get_access_token_from_request(request: Request) -> str:
         """Extracts the access_token from the request.
 
         Args:
             request: The http request.
         Returns:
-            unicode: The access_token
+            The access_token
         Raises:
             MissingClientTokenError: If there isn't a single access_token in the
                 request
@@ -646,5 +649,5 @@ class Auth:
                 % (user_id, room_id),
             )
 
-    def check_auth_blocking(self, *args, **kwargs):
-        return self._auth_blocking.check_auth_blocking(*args, **kwargs)
+    async def check_auth_blocking(self, *args, **kwargs) -> None:
+        await self._auth_blocking.check_auth_blocking(*args, **kwargs)
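
Note: the validate_macaroon change above is annotation-only; the underlying pymacaroons Verifier pattern is unchanged. As a rough standalone sketch of that pattern (the caveat strings and secret here are illustrative, not Synapse's exact minting code):

    import pymacaroons

    secret = "macaroon-secret-key"

    # Mint a token with first-party caveats restricting what it is for.
    macaroon = pymacaroons.Macaroon(location="example.org", identifier="key", key=secret)
    macaroon.add_first_party_caveat("gen = 1")
    macaroon.add_first_party_caveat("type = access")
    macaroon.add_first_party_caveat("user_id = @alice:example.org")

    # Verify: the signature must match and every caveat must be satisfied.
    v = pymacaroons.Verifier()
    v.satisfy_exact("gen = 1")
    v.satisfy_exact("type = access")
    v.satisfy_exact("user_id = @alice:example.org")
    v.satisfy_general(lambda caveat: caveat.startswith("time < "))  # accept expiry caveats
    v.verify(macaroon, secret)  # raises MacaroonVerificationFailedException on failure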
diff --git a/synapse/api/auth_blocking.py b/synapse/api/auth_blocking.py
index a8df60cb89..e6bced93d5 100644
--- a/synapse/api/auth_blocking.py
+++ b/synapse/api/auth_blocking.py
@@ -13,18 +13,21 @@
 # limitations under the License.
 
 import logging
-from typing import Optional
+from typing import TYPE_CHECKING, Optional
 
 from synapse.api.constants import LimitBlockingTypes, UserTypes
 from synapse.api.errors import Codes, ResourceLimitError
 from synapse.config.server import is_threepid_reserved
 from synapse.types import Requester
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class AuthBlocking:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
 
         self._server_notices_mxid = hs.config.server_notices_mxid
@@ -43,7 +46,7 @@ class AuthBlocking:
         threepid: Optional[dict] = None,
         user_type: Optional[str] = None,
         requester: Optional[Requester] = None,
-    ):
+    ) -> None:
         """Checks if the user should be rejected for some external reason,
         such as monthly active user limiting or global disable flag
 
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 31a59bceec..936b6534b4 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -17,6 +17,9 @@
 
 """Contains constants from the specification."""
 
+# the max size of a (canonical-json-encoded) event
+MAX_PDU_SIZE = 65536
+
 # the "depth" field on events is limited to 2**63 - 1
 MAX_DEPTH = 2 ** 63 - 1
 
diff --git a/synapse/app/_base.py b/synapse/app/_base.py
index 2113c4f370..638e01c1b2 100644
--- a/synapse/app/_base.py
+++ b/synapse/app/_base.py
@@ -30,9 +30,10 @@ from twisted.internet import defer, error, reactor
 from twisted.protocols.tls import TLSMemoryBIOFactory
 
 import synapse
+from synapse.api.constants import MAX_PDU_SIZE
 from synapse.app import check_bind_error
 from synapse.app.phone_stats_home import start_phone_stats_home
-from synapse.config.server import ListenerConfig
+from synapse.config.homeserver import HomeServerConfig
 from synapse.crypto import context_factory
 from synapse.logging.context import PreserveLoggingContext
 from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -288,7 +289,7 @@ def refresh_certificate(hs):
         logger.info("Context factories updated.")
 
 
-async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerConfig]):
+async def start(hs: "synapse.server.HomeServer"):
     """
     Start a Synapse server or worker.
 
@@ -300,7 +301,6 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
 
     Args:
         hs: homeserver instance
-        listeners: Listener configuration ('listeners' in homeserver.yaml)
     """
     # Set up the SIGHUP machinery.
     if hasattr(signal, "SIGHUP"):
@@ -336,7 +336,7 @@ async def start(hs: "synapse.server.HomeServer", listeners: Iterable[ListenerCon
     synapse.logging.opentracing.init_tracer(hs)  # type: ignore[attr-defined] # noqa
 
     # It is now safe to start your Synapse.
-    hs.start_listening(listeners)
+    hs.start_listening()
     hs.get_datastore().db_pool.start_profiling()
     hs.get_pusherpool().start()
 
@@ -530,3 +530,25 @@ def sdnotify(state):
         # this is a bit surprising, since we don't expect to have a NOTIFY_SOCKET
         # unless systemd is expecting us to notify it.
         logger.warning("Unable to send notification to systemd: %s", e)
+
+
+def max_request_body_size(config: HomeServerConfig) -> int:
+    """Get a suitable maximum size for incoming HTTP requests"""
+
+    # Other than media uploads, the biggest request we expect to see is a fully-loaded
+    # /federation/v1/send request.
+    #
+    # The main thing in such a request is up to 50 PDUs, and up to 100 EDUs. PDUs are
+    # limited to 65536 bytes (possibly slightly more if the sender didn't use canonical
+    # json encoding); there is no specced limit to EDUs (see
+    # https://github.com/matrix-org/matrix-doc/issues/3121).
+    #
+    # in short, we somewhat arbitrarily limit requests to 200 * 64K (about 12.5M)
+    #
+    max_request_size = 200 * MAX_PDU_SIZE
+
+    # if we have a media repo enabled, we may need to allow larger uploads than that
+    if config.media.can_load_media_repo:
+        max_request_size = max(max_request_size, config.media.max_upload_size)
+
+    return max_request_size
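
Note: the "about 12.5M" in the comment checks out. A trivial verification, assuming only the MAX_PDU_SIZE constant added in this commit:

    MAX_PDU_SIZE = 65536                             # 64 KiB per canonical-json PDU
    max_request_size = 200 * MAX_PDU_SIZE
    assert max_request_size == 13107200              # bytes
    assert max_request_size / (1024 * 1024) == 12.5  # exactly 12.5 MiB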
diff --git a/synapse/app/admin_cmd.py b/synapse/app/admin_cmd.py
index eb256db749..68ae19c977 100644
--- a/synapse/app/admin_cmd.py
+++ b/synapse/app/admin_cmd.py
@@ -70,12 +70,6 @@ class AdminCmdSlavedStore(
 class AdminCmdServer(HomeServer):
     DATASTORE_CLASS = AdminCmdSlavedStore
 
-    def _listen_http(self, listener_config):
-        pass
-
-    def start_listening(self, listeners):
-        pass
-
 
 async def export_data_command(hs, args):
     """Export data for a user.
@@ -232,7 +226,7 @@ def start(config_options):
 
     async def run():
         with LoggingContext("command"):
-            _base.start(ss, [])
+            _base.start(ss)
             await args.func(ss, args)
 
     _base.start_worker_reactor(
diff --git a/synapse/app/generic_worker.py b/synapse/app/generic_worker.py
index 26c458dbb6..1a15ceee81 100644
--- a/synapse/app/generic_worker.py
+++ b/synapse/app/generic_worker.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import logging
 import sys
-from typing import Dict, Iterable, Optional
+from typing import Dict, Optional
 
 from twisted.internet import address
 from twisted.web.resource import IResource
@@ -32,7 +32,7 @@ from synapse.api.urls import (
     SERVER_KEY_V2_PREFIX,
 )
 from synapse.app import _base
-from synapse.app._base import register_start
+from synapse.app._base import max_request_body_size, register_start
 from synapse.config._base import ConfigError
 from synapse.config.homeserver import HomeServerConfig
 from synapse.config.logger import setup_logging
@@ -55,7 +55,6 @@ from synapse.replication.slave.storage.events import SlavedEventStore
 from synapse.replication.slave.storage.filtering import SlavedFilteringStore
 from synapse.replication.slave.storage.groups import SlavedGroupServerStore
 from synapse.replication.slave.storage.keys import SlavedKeyStore
-from synapse.replication.slave.storage.presence import SlavedPresenceStore
 from synapse.replication.slave.storage.profile import SlavedProfileStore
 from synapse.replication.slave.storage.push_rule import SlavedPushRuleStore
 from synapse.replication.slave.storage.pushers import SlavedPusherStore
@@ -64,7 +63,7 @@ from synapse.replication.slave.storage.registration import SlavedRegistrationSto
 from synapse.replication.slave.storage.room import RoomStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.rest.admin import register_servlets_for_media_repo
-from synapse.rest.client.v1 import events, login, room
+from synapse.rest.client.v1 import events, login, presence, room
 from synapse.rest.client.v1.initial_sync import InitialSyncRestServlet
 from synapse.rest.client.v1.profile import (
     ProfileAvatarURLRestServlet,
@@ -110,6 +109,7 @@ from synapse.storage.databases.main.metrics import ServerMetricsStore
 from synapse.storage.databases.main.monthly_active_users import (
     MonthlyActiveUsersWorkerStore,
 )
+from synapse.storage.databases.main.presence import PresenceStore
 from synapse.storage.databases.main.search import SearchWorkerStore
 from synapse.storage.databases.main.stats import StatsStore
 from synapse.storage.databases.main.transactions import TransactionWorkerStore
@@ -121,26 +121,6 @@ from synapse.util.versionstring import get_version_string
 logger = logging.getLogger("synapse.app.generic_worker")
 
 
-class PresenceStatusStubServlet(RestServlet):
-    """If presence is disabled this servlet can be used to stub out setting
-    presence status.
-    """
-
-    PATTERNS = client_patterns("/presence/(?P<user_id>[^/]*)/status")
-
-    def __init__(self, hs):
-        super().__init__()
-        self.auth = hs.get_auth()
-
-    async def on_GET(self, request, user_id):
-        await self.auth.get_user_by_req(request)
-        return 200, {"presence": "offline"}
-
-    async def on_PUT(self, request, user_id):
-        await self.auth.get_user_by_req(request)
-        return 200, {}
-
-
 class KeyUploadServlet(RestServlet):
     """An implementation of the `KeyUploadServlet` that responds to read only
     requests, but otherwise proxies through to the master instance.
@@ -241,6 +221,7 @@ class GenericWorkerSlavedStore(
     StatsStore,
     UIAuthWorkerStore,
     EndToEndRoomKeyStore,
+    PresenceStore,
     SlavedDeviceInboxStore,
     SlavedDeviceStore,
     SlavedReceiptsStore,
@@ -259,7 +240,6 @@ class GenericWorkerSlavedStore(
     SlavedTransactionStore,
     SlavedProfileStore,
     SlavedClientIpStore,
-    SlavedPresenceStore,
     SlavedFilteringStore,
     MonthlyActiveUsersWorkerStore,
     MediaRepositoryStore,
@@ -327,10 +307,7 @@ class GenericWorkerServer(HomeServer):
 
                     user_directory.register_servlets(self, resource)
 
-                    # If presence is disabled, use the stub servlet that does
-                    # not allow sending presence
-                    if not self.config.use_presence:
-                        PresenceStatusStubServlet(self).register(resource)
+                    presence.register_servlets(self, resource)
 
                     groups.register_servlets(self, resource)
 
@@ -390,14 +367,16 @@ class GenericWorkerServer(HomeServer):
                 listener_config,
                 root_resource,
                 self.version_string,
+                max_request_body_size=max_request_body_size(self.config),
+                reactor=self.get_reactor(),
             ),
             reactor=self.get_reactor(),
         )
 
         logger.info("Synapse worker now listening on port %d", port)
 
-    def start_listening(self, listeners: Iterable[ListenerConfig]):
-        for listener in listeners:
+    def start_listening(self):
+        for listener in self.config.worker_listeners:
             if listener.type == "http":
                 self._listen_http(listener)
             elif listener.type == "manhole":
@@ -490,7 +469,7 @@ def start(config_options):
     # streams. Will no-op if no streams can be written to by this worker.
     hs.get_replication_streamer()
 
-    register_start(_base.start, hs, config.worker_listeners)
+    register_start(_base.start, hs)
 
     _base.start_worker_reactor("synapse-generic-worker", config)
 
diff --git a/synapse/app/homeserver.py b/synapse/app/homeserver.py
index 8be8b520eb..8e78134bbe 100644
--- a/synapse/app/homeserver.py
+++ b/synapse/app/homeserver.py
@@ -17,7 +17,7 @@
 import logging
 import os
 import sys
-from typing import Iterable, Iterator
+from typing import Iterator
 
 from twisted.internet import reactor
 from twisted.web.resource import EncodingResourceWrapper, IResource
@@ -36,7 +36,13 @@ from synapse.api.urls import (
     WEB_CLIENT_PREFIX,
 )
 from synapse.app import _base
-from synapse.app._base import listen_ssl, listen_tcp, quit_with_error, register_start
+from synapse.app._base import (
+    listen_ssl,
+    listen_tcp,
+    max_request_body_size,
+    quit_with_error,
+    register_start,
+)
 from synapse.config._base import ConfigError
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.config.homeserver import HomeServerConfig
@@ -126,19 +132,21 @@ class SynapseHomeServer(HomeServer):
         else:
             root_resource = OptionsResource()
 
-        root_resource = create_resource_tree(resources, root_resource)
+        site = SynapseSite(
+            "synapse.access.%s.%s" % ("https" if tls else "http", site_tag),
+            site_tag,
+            listener_config,
+            create_resource_tree(resources, root_resource),
+            self.version_string,
+            max_request_body_size=max_request_body_size(self.config),
+            reactor=self.get_reactor(),
+        )
 
         if tls:
             ports = listen_ssl(
                 bind_addresses,
                 port,
-                SynapseSite(
-                    "synapse.access.https.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                    self.version_string,
-                ),
+                site,
                 self.tls_server_context_factory,
                 reactor=self.get_reactor(),
             )
@@ -148,13 +156,7 @@ class SynapseHomeServer(HomeServer):
             ports = listen_tcp(
                 bind_addresses,
                 port,
-                SynapseSite(
-                    "synapse.access.http.%s" % (site_tag,),
-                    site_tag,
-                    listener_config,
-                    root_resource,
-                    self.version_string,
-                ),
+                site,
                 reactor=self.get_reactor(),
             )
             logger.info("Synapse now listening on TCP port %d", port)
@@ -273,14 +275,14 @@ class SynapseHomeServer(HomeServer):
 
         return resources
 
-    def start_listening(self, listeners: Iterable[ListenerConfig]):
+    def start_listening(self):
         if self.config.redis_enabled:
             # If redis is enabled we connect via the replication command handler
             # in the same way as the workers (since we're effectively a client
             # rather than a server).
             self.get_tcp_replication().start_replication(self)
 
-        for listener in listeners:
+        for listener in self.config.server.listeners:
             if listener.type == "http":
                 self._listening_services.extend(
                     self._listener_http(self.config, listener)
@@ -412,7 +414,7 @@ def setup(config_options):
             # Loading the provider metadata also ensures the provider config is valid.
             await oidc.load_metadata()
 
-        await _base.start(hs, config.listeners)
+        await _base.start(hs)
 
         hs.get_datastore().db_pool.updates.start_doing_background_updates()
 
diff --git a/synapse/config/_base.pyi b/synapse/config/_base.pyi
index e896fd34e2..ff9abbc232 100644
--- a/synapse/config/_base.pyi
+++ b/synapse/config/_base.pyi
@@ -1,21 +1,22 @@
 from typing import Any, Iterable, List, Optional
 
 from synapse.config import (
+    account_validity,
     api,
     appservice,
     auth,
     captcha,
     cas,
-    consent_config,
+    consent,
     database,
     emailconfig,
     experimental,
     groups,
-    jwt_config,
+    jwt,
     key,
     logger,
     metrics,
-    oidc_config,
+    oidc,
     password_auth_providers,
     push,
     ratelimiting,
@@ -23,9 +24,9 @@ from synapse.config import (
     registration,
     repository,
     room_directory,
-    saml2_config,
+    saml2,
     server,
-    server_notices_config,
+    server_notices,
     spam_checker,
     sso,
     stats,
@@ -59,15 +60,16 @@ class RootConfig:
     captcha: captcha.CaptchaConfig
     voip: voip.VoipConfig
     registration: registration.RegistrationConfig
+    account_validity: account_validity.AccountValidityConfig
     metrics: metrics.MetricsConfig
     api: api.ApiConfig
     appservice: appservice.AppServiceConfig
     key: key.KeyConfig
-    saml2: saml2_config.SAML2Config
+    saml2: saml2.SAML2Config
     cas: cas.CasConfig
     sso: sso.SSOConfig
-    oidc: oidc_config.OIDCConfig
-    jwt: jwt_config.JWTConfig
+    oidc: oidc.OIDCConfig
+    jwt: jwt.JWTConfig
     auth: auth.AuthConfig
     email: emailconfig.EmailConfig
     worker: workers.WorkerConfig
@@ -76,9 +78,9 @@ class RootConfig:
     spamchecker: spam_checker.SpamCheckerConfig
     groups: groups.GroupsConfig
     userdirectory: user_directory.UserDirectoryConfig
-    consent: consent_config.ConsentConfig
+    consent: consent.ConsentConfig
     stats: stats.StatsConfig
-    servernotices: server_notices_config.ServerNoticesConfig
+    servernotices: server_notices.ServerNoticesConfig
     roomdirectory: room_directory.RoomDirectoryConfig
     thirdpartyrules: third_party_event_rules.ThirdPartyRulesConfig
     tracer: tracer.TracerConfig
diff --git a/synapse/config/account_validity.py b/synapse/config/account_validity.py
new file mode 100644
index 0000000000..c58a7d95a7
--- /dev/null
+++ b/synapse/config/account_validity.py
@@ -0,0 +1,165 @@
+# -*- coding: utf-8 -*-
+# Copyright 2020 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from synapse.config._base import Config, ConfigError
+
+
+class AccountValidityConfig(Config):
+    section = "account_validity"
+
+    def read_config(self, config, **kwargs):
+        account_validity_config = config.get("account_validity") or {}
+        self.account_validity_enabled = account_validity_config.get("enabled", False)
+        self.account_validity_renew_by_email_enabled = (
+            "renew_at" in account_validity_config
+        )
+
+        if self.account_validity_enabled:
+            if "period" in account_validity_config:
+                self.account_validity_period = self.parse_duration(
+                    account_validity_config["period"]
+                )
+            else:
+                raise ConfigError("'period' is required when using account validity")
+
+            if "renew_at" in account_validity_config:
+                self.account_validity_renew_at = self.parse_duration(
+                    account_validity_config["renew_at"]
+                )
+
+            if "renew_email_subject" in account_validity_config:
+                self.account_validity_renew_email_subject = account_validity_config[
+                    "renew_email_subject"
+                ]
+            else:
+                self.account_validity_renew_email_subject = "Renew your %(app)s account"
+
+            self.account_validity_startup_job_max_delta = (
+                self.account_validity_period * 10.0 / 100.0
+            )
+
+        if self.account_validity_renew_by_email_enabled:
+            if not self.public_baseurl:
+                raise ConfigError("Can't send renewal emails without 'public_baseurl'")
+
+        # Load account validity templates.
+        account_validity_template_dir = account_validity_config.get("template_dir")
+
+        account_renewed_template_filename = account_validity_config.get(
+            "account_renewed_html_path", "account_renewed.html"
+        )
+        invalid_token_template_filename = account_validity_config.get(
+            "invalid_token_html_path", "invalid_token.html"
+        )
+
+        # Read and store template content
+        (
+            self.account_validity_account_renewed_template,
+            self.account_validity_account_previously_renewed_template,
+            self.account_validity_invalid_token_template,
+        ) = self.read_templates(
+            [
+                account_renewed_template_filename,
+                "account_previously_renewed.html",
+                invalid_token_template_filename,
+            ],
+            account_validity_template_dir,
+        )
+
+    def generate_config_section(self, **kwargs):
+        return """\
+        ## Account Validity ##
+
+        # Optional account validity configuration. This allows for accounts to be denied
+        # any request after a given period.
+        #
+        # Once this feature is enabled, Synapse will look for registered users without an
+        # expiration date at startup and will add one to every account it found using the
+        # current settings at that time.
+        # This means that, if a validity period is set, and Synapse is restarted (it will
+        # then derive an expiration date from the current validity period), and some time
+        # after that the validity period changes and Synapse is restarted, the users'
+        # expiration dates won't be updated unless their account is manually renewed. This
+        # date will be randomly selected within a range [now + period - d ; now + period],
+        # where d is equal to 10% of the validity period.
+        #
+        account_validity:
+          # The account validity feature is disabled by default. Uncomment the
+          # following line to enable it.
+          #
+          #enabled: true
+
+          # The period after which an account is valid after its registration. When
+          # renewing the account, its validity period will be extended by this amount
+          # of time. This parameter is required when using the account validity
+          # feature.
+          #
+          #period: 6w
+
+          # The amount of time before an account's expiry date at which Synapse will
+          # send an email to the account's email address with a renewal link. By
+          # default, no such emails are sent.
+          #
+          # If you enable this setting, you will also need to fill out the 'email' and
+          # 'public_baseurl' configuration sections.
+          #
+          #renew_at: 1w
+
+          # The subject of the email sent out with the renewal link. '%(app)s' can be
+          # used as a placeholder for the 'app_name' parameter from the 'email'
+          # section.
+          #
+          # Note that the placeholder must be written '%(app)s', including the
+          # trailing 's'.
+          #
+          # If this is not set, a default value is used.
+          #
+          #renew_email_subject: "Renew your %(app)s account"
+
+          # Directory in which Synapse will try to find templates for the HTML files to
+          # serve to the user when trying to renew an account. If not set, default
+          # templates from within the Synapse package will be used.
+          #
+          # The currently available templates are:
+          #
+          # * account_renewed.html: Displayed to the user after they have successfully
+          #       renewed their account.
+          #
+          # * account_previously_renewed.html: Displayed to the user if they attempt to
+          #       renew their account with a token that is valid, but that has already
+          #       been used. In this case the account is not renewed again.
+          #
+          # * invalid_token.html: Displayed to the user when they try to renew an account
+          #       with an unknown or invalid renewal token.
+          #
+          # See https://github.com/matrix-org/synapse/tree/master/synapse/res/templates for
+          # default template contents.
+          #
+          # The file name of some of these templates can be configured below for legacy
+          # reasons.
+          #
+          #template_dir: "res/templates"
+
+          # A custom file name for the 'account_renewed.html' template.
+          #
+          # If not set, the file is assumed to be named "account_renewed.html".
+          #
+          #account_renewed_html_path: "account_renewed.html"
+
+          # A custom file name for the 'invalid_token.html' template.
+          #
+          # If not set, the file is assumed to be named "invalid_token.html".
+          #
+          #invalid_token_html_path: "invalid_token.html"
+        """
diff --git a/synapse/config/consent_config.py b/synapse/config/consent.py
index 30d07cc219..30d07cc219 100644
--- a/synapse/config/consent_config.py
+++ b/synapse/config/consent.py
diff --git a/synapse/config/emailconfig.py b/synapse/config/emailconfig.py
index c587939c7a..5564d7d097 100644
--- a/synapse/config/emailconfig.py
+++ b/synapse/config/emailconfig.py
@@ -299,7 +299,7 @@ class EmailConfig(Config):
                 "client_base_url", email_config.get("riot_base_url", None)
             )
 
-        if self.account_validity.renew_by_email_enabled:
+        if self.account_validity_renew_by_email_enabled:
             expiry_template_html = email_config.get(
                 "expiry_template_html", "notice_expiry.html"
             )
diff --git a/synapse/config/homeserver.py b/synapse/config/homeserver.py
index 1309535068..c23b66c88c 100644
--- a/synapse/config/homeserver.py
+++ b/synapse/config/homeserver.py
@@ -12,25 +12,25 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
 from ._base import RootConfig
+from .account_validity import AccountValidityConfig
 from .api import ApiConfig
 from .appservice import AppServiceConfig
 from .auth import AuthConfig
 from .cache import CacheConfig
 from .captcha import CaptchaConfig
 from .cas import CasConfig
-from .consent_config import ConsentConfig
+from .consent import ConsentConfig
 from .database import DatabaseConfig
 from .emailconfig import EmailConfig
 from .experimental import ExperimentalConfig
 from .federation import FederationConfig
 from .groups import GroupsConfig
-from .jwt_config import JWTConfig
+from .jwt import JWTConfig
 from .key import KeyConfig
 from .logger import LoggingConfig
 from .metrics import MetricsConfig
-from .oidc_config import OIDCConfig
+from .oidc import OIDCConfig
 from .password_auth_providers import PasswordAuthProviderConfig
 from .push import PushConfig
 from .ratelimiting import RatelimitConfig
@@ -39,9 +39,9 @@ from .registration import RegistrationConfig
 from .repository import ContentRepositoryConfig
 from .room import RoomConfig
 from .room_directory import RoomDirectoryConfig
-from .saml2_config import SAML2Config
+from .saml2 import SAML2Config
 from .server import ServerConfig
-from .server_notices_config import ServerNoticesConfig
+from .server_notices import ServerNoticesConfig
 from .spam_checker import SpamCheckerConfig
 from .sso import SSOConfig
 from .stats import StatsConfig
@@ -68,6 +68,7 @@ class HomeServerConfig(RootConfig):
         CaptchaConfig,
         VoipConfig,
         RegistrationConfig,
+        AccountValidityConfig,
         MetricsConfig,
         ApiConfig,
         AppServiceConfig,
diff --git a/synapse/config/jwt_config.py b/synapse/config/jwt.py
index 9e07e73008..9e07e73008 100644
--- a/synapse/config/jwt_config.py
+++ b/synapse/config/jwt.py
diff --git a/synapse/config/logger.py b/synapse/config/logger.py
index b174e0df6d..813076dfe2 100644
--- a/synapse/config/logger.py
+++ b/synapse/config/logger.py
@@ -31,7 +31,6 @@ from twisted.logger import (
 )
 
 import synapse
-from synapse.app import _base as appbase
 from synapse.logging._structured import setup_structured_logging
 from synapse.logging.context import LoggingContextFilter
 from synapse.logging.filter import MetadataFilter
@@ -318,6 +317,8 @@ def setup_logging(
     # Perform one-time logging configuration.
     _setup_stdlib_logging(config, log_config_path, logBeginner=logBeginner)
     # Add a SIGHUP handler to reload the logging configuration, if one is available.
+    from synapse.app import _base as appbase
+
     appbase.register_sighup(_reload_logging_config, log_config_path)
 
     # Log immediately so we can grep backwards.
diff --git a/synapse/config/oidc_config.py b/synapse/config/oidc.py
index 5fb94376fd..ea0abf5aa2 100644
--- a/synapse/config/oidc_config.py
+++ b/synapse/config/oidc.py
@@ -14,20 +14,23 @@
 # limitations under the License.
 
 from collections import Counter
-from typing import Iterable, List, Mapping, Optional, Tuple, Type
+from typing import Collection, Iterable, List, Mapping, Optional, Tuple, Type
 
 import attr
 
 from synapse.config._util import validate_config
 from synapse.config.sso import SsoAttributeRequirement
 from synapse.python_dependencies import DependencyException, check_requirements
-from synapse.types import Collection, JsonDict
+from synapse.types import JsonDict
 from synapse.util.module_loader import load_module
 from synapse.util.stringutils import parse_and_validate_mxc_uri
 
 from ._base import Config, ConfigError, read_file
 
-DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.oidc_handler.JinjaOidcMappingProvider"
+DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.oidc.JinjaOidcMappingProvider"
+# The module that JinjaOidcMappingProvider is in was renamed, we want to
+# transparently handle both the same.
+LEGACY_USER_MAPPING_PROVIDER = "synapse.handlers.oidc_handler.JinjaOidcMappingProvider"
 
 
 class OIDCConfig(Config):
@@ -403,6 +406,8 @@ def _parse_oidc_config_dict(
     """
     ump_config = oidc_config.get("user_mapping_provider", {})
     ump_config.setdefault("module", DEFAULT_USER_MAPPING_PROVIDER)
+    if ump_config.get("module") == LEGACY_USER_MAPPING_PROVIDER:
+        ump_config["module"] = DEFAULT_USER_MAPPING_PROVIDER
     ump_config.setdefault("config", {})
 
     (
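
Note: with the aliasing above, configs written against the old module path keep working. A hypothetical homeserver.yaml excerpt (provider details invented for illustration):

    oidc_providers:
      - idp_id: example
        issuer: "https://id.example.org/"
        user_mapping_provider:
          # Legacy path; rewritten at load time to
          # synapse.handlers.oidc.JinjaOidcMappingProvider
          module: synapse.handlers.oidc_handler.JinjaOidcMappingProvider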
diff --git a/synapse/config/registration.py b/synapse/config/registration.py
index f8a2768af8..e6f52b4f40 100644
--- a/synapse/config/registration.py
+++ b/synapse/config/registration.py
@@ -12,74 +12,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
-
-import pkg_resources
-
 from synapse.api.constants import RoomCreationPreset
 from synapse.config._base import Config, ConfigError
 from synapse.types import RoomAlias, UserID
 from synapse.util.stringutils import random_string_with_symbols, strtobool
 
 
-class AccountValidityConfig(Config):
-    section = "accountvalidity"
-
-    def __init__(self, config, synapse_config):
-        if config is None:
-            return
-        super().__init__()
-        self.enabled = config.get("enabled", False)
-        self.renew_by_email_enabled = "renew_at" in config
-
-        if self.enabled:
-            if "period" in config:
-                self.period = self.parse_duration(config["period"])
-            else:
-                raise ConfigError("'period' is required when using account validity")
-
-            if "renew_at" in config:
-                self.renew_at = self.parse_duration(config["renew_at"])
-
-            if "renew_email_subject" in config:
-                self.renew_email_subject = config["renew_email_subject"]
-            else:
-                self.renew_email_subject = "Renew your %(app)s account"
-
-            self.startup_job_max_delta = self.period * 10.0 / 100.0
-
-        if self.renew_by_email_enabled:
-            if "public_baseurl" not in synapse_config:
-                raise ConfigError("Can't send renewal emails without 'public_baseurl'")
-
-        template_dir = config.get("template_dir")
-
-        if not template_dir:
-            template_dir = pkg_resources.resource_filename("synapse", "res/templates")
-
-        if "account_renewed_html_path" in config:
-            file_path = os.path.join(template_dir, config["account_renewed_html_path"])
-
-            self.account_renewed_html_content = self.read_file(
-                file_path, "account_validity.account_renewed_html_path"
-            )
-        else:
-            self.account_renewed_html_content = (
-                "<html><body>Your account has been successfully renewed.</body><html>"
-            )
-
-        if "invalid_token_html_path" in config:
-            file_path = os.path.join(template_dir, config["invalid_token_html_path"])
-
-            self.invalid_token_html_content = self.read_file(
-                file_path, "account_validity.invalid_token_html_path"
-            )
-        else:
-            self.invalid_token_html_content = (
-                "<html><body>Invalid renewal token.</body><html>"
-            )
-
-
 class RegistrationConfig(Config):
     section = "registration"
 
@@ -92,10 +30,6 @@ class RegistrationConfig(Config):
                 str(config["disable_registration"])
             )
 
-        self.account_validity = AccountValidityConfig(
-            config.get("account_validity") or {}, config
-        )
-
         self.registrations_require_3pid = config.get("registrations_require_3pid", [])
         self.allowed_local_3pids = config.get("allowed_local_3pids", [])
         self.enable_3pid_lookup = config.get("enable_3pid_lookup", True)
@@ -207,69 +141,6 @@ class RegistrationConfig(Config):
         #
         #enable_registration: false
 
-        # Optional account validity configuration. This allows for accounts to be denied
-        # any request after a given period.
-        #
-        # Once this feature is enabled, Synapse will look for registered users without an
-        # expiration date at startup and will add one to every account it found using the
-        # current settings at that time.
-        # This means that, if a validity period is set, and Synapse is restarted (it will
-        # then derive an expiration date from the current validity period), and some time
-        # after that the validity period changes and Synapse is restarted, the users'
-        # expiration dates won't be updated unless their account is manually renewed. This
-        # date will be randomly selected within a range [now + period - d ; now + period],
-        # where d is equal to 10%% of the validity period.
-        #
-        account_validity:
-          # The account validity feature is disabled by default. Uncomment the
-          # following line to enable it.
-          #
-          #enabled: true
-
-          # The period after which an account is valid after its registration. When
-          # renewing the account, its validity period will be extended by this amount
-          # of time. This parameter is required when using the account validity
-          # feature.
-          #
-          #period: 6w
-
-          # The amount of time before an account's expiry date at which Synapse will
-          # send an email to the account's email address with a renewal link. By
-          # default, no such emails are sent.
-          #
-          # If you enable this setting, you will also need to fill out the 'email' and
-          # 'public_baseurl' configuration sections.
-          #
-          #renew_at: 1w
-
-          # The subject of the email sent out with the renewal link. '%%(app)s' can be
-          # used as a placeholder for the 'app_name' parameter from the 'email'
-          # section.
-          #
-          # Note that the placeholder must be written '%%(app)s', including the
-          # trailing 's'.
-          #
-          # If this is not set, a default value is used.
-          #
-          #renew_email_subject: "Renew your %%(app)s account"
-
-          # Directory in which Synapse will try to find templates for the HTML files to
-          # serve to the user when trying to renew an account. If not set, default
-          # templates from within the Synapse package will be used.
-          #
-          #template_dir: "res/templates"
-
-          # File within 'template_dir' giving the HTML to be displayed to the user after
-          # they successfully renewed their account. If not set, default text is used.
-          #
-          #account_renewed_html_path: "account_renewed.html"
-
-          # File within 'template_dir' giving the HTML to be displayed when the user
-          # tries to renew an account with an invalid renewal token. If not set,
-          # default text is used.
-          #
-          #invalid_token_html_path: "invalid_token.html"
-
         # Time that a user's session remains valid for, after they log in.
         #
         # Note that this is not currently compatible with guest logins.
diff --git a/synapse/config/saml2_config.py b/synapse/config/saml2.py
index 55a7838b10..3d1218c8d1 100644
--- a/synapse/config/saml2_config.py
+++ b/synapse/config/saml2.py
@@ -25,7 +25,10 @@ from ._util import validate_config
 
 logger = logging.getLogger(__name__)
 
-DEFAULT_USER_MAPPING_PROVIDER = (
+DEFAULT_USER_MAPPING_PROVIDER = "synapse.handlers.saml.DefaultSamlMappingProvider"
+# The module that DefaultSamlMappingProvider is in was renamed, we want to
+# transparently handle both the same.
+LEGACY_USER_MAPPING_PROVIDER = (
     "synapse.handlers.saml_handler.DefaultSamlMappingProvider"
 )
 
@@ -97,6 +100,8 @@ class SAML2Config(Config):
 
         # Use the default user mapping provider if not set
         ump_dict.setdefault("module", DEFAULT_USER_MAPPING_PROVIDER)
+        if ump_dict.get("module") == LEGACY_USER_MAPPING_PROVIDER:
+            ump_dict["module"] = DEFAULT_USER_MAPPING_PROVIDER
 
         # Ensure a config is present
         ump_dict["config"] = ump_dict.get("config") or {}
diff --git a/synapse/config/server.py b/synapse/config/server.py
index 02b86b11a5..21ca7b33e3 100644
--- a/synapse/config/server.py
+++ b/synapse/config/server.py
@@ -235,7 +235,11 @@ class ServerConfig(Config):
         self.print_pidfile = config.get("print_pidfile")
         self.user_agent_suffix = config.get("user_agent_suffix")
         self.use_frozen_dicts = config.get("use_frozen_dicts", False)
+
         self.public_baseurl = config.get("public_baseurl")
+        if self.public_baseurl is not None:
+            if self.public_baseurl[-1] != "/":
+                self.public_baseurl += "/"
 
         # Whether to enable user presence.
         presence_config = config.get("presence") or {}
@@ -407,10 +411,6 @@ class ServerConfig(Config):
             config_path=("federation_ip_range_blacklist",),
         )
 
-        if self.public_baseurl is not None:
-            if self.public_baseurl[-1] != "/":
-                self.public_baseurl += "/"
-
         # (undocumented) option for torturing the worker-mode replication a bit,
         # for testing. The value defines the number of milliseconds to pause before
         # sending out any replication updates.
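
Note: the trailing-slash normalization of public_baseurl has simply moved earlier in read_config; the behavior itself is unchanged (example value):

    public_baseurl = "https://matrix.example.org"
    if public_baseurl[-1] != "/":
        public_baseurl += "/"
    assert public_baseurl == "https://matrix.example.org/"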
diff --git a/synapse/config/server_notices_config.py b/synapse/config/server_notices.py
index 48bf3241b6..48bf3241b6 100644
--- a/synapse/config/server_notices_config.py
+++ b/synapse/config/server_notices.py
diff --git a/synapse/config/workers.py b/synapse/config/workers.py
index b2540163d1..462630201d 100644
--- a/synapse/config/workers.py
+++ b/synapse/config/workers.py
@@ -64,6 +64,14 @@ class WriterLocations:
     Attributes:
         events: The instances that write to the event and backfill streams.
         typing: The instance that writes to the typing stream.
+        to_device: The instances that write to the to_device stream. Currently
+            can only be a single instance.
+        account_data: The instances that write to the account data streams. Currently
+            can only be a single instance.
+        receipts: The instances that write to the receipts stream. Currently
+            can only be a single instance.
+        presence: The instances that write to the presence stream. Currently
+            can only be a single instance.
     """
 
     events = attr.ib(
@@ -85,6 +93,11 @@ class WriterLocations:
         type=List[str],
         converter=_instance_to_list_converter,
     )
+    presence = attr.ib(
+        default=["master"],
+        type=List[str],
+        converter=_instance_to_list_converter,
+    )
 
 
 class WorkerConfig(Config):
@@ -188,7 +201,14 @@ class WorkerConfig(Config):
 
         # Check that the configured writers for events and typing also appears in
         # `instance_map`.
-        for stream in ("events", "typing", "to_device", "account_data", "receipts"):
+        for stream in (
+            "events",
+            "typing",
+            "to_device",
+            "account_data",
+            "receipts",
+            "presence",
+        ):
             instances = _instance_to_list_converter(getattr(self.writers, stream))
             for instance in instances:
                 if instance != "master" and instance not in self.instance_map:
@@ -215,6 +235,11 @@ class WorkerConfig(Config):
         if len(self.writers.events) == 0:
             raise ConfigError("Must specify at least one instance to handle `events`.")
 
+        if len(self.writers.presence) != 1:
+            raise ConfigError(
+                "Must only specify one instance to handle `presence` messages."
+            )
+
         self.events_shard_config = RoutableShardedWorkerHandlingConfig(
             self.writers.events
         )
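
Taken together, the workers.py hunks add `presence` to the set of sharded-writer streams and then pin it to exactly one instance. A standalone sketch of the two checks (the `ConfigError` here is a stand-in for `synapse.config.ConfigError`, and the instance-map contents are illustrative):

    class ConfigError(Exception):
        pass

    def check_presence_writers(writers, instance_map):
        # Every named writer must be "master" or appear in `instance_map`...
        for instance in writers:
            if instance != "master" and instance not in instance_map:
                raise ConfigError(
                    "Instance %r is configured to write presence but does not "
                    "appear in `instance_map` config." % (instance,)
                )
        # ...and, unlike `events`, presence currently supports a single writer.
        if len(writers) != 1:
            raise ConfigError(
                "Must only specify one instance to handle `presence` messages."
            )

    check_presence_writers(["presence1"], {"presence1": {"host": "localhost"}})
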
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index 5234e3f81e..70c556566e 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -14,14 +14,14 @@
 # limitations under the License.
 
 import logging
-from typing import List, Optional, Set, Tuple
+from typing import Any, Dict, List, Optional, Set, Tuple
 
 from canonicaljson import encode_canonical_json
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import SignatureVerifyException, verify_signed_json
 from unpaddedbase64 import decode_base64
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import MAX_PDU_SIZE, EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, EventSizeError, SynapseError
 from synapse.api.room_versions import (
     KNOWN_ROOM_VERSIONS,
@@ -205,7 +205,7 @@ def _check_size_limits(event: EventBase) -> None:
         too_big("type")
     if len(event.event_id) > 255:
         too_big("event_id")
-    if len(encode_canonical_json(event.get_pdu_json())) > 65536:
+    if len(encode_canonical_json(event.get_pdu_json())) > MAX_PDU_SIZE:
         too_big("event")
 
 
@@ -670,7 +670,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase
         public_key = public_key_object["public_key"]
         try:
             for server, signature_block in signed["signatures"].items():
-                for key_name, encoded_signature in signature_block.items():
+                for key_name in signature_block.keys():
                     if not key_name.startswith("ed25519:"):
                         continue
                     verify_key = decode_verify_key_bytes(
@@ -688,7 +688,7 @@ def _verify_third_party_invite(event: EventBase, auth_events: StateMap[EventBase
     return False
 
 
-def get_public_keys(invite_event):
+def get_public_keys(invite_event: EventBase) -> List[Dict[str, Any]]:
     public_keys = []
     if "public_key" in invite_event.content:
         o = {"public_key": invite_event.content["public_key"]}
diff --git a/synapse/events/spamcheck.py b/synapse/events/spamcheck.py
index c727b48c1e..7118d5f52d 100644
--- a/synapse/events/spamcheck.py
+++ b/synapse/events/spamcheck.py
@@ -15,12 +15,11 @@
 
 import inspect
 import logging
-from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
+from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Tuple, Union
 
 from synapse.rest.media.v1._base import FileInfo
 from synapse.rest.media.v1.media_storage import ReadableFileWrapper
 from synapse.spam_checker_api import RegistrationBehaviour
-from synapse.types import Collection
 from synapse.util.async_helpers import maybe_awaitable
 
 if TYPE_CHECKING:
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index f93335edaa..a5b6a61195 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -451,6 +451,28 @@ class FederationClient(FederationBase):
 
         return signed_auth
 
+    def _is_unknown_endpoint(
+        self, e: HttpResponseException, synapse_error: Optional[SynapseError] = None
+    ) -> bool:
+        """
+        Returns true if the response was due to an endpoint being unimplemented.
+
+        Args:
+            e: The error response received from the remote server.
+            synapse_error: The above error converted to a SynapseError. This is
+                automatically generated if not provided.
+
+        """
+        if synapse_error is None:
+            synapse_error = e.to_synapse_error()
+        # There is no good way to detect an "unknown" endpoint.
+        #
+        # Dendrite returns a 404 (with no body); Synapse returns a 400
+        # with M_UNRECOGNISED.
+        return e.code == 404 or (
+            e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
+        )
+
     async def _try_destination_list(
         self,
         description: str,
@@ -468,9 +490,9 @@ class FederationClient(FederationBase):
             callback:  Function to run for each server. Passed a single
                 argument: the server_name to try.
 
-                If the callback raises a CodeMessageException with a 300/400 code,
-                attempts to perform the operation stop immediately and the exception is
-                reraised.
+                If the callback raises a CodeMessageException with a 300/400 code or
+                an UnsupportedRoomVersionError, attempts to perform the operation
+                stop immediately and the exception is reraised.
 
                 Otherwise, if the callback raises an Exception the error is logged and the
                 next server tried. Normally the stacktrace is logged but this is
@@ -492,8 +514,7 @@ class FederationClient(FederationBase):
                 continue
 
             try:
-                res = await callback(destination)
-                return res
+                return await callback(destination)
             except InvalidResponseError as e:
                 logger.warning("Failed to %s via %s: %s", description, destination, e)
             except UnsupportedRoomVersionError:
@@ -502,17 +523,15 @@ class FederationClient(FederationBase):
                 synapse_error = e.to_synapse_error()
                 failover = False
 
+                # Failover on an internal server error, or if the destination
+                # doesn't implement the endpoint for some reason.
                 if 500 <= e.code < 600:
                     failover = True
 
-                elif failover_on_unknown_endpoint:
-                    # there is no good way to detect an "unknown" endpoint. Dendrite
-                    # returns a 404 (with no body); synapse returns a 400
-                    # with M_UNRECOGNISED.
-                    if e.code == 404 or (
-                        e.code == 400 and synapse_error.errcode == Codes.UNRECOGNIZED
-                    ):
-                        failover = True
+                elif failover_on_unknown_endpoint and self._is_unknown_endpoint(
+                    e, synapse_error
+                ):
+                    failover = True
 
                 if not failover:
                     raise synapse_error from e
@@ -570,9 +589,8 @@ class FederationClient(FederationBase):
             UnsupportedRoomVersionError: if remote responds with
                 a room version we don't understand.
 
-            SynapseError: if the chosen remote server returns a 300/400 code.
-
-            RuntimeError: if no servers were reachable.
+            SynapseError: if the chosen remote server returns a 300/400 code, or
+                no servers successfully handle the request.
         """
         valid_memberships = {Membership.JOIN, Membership.LEAVE}
         if membership not in valid_memberships:
@@ -642,9 +660,8 @@ class FederationClient(FederationBase):
             ``auth_chain``.
 
         Raises:
-            SynapseError: if the chosen remote server returns a 300/400 code.
-
-            RuntimeError: if no servers were reachable.
+            SynapseError: if the chosen remote server returns a 300/400 code, or
+                no servers successfully handle the request.
         """
 
         async def send_request(destination) -> Dict[str, Any]:
@@ -673,7 +690,7 @@ class FederationClient(FederationBase):
             if create_event is None:
                 # If the state doesn't have a create event then the room is
                 # invalid, and it would fail auth checks anyway.
-                raise SynapseError(400, "No create event in state")
+                raise InvalidResponseError("No create event in state")
 
             # the room version should be sane.
             create_room_version = create_event.content.get(
@@ -746,16 +763,11 @@ class FederationClient(FederationBase):
                 content=pdu.get_pdu_json(time_now),
             )
         except HttpResponseException as e:
-            if e.code in [400, 404]:
-                err = e.to_synapse_error()
-
-                # If we receive an error response that isn't a generic error, or an
-                # unrecognised endpoint error, we  assume that the remote understands
-                # the v2 invite API and this is a legitimate error.
-                if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
-                    raise err
-            else:
-                raise e.to_synapse_error()
+            # If an error is received that is due to an unrecognised endpoint,
+            # fall back to the v1 endpoint. Otherwise consider it a legitimate
+            # error and raise.
+            if not self._is_unknown_endpoint(e):
+                raise
 
         logger.debug("Couldn't send_join with the v2 API, falling back to the v1 API")
 
@@ -802,6 +814,11 @@ class FederationClient(FederationBase):
 
         Returns:
             The event as a dict as returned by the remote server
+
+        Raises:
+            SynapseError: if the remote server returns an error or if the server
+                only supports the v1 endpoint and a room version other than "1"
+                or "2" is requested.
         """
         time_now = self._clock.time_msec()
 
@@ -817,28 +834,19 @@ class FederationClient(FederationBase):
                 },
             )
         except HttpResponseException as e:
-            if e.code in [400, 404]:
-                err = e.to_synapse_error()
-
-                # If we receive an error response that isn't a generic error, we
-                # assume that the remote understands the v2 invite API and this
-                # is a legitimate error.
-                if err.errcode != Codes.UNKNOWN:
-                    raise err
-
-                # Otherwise, we assume that the remote server doesn't understand
-                # the v2 invite API. That's ok provided the room uses old-style event
-                # IDs.
+            # If an error is received that is due to an unrecognised endpoint,
+            # fall back to the v1 endpoint if the room uses old-style event IDs.
+            # Otherwise consider it a legitimate error and raise.
+            err = e.to_synapse_error()
+            if self._is_unknown_endpoint(e, err):
                 if room_version.event_format != EventFormatVersions.V1:
                     raise SynapseError(
                         400,
                         "User's homeserver does not support this room version",
                         Codes.UNSUPPORTED_ROOM_VERSION,
                     )
-            elif e.code in (403, 429):
-                raise e.to_synapse_error()
             else:
-                raise
+                raise err
 
         # Didn't work, try v1 API.
         # Note the v1 API returns a tuple of `(200, content)`
@@ -865,9 +873,8 @@ class FederationClient(FederationBase):
             pdu: event to be sent
 
         Raises:
-            SynapseError if the chosen remote server returns a 300/400 code.
-
-            RuntimeError if no servers were reachable.
+            SynapseError: if the chosen remote server returns a 300/400 code, or
+                no servers successfully handle the request.
         """
 
         async def send_request(destination: str) -> None:
@@ -889,16 +896,11 @@ class FederationClient(FederationBase):
                 content=pdu.get_pdu_json(time_now),
             )
         except HttpResponseException as e:
-            if e.code in [400, 404]:
-                err = e.to_synapse_error()
-
-                # If we receive an error response that isn't a generic error, or an
-                # unrecognised endpoint error, we  assume that the remote understands
-                # the v2 invite API and this is a legitimate error.
-                if err.errcode not in [Codes.UNKNOWN, Codes.UNRECOGNIZED]:
-                    raise err
-            else:
-                raise e.to_synapse_error()
+            # If an error is received that is due to an unrecognised endpoint,
+            # fall back to the v1 endpoint. Otherwise consider it a legitimate
+            # error and raise.
+            if not self._is_unknown_endpoint(e):
+                raise
 
         logger.debug("Couldn't send_leave with the v2 API, falling back to the v1 API")
 
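The three near-identical blocks of 400/404 sniffing in `send_join`/`send_invite`/`send_leave` collapse into the shared `_is_unknown_endpoint` helper, and the v2-to-v1 fallback now reads the same everywhere. A self-contained sketch of the pattern (the `do_v2_request`/`do_v1_request` names are illustrative, not Synapse APIs):

    class HttpResponseException(Exception):
        def __init__(self, code, errcode=None):
            self.code = code
            self.errcode = errcode

    def is_unknown_endpoint(e):
        # Dendrite: 404 with no body; Synapse: 400 with M_UNRECOGNIZED.
        return e.code == 404 or (e.code == 400 and e.errcode == "M_UNRECOGNIZED")

    def send_with_fallback(do_v2_request, do_v1_request):
        try:
            return do_v2_request()
        except HttpResponseException as e:
            # Only fall back when the remote clearly lacks the v2 endpoint;
            # any other error is legitimate and is re-raised.
            if not is_unknown_endpoint(e):
                raise
        return do_v1_request()

    def v2():
        raise HttpResponseException(400, "M_UNRECOGNIZED")

    assert send_with_fallback(v2, lambda: "v1 result") == "v1 result"
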
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index d71f04e43e..65d76ea974 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -501,10 +501,10 @@ def process_rows_for_federation(
             states=[state], destinations=destinations
         )
 
-    for destination, edu_map in buff.keyed_edus.items():
+    for edu_map in buff.keyed_edus.values():
         for key, edu in edu_map.items():
             transaction_queue.send_edu(edu, key)
 
-    for destination, edu_list in buff.edus.items():
+    for edu_list in buff.edus.values():
         for edu in edu_list:
             transaction_queue.send_edu(edu, None)
diff --git a/synapse/federation/sender/__init__.py b/synapse/federation/sender/__init__.py
index 6266accaf5..022bbf7dad 100644
--- a/synapse/federation/sender/__init__.py
+++ b/synapse/federation/sender/__init__.py
@@ -14,7 +14,17 @@
 
 import abc
 import logging
-from typing import TYPE_CHECKING, Dict, Hashable, Iterable, List, Optional, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    Hashable,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+)
 
 from prometheus_client import Counter
 
@@ -31,7 +41,7 @@ from synapse.metrics import (
     events_processed_counter,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import Collection, JsonDict, ReadReceipt, RoomStreamToken
+from synapse.types import JsonDict, ReadReceipt, RoomStreamToken
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -539,6 +549,10 @@ class FederationSender(AbstractFederationSender):
             # No-op if presence is disabled.
             return
 
+        # Ensure we only send out presence states for local users.
+        for state in states:
+            assert self.is_mine_id(state.user_id)
+
         for destination in destinations:
             if destination == self.server_name:
                 continue
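
The new assertion makes the federation sender's invariant explicit: presence may only be broadcast for users that belong to this homeserver. A runnable illustration of that check, using the user-ID grammar directly rather than Synapse's `is_mine_id`:

    from collections import namedtuple

    UserPresenceState = namedtuple("UserPresenceState", ["user_id"])

    def assert_only_local_presence(states, server_name):
        # A Matrix user ID has the form "@localpart:domain"; a state is
        # local if the domain matches this homeserver's name.
        for state in states:
            domain = state.user_id.split(":", 1)[1]
            assert domain == server_name, "remote presence: %s" % state.user_id

    assert_only_local_presence([UserPresenceState("@alice:example.com")], "example.com")
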
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 66ce7e8b83..5b927f10b3 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -17,7 +17,7 @@ import email.utils
 import logging
 from email.mime.multipart import MIMEMultipart
 from email.mime.text import MIMEText
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, List, Optional, Tuple
 
 from synapse.api.errors import StoreError, SynapseError
 from synapse.logging.context import make_deferred_yieldable
@@ -39,28 +39,44 @@ class AccountValidityHandler:
         self.sendmail = self.hs.get_sendmail()
         self.clock = self.hs.get_clock()
 
-        self._account_validity = self.hs.config.account_validity
+        self._account_validity_enabled = (
+            hs.config.account_validity.account_validity_enabled
+        )
+        self._account_validity_renew_by_email_enabled = (
+            hs.config.account_validity.account_validity_renew_by_email_enabled
+        )
+
+        self._account_validity_period = None
+        if self._account_validity_enabled:
+            self._account_validity_period = (
+                hs.config.account_validity.account_validity_period
+            )
 
         if (
-            self._account_validity.enabled
-            and self._account_validity.renew_by_email_enabled
+            self._account_validity_enabled
+            and self._account_validity_renew_by_email_enabled
         ):
             # Don't do email-specific configuration if renewal by email is disabled.
-            self._template_html = self.config.account_validity_template_html
-            self._template_text = self.config.account_validity_template_text
+            self._template_html = (
+                hs.config.account_validity.account_validity_template_html
+            )
+            self._template_text = (
+                hs.config.account_validity.account_validity_template_text
+            )
+            account_validity_renew_email_subject = (
+                hs.config.account_validity.account_validity_renew_email_subject
+            )
 
             try:
-                app_name = self.hs.config.email_app_name
+                app_name = hs.config.email_app_name
 
-                self._subject = self._account_validity.renew_email_subject % {
-                    "app": app_name
-                }
+                self._subject = account_validity_renew_email_subject % {"app": app_name}
 
-                self._from_string = self.hs.config.email_notif_from % {"app": app_name}
+                self._from_string = hs.config.email_notif_from % {"app": app_name}
             except Exception:
                 # If substitution failed, fall back to the bare strings.
-                self._subject = self._account_validity.renew_email_subject
-                self._from_string = self.hs.config.email_notif_from
+                self._subject = account_validity_renew_email_subject
+                self._from_string = hs.config.email_notif_from
 
             self._raw_from = email.utils.parseaddr(self._from_string)[1]
 
@@ -220,50 +236,87 @@ class AccountValidityHandler:
                 attempts += 1
         raise StoreError(500, "Couldn't generate a unique string as refresh string.")
 
-    async def renew_account(self, renewal_token: str) -> bool:
+    async def renew_account(self, renewal_token: str) -> Tuple[bool, bool, int]:
         """Renews the account attached to a given renewal token by pushing back the
         expiration date by the current validity period in the server's configuration.
 
+        If it turns out that the token is valid but has already been used, then the
+        token is considered stale. A token is stale if the 'token_used_ts_ms' db column
+        is non-null.
+
         Args:
             renewal_token: Token sent with the renewal request.
         Returns:
-            Whether the provided token is valid.
+            A tuple containing:
+              * A bool representing whether the token is valid and unused.
+              * A bool which is `True` if the token is valid, but stale.
+              * An int representing the user's expiry timestamp as milliseconds since the
+                epoch, or 0 if the token was invalid.
         """
         try:
-            user_id = await self.store.get_user_from_renewal_token(renewal_token)
+            (
+                user_id,
+                current_expiration_ts,
+                token_used_ts,
+            ) = await self.store.get_user_from_renewal_token(renewal_token)
         except StoreError:
-            return False
+            return False, False, 0
+
+        # Check whether this token has already been used.
+        if token_used_ts:
+            logger.info(
+                "User '%s' attempted to use previously used token '%s' to renew account",
+                user_id,
+                renewal_token,
+            )
+            return False, True, current_expiration_ts
 
         logger.debug("Renewing an account for user %s", user_id)
-        await self.renew_account_for_user(user_id)
 
-        return True
+        # Renew the account. Pass the renewal_token here so that it is not cleared.
+        # We want to keep the token around in case the user attempts to renew their
+        # account with the same token twice (clicking the email link twice).
+        #
+        # In that case, the token will be accepted, but the account's expiration ts
+        # will remain unchanged.
+        new_expiration_ts = await self.renew_account_for_user(
+            user_id, renewal_token=renewal_token
+        )
+
+        return True, False, new_expiration_ts
 
     async def renew_account_for_user(
         self,
         user_id: str,
         expiration_ts: Optional[int] = None,
         email_sent: bool = False,
+        renewal_token: Optional[str] = None,
     ) -> int:
         """Renews the account attached to a given user by pushing back the
         expiration date by the current validity period in the server's
         configuration.
 
         Args:
-            renewal_token: Token sent with the renewal request.
+            user_id: The ID of the user to renew.
             expiration_ts: New expiration date. Defaults to now + validity period.
-            email_sen: Whether an email has been sent for this validity period.
-                Defaults to False.
+            email_sent: Whether an email has been sent for this validity period.
+            renewal_token: Token sent with the renewal request. The user's token
+                will be cleared if this is None.
 
         Returns:
             New expiration date for this account, as a timestamp in
             milliseconds since epoch.
         """
+        now = self.clock.time_msec()
         if expiration_ts is None:
-            expiration_ts = self.clock.time_msec() + self._account_validity.period
+            expiration_ts = now + self._account_validity_period
 
         await self.store.set_account_validity_for_user(
-            user_id=user_id, expiration_ts=expiration_ts, email_sent=email_sent
+            user_id=user_id,
+            expiration_ts=expiration_ts,
+            email_sent=email_sent,
+            renewal_token=renewal_token,
+            token_used_ts=now,
         )
 
         return expiration_ts
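
`renew_account` now reports three distinct outcomes instead of one boolean: token valid and unused, token valid but stale (already used), and token invalid. A hedged sketch of caller-side handling of the new tuple; the `FakeHandler` below stands in for `AccountValidityHandler`:

    import asyncio

    class FakeHandler:
        async def renew_account(self, token):
            return (False, True, 1619222400000)  # a valid-but-stale example

    async def handle_renewal(handler, renewal_token):
        token_valid, token_stale, expiration_ts = await handler.renew_account(
            renewal_token
        )
        if token_valid:
            return "account renewed until %d" % expiration_ts
        if token_stale:
            # The email link was clicked twice: accept it, but the expiry
            # timestamp is left unchanged.
            return "token already used; account valid until %d" % expiration_ts
        return "invalid renewal token"

    print(asyncio.run(handle_renewal(FakeHandler(), "abc123")))
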
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index d7bc4e23ed..177310f0be 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Dict, List, Optional, Union
+from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Union
 
 from prometheus_client import Counter
 
@@ -33,7 +33,7 @@ from synapse.metrics.background_process_metrics import (
     wrap_as_background_process,
 )
 from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import Collection, JsonDict, RoomAlias, RoomStreamToken, UserID
+from synapse.types import JsonDict, RoomAlias, RoomStreamToken, UserID
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index b8a37b6477..36f2450e2e 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -1248,7 +1248,7 @@ class AuthHandler(BaseHandler):
 
         # see if any of our auth providers want to know about this
         for provider in self.password_providers:
-            for token, token_id, device_id in tokens_and_devices:
+            for token, _, device_id in tokens_and_devices:
                 await provider.on_logged_out(
                     user_id=user_id, device_id=device_id, access_token=token
                 )
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas.py
index 7346ccfe93..7346ccfe93 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas.py
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 3f6f9f7f3d..45d2404dde 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -49,7 +49,9 @@ class DeactivateAccountHandler(BaseHandler):
         if hs.config.run_background_tasks:
             hs.get_reactor().callWhenRunning(self._start_user_parting)
 
-        self._account_validity_enabled = hs.config.account_validity.enabled
+        self._account_validity_enabled = (
+            hs.config.account_validity.account_validity_enabled
+        )
 
     async def deactivate_account(
         self,
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index d75edb184b..95bdc5902a 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Collection, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse.api import errors
 from synapse.api.constants import EventTypes
@@ -28,7 +28,6 @@ from synapse.api.errors import (
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import (
-    Collection,
     JsonDict,
     StreamToken,
     UserID,
@@ -156,8 +155,7 @@ class DeviceWorkerHandler(BaseHandler):
             # The user may have left the room
             # TODO: Check if they actually did or if we were just invited.
             if room_id not in room_ids:
-                for key, event_id in current_state_ids.items():
-                    etype, state_key = key
+                for etype, state_key in current_state_ids.keys():
                     if etype != EventTypes.Member:
                         continue
                     possibly_left.add(state_key)
@@ -179,8 +177,7 @@ class DeviceWorkerHandler(BaseHandler):
                 log_kv(
                     {"event": "encountered empty previous state", "room_id": room_id}
                 )
-                for key, event_id in current_state_ids.items():
-                    etype, state_key = key
+                for etype, state_key in current_state_ids.keys():
                     if etype != EventTypes.Member:
                         continue
                     possibly_changed.add(state_key)
@@ -198,8 +195,7 @@ class DeviceWorkerHandler(BaseHandler):
             for state_dict in prev_state_ids.values():
                 member_event = state_dict.get((EventTypes.Member, user_id), None)
                 if not member_event or member_event != current_member_id:
-                    for key, event_id in current_state_ids.items():
-                        etype, state_key = key
+                    for etype, state_key in current_state_ids.keys():
                         if etype != EventTypes.Member:
                             continue
                         possibly_changed.add(state_key)
@@ -714,7 +710,7 @@ class DeviceListUpdater:
                 # This can happen since we batch updates
                 return
 
-            for device_id, stream_id, prev_ids, content in pending_updates:
+            for device_id, stream_id, prev_ids, _ in pending_updates:
                 logger.debug(
                     "Handling update %r/%r, ID: %r, prev: %r ",
                     user_id,
@@ -740,7 +736,7 @@ class DeviceListUpdater:
             else:
                 # Simply update the single device, since we know that is the only
                 # change (because of the single prev_id matching the current cache)
-                for device_id, stream_id, prev_ids, content in pending_updates:
+                for device_id, stream_id, _, content in pending_updates:
                     await self.store.update_remote_device_list_cache_entry(
                         user_id, device_id, content, stream_id
                     )
@@ -929,6 +925,10 @@ class DeviceListUpdater:
         else:
             cached_devices = await self.store.get_cached_devices_for_user(user_id)
             if cached_devices == {d["device_id"]: d for d in devices}:
+                logger.info(
+                    "Skipping device list resync for %s, as our cache matches already",
+                    user_id,
+                )
                 devices = []
                 ignore_devices = True
 
@@ -944,6 +944,9 @@ class DeviceListUpdater:
             await self.store.update_remote_device_list_cache(
                 user_id, devices, stream_id
             )
+        # mark the cache as valid, whether or not we actually processed any device
+        # list updates.
+        await self.store.mark_remote_user_device_cache_as_valid(user_id)
         device_ids = [device["device_id"] for device in devices]
 
         # Handle cross-signing keys.
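
The resync path now marks the remote user's device cache as valid even when the remote list matches what we already have, so the comparison short-circuit no longer leaves the cache flagged for endless re-resyncing. A minimal sketch of that decision:

    def plan_resync(cached_devices, remote_devices):
        # Compare the remote list against the cache keyed by device_id; if
        # they match there is nothing to rewrite, but the caller still marks
        # the cache as known-good.
        if cached_devices == {d["device_id"]: d for d in remote_devices}:
            return []
        return remote_devices

    assert plan_resync({"DEV1": {"device_id": "DEV1"}}, [{"device_id": "DEV1"}]) == []
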
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
new file mode 100644
index 0000000000..eff639f407
--- /dev/null
+++ b/synapse/handlers/event_auth.py
@@ -0,0 +1,86 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+from typing import TYPE_CHECKING
+
+from synapse.api.constants import EventTypes, JoinRules
+from synapse.api.room_versions import RoomVersion
+from synapse.types import StateMap
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+class EventAuthHandler:
+    """
+    This class contains methods for authenticating events added to room graphs.
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        self._store = hs.get_datastore()
+
+    async def can_join_without_invite(
+        self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str
+    ) -> bool:
+        """
+        Check whether a user can join a room without an invite.
+
+    When joining a room with restricted join rules (as defined in MSC3083),
+        the membership of spaces must be checked during join.
+
+        Args:
+            state_ids: The state of the room as it currently is.
+            room_version: The room version of the room being joined.
+            user_id: The user joining the room.
+
+        Returns:
+            True if the user can join the room, false otherwise.
+        """
+        # This only applies to room versions which support the new join rule.
+        if not room_version.msc3083_join_rules:
+            return True
+
+        # If there's no join rule, then it defaults to invite (so this doesn't apply).
+        join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
+        if not join_rules_event_id:
+            return True
+
+        # If the join rule is not restricted, this doesn't apply.
+        join_rules_event = await self._store.get_event(join_rules_event_id)
+        if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED:
+            return True
+
+        # If the "allow" key is of the wrong form, only allow invited users.
+        allowed_spaces = join_rules_event.content.get("allow", [])
+        if not isinstance(allowed_spaces, list):
+            return False
+
+        # Get the list of joined rooms and see if there's an overlap.
+        joined_rooms = await self._store.get_rooms_for_user(user_id)
+
+        # Pull out the space IDs; invalid data gets filtered out.
+        for space in allowed_spaces:
+            if not isinstance(space, dict):
+                continue
+
+            space_id = space.get("space")
+            if not isinstance(space_id, str):
+                continue
+
+            # The user was joined to one of the spaces specified, so they can join
+            # this room!
+            if space_id in joined_rooms:
+                return True
+
+        # The user was not in any of the required spaces.
+        return False
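
For reference, `can_join_without_invite` expects a join-rules event whose content looks roughly like the following under MSC3083 (the space IDs are made up; each `allow` entry must be a dict with a string `space` key, and anything else is skipped):

    join_rules_content = {
        "join_rule": "restricted",
        "allow": [
            {"space": "!mods:example.com"},
            {"space": "!users:example.com"},
        ],
    }
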
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 4b3730aa3b..9d867aaf4d 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -146,6 +146,7 @@ class FederationHandler(BaseHandler):
         self.is_mine_id = hs.is_mine_id
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
+        self._event_auth_handler = hs.get_event_auth_handler()
         self._message_handler = hs.get_message_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
@@ -1673,8 +1674,40 @@ class FederationHandler(BaseHandler):
         # would introduce the danger of backwards-compatibility problems.
         event.internal_metadata.send_on_behalf_of = origin
 
+        # Calculate the event context.
         context = await self.state_handler.compute_event_context(event)
-        context = await self._auth_and_persist_event(origin, event, context)
+
+        # Get the state before the new event.
+        prev_state_ids = await context.get_prev_state_ids()
+
+        # Check if the user is already in the room or invited to the room.
+        user_id = event.state_key
+        prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+        newly_joined = True
+        user_is_invited = False
+        if prev_member_event_id:
+            prev_member_event = await self.store.get_event(prev_member_event_id)
+            newly_joined = prev_member_event.membership != Membership.JOIN
+            user_is_invited = prev_member_event.membership == Membership.INVITE
+
+        # If the member is not already in the room, and not invited, check if
+        # they should be allowed access via membership in a space.
+        if (
+            newly_joined
+            and not user_is_invited
+            and not await self._event_auth_handler.can_join_without_invite(
+                prev_state_ids,
+                event.room_version,
+                user_id,
+            )
+        ):
+            raise AuthError(
+                403,
+                "You do not belong to any of the required spaces to join this room.",
+            )
+
+        # Persist the event.
+        await self._auth_and_persist_event(origin, event, context)
 
         logger.debug(
             "on_send_join_request: After _auth_and_persist_event: %s, sigs: %s",
@@ -1682,8 +1715,6 @@ class FederationHandler(BaseHandler):
             event.signatures,
         )
 
-        prev_state_ids = await context.get_prev_state_ids()
-
         state_ids = list(prev_state_ids.values())
         auth_chain = await self.store.get_auth_chain(event.room_id, state_ids)
 
@@ -2006,7 +2037,7 @@ class FederationHandler(BaseHandler):
         state: Optional[Iterable[EventBase]] = None,
         auth_events: Optional[MutableStateMap[EventBase]] = None,
         backfilled: bool = False,
-    ) -> EventContext:
+    ) -> None:
         """
         Process an event by performing auth checks and then persisting to the database.
 
@@ -2028,9 +2059,6 @@ class FederationHandler(BaseHandler):
                 event is an outlier), may be the auth events claimed by the remote
                 server.
             backfilled: True if the event was backfilled.
-
-        Returns:
-             The event context.
         """
         context = await self._check_event_auth(
             origin,
@@ -2060,8 +2088,6 @@ class FederationHandler(BaseHandler):
             )
             raise
 
-        return context
-
     async def _auth_and_persist_events(
         self,
         origin: str,
@@ -2956,7 +2982,7 @@ class FederationHandler(BaseHandler):
             try:
                 # for each sig on the third_party_invite block of the actual invite
                 for server, signature_block in signed["signatures"].items():
-                    for key_name, encoded_signature in signature_block.items():
+                    for key_name in signature_block.keys():
                         if not key_name.startswith("ed25519:"):
                             continue
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 87a8b89237..0b3b1fadb5 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -15,7 +15,6 @@
 # limitations under the License.
 
 """Utilities for interacting with Identity Servers"""
-
 import logging
 import urllib.parse
 from typing import Awaitable, Callable, Dict, List, Optional, Tuple
@@ -34,7 +33,11 @@ from synapse.http.site import SynapseRequest
 from synapse.types import JsonDict, Requester
 from synapse.util import json_decoder
 from synapse.util.hash import sha256_and_url_safe_base64
-from synapse.util.stringutils import assert_valid_client_secret, random_string
+from synapse.util.stringutils import (
+    assert_valid_client_secret,
+    random_string,
+    valid_id_server_location,
+)
 
 from ._base import BaseHandler
 
@@ -172,6 +175,11 @@ class IdentityHandler(BaseHandler):
                 server with, if necessary. Required if use_v2 is true
             use_v2: Whether to use v2 Identity Service API endpoints. Defaults to True
 
+        Raises:
+            SynapseError: On any of the following conditions:
+                - the supplied id_server is not a valid identity server name
+                - we failed to contact the supplied identity server
+
         Returns:
             The response from the identity server
         """
@@ -181,6 +189,12 @@ class IdentityHandler(BaseHandler):
         if id_access_token is None:
             use_v2 = False
 
+        if not valid_id_server_location(id_server):
+            raise SynapseError(
+                400,
+                "id_server must be a valid hostname with optional port and path components",
+            )
+
         # Decide which API endpoint URLs to use
         headers = {}
         bind_data = {"sid": sid, "client_secret": client_secret, "mxid": mxid}
@@ -269,12 +283,21 @@ class IdentityHandler(BaseHandler):
             id_server: Identity server to unbind from
 
         Raises:
-            SynapseError: If we failed to contact the identity server
+            SynapseError: On any of the following conditions:
+                - the supplied id_server is not a valid identity server name
+                - we failed to contact the supplied identity server
 
         Returns:
             True on success, otherwise False if the identity
             server doesn't support unbinding
         """
+
+        if not valid_id_server_location(id_server):
+            raise SynapseError(
+                400,
+                "id_server must be a valid hostname with optional port and path components",
+            )
+
         url = "https://%s/_matrix/identity/api/v1/3pid/unbind" % (id_server,)
         url_bytes = "/_matrix/identity/api/v1/3pid/unbind".encode("ascii")
 
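Both bind and unbind now validate `id_server` before interpolating it into a URL. `valid_id_server_location` lives in `synapse.util.stringutils` (added in this change); the sketch below shows the kind of check it needs to make, and is not the actual implementation:

    from urllib.parse import urlparse

    def valid_id_server_location_sketch(id_server: str) -> bool:
        # id_server is interpolated as "https://%s/_matrix/...", so it must
        # parse as bare host[:port][/path] with no query, fragment or
        # userinfo that could smuggle in a different destination.
        parsed = urlparse("https://" + id_server)
        return bool(
            parsed.hostname
            and not parsed.query
            and not parsed.fragment
            and parsed.username is None
        )

    assert valid_id_server_location_sketch("matrix.org")
    assert valid_id_server_location_sketch("id.example.com:8443/v1")
    assert not valid_id_server_location_sketch("evil.com/?x=#frag")
    assert not valid_id_server_location_sketch("real.org@evil.com")
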
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc.py
index b156196a70..ee6e41c0e4 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc.py
@@ -15,7 +15,7 @@
 import inspect
 import logging
 from typing import TYPE_CHECKING, Dict, Generic, List, Optional, TypeVar, Union
-from urllib.parse import urlencode
+from urllib.parse import urlencode, urlparse
 
 import attr
 import pymacaroons
@@ -37,10 +37,7 @@ from twisted.web.client import readBody
 from twisted.web.http_headers import Headers
 
 from synapse.config import ConfigError
-from synapse.config.oidc_config import (
-    OidcProviderClientSecretJwtKey,
-    OidcProviderConfig,
-)
+from synapse.config.oidc import OidcProviderClientSecretJwtKey, OidcProviderConfig
 from synapse.handlers.sso import MappingException, UserAttributes
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import make_deferred_yieldable
@@ -71,8 +68,8 @@ logger = logging.getLogger(__name__)
 #
 # Here we have the names of the cookies, and the options we use to set them.
 _SESSION_COOKIES = [
-    (b"oidc_session", b"Path=/_synapse/client/oidc; HttpOnly; Secure; SameSite=None"),
-    (b"oidc_session_no_samesite", b"Path=/_synapse/client/oidc; HttpOnly"),
+    (b"oidc_session", b"HttpOnly; Secure; SameSite=None"),
+    (b"oidc_session_no_samesite", b"HttpOnly"),
 ]
 
 #: A token exchanged from the token endpoint, as per RFC6749 sec 5.1. and
@@ -282,6 +279,13 @@ class OidcProvider:
         self._config = provider
         self._callback_url = hs.config.oidc_callback_url  # type: str
 
+        # Calculate the prefix for OIDC callback paths based on the public_baseurl.
+        # We'll insert this into the Path= parameter of any session cookies we set.
+        public_baseurl_path = urlparse(hs.config.server.public_baseurl).path
+        self._callback_path_prefix = (
+            public_baseurl_path.encode("utf-8") + b"_synapse/client/oidc"
+        )
+
         self._oidc_attribute_requirements = provider.attribute_requirements
         self._scopes = provider.scopes
         self._user_profile_method = provider.user_profile_method
@@ -782,8 +786,13 @@ class OidcProvider:
 
         for cookie_name, options in _SESSION_COOKIES:
             request.cookies.append(
-                b"%s=%s; Max-Age=3600; %s"
-                % (cookie_name, cookie.encode("utf-8"), options)
+                b"%s=%s; Max-Age=3600; Path=%s; %s"
+                % (
+                    cookie_name,
+                    cookie.encode("utf-8"),
+                    self._callback_path_prefix,
+                    options,
+                )
             )
 
         metadata = await self.load_metadata()
@@ -960,6 +969,11 @@ class OidcProvider:
                 # and attempt to match it.
                 attributes = await oidc_response_to_user_attributes(failures=0)
 
+                if attributes.localpart is None:
+                    # If no localpart is returned then we will generate one, so
+                    # there is no need to search for existing users.
+                    return None
+
                 user_id = UserID(attributes.localpart, self._server_name).to_string()
                 users = await self._store.get_users_by_id_case_insensitive(user_id)
                 if users:
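
With `public_baseurl` guaranteed (by the server.py change above) to end in a slash, the OIDC session cookies can scope their `Path=` attribute to the correct prefix even when Synapse is served under a sub-path. A quick illustration of the prefix computation:

    from urllib.parse import urlparse

    def oidc_cookie_path(public_baseurl: str) -> bytes:
        # public_baseurl is normalized to end in "/", so plain concatenation
        # yields a well-formed cookie path.
        return urlparse(public_baseurl).path.encode("utf-8") + b"_synapse/client/oidc"

    assert oidc_cookie_path("https://example.com/") == b"/_synapse/client/oidc"
    assert oidc_cookie_path("https://example.com/matrix/") == (
        b"/matrix/_synapse/client/oidc"
    )
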
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 6460eb9952..969c73c1e7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -24,9 +24,11 @@ The methods that define policy are:
 import abc
 import contextlib
 import logging
+from bisect import bisect
 from contextlib import contextmanager
 from typing import (
     TYPE_CHECKING,
+    Collection,
     Dict,
     FrozenSet,
     Iterable,
@@ -53,10 +55,11 @@ from synapse.replication.http.presence import (
     ReplicationBumpPresenceActiveTime,
     ReplicationPresenceSetState,
 )
+from synapse.replication.http.streams import ReplicationGetStreamUpdates
 from synapse.replication.tcp.commands import ClearUserSyncsCommand
-from synapse.state import StateHandler
+from synapse.replication.tcp.streams import PresenceFederationStream, PresenceStream
 from synapse.storage.databases.main import DataStore
-from synapse.types import Collection, JsonDict, UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.descriptors import _CacheContext, cached
 from synapse.util.metrics import Measure
@@ -118,19 +121,21 @@ assert LAST_ACTIVE_GRANULARITY < IDLE_TIMER
 
 
 class BasePresenceHandler(abc.ABC):
-    """Parts of the PresenceHandler that are shared between workers and master"""
+    """Parts of the PresenceHandler that are shared between workers and presence
+    writer"""
 
     def __init__(self, hs: "HomeServer"):
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
         self.presence_router = hs.get_presence_router()
         self.state = hs.get_state_handler()
+        self.is_mine_id = hs.is_mine_id
 
         self._federation = None
-        if hs.should_send_federation() or not hs.config.worker_app:
+        if hs.should_send_federation():
             self._federation = hs.get_federation_sender()
 
-        self._send_federation = hs.should_send_federation()
+        self._federation_queue = PresenceFederationQueue(hs, self)
 
         self._busy_presence_enabled = hs.config.experimental.msc3026_enabled
 
@@ -253,28 +258,38 @@ class BasePresenceHandler(abc.ABC):
         """
         pass
 
-    async def process_replication_rows(self, token, rows):
-        """Process presence stream rows received over replication."""
-        pass
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
+        """Process streams received over replication."""
+        await self._federation_queue.process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
+
+    def get_federation_queue(self) -> "PresenceFederationQueue":
+        """Get the presence federation queue."""
+        return self._federation_queue
 
     async def maybe_send_presence_to_interested_destinations(
         self, states: List[UserPresenceState]
     ):
         """If this instance is a federation sender, send the states to all
-        destinations that are interested.
+        destinations that are interested. Filters out any states for remote
+        users.
         """
 
-        if not self._send_federation:
+        if not self._federation:
             return
 
-        # If this worker sends federation we must have a FederationSender.
-        assert self._federation
+        states = [s for s in states if self.is_mine_id(s.user_id)]
+
+        if not states:
+            return
 
         hosts_and_states = await get_interested_remotes(
             self.store,
             self.presence_router,
             states,
-            self.state,
         )
 
         for destinations, states in hosts_and_states:
@@ -292,10 +307,17 @@ class WorkerPresenceHandler(BasePresenceHandler):
     def __init__(self, hs):
         super().__init__(hs)
         self.hs = hs
-        self.is_mine_id = hs.is_mine_id
+
+        self._presence_writer_instance = hs.config.worker.writers.presence[0]
 
         self._presence_enabled = hs.config.use_presence
 
+        # Route presence EDUs to the right worker
+        hs.get_federation_registry().register_instances_for_edu(
+            "m.presence",
+            hs.config.worker.writers.presence,
+        )
+
         # The number of ongoing syncs on this process, by user id.
         # Empty if _presence_enabled is false.
         self._user_to_num_current_syncs = {}  # type: Dict[str, int]
@@ -303,8 +325,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self.notifier = hs.get_notifier()
         self.instance_id = hs.get_instance_id()
 
-        # user_id -> last_sync_ms. Lists the users that have stopped syncing
-        # but we haven't notified the master of that yet
+        # user_id -> last_sync_ms. Lists the users that have stopped syncing but
+        # we haven't notified the presence writer of that yet
         self.users_going_offline = {}
 
         self._bump_active_client = ReplicationBumpPresenceActiveTime.make_client(hs)
@@ -337,22 +359,23 @@ class WorkerPresenceHandler(BasePresenceHandler):
             )
 
     def mark_as_coming_online(self, user_id):
-        """A user has started syncing. Send a UserSync to the master, unless they
-        had recently stopped syncing.
+        """A user has started syncing. Send a UserSync to the presence writer,
+        unless they had recently stopped syncing.
 
         Args:
             user_id (str)
         """
         going_offline = self.users_going_offline.pop(user_id, None)
         if not going_offline:
-            # Safe to skip because we haven't yet told the master they were offline
+            # Safe to skip because we haven't yet told the presence writer they
+            # were offline
             self.send_user_sync(user_id, True, self.clock.time_msec())
 
     def mark_as_going_offline(self, user_id):
-        """A user has stopped syncing. We wait before notifying the master as
-        its likely they'll come back soon. This allows us to avoid sending
-        a stopped syncing immediately followed by a started syncing notification
-        to the master
+        """A user has stopped syncing. We wait before notifying the presence
+        writer as it's likely they'll come back soon. This allows us to avoid
+        sending a stopped syncing immediately followed by a started syncing
+        notification to the presence writer.
 
         Args:
             user_id (str)
@@ -360,8 +383,8 @@ class WorkerPresenceHandler(BasePresenceHandler):
         self.users_going_offline[user_id] = self.clock.time_msec()
 
     def send_stop_syncing(self):
-        """Check if there are any users who have stopped syncing a while ago
-        and haven't come back yet. If there are poke the master about them.
+        """Check if there are any users who have stopped syncing a while ago and
+        haven't come back yet. If there are, poke the presence writer about them.
         """
         now = self.clock.time_msec()
         for user_id, last_sync_ms in list(self.users_going_offline.items()):
@@ -421,7 +444,14 @@ class WorkerPresenceHandler(BasePresenceHandler):
         # If this is a federation sender, notify about presence updates.
         await self.maybe_send_presence_to_interested_destinations(states)
 
-    async def process_replication_rows(self, token, rows):
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
+        await super().process_replication_rows(stream_name, instance_name, token, rows)
+
+        if stream_name != PresenceStream.NAME:
+            return
+
         states = [
             UserPresenceState(
                 row.user_id,
@@ -470,9 +500,12 @@ class WorkerPresenceHandler(BasePresenceHandler):
         if not self.hs.config.use_presence:
             return
 
-        # Proxy request to master
+        # Proxy request to instance that writes presence
         await self._set_state_client(
-            user_id=user_id, state=state, ignore_status_msg=ignore_status_msg
+            instance_name=self._presence_writer_instance,
+            user_id=user_id,
+            state=state,
+            ignore_status_msg=ignore_status_msg,
         )
 
     async def bump_presence_active_time(self, user):
@@ -483,16 +516,17 @@ class WorkerPresenceHandler(BasePresenceHandler):
         if not self.hs.config.use_presence:
             return
 
-        # Proxy request to master
+        # Proxy request to instance that writes presence
         user_id = user.to_string()
-        await self._bump_active_client(user_id=user_id)
+        await self._bump_active_client(
+            instance_name=self._presence_writer_instance, user_id=user_id
+        )
 
 
 class PresenceHandler(BasePresenceHandler):
     def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.hs = hs
-        self.is_mine_id = hs.is_mine_id
         self.server_name = hs.hostname
         self.wheel_timer = WheelTimer()
         self.notifier = hs.get_notifier()
@@ -721,15 +755,12 @@ class PresenceHandler(BasePresenceHandler):
                     self.store,
                     self.presence_router,
                     list(to_federation_ping.values()),
-                    self.state,
                 )
 
-                # Since this is master we know that we have a federation sender or
-                # queue, and so this will be defined.
-                assert self._federation
-
                 for destinations, states in hosts_and_states:
-                    self._federation.send_presence_to_destinations(states, destinations)
+                    self._federation_queue.send_presence_to_destinations(
+                        states, destinations
+                    )
 
     async def _handle_timeouts(self):
         """Checks the presence of users that have timed out and updates as
@@ -1208,13 +1239,9 @@ class PresenceHandler(BasePresenceHandler):
                     user_presence_states
                 )
 
-        # Since this is master we know that we have a federation sender or
-        # queue, and so this will be defined.
-        assert self._federation
-
         # Send out user presence updates for each destination
         for destination, user_state_set in presence_destinations.items():
-            self._federation.send_presence_to_destinations(
+            self._federation_queue.send_presence_to_destinations(
                 destinations=[destination], states=user_state_set
             )
 
@@ -1354,7 +1381,6 @@ class PresenceEventSource:
         self.get_presence_router = hs.get_presence_router
         self.clock = hs.get_clock()
         self.store = hs.get_datastore()
-        self.state = hs.get_state_handler()
 
     @log_function
     async def get_new_events(
@@ -1823,7 +1849,6 @@ async def get_interested_remotes(
     store: DataStore,
     presence_router: PresenceRouter,
     states: List[UserPresenceState],
-    state_handler: StateHandler,
 ) -> List[Tuple[Collection[str], List[UserPresenceState]]]:
     """Given a list of presence states figure out which remote servers
     should be sent which.
@@ -1834,7 +1859,6 @@ async def get_interested_remotes(
         store: The homeserver's data store.
         presence_router: A module for augmenting the destinations for presence updates.
         states: A list of incoming user presence updates.
-        state_handler:
 
     Returns:
         A list of 2-tuples of destinations and states, where for
@@ -1851,7 +1875,8 @@ async def get_interested_remotes(
     )
 
     for room_id, states in room_ids_to_states.items():
-        hosts = await state_handler.get_current_hosts_in_room(room_id)
+        user_ids = await store.get_users_in_room(room_id)
+        hosts = {get_domain_from_id(user_id) for user_id in user_ids}
         hosts_and_states.append((hosts, states))
 
     for user_id, states in users_to_states.items():
@@ -1859,3 +1884,198 @@ async def get_interested_remotes(
         hosts_and_states.append(([host], states))
 
     return hosts_and_states
+
+
+class PresenceFederationQueue:
+    """Handles sending ad hoc presence updates over federation, which are *not*
+    due to state updates (that get handled via the presence stream), e.g.
+    federation pings and sending existing presence states to newly joined hosts.
+
+    Only the last N minutes will be queued, so if a federation sender instance
+    is down for longer than that, some updates will be dropped. This is OK as
+    presence is ephemeral, and so it will self-correct eventually.
+
+    On workers the class tracks the last received position of the stream from
+    replication, and handles querying for missed updates over HTTP replication,
+    cf. `get_current_token` and `get_replication_rows`.
+    """
+
+    # How long to keep entries in the queue for. Workers that are down for
+    # longer than this duration will miss out on older updates.
+    _KEEP_ITEMS_IN_QUEUE_FOR_MS = 5 * 60 * 1000
+
+    # How often to check if we can expire entries from the queue.
+    _CLEAR_ITEMS_EVERY_MS = 60 * 1000
+
+    def __init__(self, hs: "HomeServer", presence_handler: BasePresenceHandler):
+        self._clock = hs.get_clock()
+        self._notifier = hs.get_notifier()
+        self._instance_name = hs.get_instance_name()
+        self._presence_handler = presence_handler
+        self._repl_client = ReplicationGetStreamUpdates.make_client(hs)
+
+        # Should we keep a queue of recent presence updates? We only bother if
+        # another process may be handling federation sending.
+        self._queue_presence_updates = True
+
+        # Whether this instance is a presence writer.
+        self._presence_writer = self._instance_name in hs.config.worker.writers.presence
+
+        # The FederationSender instance, if this process sends federation traffic directly.
+        self._federation = None
+
+        if hs.should_send_federation():
+            self._federation = hs.get_federation_sender()
+
+            # We don't bother queuing up presence states if only this instance
+            # is sending federation.
+            if hs.config.worker.federation_shard_config.instances == [
+                self._instance_name
+            ]:
+                self._queue_presence_updates = False
+
+        # The queue of recently queued updates as tuples of: `(timestamp,
+        # stream_id, destinations, user_ids)`. We don't store the full states
+        # for efficiency, and remote workers will already have the full states
+        # cached.
+        self._queue = []  # type: List[Tuple[int, int, Collection[str], Set[str]]]
+
+        self._next_id = 1
+
+        # Map from instance name to current token
+        self._current_tokens = {}  # type: Dict[str, int]
+
+        if self._queue_presence_updates:
+            self._clock.looping_call(self._clear_queue, self._CLEAR_ITEMS_EVERY_MS)
+
+    def _clear_queue(self):
+        """Clear out older entries from the queue."""
+        clear_before = self._clock.time_msec() - self._KEEP_ITEMS_IN_QUEUE_FOR_MS
+
+        # The queue is sorted by timestamp, so we can bisect to find the right
+        # place to purge before. Note that we are searching using a 1-tuple with
+        # the time, which does The Right Thing since the queue holds tuples
+        # whose first item is a timestamp.
+        index = bisect(self._queue, (clear_before,))
+        self._queue = self._queue[index:]
+
+    def send_presence_to_destinations(
+        self, states: Collection[UserPresenceState], destinations: Collection[str]
+    ) -> None:
+        """Send the presence states to the given destinations.
+
+        Will forward to the local federation sender (if there is one) and queue
+        to send over replication (if there are other federation sender instances).
+
+        Must only be called on the presence writer process.
+        """
+
+        # This should only be called on a presence writer.
+        assert self._presence_writer
+
+        if self._federation:
+            self._federation.send_presence_to_destinations(
+                states=states,
+                destinations=destinations,
+            )
+
+        if not self._queue_presence_updates:
+            return
+
+        now = self._clock.time_msec()
+
+        stream_id = self._next_id
+        self._next_id += 1
+
+        self._queue.append((now, stream_id, destinations, {s.user_id for s in states}))
+
+        self._notifier.notify_replication()
+
+    def get_current_token(self, instance_name: str) -> int:
+        """Get the current position of the stream.
+
+        On workers this returns the last stream ID received from replication.
+        """
+        if instance_name == self._instance_name:
+            return self._next_id - 1
+        else:
+            return self._current_tokens.get(instance_name, 0)
+
+    async def get_replication_rows(
+        self,
+        instance_name: str,
+        from_token: int,
+        upto_token: int,
+        target_row_count: int,
+    ) -> Tuple[List[Tuple[int, Tuple[str, str]]], int, bool]:
+        """Get all the updates between the two tokens.
+
+        We return rows in the form of `(destination, user_id)` to keep the size
+        of each row bounded (rather than returning whole sets in a single row).
+
+        On workers this will query the presence writer process via HTTP replication.
+        """
+        if instance_name != self._instance_name:
+            # If not local we query over http replication from the presence
+            # writer
+            result = await self._repl_client(
+                instance_name=instance_name,
+                stream_name=PresenceFederationStream.NAME,
+                from_token=from_token,
+                upto_token=upto_token,
+            )
+            return result["updates"], result["upto_token"], result["limited"]
+
+        # We can find the correct position in the queue by noting that there is
+        # exactly one entry per stream ID, and that the last entry has an ID of
+        # `self._next_id - 1`, so we can count backwards from the end.
+        #
+        # Since the start of the queue is periodically truncated we need to
+        # handle the case where the `from_token` stream ID has already been dropped.
+        start_idx = max(from_token - self._next_id, -len(self._queue))
+
+        to_send = []  # type: List[Tuple[int, Tuple[str, str]]]
+        limited = False
+        new_id = upto_token
+        for _, stream_id, destinations, user_ids in self._queue[start_idx:]:
+            if stream_id > upto_token:
+                break
+
+            new_id = stream_id
+
+            to_send.extend(
+                (stream_id, (destination, user_id))
+                for destination in destinations
+                for user_id in user_ids
+            )
+
+            if len(to_send) > target_row_count:
+                limited = True
+                break
+
+        return to_send, new_id, limited
+
+    async def process_replication_rows(
+        self, stream_name: str, instance_name: str, token: int, rows: list
+    ):
+        if stream_name != PresenceFederationStream.NAME:
+            return
+
+        # We keep track of the current tokens (so that we can catch up with
+        # anything we missed after a disconnect).
+        self._current_tokens[instance_name] = token
+
+        # If we're a federation sender we pull out the presence states to send
+        # and forward them on.
+        if not self._federation:
+            return
+
+        hosts_to_users = {}  # type: Dict[str, Set[str]]
+        for row in rows:
+            hosts_to_users.setdefault(row.destination, set()).add(row.user_id)
+
+        for host, user_ids in hosts_to_users.items():
+            states = await self._presence_handler.current_state_for_users(user_ids)
+            self._federation.send_presence_to_destinations(
+                states=states.values(),
+                destinations=[host],
+            )
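The queue above leans on two small tricks worth calling out: trimming by timestamp with `bisect` on a 1-tuple, and catching up by stream ID against a list that is dense in IDs. A minimal standalone sketch of both (illustrative names, not part of Synapse; the catch-up here uses a plain filter where the real code computes a negative index):

```python
import time
from bisect import bisect
from typing import List, Set, Tuple

class TimeBoundedQueue:
    """Sketch of the trimming/catch-up technique used by PresenceFederationQueue."""

    KEEP_MS = 5 * 60 * 1000

    def __init__(self) -> None:
        # Entries are (timestamp_ms, stream_id, payload), appended in order,
        # so the list is sorted by timestamp and dense in stream_id.
        self._queue: List[Tuple[int, int, Set[str]]] = []
        self._next_id = 1

    def append(self, payload: Set[str]) -> int:
        stream_id = self._next_id
        self._next_id += 1
        self._queue.append((int(time.time() * 1000), stream_id, payload))
        return stream_id

    def trim(self) -> None:
        clear_before = int(time.time() * 1000) - self.KEEP_MS
        # Tuples compare element-wise, so bisecting with a 1-tuple finds the
        # first entry whose timestamp is >= clear_before.
        self._queue = self._queue[bisect(self._queue, (clear_before,)) :]

    def since(self, from_token: int) -> List[Tuple[int, Set[str]]]:
        # Entries older than the retention window are simply gone, matching
        # the "presence self-corrects" trade-off described above.
        return [(sid, p) for _, sid, p in self._queue if sid > from_token]

q = TimeBoundedQueue()
q.append({"@alice:example.org"})
tok = q.append({"@bob:example.org"})
assert q.since(tok - 1) == [(tok, {"@bob:example.org"})]
```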
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 6e28677530..54c25e3557 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -19,7 +19,7 @@ from http import HTTPStatus
 from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 
 from synapse import types
-from synapse.api.constants import AccountDataTypes, EventTypes, JoinRules, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -28,7 +28,6 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.api.ratelimiting import Ratelimiter
-from synapse.api.room_versions import RoomVersion
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
@@ -64,6 +63,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.account_data_handler = hs.get_account_data_handler()
+        self.event_auth_handler = hs.get_event_auth_handler()
 
         self.member_linearizer = Linearizer(name="member")
         self.member_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -179,62 +179,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         await self._invites_per_user_limiter.ratelimit(requester, invitee_user_id)
 
-    async def _can_join_without_invite(
-        self, state_ids: StateMap[str], room_version: RoomVersion, user_id: str
-    ) -> bool:
-        """
-        Check whether a user can join a room without an invite.
-
-        When joining a room with restricted joined rules (as defined in MSC3083),
-        the membership of spaces must be checked during join.
-
-        Args:
-            state_ids: The state of the room as it currently is.
-            room_version: The room version of the room being joined.
-            user_id: The user joining the room.
-
-        Returns:
-            True if the user can join the room, false otherwise.
-        """
-        # This only applies to room versions which support the new join rule.
-        if not room_version.msc3083_join_rules:
-            return True
-
-        # If there's no join rule, then it defaults to public (so this doesn't apply).
-        join_rules_event_id = state_ids.get((EventTypes.JoinRules, ""), None)
-        if not join_rules_event_id:
-            return True
-
-        # If the join rule is not restricted, this doesn't apply.
-        join_rules_event = await self.store.get_event(join_rules_event_id)
-        if join_rules_event.content.get("join_rule") != JoinRules.MSC3083_RESTRICTED:
-            return True
-
-        # If allowed is of the wrong form, then only allow invited users.
-        allowed_spaces = join_rules_event.content.get("allow", [])
-        if not isinstance(allowed_spaces, list):
-            return False
-
-        # Get the list of joined rooms and see if there's an overlap.
-        joined_rooms = await self.store.get_rooms_for_user(user_id)
-
-        # Pull out the other room IDs, invalid data gets filtered.
-        for space in allowed_spaces:
-            if not isinstance(space, dict):
-                continue
-
-            space_id = space.get("space")
-            if not isinstance(space_id, str):
-                continue
-
-            # The user was joined to one of the spaces specified, they can join
-            # this room!
-            if space_id in joined_rooms:
-                return True
-
-        # The user was not in any of the required spaces.
-        return False
-
     async def _local_membership_update(
         self,
         requester: Requester,
@@ -303,7 +247,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             if (
                 newly_joined
                 and not user_is_invited
-                and not await self._can_join_without_invite(
+                and not await self.event_auth_handler.can_join_without_invite(
                     prev_state_ids, event.room_version, user_id
                 )
             ):
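The helper removed here (it moves to `EventAuthHandler.can_join_without_invite`, as the updated call site shows) implements MSC3083's restricted join rule: being joined to any space on the allow list substitutes for an invite, and malformed allow-list content falls back to invite-only. A condensed sketch of the core check, with illustrative names:

```python
from typing import List

def can_join_restricted_room(allow: object, joined_rooms: List[str]) -> bool:
    """Condensed form of the allow-list walk removed above: malformed `allow`
    content falls back to invite-only, and membership of any listed space
    permits the join."""
    if not isinstance(allow, list):
        return False
    for entry in allow:
        # Invalid entries are skipped rather than rejected outright.
        if isinstance(entry, dict) and entry.get("space") in joined_rooms:
            return True
    return False

assert can_join_restricted_room([{"space": "!s:example.org"}], ["!s:example.org"])
assert not can_join_restricted_room("not-a-list", ["!s:example.org"])
```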
diff --git a/synapse/handlers/saml_handler.py b/synapse/handlers/saml.py
index 80ba65b9e0..80ba65b9e0 100644
--- a/synapse/handlers/saml_handler.py
+++ b/synapse/handlers/saml.py
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 8d00ffdc73..044ff06d84 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -18,6 +18,7 @@ from typing import (
     Any,
     Awaitable,
     Callable,
+    Collection,
     Dict,
     Iterable,
     List,
@@ -40,7 +41,7 @@ from synapse.handlers.ui_auth import UIAuthSessionDataConstants
 from synapse.http import get_request_user_agent
 from synapse.http.server import respond_with_html, respond_with_redirect
 from synapse.http.site import SynapseRequest
-from synapse.types import Collection, JsonDict, UserID, contains_invalid_mxid_characters
+from synapse.types import JsonDict, UserID, contains_invalid_mxid_characters
 from synapse.util.async_helpers import Linearizer
 from synapse.util.stringutils import random_string
 
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index d2ba805f86..3ffc4628cb 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -14,7 +14,17 @@
 # limitations under the License.
 import itertools
 import logging
-from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Any,
+    Collection,
+    Dict,
+    FrozenSet,
+    List,
+    Optional,
+    Set,
+    Tuple,
+)
 
 import attr
 from prometheus_client import Counter
@@ -28,7 +38,6 @@ from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
 from synapse.types import (
-    Collection,
     JsonDict,
     MutableStateMap,
     Requester,
diff --git a/synapse/http/client.py b/synapse/http/client.py
index 1730187ffa..5f40f16e24 100644
--- a/synapse/http/client.py
+++ b/synapse/http/client.py
@@ -33,6 +33,7 @@ import treq
 from canonicaljson import encode_canonical_json
 from netaddr import AddrFormatError, IPAddress, IPSet
 from prometheus_client import Counter
+from typing_extensions import Protocol
 from zope.interface import implementer, provider
 
 from OpenSSL import SSL
@@ -754,6 +755,16 @@ def _timeout_to_request_timed_out_error(f: Failure):
     return f
 
 
+class ByteWriteable(Protocol):
+    """The type of object which must be passed into read_body_with_max_size.
+
+    Typically this is a file object.
+    """
+
+    def write(self, data: bytes) -> int:
+        pass
+
+
 class BodyExceededMaxSize(Exception):
     """The maximum allowed size of the HTTP body was exceeded."""
 
@@ -790,7 +801,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
     transport = None  # type: Optional[ITCPTransport]
 
     def __init__(
-        self, stream: BinaryIO, deferred: defer.Deferred, max_size: Optional[int]
+        self, stream: ByteWriteable, deferred: defer.Deferred, max_size: Optional[int]
     ):
         self.stream = stream
         self.deferred = deferred
@@ -830,7 +841,7 @@ class _ReadBodyWithMaxSizeProtocol(protocol.Protocol):
 
 
 def read_body_with_max_size(
-    response: IResponse, stream: BinaryIO, max_size: Optional[int]
+    response: IResponse, stream: ByteWriteable, max_size: Optional[int]
 ) -> defer.Deferred:
     """
     Read an HTTP response body into a file-like object, optionally enforcing a maximum size.
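`ByteWriteable` swaps the nominal `BinaryIO` type for a structural `Protocol`: any object with a `write(bytes) -> int` method now type-checks, no subclassing required. A small sketch of the idea (the `LengthCounter` sink is hypothetical):

```python
from typing import List
from typing_extensions import Protocol

class ByteWriteable(Protocol):
    """Anything with a write(bytes) -> int method satisfies this protocol."""

    def write(self, data: bytes) -> int:
        ...

class LengthCounter:
    """Hypothetical sink that only counts bytes; note it inherits nothing."""

    def __init__(self) -> None:
        self.length = 0

    def write(self, data: bytes) -> int:
        self.length += len(data)
        return len(data)

def drain(chunks: List[bytes], sink: ByteWriteable) -> None:
    for chunk in chunks:
        sink.write(chunk)

counter = LengthCounter()
drain([b"abc", b"de"], counter)  # accepted via structural typing
assert counter.length == 5
```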
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index d48721a4e2..bb837b7b19 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -1,5 +1,4 @@
-# Copyright 2014-2016 OpenMarket Ltd
-# Copyright 2018 New Vector Ltd
+# Copyright 2014-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -13,11 +12,13 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import cgi
+import codecs
 import logging
 import random
 import sys
+import typing
 import urllib.parse
-from io import BytesIO
+from io import BytesIO, StringIO
 from typing import Callable, Dict, List, Optional, Tuple, Union
 
 import attr
@@ -72,6 +73,9 @@ incoming_responses_counter = Counter(
     "synapse_http_matrixfederationclient_responses", "", ["method", "code"]
 )
 
+# a federation response can be rather large (e.g. a big state_ids response is 50M
+# or so), so we need a generous limit here.
+MAX_RESPONSE_SIZE = 100 * 1024 * 1024
 
 MAX_LONG_RETRIES = 10
 MAX_SHORT_RETRIES = 3
@@ -167,12 +171,27 @@ async def _handle_json_response(
     try:
         check_content_type_is_json(response.headers)
 
-        # Use the custom JSON decoder (partially re-implements treq.json_content).
-        d = treq.text_content(response, encoding="utf-8")
-        d.addCallback(json_decoder.decode)
+        buf = StringIO()
+        d = read_body_with_max_size(response, BinaryIOWrapper(buf), MAX_RESPONSE_SIZE)
         d = timeout_deferred(d, timeout=timeout_sec, reactor=reactor)
 
+        def parse(_len: int):
+            return json_decoder.decode(buf.getvalue())
+
+        d.addCallback(parse)
+
         body = await make_deferred_yieldable(d)
+    except BodyExceededMaxSize as e:
+        # The response was too big.
+        logger.warning(
+            "{%s} [%s] JSON response exceeded max size %i - %s %s",
+            request.txn_id,
+            request.destination,
+            MAX_RESPONSE_SIZE,
+            request.method,
+            request.uri.decode("ascii"),
+        )
+        raise RequestSendFailed(e, can_retry=False) from e
     except ValueError as e:
         # The JSON content was invalid.
         logger.warning(
@@ -218,6 +237,18 @@ async def _handle_json_response(
     return body
 
 
+class BinaryIOWrapper:
+    """A wrapper for a TextIO which converts from bytes on the fly."""
+
+    def __init__(self, file: typing.TextIO, encoding="utf-8", errors="strict"):
+        self.decoder = codecs.getincrementaldecoder(encoding)(errors)
+        self.file = file
+
+    def write(self, b: Union[bytes, bytearray]) -> int:
+        self.file.write(self.decoder.decode(b))
+        return len(b)
+
+
 class MatrixFederationHttpClient:
     """HTTP client used to talk to other homeservers over the federation
     protocol. Sends client certificates and signs requests.
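`BinaryIOWrapper` relies on `codecs`' incremental decoders so that a multi-byte UTF-8 sequence split across network chunks still decodes correctly, which a per-chunk `bytes.decode()` would not survive. A minimal demonstration:

```python
import codecs

# A UTF-8 character split across two "network chunks": decoding each chunk
# independently would raise UnicodeDecodeError; the incremental decoder
# buffers the partial sequence instead.
decoder = codecs.getincrementaldecoder("utf-8")("strict")

raw = "é".encode("utf-8")          # b'\xc3\xa9'
chunks = [raw[:1], raw[1:]]

out = "".join(decoder.decode(chunk) for chunk in chunks)
assert out == "é"
```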
diff --git a/synapse/http/site.py b/synapse/http/site.py
index 32b5e19c09..671fd3fbcc 100644
--- a/synapse/http/site.py
+++ b/synapse/http/site.py
@@ -14,13 +14,14 @@
 import contextlib
 import logging
 import time
-from typing import Optional, Tuple, Type, Union
+from typing import Optional, Tuple, Union
 
 import attr
 from zope.interface import implementer
 
-from twisted.internet.interfaces import IAddress
+from twisted.internet.interfaces import IAddress, IReactorTime
 from twisted.python.failure import Failure
+from twisted.web.resource import IResource
 from twisted.web.server import Request, Site
 
 from synapse.config.server import ListenerConfig
@@ -49,6 +50,7 @@ class SynapseRequest(Request):
      * Redaction of access_token query-params in __repr__
      * Logging at start and end
      * Metrics to record CPU, wallclock and DB time by endpoint.
+     * A limit on the size of request bodies which will be accepted
 
     It also provides a method `processing`, which returns a context manager. If this
     method is called, the request won't be logged until the context manager is closed;
@@ -59,8 +61,9 @@ class SynapseRequest(Request):
         logcontext: the log context for this request
     """
 
-    def __init__(self, channel, *args, **kw):
+    def __init__(self, channel, *args, max_request_body_size=1024, **kw):
         Request.__init__(self, channel, *args, **kw)
+        self._max_request_body_size = max_request_body_size
         self.site = channel.site  # type: SynapseSite
         self._channel = channel  # this is used by the tests
         self.start_time = 0.0
@@ -97,6 +100,18 @@ class SynapseRequest(Request):
             self.site.site_tag,
         )
 
+    def handleContentChunk(self, data):
+        # we should have a `content` by now.
+        assert self.content, "handleContentChunk() called before gotLength()"
+        if self.content.tell() + len(data) > self._max_request_body_size:
+            logger.warning(
+                "Aborting connection from %s because the request exceeds maximum size",
+                self.client,
+            )
+            self.transport.abortConnection()
+            return
+        super().handleContentChunk(data)
+
     @property
     def requester(self) -> Optional[Union[Requester, str]]:
         return self._requester
@@ -485,29 +500,55 @@ class _XForwardedForAddress:
 
 class SynapseSite(Site):
     """
-    Subclass of a twisted http Site that does access logging with python's
-    standard logging
+    Synapse-specific twisted http Site
+
+    This does two main things.
+
+    First, it replaces the requestFactory in use so that we build SynapseRequests
+    instead of regular t.w.server.Requests. All of the constructor params are really
+    just parameters for SynapseRequest.
+
+    Second, it inhibits the log() method called by Request.finish, since SynapseRequest
+    does its own logging.
     """
 
     def __init__(
         self,
-        logger_name,
-        site_tag,
+        logger_name: str,
+        site_tag: str,
         config: ListenerConfig,
-        resource,
+        resource: IResource,
         server_version_string,
-        *args,
-        **kwargs,
+        max_request_body_size: int,
+        reactor: IReactorTime,
     ):
-        Site.__init__(self, resource, *args, **kwargs)
+        """
+
+        Args:
+            logger_name:  The name of the logger to use for access logs.
+            site_tag:  A tag to use for this site - mostly in access logs.
+            config:  Configuration for the HTTP listener corresponding to this site
+            resource:  The base of the resource tree to be used for serving requests on
+                this site
+            server_version_string: A string to present for the Server header
+            max_request_body_size: Maximum request body length to allow before
+                dropping the connection
+            reactor: reactor to be used to manage connection timeouts
+        """
+        Site.__init__(self, resource, reactor=reactor)
 
         self.site_tag = site_tag
 
         assert config.http_options is not None
         proxied = config.http_options.x_forwarded
-        self.requestFactory = (
-            XForwardedForRequest if proxied else SynapseRequest
-        )  # type: Type[Request]
+        request_class = XForwardedForRequest if proxied else SynapseRequest
+
+        def request_factory(channel, queued) -> Request:
+            return request_class(
+                channel, max_request_body_size=max_request_body_size, queued=queued
+            )
+
+        self.requestFactory = request_factory  # type: ignore
         self.access_logger = logging.getLogger(logger_name)
         self.server_version_string = server_version_string.encode("ascii")
 
diff --git a/synapse/logging/_remote.py b/synapse/logging/_remote.py
index 4e8b0f8d10..c515690b38 100644
--- a/synapse/logging/_remote.py
+++ b/synapse/logging/_remote.py
@@ -226,11 +226,11 @@ class RemoteHandler(logging.Handler):
         old_buffer = self._buffer
         self._buffer = deque()
 
-        for i in range(buffer_split):
+        for _ in range(buffer_split):
             self._buffer.append(old_buffer.popleft())
 
         end_buffer = []
-        for i in range(buffer_split):
+        for _ in range(buffer_split):
             end_buffer.append(old_buffer.pop())
 
         self._buffer.extend(reversed(end_buffer))
diff --git a/synapse/notifier.py b/synapse/notifier.py
index d5ab77058d..b9531007e2 100644
--- a/synapse/notifier.py
+++ b/synapse/notifier.py
@@ -17,6 +17,7 @@ from collections import namedtuple
 from typing import (
     Awaitable,
     Callable,
+    Collection,
     Dict,
     Iterable,
     List,
@@ -42,13 +43,7 @@ from synapse.logging.opentracing import log_kv, start_active_span
 from synapse.logging.utils import log_function
 from synapse.metrics import LaterGauge
 from synapse.streams.config import PaginationConfig
-from synapse.types import (
-    Collection,
-    PersistedEventPosition,
-    RoomStreamToken,
-    StreamToken,
-    UserID,
-)
+from synapse.types import PersistedEventPosition, RoomStreamToken, StreamToken, UserID
 from synapse.util.async_helpers import ObservableDeferred, timeout_deferred
 from synapse.util.metrics import Measure
 from synapse.visibility import filter_events_for_client
diff --git a/synapse/push/bulk_push_rule_evaluator.py b/synapse/push/bulk_push_rule_evaluator.py
index 50b470c310..350646f458 100644
--- a/synapse/push/bulk_push_rule_evaluator.py
+++ b/synapse/push/bulk_push_rule_evaluator.py
@@ -106,6 +106,10 @@ class BulkPushRuleEvaluator:
         self.store = hs.get_datastore()
         self.auth = hs.get_auth()
 
+        # Used by `RulesForRoom` to ensure only one thing mutates the cache at a
+        # time. Keyed off room_id.
+        self._rules_linearizer = Linearizer(name="rules_for_room")
+
         self.room_push_rule_cache_metrics = register_cache(
             "cache",
             "room_push_rule_cache",
@@ -123,7 +127,16 @@ class BulkPushRuleEvaluator:
             dict of user_id -> push_rules
         """
         room_id = event.room_id
-        rules_for_room = self._get_rules_for_room(room_id)
+
+        rules_for_room_data = self._get_rules_for_room(room_id)
+        rules_for_room = RulesForRoom(
+            hs=self.hs,
+            room_id=room_id,
+            rules_for_room_cache=self._get_rules_for_room.cache,
+            room_push_rule_cache_metrics=self.room_push_rule_cache_metrics,
+            linearizer=self._rules_linearizer,
+            cached_data=rules_for_room_data,
+        )
 
         rules_by_user = await rules_for_room.get_rules(event, context)
 
@@ -142,17 +155,12 @@ class BulkPushRuleEvaluator:
         return rules_by_user
 
     @lru_cache()
-    def _get_rules_for_room(self, room_id: str) -> "RulesForRoom":
-        """Get the current RulesForRoom object for the given room id"""
-        # It's important that RulesForRoom gets added to self._get_rules_for_room.cache
+    def _get_rules_for_room(self, room_id: str) -> "RulesForRoomData":
+        """Get the current RulesForRoomData object for the given room id"""
+        # It's important that the RulesForRoomData object gets added to self._get_rules_for_room.cache
         # before any lookup methods get called on it as otherwise there may be
         # a race if the cache is invalidated (which assumes it's in the cache)
-        return RulesForRoom(
-            self.hs,
-            room_id,
-            self._get_rules_for_room.cache,
-            self.room_push_rule_cache_metrics,
-        )
+        return RulesForRoomData()
 
     async def _get_power_levels_and_sender_level(
         self, event: EventBase, context: EventContext
@@ -282,11 +290,49 @@ def _condition_checker(
     return True
 
 
+@attr.s(slots=True)
+class RulesForRoomData:
+    """The data stored in the cache by `RulesForRoom`.
+
+    We don't store `RulesForRoom` directly in the cache as we want our caches to
+    *only* include data, and not references to e.g. the data stores.
+    """
+
+    # event_id -> (user_id, state)
+    member_map = attr.ib(type=Dict[str, Tuple[str, str]], factory=dict)
+    # user_id -> rules
+    rules_by_user = attr.ib(type=Dict[str, List[Dict[str, dict]]], factory=dict)
+
+    # The last state group we updated the caches for. If the state_group of
+    # a new event comes along, we know that we can just return the cached
+    # result.
+    # On invalidation of the rules themselves (if the user changes them),
+    # we invalidate everything and set state_group to `object()`
+    state_group = attr.ib(type=Union[object, int], factory=object)
+
+    # A sequence number to keep track of when we're allowed to update the
+    # cache. We bump the sequence number when we invalidate the cache. If
+    # the sequence number changes while we're calculating stuff we should
+    # not update the cache with it.
+    sequence = attr.ib(type=int, default=0)
+
+    # A cache of user_ids that we *know* aren't interesting, e.g. user_ids
+    # owned by AS's, or remote users, etc. (I.e. users we will never need to
+    # calculate push for)
+    # These never need to be invalidated as we will never set up push for
+    # them.
+    uninteresting_user_set = attr.ib(type=Set[str], factory=set)
+
+
 class RulesForRoom:
     """Caches push rules for users in a room.
 
     This efficiently handles users joining/leaving the room by not invalidating
     the entire cache for the room.
+
+    A new instance is constructed for each call to
+    `BulkPushRuleEvaluator._get_rules_for_event`, with the cached data from
+    previous calls passed in.
     """
 
     def __init__(
@@ -295,6 +341,8 @@ class RulesForRoom:
         room_id: str,
         rules_for_room_cache: LruCache,
         room_push_rule_cache_metrics: CacheMetric,
+        linearizer: Linearizer,
+        cached_data: RulesForRoomData,
     ):
         """
         Args:
@@ -303,38 +351,21 @@ class RulesForRoom:
             rules_for_room_cache: The cache object that caches these
                 RoomsForUser objects.
             room_push_rule_cache_metrics: The metrics object
+            linearizer: The linearizer used to ensure only one thing mutates
+                the cache at a time. Keyed off room_id
+            cached_data: Cached data from previous calls to `self.get_rules`,
+                can be mutated.
         """
         self.room_id = room_id
         self.is_mine_id = hs.is_mine_id
         self.store = hs.get_datastore()
         self.room_push_rule_cache_metrics = room_push_rule_cache_metrics
 
-        self.linearizer = Linearizer(name="rules_for_room")
-
-        # event_id -> (user_id, state)
-        self.member_map = {}  # type: Dict[str, Tuple[str, str]]
-        # user_id -> rules
-        self.rules_by_user = {}  # type: Dict[str, List[Dict[str, dict]]]
-
-        # The last state group we updated the caches for. If the state_group of
-        # a new event comes along, we know that we can just return the cached
-        # result.
-        # On invalidation of the rules themselves (if the user changes them),
-        # we invalidate everything and set state_group to `object()`
-        self.state_group = object()
-
-        # A sequence number to keep track of when we're allowed to update the
-        # cache. We bump the sequence number when we invalidate the cache. If
-        # the sequence number changes while we're calculating stuff we should
-        # not update the cache with it.
-        self.sequence = 0
-
-        # A cache of user_ids that we *know* aren't interesting, e.g. user_ids
-        # owned by AS's, or remote users, etc. (I.e. users we will never need to
-        # calculate push for)
-        # These never need to be invalidated as we will never set up push for
-        # them.
-        self.uninteresting_user_set = set()  # type: Set[str]
+        # Used to ensure only one thing mutates the cache at a time. Keyed off
+        # room_id.
+        self.linearizer = linearizer
+
+        self.data = cached_data
 
         # We need to be clever with the cache invalidation callbacks, as
         # otherwise the invalidation callback holds a reference to the object,
@@ -352,25 +383,25 @@ class RulesForRoom:
         """
         state_group = context.state_group
 
-        if state_group and self.state_group == state_group:
+        if state_group and self.data.state_group == state_group:
             logger.debug("Using cached rules for %r", self.room_id)
             self.room_push_rule_cache_metrics.inc_hits()
-            return self.rules_by_user
+            return self.data.rules_by_user
 
-        with (await self.linearizer.queue(())):
-            if state_group and self.state_group == state_group:
+        with (await self.linearizer.queue(self.room_id)):
+            if state_group and self.data.state_group == state_group:
                 logger.debug("Using cached rules for %r", self.room_id)
                 self.room_push_rule_cache_metrics.inc_hits()
-                return self.rules_by_user
+                return self.data.rules_by_user
 
             self.room_push_rule_cache_metrics.inc_misses()
 
             ret_rules_by_user = {}
             missing_member_event_ids = {}
-            if state_group and self.state_group == context.prev_group:
+            if state_group and self.data.state_group == context.prev_group:
                 # If we have a simple delta then we can reuse most of the previous
                 # results.
-                ret_rules_by_user = self.rules_by_user
+                ret_rules_by_user = self.data.rules_by_user
                 current_state_ids = context.delta_ids
 
                 push_rules_delta_state_cache_metric.inc_hits()
@@ -393,24 +424,24 @@ class RulesForRoom:
                 if typ != EventTypes.Member:
                     continue
 
-                if user_id in self.uninteresting_user_set:
+                if user_id in self.data.uninteresting_user_set:
                     continue
 
                 if not self.is_mine_id(user_id):
-                    self.uninteresting_user_set.add(user_id)
+                    self.data.uninteresting_user_set.add(user_id)
                     continue
 
                 if self.store.get_if_app_services_interested_in_user(user_id):
-                    self.uninteresting_user_set.add(user_id)
+                    self.data.uninteresting_user_set.add(user_id)
                     continue
 
                 event_id = current_state_ids[key]
 
-                res = self.member_map.get(event_id, None)
+                res = self.data.member_map.get(event_id, None)
                 if res:
                     user_id, state = res
                     if state == Membership.JOIN:
-                        rules = self.rules_by_user.get(user_id, None)
+                        rules = self.data.rules_by_user.get(user_id, None)
                         if rules:
                             ret_rules_by_user[user_id] = rules
                     continue
@@ -430,7 +461,7 @@ class RulesForRoom:
             else:
                 # The push rules didn't change but lets update the cache anyway
                 self.update_cache(
-                    self.sequence,
+                    self.data.sequence,
                     members={},  # There were no membership changes
                     rules_by_user=ret_rules_by_user,
                     state_group=state_group,
@@ -461,7 +492,7 @@ class RulesForRoom:
                 for. Used when updating the cache.
             event: The event we are currently computing push rules for.
         """
-        sequence = self.sequence
+        sequence = self.data.sequence
 
         rows = await self.store.get_membership_from_event_ids(member_event_ids.values())
 
@@ -501,23 +532,11 @@ class RulesForRoom:
 
         self.update_cache(sequence, members, ret_rules_by_user, state_group)
 
-    def invalidate_all(self) -> None:
-        # Note: Don't hand this function directly to an invalidation callback
-        # as it keeps a reference to self and will stop this instance from being
-        # GC'd if it gets dropped from the rules_to_user cache. Instead use
-        # `self.invalidate_all_cb`
-        logger.debug("Invalidating RulesForRoom for %r", self.room_id)
-        self.sequence += 1
-        self.state_group = object()
-        self.member_map = {}
-        self.rules_by_user = {}
-        push_rules_invalidation_counter.inc()
-
     def update_cache(self, sequence, members, rules_by_user, state_group) -> None:
-        if sequence == self.sequence:
-            self.member_map.update(members)
-            self.rules_by_user = rules_by_user
-            self.state_group = state_group
+        if sequence == self.data.sequence:
+            self.data.member_map.update(members)
+            self.data.rules_by_user = rules_by_user
+            self.data.state_group = state_group
 
 
 @attr.attrs(slots=True, frozen=True)
@@ -535,6 +554,10 @@ class _Invalidation:
     room_id = attr.ib(type=str)
 
     def __call__(self) -> None:
-        rules = self.cache.get(self.room_id, None, update_metrics=False)
-        if rules:
-            rules.invalidate_all()
+        rules_data = self.cache.get(self.room_id, None, update_metrics=False)
+        if rules_data:
+            rules_data.sequence += 1
+            rules_data.state_group = object()
+            rules_data.member_map = {}
+            rules_data.rules_by_user = {}
+            push_rules_invalidation_counter.inc()
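Moving the mutable state into `RulesForRoomData` preserves the sequence-number guard: invalidation bumps `sequence`, and a slow rule computation that started under an older sequence silently discards its result rather than writing stale data back. A standalone sketch of that guard (illustrative names):

```python
from typing import Dict

class SequencedCache:
    """Sketch of the sequence guard used by RulesForRoomData: a computation
    snapshots the sequence before it starts, and its result is discarded if
    an invalidation bumped the sequence in the meantime."""

    def __init__(self) -> None:
        self.sequence = 0
        self.data: Dict[str, str] = {}

    def invalidate(self) -> None:
        # Bumping the sequence makes any in-flight update stale.
        self.sequence += 1
        self.data.clear()

    def update(self, snapshot_sequence: int, new_data: Dict[str, str]) -> bool:
        if snapshot_sequence != self.sequence:
            return False  # stale: an invalidation raced with the computation
        self.data.update(new_data)
        return True

cache = SequencedCache()
seq = cache.sequence                # snapshot before the "slow" computation
cache.invalidate()                  # concurrent invalidation
assert not cache.update(seq, {"@user:example.org": "rules"})  # discarded
```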
diff --git a/synapse/push/emailpusher.py b/synapse/push/emailpusher.py
index cd89b54305..99a18874d1 100644
--- a/synapse/push/emailpusher.py
+++ b/synapse/push/emailpusher.py
@@ -19,8 +19,9 @@ from twisted.internet.error import AlreadyCalled, AlreadyCancelled
 from twisted.internet.interfaces import IDelayedCall
 
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.push import Pusher, PusherConfig, ThrottleParams
+from synapse.push import Pusher, PusherConfig, PusherConfigException, ThrottleParams
 from synapse.push.mailer import Mailer
+from synapse.util.threepids import validate_email
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -71,6 +72,12 @@ class EmailPusher(Pusher):
 
         self._is_processing = False
 
+        # Make sure that the email is valid.
+        try:
+            validate_email(self.email)
+        except ValueError:
+            raise PusherConfigException("Invalid email")
+
     def on_started(self, should_check_for_notifs: bool) -> None:
         """Called when this pusher has been started.
 
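Validating the address in the constructor makes a misconfigured email pusher fail fast with `PusherConfigException` rather than erroring on every send attempt. A rough sketch of the pattern; the `validate_email` stand-in below only approximates `synapse.util.threepids.validate_email`:

```python
class PusherConfigException(Exception):
    pass

def validate_email(address: str) -> str:
    # Crude stand-in for synapse.util.threepids.validate_email, which also
    # checks length and canonicalises the address.
    if "@" not in address or address != address.strip():
        raise ValueError("Unable to parse email address")
    return address.lower()

class EmailPusherSketch:
    def __init__(self, email: str) -> None:
        # Fail fast: reject a bad pusher at construction time rather than
        # failing on every subsequent send.
        try:
            self.email = validate_email(email)
        except ValueError:
            raise PusherConfigException("Invalid email")

EmailPusherSketch("user@example.org")      # accepted
try:
    EmailPusherSketch("not-an-address")    # rejected at construction
except PusherConfigException:
    pass
```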
diff --git a/synapse/push/pusherpool.py b/synapse/push/pusherpool.py
index 564a5ed0df..579fcdf472 100644
--- a/synapse/push/pusherpool.py
+++ b/synapse/push/pusherpool.py
@@ -62,7 +62,9 @@ class PusherPool:
         self.store = self.hs.get_datastore()
         self.clock = self.hs.get_clock()
 
-        self._account_validity = hs.config.account_validity
+        self._account_validity_enabled = (
+            hs.config.account_validity.account_validity_enabled
+        )
 
         # We shard the handling of push notifications by user ID.
         self._pusher_shard_config = hs.config.push.pusher_shard_config
@@ -236,7 +238,7 @@ class PusherPool:
 
             for u in users_affected:
                 # Don't push if the user account has expired
-                if self._account_validity.enabled:
+                if self._account_validity_enabled:
                     expired = await self.store.is_account_expired(
                         u, self.clock.time_msec()
                     )
@@ -266,7 +268,7 @@ class PusherPool:
 
             for u in users_affected:
                 # Don't push if the user account has expired
-                if self._account_validity.enabled:
+                if self._account_validity_enabled:
                     expired = await self.store.is_account_expired(
                         u, self.clock.time_msec()
                     )
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 2a1c925ee8..2de946f464 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -85,7 +85,7 @@ REQUIREMENTS = [
     "typing-extensions>=3.7.4",
     # We enforce that we have a `cryptography` version that bundles an `openssl`
     # with the latest security patches.
-    "cryptography>=3.4.7;python_version>='3.6'",
+    "cryptography>=3.4.7",
 ]
 
 CONDITIONAL_REQUIREMENTS = {
@@ -100,14 +100,9 @@ CONDITIONAL_REQUIREMENTS = {
     # that use the protocol, such as Let's Encrypt.
     "acme": [
         "txacme>=0.9.2",
-        # txacme depends on eliot. Eliot 1.8.0 is incompatible with
-        # python 3.5.2, as per https://github.com/itamarst/eliot/issues/418
-        "eliot<1.8.0;python_version<'3.5.3'",
     ],
     "saml2": [
-        # pysaml2 6.4.0 is incompatible with Python 3.5 (see https://github.com/IdentityPython/pysaml2/issues/749)
-        "pysaml2>=4.5.0,<6.4.0;python_version<'3.6'",
-        "pysaml2>=4.5.0;python_version>='3.6'",
+        "pysaml2>=4.5.0",
     ],
     "oidc": ["authlib>=0.14.0"],
     # systemd-python is necessary for logging to the systemd journal via
diff --git a/synapse/replication/http/_base.py b/synapse/replication/http/_base.py
index ece03467b5..5685cf2121 100644
--- a/synapse/replication/http/_base.py
+++ b/synapse/replication/http/_base.py
@@ -158,7 +158,10 @@ class ReplicationEndpoint(metaclass=abc.ABCMeta):
     def make_client(cls, hs):
         """Create a client that makes requests.
 
-        Returns a callable that accepts the same parameters as `_serialize_payload`.
+        Returns a callable that accepts the same parameters as
+        `_serialize_payload`, and also accepts an optional `instance_name`
+        parameter to specify which instance to hit (the instance must be in
+        the `instance_map` config).
         """
         clock = hs.get_clock()
         client = hs.get_simple_http_client()
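This docstring change is exercised earlier in the diff: `PresenceFederationQueue.get_replication_rows` passes `instance_name` so a worker can fetch missed rows directly from the presence writer. A sketch of the call shape only; `repl_client` stands in for the callable returned by `ReplicationGetStreamUpdates.make_client(hs)`:

```python
# Sketch of the call shape, mirroring the usage in PresenceFederationQueue.
async def catch_up(repl_client, writer_instance: str, from_token: int, upto_token: int):
    result = await repl_client(
        instance_name=writer_instance,  # must appear in the instance_map config
        stream_name="presence_federation",
        from_token=from_token,
        upto_token=upto_token,
    )
    return result["updates"], result["upto_token"], result["limited"]
```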
diff --git a/synapse/replication/slave/storage/presence.py b/synapse/replication/slave/storage/presence.py
deleted file mode 100644
index 57327d910d..0000000000
--- a/synapse/replication/slave/storage/presence.py
+++ /dev/null
@@ -1,50 +0,0 @@
-# Copyright 2016 OpenMarket Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from synapse.replication.tcp.streams import PresenceStream
-from synapse.storage import DataStore
-from synapse.storage.database import DatabasePool
-from synapse.storage.databases.main.presence import PresenceStore
-from synapse.util.caches.stream_change_cache import StreamChangeCache
-
-from ._base import BaseSlavedStore
-from ._slaved_id_tracker import SlavedIdTracker
-
-
-class SlavedPresenceStore(BaseSlavedStore):
-    def __init__(self, database: DatabasePool, db_conn, hs):
-        super().__init__(database, db_conn, hs)
-        self._presence_id_gen = SlavedIdTracker(db_conn, "presence_stream", "stream_id")
-
-        self._presence_on_startup = self._get_active_presence(db_conn)  # type: ignore
-
-        self.presence_stream_cache = StreamChangeCache(
-            "PresenceStreamChangeCache", self._presence_id_gen.get_current_token()
-        )
-
-    _get_active_presence = DataStore._get_active_presence
-    take_presence_startup_info = DataStore.take_presence_startup_info
-    _get_presence_for_user = PresenceStore.__dict__["_get_presence_for_user"]
-    get_presence_for_users = PresenceStore.__dict__["get_presence_for_users"]
-
-    def get_current_presence_token(self):
-        return self._presence_id_gen.get_current_token()
-
-    def process_replication_rows(self, stream_name, instance_name, token, rows):
-        if stream_name == PresenceStream.NAME:
-            self._presence_id_gen.advance(instance_name, token)
-            for row in rows:
-                self.presence_stream_cache.entity_has_changed(row.user_id, token)
-                self._get_presence_for_user.invalidate((row.user_id,))
-        return super().process_replication_rows(stream_name, instance_name, token, rows)
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index ce5d651cb8..4f3c6a18b6 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -29,7 +29,6 @@ from synapse.replication.tcp.streams import (
     AccountDataStream,
     DeviceListsStream,
     GroupServerStream,
-    PresenceStream,
     PushersStream,
     PushRulesStream,
     ReceiptsStream,
@@ -191,8 +190,6 @@ class ReplicationDataHandler:
                     self.stop_pusher(row.user_id, row.app_id, row.pushkey)
                 else:
                     await self.start_pusher(row.user_id, row.app_id, row.pushkey)
-        elif stream_name == PresenceStream.NAME:
-            await self._presence_handler.process_replication_rows(token, rows)
         elif stream_name == EventsStream.NAME:
             # We shouldn't get multiple rows per token for events stream, so
             # we don't need to optimise this for multiple rows.
@@ -221,6 +218,10 @@ class ReplicationDataHandler:
                     membership=row.data.membership,
                 )
 
+        await self._presence_handler.process_replication_rows(
+            stream_name, instance_name, token, rows
+        )
+
         # Notify any waiting deferreds. The list is ordered by position so we
         # just iterate through the list until we reach a position that is
         # greater than the received row position.
diff --git a/synapse/replication/tcp/handler.py b/synapse/replication/tcp/handler.py
index 2ce1b9f222..7ced4c543c 100644
--- a/synapse/replication/tcp/handler.py
+++ b/synapse/replication/tcp/handler.py
@@ -55,6 +55,8 @@ from synapse.replication.tcp.streams import (
     CachesStream,
     EventsStream,
     FederationStream,
+    PresenceFederationStream,
+    PresenceStream,
     ReceiptsStream,
     Stream,
     TagAccountDataStream,
@@ -99,6 +101,10 @@ class ReplicationCommandHandler:
         self._instance_id = hs.get_instance_id()
         self._instance_name = hs.get_instance_name()
 
+        self._is_presence_writer = (
+            hs.get_instance_name() in hs.config.worker.writers.presence
+        )
+
         self._streams = {
             stream.NAME: stream(hs) for stream in STREAMS_MAP.values()
         }  # type: Dict[str, Stream]
@@ -153,6 +159,14 @@ class ReplicationCommandHandler:
 
                 continue
 
+            if isinstance(stream, (PresenceStream, PresenceFederationStream)):
+                # Only add the presence streams as a source on the instance in
+                # charge of presence.
+                if self._is_presence_writer:
+                    self._streams_to_replicate.append(stream)
+
+                continue
+
             # Only add any other streams if we're on master.
             if hs.config.worker_app is not None:
                 continue
@@ -350,7 +364,7 @@ class ReplicationCommandHandler:
     ) -> Optional[Awaitable[None]]:
         user_sync_counter.inc()
 
-        if self._is_master:
+        if self._is_presence_writer:
             return self._presence_handler.update_external_syncs_row(
                 cmd.instance_id, cmd.user_id, cmd.is_syncing, cmd.last_sync_ms
             )
@@ -360,7 +374,7 @@ class ReplicationCommandHandler:
     def on_CLEAR_USER_SYNC(
         self, conn: IReplicationConnection, cmd: ClearUserSyncsCommand
     ) -> Optional[Awaitable[None]]:
-        if self._is_master:
+        if self._is_presence_writer:
             return self._presence_handler.update_external_syncs_clear(cmd.instance_id)
         else:
             return None
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 6860576e78..6e3705364f 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -49,7 +49,7 @@ import fcntl
 import logging
 import struct
 from inspect import isawaitable
-from typing import TYPE_CHECKING, List, Optional
+from typing import TYPE_CHECKING, Collection, List, Optional
 
 from prometheus_client import Counter
 from zope.interface import Interface, implementer
@@ -76,7 +76,6 @@ from synapse.replication.tcp.commands import (
     ServerCommand,
     parse_command_from_line,
 )
-from synapse.types import Collection
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
diff --git a/synapse/replication/tcp/streams/__init__.py b/synapse/replication/tcp/streams/__init__.py
index fb74ac4e98..4c0023c68a 100644
--- a/synapse/replication/tcp/streams/__init__.py
+++ b/synapse/replication/tcp/streams/__init__.py
@@ -30,6 +30,7 @@ from synapse.replication.tcp.streams._base import (
     CachesStream,
     DeviceListsStream,
     GroupServerStream,
+    PresenceFederationStream,
     PresenceStream,
     PublicRoomsStream,
     PushersStream,
@@ -50,6 +51,7 @@ STREAMS_MAP = {
         EventsStream,
         BackfillStream,
         PresenceStream,
+        PresenceFederationStream,
         TypingStream,
         ReceiptsStream,
         PushRulesStream,
@@ -71,6 +73,7 @@ __all__ = [
     "Stream",
     "BackfillStream",
     "PresenceStream",
+    "PresenceFederationStream",
     "TypingStream",
     "ReceiptsStream",
     "PushRulesStream",
diff --git a/synapse/replication/tcp/streams/_base.py b/synapse/replication/tcp/streams/_base.py
index 520c45f151..b03824925a 100644
--- a/synapse/replication/tcp/streams/_base.py
+++ b/synapse/replication/tcp/streams/_base.py
@@ -272,15 +272,22 @@ class PresenceStream(Stream):
     NAME = "presence"
     ROW_TYPE = PresenceStreamRow
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         store = hs.get_datastore()
 
-        if hs.config.worker_app is None:
-            # on the master, query the presence handler
+        if hs.get_instance_name() in hs.config.worker.writers.presence:
+            # on the presence writer, query the presence handler
             presence_handler = hs.get_presence_handler()
-            update_function = presence_handler.get_all_presence_updates
+
+            from synapse.handlers.presence import PresenceHandler
+
+            assert isinstance(presence_handler, PresenceHandler)
+
+            update_function = (
+                presence_handler.get_all_presence_updates
+            )  # type: UpdateFunction
         else:
-            # Query master process
+            # Query presence writer process
             update_function = make_http_update_function(hs, self.NAME)
 
         super().__init__(
@@ -290,6 +297,30 @@ class PresenceStream(Stream):
         )
 
 
+class PresenceFederationStream(Stream):
+    """A stream used to send ad hoc presence updates over federation.
+
+    Streams the remote destination and the user ID of the presence state to
+    send.
+    """
+
+    @attr.s(slots=True, auto_attribs=True)
+    class PresenceFederationStreamRow:
+        destination: str
+        user_id: str
+
+    NAME = "presence_federation"
+    ROW_TYPE = PresenceFederationStreamRow
+
+    def __init__(self, hs: "HomeServer"):
+        federation_queue = hs.get_presence_handler().get_federation_queue()
+        super().__init__(
+            hs.get_instance_name(),
+            federation_queue.get_current_token,
+            federation_queue.get_replication_rows,
+        )
+
+
 class TypingStream(Stream):
     TypingStreamRow = namedtuple(
         "TypingStreamRow", ("room_id", "user_ids")  # str  # list(str)
diff --git a/synapse/res/templates/account_previously_renewed.html b/synapse/res/templates/account_previously_renewed.html
new file mode 100644
index 0000000000..b751359bdf
--- /dev/null
+++ b/synapse/res/templates/account_previously_renewed.html
@@ -0,0 +1 @@
+<html><body>Your account is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</body></html>
diff --git a/synapse/res/templates/account_renewed.html b/synapse/res/templates/account_renewed.html
index 894da030af..e8c0f52f05 100644
--- a/synapse/res/templates/account_renewed.html
+++ b/synapse/res/templates/account_renewed.html
@@ -1 +1 @@
-<html><body>Your account has been successfully renewed.</body><html>
+<html><body>Your account has been successfully renewed and is valid until {{ expiration_ts|format_ts("%d-%m-%Y") }}.</body></html>
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index edda7861fa..8c9d21d3ea 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -14,6 +14,7 @@
 import hashlib
 import hmac
 import logging
+import secrets
 from http import HTTPStatus
 from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 
@@ -375,7 +376,7 @@ class UserRegisterServlet(RestServlet):
         """
         self._clear_old_nonces()
 
-        nonce = self.hs.get_secrets().token_hex(64)
+        nonce = secrets.token_hex(64)
         self.nonces[nonce] = int(self.reactor.seconds())
         return 200, {"nonce": nonce}
 
diff --git a/synapse/rest/client/v1/presence.py b/synapse/rest/client/v1/presence.py
index c232484f29..2b24fe5aa6 100644
--- a/synapse/rest/client/v1/presence.py
+++ b/synapse/rest/client/v1/presence.py
@@ -35,10 +35,15 @@ class PresenceStatusRestServlet(RestServlet):
         self.clock = hs.get_clock()
         self.auth = hs.get_auth()
 
+        self._use_presence = hs.config.server.use_presence
+
     async def on_GET(self, request, user_id):
         requester = await self.auth.get_user_by_req(request)
         user = UserID.from_string(user_id)
 
+        if not self._use_presence:
+            return 200, {"presence": "offline"}
+
         if requester.user != user:
             allowed = await self.presence_handler.is_visible(
                 observed_user=user, observer_user=requester.user
@@ -80,7 +85,7 @@ class PresenceStatusRestServlet(RestServlet):
         except Exception:
             raise SynapseError(400, "Unable to parse state")
 
-        if self.hs.config.use_presence:
+        if self._use_presence:
             await self.presence_handler.set_state(user, state)
 
         return 200, {}
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index 3aad15132d..085561d3e9 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -39,7 +39,7 @@ from synapse.metrics import threepid_send_requests
 from synapse.push.mailer import Mailer
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.stringutils import assert_valid_client_secret, random_string
-from synapse.util.threepids import canonicalise_email, check_3pid_allowed
+from synapse.util.threepids import check_3pid_allowed, validate_email
 
 from ._base import client_patterns, interactive_auth_handler
 
@@ -92,7 +92,7 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
         # Stored in the database "foo@bar.com"
         # User requests with "FOO@bar.com" would raise a Not Found error
         try:
-            email = canonicalise_email(body["email"])
+            email = validate_email(body["email"])
         except ValueError as e:
             raise SynapseError(400, str(e))
         send_attempt = body["send_attempt"]
@@ -247,7 +247,7 @@ class PasswordRestServlet(RestServlet):
                     # We store all email addresses canonicalised in the DB.
                     # (See add_threepid in synapse/handlers/auth.py)
                     try:
-                        threepid["address"] = canonicalise_email(threepid["address"])
+                        threepid["address"] = validate_email(threepid["address"])
                     except ValueError as e:
                         raise SynapseError(400, str(e))
                 # if using email, we must know about the email they're authing with!
@@ -375,7 +375,7 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
         # Otherwise the email will be sent to "FOO@bar.com" and stored as
         # "foo@bar.com" in database.
         try:
-            email = canonicalise_email(body["email"])
+            email = validate_email(body["email"])
         except ValueError as e:
             raise SynapseError(400, str(e))
         send_attempt = body["send_attempt"]
diff --git a/synapse/rest/client/v2_alpha/account_validity.py b/synapse/rest/client/v2_alpha/account_validity.py
index 0ad07fb895..2d1ad3d3fb 100644
--- a/synapse/rest/client/v2_alpha/account_validity.py
+++ b/synapse/rest/client/v2_alpha/account_validity.py
@@ -36,24 +36,40 @@ class AccountValidityRenewServlet(RestServlet):
         self.hs = hs
         self.account_activity_handler = hs.get_account_validity_handler()
         self.auth = hs.get_auth()
-        self.success_html = hs.config.account_validity.account_renewed_html_content
-        self.failure_html = hs.config.account_validity.invalid_token_html_content
+        self.account_renewed_template = (
+            hs.config.account_validity.account_validity_account_renewed_template
+        )
+        self.account_previously_renewed_template = (
+            hs.config.account_validity.account_validity_account_previously_renewed_template
+        )
+        self.invalid_token_template = (
+            hs.config.account_validity.account_validity_invalid_token_template
+        )
 
     async def on_GET(self, request):
         if b"token" not in request.args:
             raise SynapseError(400, "Missing renewal token")
         renewal_token = request.args[b"token"][0]
 
-        token_valid = await self.account_activity_handler.renew_account(
+        (
+            token_valid,
+            token_stale,
+            expiration_ts,
+        ) = await self.account_activity_handler.renew_account(
             renewal_token.decode("utf8")
         )
 
         if token_valid:
             status_code = 200
-            response = self.success_html
+            response = self.account_renewed_template.render(expiration_ts=expiration_ts)
+        elif token_stale:
+            status_code = 200
+            response = self.account_previously_renewed_template.render(
+                expiration_ts=expiration_ts
+            )
         else:
             status_code = 404
-            response = self.failure_html
+            response = self.invalid_token_template.render(expiration_ts=expiration_ts)
 
         respond_with_html(request, status_code, response)
 
@@ -71,10 +87,12 @@ class AccountValiditySendMailServlet(RestServlet):
         self.hs = hs
         self.account_activity_handler = hs.get_account_validity_handler()
         self.auth = hs.get_auth()
-        self.account_validity = self.hs.config.account_validity
+        self.account_validity_renew_by_email_enabled = (
+            hs.config.account_validity.account_validity_renew_by_email_enabled
+        )
 
     async def on_POST(self, request):
-        if not self.account_validity.renew_by_email_enabled:
+        if not self.account_validity_renew_by_email_enabled:
             raise AuthError(
                 403, "Account renewal via email is disabled on this server."
             )
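Editor's note: the servlet now renders one of three templates depending on whether the token is fresh, already used, or unknown. A hedged sketch of that dispatch using inline Jinja2 templates (Synapse's real templates are separate HTML files; the wording here is illustrative):

    from jinja2 import Template

    account_renewed = Template("Account renewed; valid until {{ expiration_ts }}.")
    previously_renewed = Template("Account already renewed; valid until {{ expiration_ts }}.")
    invalid_token = Template("Invalid renewal token.")

    def render_renewal(token_valid: bool, token_stale: bool, expiration_ts: int):
        # Mirrors on_GET above: 200 for both success cases, 404 otherwise.
        if token_valid:
            return 200, account_renewed.render(expiration_ts=expiration_ts)
        if token_stale:
            return 200, previously_renewed.render(expiration_ts=expiration_ts)
        return 404, invalid_token.render(expiration_ts=expiration_ts)

    assert render_renewal(False, True, 1000)[0] == 200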
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index b26aad7b34..a30a5df1b1 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -30,7 +30,7 @@ from synapse.api.errors import (
 )
 from synapse.config import ConfigError
 from synapse.config.captcha import CaptchaConfig
-from synapse.config.consent_config import ConsentConfig
+from synapse.config.consent import ConsentConfig
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.config.ratelimiting import FederationRateLimitConfig
 from synapse.config.registration import RegistrationConfig
@@ -49,7 +49,11 @@ from synapse.push.mailer import Mailer
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.ratelimitutils import FederationRateLimiter
 from synapse.util.stringutils import assert_valid_client_secret, random_string
-from synapse.util.threepids import canonicalise_email, check_3pid_allowed
+from synapse.util.threepids import (
+    canonicalise_email,
+    check_3pid_allowed,
+    validate_email,
+)
 
 from ._base import client_patterns, interactive_auth_handler
 
@@ -111,7 +115,7 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
         # (See on_POST in EmailThreepidRequestTokenRestServlet
         # in synapse/rest/client/v2_alpha/account.py)
         try:
-            email = canonicalise_email(body["email"])
+            email = validate_email(body["email"])
         except ValueError as e:
             raise SynapseError(400, str(e))
         send_attempt = body["send_attempt"]
diff --git a/synapse/rest/consent/consent_resource.py b/synapse/rest/consent/consent_resource.py
index c4550d3cf0..b19cd8afc5 100644
--- a/synapse/rest/consent/consent_resource.py
+++ b/synapse/rest/consent/consent_resource.py
@@ -32,14 +32,6 @@ TEMPLATE_LANGUAGE = "en"
 
 logger = logging.getLogger(__name__)
 
-# use hmac.compare_digest if we have it (python 2.7.7), else just use equality
-if hasattr(hmac, "compare_digest"):
-    compare_digest = hmac.compare_digest
-else:
-
-    def compare_digest(a, b):
-        return a == b
-
 
 class ConsentResource(DirectServeHtmlResource):
     """A twisted Resource to display a privacy policy and gather consent to it
@@ -209,5 +201,5 @@ class ConsentResource(DirectServeHtmlResource):
             .encode("ascii")
         )
 
-        if not compare_digest(want_mac, userhmac):
+        if not hmac.compare_digest(want_mac, userhmac):
             raise SynapseError(HTTPStatus.FORBIDDEN, "HMAC incorrect")
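Editor's note: with the Python 2 fallback removed, the standard library's constant-time comparison is used unconditionally. A short demonstration of why hmac.compare_digest is preferred over == for MAC checks:

    import hashlib
    import hmac

    key, msg = b"form-secret", b"@user:example.com"
    want_mac = hmac.new(key, msg, hashlib.sha256).hexdigest().encode("ascii")

    # "==" short-circuits at the first differing byte, leaking timing
    # information; compare_digest takes the same time wherever inputs differ.
    assert hmac.compare_digest(want_mac, want_mac)
    assert not hmac.compare_digest(want_mac, b"0" * len(want_mac))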
diff --git a/synapse/rest/key/v2/remote_key_resource.py b/synapse/rest/key/v2/remote_key_resource.py
index c57ac22e58..f648678b09 100644
--- a/synapse/rest/key/v2/remote_key_resource.py
+++ b/synapse/rest/key/v2/remote_key_resource.py
@@ -144,7 +144,7 @@ class RemoteKey(DirectServeJsonResource):
 
         # Note that the value is unused.
         cache_misses = {}  # type: Dict[str, Dict[str, int]]
-        for (server_name, key_id, from_server), results in cached.items():
+        for (server_name, key_id, _), results in cached.items():
             results = [(result["ts_added_ms"], result) for result in results]
 
             if not results and key_id is not None:
@@ -206,7 +206,7 @@ class RemoteKey(DirectServeJsonResource):
                 # Cast to bytes since postgresql returns a memoryview.
                 json_results.add(bytes(most_recent_result["key_json"]))
             else:
-                for ts_added, result in results:
+                for _, result in results:
                     # Cast to bytes since postgresql returns a memoryview.
                     json_results.add(bytes(result["key_json"]))
 
diff --git a/synapse/rest/media/v1/filepath.py b/synapse/rest/media/v1/filepath.py
index 4088e7a059..09531ebf54 100644
--- a/synapse/rest/media/v1/filepath.py
+++ b/synapse/rest/media/v1/filepath.py
@@ -21,7 +21,7 @@ from typing import Callable, List
 NEW_FORMAT_ID_RE = re.compile(r"^\d\d\d\d-\d\d-\d\d")
 
 
-def _wrap_in_base_path(func: "Callable[..., str]") -> "Callable[..., str]":
+def _wrap_in_base_path(func: Callable[..., str]) -> Callable[..., str]:
     """Takes a function that returns a relative path and turns it into an
     absolute path based on the location of the primary media store
     """
diff --git a/synapse/rest/media/v1/upload_resource.py b/synapse/rest/media/v1/upload_resource.py
index 80f017a4dd..024a105bf2 100644
--- a/synapse/rest/media/v1/upload_resource.py
+++ b/synapse/rest/media/v1/upload_resource.py
@@ -51,8 +51,6 @@ class UploadResource(DirectServeJsonResource):
 
     async def _async_render_POST(self, request: SynapseRequest) -> None:
         requester = await self.auth.get_user_by_req(request)
-        # TODO: The checks here are a bit late. The content will have
-        # already been uploaded to a tmp file at this point
         content_length = request.getHeader("Content-Length")
         if content_length is None:
             raise SynapseError(msg="Request must specify a Content-Length", code=400)
diff --git a/synapse/rest/synapse/client/new_user_consent.py b/synapse/rest/synapse/client/new_user_consent.py
index e5634f9679..488b97b32e 100644
--- a/synapse/rest/synapse/client/new_user_consent.py
+++ b/synapse/rest/synapse/client/new_user_consent.py
@@ -61,6 +61,15 @@ class NewUserConsentResource(DirectServeHtmlResource):
             self._sso_handler.render_error(request, "bad_session", e.msg, code=e.code)
             return
 
+        # It should be impossible to get here without having first been through
+        # the pick-a-username step, which ensures chosen_localpart gets set.
+        if not session.chosen_localpart:
+            logger.warning("Session has no user name selected")
+            self._sso_handler.render_error(
+                request, "no_user", "No user name has been selected.", code=400
+            )
+            return
+
         user_id = UserID(session.chosen_localpart, self._server_name)
         user_profile = {
             "display_name": session.display_name,
diff --git a/synapse/secrets.py b/synapse/secrets.py
deleted file mode 100644
index bf829251fd..0000000000
--- a/synapse/secrets.py
+++ /dev/null
@@ -1,44 +0,0 @@
-# Copyright 2018 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-"""
-Injectable secrets module for Synapse.
-
-See https://docs.python.org/3/library/secrets.html#module-secrets for the API
-used in Python 3.6, and the API emulated in Python 2.7.
-"""
-import sys
-
-# secrets is available since python 3.6
-if sys.version_info[0:2] >= (3, 6):
-    import secrets
-
-    class Secrets:
-        def token_bytes(self, nbytes: int = 32) -> bytes:
-            return secrets.token_bytes(nbytes)
-
-        def token_hex(self, nbytes: int = 32) -> str:
-            return secrets.token_hex(nbytes)
-
-
-else:
-    import binascii
-    import os
-
-    class Secrets:
-        def token_bytes(self, nbytes: int = 32) -> bytes:
-            return os.urandom(nbytes)
-
-        def token_hex(self, nbytes: int = 32) -> str:
-            return binascii.hexlify(self.token_bytes(nbytes)).decode("ascii")
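Editor's note: the deleted shim predates the project's Python >= 3.6 floor; callers can now use the standard library module directly:

    import secrets

    token = secrets.token_bytes(32)    # 32 cryptographically secure random bytes
    hex_token = secrets.token_hex(32)  # 64 hex characters from the same source
    assert len(token) == 32 and len(hex_token) == 64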
diff --git a/synapse/server.py b/synapse/server.py
index 42d2fad8e8..2337d2d9b4 100644
--- a/synapse/server.py
+++ b/synapse/server.py
@@ -70,13 +70,14 @@ from synapse.handlers.acme import AcmeHandler
 from synapse.handlers.admin import AdminHandler
 from synapse.handlers.appservice import ApplicationServicesHandler
 from synapse.handlers.auth import AuthHandler, MacaroonGenerator
-from synapse.handlers.cas_handler import CasHandler
+from synapse.handlers.cas import CasHandler
 from synapse.handlers.deactivate_account import DeactivateAccountHandler
 from synapse.handlers.device import DeviceHandler, DeviceWorkerHandler
 from synapse.handlers.devicemessage import DeviceMessageHandler
 from synapse.handlers.directory import DirectoryHandler
 from synapse.handlers.e2e_keys import E2eKeysHandler
 from synapse.handlers.e2e_room_keys import E2eRoomKeysHandler
+from synapse.handlers.event_auth import EventAuthHandler
 from synapse.handlers.events import EventHandler, EventStreamHandler
 from synapse.handlers.federation import FederationHandler
 from synapse.handlers.groups_local import GroupsLocalHandler, GroupsLocalWorkerHandler
@@ -125,7 +126,6 @@ from synapse.rest.media.v1.media_repository import (
     MediaRepository,
     MediaRepositoryResource,
 )
-from synapse.secrets import Secrets
 from synapse.server_notices.server_notices_manager import ServerNoticesManager
 from synapse.server_notices.server_notices_sender import ServerNoticesSender
 from synapse.server_notices.worker_server_notices_sender import (
@@ -145,8 +145,8 @@ logger = logging.getLogger(__name__)
 if TYPE_CHECKING:
     from txredisapi import RedisProtocol
 
-    from synapse.handlers.oidc_handler import OidcHandler
-    from synapse.handlers.saml_handler import SamlHandler
+    from synapse.handlers.oidc import OidcHandler
+    from synapse.handlers.saml import SamlHandler
 
 
 T = TypeVar("T", bound=Callable[..., Any])
@@ -286,6 +286,14 @@ class HomeServer(metaclass=abc.ABCMeta):
         if self.config.run_background_tasks:
             self.setup_background_tasks()
 
+    def start_listening(self) -> None:
+        """Start the HTTP, manhole, metrics, etc listeners
+
+        Does nothing in this base class; overridden in derived classes to start the
+        appropriate listeners.
+        """
+        pass
+
     def setup_background_tasks(self) -> None:
         """
         Some handlers have side effects on instantiation (like registering
@@ -417,10 +425,10 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_presence_handler(self) -> BasePresenceHandler:
-        if self.config.worker_app:
-            return WorkerPresenceHandler(self)
-        else:
+        if self.get_instance_name() in self.config.worker.writers.presence:
             return PresenceHandler(self)
+        else:
+            return WorkerPresenceHandler(self)
 
     @cache_in_self
     def get_typing_writer_handler(self) -> TypingWriterHandler:
@@ -633,10 +641,6 @@ class HomeServer(metaclass=abc.ABCMeta):
         return GroupAttestionRenewer(self)
 
     @cache_in_self
-    def get_secrets(self) -> Secrets:
-        return Secrets()
-
-    @cache_in_self
     def get_stats_handler(self) -> StatsHandler:
         return StatsHandler(self)
 
@@ -696,13 +700,13 @@ class HomeServer(metaclass=abc.ABCMeta):
 
     @cache_in_self
     def get_saml_handler(self) -> "SamlHandler":
-        from synapse.handlers.saml_handler import SamlHandler
+        from synapse.handlers.saml import SamlHandler
 
         return SamlHandler(self)
 
     @cache_in_self
     def get_oidc_handler(self) -> "OidcHandler":
-        from synapse.handlers.oidc_handler import OidcHandler
+        from synapse.handlers.oidc import OidcHandler
 
         return OidcHandler(self)
 
@@ -747,6 +751,10 @@ class HomeServer(metaclass=abc.ABCMeta):
         return SpaceSummaryHandler(self)
 
     @cache_in_self
+    def get_event_auth_handler(self) -> EventAuthHandler:
+        return EventAuthHandler(self)
+
+    @cache_in_self
     def get_external_cache(self) -> ExternalCache:
         return ExternalCache(self)
 
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index c7ee731154..b3bd92d37c 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -19,6 +19,7 @@ from typing import (
     Any,
     Awaitable,
     Callable,
+    Collection,
     DefaultDict,
     Dict,
     FrozenSet,
@@ -46,7 +47,7 @@ from synapse.logging.utils import log_function
 from synapse.state import v1, v2
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.roommember import ProfileInfo
-from synapse.types import Collection, StateMap
+from synapse.types import StateMap
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
 from synapse.util.metrics import Measure, measure_func
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 32671ddbde..008644cd98 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -18,6 +18,7 @@ import logging
 from typing import (
     Any,
     Callable,
+    Collection,
     Dict,
     Generator,
     Iterable,
@@ -37,7 +38,7 @@ from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
-from synapse.types import Collection, MutableStateMap, StateMap
+from synapse.types import MutableStateMap, StateMap
 from synapse.util import Clock
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 56dd3a4861..6b68d8720c 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -16,13 +16,13 @@
 import logging
 import random
 from abc import ABCMeta
-from typing import TYPE_CHECKING, Any, Iterable, Optional, Union
+from typing import TYPE_CHECKING, Any, Collection, Iterable, Optional, Union
 
 from synapse.storage.database import LoggingTransaction  # noqa: F401
 from synapse.storage.database import make_in_list_sql_clause  # noqa: F401
 from synapse.storage.database import DatabasePool
 from synapse.storage.types import Connection
-from synapse.types import Collection, StreamToken, get_domain_from_id
+from synapse.types import StreamToken, get_domain_from_id
 from synapse.util import json_decoder
 
 if TYPE_CHECKING:
@@ -114,7 +114,7 @@ def db_to_json(db_content: Union[memoryview, bytes, bytearray, str]) -> Any:
         db_content = db_content.tobytes()
 
     # Decode it to a Unicode string before feeding it to the JSON decoder, since
-    # Python 3.5 does not support deserializing bytes.
+    # it only supports handling strings
     if isinstance(db_content, (bytes, bytearray)):
         db_content = db_content.decode("utf8")
 
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index 9a6d2b21f9..bd39c095af 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -20,6 +20,7 @@ from time import monotonic as monotonic_time
 from typing import (
     Any,
     Callable,
+    Collection,
     Dict,
     Iterable,
     Iterator,
@@ -48,7 +49,6 @@ from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.background_updates import BackgroundUpdater
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine, Sqlite3Engine
 from synapse.storage.types import Connection, Cursor
-from synapse.types import Collection
 
 # python 3 does not have a maximum int value
 MAX_TXN_ID = 2 ** 63 - 1
@@ -171,10 +171,7 @@ class LoggingDatabaseConnection:
 
 
 # The type of entry which goes on our after_callbacks and exception_callbacks lists.
-#
-# Python 3.5.2 doesn't support Callable with an ellipsis, so we wrap it in quotes so
-# that mypy sees the type but the runtime python doesn't.
-_CallbackListEntry = Tuple["Callable[..., None]", Iterable[Any], Dict[str, Any]]
+_CallbackListEntry = Tuple[Callable[..., None], Iterable[Any], Dict[str, Any]]
 
 
 R = TypeVar("R")
@@ -221,7 +218,7 @@ class LoggingTransaction:
         self.after_callbacks = after_callbacks
         self.exception_callbacks = exception_callbacks
 
-    def call_after(self, callback: "Callable[..., None]", *args: Any, **kwargs: Any):
+    def call_after(self, callback: Callable[..., None], *args: Any, **kwargs: Any):
         """Call the given callback on the main twisted thread after the
         transaction has finished. Used to invalidate the caches on the
         correct thread.
@@ -233,7 +230,7 @@ class LoggingTransaction:
         self.after_callbacks.append((callback, args, kwargs))
 
     def call_on_exception(
-        self, callback: "Callable[..., None]", *args: Any, **kwargs: Any
+        self, callback: Callable[..., None], *args: Any, **kwargs: Any
     ):
         # if self.exception_callbacks is None, that means that whatever constructed the
         # LoggingTransaction isn't expecting there to be any callbacks; assert that
@@ -485,7 +482,7 @@ class DatabasePool:
         desc: str,
         after_callbacks: List[_CallbackListEntry],
         exception_callbacks: List[_CallbackListEntry],
-        func: "Callable[..., R]",
+        func: Callable[..., R],
         *args: Any,
         **kwargs: Any,
     ) -> R:
@@ -618,7 +615,7 @@ class DatabasePool:
     async def runInteraction(
         self,
         desc: str,
-        func: "Callable[..., R]",
+        func: Callable[..., R],
         *args: Any,
         db_autocommit: bool = False,
         **kwargs: Any,
@@ -678,7 +675,7 @@ class DatabasePool:
 
     async def runWithConnection(
         self,
-        func: "Callable[..., R]",
+        func: Callable[..., R],
         *args: Any,
         db_autocommit: bool = False,
         **kwargs: Any,
diff --git a/synapse/storage/databases/main/__init__.py b/synapse/storage/databases/main/__init__.py
index 5c50f5f950..49c7606d51 100644
--- a/synapse/storage/databases/main/__init__.py
+++ b/synapse/storage/databases/main/__init__.py
@@ -17,7 +17,6 @@
 import logging
 from typing import List, Optional, Tuple
 
-from synapse.api.constants import PresenceState
 from synapse.config.homeserver import HomeServerConfig
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.stats import UserSortOrder
@@ -51,7 +50,7 @@ from .media_repository import MediaRepositoryStore
 from .metrics import ServerMetricsStore
 from .monthly_active_users import MonthlyActiveUsersStore
 from .openid import OpenIdStore
-from .presence import PresenceStore, UserPresenceState
+from .presence import PresenceStore
 from .profile import ProfileStore
 from .purge_events import PurgeEventsStore
 from .push_rule import PushRuleStore
@@ -126,9 +125,6 @@ class DataStore(
         self._clock = hs.get_clock()
         self.database_engine = database.engine
 
-        self._presence_id_gen = StreamIdGenerator(
-            db_conn, "presence_stream", "stream_id"
-        )
         self._public_room_id_gen = StreamIdGenerator(
             db_conn, "public_room_list_stream", "stream_id"
         )
@@ -177,21 +173,6 @@ class DataStore(
 
         super().__init__(database, db_conn, hs)
 
-        self._presence_on_startup = self._get_active_presence(db_conn)
-
-        presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
-            db_conn,
-            "presence_stream",
-            entity_column="user_id",
-            stream_column="stream_id",
-            max_value=self._presence_id_gen.get_current_token(),
-        )
-        self.presence_stream_cache = StreamChangeCache(
-            "PresenceStreamChangeCache",
-            min_presence_val,
-            prefilled_cache=presence_cache_prefill,
-        )
-
         device_list_max = self._device_list_id_gen.get_current_token()
         self._device_list_stream_cache = StreamChangeCache(
             "DeviceListStreamChangeCache", device_list_max
@@ -238,32 +219,6 @@ class DataStore(
     def get_device_stream_token(self) -> int:
         return self._device_list_id_gen.get_current_token()
 
-    def take_presence_startup_info(self):
-        active_on_startup = self._presence_on_startup
-        self._presence_on_startup = None
-        return active_on_startup
-
-    def _get_active_presence(self, db_conn):
-        """Fetch non-offline presence from the database so that we can register
-        the appropriate time outs.
-        """
-
-        sql = (
-            "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
-            " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
-            " WHERE state != ?"
-        )
-
-        txn = db_conn.cursor()
-        txn.execute(sql, (PresenceState.OFFLINE,))
-        rows = self.db_pool.cursor_to_dict(txn)
-        txn.close()
-
-        for row in rows:
-            row["currently_active"] = bool(row["currently_active"])
-
-        return [UserPresenceState(**row) for row in rows]
-
     async def get_users(self) -> List[JsonDict]:
         """Function to retrieve a list of users in users table.
 
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index b204875580..c9346de316 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 import abc
 import logging
-from typing import Any, Dict, Iterable, List, Optional, Set, Tuple
+from typing import Any, Collection, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse.api.errors import Codes, StoreError
 from synapse.logging.opentracing import (
@@ -31,7 +31,7 @@ from synapse.storage.database import (
     LoggingTransaction,
     make_tuple_comparison_clause,
 )
-from synapse.types import Collection, JsonDict, get_verify_key_from_cross_signing_key
+from synapse.types import JsonDict, get_verify_key_from_cross_signing_key
 from synapse.util import json_decoder, json_encoder
 from synapse.util.caches.descriptors import cached, cachedList
 from synapse.util.caches.lrucache import LruCache
@@ -717,7 +717,15 @@ class DeviceWorkerStore(SQLBaseStore):
             keyvalues={"user_id": user_id},
             values={},
             insertion_values={"added_ts": self._clock.time_msec()},
-            desc="make_remote_user_device_cache_as_stale",
+            desc="mark_remote_user_device_cache_as_stale",
+        )
+
+    async def mark_remote_user_device_cache_as_valid(self, user_id: str) -> None:
+        # Remove the database entry that says we need to resync devices, after a resync
+        await self.db_pool.simple_delete(
+            table="device_lists_remote_resync",
+            keyvalues={"user_id": user_id},
+            desc="mark_remote_user_device_cache_as_valid",
         )
 
     async def mark_remote_user_device_list_as_unsubscribed(self, user_id: str) -> None:
@@ -1289,15 +1297,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             lock=False,
         )
 
-        # If we're replacing the remote user's device list cache presumably
-        # we've done a full resync, so we remove the entry that says we need
-        # to resync
-        self.db_pool.simple_delete_txn(
-            txn,
-            table="device_lists_remote_resync",
-            keyvalues={"user_id": user_id},
-        )
-
     async def add_device_change_to_streams(
         self, user_id: str, device_ids: Collection[str], hosts: List[str]
     ):
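Editor's note: clearing the resync marker becomes an explicit store call instead of a side effect of replacing the cache, so callers can clear it only once a resync fully succeeds. A hedged sketch of the intended call order (the wrapper is hypothetical; both store methods exist in this file):

    from typing import Dict, List

    async def finish_remote_resync(
        store, user_id: str, devices: List[Dict], stream_id: int
    ) -> None:
        # Hypothetical wrapper: refresh the cached device list first...
        await store.update_remote_device_list_cache(user_id, devices, stream_id)
        # ...then explicitly mark the cache as valid again.
        await store.mark_remote_user_device_cache_as_valid(user_id)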
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 32ce70a396..ff81d5cd17 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -14,7 +14,7 @@
 import itertools
 import logging
 from queue import Empty, PriorityQueue
-from typing import Dict, Iterable, List, Set, Tuple
+from typing import Collection, Dict, Iterable, List, Set, Tuple
 
 from synapse.api.errors import StoreError
 from synapse.events import EventBase
@@ -25,7 +25,6 @@ from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.databases.main.signatures import SignatureWorkerStore
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.types import Cursor
-from synapse.types import Collection
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
diff --git a/synapse/storage/databases/main/events.py b/synapse/storage/databases/main/events.py
index a362521e20..fd25c8112d 100644
--- a/synapse/storage/databases/main/events.py
+++ b/synapse/storage/databases/main/events.py
@@ -170,7 +170,7 @@ class PersistEventsStore:
             )
 
         async with stream_ordering_manager as stream_orderings:
-            for (event, context), stream in zip(events_and_contexts, stream_orderings):
+            for (event, _), stream in zip(events_and_contexts, stream_orderings):
                 event.internal_metadata.stream_ordering = stream
 
             await self.db_pool.runInteraction(
@@ -297,7 +297,7 @@ class PersistEventsStore:
                 txn.execute(sql + clause, args)
                 to_recursively_check = []
 
-                for event_id, prev_event_id, metadata, rejected in txn:
+                for _, prev_event_id, metadata, rejected in txn:
                     if prev_event_id in existing_prevs:
                         continue
 
@@ -1127,7 +1127,7 @@ class PersistEventsStore:
     def _update_forward_extremities_txn(
         self, txn, new_forward_extremities, max_stream_order
     ):
-        for room_id, new_extrem in new_forward_extremities.items():
+        for room_id in new_forward_extremities.keys():
             self.db_pool.simple_delete_txn(
                 txn, table="event_forward_extremities", keyvalues={"room_id": room_id}
             )
@@ -1399,7 +1399,7 @@ class PersistEventsStore:
         ]
 
         state_values = []
-        for event, context in state_events_and_contexts:
+        for event, _ in state_events_and_contexts:
             vals = {
                 "event_id": event.event_id,
                 "room_id": event.room_id,
@@ -1468,7 +1468,7 @@ class PersistEventsStore:
             # nothing to do here
             return
 
-        for event, context in events_and_contexts:
+        for event, _ in events_and_contexts:
             if event.type == EventTypes.Redaction and event.redacts is not None:
                 # Remove the entries in the event_push_actions table for the
                 # redacted event.
diff --git a/synapse/storage/databases/main/events_worker.py b/synapse/storage/databases/main/events_worker.py
index 64d70785b8..2c823e09cf 100644
--- a/synapse/storage/databases/main/events_worker.py
+++ b/synapse/storage/databases/main/events_worker.py
@@ -15,7 +15,16 @@
 import logging
 import threading
 from collections import namedtuple
-from typing import Container, Dict, Iterable, List, Optional, Tuple, overload
+from typing import (
+    Collection,
+    Container,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Tuple,
+    overload,
+)
 
 from constantly import NamedConstant, Names
 from typing_extensions import Literal
@@ -45,7 +54,7 @@ from synapse.storage.database import DatabasePool
 from synapse.storage.engines import PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.storage.util.sequence import build_sequence_generator
-from synapse.types import Collection, JsonDict, get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.lrucache import LruCache
 from synapse.util.iterutils import batch_iter
diff --git a/synapse/storage/databases/main/presence.py b/synapse/storage/databases/main/presence.py
index c207d917b1..db22fab23e 100644
--- a/synapse/storage/databases/main/presence.py
+++ b/synapse/storage/databases/main/presence.py
@@ -12,16 +12,69 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from typing import Dict, List, Tuple
+from typing import TYPE_CHECKING, Dict, List, Tuple
 
-from synapse.api.presence import UserPresenceState
+from synapse.api.presence import PresenceState, UserPresenceState
+from synapse.replication.tcp.streams import PresenceStream
 from synapse.storage._base import SQLBaseStore, make_in_list_sql_clause
+from synapse.storage.database import DatabasePool
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.types import Connection
+from synapse.storage.util.id_generators import MultiWriterIdGenerator, StreamIdGenerator
 from synapse.util.caches.descriptors import cached, cachedList
+from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.iterutils import batch_iter
 
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
 
 class PresenceStore(SQLBaseStore):
+    def __init__(
+        self,
+        database: DatabasePool,
+        db_conn: Connection,
+        hs: "HomeServer",
+    ):
+        super().__init__(database, db_conn, hs)
+
+        self._can_persist_presence = (
+            hs.get_instance_name() in hs.config.worker.writers.presence
+        )
+
+        if isinstance(database.engine, PostgresEngine):
+            self._presence_id_gen = MultiWriterIdGenerator(
+                db_conn=db_conn,
+                db=database,
+                stream_name="presence_stream",
+                instance_name=self._instance_name,
+                tables=[("presence_stream", "instance_name", "stream_id")],
+                sequence_name="presence_stream_sequence",
+                writers=hs.config.worker.writers.presence,
+            )
+        else:
+            self._presence_id_gen = StreamIdGenerator(
+                db_conn, "presence_stream", "stream_id"
+            )
+
+        self._presence_on_startup = self._get_active_presence(db_conn)
+
+        presence_cache_prefill, min_presence_val = self.db_pool.get_cache_dict(
+            db_conn,
+            "presence_stream",
+            entity_column="user_id",
+            stream_column="stream_id",
+            max_value=self._presence_id_gen.get_current_token(),
+        )
+        self.presence_stream_cache = StreamChangeCache(
+            "PresenceStreamChangeCache",
+            min_presence_val,
+            prefilled_cache=presence_cache_prefill,
+        )
+
     async def update_presence(self, presence_states):
+        assert self._can_persist_presence
+
         stream_ordering_manager = self._presence_id_gen.get_next_mult(
             len(presence_states)
         )
@@ -57,6 +110,7 @@ class PresenceStore(SQLBaseStore):
                     "last_user_sync_ts": state.last_user_sync_ts,
                     "status_msg": state.status_msg,
                     "currently_active": state.currently_active,
+                    "instance_name": self._instance_name,
                 }
                 for stream_id, state in zip(stream_orderings, presence_states)
             ],
@@ -216,3 +270,37 @@ class PresenceStore(SQLBaseStore):
 
     def get_current_presence_token(self):
         return self._presence_id_gen.get_current_token()
+
+    def _get_active_presence(self, db_conn: Connection):
+        """Fetch non-offline presence from the database so that we can register
+        the appropriate timeouts.
+        """
+
+        sql = (
+            "SELECT user_id, state, last_active_ts, last_federation_update_ts,"
+            " last_user_sync_ts, status_msg, currently_active FROM presence_stream"
+            " WHERE state != ?"
+        )
+
+        txn = db_conn.cursor()
+        txn.execute(sql, (PresenceState.OFFLINE,))
+        rows = self.db_pool.cursor_to_dict(txn)
+        txn.close()
+
+        for row in rows:
+            row["currently_active"] = bool(row["currently_active"])
+
+        return [UserPresenceState(**row) for row in rows]
+
+    def take_presence_startup_info(self):
+        active_on_startup = self._presence_on_startup
+        self._presence_on_startup = None
+        return active_on_startup
+
+    def process_replication_rows(self, stream_name, instance_name, token, rows):
+        if stream_name == PresenceStream.NAME:
+            self._presence_id_gen.advance(instance_name, token)
+            for row in rows:
+                self.presence_stream_cache.entity_has_changed(row.user_id, token)
+                self._get_presence_for_user.invalidate((row.user_id,))
+        return super().process_replication_rows(stream_name, instance_name, token, rows)
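Editor's note: on Postgres the presence stream now draws IDs from MultiWriterIdGenerator, backed by the presence_stream_sequence created in the schema deltas below, and stamps each row with the writing instance. A rough sketch of the underlying idea only (the real table has more columns and the generator batches ID allocation):

    def persist_presence_row(txn, instance_name: str, user_id: str, state: str) -> int:
        # Each writer draws a globally unique stream ID from the shared sequence...
        txn.execute("SELECT nextval('presence_stream_sequence')")
        (stream_id,) = txn.fetchone()
        # ...and records which instance wrote the row, so replication consumers
        # can advance per-writer positions independently.
        txn.execute(
            "INSERT INTO presence_stream (stream_id, instance_name, user_id, state)"
            " VALUES (%s, %s, %s, %s)",
            (stream_id, instance_name, user_id, state),
        )
        return stream_id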
diff --git a/synapse/storage/databases/main/registration.py b/synapse/storage/databases/main/registration.py
index 833214b7e0..6e5ee557d2 100644
--- a/synapse/storage/databases/main/registration.py
+++ b/synapse/storage/databases/main/registration.py
@@ -91,13 +91,25 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
             id_column=None,
         )
 
-        self._account_validity = hs.config.account_validity
-        if hs.config.run_background_tasks and self._account_validity.enabled:
-            self._clock.call_later(
-                0.0,
-                self._set_expiration_date_when_missing,
+        self._account_validity_enabled = (
+            hs.config.account_validity.account_validity_enabled
+        )
+        self._account_validity_period = None
+        self._account_validity_startup_job_max_delta = None
+        if self._account_validity_enabled:
+            self._account_validity_period = (
+                hs.config.account_validity.account_validity_period
+            )
+            self._account_validity_startup_job_max_delta = (
+                hs.config.account_validity.account_validity_startup_job_max_delta
             )
 
+            if hs.config.run_background_tasks:
+                self._clock.call_later(
+                    0.0,
+                    self._set_expiration_date_when_missing,
+                )
+
         # Create a background job for culling expired 3PID validity tokens
         if hs.config.run_background_tasks:
             self._clock.looping_call(
@@ -194,6 +206,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         expiration_ts: int,
         email_sent: bool,
         renewal_token: Optional[str] = None,
+        token_used_ts: Optional[int] = None,
     ) -> None:
         """Updates the account validity properties of the given account, with the
         given values.
@@ -207,6 +220,8 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 period.
             renewal_token: Renewal token the user can use to extend the validity
                 of their account. Defaults to no token.
+            token_used_ts: A timestamp of when the current token was used to renew
+                the account.
         """
 
         def set_account_validity_for_user_txn(txn):
@@ -218,6 +233,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                     "expiration_ts_ms": expiration_ts,
                     "email_sent": email_sent,
                     "renewal_token": renewal_token,
+                    "token_used_ts_ms": token_used_ts,
                 },
             )
             self._invalidate_cache_and_stream(
@@ -231,7 +247,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
     async def set_renewal_token_for_user(
         self, user_id: str, renewal_token: str
     ) -> None:
-        """Defines a renewal token for a given user.
+        """Defines a renewal token for a given user, and clears the token_used timestamp.
 
         Args:
             user_id: ID of the user to set the renewal token for.
@@ -244,26 +260,40 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
         await self.db_pool.simple_update_one(
             table="account_validity",
             keyvalues={"user_id": user_id},
-            updatevalues={"renewal_token": renewal_token},
+            updatevalues={"renewal_token": renewal_token, "token_used_ts_ms": None},
             desc="set_renewal_token_for_user",
         )
 
-    async def get_user_from_renewal_token(self, renewal_token: str) -> str:
-        """Get a user ID from a renewal token.
+    async def get_user_from_renewal_token(
+        self, renewal_token: str
+    ) -> Tuple[str, int, Optional[int]]:
+        """Get a user ID and renewal status from a renewal token.
 
         Args:
             renewal_token: The renewal token to perform the lookup with.
 
         Returns:
-            The ID of the user to which the token belongs.
+            A tuple containing the following values:
+                * The ID of a user to which the token belongs.
+                * An int representing the user's expiry timestamp as milliseconds since the
+                    epoch, or 0 if the token was invalid.
+                * An optional int representing the timestamp, as milliseconds since the
+                    epoch, of when the user last renewed their account. None if the account
+                    has not been renewed using the current token yet.
         """
-        return await self.db_pool.simple_select_one_onecol(
+        ret_dict = await self.db_pool.simple_select_one(
             table="account_validity",
             keyvalues={"renewal_token": renewal_token},
-            retcol="user_id",
+            retcols=["user_id", "expiration_ts_ms", "token_used_ts_ms"],
             desc="get_user_from_renewal_token",
         )
 
+        return (
+            ret_dict["user_id"],
+            ret_dict["expiration_ts_ms"],
+            ret_dict["token_used_ts_ms"],
+        )
+
     async def get_renewal_token_for_user(self, user_id: str) -> str:
         """Get the renewal token associated with a given user ID.
 
@@ -302,7 +332,7 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
             "get_users_expiring_soon",
             select_users_txn,
             self._clock.time_msec(),
-            self.config.account_validity.renew_at,
+            self.config.account_validity_renew_at,
         )
 
     async def set_renewal_mail_status(self, user_id: str, email_sent: bool) -> None:
@@ -964,11 +994,11 @@ class RegistrationWorkerStore(CacheInvalidationWorkerStore):
                 delta equal to 10% of the validity period.
         """
         now_ms = self._clock.time_msec()
-        expiration_ts = now_ms + self._account_validity.period
+        expiration_ts = now_ms + self._account_validity_period
 
         if use_delta:
             expiration_ts = self.rand.randrange(
-                expiration_ts - self._account_validity.startup_job_max_delta,
+                expiration_ts - self._account_validity_startup_job_max_delta,
                 expiration_ts,
             )
 
@@ -1412,7 +1442,7 @@ class RegistrationStore(StatsStore, RegistrationBackgroundUpdateStore):
         except self.database_engine.module.IntegrityError:
             raise StoreError(400, "User ID already taken.", errcode=Codes.USER_IN_USE)
 
-        if self._account_validity.enabled:
+        if self._account_validity_enabled:
             self.set_expiration_date_for_user_txn(txn, user_id)
 
         if create_profile_with_displayname:
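Editor's note: together with the new token_used_ts_ms column, these changes let the account validity handler tell a first use of a renewal token from a repeat use. A hedged sketch of that flow (store and clock are stand-ins; the store methods match the signatures above):

    async def renew_account(store, clock, renewal_token: str, validity_period: int):
        user_id, expiration_ts, token_used_ts = await store.get_user_from_renewal_token(
            renewal_token
        )
        if token_used_ts is not None:
            # Token already redeemed: report "previously renewed" without
            # extending the account again.
            return False, True, expiration_ts

        now = clock.time_msec()
        new_expiration_ts = now + validity_period
        await store.set_account_validity_for_user(
            user_id=user_id,
            expiration_ts=new_expiration_ts,
            email_sent=False,
            token_used_ts=now,
        )
        # (token_valid, token_stale, expiration_ts), as consumed by the servlet.
        return True, False, new_expiration_ts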
diff --git a/synapse/storage/databases/main/roommember.py b/synapse/storage/databases/main/roommember.py
index fd525dce65..2a8532f8c1 100644
--- a/synapse/storage/databases/main/roommember.py
+++ b/synapse/storage/databases/main/roommember.py
@@ -13,7 +13,20 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Dict, FrozenSet, Iterable, List, Optional, Set, Tuple
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Dict,
+    FrozenSet,
+    Iterable,
+    List,
+    Optional,
+    Set,
+    Tuple,
+    Union,
+)
+
+import attr
 
 from synapse.api.constants import EventTypes, Membership
 from synapse.events import EventBase
@@ -33,7 +46,7 @@ from synapse.storage.roommember import (
     ProfileInfo,
     RoomsForUser,
 )
-from synapse.types import Collection, PersistedEventPosition, get_domain_from_id
+from synapse.types import PersistedEventPosition, StateMap, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import _CacheContext, cached, cachedList
@@ -53,6 +66,10 @@ class RoomMemberWorkerStore(EventsWorkerStore):
     def __init__(self, database: DatabasePool, db_conn, hs):
         super().__init__(database, db_conn, hs)
 
+        # Used by `_get_joined_hosts` to ensure only one thing mutates the cache
+        # at a time. Keyed by room_id.
+        self._joined_host_linearizer = Linearizer("_JoinedHostsCache")
+
         # Is the current_state_events.membership up to date? Or is the
         # background update still running?
         self._current_state_events_membership_up_to_date = False
@@ -730,19 +747,82 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cached(num_args=2, max_entries=10000, iterable=True)
     async def _get_joined_hosts(
-        self, room_id, state_group, current_state_ids, state_entry
-    ):
-        # We don't use `state_group`, its there so that we can cache based
-        # on it. However, its important that its never None, since two current_state's
-        # with a state_group of None are likely to be different.
+        self,
+        room_id: str,
+        state_group: int,
+        current_state_ids: StateMap[str],
+        state_entry: "_StateCacheEntry",
+    ) -> FrozenSet[str]:
+        # We don't use `state_group`; it's there so that we can cache based on
+        # it. However, it's important that it's never None, since two
+        # current_state's with a state_group of None are likely to be different.
+        #
+        # The `state_group` must match the `state_entry.state_group` (if not None).
         assert state_group is not None
-
+        assert state_entry.state_group is None or state_entry.state_group == state_group
+
+        # We use a secondary cache of previous work to allow us to build up the
+        # joined hosts for the given state group based on previous state groups.
+        #
+        # We cache one object per room containing the results of the last state
+        # group we got joined hosts for. The idea is that generally
+        # `get_joined_hosts` is called with the "current" state group for the
+        # room, and so consecutive calls will be for consecutive state groups
+        # which point to the previous state group.
         cache = await self._get_joined_hosts_cache(room_id)
-        return await cache.get_destinations(state_entry)
+
+        # If the state group in the cache matches, we already have the data we need.
+        if state_entry.state_group == cache.state_group:
+            return frozenset(cache.hosts_to_joined_users)
+
+        # Since we'll mutate the cache we need to lock.
+        with (await self._joined_host_linearizer.queue(room_id)):
+            if state_entry.state_group == cache.state_group:
+                # Same state group, so nothing to do. We've already checked for
+                # this above, but the cache may have changed while waiting on
+                # the lock.
+                pass
+            elif state_entry.prev_group == cache.state_group:
+                # The cached work is for the previous state group, so we work out
+                # the delta.
+                for (typ, state_key), event_id in state_entry.delta_ids.items():
+                    if typ != EventTypes.Member:
+                        continue
+
+                    host = intern_string(get_domain_from_id(state_key))
+                    user_id = state_key
+                    known_joins = cache.hosts_to_joined_users.setdefault(host, set())
+
+                    event = await self.get_event(event_id)
+                    if event.membership == Membership.JOIN:
+                        known_joins.add(user_id)
+                    else:
+                        known_joins.discard(user_id)
+
+                        if not known_joins:
+                            cache.hosts_to_joined_users.pop(host, None)
+            else:
+                # The cache doesn't match the state group or prev state group,
+                # so we calculate the result from first principles.
+                joined_users = await self.get_joined_users_from_state(
+                    room_id, state_entry
+                )
+
+                cache.hosts_to_joined_users = {}
+                for user_id in joined_users:
+                    host = intern_string(get_domain_from_id(user_id))
+                    cache.hosts_to_joined_users.setdefault(host, set()).add(user_id)
+
+            if state_entry.state_group:
+                cache.state_group = state_entry.state_group
+            else:
+                cache.state_group = object()
+
+        return frozenset(cache.hosts_to_joined_users)
 
     @cached(max_entries=10000)
     def _get_joined_hosts_cache(self, room_id: str) -> "_JoinedHostsCache":
-        return _JoinedHostsCache(self, room_id)
+        return _JoinedHostsCache()
 
     @cached(num_args=2)
     async def did_forget(self, user_id: str, room_id: str) -> bool:
@@ -1052,71 +1132,18 @@ class RoomMemberStore(RoomMemberWorkerStore, RoomMemberBackgroundUpdateStore):
         await self.db_pool.runInteraction("forget_membership", f)
 
 
+@attr.s(slots=True)
 class _JoinedHostsCache:
-    """Cache for joined hosts in a room that is optimised to handle updates
-    via state deltas.
-    """
+    """The cached data used by the `_get_joined_hosts_cache`."""
 
-    def __init__(self, store, room_id):
-        self.store = store
-        self.room_id = room_id
+    # Dict of host to the set of their users in the room at the state group.
+    hosts_to_joined_users = attr.ib(type=Dict[str, Set[str]], factory=dict)
 
-        self.hosts_to_joined_users = {}
-
-        self.state_group = object()
-
-        self.linearizer = Linearizer("_JoinedHostsCache")
-
-        self._len = 0
-
-    async def get_destinations(self, state_entry: "_StateCacheEntry") -> Set[str]:
-        """Get set of destinations for a state entry
-
-        Args:
-            state_entry
-
-        Returns:
-            The destinations as a set.
-        """
-        if state_entry.state_group == self.state_group:
-            return frozenset(self.hosts_to_joined_users)
-
-        with (await self.linearizer.queue(())):
-            if state_entry.state_group == self.state_group:
-                pass
-            elif state_entry.prev_group == self.state_group:
-                for (typ, state_key), event_id in state_entry.delta_ids.items():
-                    if typ != EventTypes.Member:
-                        continue
-
-                    host = intern_string(get_domain_from_id(state_key))
-                    user_id = state_key
-                    known_joins = self.hosts_to_joined_users.setdefault(host, set())
-
-                    event = await self.store.get_event(event_id)
-                    if event.membership == Membership.JOIN:
-                        known_joins.add(user_id)
-                    else:
-                        known_joins.discard(user_id)
-
-                        if not known_joins:
-                            self.hosts_to_joined_users.pop(host, None)
-            else:
-                joined_users = await self.store.get_joined_users_from_state(
-                    self.room_id, state_entry
-                )
-
-                self.hosts_to_joined_users = {}
-                for user_id in joined_users:
-                    host = intern_string(get_domain_from_id(user_id))
-                    self.hosts_to_joined_users.setdefault(host, set()).add(user_id)
-
-            if state_entry.state_group:
-                self.state_group = state_entry.state_group
-            else:
-                self.state_group = object()
-            self._len = sum(len(v) for v in self.hosts_to_joined_users.values())
-        return frozenset(self.hosts_to_joined_users)
+    # The state group `hosts_to_joined_users` is derived from. Will be an object
+    # if the instance is newly created or if the state is not based on a state
+    # group. (An object is used as a sentinel value to ensure that it is never
+    # equal to anything else.)
+    state_group = attr.ib(type=Union[object, int], factory=object)
 
     def __len__(self):
-        return self._len
+        return sum(len(v) for v in self.hosts_to_joined_users.values())
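Editor's note: the inlined cache walks membership deltas between consecutive state groups rather than recomputing every joined host. Stripped of Synapse types, the delta replay looks roughly like this (simplified: the real code resolves event IDs to events and interns host strings):

    from typing import Dict, Set, Tuple

    def apply_membership_delta(
        hosts_to_users: Dict[str, Set[str]],
        delta: Dict[Tuple[str, str], str],  # (event type, user ID) -> membership
    ) -> None:
        for (typ, user_id), membership in delta.items():
            if typ != "m.room.member":
                continue
            host = user_id.split(":", 1)[1]
            users = hosts_to_users.setdefault(host, set())
            if membership == "join":
                users.add(user_id)
            else:
                users.discard(user_id)
                if not users:
                    # Drop hosts with no remaining joined users.
                    hosts_to_users.pop(host, None)

    cache = {"example.com": {"@alice:example.com"}}
    apply_membership_delta(cache, {("m.room.member", "@alice:example.com"): "leave"})
    assert cache == {}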
diff --git a/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql
new file mode 100644
index 0000000000..4836dac16e
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12account_validity_token_used_ts_ms.sql
@@ -0,0 +1,18 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Track when users renew their account using the value of the 'renewal_token' column.
+-- This field should be set to NULL after a fresh token is generated.
+ALTER TABLE account_validity ADD token_used_ts_ms BIGINT;
diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql
new file mode 100644
index 0000000000..b6ba0bda1a
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance.sql
@@ -0,0 +1,18 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add a column to specify which instance wrote the row. Historic rows have
+-- `NULL`, which indicates that the master instance wrote them.
+ALTER TABLE presence_stream ADD COLUMN instance_name TEXT;
diff --git a/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres
new file mode 100644
index 0000000000..02b182adf9
--- /dev/null
+++ b/synapse/storage/databases/main/schema/delta/59/12presence_stream_instance_seq.sql.postgres
@@ -0,0 +1,20 @@
+/* Copyright 2021 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+CREATE SEQUENCE IF NOT EXISTS presence_stream_sequence;
+
+SELECT setval('presence_stream_sequence', (
+    SELECT COALESCE(MAX(stream_id), 1) FROM presence_stream
+));
diff --git a/synapse/storage/databases/main/search.py b/synapse/storage/databases/main/search.py
index f47e4ed0bc..1c642c753b 100644
--- a/synapse/storage/databases/main/search.py
+++ b/synapse/storage/databases/main/search.py
@@ -15,7 +15,7 @@
 import logging
 import re
 from collections import namedtuple
-from typing import List, Optional, Set
+from typing import Collection, List, Optional, Set
 
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase
@@ -23,7 +23,6 @@ from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_cla
 from synapse.storage.database import DatabasePool
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.engines import PostgresEngine, Sqlite3Engine
-from synapse.types import Collection
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/storage/databases/main/stream.py b/synapse/storage/databases/main/stream.py
index db5ce4ea01..7581c7d3ff 100644
--- a/synapse/storage/databases/main/stream.py
+++ b/synapse/storage/databases/main/stream.py
@@ -37,7 +37,7 @@ what sort order was used:
 import abc
 import logging
 from collections import namedtuple
-from typing import TYPE_CHECKING, Dict, List, Optional, Set, Tuple
+from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set, Tuple
 
 from twisted.internet import defer
 
@@ -53,7 +53,7 @@ from synapse.storage.database import (
 from synapse.storage.databases.main.events_worker import EventsWorkerStore
 from synapse.storage.engines import BaseDatabaseEngine, PostgresEngine
 from synapse.storage.util.id_generators import MultiWriterIdGenerator
-from synapse.types import Collection, PersistedEventPosition, RoomStreamToken
+from synapse.types import PersistedEventPosition, RoomStreamToken
 from synapse.util.caches.descriptors import cached
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 87e040b014..33dc752d8f 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -17,7 +17,7 @@
 import itertools
 import logging
 from collections import deque, namedtuple
-from typing import Dict, Iterable, List, Optional, Set, Tuple
+from typing import Collection, Dict, Iterable, List, Optional, Set, Tuple
 
 from prometheus_client import Counter, Histogram
 
@@ -32,7 +32,6 @@ from synapse.storage.databases import Databases
 from synapse.storage.databases.main.events import DeltaState
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.types import (
-    Collection,
     PersistedEventPosition,
     RoomStreamToken,
     StateMap,
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index 05a9355974..7a2cbee426 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -17,7 +17,7 @@ import logging
 import os
 import re
 from collections import Counter
-from typing import Generator, Iterable, List, Optional, TextIO, Tuple
+from typing import Collection, Generator, Iterable, List, Optional, TextIO, Tuple
 
 import attr
 from typing_extensions import Counter as CounterType
@@ -27,7 +27,6 @@ from synapse.storage.database import LoggingDatabaseConnection
 from synapse.storage.engines import BaseDatabaseEngine
 from synapse.storage.engines.postgres import PostgresEngine
 from synapse.storage.types import Cursor
-from synapse.types import Collection
 
 logger = logging.getLogger(__name__)
 
diff --git a/synapse/types.py b/synapse/types.py
index 21654ae686..e52cd7ffd4 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -15,13 +15,11 @@
 import abc
 import re
 import string
-import sys
 from collections import namedtuple
 from typing import (
     TYPE_CHECKING,
     Any,
     Dict,
-    Iterable,
     Mapping,
     MutableMapping,
     Optional,
@@ -50,18 +48,6 @@ if TYPE_CHECKING:
     from synapse.appservice.api import ApplicationService
     from synapse.storage.databases.main import DataStore
 
-# define a version of typing.Collection that works on python 3.5
-if sys.version_info[:3] >= (3, 6, 0):
-    from typing import Collection
-else:
-    from typing import Container, Sized
-
-    T_co = TypeVar("T_co", covariant=True)
-
-    class Collection(Iterable[T_co], Container[T_co], Sized):  # type: ignore
-        __slots__ = ()
-
-
 # Define a state map type from type/state_key to T (usually an event ID or
 # event)
 T = TypeVar("T")
@@ -213,9 +199,8 @@ def get_localpart_from_id(string):
 DS = TypeVar("DS", bound="DomainSpecificString")
 
 
-class DomainSpecificString(
-    namedtuple("DomainSpecificString", ("localpart", "domain")), metaclass=abc.ABCMeta
-):
+@attr.s(slots=True, frozen=True, repr=False)
+class DomainSpecificString(metaclass=abc.ABCMeta):
     """Common base class among ID/name strings that have a local part and a
     domain name, prefixed with a sigil.
 
@@ -227,11 +212,8 @@ class DomainSpecificString(
 
     SIGIL = abc.abstractproperty()  # type: str  # type: ignore
 
-    # Deny iteration because it will bite you if you try to create a singleton
-    # set by:
-    #    users = set(user)
-    def __iter__(self):
-        raise ValueError("Attempted to iterate a %s" % (type(self).__name__,))
+    localpart = attr.ib(type=str)
+    domain = attr.ib(type=str)
 
     # Because this class is a namedtuple of strings and booleans, it is deeply
     # immutable.
@@ -286,30 +268,35 @@ class DomainSpecificString(
     __repr__ = to_string
 
 
+@attr.s(slots=True, frozen=True, repr=False)
 class UserID(DomainSpecificString):
     """Structure representing a user ID."""
 
     SIGIL = "@"
 
 
+@attr.s(slots=True, frozen=True, repr=False)
 class RoomAlias(DomainSpecificString):
     """Structure representing a room name."""
 
     SIGIL = "#"
 
 
+@attr.s(slots=True, frozen=True, repr=False)
 class RoomID(DomainSpecificString):
     """Structure representing a room id. """
 
     SIGIL = "!"
 
 
+@attr.s(slots=True, frozen=True, repr=False)
 class EventID(DomainSpecificString):
     """Structure representing an event id. """
 
     SIGIL = "$"
 
 
+@attr.s(slots=True, frozen=True, repr=False)
 class GroupID(DomainSpecificString):
     """Structure representing a group ID."""
 
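The conversion above replaces the namedtuple base with a frozen, slotted attrs class while keeping the same value semantics; each concrete subclass is re-decorated with @attr.s so that attrs picks up the inherited fields. A standalone sketch of the pattern (simplified: the real class also defines string parsing helpers and sets repr=False to reuse to_string):

    import abc

    import attr

    @attr.s(slots=True, frozen=True)
    class DomainSpecificString(metaclass=abc.ABCMeta):
        """A '<sigil><localpart>:<domain>' identifier, as in the diff above."""

        SIGIL = abc.abstractproperty()  # overridden with a plain str below

        localpart = attr.ib(type=str)
        domain = attr.ib(type=str)

        def to_string(self) -> str:
            return "%s%s:%s" % (self.SIGIL, self.localpart, self.domain)

    @attr.s(slots=True, frozen=True)
    class UserID(DomainSpecificString):
        SIGIL = "@"

    user = UserID("alice", "example.com")
    assert user.to_string() == "@alice:example.com"
    # frozen=True preserves the old namedtuple's immutability:
    # assigning to user.localpart raises attr.exceptions.FrozenInstanceError

One behavioural note: the explicit __iter__ guard could be dropped because, unlike a namedtuple, an attrs class is not iterable, so an accidental set(user) now fails naturally with a TypeError.
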
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index 2529845c9e..25ea1bcc91 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -110,7 +110,7 @@ class ResponseCache(Generic[T]):
         return result.observe()
 
     def wrap(
-        self, key: T, callback: "Callable[..., Any]", *args: Any, **kwargs: Any
+        self, key: T, callback: Callable[..., Any], *args: Any, **kwargs: Any
     ) -> defer.Deferred:
         """Wrap together a *get* and *set* call, taking care of logcontexts
 
diff --git a/synapse/util/caches/stream_change_cache.py b/synapse/util/caches/stream_change_cache.py
index 0469e7d120..e81e468899 100644
--- a/synapse/util/caches/stream_change_cache.py
+++ b/synapse/util/caches/stream_change_cache.py
@@ -14,11 +14,10 @@
 
 import logging
 import math
-from typing import Dict, FrozenSet, List, Mapping, Optional, Set, Union
+from typing import Collection, Dict, FrozenSet, List, Mapping, Optional, Set, Union
 
 from sortedcontainers import SortedDict
 
-from synapse.types import Collection
 from synapse.util import caches
 
 logger = logging.getLogger(__name__)
diff --git a/synapse/util/iterutils.py b/synapse/util/iterutils.py
index 6f73b1d56d..abfdc29832 100644
--- a/synapse/util/iterutils.py
+++ b/synapse/util/iterutils.py
@@ -15,6 +15,7 @@
 import heapq
 from itertools import islice
 from typing import (
+    Collection,
     Dict,
     Generator,
     Iterable,
@@ -26,8 +27,6 @@ from typing import (
     TypeVar,
 )
 
-from synapse.types import Collection
-
 T = TypeVar("T")
 
 
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index c0e6fb9a60..cd82777f80 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -132,6 +132,38 @@ def parse_and_validate_server_name(server_name: str) -> Tuple[str, Optional[int]
     return host, port
 
 
+def valid_id_server_location(id_server: str) -> bool:
+    """Check whether an identity server location, such as the one passed as the
+    `id_server` parameter to `/_matrix/client/r0/account/3pid/bind`, is valid.
+
+    A valid identity server location consists of a valid hostname and optional
+    port number, optionally followed by any number of `/` delimited path
+    components, without any fragment or query string parts.
+
+    Args:
+        id_server: identity server location string to validate
+
+    Returns:
+        True if valid, False otherwise.
+    """
+
+    components = id_server.split("/", 1)
+
+    host = components[0]
+
+    try:
+        parse_and_validate_server_name(host)
+    except ValueError:
+        return False
+
+    if len(components) < 2:
+        # no path
+        return True
+
+    path = components[1]
+    return "#" not in path and "?" not in path
+
+
 def parse_and_validate_mxc_uri(mxc: str) -> Tuple[str, Optional[int], str]:
     """Parse the given string as an MXC URI
 
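Some illustrative calls against the new helper; the expected results follow directly from the split-and-check logic above, assuming parse_and_validate_server_name accepts ordinary host and host:port forms as its name suggests:

    from synapse.util.stringutils import valid_id_server_location

    assert valid_id_server_location("matrix.org")             # bare hostname
    assert valid_id_server_location("matrix.org:8448")        # explicit port
    assert valid_id_server_location("matrix.org/path/to/id")  # path components allowed
    assert not valid_id_server_location("matrix.org/p?q=1")   # query string rejected
    assert not valid_id_server_location("matrix.org/p#frag")  # fragment rejected
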
diff --git a/synapse/util/threepids.py b/synapse/util/threepids.py
index 281c5be4fb..a1cf1960b0 100644
--- a/synapse/util/threepids.py
+++ b/synapse/util/threepids.py
@@ -18,6 +18,16 @@ import re
 logger = logging.getLogger(__name__)
 
 
+# It's unclear what the maximum length of an email address is. RFC 3696 (as
+# corrected by errata) says:
+#    the upper limit on address lengths should normally be considered to be 254.
+#
+# In practice, mail servers appear to be more tolerant and allow 400 characters
+# or so. Let's allow 500, which should be plenty for everyone.
+#
+MAX_EMAIL_ADDRESS_LENGTH = 500
+
+
 def check_3pid_allowed(hs, medium, address):
     """Checks whether a given format of 3PID is allowed to be used on this HS
 
@@ -70,3 +80,23 @@ def canonicalise_email(address: str) -> str:
         raise ValueError("Unable to parse email address")
 
     return parts[0].casefold() + "@" + parts[1].lower()
+
+
+def validate_email(address: str) -> str:
+    """Does some basic validation on an email address.
+
+    Returns the canonicalised email, as returned by `canonicalise_email`.
+
+    Raises a ValueError if the email is invalid.
+    """
+    # Canonicalise first; this raises a ValueError if the address cannot be parsed
+    address = canonicalise_email(address)
+
+    # Email addresses have to be at least 3 characters.
+    if len(address) < 3:
+        raise ValueError("Unable to parse email address")
+
+    if len(address) > MAX_EMAIL_ADDRESS_LENGTH:
+        raise ValueError("Unable to parse email address")
+
+    return address
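A short sketch of the new helper in use; each outcome follows from the checks above (canonicalisation, the 3-character minimum, and MAX_EMAIL_ADDRESS_LENGTH):

    from synapse.util.threepids import validate_email

    # Canonicalised via canonicalise_email: localpart casefolded, domain lowercased.
    assert validate_email("Alice@Example.COM") == "alice@example.com"

    # "a@b" is exactly 3 characters, the shortest address accepted.
    assert validate_email("a@b") == "a@b"

    # Anything over MAX_EMAIL_ADDRESS_LENGTH (500) is rejected.
    try:
        validate_email("a@" + "x" * 500 + ".com")
    except ValueError:
        pass  # "Unable to parse email address"
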