summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/__init__.py2
-rw-r--r--synapse/api/auth.py7
-rw-r--r--synapse/api/constants.py3
-rw-r--r--synapse/api/errors.py6
-rw-r--r--synapse/api/room_versions.py16
-rw-r--r--synapse/app/federation_sender.py20
-rw-r--r--synapse/crypto/event_signing.py48
-rw-r--r--synapse/event_auth.py58
-rw-r--r--synapse/events/__init__.py33
-rw-r--r--synapse/events/builder.py52
-rw-r--r--synapse/events/utils.py37
-rw-r--r--synapse/federation/federation_client.py60
-rw-r--r--synapse/federation/federation_server.py18
-rw-r--r--synapse/federation/send_queue.py53
-rw-r--r--synapse/federation/transport/client.py5
-rw-r--r--synapse/federation/transport/server.py26
-rw-r--r--synapse/handlers/admin.py62
-rw-r--r--synapse/handlers/device.py8
-rw-r--r--synapse/handlers/devicemessage.py63
-rw-r--r--synapse/handlers/directory.py7
-rw-r--r--synapse/handlers/e2e_keys.py3
-rw-r--r--synapse/handlers/federation.py186
-rw-r--r--synapse/handlers/identity.py4
-rw-r--r--synapse/handlers/message.py16
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/room.py96
-rw-r--r--synapse/handlers/stats.py2
-rw-r--r--synapse/handlers/sync.py1
-rw-r--r--synapse/http/matrixfederationclient.py4
-rw-r--r--synapse/python_dependencies.py4
-rw-r--r--synapse/replication/slave/storage/devices.py2
-rw-r--r--synapse/replication/tcp/client.py4
-rw-r--r--synapse/replication/tcp/protocol.py2
-rw-r--r--synapse/replication/tcp/resource.py9
-rw-r--r--synapse/rest/admin/users.py33
-rw-r--r--synapse/rest/client/v1/room.py12
-rw-r--r--synapse/rest/client/v2_alpha/account.py23
-rw-r--r--synapse/rest/client/v2_alpha/register.py3
-rw-r--r--synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py3
-rw-r--r--synapse/rest/media/v1/media_storage.py1
-rw-r--r--synapse/rest/media/v1/storage_provider.py6
-rw-r--r--synapse/server.pyi12
-rw-r--r--synapse/state/__init__.py2
-rw-r--r--synapse/state/v1.py4
-rw-r--r--synapse/state/v2.py4
-rw-r--r--synapse/storage/data_stores/main/cache.py22
-rw-r--r--synapse/storage/data_stores/main/devices.py71
-rw-r--r--synapse/storage/data_stores/main/events.py185
-rw-r--r--synapse/storage/data_stores/main/events_worker.py2
-rw-r--r--synapse/storage/data_stores/main/registration.py2
-rw-r--r--synapse/storage/data_stores/main/room.py94
-rw-r--r--synapse/storage/data_stores/main/roommember.py37
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql19
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql25
-rw-r--r--synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql24
-rw-r--r--synapse/storage/data_stores/main/state.py165
-rw-r--r--synapse/storage/data_stores/main/stats.py4
-rw-r--r--synapse/storage/engines/postgres.py42
-rw-r--r--synapse/storage/engines/sqlite.py5
-rw-r--r--synapse/storage/persist_events.py136
-rw-r--r--synapse/storage/prepare_database.py5
-rw-r--r--synapse/types.py7
-rw-r--r--synapse/util/stringutils.py17
-rw-r--r--synapse/visibility.py7
64 files changed, 1431 insertions, 460 deletions
diff --git a/synapse/__init__.py b/synapse/__init__.py
index a236888d3c..9d285fca38 100644
--- a/synapse/__init__.py
+++ b/synapse/__init__.py
@@ -36,7 +36,7 @@ try:
 except ImportError:
     pass
 
-__version__ = "1.9.1"
+__version__ = "1.10.0"
 
 if bool(os.environ.get("SYNAPSE_TEST_PATCH_LOG_CONTEXTS", False)):
     # We import here so that we don't have to install a bunch of deps when
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index 2cbfab2569..8b1277ad02 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -33,6 +33,7 @@ from synapse.api.errors import (
     MissingClientTokenError,
     ResourceLimitError,
 )
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.config.server import is_threepid_reserved
 from synapse.types import StateMap, UserID
 from synapse.util.caches import CACHE_SIZE_FACTOR, register_cache
@@ -77,15 +78,17 @@ class Auth(object):
         self._account_validity = hs.config.account_validity
 
     @defer.inlineCallbacks
-    def check_from_context(self, room_version, event, context, do_sig_check=True):
+    def check_from_context(self, room_version: str, event, context, do_sig_check=True):
         prev_state_ids = yield context.get_prev_state_ids()
         auth_events_ids = yield self.compute_auth_events(
             event, prev_state_ids, for_verification=True
         )
         auth_events = yield self.store.get_events(auth_events_ids)
         auth_events = {(e.type, e.state_key): e for e in itervalues(auth_events)}
+
+        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
         event_auth.check(
-            room_version, event, auth_events=auth_events, do_sig_check=do_sig_check
+            room_version_obj, event, auth_events=auth_events, do_sig_check=do_sig_check
         )
 
     @defer.inlineCallbacks
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 0ade47e624..cc8577552b 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -77,12 +77,11 @@ class EventTypes(object):
     Aliases = "m.room.aliases"
     Redaction = "m.room.redaction"
     ThirdPartyInvite = "m.room.third_party_invite"
-    Encryption = "m.room.encryption"
     RelatedGroups = "m.room.related_groups"
 
     RoomHistoryVisibility = "m.room.history_visibility"
     CanonicalAlias = "m.room.canonical_alias"
-    Encryption = "m.room.encryption"
+    Encrypted = "m.room.encrypted"
     RoomAvatar = "m.room.avatar"
     RoomEncryption = "m.room.encryption"
     GuestAccess = "m.room.guest_access"
diff --git a/synapse/api/errors.py b/synapse/api/errors.py
index 1c9456e583..0c20601600 100644
--- a/synapse/api/errors.py
+++ b/synapse/api/errors.py
@@ -402,11 +402,9 @@ class UnsupportedRoomVersionError(SynapseError):
     """The client's request to create a room used a room version that the server does
     not support."""
 
-    def __init__(self):
+    def __init__(self, msg="Homeserver does not support this room version"):
         super(UnsupportedRoomVersionError, self).__init__(
-            code=400,
-            msg="Homeserver does not support this room version",
-            errcode=Codes.UNSUPPORTED_ROOM_VERSION,
+            code=400, msg=msg, errcode=Codes.UNSUPPORTED_ROOM_VERSION,
         )
 
 
diff --git a/synapse/api/room_versions.py b/synapse/api/room_versions.py
index c6f50fd7b9..cf7ee60d3a 100644
--- a/synapse/api/room_versions.py
+++ b/synapse/api/room_versions.py
@@ -57,6 +57,9 @@ class RoomVersion(object):
     state_res = attr.ib()  # int; one of the StateResolutionVersions
     enforce_key_validity = attr.ib()  # bool
 
+    # bool: before MSC2260, anyone was allowed to send an aliases event
+    special_case_aliases_auth = attr.ib(type=bool, default=False)
+
 
 class RoomVersions(object):
     V1 = RoomVersion(
@@ -65,6 +68,7 @@ class RoomVersions(object):
         EventFormatVersions.V1,
         StateResolutionVersions.V1,
         enforce_key_validity=False,
+        special_case_aliases_auth=True,
     )
     V2 = RoomVersion(
         "2",
@@ -72,6 +76,7 @@ class RoomVersions(object):
         EventFormatVersions.V1,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
+        special_case_aliases_auth=True,
     )
     V3 = RoomVersion(
         "3",
@@ -79,6 +84,7 @@ class RoomVersions(object):
         EventFormatVersions.V2,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
+        special_case_aliases_auth=True,
     )
     V4 = RoomVersion(
         "4",
@@ -86,6 +92,7 @@ class RoomVersions(object):
         EventFormatVersions.V3,
         StateResolutionVersions.V2,
         enforce_key_validity=False,
+        special_case_aliases_auth=True,
     )
     V5 = RoomVersion(
         "5",
@@ -93,6 +100,14 @@ class RoomVersions(object):
         EventFormatVersions.V3,
         StateResolutionVersions.V2,
         enforce_key_validity=True,
+        special_case_aliases_auth=True,
+    )
+    MSC2260_DEV = RoomVersion(
+        "org.matrix.msc2260",
+        RoomDisposition.UNSTABLE,
+        EventFormatVersions.V3,
+        StateResolutionVersions.V2,
+        enforce_key_validity=True,
     )
 
 
@@ -104,5 +119,6 @@ KNOWN_ROOM_VERSIONS = {
         RoomVersions.V3,
         RoomVersions.V4,
         RoomVersions.V5,
+        RoomVersions.MSC2260_DEV,
     )
 }  # type: Dict[str, RoomVersion]
diff --git a/synapse/app/federation_sender.py b/synapse/app/federation_sender.py
index 38d11fdd0f..63a91f1177 100644
--- a/synapse/app/federation_sender.py
+++ b/synapse/app/federation_sender.py
@@ -38,7 +38,11 @@ from synapse.replication.slave.storage.receipts import SlavedReceiptsStore
 from synapse.replication.slave.storage.registration import SlavedRegistrationStore
 from synapse.replication.slave.storage.transactions import SlavedTransactionStore
 from synapse.replication.tcp.client import ReplicationClientHandler
-from synapse.replication.tcp.streams._base import ReceiptsStream
+from synapse.replication.tcp.streams._base import (
+    DeviceListsStream,
+    ReceiptsStream,
+    ToDeviceStream,
+)
 from synapse.server import HomeServer
 from synapse.storage.database import Database
 from synapse.types import ReadReceipt
@@ -256,6 +260,20 @@ class FederationSenderHandler(object):
                 "process_receipts_for_federation", self._on_new_receipts, rows
             )
 
+        # ... as well as device updates and messages
+        elif stream_name == DeviceListsStream.NAME:
+            hosts = set(row.destination for row in rows)
+            for host in hosts:
+                self.federation_sender.send_device_messages(host)
+
+        elif stream_name == ToDeviceStream.NAME:
+            # The to_device stream includes stuff to be pushed to both local
+            # clients and remote servers, so we ignore entities that start with
+            # '@' (since they'll be local users rather than destinations).
+            hosts = set(row.entity for row in rows if not row.entity.startswith("@"))
+            for host in hosts:
+                self.federation_sender.send_device_messages(host)
+
     @defer.inlineCallbacks
     def _on_new_receipts(self, rows):
         """
diff --git a/synapse/crypto/event_signing.py b/synapse/crypto/event_signing.py
index e65bd61d97..5f733c1cf5 100644
--- a/synapse/crypto/event_signing.py
+++ b/synapse/crypto/event_signing.py
@@ -1,6 +1,7 @@
 # -*- coding: utf-8 -*-
-
+#
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -17,13 +18,17 @@
 import collections.abc
 import hashlib
 import logging
+from typing import Dict
 
 from canonicaljson import encode_canonical_json
 from signedjson.sign import sign_json
+from signedjson.types import SigningKey
 from unpaddedbase64 import decode_base64, encode_base64
 
 from synapse.api.errors import Codes, SynapseError
+from synapse.api.room_versions import RoomVersion
 from synapse.events.utils import prune_event, prune_event_dict
+from synapse.types import JsonDict
 
 logger = logging.getLogger(__name__)
 
@@ -112,18 +117,28 @@ def compute_event_reference_hash(event, hash_algorithm=hashlib.sha256):
     return hashed.name, hashed.digest()
 
 
-def compute_event_signature(event_dict, signature_name, signing_key):
+def compute_event_signature(
+    room_version: RoomVersion,
+    event_dict: JsonDict,
+    signature_name: str,
+    signing_key: SigningKey,
+) -> Dict[str, Dict[str, str]]:
     """Compute the signature of the event for the given name and key.
 
     Args:
-        event_dict (dict): The event as a dict
-        signature_name (str): The name of the entity signing the event
+        room_version: the version of the room that this event is in.
+            (the room version determines the redaction algorithm and hence the
+            json to be signed)
+
+        event_dict: The event as a dict
+
+        signature_name: The name of the entity signing the event
             (typically the server's hostname).
-        signing_key (syutil.crypto.SigningKey): The key to sign with
+
+        signing_key: The key to sign with
 
     Returns:
-        dict[str, dict[str, str]]: Returns a dictionary in the same format of
-        an event's signatures field.
+        a dictionary in the same format of an event's signatures field.
     """
     redact_json = prune_event_dict(event_dict)
     redact_json.pop("age_ts", None)
@@ -137,23 +152,26 @@ def compute_event_signature(event_dict, signature_name, signing_key):
 
 
 def add_hashes_and_signatures(
-    event_dict, signature_name, signing_key, hash_algorithm=hashlib.sha256
+    room_version: RoomVersion,
+    event_dict: JsonDict,
+    signature_name: str,
+    signing_key: SigningKey,
 ):
     """Add content hash and sign the event
 
     Args:
-        event_dict (dict): The event to add hashes to and sign
-        signature_name (str): The name of the entity signing the event
+        room_version: the version of the room this event is in
+
+        event_dict: The event to add hashes to and sign
+        signature_name: The name of the entity signing the event
             (typically the server's hostname).
-        signing_key (syutil.crypto.SigningKey): The key to sign with
-        hash_algorithm: A hasher from `hashlib`, e.g. hashlib.sha256, to use
-            to hash the event
+        signing_key: The key to sign with
     """
 
-    name, digest = compute_content_hash(event_dict, hash_algorithm=hash_algorithm)
+    name, digest = compute_content_hash(event_dict, hash_algorithm=hashlib.sha256)
 
     event_dict.setdefault("hashes", {})[name] = encode_base64(digest)
 
     event_dict["signatures"] = compute_event_signature(
-        event_dict, signature_name=signature_name, signing_key=signing_key
+        room_version, event_dict, signature_name=signature_name, signing_key=signing_key
     )
diff --git a/synapse/event_auth.py b/synapse/event_auth.py
index e3a1ba47a0..472f165044 100644
--- a/synapse/event_auth.py
+++ b/synapse/event_auth.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014 - 2016 OpenMarket Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -23,17 +24,27 @@ from unpaddedbase64 import decode_base64
 
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.api.errors import AuthError, EventSizeError, SynapseError
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
+from synapse.api.room_versions import (
+    KNOWN_ROOM_VERSIONS,
+    EventFormatVersions,
+    RoomVersion,
+)
 from synapse.types import UserID, get_domain_from_id
 
 logger = logging.getLogger(__name__)
 
 
-def check(room_version, event, auth_events, do_sig_check=True, do_size_check=True):
+def check(
+    room_version_obj: RoomVersion,
+    event,
+    auth_events,
+    do_sig_check=True,
+    do_size_check=True,
+):
     """ Checks if this event is correctly authed.
 
     Args:
-        room_version (str): the version of the room
+        room_version_obj: the version of the room
         event: the event being checked.
         auth_events (dict: event-key -> event): the existing room state.
 
@@ -89,7 +100,12 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru
             if not event.signatures.get(event_id_domain):
                 raise AuthError(403, "Event not signed by sending server")
 
+    # Implementation of https://matrix.org/docs/spec/rooms/v1#authorization-rules
+    #
+    # 1. If type is m.room.create:
     if event.type == EventTypes.Create:
+        # 1b. If the domain of the room_id does not match the domain of the sender,
+        # reject.
         sender_domain = get_domain_from_id(event.sender)
         room_id_domain = get_domain_from_id(event.room_id)
         if room_id_domain != sender_domain:
@@ -97,39 +113,49 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru
                 403, "Creation event's room_id domain does not match sender's"
             )
 
-        room_version = event.content.get("room_version", "1")
-        if room_version not in KNOWN_ROOM_VERSIONS:
+        # 1c. If content.room_version is present and is not a recognised version, reject
+        room_version_prop = event.content.get("room_version", "1")
+        if room_version_prop not in KNOWN_ROOM_VERSIONS:
             raise AuthError(
-                403, "room appears to have unsupported version %s" % (room_version,)
+                403,
+                "room appears to have unsupported version %s" % (room_version_prop,),
             )
-        # FIXME
+
         logger.debug("Allowing! %s", event)
         return
 
+    # 3. If event does not have a m.room.create in its auth_events, reject.
     creation_event = auth_events.get((EventTypes.Create, ""), None)
-
     if not creation_event:
         raise AuthError(403, "No create event in auth events")
 
+    # additional check for m.federate
     creating_domain = get_domain_from_id(event.room_id)
     originating_domain = get_domain_from_id(event.sender)
     if creating_domain != originating_domain:
         if not _can_federate(event, auth_events):
             raise AuthError(403, "This room has been marked as unfederatable.")
 
-    # FIXME: Temp hack
+    # 4. If type is m.room.aliases
     if event.type == EventTypes.Aliases:
+        # 4a. If event has no state_key, reject
         if not event.is_state():
             raise AuthError(403, "Alias event must be a state event")
         if not event.state_key:
             raise AuthError(403, "Alias event must have non-empty state_key")
+
+        # 4b. If sender's domain doesn't matches [sic] state_key, reject
         sender_domain = get_domain_from_id(event.sender)
         if event.state_key != sender_domain:
             raise AuthError(
                 403, "Alias event's state_key does not match sender's domain"
             )
-        logger.debug("Allowing! %s", event)
-        return
+
+        # 4c. Otherwise, allow.
+        # This is removed by https://github.com/matrix-org/matrix-doc/pull/2260
+        if room_version_obj.special_case_aliases_auth:
+            logger.debug("Allowing! %s", event)
+            return
 
     if logger.isEnabledFor(logging.DEBUG):
         logger.debug("Auth events: %s", [a.event_id for a in auth_events.values()])
@@ -160,7 +186,7 @@ def check(room_version, event, auth_events, do_sig_check=True, do_size_check=Tru
         _check_power_levels(event, auth_events)
 
     if event.type == EventTypes.Redaction:
-        check_redaction(room_version, event, auth_events)
+        check_redaction(room_version_obj, event, auth_events)
 
     logger.debug("Allowing! %s", event)
 
@@ -386,7 +412,7 @@ def _can_send_event(event, auth_events):
     return True
 
 
-def check_redaction(room_version, event, auth_events):
+def check_redaction(room_version_obj: RoomVersion, event, auth_events):
     """Check whether the event sender is allowed to redact the target event.
 
     Returns:
@@ -406,11 +432,7 @@ def check_redaction(room_version, event, auth_events):
     if user_level >= redact_level:
         return False
 
-    v = KNOWN_ROOM_VERSIONS.get(room_version)
-    if not v:
-        raise RuntimeError("Unrecognized room version %r" % (room_version,))
-
-    if v.event_format == EventFormatVersions.V1:
+    if room_version_obj.event_format == EventFormatVersions.V1:
         redacter_domain = get_domain_from_id(event.event_id)
         redactee_domain = get_domain_from_id(event.redacts)
         if redacter_domain == redactee_domain:
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index 88ed6d764f..f813fa2fe7 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -23,6 +23,7 @@ from unpaddedbase64 import encode_base64
 
 from synapse.api.errors import UnsupportedRoomVersionError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, EventFormatVersions
+from synapse.types import JsonDict
 from synapse.util.caches import intern_dict
 from synapse.util.frozenutils import freeze
 
@@ -116,16 +117,32 @@ class _EventInternalMetadata(object):
         return getattr(self, "redacted", False)
 
 
-def _event_dict_property(key):
+_SENTINEL = object()
+
+
+def _event_dict_property(key, default=_SENTINEL):
+    """Creates a new property for the given key that delegates access to
+    `self._event_dict`.
+
+    The default is used if the key is missing from the `_event_dict`, if given,
+    otherwise an AttributeError will be raised.
+
+    Note: If a default is given then `hasattr` will always return true.
+    """
+
     # We want to be able to use hasattr with the event dict properties.
     # However, (on python3) hasattr expects AttributeError to be raised. Hence,
     # we need to transform the KeyError into an AttributeError
-    def getter(self):
+
+    def getter_raises(self):
         try:
             return self._event_dict[key]
         except KeyError:
             raise AttributeError(key)
 
+    def getter_default(self):
+        return self._event_dict.get(key, default)
+
     def setter(self, v):
         try:
             self._event_dict[key] = v
@@ -138,7 +155,11 @@ def _event_dict_property(key):
         except KeyError:
             raise AttributeError(key)
 
-    return property(getter, setter, delete)
+    if default is _SENTINEL:
+        # No default given, so use the getter that raises
+        return property(getter_raises, setter, delete)
+    else:
+        return property(getter_default, setter, delete)
 
 
 class EventBase(object):
@@ -165,7 +186,7 @@ class EventBase(object):
     origin = _event_dict_property("origin")
     origin_server_ts = _event_dict_property("origin_server_ts")
     prev_events = _event_dict_property("prev_events")
-    redacts = _event_dict_property("redacts")
+    redacts = _event_dict_property("redacts", None)
     room_id = _event_dict_property("room_id")
     sender = _event_dict_property("sender")
     user_id = _event_dict_property("sender")
@@ -177,7 +198,7 @@ class EventBase(object):
     def is_state(self):
         return hasattr(self, "state_key") and self.state_key is not None
 
-    def get_dict(self):
+    def get_dict(self) -> JsonDict:
         d = dict(self._event_dict)
         d.update({"signatures": self.signatures, "unsigned": dict(self.unsigned)})
 
@@ -189,7 +210,7 @@ class EventBase(object):
     def get_internal_metadata_dict(self):
         return self.internal_metadata.get_dict()
 
-    def get_pdu_json(self, time_now=None):
+    def get_pdu_json(self, time_now=None) -> JsonDict:
         pdu_json = self.get_dict()
 
         if time_now is not None and "age_ts" in pdu_json["unsigned"]:
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 3997751337..8d63ad6dc3 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -12,8 +12,10 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import Optional
 
 import attr
+from nacl.signing import SigningKey
 
 from twisted.internet import defer
 
@@ -23,13 +25,18 @@ from synapse.api.room_versions import (
     KNOWN_EVENT_FORMAT_VERSIONS,
     KNOWN_ROOM_VERSIONS,
     EventFormatVersions,
+    RoomVersion,
 )
 from synapse.crypto.event_signing import add_hashes_and_signatures
-from synapse.types import EventID
+from synapse.events import (
+    EventBase,
+    _EventInternalMetadata,
+    event_type_from_format_version,
+)
+from synapse.types import EventID, JsonDict
+from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
-from . import _EventInternalMetadata, event_type_from_format_version
-
 
 @attr.s(slots=True, cmp=False, frozen=True)
 class EventBuilder(object):
@@ -40,7 +47,7 @@ class EventBuilder(object):
     content/unsigned/internal_metadata fields are still mutable)
 
     Attributes:
-        format_version (int): Event format version
+        room_version: Version of the target room
         room_id (str)
         type (str)
         sender (str)
@@ -63,7 +70,7 @@ class EventBuilder(object):
     _hostname = attr.ib()
     _signing_key = attr.ib()
 
-    format_version = attr.ib()
+    room_version = attr.ib(type=RoomVersion)
 
     room_id = attr.ib()
     type = attr.ib()
@@ -108,7 +115,8 @@ class EventBuilder(object):
         )
         auth_ids = yield self._auth.compute_auth_events(self, state_ids)
 
-        if self.format_version == EventFormatVersions.V1:
+        format_version = self.room_version.event_format
+        if format_version == EventFormatVersions.V1:
             auth_events = yield self._store.add_event_hashes(auth_ids)
             prev_events = yield self._store.add_event_hashes(prev_event_ids)
         else:
@@ -148,7 +156,7 @@ class EventBuilder(object):
             clock=self._clock,
             hostname=self._hostname,
             signing_key=self._signing_key,
-            format_version=self.format_version,
+            room_version=self.room_version,
             event_dict=event_dict,
             internal_metadata_dict=self.internal_metadata.get_dict(),
         )
@@ -201,7 +209,7 @@ class EventBuilderFactory(object):
             clock=self.clock,
             hostname=self.hostname,
             signing_key=self.signing_key,
-            format_version=room_version.event_format,
+            room_version=room_version,
             type=key_values["type"],
             state_key=key_values.get("state_key"),
             room_id=key_values["room_id"],
@@ -214,29 +222,19 @@ class EventBuilderFactory(object):
 
 
 def create_local_event_from_event_dict(
-    clock,
-    hostname,
-    signing_key,
-    format_version,
-    event_dict,
-    internal_metadata_dict=None,
-):
+    clock: Clock,
+    hostname: str,
+    signing_key: SigningKey,
+    room_version: RoomVersion,
+    event_dict: JsonDict,
+    internal_metadata_dict: Optional[JsonDict] = None,
+) -> EventBase:
     """Takes a fully formed event dict, ensuring that fields like `origin`
     and `origin_server_ts` have correct values for a locally produced event,
     then signs and hashes it.
-
-    Args:
-        clock (Clock)
-        hostname (str)
-        signing_key
-        format_version (int)
-        event_dict (dict)
-        internal_metadata_dict (dict|None)
-
-    Returns:
-        FrozenEvent
     """
 
+    format_version = room_version.event_format
     if format_version not in KNOWN_EVENT_FORMAT_VERSIONS:
         raise Exception("No event format defined for version %r" % (format_version,))
 
@@ -257,7 +255,7 @@ def create_local_event_from_event_dict(
 
     event_dict.setdefault("signatures", {})
 
-    add_hashes_and_signatures(event_dict, hostname, signing_key)
+    add_hashes_and_signatures(room_version, event_dict, hostname, signing_key)
     return event_type_from_format_version(format_version)(
         event_dict, internal_metadata_dict=internal_metadata_dict
     )
diff --git a/synapse/events/utils.py b/synapse/events/utils.py
index 07d1c5bcf0..f70f5032fb 100644
--- a/synapse/events/utils.py
+++ b/synapse/events/utils.py
@@ -12,8 +12,9 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import collections
 import re
+from typing import Mapping, Union
 
 from six import string_types
 
@@ -422,3 +423,37 @@ class EventClientSerializer(object):
         return yieldable_gather_results(
             self.serialize_event, events, time_now=time_now, **kwargs
         )
+
+
+def copy_power_levels_contents(
+    old_power_levels: Mapping[str, Union[int, Mapping[str, int]]]
+):
+    """Copy the content of a power_levels event, unfreezing frozendicts along the way
+
+    Raises:
+        TypeError if the input does not look like a valid power levels event content
+    """
+    if not isinstance(old_power_levels, collections.Mapping):
+        raise TypeError("Not a valid power-levels content: %r" % (old_power_levels,))
+
+    power_levels = {}
+    for k, v in old_power_levels.items():
+
+        if isinstance(v, int):
+            power_levels[k] = v
+            continue
+
+        if isinstance(v, collections.Mapping):
+            power_levels[k] = h = {}
+            for k1, v1 in v.items():
+                # we should only have one level of nesting
+                if not isinstance(v1, int):
+                    raise TypeError(
+                        "Invalid power_levels value for %s.%s: %r" % (k, k1, v1)
+                    )
+                h[k1] = v1
+            continue
+
+        raise TypeError("Invalid power_levels value for %s: %r" % (k, v))
+
+    return power_levels
diff --git a/synapse/federation/federation_client.py b/synapse/federation/federation_client.py
index af652a7659..f99d17a7de 100644
--- a/synapse/federation/federation_client.py
+++ b/synapse/federation/federation_client.py
@@ -17,6 +17,7 @@
 import copy
 import itertools
 import logging
+from typing import Dict, Iterable
 
 from prometheus_client import Counter
 
@@ -29,6 +30,7 @@ from synapse.api.errors import (
     FederationDeniedError,
     HttpResponseException,
     SynapseError,
+    UnsupportedRoomVersionError,
 )
 from synapse.api.room_versions import (
     KNOWN_ROOM_VERSIONS,
@@ -196,7 +198,7 @@ class FederationClient(FederationBase):
 
         logger.debug("backfill transaction_data=%r", transaction_data)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         pdus = [
@@ -334,7 +336,7 @@ class FederationClient(FederationBase):
     def get_event_auth(self, destination, room_id, event_id):
         res = yield self.transport_layer.get_event_auth(destination, room_id, event_id)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         auth_chain = [
@@ -385,6 +387,8 @@ class FederationClient(FederationBase):
                 return res
             except InvalidResponseError as e:
                 logger.warning("Failed to %s via %s: %s", description, destination, e)
+            except UnsupportedRoomVersionError:
+                raise
             except HttpResponseException as e:
                 if not 500 <= e.code < 600:
                     raise e.to_synapse_error()
@@ -404,7 +408,13 @@ class FederationClient(FederationBase):
         raise SynapseError(502, "Failed to %s via any server" % (description,))
 
     def make_membership_event(
-        self, destinations, room_id, user_id, membership, content, params
+        self,
+        destinations: Iterable[str],
+        room_id: str,
+        user_id: str,
+        membership: str,
+        content: dict,
+        params: Dict[str, str],
     ):
         """
         Creates an m.room.member event, with context, without participating in the room.
@@ -417,21 +427,23 @@ class FederationClient(FederationBase):
         Note that this does not append any events to any graphs.
 
         Args:
-            destinations (Iterable[str]): Candidate homeservers which are probably
+            destinations: Candidate homeservers which are probably
                 participating in the room.
-            room_id (str): The room in which the event will happen.
-            user_id (str): The user whose membership is being evented.
-            membership (str): The "membership" property of the event. Must be
-                one of "join" or "leave".
-            content (dict): Any additional data to put into the content field
-                of the event.
-            params (dict[str, str|Iterable[str]]): Query parameters to include in the
-                request.
+            room_id: The room in which the event will happen.
+            user_id: The user whose membership is being evented.
+            membership: The "membership" property of the event. Must be one of
+                "join" or "leave".
+            content: Any additional data to put into the content field of the
+                event.
+            params: Query parameters to include in the request.
         Return:
-            Deferred[tuple[str, FrozenEvent, int]]: resolves to a tuple of
-            `(origin, event, event_format)` where origin is the remote
-            homeserver which generated the event, and event_format is one of
-            `synapse.api.room_versions.EventFormatVersions`.
+            Deferred[Tuple[str, FrozenEvent, RoomVersion]]: resolves to a tuple of
+            `(origin, event, room_version)` where origin is the remote
+            homeserver which generated the event, and room_version is the
+            version of the room.
+
+            Fails with a `UnsupportedRoomVersionError` if remote responds with
+            a room version we don't understand.
 
             Fails with a ``SynapseError`` if the chosen remote server
             returns a 300/400 code.
@@ -453,8 +465,10 @@ class FederationClient(FederationBase):
 
             # Note: If not supplied, the room version may be either v1 or v2,
             # however either way the event format version will be v1.
-            room_version = ret.get("room_version", RoomVersions.V1.identifier)
-            event_format = room_version_to_event_format(room_version)
+            room_version_id = ret.get("room_version", RoomVersions.V1.identifier)
+            room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+            if not room_version:
+                raise UnsupportedRoomVersionError()
 
             pdu_dict = ret.get("event", None)
             if not isinstance(pdu_dict, dict):
@@ -474,11 +488,11 @@ class FederationClient(FederationBase):
                 self._clock,
                 self.hostname,
                 self.signing_key,
-                format_version=event_format,
+                room_version=room_version,
                 event_dict=pdu_dict,
             )
 
-            return (destination, ev, event_format)
+            return (destination, ev, room_version)
 
         return self._try_destination_list(
             "make_" + membership, destinations, send_request
@@ -633,7 +647,7 @@ class FederationClient(FederationBase):
 
     @defer.inlineCallbacks
     def send_invite(self, destination, room_id, event_id, pdu):
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
 
         content = yield self._do_send_invite(destination, pdu, room_version)
 
@@ -641,7 +655,7 @@ class FederationClient(FederationBase):
 
         logger.debug("Got response to send_invite: %s", pdu_dict)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
 
         pdu = event_from_pdu_json(pdu_dict, format_ver)
@@ -843,7 +857,7 @@ class FederationClient(FederationBase):
                 timeout=timeout,
             )
 
-            room_version = yield self.store.get_room_version(room_id)
+            room_version = yield self.store.get_room_version_id(room_id)
             format_ver = room_version_to_event_format(room_version)
 
             events = [
diff --git a/synapse/federation/federation_server.py b/synapse/federation/federation_server.py
index 8eddb3bf2c..a4c97ed458 100644
--- a/synapse/federation/federation_server.py
+++ b/synapse/federation/federation_server.py
@@ -234,7 +234,7 @@ class FederationServer(FederationBase):
                 continue
 
             try:
-                room_version = await self.store.get_room_version(room_id)
+                room_version = await self.store.get_room_version_id(room_id)
             except NotFoundError:
                 logger.info("Ignoring PDU for unknown room_id: %s", room_id)
                 continue
@@ -334,7 +334,7 @@ class FederationServer(FederationBase):
                 )
             )
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         resp["room_version"] = room_version
 
         return 200, resp
@@ -385,7 +385,7 @@ class FederationServer(FederationBase):
         origin_host, _ = parse_server_name(origin)
         await self.check_server_matches_acl(origin_host, room_id)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         if room_version not in supported_versions:
             logger.warning(
                 "Room version %s not in %s", room_version, supported_versions
@@ -410,14 +410,14 @@ class FederationServer(FederationBase):
         origin_host, _ = parse_server_name(origin)
         await self.check_server_matches_acl(origin_host, pdu.room_id)
         pdu = await self._check_sigs_and_hash(room_version, pdu)
-        ret_pdu = await self.handler.on_invite_request(origin, pdu)
+        ret_pdu = await self.handler.on_invite_request(origin, pdu, room_version)
         time_now = self._clock.time_msec()
         return {"event": ret_pdu.get_pdu_json(time_now)}
 
     async def on_send_join_request(self, origin, content, room_id):
         logger.debug("on_send_join_request: content: %s", content)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
         pdu = event_from_pdu_json(content, format_ver)
 
@@ -440,7 +440,7 @@ class FederationServer(FederationBase):
         await self.check_server_matches_acl(origin_host, room_id)
         pdu = await self.handler.on_make_leave_request(origin, room_id, user_id)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
 
         time_now = self._clock.time_msec()
         return {"event": pdu.get_pdu_json(time_now), "room_version": room_version}
@@ -448,7 +448,7 @@ class FederationServer(FederationBase):
     async def on_send_leave_request(self, origin, content, room_id):
         logger.debug("on_send_leave_request: content: %s", content)
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
         format_ver = room_version_to_event_format(room_version)
         pdu = event_from_pdu_json(content, format_ver)
 
@@ -495,7 +495,7 @@ class FederationServer(FederationBase):
             origin_host, _ = parse_server_name(origin)
             await self.check_server_matches_acl(origin_host, room_id)
 
-            room_version = await self.store.get_room_version(room_id)
+            room_version = await self.store.get_room_version_id(room_id)
             format_ver = room_version_to_event_format(room_version)
 
             auth_chain = [
@@ -664,7 +664,7 @@ class FederationServer(FederationBase):
                 logger.info("Accepting join PDU %s from %s", pdu.event_id, origin)
 
         # We've already checked that we know the room version by this point
-        room_version = await self.store.get_room_version(pdu.room_id)
+        room_version = await self.store.get_room_version_id(pdu.room_id)
 
         # Check signature.
         try:
diff --git a/synapse/federation/send_queue.py b/synapse/federation/send_queue.py
index 174f6e42be..001bb304ae 100644
--- a/synapse/federation/send_queue.py
+++ b/synapse/federation/send_queue.py
@@ -69,8 +69,6 @@ class FederationRemoteSendQueue(object):
 
         self.edus = SortedDict()  # stream position -> Edu
 
-        self.device_messages = SortedDict()  # stream position -> destination
-
         self.pos = 1
         self.pos_time = SortedDict()
 
@@ -92,7 +90,6 @@ class FederationRemoteSendQueue(object):
             "keyed_edu",
             "keyed_edu_changed",
             "edus",
-            "device_messages",
             "pos_time",
             "presence_destinations",
         ]:
@@ -171,12 +168,6 @@ class FederationRemoteSendQueue(object):
             for key in keys[:i]:
                 del self.edus[key]
 
-            # Delete things out of device map
-            keys = self.device_messages.keys()
-            i = self.device_messages.bisect_left(position_to_delete)
-            for key in keys[:i]:
-                del self.device_messages[key]
-
     def notify_new_events(self, current_id):
         """As per FederationSender"""
         # We don't need to replicate this as it gets sent down a different
@@ -249,9 +240,8 @@ class FederationRemoteSendQueue(object):
 
     def send_device_messages(self, destination):
         """As per FederationSender"""
-        pos = self._next_pos()
-        self.device_messages[pos] = destination
-        self.notifier.on_new_replication_data()
+        # We don't need to replicate this as it gets sent down a different
+        # stream.
 
     def get_current_token(self):
         return self.pos - 1
@@ -339,14 +329,6 @@ class FederationRemoteSendQueue(object):
         for (pos, edu) in edus:
             rows.append((pos, EduRow(edu)))
 
-        # Fetch changed device messages
-        i = self.device_messages.bisect_right(from_token)
-        j = self.device_messages.bisect_right(to_token) + 1
-        device_messages = {v: k for k, v in self.device_messages.items()[i:j]}
-
-        for (destination, pos) in iteritems(device_messages):
-            rows.append((pos, DeviceRow(destination=destination)))
-
         # Sort rows based on pos
         rows.sort()
 
@@ -472,28 +454,9 @@ class EduRow(BaseFederationRow, namedtuple("EduRow", ("edu",))):  # Edu
         buff.edus.setdefault(self.edu.destination, []).append(self.edu)
 
 
-class DeviceRow(BaseFederationRow, namedtuple("DeviceRow", ("destination",))):  # str
-    """Streams the fact that either a) there is pending to device messages for
-    users on the remote, or b) a local users device has changed and needs to
-    be sent to the remote.
-    """
-
-    TypeId = "d"
-
-    @staticmethod
-    def from_data(data):
-        return DeviceRow(destination=data["destination"])
-
-    def to_data(self):
-        return {"destination": self.destination}
-
-    def add_to_buffer(self, buff):
-        buff.device_destinations.add(self.destination)
-
-
 TypeToRow = {
     Row.TypeId: Row
-    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow, DeviceRow)
+    for Row in (PresenceRow, PresenceDestinationsRow, KeyedEduRow, EduRow,)
 }
 
 
@@ -504,7 +467,6 @@ ParsedFederationStreamData = namedtuple(
         "presence_destinations",  # list of tuples of UserPresenceState and destinations
         "keyed_edus",  # dict of destination -> { key -> Edu }
         "edus",  # dict of destination -> [Edu]
-        "device_destinations",  # set of destinations
     ),
 )
 
@@ -523,11 +485,7 @@ def process_rows_for_federation(transaction_queue, rows):
     # them into the appropriate collection and then send them off.
 
     buff = ParsedFederationStreamData(
-        presence=[],
-        presence_destinations=[],
-        keyed_edus={},
-        edus={},
-        device_destinations=set(),
+        presence=[], presence_destinations=[], keyed_edus={}, edus={},
     )
 
     # Parse the rows in the stream and add to the buffer
@@ -555,6 +513,3 @@ def process_rows_for_federation(transaction_queue, rows):
     for destination, edu_list in iteritems(buff.edus):
         for edu in edu_list:
             transaction_queue.send_edu(edu, None)
-
-    for destination in buff.device_destinations:
-        transaction_queue.send_device_messages(destination)
diff --git a/synapse/federation/transport/client.py b/synapse/federation/transport/client.py
index 198257414b..dc563538de 100644
--- a/synapse/federation/transport/client.py
+++ b/synapse/federation/transport/client.py
@@ -15,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from typing import Any, Dict
 
 from six.moves import urllib
 
@@ -352,7 +353,9 @@ class TransportLayerClient(object):
         else:
             path = _create_v1_path("/publicRooms")
 
-            args = {"include_all_networks": "true" if include_all_networks else "false"}
+            args = {
+                "include_all_networks": "true" if include_all_networks else "false"
+            }  # type: Dict[str, Any]
             if third_party_instance_id:
                 args["third_party_instance_id"] = (third_party_instance_id,)
             if limit:
diff --git a/synapse/federation/transport/server.py b/synapse/federation/transport/server.py
index d8cf9ed299..125eadd796 100644
--- a/synapse/federation/transport/server.py
+++ b/synapse/federation/transport/server.py
@@ -18,6 +18,7 @@
 import functools
 import logging
 import re
+from typing import Optional, Tuple, Type
 
 from twisted.internet.defer import maybeDeferred
 
@@ -267,6 +268,8 @@ class BaseFederationServlet(object):
                 returned.
     """
 
+    PATH = ""  # Overridden in subclasses, the regex to match against the path.
+
     REQUIRE_AUTH = True
 
     PREFIX = FEDERATION_V1_PREFIX  # Allows specifying the API version
@@ -347,9 +350,6 @@ class BaseFederationServlet(object):
 
             return response
 
-        # Extra logic that functools.wraps() doesn't finish
-        new_func.__self__ = func.__self__
-
         return new_func
 
     def register(self, server):
@@ -824,7 +824,7 @@ class PublicRoomList(BaseFederationServlet):
         if not self.allow_access:
             raise FederationDeniedError(origin)
 
-        limit = int(content.get("limit", 100))
+        limit = int(content.get("limit", 100))  # type: Optional[int]
         since_token = content.get("since", None)
         search_filter = content.get("filter", None)
 
@@ -971,7 +971,7 @@ class FederationGroupsAddRoomsConfigServlet(BaseFederationServlet):
         if get_domain_from_id(requester_user_id) != origin:
             raise SynapseError(403, "requester_user_id doesn't match origin")
 
-        result = await self.groups_handler.update_room_in_group(
+        result = await self.handler.update_room_in_group(
             group_id, requester_user_id, room_id, config_key, content
         )
 
@@ -1422,11 +1422,13 @@ FEDERATION_SERVLET_CLASSES = (
     On3pidBindServlet,
     FederationVersionServlet,
     RoomComplexityServlet,
-)
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
-OPENID_SERVLET_CLASSES = (OpenIdUserInfo,)
+OPENID_SERVLET_CLASSES = (
+    OpenIdUserInfo,
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
-ROOM_LIST_CLASSES = (PublicRoomList,)
+ROOM_LIST_CLASSES = (PublicRoomList,)  # type: Tuple[Type[PublicRoomList], ...]
 
 GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsProfileServlet,
@@ -1447,17 +1449,19 @@ GROUP_SERVER_SERVLET_CLASSES = (
     FederationGroupsAddRoomsServlet,
     FederationGroupsAddRoomsConfigServlet,
     FederationGroupsSettingJoinPolicyServlet,
-)
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
 
 GROUP_LOCAL_SERVLET_CLASSES = (
     FederationGroupsLocalInviteServlet,
     FederationGroupsRemoveLocalUserServlet,
     FederationGroupsBulkPublicisedServlet,
-)
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
 
-GROUP_ATTESTATION_SERVLET_CLASSES = (FederationGroupsRenewAttestaionServlet,)
+GROUP_ATTESTATION_SERVLET_CLASSES = (
+    FederationGroupsRenewAttestaionServlet,
+)  # type: Tuple[Type[BaseFederationServlet], ...]
 
 DEFAULT_SERVLET_GROUPS = (
     "federation",
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index 60a7c938bc..9205865231 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -62,68 +62,6 @@ class AdminHandler(BaseHandler):
             ret["avatar_url"] = profile.avatar_url
         return ret
 
-    async def get_users(self):
-        """Function to retrieve a list of users in users table.
-
-        Args:
-        Returns:
-            defer.Deferred: resolves to list[dict[str, Any]]
-        """
-        ret = await self.store.get_users()
-
-        return ret
-
-    async def get_users_paginate(self, start, limit, name, guests, deactivated):
-        """Function to retrieve a paginated list of users from
-        users list. This will return a json list of users.
-
-        Args:
-            start (int): start number to begin the query from
-            limit (int): number of rows to retrieve
-            name (string): filter for user names
-            guests (bool): whether to in include guest users
-            deactivated (bool): whether to include deactivated users
-        Returns:
-            defer.Deferred: resolves to json list[dict[str, Any]]
-        """
-        ret = await self.store.get_users_paginate(
-            start, limit, name, guests, deactivated
-        )
-
-        return ret
-
-    async def search_users(self, term):
-        """Function to search users list for one or more users with
-        the matched term.
-
-        Args:
-            term (str): search term
-        Returns:
-            defer.Deferred: resolves to list[dict[str, Any]]
-        """
-        ret = await self.store.search_users(term)
-
-        return ret
-
-    def get_user_server_admin(self, user):
-        """
-        Get the admin bit on a user.
-
-        Args:
-            user_id (UserID): the (necessarily local) user to manipulate
-        """
-        return self.store.is_server_admin(user)
-
-    def set_user_server_admin(self, user, admin):
-        """
-        Set the admin bit on a user.
-
-        Args:
-            user_id (UserID): the (necessarily local) user to manipulate
-            admin (bool): whether or not the user should be an admin of this server
-        """
-        return self.store.set_server_admin(user, admin)
-
     async def export_user_data(self, user_id, writer):
         """Write all data we have on the user to the given writer.
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 26ef5e150c..a9bd431486 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -598,7 +598,13 @@ class DeviceListUpdater(object):
             # happens if we've missed updates.
             resync = yield self._need_to_do_resync(user_id, pending_updates)
 
-            logger.debug("Need to re-sync devices for %r? %r", user_id, resync)
+            if logger.isEnabledFor(logging.INFO):
+                logger.info(
+                    "Received device list update for %s, requiring resync: %s. Devices: %s",
+                    user_id,
+                    resync,
+                    ", ".join(u[0] for u in pending_updates),
+                )
 
             if resync:
                 yield self.user_device_resync(user_id)
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 73b9e120f5..05c4b3eec0 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -14,12 +14,14 @@
 # limitations under the License.
 
 import logging
+from typing import Any, Dict
 
 from canonicaljson import json
 
 from twisted.internet import defer
 
 from synapse.api.errors import SynapseError
+from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
     get_active_span_text_map,
     log_kv,
@@ -47,6 +49,8 @@ class DeviceMessageHandler(object):
             "m.direct_to_device", self.on_direct_to_device_edu
         )
 
+        self._device_list_updater = hs.get_device_handler().device_list_updater
+
     @defer.inlineCallbacks
     def on_direct_to_device_edu(self, origin, content):
         local_messages = {}
@@ -65,6 +69,9 @@ class DeviceMessageHandler(object):
                 logger.warning("Request for keys for non-local user %s", user_id)
                 raise SynapseError(400, "Not a user here")
 
+            if not by_device:
+                continue
+
             messages_by_device = {
                 device_id: {
                     "content": message_content,
@@ -73,8 +80,11 @@ class DeviceMessageHandler(object):
                 }
                 for device_id, message_content in by_device.items()
             }
-            if messages_by_device:
-                local_messages[user_id] = messages_by_device
+            local_messages[user_id] = messages_by_device
+
+            yield self._check_for_unknown_devices(
+                message_type, sender_user_id, by_device
+            )
 
         stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
             origin, message_id, local_messages
@@ -85,6 +95,55 @@ class DeviceMessageHandler(object):
         )
 
     @defer.inlineCallbacks
+    def _check_for_unknown_devices(
+        self,
+        message_type: str,
+        sender_user_id: str,
+        by_device: Dict[str, Dict[str, Any]],
+    ):
+        """Checks inbound device messages for unkown remote devices, and if
+        found marks the remote cache for the user as stale.
+        """
+
+        if message_type != "m.room_key_request":
+            return
+
+        # Get the sending device IDs
+        requesting_device_ids = set()
+        for message_content in by_device.values():
+            device_id = message_content.get("requesting_device_id")
+            requesting_device_ids.add(device_id)
+
+        # Check if we are tracking the devices of the remote user.
+        room_ids = yield self.store.get_rooms_for_user(sender_user_id)
+        if not room_ids:
+            logger.info(
+                "Received device message from remote device we don't"
+                " share a room with: %s %s",
+                sender_user_id,
+                requesting_device_ids,
+            )
+            return
+
+        # If we are tracking check that we know about the sending
+        # devices.
+        cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
+
+        unknown_devices = requesting_device_ids - set(cached_devices)
+        if unknown_devices:
+            logger.info(
+                "Received device message from remote device not in our cache: %s %s",
+                sender_user_id,
+                unknown_devices,
+            )
+            yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
+
+            # Immediately attempt a resync in the background
+            run_in_background(
+                self._device_list_updater.user_device_resync, sender_user_id
+            )
+
+    @defer.inlineCallbacks
     def send_device_message(self, sender_user_id, message_type, messages):
         set_tag("number_of_messages", len(messages))
         set_tag("sender", sender_user_id)
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index a07d2f1a17..8c5980cb0c 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -151,7 +151,12 @@ class DirectoryHandler(BaseHandler):
 
         yield self._create_association(room_alias, room_id, servers, creator=user_id)
         if send_event:
-            yield self.send_room_alias_update_event(requester, room_id)
+            try:
+                yield self.send_room_alias_update_event(requester, room_id)
+            except AuthError as e:
+                # sending the aliases event may fail due to the user not having
+                # permission in the room; this is permitted.
+                logger.info("Skipping updating aliases event due to auth error %s", e)
 
     @defer.inlineCallbacks
     def delete_association(self, requester, room_alias, send_event=True):
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 2d889364d4..95a9d71f41 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -208,8 +208,9 @@ class E2eKeysHandler(object):
                         )
 
                     user_devices = user_devices["devices"]
+                    user_results = results.setdefault(user_id, {})
                     for device in user_devices:
-                        results[user_id] = {device["device_id"]: device["keys"]}
+                        user_results[device["device_id"]] = device["keys"]
                     user_ids_updated.append(user_id)
                 except Exception as e:
                     failures[destination] = _exception_to_failure(e)
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d4f9a792fc..e9441bbeff 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -44,7 +44,7 @@ from synapse.api.errors import (
     StoreError,
     SynapseError,
 )
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
 from synapse.event_auth import auth_types_for_event
 from synapse.events import EventBase
@@ -57,6 +57,7 @@ from synapse.logging.context import (
     run_in_background,
 )
 from synapse.logging.utils import log_function
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
     ReplicationFederationSendEventsRestServlet,
@@ -156,6 +157,13 @@ class FederationHandler(BaseHandler):
             hs
         )
 
+        if hs.config.worker_app:
+            self._user_device_resync = ReplicationUserDevicesResyncRestServlet.make_client(
+                hs
+            )
+        else:
+            self._device_list_updater = hs.get_device_handler().device_list_updater
+
         # When joining a room we need to queue any events for that room up
         self.room_queues = {}
         self._room_pdu_linearizer = Linearizer("fed_room_pdu")
@@ -380,7 +388,7 @@ class FederationHandler(BaseHandler):
                             for x in remote_state:
                                 event_map[x.event_id] = x
 
-                    room_version = await self.store.get_room_version(room_id)
+                    room_version = await self.store.get_room_version_id(room_id)
                     state_map = await resolve_events_with_store(
                         room_id,
                         room_version,
@@ -703,8 +711,20 @@ class FederationHandler(BaseHandler):
 
         if not room:
             try:
+                prev_state_ids = await context.get_prev_state_ids()
+                create_event = await self.store.get_event(
+                    prev_state_ids[(EventTypes.Create, "")]
+                )
+
+                room_version_id = create_event.content.get(
+                    "room_version", RoomVersions.V1.identifier
+                )
+
                 await self.store.store_room(
-                    room_id=room_id, room_creator_user_id="", is_public=False
+                    room_id=room_id,
+                    room_creator_user_id="",
+                    is_public=False,
+                    room_version=KNOWN_ROOM_VERSIONS[room_version_id],
                 )
             except StoreError:
                 logger.exception("Failed to store room.")
@@ -730,6 +750,78 @@ class FederationHandler(BaseHandler):
                     user = UserID.from_string(event.state_key)
                     await self.user_joined_room(user, room_id)
 
+        # For encrypted messages we check that we know about the sending device,
+        # if we don't then we mark the device cache for that user as stale.
+        if event.type == EventTypes.Encrypted:
+            device_id = event.content.get("device_id")
+            sender_key = event.content.get("sender_key")
+
+            cached_devices = await self.store.get_cached_devices_for_user(event.sender)
+
+            resync = False  # Whether we should resync device lists.
+
+            device = None
+            if device_id is not None:
+                device = cached_devices.get(device_id)
+                if device is None:
+                    logger.info(
+                        "Received event from remote device not in our cache: %s %s",
+                        event.sender,
+                        device_id,
+                    )
+                    resync = True
+
+            # We also check if the `sender_key` matches what we expect.
+            if sender_key is not None:
+                # Figure out what sender key we're expecting. If we know the
+                # device and recognize the algorithm then we can work out the
+                # exact key to expect. Otherwise check it matches any key we
+                # have for that device.
+                if device:
+                    keys = device.get("keys", {}).get("keys", {})
+
+                    if event.content.get("algorithm") == "m.megolm.v1.aes-sha2":
+                        # For this algorithm we expect a curve25519 key.
+                        key_name = "curve25519:%s" % (device_id,)
+                        current_keys = [keys.get(key_name)]
+                    else:
+                        # We don't know understand the algorithm, so we just
+                        # check it matches a key for the device.
+                        current_keys = keys.values()
+                elif device_id:
+                    # We don't have any keys for the device ID.
+                    current_keys = []
+                else:
+                    # The event didn't include a device ID, so we just look for
+                    # keys across all devices.
+                    current_keys = (
+                        key
+                        for device in cached_devices
+                        for key in device.get("keys", {}).get("keys", {}).values()
+                    )
+
+                # We now check that the sender key matches (one of) the expected
+                # keys.
+                if sender_key not in current_keys:
+                    logger.info(
+                        "Received event from remote device with unexpected sender key: %s %s: %s",
+                        event.sender,
+                        device_id or "<no device_id>",
+                        sender_key,
+                    )
+                    resync = True
+
+            if resync:
+                await self.store.mark_remote_user_device_cache_as_stale(event.sender)
+
+                # Immediately attempt a resync in the background
+                if self.config.worker_app:
+                    return run_in_background(self._user_device_resync, event.sender)
+                else:
+                    return run_in_background(
+                        self._device_list_updater.user_device_resync, event.sender
+                    )
+
     @log_function
     async def backfill(self, dest, room_id, limit, extremities):
         """ Trigger a backfill request to `dest` for the given `room_id`
@@ -1064,7 +1156,7 @@ class FederationHandler(BaseHandler):
         Logs a warning if we can't find the given event.
         """
 
-        room_version = await self.store.get_room_version(room_id)
+        room_version = await self.store.get_room_version_id(room_id)
 
         event_infos = []
 
@@ -1186,7 +1278,7 @@ class FederationHandler(BaseHandler):
         """
         logger.debug("Joining %s to %s", joinee, room_id)
 
-        origin, event, event_format_version = yield self._make_and_verify_event(
+        origin, event, room_version_obj = yield self._make_and_verify_event(
             target_hosts,
             room_id,
             joinee,
@@ -1214,6 +1306,8 @@ class FederationHandler(BaseHandler):
                 target_hosts.insert(0, origin)
             except ValueError:
                 pass
+
+            event_format_version = room_version_obj.event_format
             ret = yield self.federation_client.send_join(
                 target_hosts, event, event_format_version
             )
@@ -1234,13 +1328,18 @@ class FederationHandler(BaseHandler):
 
             try:
                 yield self.store.store_room(
-                    room_id=room_id, room_creator_user_id="", is_public=False
+                    room_id=room_id,
+                    room_creator_user_id="",
+                    is_public=False,
+                    room_version=room_version_obj,
                 )
             except Exception:
                 # FIXME
                 pass
 
-            yield self._persist_auth_tree(origin, auth_chain, state, event)
+            yield self._persist_auth_tree(
+                origin, auth_chain, state, event, room_version_obj
+            )
 
             # Check whether this room is the result of an upgrade of a room we already know
             # about. If so, migrate over user information
@@ -1320,7 +1419,7 @@ class FederationHandler(BaseHandler):
 
         event_content = {"membership": Membership.JOIN}
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
 
         builder = self.event_builder_factory.new(
             room_version,
@@ -1429,13 +1528,13 @@ class FederationHandler(BaseHandler):
         return {"state": list(state.values()), "auth_chain": auth_chain}
 
     @defer.inlineCallbacks
-    def on_invite_request(self, origin, pdu):
+    def on_invite_request(
+        self, origin: str, event: EventBase, room_version: RoomVersion
+    ):
         """ We've got an invite event. Process and persist it. Sign it.
 
         Respond with the now signed event.
         """
-        event = pdu
-
         if event.state_key is None:
             raise SynapseError(400, "The invite event did not have a state key")
 
@@ -1475,7 +1574,10 @@ class FederationHandler(BaseHandler):
 
         event.signatures.update(
             compute_event_signature(
-                event.get_pdu_json(), self.hs.hostname, self.hs.config.signing_key[0]
+                room_version,
+                event.get_pdu_json(),
+                self.hs.hostname,
+                self.hs.config.signing_key[0],
             )
         )
 
@@ -1486,7 +1588,7 @@ class FederationHandler(BaseHandler):
 
     @defer.inlineCallbacks
     def do_remotely_reject_invite(self, target_hosts, room_id, user_id, content):
-        origin, event, event_format_version = yield self._make_and_verify_event(
+        origin, event, room_version = yield self._make_and_verify_event(
             target_hosts, room_id, user_id, "leave", content=content
         )
         # Mark as outlier as we don't have any state for this event; we're not
@@ -1513,7 +1615,11 @@ class FederationHandler(BaseHandler):
     def _make_and_verify_event(
         self, target_hosts, room_id, user_id, membership, content={}, params=None
     ):
-        origin, event, format_ver = yield self.federation_client.make_membership_event(
+        (
+            origin,
+            event,
+            room_version,
+        ) = yield self.federation_client.make_membership_event(
             target_hosts, room_id, user_id, membership, content, params=params
         )
 
@@ -1525,7 +1631,7 @@ class FederationHandler(BaseHandler):
         assert event.user_id == user_id
         assert event.state_key == user_id
         assert event.room_id == room_id
-        return origin, event, format_ver
+        return origin, event, room_version
 
     @defer.inlineCallbacks
     @log_function
@@ -1550,7 +1656,7 @@ class FederationHandler(BaseHandler):
             )
             raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
         builder = self.event_builder_factory.new(
             room_version,
             {
@@ -1810,7 +1916,14 @@ class FederationHandler(BaseHandler):
         )
 
     @defer.inlineCallbacks
-    def _persist_auth_tree(self, origin, auth_events, state, event):
+    def _persist_auth_tree(
+        self,
+        origin: str,
+        auth_events: List[EventBase],
+        state: List[EventBase],
+        event: EventBase,
+        room_version: RoomVersion,
+    ):
         """Checks the auth chain is valid (and passes auth checks) for the
         state and event. Then persists the auth chain and state atomically.
         Persists the event separately. Notifies about the persisted events
@@ -1819,10 +1932,12 @@ class FederationHandler(BaseHandler):
         Will attempt to fetch missing auth events.
 
         Args:
-            origin (str): Where the events came from
-            auth_events (list)
-            state (list)
-            event (Event)
+            origin: Where the events came from
+            auth_events
+            state
+            event
+            room_version: The room version we expect this room to have, and
+                will raise if it doesn't match the version in the create event.
 
         Returns:
             Deferred
@@ -1848,10 +1963,13 @@ class FederationHandler(BaseHandler):
             # invalid, and it would fail auth checks anyway.
             raise SynapseError(400, "No create event in state")
 
-        room_version = create_event.content.get(
+        room_version_id = create_event.content.get(
             "room_version", RoomVersions.V1.identifier
         )
 
+        if room_version.identifier != room_version_id:
+            raise SynapseError(400, "Room version mismatch")
+
         missing_auth_events = set()
         for e in itertools.chain(auth_events, state, [event]):
             for e_id in e.auth_event_ids():
@@ -1860,7 +1978,11 @@ class FederationHandler(BaseHandler):
 
         for e_id in missing_auth_events:
             m_ev = yield self.federation_client.get_pdu(
-                [origin], e_id, room_version=room_version, outlier=True, timeout=10000
+                [origin],
+                e_id,
+                room_version=room_version.identifier,
+                outlier=True,
+                timeout=10000,
             )
             if m_ev and m_ev.event_id == e_id:
                 event_map[e_id] = m_ev
@@ -1986,7 +2108,8 @@ class FederationHandler(BaseHandler):
                 do_soft_fail_check = False
 
         if do_soft_fail_check:
-            room_version = yield self.store.get_room_version(event.room_id)
+            room_version = yield self.store.get_room_version_id(event.room_id)
+            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
             # Calculate the "current state".
             if state is not None:
@@ -2036,7 +2159,9 @@ class FederationHandler(BaseHandler):
             }
 
             try:
-                event_auth.check(room_version, event, auth_events=current_auth_events)
+                event_auth.check(
+                    room_version_obj, event, auth_events=current_auth_events
+                )
             except AuthError as e:
                 logger.warning("Soft-failing %r because %s", event, e)
                 event.internal_metadata.soft_failed = True
@@ -2119,7 +2244,8 @@ class FederationHandler(BaseHandler):
         Returns:
             defer.Deferred[EventContext]: updated context object
         """
-        room_version = yield self.store.get_room_version(event.room_id)
+        room_version = yield self.store.get_room_version_id(event.room_id)
+        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
         try:
             context = yield self._update_auth_events_and_context_for_auth(
@@ -2137,7 +2263,7 @@ class FederationHandler(BaseHandler):
             )
 
         try:
-            event_auth.check(room_version, event, auth_events=auth_events)
+            event_auth.check(room_version_obj, event, auth_events=auth_events)
         except AuthError as e:
             logger.warning("Failed auth resolution for %r because %s", event, e)
             context.rejected = RejectedReason.AUTH_ERROR
@@ -2290,7 +2416,7 @@ class FederationHandler(BaseHandler):
         remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
         remote_state = remote_auth_events.values()
 
-        room_version = yield self.store.get_room_version(event.room_id)
+        room_version = yield self.store.get_room_version_id(event.room_id)
         new_state = yield self.state_handler.resolve_events(
             room_version, (local_state, remote_state), event
         )
@@ -2514,7 +2640,7 @@ class FederationHandler(BaseHandler):
         }
 
         if (yield self.auth.check_host_in_room(room_id, self.hs.hostname)):
-            room_version = yield self.store.get_room_version(room_id)
+            room_version = yield self.store.get_room_version_id(room_id)
             builder = self.event_builder_factory.new(room_version, event_dict)
 
             EventValidator().validate_builder(builder)
@@ -2577,7 +2703,7 @@ class FederationHandler(BaseHandler):
         Returns:
             Deferred: resolves (to None)
         """
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
 
         # NB: event_dict has a particular specced format we might need to fudge
         # if we change event formats too much.
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index 000fbf090f..23f07832e7 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -38,7 +38,7 @@ from synapse.api.errors import (
 from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.http.client import SimpleHttpClient
 from synapse.util.hash import sha256_and_url_safe_base64
-from synapse.util.stringutils import random_string
+from synapse.util.stringutils import assert_valid_client_secret, random_string
 
 from ._base import BaseHandler
 
@@ -84,6 +84,8 @@ class IdentityHandler(BaseHandler):
             raise SynapseError(
                 400, "Missing param client_secret in creds", errcode=Codes.MISSING_PARAM
             )
+        assert_valid_client_secret(client_secret)
+
         session_id = creds.get("sid")
         if not session_id:
             raise SynapseError(
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 8ea3aca2f4..bdf16c84d3 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -40,7 +40,7 @@ from synapse.api.errors import (
     NotFoundError,
     SynapseError,
 )
-from synapse.api.room_versions import RoomVersions
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersions
 from synapse.api.urls import ConsentURIBuilder
 from synapse.events.validator import EventValidator
 from synapse.logging.context import run_in_background
@@ -459,7 +459,9 @@ class EventCreationHandler(object):
             room_version = event_dict["content"]["room_version"]
         else:
             try:
-                room_version = yield self.store.get_room_version(event_dict["room_id"])
+                room_version = yield self.store.get_room_version_id(
+                    event_dict["room_id"]
+                )
             except NotFoundError:
                 raise AuthError(403, "Unknown room")
 
@@ -788,7 +790,7 @@ class EventCreationHandler(object):
         ):
             room_version = event.content.get("room_version", RoomVersions.V1.identifier)
         else:
-            room_version = yield self.store.get_room_version(event.room_id)
+            room_version = yield self.store.get_room_version_id(event.room_id)
 
         event_allowed = yield self.third_party_event_rules.check_event_allowed(
             event, context
@@ -962,9 +964,13 @@ class EventCreationHandler(object):
             )
             auth_events = yield self.store.get_events(auth_events_ids)
             auth_events = {(e.type, e.state_key): e for e in auth_events.values()}
-            room_version = yield self.store.get_room_version(event.room_id)
 
-            if event_auth.check_redaction(room_version, event, auth_events=auth_events):
+            room_version = yield self.store.get_room_version_id(event.room_id)
+            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+            if event_auth.check_redaction(
+                room_version_obj, event, auth_events=auth_events
+            ):
                 # this user doesn't have 'redact' rights, so we need to do some more
                 # checks on the original event. Let's start by checking the original
                 # event exists.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 71d76202c9..caf841a643 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -281,7 +281,7 @@ class PaginationHandler(object):
         """Purge the given room from the database"""
         with (await self.pagination_lock.write(room_id)):
             # check we know about the room
-            await self.store.get_room_version(room_id)
+            await self.store.get_room_version_id(room_id)
 
             # first check that we have no users in this room
             joined = await defer.maybeDeferred(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 9f50196ea7..b609a65f47 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -29,7 +29,8 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
-from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
+from synapse.events.utils import copy_power_levels_contents
 from synapse.http.endpoint import parse_and_validate_server_name
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -100,13 +101,15 @@ class RoomCreationHandler(BaseHandler):
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
     @defer.inlineCallbacks
-    def upgrade_room(self, requester, old_room_id, new_version):
+    def upgrade_room(
+        self, requester: Requester, old_room_id: str, new_version: RoomVersion
+    ):
         """Replace a room with a new room with a different version
 
         Args:
-            requester (synapse.types.Requester): the user requesting the upgrade
-            old_room_id (unicode): the id of the room to be replaced
-            new_version (unicode): the new room version to use
+            requester: the user requesting the upgrade
+            old_room_id: the id of the room to be replaced
+            new_version: the new room version to use
 
         Returns:
             Deferred[unicode]: the new room id
@@ -151,7 +154,7 @@ class RoomCreationHandler(BaseHandler):
         if r is None:
             raise NotFoundError("Unknown room id %s" % (old_room_id,))
         new_room_id = yield self._generate_room_id(
-            creator_id=user_id, is_public=r["is_public"]
+            creator_id=user_id, is_public=r["is_public"], room_version=new_version,
         )
 
         logger.info("Creating new room %s to replace %s", new_room_id, old_room_id)
@@ -175,7 +178,7 @@ class RoomCreationHandler(BaseHandler):
             },
             token_id=requester.access_token_id,
         )
-        old_room_version = yield self.store.get_room_version(old_room_id)
+        old_room_version = yield self.store.get_room_version_id(old_room_id)
         yield self.auth.check_from_context(
             old_room_version, tombstone_event, tombstone_context
         )
@@ -284,7 +287,16 @@ class RoomCreationHandler(BaseHandler):
             except AuthError as e:
                 logger.warning("Unable to update PLs in old room: %s", e)
 
-        logger.info("Setting correct PLs in new room to %s", old_room_pl_state.content)
+        new_pl_content = copy_power_levels_contents(old_room_pl_state.content)
+
+        # pre-msc2260 rooms may not have the right setting for aliases. If no other
+        # value is set, set it now.
+        events_default = new_pl_content.get("events_default", 0)
+        new_pl_content.setdefault("events", {}).setdefault(
+            EventTypes.Aliases, events_default
+        )
+
+        logger.info("Setting correct PLs in new room to %s", new_pl_content)
         yield self.event_creation_handler.create_and_send_nonmember_event(
             requester,
             {
@@ -292,25 +304,29 @@ class RoomCreationHandler(BaseHandler):
                 "state_key": "",
                 "room_id": new_room_id,
                 "sender": requester.user.to_string(),
-                "content": old_room_pl_state.content,
+                "content": new_pl_content,
             },
             ratelimit=False,
         )
 
     @defer.inlineCallbacks
     def clone_existing_room(
-        self, requester, old_room_id, new_room_id, new_room_version, tombstone_event_id
+        self,
+        requester: Requester,
+        old_room_id: str,
+        new_room_id: str,
+        new_room_version: RoomVersion,
+        tombstone_event_id: str,
     ):
         """Populate a new room based on an old room
 
         Args:
-            requester (synapse.types.Requester): the user requesting the upgrade
-            old_room_id (unicode): the id of the room to be replaced
-            new_room_id (unicode): the id to give the new room (should already have been
+            requester: the user requesting the upgrade
+            old_room_id : the id of the room to be replaced
+            new_room_id: the id to give the new room (should already have been
                 created with _gemerate_room_id())
-            new_room_version (unicode): the new room version to use
-            tombstone_event_id (unicode|str): the ID of the tombstone event in the old
-                room.
+            new_room_version: the new room version to use
+            tombstone_event_id: the ID of the tombstone event in the old room.
         Returns:
             Deferred
         """
@@ -320,7 +336,7 @@ class RoomCreationHandler(BaseHandler):
             raise SynapseError(403, "You are not permitted to create rooms")
 
         creation_content = {
-            "room_version": new_room_version,
+            "room_version": new_room_version.identifier,
             "predecessor": {"room_id": old_room_id, "event_id": tombstone_event_id},
         }
 
@@ -344,7 +360,7 @@ class RoomCreationHandler(BaseHandler):
             (EventTypes.RoomHistoryVisibility, ""),
             (EventTypes.GuestAccess, ""),
             (EventTypes.RoomAvatar, ""),
-            (EventTypes.Encryption, ""),
+            (EventTypes.RoomEncryption, ""),
             (EventTypes.ServerACL, ""),
             (EventTypes.RelatedGroups, ""),
             (EventTypes.PowerLevels, ""),
@@ -361,6 +377,15 @@ class RoomCreationHandler(BaseHandler):
             if old_event:
                 initial_state[k] = old_event.content
 
+        # deep-copy the power-levels event before we start modifying it
+        # note that if frozen_dicts are enabled, `power_levels` will be a frozen
+        # dict so we can't just copy.deepcopy it.
+        initial_state[
+            (EventTypes.PowerLevels, "")
+        ] = power_levels = copy_power_levels_contents(
+            initial_state[(EventTypes.PowerLevels, "")]
+        )
+
         # Resolve the minimum power level required to send any state event
         # We will give the upgrading user this power level temporarily (if necessary) such that
         # they are able to copy all of the state events over, then revert them back to their
@@ -369,8 +394,6 @@ class RoomCreationHandler(BaseHandler):
         # Copy over user power levels now as this will not be possible with >100PL users once
         # the room has been created
 
-        power_levels = initial_state[(EventTypes.PowerLevels, "")]
-
         # Calculate the minimum power level needed to clone the room
         event_power_levels = power_levels.get("events", {})
         state_default = power_levels.get("state_default", 0)
@@ -380,16 +403,7 @@ class RoomCreationHandler(BaseHandler):
         # Raise the requester's power level in the new room if necessary
         current_power_level = power_levels["users"][user_id]
         if current_power_level < needed_power_level:
-            # make sure we copy the event content rather than overwriting it.
-            # note that if frozen_dicts are enabled, `power_levels` will be a frozen
-            # dict so we can't just copy.deepcopy it.
-
-            new_power_levels = {k: v for k, v in power_levels.items() if k != "users"}
-            new_power_levels["users"] = {
-                k: v for k, v in power_levels.get("users", {}).items() if k != user_id
-            }
-            new_power_levels["users"][user_id] = needed_power_level
-            initial_state[(EventTypes.PowerLevels, "")] = new_power_levels
+            power_levels["users"][user_id] = needed_power_level
 
         yield self._send_events_for_new_room(
             requester,
@@ -577,14 +591,15 @@ class RoomCreationHandler(BaseHandler):
         if ratelimit:
             yield self.ratelimit(requester)
 
-        room_version = config.get(
+        room_version_id = config.get(
             "room_version", self.config.default_room_version.identifier
         )
 
-        if not isinstance(room_version, string_types):
+        if not isinstance(room_version_id, string_types):
             raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
 
-        if room_version not in KNOWN_ROOM_VERSIONS:
+        room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
+        if room_version is None:
             raise SynapseError(
                 400,
                 "Your homeserver does not support this room version",
@@ -631,7 +646,9 @@ class RoomCreationHandler(BaseHandler):
         visibility = config.get("visibility", None)
         is_public = visibility == "public"
 
-        room_id = yield self._generate_room_id(creator_id=user_id, is_public=is_public)
+        room_id = yield self._generate_room_id(
+            creator_id=user_id, is_public=is_public, room_version=room_version,
+        )
 
         directory_handler = self.hs.get_handlers().directory_handler
         if room_alias:
@@ -660,7 +677,7 @@ class RoomCreationHandler(BaseHandler):
         creation_content = config.get("creation_content", {})
 
         # override any attempt to set room versions via the creation_content
-        creation_content["room_version"] = room_version
+        creation_content["room_version"] = room_version.identifier
 
         yield self._send_events_for_new_room(
             requester,
@@ -804,6 +821,10 @@ class RoomCreationHandler(BaseHandler):
                     EventTypes.RoomHistoryVisibility: 100,
                     EventTypes.CanonicalAlias: 50,
                     EventTypes.RoomAvatar: 50,
+                    # MSC2260: Allow everybody to send alias events by default
+                    # This will be reudundant on pre-MSC2260 rooms, since the
+                    # aliases event is special-cased.
+                    EventTypes.Aliases: 0,
                 },
                 "events_default": 0,
                 "state_default": 50,
@@ -849,7 +870,9 @@ class RoomCreationHandler(BaseHandler):
             yield send(etype=etype, state_key=state_key, content=content)
 
     @defer.inlineCallbacks
-    def _generate_room_id(self, creator_id, is_public):
+    def _generate_room_id(
+        self, creator_id: str, is_public: str, room_version: RoomVersion,
+    ):
         # autogen room IDs and try to create it. We may clash, so just
         # try a few times till one goes through, giving up eventually.
         attempts = 0
@@ -863,6 +886,7 @@ class RoomCreationHandler(BaseHandler):
                     room_id=gen_room_id,
                     room_creator_user_id=creator_id,
                     is_public=is_public,
+                    room_version=room_version,
                 )
                 return gen_room_id
             except StoreError:
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 7f7d56390e..68e6edace5 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -286,7 +286,7 @@ class StatsHandler(StateDeltasHandler):
                 room_state["history_visibility"] = event_content.get(
                     "history_visibility"
                 )
-            elif typ == EventTypes.Encryption:
+            elif typ == EventTypes.RoomEncryption:
                 room_state["encryption"] = event_content.get("algorithm")
             elif typ == EventTypes.Name:
                 room_state["name"] = event_content.get("name")
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index cd95f85e3f..2b62fd83fd 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -883,6 +883,7 @@ class SyncHandler(object):
             for e in sync_config.filter_collection.filter_room_state(
                 list(state.values())
             )
+            if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
         }
 
     async def unread_notifs_for_room_id(self, room_id, sync_config):
diff --git a/synapse/http/matrixfederationclient.py b/synapse/http/matrixfederationclient.py
index 16765d54e0..6f1bb04d8b 100644
--- a/synapse/http/matrixfederationclient.py
+++ b/synapse/http/matrixfederationclient.py
@@ -408,6 +408,8 @@ class MatrixFederationHttpClient(object):
                         _sec_timeout,
                     )
 
+                    outgoing_requests_counter.labels(method_bytes).inc()
+
                     try:
                         with Measure(self.clock, "outbound_request"):
                             # we don't want all the fancy cookie and redirect handling
@@ -440,6 +442,8 @@ class MatrixFederationHttpClient(object):
                         response.phrase.decode("ascii", errors="replace"),
                     )
 
+                    incoming_responses_counter.labels(method_bytes, response.code).inc()
+
                     set_tag(tags.HTTP_STATUS_CODE, response.code)
 
                     if 200 <= response.code < 300:
diff --git a/synapse/python_dependencies.py b/synapse/python_dependencies.py
index 5871feaafd..8de8cb2c12 100644
--- a/synapse/python_dependencies.py
+++ b/synapse/python_dependencies.py
@@ -1,6 +1,7 @@
 # Copyright 2015, 2016 OpenMarket Ltd
 # Copyright 2017 Vector Creations Ltd
 # Copyright 2018 New Vector Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -43,7 +44,8 @@ REQUIREMENTS = [
     "frozendict>=1",
     "unpaddedbase64>=1.1.0",
     "canonicaljson>=1.1.3",
-    "signedjson>=1.0.0",
+    # we use the type definitions added in signedjson 1.1.
+    "signedjson>=1.1.0",
     "pynacl>=1.2.1",
     "idna>=2.5",
     # validating SSL certs for IP addresses requires service_identity 18.1.
diff --git a/synapse/replication/slave/storage/devices.py b/synapse/replication/slave/storage/devices.py
index dc625e0d7a..1c77687eea 100644
--- a/synapse/replication/slave/storage/devices.py
+++ b/synapse/replication/slave/storage/devices.py
@@ -72,6 +72,6 @@ class SlavedDeviceStore(EndToEndKeyWorkerStore, DeviceWorkerStore, BaseSlavedSto
                 destination, token
             )
 
-        self._get_cached_devices_for_user.invalidate((user_id,))
+        self.get_cached_devices_for_user.invalidate((user_id,))
         self._get_cached_user_device.invalidate_many((user_id,))
         self.get_device_list_last_stream_id_for_remote.invalidate((user_id,))
diff --git a/synapse/replication/tcp/client.py b/synapse/replication/tcp/client.py
index fc06a7b053..02ab5b66ea 100644
--- a/synapse/replication/tcp/client.py
+++ b/synapse/replication/tcp/client.py
@@ -31,6 +31,7 @@ from .commands import (
     Command,
     FederationAckCommand,
     InvalidateCacheCommand,
+    RemoteServerUpCommand,
     RemovePusherCommand,
     UserIpCommand,
     UserSyncCommand,
@@ -210,6 +211,9 @@ class ReplicationClientHandler(AbstractReplicationClientHandler):
         cmd = UserIpCommand(user_id, access_token, ip, user_agent, device_id, last_seen)
         self.send_command(cmd)
 
+    def send_remote_server_up(self, server: str):
+        self.send_command(RemoteServerUpCommand(server))
+
     def await_sync(self, data):
         """Returns a deferred that is resolved when we receive a SYNC command
         with given data.
diff --git a/synapse/replication/tcp/protocol.py b/synapse/replication/tcp/protocol.py
index 131e5acb09..bc1482a9bb 100644
--- a/synapse/replication/tcp/protocol.py
+++ b/synapse/replication/tcp/protocol.py
@@ -459,7 +459,7 @@ class ServerReplicationStreamProtocol(BaseReplicationStreamProtocol):
         await self.streamer.on_remove_pusher(cmd.app_id, cmd.push_key, cmd.user_id)
 
     async def on_INVALIDATE_CACHE(self, cmd):
-        self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
+        await self.streamer.on_invalidate_cache(cmd.cache_func, cmd.keys)
 
     async def on_REMOTE_SERVER_UP(self, cmd: RemoteServerUpCommand):
         self.streamer.on_remote_server_up(cmd.data)
diff --git a/synapse/replication/tcp/resource.py b/synapse/replication/tcp/resource.py
index 6ebf944f66..ce60ae2e07 100644
--- a/synapse/replication/tcp/resource.py
+++ b/synapse/replication/tcp/resource.py
@@ -17,7 +17,7 @@
 
 import logging
 import random
-from typing import List
+from typing import Any, List
 
 from six import itervalues
 
@@ -271,11 +271,14 @@ class ReplicationStreamer(object):
         self.notifier.on_new_replication_data()
 
     @measure_func("repl.on_invalidate_cache")
-    def on_invalidate_cache(self, cache_func, keys):
+    async def on_invalidate_cache(self, cache_func: str, keys: List[Any]):
         """The client has asked us to invalidate a cache
         """
         invalidate_cache_counter.inc()
-        getattr(self.store, cache_func).invalidate(tuple(keys))
+
+        # We invalidate the cache locally, but then also stream that to other
+        # workers.
+        await self.store.invalidate_cache_and_stream(cache_func, tuple(keys))
 
     @measure_func("repl.on_user_ip")
     async def on_user_ip(
diff --git a/synapse/rest/admin/users.py b/synapse/rest/admin/users.py
index 52d27fa3e3..3455741195 100644
--- a/synapse/rest/admin/users.py
+++ b/synapse/rest/admin/users.py
@@ -45,6 +45,7 @@ class UsersRestServlet(RestServlet):
 
     def __init__(self, hs):
         self.hs = hs
+        self.store = hs.get_datastore()
         self.auth = hs.get_auth()
         self.admin_handler = hs.get_handlers().admin_handler
 
@@ -55,7 +56,7 @@ class UsersRestServlet(RestServlet):
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "Can only users a local user")
 
-        ret = await self.admin_handler.get_users()
+        ret = await self.store.get_users()
 
         return 200, ret
 
@@ -80,6 +81,7 @@ class UsersRestServletV2(RestServlet):
 
     def __init__(self, hs):
         self.hs = hs
+        self.store = hs.get_datastore()
         self.auth = hs.get_auth()
         self.admin_handler = hs.get_handlers().admin_handler
 
@@ -92,7 +94,7 @@ class UsersRestServletV2(RestServlet):
         guests = parse_boolean(request, "guests", default=True)
         deactivated = parse_boolean(request, "deactivated", default=False)
 
-        users = await self.admin_handler.get_users_paginate(
+        users = await self.store.get_users_paginate(
             start, limit, user_id, guests, deactivated
         )
         ret = {"users": users}
@@ -151,7 +153,8 @@ class UserRestServletV2(RestServlet):
         return 200, ret
 
     async def on_PUT(self, request, user_id):
-        await assert_requester_is_admin(self.auth, request)
+        requester = await self.auth.get_user_by_req(request)
+        await assert_user_is_admin(self.auth, requester.user)
 
         target_user = UserID.from_string(user_id)
         body = parse_json_object_from_request(request)
@@ -162,8 +165,6 @@ class UserRestServletV2(RestServlet):
         user = await self.admin_handler.get_user(target_user)
 
         if user:  # modify user
-            requester = await self.auth.get_user_by_req(request)
-
             if "displayname" in body:
                 await self.profile_handler.set_displayname(
                     target_user, requester, body["displayname"], True
@@ -210,11 +211,8 @@ class UserRestServletV2(RestServlet):
             return 200, user
 
         else:  # create user
-            if "password" not in body:
-                raise SynapseError(
-                    400, "password must be specified", errcode=Codes.BAD_JSON
-                )
-            elif (
+            password = body.get("password")
+            if password is not None and (
                 not isinstance(body["password"], text_type)
                 or len(body["password"]) > 512
             ):
@@ -229,7 +227,7 @@ class UserRestServletV2(RestServlet):
 
             user_id = await self.registration_handler.register_user(
                 localpart=target_user.localpart,
-                password=body["password"],
+                password=password,
                 admin=bool(admin),
                 default_display_name=displayname,
                 user_type=user_type,
@@ -516,8 +514,8 @@ class SearchUsersRestServlet(RestServlet):
     PATTERNS = historical_admin_path_patterns("/search_users/(?P<target_user_id>[^/]*)")
 
     def __init__(self, hs):
-        self.store = hs.get_datastore()
         self.hs = hs
+        self.store = hs.get_datastore()
         self.auth = hs.get_auth()
         self.handlers = hs.get_handlers()
 
@@ -540,7 +538,7 @@ class SearchUsersRestServlet(RestServlet):
         term = parse_string(request, "term", required=True)
         logger.info("term: %s ", term)
 
-        ret = await self.handlers.admin_handler.search_users(term)
+        ret = await self.handlers.store.search_users(term)
         return 200, ret
 
 
@@ -574,8 +572,8 @@ class UserAdminServlet(RestServlet):
 
     def __init__(self, hs):
         self.hs = hs
+        self.store = hs.get_datastore()
         self.auth = hs.get_auth()
-        self.handlers = hs.get_handlers()
 
     async def on_GET(self, request, user_id):
         await assert_requester_is_admin(self.auth, request)
@@ -585,8 +583,7 @@ class UserAdminServlet(RestServlet):
         if not self.hs.is_mine(target_user):
             raise SynapseError(400, "Only local users can be admins of this homeserver")
 
-        is_admin = await self.handlers.admin_handler.get_user_server_admin(target_user)
-        is_admin = bool(is_admin)
+        is_admin = await self.store.is_server_admin(target_user)
 
         return 200, {"admin": is_admin}
 
@@ -609,8 +606,6 @@ class UserAdminServlet(RestServlet):
         if target_user == auth_user and not set_admin_to:
             raise SynapseError(400, "You may not demote yourself.")
 
-        await self.handlers.admin_handler.set_user_server_admin(
-            target_user, set_admin_to
-        )
+        await self.store.set_user_server_admin(target_user, set_admin_to)
 
         return 200, {}
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 5aef8238b8..6f31584c51 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -184,6 +184,12 @@ class RoomStateEventRestServlet(TransactionRestServlet):
 
         content = parse_json_object_from_request(request)
 
+        if event_type == EventTypes.Aliases:
+            # MSC2260
+            raise SynapseError(
+                400, "Cannot send m.room.aliases events via /rooms/{room_id}/state"
+            )
+
         event_dict = {
             "type": event_type,
             "content": content,
@@ -231,6 +237,12 @@ class RoomSendEventRestServlet(TransactionRestServlet):
         requester = await self.auth.get_user_by_req(request, allow_guest=True)
         content = parse_json_object_from_request(request)
 
+        if event_type == EventTypes.Aliases:
+            # MSC2260
+            raise SynapseError(
+                400, "Cannot send m.room.aliases events via /rooms/{room_id}/send"
+            )
+
         event_dict = {
             "type": event_type,
             "content": content,
diff --git a/synapse/rest/client/v2_alpha/account.py b/synapse/rest/client/v2_alpha/account.py
index fc240f5cf8..dc837d6c75 100644
--- a/synapse/rest/client/v2_alpha/account.py
+++ b/synapse/rest/client/v2_alpha/account.py
@@ -30,6 +30,7 @@ from synapse.http.servlet import (
 )
 from synapse.push.mailer import Mailer, load_jinja2_templates
 from synapse.util.msisdn import phone_number_to_msisdn
+from synapse.util.stringutils import assert_valid_client_secret
 from synapse.util.threepids import check_3pid_allowed
 
 from ._base import client_patterns, interactive_auth_handler
@@ -81,6 +82,8 @@ class EmailPasswordRequestTokenRestServlet(RestServlet):
 
         # Extract params from body
         client_secret = body["client_secret"]
+        assert_valid_client_secret(client_secret)
+
         email = body["email"]
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
@@ -166,8 +169,9 @@ class PasswordResetSubmitTokenServlet(RestServlet):
             )
 
         sid = parse_string(request, "sid", required=True)
-        client_secret = parse_string(request, "client_secret", required=True)
         token = parse_string(request, "token", required=True)
+        client_secret = parse_string(request, "client_secret", required=True)
+        assert_valid_client_secret(client_secret)
 
         # Attempt to validate a 3PID session
         try:
@@ -353,6 +357,8 @@ class EmailThreepidRequestTokenRestServlet(RestServlet):
         body = parse_json_object_from_request(request)
         assert_params_in_dict(body, ["client_secret", "email", "send_attempt"])
         client_secret = body["client_secret"]
+        assert_valid_client_secret(client_secret)
+
         email = body["email"]
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
@@ -413,6 +419,8 @@ class MsisdnThreepidRequestTokenRestServlet(RestServlet):
             body, ["client_secret", "country", "phone_number", "send_attempt"]
         )
         client_secret = body["client_secret"]
+        assert_valid_client_secret(client_secret)
+
         country = body["country"]
         phone_number = body["phone_number"]
         send_attempt = body["send_attempt"]
@@ -493,8 +501,9 @@ class AddThreepidEmailSubmitTokenServlet(RestServlet):
             )
 
         sid = parse_string(request, "sid", required=True)
-        client_secret = parse_string(request, "client_secret", required=True)
         token = parse_string(request, "token", required=True)
+        client_secret = parse_string(request, "client_secret", required=True)
+        assert_valid_client_secret(client_secret)
 
         # Attempt to validate a 3PID session
         try:
@@ -559,6 +568,7 @@ class AddThreepidMsisdnSubmitTokenServlet(RestServlet):
 
         body = parse_json_object_from_request(request)
         assert_params_in_dict(body, ["client_secret", "sid", "token"])
+        assert_valid_client_secret(body["client_secret"])
 
         # Proxy submit_token request to msisdn threepid delegate
         response = await self.identity_handler.proxy_msisdn_submit_token(
@@ -600,8 +610,9 @@ class ThreepidRestServlet(RestServlet):
             )
         assert_params_in_dict(threepid_creds, ["client_secret", "sid"])
 
-        client_secret = threepid_creds["client_secret"]
         sid = threepid_creds["sid"]
+        client_secret = threepid_creds["client_secret"]
+        assert_valid_client_secret(client_secret)
 
         validation_session = await self.identity_handler.validate_threepid_session(
             client_secret, sid
@@ -637,8 +648,9 @@ class ThreepidAddRestServlet(RestServlet):
         body = parse_json_object_from_request(request)
 
         assert_params_in_dict(body, ["client_secret", "sid"])
-        client_secret = body["client_secret"]
         sid = body["sid"]
+        client_secret = body["client_secret"]
+        assert_valid_client_secret(client_secret)
 
         await self.auth_handler.validate_user_via_ui_auth(
             requester, body, self.hs.get_ip_from_request(request)
@@ -676,8 +688,9 @@ class ThreepidBindRestServlet(RestServlet):
         assert_params_in_dict(body, ["id_server", "sid", "client_secret"])
         id_server = body["id_server"]
         sid = body["sid"]
-        client_secret = body["client_secret"]
         id_access_token = body.get("id_access_token")  # optional
+        client_secret = body["client_secret"]
+        assert_valid_client_secret(client_secret)
 
         requester = await self.auth.get_user_by_req(request)
         user_id = requester.user.to_string()
diff --git a/synapse/rest/client/v2_alpha/register.py b/synapse/rest/client/v2_alpha/register.py
index 1bda9aec7e..a09189b1b4 100644
--- a/synapse/rest/client/v2_alpha/register.py
+++ b/synapse/rest/client/v2_alpha/register.py
@@ -49,6 +49,7 @@ from synapse.http.servlet import (
 from synapse.push.mailer import load_jinja2_templates
 from synapse.util.msisdn import phone_number_to_msisdn
 from synapse.util.ratelimitutils import FederationRateLimiter
+from synapse.util.stringutils import assert_valid_client_secret
 from synapse.util.threepids import check_3pid_allowed
 
 from ._base import client_patterns, interactive_auth_handler
@@ -116,6 +117,8 @@ class EmailRegisterRequestTokenRestServlet(RestServlet):
 
         # Extract params from body
         client_secret = body["client_secret"]
+        assert_valid_client_secret(client_secret)
+
         email = body["email"]
         send_attempt = body["send_attempt"]
         next_link = body.get("next_link")  # Optional param
diff --git a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
index ca97330797..f357015a70 100644
--- a/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
+++ b/synapse/rest/client/v2_alpha/room_upgrade_rest_servlet.py
@@ -64,7 +64,8 @@ class RoomUpgradeRestServlet(RestServlet):
         assert_params_in_dict(content, ("new_version",))
         new_version = content["new_version"]
 
-        if new_version not in KNOWN_ROOM_VERSIONS:
+        new_version = KNOWN_ROOM_VERSIONS.get(content["new_version"])
+        if new_version is None:
             raise SynapseError(
                 400,
                 "Your homeserver does not support this room version",
diff --git a/synapse/rest/media/v1/media_storage.py b/synapse/rest/media/v1/media_storage.py
index 3b87717a5a..683a79c966 100644
--- a/synapse/rest/media/v1/media_storage.py
+++ b/synapse/rest/media/v1/media_storage.py
@@ -148,6 +148,7 @@ class MediaStorage(object):
         for provider in self.storage_providers:
             res = yield provider.fetch(path, file_info)
             if res:
+                logger.debug("Streaming %s from %s", path, provider)
                 return res
 
         return None
diff --git a/synapse/rest/media/v1/storage_provider.py b/synapse/rest/media/v1/storage_provider.py
index 37687ea7f4..858680be26 100644
--- a/synapse/rest/media/v1/storage_provider.py
+++ b/synapse/rest/media/v1/storage_provider.py
@@ -77,6 +77,9 @@ class StorageProviderWrapper(StorageProvider):
         self.store_synchronous = store_synchronous
         self.store_remote = store_remote
 
+    def __str__(self):
+        return "StorageProviderWrapper[%s]" % (self.backend,)
+
     def store_file(self, path, file_info):
         if not file_info.server_name and not self.store_local:
             return defer.succeed(None)
@@ -114,6 +117,9 @@ class FileStorageProviderBackend(StorageProvider):
         self.cache_directory = hs.config.media_store_path
         self.base_directory = config
 
+    def __str__(self):
+        return "FileStorageProviderBackend[%s]" % (self.base_directory,)
+
     def store_file(self, path, file_info):
         """See StorageProvider.store_file"""
 
diff --git a/synapse/server.pyi b/synapse/server.pyi
index 0731403047..90347ac23e 100644
--- a/synapse/server.pyi
+++ b/synapse/server.pyi
@@ -2,8 +2,8 @@ import twisted.internet
 
 import synapse.api.auth
 import synapse.config.homeserver
+import synapse.crypto.keyring
 import synapse.federation.sender
-import synapse.federation.transaction_queue
 import synapse.federation.transport.client
 import synapse.handlers
 import synapse.handlers.auth
@@ -17,6 +17,7 @@ import synapse.handlers.room_member
 import synapse.handlers.set_password
 import synapse.http.client
 import synapse.notifier
+import synapse.replication.tcp.client
 import synapse.rest.media.v1.media_repository
 import synapse.server_notices.server_notices_manager
 import synapse.server_notices.server_notices_sender
@@ -27,6 +28,9 @@ class HomeServer(object):
     @property
     def config(self) -> synapse.config.homeserver.HomeServerConfig:
         pass
+    @property
+    def hostname(self) -> str:
+        pass
     def get_auth(self) -> synapse.api.auth.Auth:
         pass
     def get_auth_handler(self) -> synapse.handlers.auth.AuthHandler:
@@ -97,3 +101,9 @@ class HomeServer(object):
         pass
     def get_reactor(self) -> twisted.internet.base.ReactorBase:
         pass
+    def get_keyring(self) -> synapse.crypto.keyring.Keyring:
+        pass
+    def get_tcp_replication(
+        self,
+    ) -> synapse.replication.tcp.client.ReplicationClientHandler:
+        pass
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index cacd0c0c2b..fdd6bef6b4 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -394,7 +394,7 @@ class StateHandler(object):
                 delta_ids=delta_ids,
             )
 
-        room_version = yield self.store.get_room_version(room_id)
+        room_version = yield self.store.get_room_version_id(room_id)
 
         result = yield self._state_resolution_handler.resolve_state_groups(
             room_id,
diff --git a/synapse/state/v1.py b/synapse/state/v1.py
index d6c34ce3b7..24b7c0faef 100644
--- a/synapse/state/v1.py
+++ b/synapse/state/v1.py
@@ -281,7 +281,7 @@ def _resolve_auth_events(events, auth_events):
         try:
             # The signatures have already been checked at this point
             event_auth.check(
-                RoomVersions.V1.identifier,
+                RoomVersions.V1,
                 event,
                 auth_events,
                 do_sig_check=False,
@@ -299,7 +299,7 @@ def _resolve_normal_events(events, auth_events):
         try:
             # The signatures have already been checked at this point
             event_auth.check(
-                RoomVersions.V1.identifier,
+                RoomVersions.V1,
                 event,
                 auth_events,
                 do_sig_check=False,
diff --git a/synapse/state/v2.py b/synapse/state/v2.py
index 6216fdd204..531018c6a5 100644
--- a/synapse/state/v2.py
+++ b/synapse/state/v2.py
@@ -26,6 +26,7 @@ import synapse.state
 from synapse import event_auth
 from synapse.api.constants import EventTypes
 from synapse.api.errors import AuthError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.types import StateMap
 
@@ -402,6 +403,7 @@ def _iterative_auth_checks(
         Deferred[StateMap[str]]: Returns the final updated state
     """
     resolved_state = base_state.copy()
+    room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
     for event_id in event_ids:
         event = event_map[event_id]
@@ -430,7 +432,7 @@ def _iterative_auth_checks(
 
         try:
             event_auth.check(
-                room_version,
+                room_version_obj,
                 event,
                 auth_events,
                 do_sig_check=False,
diff --git a/synapse/storage/data_stores/main/cache.py b/synapse/storage/data_stores/main/cache.py
index afa2b41c98..d4c44dcc75 100644
--- a/synapse/storage/data_stores/main/cache.py
+++ b/synapse/storage/data_stores/main/cache.py
@@ -16,7 +16,7 @@
 
 import itertools
 import logging
-from typing import Any, Iterable, Optional
+from typing import Any, Iterable, Optional, Tuple
 
 from twisted.internet import defer
 
@@ -33,6 +33,26 @@ CURRENT_STATE_CACHE_NAME = "cs_cache_fake"
 
 
 class CacheInvalidationStore(SQLBaseStore):
+    async def invalidate_cache_and_stream(self, cache_name: str, keys: Tuple[Any, ...]):
+        """Invalidates the cache and adds it to the cache stream so slaves
+        will know to invalidate their caches.
+
+        This should only be used to invalidate caches where slaves won't
+        otherwise know from other replication streams that the cache should
+        be invalidated.
+        """
+        cache_func = getattr(self, cache_name, None)
+        if not cache_func:
+            return
+
+        cache_func.invalidate(keys)
+        await self.runInteraction(
+            "invalidate_cache_and_stream",
+            self._send_invalidation_to_replication,
+            cache_func.__name__,
+            keys,
+        )
+
     def _invalidate_cache_and_stream(self, txn, cache_func, keys):
         """Invalidates the cache and adds it to the cache stream so slaves
         will know to invalidate their caches.
diff --git a/synapse/storage/data_stores/main/devices.py b/synapse/storage/data_stores/main/devices.py
index f0a7962dd0..b7617efb80 100644
--- a/synapse/storage/data_stores/main/devices.py
+++ b/synapse/storage/data_stores/main/devices.py
@@ -32,7 +32,7 @@ from synapse.logging.opentracing import (
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage._base import SQLBaseStore, db_to_json, make_in_list_sql_clause
 from synapse.storage.database import Database
-from synapse.types import get_verify_key_from_cross_signing_key
+from synapse.types import Collection, get_verify_key_from_cross_signing_key
 from synapse.util.caches.descriptors import (
     Cache,
     cached,
@@ -320,6 +320,11 @@ class DeviceWorkerStore(SQLBaseStore):
                     device_display_name = device.get("device_display_name", None)
                     if device_display_name:
                         result["device_display_name"] = device_display_name
+                    if "signatures" in device:
+                        for sig_user_id, sigs in device["signatures"].items():
+                            result["keys"].setdefault("signatures", {}).setdefault(
+                                sig_user_id, {}
+                            ).update(sigs)
                 else:
                     result["deleted"] = True
 
@@ -443,8 +448,15 @@ class DeviceWorkerStore(SQLBaseStore):
         """
         user_ids = set(user_id for user_id, _ in query_list)
         user_map = yield self.get_device_list_last_stream_id_for_remotes(list(user_ids))
-        user_ids_in_cache = set(
-            user_id for user_id, stream_id in user_map.items() if stream_id
+
+        # We go and check if any of the users need to have their device lists
+        # resynced. If they do then we remove them from the cached list.
+        users_needing_resync = yield self.get_user_ids_requiring_device_list_resync(
+            user_ids
+        )
+        user_ids_in_cache = (
+            set(user_id for user_id, stream_id in user_map.items() if stream_id)
+            - users_needing_resync
         )
         user_ids_not_in_cache = user_ids - user_ids_in_cache
 
@@ -457,7 +469,7 @@ class DeviceWorkerStore(SQLBaseStore):
                 device = yield self._get_cached_user_device(user_id, device_id)
                 results.setdefault(user_id, {})[device_id] = device
             else:
-                results[user_id] = yield self._get_cached_devices_for_user(user_id)
+                results[user_id] = yield self.get_cached_devices_for_user(user_id)
 
         set_tag("in_cache", results)
         set_tag("not_in_cache", user_ids_not_in_cache)
@@ -475,12 +487,12 @@ class DeviceWorkerStore(SQLBaseStore):
         return db_to_json(content)
 
     @cachedInlineCallbacks()
-    def _get_cached_devices_for_user(self, user_id):
+    def get_cached_devices_for_user(self, user_id):
         devices = yield self.db.simple_select_list(
             table="device_lists_remote_cache",
             keyvalues={"user_id": user_id},
             retcols=("device_id", "content"),
-            desc="_get_cached_devices_for_user",
+            desc="get_cached_devices_for_user",
         )
         return {
             device["device_id"]: db_to_json(device["content"]) for device in devices
@@ -517,6 +529,11 @@ class DeviceWorkerStore(SQLBaseStore):
                 device_display_name = device.get("device_display_name", None)
                 if device_display_name:
                     result["device_display_name"] = device_display_name
+                if "signatures" in device:
+                    for sig_user_id, sigs in device["signatures"].items():
+                        result["keys"].setdefault("signatures", {}).setdefault(
+                            sig_user_id, {}
+                        ).update(sigs)
 
                 results.append(result)
 
@@ -641,6 +658,37 @@ class DeviceWorkerStore(SQLBaseStore):
 
         return results
 
+    @defer.inlineCallbacks
+    def get_user_ids_requiring_device_list_resync(self, user_ids: Collection[str]):
+        """Given a list of remote users return the list of users that we
+        should resync the device lists for.
+
+        Returns:
+            Deferred[Set[str]]
+        """
+
+        rows = yield self.db.simple_select_many_batch(
+            table="device_lists_remote_resync",
+            column="user_id",
+            iterable=user_ids,
+            retcols=("user_id",),
+            desc="get_user_ids_requiring_device_list_resync",
+        )
+
+        return {row["user_id"] for row in rows}
+
+    def mark_remote_user_device_cache_as_stale(self, user_id: str):
+        """Records that the server has reason to believe the cache of the devices
+        for the remote users is out of date.
+        """
+        return self.db.simple_upsert(
+            table="device_lists_remote_resync",
+            keyvalues={"user_id": user_id},
+            values={},
+            insertion_values={"added_ts": self._clock.time_msec()},
+            desc="make_remote_user_device_cache_as_stale",
+        )
+
 
 class DeviceBackgroundUpdateStore(SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
@@ -887,7 +935,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             )
 
         txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
-        txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
+        txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
         txn.call_after(
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
         )
@@ -942,7 +990,7 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             ],
         )
 
-        txn.call_after(self._get_cached_devices_for_user.invalidate, (user_id,))
+        txn.call_after(self.get_cached_devices_for_user.invalidate, (user_id,))
         txn.call_after(self._get_cached_user_device.invalidate_many, (user_id,))
         txn.call_after(
             self.get_device_list_last_stream_id_for_remote.invalidate, (user_id,)
@@ -958,6 +1006,13 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
             lock=False,
         )
 
+        # If we're replacing the remote user's device list cache presumably
+        # we've done a full resync, so we remove the entry that says we need
+        # to resync
+        self.db.simple_delete_txn(
+            txn, table="device_lists_remote_resync", keyvalues={"user_id": user_id},
+        )
+
     @defer.inlineCallbacks
     def add_device_change_to_streams(self, user_id, device_ids, hosts):
         """Persist that a user's devices have been updated, and which hosts
diff --git a/synapse/storage/data_stores/main/events.py b/synapse/storage/data_stores/main/events.py
index 596daf8909..c9d0d68c3a 100644
--- a/synapse/storage/data_stores/main/events.py
+++ b/synapse/storage/data_stores/main/events.py
@@ -32,6 +32,7 @@ from twisted.internet import defer
 import synapse.metrics
 from synapse.api.constants import EventContentFields, EventTypes
 from synapse.api.errors import SynapseError
+from synapse.api.room_versions import RoomVersions
 from synapse.events import EventBase  # noqa: F401
 from synapse.events.snapshot import EventContext  # noqa: F401
 from synapse.events.utils import prune_event_dict
@@ -468,84 +469,93 @@ class EventsStore(
             to_delete = delta_state.to_delete
             to_insert = delta_state.to_insert
 
-            # First we add entries to the current_state_delta_stream. We
-            # do this before updating the current_state_events table so
-            # that we can use it to calculate the `prev_event_id`. (This
-            # allows us to not have to pull out the existing state
-            # unnecessarily).
-            #
-            # The stream_id for the update is chosen to be the minimum of the stream_ids
-            # for the batch of the events that we are persisting; that means we do not
-            # end up in a situation where workers see events before the
-            # current_state_delta updates.
-            #
-            sql = """
-                INSERT INTO current_state_delta_stream
-                (stream_id, room_id, type, state_key, event_id, prev_event_id)
-                SELECT ?, ?, ?, ?, ?, (
-                    SELECT event_id FROM current_state_events
-                    WHERE room_id = ? AND type = ? AND state_key = ?
+            if delta_state.no_longer_in_room:
+                # Server is no longer in the room so we delete the room from
+                # current_state_events, being careful we've already updated the
+                # rooms.room_version column (which gets populated in a
+                # background task).
+                self._upsert_room_version_txn(txn, room_id)
+
+                # Before deleting we populate the current_state_delta_stream
+                # so that async background tasks get told what happened.
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                        (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, room_id, type, state_key, null, event_id
+                        FROM current_state_events
+                        WHERE room_id = ?
+                """
+                txn.execute(sql, (stream_id, room_id))
+
+                self.db.simple_delete_txn(
+                    txn, table="current_state_events", keyvalues={"room_id": room_id},
                 )
-            """
-            txn.executemany(
-                sql,
-                (
-                    (
-                        stream_id,
-                        room_id,
-                        etype,
-                        state_key,
-                        None,
-                        room_id,
-                        etype,
-                        state_key,
+            else:
+                # We're still in the room, so we update the current state as normal.
+
+                # First we add entries to the current_state_delta_stream. We
+                # do this before updating the current_state_events table so
+                # that we can use it to calculate the `prev_event_id`. (This
+                # allows us to not have to pull out the existing state
+                # unnecessarily).
+                #
+                # The stream_id for the update is chosen to be the minimum of the stream_ids
+                # for the batch of the events that we are persisting; that means we do not
+                # end up in a situation where workers see events before the
+                # current_state_delta updates.
+                #
+                sql = """
+                    INSERT INTO current_state_delta_stream
+                    (stream_id, room_id, type, state_key, event_id, prev_event_id)
+                    SELECT ?, ?, ?, ?, ?, (
+                        SELECT event_id FROM current_state_events
+                        WHERE room_id = ? AND type = ? AND state_key = ?
                     )
-                    for etype, state_key in to_delete
-                    # We sanity check that we're deleting rather than updating
-                    if (etype, state_key) not in to_insert
-                ),
-            )
-            txn.executemany(
-                sql,
-                (
+                """
+                txn.executemany(
+                    sql,
                     (
-                        stream_id,
-                        room_id,
-                        etype,
-                        state_key,
-                        ev_id,
-                        room_id,
-                        etype,
-                        state_key,
-                    )
-                    for (etype, state_key), ev_id in iteritems(to_insert)
-                ),
-            )
+                        (
+                            stream_id,
+                            room_id,
+                            etype,
+                            state_key,
+                            to_insert.get((etype, state_key)),
+                            room_id,
+                            etype,
+                            state_key,
+                        )
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
+                )
+                # Now we actually update the current_state_events table
 
-            # Now we actually update the current_state_events table
+                txn.executemany(
+                    "DELETE FROM current_state_events"
+                    " WHERE room_id = ? AND type = ? AND state_key = ?",
+                    (
+                        (room_id, etype, state_key)
+                        for etype, state_key in itertools.chain(to_delete, to_insert)
+                    ),
+                )
 
-            txn.executemany(
-                "DELETE FROM current_state_events"
-                " WHERE room_id = ? AND type = ? AND state_key = ?",
-                (
-                    (room_id, etype, state_key)
-                    for etype, state_key in itertools.chain(to_delete, to_insert)
-                ),
-            )
+                # We include the membership in the current state table, hence we do
+                # a lookup when we insert. This assumes that all events have already
+                # been inserted into room_memberships.
+                txn.executemany(
+                    """INSERT INTO current_state_events
+                        (room_id, type, state_key, event_id, membership)
+                    VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
+                    """,
+                    [
+                        (room_id, key[0], key[1], ev_id, ev_id)
+                        for key, ev_id in iteritems(to_insert)
+                    ],
+                )
 
-            # We include the membership in the current state table, hence we do
-            # a lookup when we insert. This assumes that all events have already
-            # been inserted into room_memberships.
-            txn.executemany(
-                """INSERT INTO current_state_events
-                    (room_id, type, state_key, event_id, membership)
-                VALUES (?, ?, ?, ?, (SELECT membership FROM room_memberships WHERE event_id = ?))
-                """,
-                [
-                    (room_id, key[0], key[1], ev_id, ev_id)
-                    for key, ev_id in iteritems(to_insert)
-                ],
-            )
+            # We now update `local_current_membership`. We do this regardless
+            # of whether we're still in the room or not to handle the case where
+            # e.g. we just got banned (where we need to record that fact here).
 
             # Note: Do we really want to delete rows here (that we do not
             # subsequently reinsert below)? While technically correct it means
@@ -601,6 +611,35 @@ class EventsStore(
 
             self._invalidate_state_caches_and_stream(txn, room_id, members_changed)
 
+    def _upsert_room_version_txn(self, txn: LoggingTransaction, room_id: str):
+        """Update the room version in the database based off current state
+        events.
+
+        This is used when we're about to delete current state and we want to
+        ensure that the `rooms.room_version` column is up to date.
+        """
+
+        sql = """
+            SELECT json FROM event_json
+            INNER JOIN current_state_events USING (room_id, event_id)
+            WHERE room_id = ? AND type = ? AND state_key = ?
+        """
+        txn.execute(sql, (room_id, EventTypes.Create, ""))
+        row = txn.fetchone()
+        if row:
+            event_json = json.loads(row[0])
+            content = event_json.get("content", {})
+            creator = content.get("creator")
+            room_version_id = content.get("room_version", RoomVersions.V1.identifier)
+
+            self.db.simple_upsert_txn(
+                txn,
+                table="rooms",
+                keyvalues={"room_id": room_id},
+                values={"room_version": room_version_id},
+                insertion_values={"is_public": False, "creator": creator},
+            )
+
     def _update_forward_extremities_txn(
         self, txn, new_forward_extremities, max_stream_order
     ):
@@ -951,7 +990,7 @@ class EventsStore(
             elif event.type == EventTypes.Message:
                 # Insert into the event_search table.
                 self._store_room_message_txn(txn, event)
-            elif event.type == EventTypes.Redaction:
+            elif event.type == EventTypes.Redaction and event.redacts is not None:
                 # Insert into the redactions table.
                 self._store_redaction(txn, event)
             elif event.type == EventTypes.Retention:
diff --git a/synapse/storage/data_stores/main/events_worker.py b/synapse/storage/data_stores/main/events_worker.py
index 3b93e0597a..7251e819f5 100644
--- a/synapse/storage/data_stores/main/events_worker.py
+++ b/synapse/storage/data_stores/main/events_worker.py
@@ -287,7 +287,7 @@ class EventsWorkerStore(SQLBaseStore):
             # we have to recheck auth now.
 
             if not allow_rejected and entry.event.type == EventTypes.Redaction:
-                if not hasattr(entry.event, "redacts"):
+                if entry.event.redacts is None:
                     # A redacted redaction doesn't have a `redacts` key, in
                     # which case lets just withhold the event.
                     #
diff --git a/synapse/storage/data_stores/main/registration.py b/synapse/storage/data_stores/main/registration.py
index cb4b2b39a0..49306642ed 100644
--- a/synapse/storage/data_stores/main/registration.py
+++ b/synapse/storage/data_stores/main/registration.py
@@ -291,7 +291,7 @@ class RegistrationWorkerStore(SQLBaseStore):
             desc="is_server_admin",
         )
 
-        return res if res else False
+        return bool(res) if res else False
 
     def set_server_admin(self, user, admin):
         """Sets whether a user is an admin of this homeserver.
diff --git a/synapse/storage/data_stores/main/room.py b/synapse/storage/data_stores/main/room.py
index d968803ad2..9a17e336ba 100644
--- a/synapse/storage/data_stores/main/room.py
+++ b/synapse/storage/data_stores/main/room.py
@@ -29,9 +29,10 @@ from twisted.internet import defer
 
 from synapse.api.constants import EventTypes
 from synapse.api.errors import StoreError
+from synapse.api.room_versions import RoomVersion, RoomVersions
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.search import SearchStore
-from synapse.storage.database import Database
+from synapse.storage.database import Database, LoggingTransaction
 from synapse.types import ThirdPartyInstanceID
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
 
@@ -734,6 +735,7 @@ class RoomWorkerStore(SQLBaseStore):
 
 class RoomBackgroundUpdateStore(SQLBaseStore):
     REMOVE_TOMESTONED_ROOMS_BG_UPDATE = "remove_tombstoned_rooms_from_directory"
+    ADD_ROOMS_ROOM_VERSION_COLUMN = "add_rooms_room_version_column"
 
     def __init__(self, database: Database, db_conn, hs):
         super(RoomBackgroundUpdateStore, self).__init__(database, db_conn, hs)
@@ -749,6 +751,11 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
             self._remove_tombstoned_rooms_from_directory,
         )
 
+        self.db.updates.register_background_update_handler(
+            self.ADD_ROOMS_ROOM_VERSION_COLUMN,
+            self._background_add_rooms_room_version_column,
+        )
+
     @defer.inlineCallbacks
     def _background_insert_retention(self, progress, batch_size):
         """Retrieves a list of all rooms within a range and inserts an entry for each of
@@ -817,6 +824,73 @@ class RoomBackgroundUpdateStore(SQLBaseStore):
 
         defer.returnValue(batch_size)
 
+    async def _background_add_rooms_room_version_column(
+        self, progress: dict, batch_size: int
+    ):
+        """Background update to go and add room version inforamtion to `rooms`
+        table from `current_state_events` table.
+        """
+
+        last_room_id = progress.get("room_id", "")
+
+        def _background_add_rooms_room_version_column_txn(txn: LoggingTransaction):
+            sql = """
+                SELECT room_id, json FROM current_state_events
+                INNER JOIN event_json USING (room_id, event_id)
+                WHERE room_id > ? AND type = 'm.room.create' AND state_key = ''
+                ORDER BY room_id
+                LIMIT ?
+            """
+
+            txn.execute(sql, (last_room_id, batch_size))
+
+            updates = []
+            for room_id, event_json in txn:
+                event_dict = json.loads(event_json)
+                room_version_id = event_dict.get("content", {}).get(
+                    "room_version", RoomVersions.V1.identifier
+                )
+
+                creator = event_dict.get("content").get("creator")
+
+                updates.append((room_id, creator, room_version_id))
+
+            if not updates:
+                return True
+
+            new_last_room_id = ""
+            for room_id, creator, room_version_id in updates:
+                # We upsert here just in case we don't already have a row,
+                # mainly for paranoia as much badness would happen if we don't
+                # insert the row and then try and get the room version for the
+                # room.
+                self.db.simple_upsert_txn(
+                    txn,
+                    table="rooms",
+                    keyvalues={"room_id": room_id},
+                    values={"room_version": room_version_id},
+                    insertion_values={"is_public": False, "creator": creator},
+                )
+                new_last_room_id = room_id
+
+            self.db.updates._background_update_progress_txn(
+                txn, self.ADD_ROOMS_ROOM_VERSION_COLUMN, {"room_id": new_last_room_id}
+            )
+
+            return False
+
+        end = await self.db.runInteraction(
+            "_background_add_rooms_room_version_column",
+            _background_add_rooms_room_version_column_txn,
+        )
+
+        if end:
+            await self.db.updates._end_background_update(
+                self.ADD_ROOMS_ROOM_VERSION_COLUMN
+            )
+
+        return batch_size
+
     async def _remove_tombstoned_rooms_from_directory(
         self, progress, batch_size
     ) -> int:
@@ -881,14 +955,21 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
         self.config = hs.config
 
     @defer.inlineCallbacks
-    def store_room(self, room_id, room_creator_user_id, is_public):
+    def store_room(
+        self,
+        room_id: str,
+        room_creator_user_id: str,
+        is_public: bool,
+        room_version: RoomVersion,
+    ):
         """Stores a room.
 
         Args:
-            room_id (str): The desired room ID, can be None.
-            room_creator_user_id (str): The user ID of the room creator.
-            is_public (bool): True to indicate that this room should appear in
-            public room lists.
+            room_id: The desired room ID, can be None.
+            room_creator_user_id: The user ID of the room creator.
+            is_public: True to indicate that this room should appear in
+                public room lists.
+            room_version: The version of the room
         Raises:
             StoreError if the room could not be stored.
         """
@@ -902,6 +983,7 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore, SearchStore):
                         "room_id": room_id,
                         "creator": room_creator_user_id,
                         "is_public": is_public,
+                        "room_version": room_version.identifier,
                     },
                 )
                 if is_public:
diff --git a/synapse/storage/data_stores/main/roommember.py b/synapse/storage/data_stores/main/roommember.py
index 9acef7c950..042289f0e0 100644
--- a/synapse/storage/data_stores/main/roommember.py
+++ b/synapse/storage/data_stores/main/roommember.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 
 import logging
-from typing import Iterable, List
+from typing import Iterable, List, Set
 
 from six import iteritems, itervalues
 
@@ -40,7 +40,7 @@ from synapse.storage.roommember import (
     ProfileInfo,
     RoomsForUser,
 )
-from synapse.types import get_domain_from_id
+from synapse.types import Collection, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches import intern_string
 from synapse.util.caches.descriptors import cached, cachedInlineCallbacks, cachedList
@@ -439,6 +439,39 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
         return results
 
+    async def get_users_server_still_shares_room_with(
+        self, user_ids: Collection[str]
+    ) -> Set[str]:
+        """Given a list of users return the set that the server still share a
+        room with.
+        """
+
+        if not user_ids:
+            return set()
+
+        def _get_users_server_still_shares_room_with_txn(txn):
+            sql = """
+                SELECT state_key FROM current_state_events
+                WHERE
+                    type = 'm.room.member'
+                    AND membership = 'join'
+                    AND %s
+                GROUP BY state_key
+            """
+
+            clause, args = make_in_list_sql_clause(
+                self.database_engine, "state_key", user_ids
+            )
+
+            txn.execute(sql % (clause,), args)
+
+            return set(row[0] for row in txn)
+
+        return await self.db.runInteraction(
+            "get_users_server_still_shares_room_with",
+            _get_users_server_still_shares_room_with_txn,
+        )
+
     @defer.inlineCallbacks
     def get_rooms_for_user(self, user_id, on_invalidate=None):
         """Returns a set of room_ids the user is currently joined to.
diff --git a/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
new file mode 100644
index 0000000000..a133d87a19
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/delete_old_current_state_events.sql
@@ -0,0 +1,19 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Add background update to go and delete current state events for rooms the
+-- server is no longer in.
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('delete_old_current_state_events', '{}');
diff --git a/synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql b/synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql
new file mode 100644
index 0000000000..c3b6de2099
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/device_list_remote_cache_stale.sql
@@ -0,0 +1,25 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Records whether the server thinks that the remote users cached device lists
+-- may be out of date (e.g. if we have received a to device message from a
+-- device we don't know about).
+CREATE TABLE IF NOT EXISTS device_lists_remote_resync (
+    user_id TEXT NOT NULL,
+    added_ts BIGINT NOT NULL
+);
+
+CREATE UNIQUE INDEX device_lists_remote_resync_idx ON device_lists_remote_resync (user_id);
+CREATE INDEX device_lists_remote_resync_ts_idx ON device_lists_remote_resync (added_ts);
diff --git a/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql
new file mode 100644
index 0000000000..352a66f5b0
--- /dev/null
+++ b/synapse/storage/data_stores/main/schema/delta/57/rooms_version_column.sql
@@ -0,0 +1,24 @@
+/* Copyright 2020 The Matrix.org Foundation C.I.C
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+-- We want to start storing the room version independently of
+-- `current_state_events` so that we can delete stale entries from it without
+-- losing the information.
+ALTER TABLE rooms ADD COLUMN room_version TEXT;
+
+
+INSERT into background_updates (update_name, progress_json)
+    VALUES ('add_rooms_room_version_column', '{}');
diff --git a/synapse/storage/data_stores/main/state.py b/synapse/storage/data_stores/main/state.py
index 33bebd1c48..3d34103e67 100644
--- a/synapse/storage/data_stores/main/state.py
+++ b/synapse/storage/data_stores/main/state.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -21,12 +22,14 @@ from six import iteritems
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
-from synapse.api.errors import NotFoundError
+from synapse.api.constants import EventTypes, Membership
+from synapse.api.errors import NotFoundError, UnsupportedRoomVersionError
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.storage._base import SQLBaseStore
 from synapse.storage.data_stores.main.events_worker import EventsWorkerStore
+from synapse.storage.data_stores.main.roommember import RoomMemberWorkerStore
 from synapse.storage.database import Database
 from synapse.storage.state import StateFilter
 from synapse.util.caches import intern_string
@@ -60,24 +63,55 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
     def __init__(self, database: Database, db_conn, hs):
         super(StateGroupWorkerStore, self).__init__(database, db_conn, hs)
 
-    @defer.inlineCallbacks
-    def get_room_version(self, room_id):
+    async def get_room_version(self, room_id: str) -> RoomVersion:
         """Get the room_version of a given room
 
-        Args:
-            room_id (str)
+        Raises:
+            NotFoundError: if the room is unknown
 
-        Returns:
-            Deferred[str]
+            UnsupportedRoomVersionError: if the room uses an unknown room version.
+                Typically this happens if support for the room's version has been
+                removed from Synapse.
+        """
+        room_version_id = await self.get_room_version_id(room_id)
+        v = KNOWN_ROOM_VERSIONS.get(room_version_id)
+
+        if not v:
+            raise UnsupportedRoomVersionError(
+                "Room %s uses a room version %s which is no longer supported"
+                % (room_id, room_version_id)
+            )
+
+        return v
+
+    @cached(max_entries=10000)
+    async def get_room_version_id(self, room_id: str) -> str:
+        """Get the room_version of a given room
 
         Raises:
-            NotFoundError if the room is unknown
+            NotFoundError: if the room is unknown
         """
-        # for now we do this by looking at the create event. We may want to cache this
-        # more intelligently in future.
+
+        # First we try looking up room version from the database, but for old
+        # rooms we might not have added the room version to it yet so we fall
+        # back to previous behaviour and look in current state events.
+
+        # We really should have an entry in the rooms table for every room we
+        # care about, but let's be a bit paranoid (at least while the background
+        # update is happening) to avoid breaking existing rooms.
+        version = await self.db.simple_select_one_onecol(
+            table="rooms",
+            keyvalues={"room_id": room_id},
+            retcol="room_version",
+            desc="get_room_version",
+            allow_none=True,
+        )
+
+        if version is not None:
+            return version
 
         # Retrieve the room's create event
-        create_event = yield self.get_create_event_for_room(room_id)
+        create_event = await self.get_create_event_for_room(room_id)
         return create_event.content.get("room_version", "1")
 
     @defer.inlineCallbacks
@@ -290,14 +324,17 @@ class StateGroupWorkerStore(EventsWorkerStore, SQLBaseStore):
         return set(row["state_group"] for row in rows)
 
 
-class MainStateBackgroundUpdateStore(SQLBaseStore):
+class MainStateBackgroundUpdateStore(RoomMemberWorkerStore):
 
     CURRENT_STATE_INDEX_UPDATE_NAME = "current_state_members_idx"
     EVENT_STATE_GROUP_INDEX_UPDATE_NAME = "event_to_state_groups_sg_index"
+    DELETE_CURRENT_STATE_UPDATE_NAME = "delete_old_current_state_events"
 
     def __init__(self, database: Database, db_conn, hs):
         super(MainStateBackgroundUpdateStore, self).__init__(database, db_conn, hs)
 
+        self.server_name = hs.hostname
+
         self.db.updates.register_background_index_update(
             self.CURRENT_STATE_INDEX_UPDATE_NAME,
             index_name="current_state_events_member_index",
@@ -311,6 +348,108 @@ class MainStateBackgroundUpdateStore(SQLBaseStore):
             table="event_to_state_groups",
             columns=["state_group"],
         )
+        self.db.updates.register_background_update_handler(
+            self.DELETE_CURRENT_STATE_UPDATE_NAME, self._background_remove_left_rooms,
+        )
+
+    async def _background_remove_left_rooms(self, progress, batch_size):
+        """Background update to delete rows from `current_state_events` and
+        `event_forward_extremities` tables of rooms that the server is no
+        longer joined to.
+        """
+
+        last_room_id = progress.get("last_room_id", "")
+
+        def _background_remove_left_rooms_txn(txn):
+            sql = """
+                SELECT DISTINCT room_id FROM current_state_events
+                WHERE room_id > ? ORDER BY room_id LIMIT ?
+            """
+
+            txn.execute(sql, (last_room_id, batch_size))
+            room_ids = list(row[0] for row in txn)
+            if not room_ids:
+                return True, set()
+
+            sql = """
+                SELECT room_id
+                FROM current_state_events
+                WHERE
+                    room_id > ? AND room_id <= ?
+                    AND type = 'm.room.member'
+                    AND membership = 'join'
+                    AND state_key LIKE ?
+                GROUP BY room_id
+            """
+
+            txn.execute(sql, (last_room_id, room_ids[-1], "%:" + self.server_name))
+
+            joined_room_ids = set(row[0] for row in txn)
+
+            left_rooms = set(room_ids) - joined_room_ids
+
+            logger.info("Deleting current state left rooms: %r", left_rooms)
+
+            # First we get all users that we still think were joined to the
+            # room. This is so that we can mark those device lists as
+            # potentially stale, since there may have been a period where the
+            # server didn't share a room with the remote user and therefore may
+            # have missed any device updates.
+            rows = self.db.simple_select_many_txn(
+                txn,
+                table="current_state_events",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={"type": EventTypes.Member, "membership": Membership.JOIN},
+                retcols=("state_key",),
+            )
+
+            potentially_left_users = set(row["state_key"] for row in rows)
+
+            # Now lets actually delete the rooms from the DB.
+            self.db.simple_delete_many_txn(
+                txn,
+                table="current_state_events",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={},
+            )
+
+            self.db.simple_delete_many_txn(
+                txn,
+                table="event_forward_extremities",
+                column="room_id",
+                iterable=left_rooms,
+                keyvalues={},
+            )
+
+            self.db.updates._background_update_progress_txn(
+                txn,
+                self.DELETE_CURRENT_STATE_UPDATE_NAME,
+                {"last_room_id": room_ids[-1]},
+            )
+
+            return False, potentially_left_users
+
+        finished, potentially_left_users = await self.db.runInteraction(
+            "_background_remove_left_rooms", _background_remove_left_rooms_txn
+        )
+
+        if finished:
+            await self.db.updates._end_background_update(
+                self.DELETE_CURRENT_STATE_UPDATE_NAME
+            )
+
+        # Now go and check if we still share a room with the remote users in
+        # the deleted rooms. If not mark their device lists as stale.
+        joined_users = await self.get_users_server_still_shares_room_with(
+            potentially_left_users
+        )
+
+        for user_id in potentially_left_users - joined_users:
+            await self.mark_remote_user_device_list_as_unsubscribed(user_id)
+
+        return batch_size
 
 
 class StateStore(StateGroupWorkerStore, MainStateBackgroundUpdateStore):
diff --git a/synapse/storage/data_stores/main/stats.py b/synapse/storage/data_stores/main/stats.py
index 7bc186e9a1..7af1495e47 100644
--- a/synapse/storage/data_stores/main/stats.py
+++ b/synapse/storage/data_stores/main/stats.py
@@ -744,7 +744,7 @@ class StatsStore(StateDeltasStore):
                     EventTypes.Create,
                     EventTypes.JoinRules,
                     EventTypes.RoomHistoryVisibility,
-                    EventTypes.Encryption,
+                    EventTypes.RoomEncryption,
                     EventTypes.Name,
                     EventTypes.Topic,
                     EventTypes.RoomAvatar,
@@ -816,7 +816,7 @@ class StatsStore(StateDeltasStore):
                 room_state["history_visibility"] = event.content.get(
                     "history_visibility"
                 )
-            elif event.type == EventTypes.Encryption:
+            elif event.type == EventTypes.RoomEncryption:
                 room_state["encryption"] = event.content.get("algorithm")
             elif event.type == EventTypes.Name:
                 room_state["name"] = event.content.get("name")
diff --git a/synapse/storage/engines/postgres.py b/synapse/storage/engines/postgres.py
index c84cb452b0..a077345960 100644
--- a/synapse/storage/engines/postgres.py
+++ b/synapse/storage/engines/postgres.py
@@ -13,8 +13,12 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import logging
+
 from ._base import IncorrectDatabaseSetup
 
+logger = logging.getLogger(__name__)
+
 
 class PostgresEngine(object):
     single_threaded = False
@@ -52,6 +56,44 @@ class PostgresEngine(object):
                     "See docs/postgres.rst for more information." % (rows[0][0],)
                 )
 
+            txn.execute(
+                "SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()"
+            )
+            collation, ctype = txn.fetchone()
+            if collation != "C":
+                logger.warning(
+                    "Database has incorrect collation of %r. Should be 'C'", collation
+                )
+
+            if ctype != "C":
+                logger.warning(
+                    "Database has incorrect ctype of %r. Should be 'C'", ctype
+                )
+
+    def check_new_database(self, txn):
+        """Gets called when setting up a brand new database. This allows us to
+        apply stricter checks on new databases versus existing database.
+        """
+
+        txn.execute(
+            "SELECT datcollate, datctype FROM pg_database WHERE datname = current_database()"
+        )
+        collation, ctype = txn.fetchone()
+
+        errors = []
+
+        if collation != "C":
+            errors.append("    - 'COLLATE' is set to %r. Should be 'C'" % (collation,))
+
+        if ctype != "C":
+            errors.append("    - 'CTYPE' is set to %r. Should be 'C'" % (collation,))
+
+        if errors:
+            raise IncorrectDatabaseSetup(
+                "Database is incorrectly configured:\n\n%s\n\n"
+                "See docs/postgres.md for more information." % ("\n".join(errors))
+            )
+
     def convert_param_style(self, sql):
         return sql.replace("?", "%s")
 
diff --git a/synapse/storage/engines/sqlite.py b/synapse/storage/engines/sqlite.py
index cbf52f5191..641e490697 100644
--- a/synapse/storage/engines/sqlite.py
+++ b/synapse/storage/engines/sqlite.py
@@ -59,6 +59,11 @@ class Sqlite3Engine(object):
             if version < (3, 11, 0):
                 raise RuntimeError("Synapse requires sqlite 3.11 or above.")
 
+    def check_new_database(self, txn):
+        """Gets called when setting up a brand new database. This allows us to
+        apply stricter checks on new databases versus existing database.
+        """
+
     def convert_param_style(self, sql):
         return sql
 
diff --git a/synapse/storage/persist_events.py b/synapse/storage/persist_events.py
index 368c457321..af3fd67ab9 100644
--- a/synapse/storage/persist_events.py
+++ b/synapse/storage/persist_events.py
@@ -15,9 +15,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+import itertools
 import logging
 from collections import deque, namedtuple
-from typing import Iterable, List, Optional, Tuple
+from typing import Iterable, List, Optional, Set, Tuple
 
 from six import iteritems
 from six.moves import range
@@ -27,7 +28,7 @@ from prometheus_client import Counter, Histogram
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes
+from synapse.api.constants import EventTypes, Membership
 from synapse.events import FrozenEvent
 from synapse.events.snapshot import EventContext
 from synapse.logging.context import PreserveLoggingContext, make_deferred_yieldable
@@ -72,17 +73,20 @@ stale_forward_extremities_counter = Histogram(
 )
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True)
 class DeltaState:
     """Deltas to use to update the `current_state_events` table.
 
     Attributes:
         to_delete: List of type/state_keys to delete from current state
         to_insert: Map of state to upsert into current state
+        no_longer_in_room: The server is not longer in the room, so the room
+            should e.g. be removed from `current_state_events` table.
     """
 
     to_delete = attr.ib(type=List[Tuple[str, str]])
     to_insert = attr.ib(type=StateMap[str])
+    no_longer_in_room = attr.ib(type=bool, default=False)
 
 
 class _EventPeristenceQueue(object):
@@ -314,6 +318,11 @@ class EventsPersistenceStorage(object):
             # room
             state_delta_for_room = {}
 
+            # Set of remote users which were in rooms the server has left. We
+            # should check if we still share any rooms and if not we mark their
+            # device lists as stale.
+            potentially_left_users = set()  # type: Set[str]
+
             if not backfilled:
                 with Measure(self._clock, "_calculate_state_and_extrem"):
                     # Work out the new "current state" for each room.
@@ -396,11 +405,12 @@ class EventsPersistenceStorage(object):
                         # If either are not None then there has been a change,
                         # and we need to work out the delta (or use that
                         # given)
+                        delta = None
                         if delta_ids is not None:
                             # If there is a delta we know that we've
                             # only added or replaced state, never
                             # removed keys entirely.
-                            state_delta_for_room[room_id] = DeltaState([], delta_ids)
+                            delta = DeltaState([], delta_ids)
                         elif current_state is not None:
                             with Measure(
                                 self._clock, "persist_events.calculate_state_delta"
@@ -408,6 +418,26 @@ class EventsPersistenceStorage(object):
                                 delta = await self._calculate_state_delta(
                                     room_id, current_state
                                 )
+
+                        if delta:
+                            # If we have a change of state then lets check
+                            # whether we're actually still a member of the room,
+                            # or if our last user left. If we're no longer in
+                            # the room then we delete the current state and
+                            # extremities.
+                            is_still_joined = await self._is_server_still_joined(
+                                room_id,
+                                ev_ctx_rm,
+                                delta,
+                                current_state,
+                                potentially_left_users,
+                            )
+                            if not is_still_joined:
+                                logger.info("Server no longer in room %s", room_id)
+                                latest_event_ids = []
+                                current_state = {}
+                                delta.no_longer_in_room = True
+
                             state_delta_for_room[room_id] = delta
 
                         # If we have the current_state then lets prefill
@@ -423,6 +453,8 @@ class EventsPersistenceStorage(object):
                 backfilled=backfilled,
             )
 
+            await self._handle_potentially_left_users(potentially_left_users)
+
     async def _calculate_new_extremities(
         self,
         room_id: str,
@@ -629,7 +661,7 @@ class EventsPersistenceStorage(object):
                 break
 
         if not room_version:
-            room_version = await self.main_store.get_room_version(room_id)
+            room_version = await self.main_store.get_room_version_id(room_id)
 
         logger.debug("calling resolve_state_groups from preserve_events")
         res = await self._state_resolution_handler.resolve_state_groups(
@@ -660,3 +692,97 @@ class EventsPersistenceStorage(object):
         }
 
         return DeltaState(to_delete=to_delete, to_insert=to_insert)
+
+    async def _is_server_still_joined(
+        self,
+        room_id: str,
+        ev_ctx_rm: List[Tuple[FrozenEvent, EventContext]],
+        delta: DeltaState,
+        current_state: Optional[StateMap[str]],
+        potentially_left_users: Set[str],
+    ) -> bool:
+        """Check if the server will still be joined after the given events have
+        been persised.
+
+        Args:
+            room_id
+            ev_ctx_rm
+            delta: The delta of current state between what is in the database
+                and what the new current state will be.
+            current_state: The new current state if it already been calculated,
+                otherwise None.
+            potentially_left_users: If the server has left the room, then joined
+                remote users will be added to this set to indicate that the
+                server may no longer be sharing a room with them.
+        """
+
+        if not any(
+            self.is_mine_id(state_key)
+            for typ, state_key in itertools.chain(delta.to_delete, delta.to_insert)
+            if typ == EventTypes.Member
+        ):
+            # There have been no changes to membership of our users, so nothing
+            # has changed and we assume we're still in the room.
+            return True
+
+        # Check if any of the given events are a local join that appear in the
+        # current state
+        for (typ, state_key), event_id in delta.to_insert.items():
+            if typ != EventTypes.Member or not self.is_mine_id(state_key):
+                continue
+
+            for event, _ in ev_ctx_rm:
+                if event_id == event.event_id:
+                    if event.membership == Membership.JOIN:
+                        return True
+
+        # There's been a change of membership but we don't have a local join
+        # event in the new events, so we need to check the full state.
+        if current_state is None:
+            current_state = await self.main_store.get_current_state_ids(room_id)
+            current_state = dict(current_state)
+            for key in delta.to_delete:
+                current_state.pop(key, None)
+
+            current_state.update(delta.to_insert)
+
+        event_ids = [
+            event_id
+            for (typ, state_key,), event_id in current_state.items()
+            if typ == EventTypes.Member and self.is_mine_id(state_key)
+        ]
+
+        rows = await self.main_store.get_membership_from_event_ids(event_ids)
+        is_still_joined = any(row["membership"] == Membership.JOIN for row in rows)
+        if is_still_joined:
+            return True
+
+        # The server will leave the room, so we go and find out which remote
+        # users will still be joined when we leave.
+        remote_event_ids = [
+            event_id
+            for (typ, state_key,), event_id in current_state.items()
+            if typ == EventTypes.Member and not self.is_mine_id(state_key)
+        ]
+        rows = await self.main_store.get_membership_from_event_ids(remote_event_ids)
+        potentially_left_users.update(
+            row["user_id"] for row in rows if row["membership"] == Membership.JOIN
+        )
+
+        return False
+
+    async def _handle_potentially_left_users(self, user_ids: Set[str]):
+        """Given a set of remote users check if the server still shares a room with
+        them. If not then mark those users' device cache as stale.
+        """
+
+        if not user_ids:
+            return
+
+        joined_users = await self.main_store.get_users_server_still_shares_room_with(
+            user_ids
+        )
+        left_users = user_ids - joined_users
+
+        for user_id in left_users:
+            await self.main_store.mark_remote_user_device_list_as_unsubscribed(user_id)
diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py
index e86984cd50..c285ef52a0 100644
--- a/synapse/storage/prepare_database.py
+++ b/synapse/storage/prepare_database.py
@@ -136,6 +136,11 @@ def _setup_new_database(cur, database_engine, data_stores):
         data_stores (list[str]): The names of the data stores to instantiate
             on the given database.
     """
+
+    # We're about to set up a brand new database so we check that its
+    # configured to our liking.
+    database_engine.check_new_database(cur)
+
     current_dir = os.path.join(dir_path, "schema", "full_schemas")
     directory_entries = os.listdir(current_dir)
 
diff --git a/synapse/types.py b/synapse/types.py
index 65e4d8c181..f3cd465735 100644
--- a/synapse/types.py
+++ b/synapse/types.py
@@ -17,7 +17,7 @@ import re
 import string
 import sys
 from collections import namedtuple
-from typing import Dict, Tuple, TypeVar
+from typing import Any, Dict, Tuple, TypeVar
 
 import attr
 from signedjson.key import decode_verify_key_bytes
@@ -43,6 +43,11 @@ T = TypeVar("T")
 StateMap = Dict[Tuple[str, str], T]
 
 
+# the type of a JSON-serialisable dict. This could be made stronger, but it will
+# do for now.
+JsonDict = Dict[str, Any]
+
+
 class Requester(
     namedtuple(
         "Requester", ["user", "access_token_id", "is_guest", "device_id", "app_service"]
diff --git a/synapse/util/stringutils.py b/synapse/util/stringutils.py
index 982c6d81ca..2c0dcb5208 100644
--- a/synapse/util/stringutils.py
+++ b/synapse/util/stringutils.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2014-2016 OpenMarket Ltd
+# Copyright 2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,14 +15,22 @@
 # limitations under the License.
 
 import random
+import re
 import string
 
 import six
 from six import PY2, PY3
 from six.moves import range
 
+from synapse.api.errors import Codes, SynapseError
+
 _string_with_symbols = string.digits + string.ascii_letters + ".,;:^&*-_+=#~@"
 
+# https://matrix.org/docs/spec/client_server/r0.6.0#post-matrix-client-r0-register-email-requesttoken
+# Note: The : character is allowed here for older clients, but will be removed in a
+# future release. Context: https://github.com/matrix-org/synapse/issues/6766
+client_secret_regex = re.compile(r"^[0-9a-zA-Z\.\=\_\-\:]+$")
+
 # random_string and random_string_with_symbols are used for a range of things,
 # some cryptographically important, some less so. We use SystemRandom to make sure
 # we get cryptographically-secure randoms.
@@ -109,3 +118,11 @@ def exception_to_unicode(e):
         return msg.decode("utf-8", errors="replace")
     else:
         return msg
+
+
+def assert_valid_client_secret(client_secret):
+    """Validate that a given string matches the client_secret regex defined by the spec"""
+    if client_secret_regex.match(client_secret) is None:
+        raise SynapseError(
+            400, "Invalid client_secret parameter", errcode=Codes.INVALID_PARAM
+        )
diff --git a/synapse/visibility.py b/synapse/visibility.py
index 100dc47a8a..d0abd8f04f 100644
--- a/synapse/visibility.py
+++ b/synapse/visibility.py
@@ -122,6 +122,13 @@ def filter_events_for_client(
         if not event.is_state() and event.sender in ignore_list:
             return None
 
+        # Until MSC2261 has landed we can't redact malicious alias events, so for
+        # now we temporarily filter out m.room.aliases entirely to mitigate
+        # abuse, while we spec a better solution to advertising aliases
+        # on rooms.
+        if event.type == EventTypes.Aliases:
+            return None
+
         # Don't try to apply the room's retention policy if the event is a state event, as
         # MSC1763 states that retention is only considered for non-state events.
         if apply_retention_policies and not event.is_state():