summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/appservice.py10
-rw-r--r--synapse/handlers/auth.py2
-rw-r--r--synapse/handlers/cas_handler.py3
-rw-r--r--synapse/handlers/device.py15
-rw-r--r--synapse/handlers/devicemessage.py25
-rw-r--r--synapse/handlers/directory.py68
-rw-r--r--synapse/handlers/e2e_keys.py14
-rw-r--r--synapse/handlers/e2e_room_keys.py7
-rw-r--r--synapse/handlers/federation.py44
-rw-r--r--synapse/handlers/groups_local.py4
-rw-r--r--synapse/handlers/message.py14
-rw-r--r--synapse/handlers/pagination.py28
-rw-r--r--synapse/handlers/presence.py43
-rw-r--r--synapse/handlers/profile.py8
-rw-r--r--synapse/handlers/register.py230
-rw-r--r--synapse/handlers/room.py82
-rw-r--r--synapse/handlers/room_list.py4
-rw-r--r--synapse/handlers/room_member.py7
-rw-r--r--synapse/handlers/sync.py34
-rw-r--r--synapse/handlers/typing.py69
-rw-r--r--synapse/handlers/user_directory.py6
21 files changed, 424 insertions, 293 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index fe62f78e67..904c96eeec 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import itervalues
-
 from prometheus_client import Counter
 
 from twisted.internet import defer
@@ -116,6 +114,12 @@ class ApplicationServicesHandler(object):
                         for service in services:
                             self.scheduler.submit_event_for_as(service, event)
 
+                        now = self.clock.time_msec()
+                        ts = yield self.store.get_received_ts(event.event_id)
+                        synapse.metrics.event_processing_lag_by_event.labels(
+                            "appservice_sender"
+                        ).observe((now - ts) / 1000)
+
                     @defer.inlineCallbacks
                     def handle_room_events(events):
                         for event in events:
@@ -125,7 +129,7 @@ class ApplicationServicesHandler(object):
                         defer.gatherResults(
                             [
                                 run_in_background(handle_room_events, evs)
-                                for evs in itervalues(events_by_room)
+                                for evs in events_by_room.values()
                             ],
                             consumeErrors=True,
                         )
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index bb3b43d5ae..c3f86e7414 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -297,7 +297,7 @@ class AuthHandler(BaseHandler):
 
         # Convert the URI and method to strings.
         uri = request.uri.decode("utf-8")
-        method = request.uri.decode("utf-8")
+        method = request.method.decode("utf-8")
 
         # If there's no session ID, create a new session.
         if not sid:
diff --git a/synapse/handlers/cas_handler.py b/synapse/handlers/cas_handler.py
index 64aaa1335c..76f213723a 100644
--- a/synapse/handlers/cas_handler.py
+++ b/synapse/handlers/cas_handler.py
@@ -14,11 +14,10 @@
 # limitations under the License.
 
 import logging
+import urllib
 import xml.etree.ElementTree as ET
 from typing import Dict, Optional, Tuple
 
-from six.moves import urllib
-
 from twisted.web.client import PartialDownloadError
 
 from synapse.api.errors import Codes, LoginError
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 230d170258..31346b56c3 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Any, Dict, Optional
 
-from six import iteritems, itervalues
-
 from twisted.internet import defer
 
 from synapse.api import errors
@@ -159,7 +157,7 @@ class DeviceWorkerHandler(BaseHandler):
             # The user may have left the room
             # TODO: Check if they actually did or if we were just invited.
             if room_id not in room_ids:
-                for key, event_id in iteritems(current_state_ids):
+                for key, event_id in current_state_ids.items():
                     etype, state_key = key
                     if etype != EventTypes.Member:
                         continue
@@ -182,7 +180,7 @@ class DeviceWorkerHandler(BaseHandler):
                 log_kv(
                     {"event": "encountered empty previous state", "room_id": room_id}
                 )
-                for key, event_id in iteritems(current_state_ids):
+                for key, event_id in current_state_ids.items():
                     etype, state_key = key
                     if etype != EventTypes.Member:
                         continue
@@ -198,10 +196,10 @@ class DeviceWorkerHandler(BaseHandler):
 
             # Check if we've joined the room? If so we just blindly add all the users to
             # the "possibly changed" users.
-            for state_dict in itervalues(prev_state_ids):
+            for state_dict in prev_state_ids.values():
                 member_event = state_dict.get((EventTypes.Member, user_id), None)
                 if not member_event or member_event != current_member_id:
-                    for key, event_id in iteritems(current_state_ids):
+                    for key, event_id in current_state_ids.items():
                         etype, state_key = key
                         if etype != EventTypes.Member:
                             continue
@@ -211,14 +209,14 @@ class DeviceWorkerHandler(BaseHandler):
             # If there has been any change in membership, include them in the
             # possibly changed list. We'll check if they are joined below,
             # and we're not toooo worried about spuriously adding users.
-            for key, event_id in iteritems(current_state_ids):
+            for key, event_id in current_state_ids.items():
                 etype, state_key = key
                 if etype != EventTypes.Member:
                     continue
 
                 # check if this member has changed since any of the extremities
                 # at the stream_ordering, and add them to the list if so.
-                for state_dict in itervalues(prev_state_ids):
+                for state_dict in prev_state_ids.values():
                     prev_event_id = state_dict.get(key, None)
                     if not prev_event_id or prev_event_id != event_id:
                         if state_key != user_id:
@@ -693,6 +691,7 @@ class DeviceListUpdater(object):
 
         return False
 
+    @trace
     @defer.inlineCallbacks
     def _maybe_retry_device_resync(self):
         """Retry to resync device lists that are out of sync, except if another retry is
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 05c4b3eec0..610b08d00b 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -18,8 +18,6 @@ from typing import Any, Dict
 
 from canonicaljson import json
 
-from twisted.internet import defer
-
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
 from synapse.logging.opentracing import (
@@ -51,8 +49,7 @@ class DeviceMessageHandler(object):
 
         self._device_list_updater = hs.get_device_handler().device_list_updater
 
-    @defer.inlineCallbacks
-    def on_direct_to_device_edu(self, origin, content):
+    async def on_direct_to_device_edu(self, origin, content):
         local_messages = {}
         sender_user_id = content["sender"]
         if origin != get_domain_from_id(sender_user_id):
@@ -82,11 +79,11 @@ class DeviceMessageHandler(object):
             }
             local_messages[user_id] = messages_by_device
 
-            yield self._check_for_unknown_devices(
+            await self._check_for_unknown_devices(
                 message_type, sender_user_id, by_device
             )
 
-        stream_id = yield self.store.add_messages_from_remote_to_device_inbox(
+        stream_id = await self.store.add_messages_from_remote_to_device_inbox(
             origin, message_id, local_messages
         )
 
@@ -94,14 +91,13 @@ class DeviceMessageHandler(object):
             "to_device_key", stream_id, users=local_messages.keys()
         )
 
-    @defer.inlineCallbacks
-    def _check_for_unknown_devices(
+    async def _check_for_unknown_devices(
         self,
         message_type: str,
         sender_user_id: str,
         by_device: Dict[str, Dict[str, Any]],
     ):
-        """Checks inbound device messages for unkown remote devices, and if
+        """Checks inbound device messages for unknown remote devices, and if
         found marks the remote cache for the user as stale.
         """
 
@@ -115,7 +111,7 @@ class DeviceMessageHandler(object):
             requesting_device_ids.add(device_id)
 
         # Check if we are tracking the devices of the remote user.
-        room_ids = yield self.store.get_rooms_for_user(sender_user_id)
+        room_ids = await self.store.get_rooms_for_user(sender_user_id)
         if not room_ids:
             logger.info(
                 "Received device message from remote device we don't"
@@ -127,7 +123,7 @@ class DeviceMessageHandler(object):
 
         # If we are tracking check that we know about the sending
         # devices.
-        cached_devices = yield self.store.get_cached_devices_for_user(sender_user_id)
+        cached_devices = await self.store.get_cached_devices_for_user(sender_user_id)
 
         unknown_devices = requesting_device_ids - set(cached_devices)
         if unknown_devices:
@@ -136,15 +132,14 @@ class DeviceMessageHandler(object):
                 sender_user_id,
                 unknown_devices,
             )
-            yield self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
+            await self.store.mark_remote_user_device_cache_as_stale(sender_user_id)
 
             # Immediately attempt a resync in the background
             run_in_background(
                 self._device_list_updater.user_device_resync, sender_user_id
             )
 
-    @defer.inlineCallbacks
-    def send_device_message(self, sender_user_id, message_type, messages):
+    async def send_device_message(self, sender_user_id, message_type, messages):
         set_tag("number_of_messages", len(messages))
         set_tag("sender", sender_user_id)
         local_messages = {}
@@ -183,7 +178,7 @@ class DeviceMessageHandler(object):
                 }
 
         log_kv({"local_messages": local_messages})
-        stream_id = yield self.store.add_messages_to_device_inbox(
+        stream_id = await self.store.add_messages_to_device_inbox(
             local_messages, remote_edu_contents
         )
 
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index f2f16b1e43..79a2df6201 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -17,8 +17,6 @@ import logging
 import string
 from typing import Iterable, List, Optional
 
-from twisted.internet import defer
-
 from synapse.api.constants import MAX_ALIAS_LENGTH, EventTypes
 from synapse.api.errors import (
     AuthError,
@@ -55,8 +53,7 @@ class DirectoryHandler(BaseHandler):
 
         self.spam_checker = hs.get_spam_checker()
 
-    @defer.inlineCallbacks
-    def _create_association(
+    async def _create_association(
         self,
         room_alias: RoomAlias,
         room_id: str,
@@ -76,13 +73,13 @@ class DirectoryHandler(BaseHandler):
         # TODO(erikj): Add transactions.
         # TODO(erikj): Check if there is a current association.
         if not servers:
-            users = yield self.state.get_current_users_in_room(room_id)
+            users = await self.state.get_current_users_in_room(room_id)
             servers = {get_domain_from_id(u) for u in users}
 
         if not servers:
             raise SynapseError(400, "Failed to get server list")
 
-        yield self.store.create_room_alias_association(
+        await self.store.create_room_alias_association(
             room_alias, room_id, servers, creator=creator
         )
 
@@ -93,7 +90,7 @@ class DirectoryHandler(BaseHandler):
         room_id: str,
         servers: Optional[List[str]] = None,
         check_membership: bool = True,
-    ):
+    ) -> None:
         """Attempt to create a new alias
 
         Args:
@@ -103,9 +100,6 @@ class DirectoryHandler(BaseHandler):
             servers: Iterable of servers that others servers should try and join via
             check_membership: Whether to check if the user is in the room
                 before the alias can be set (if the server's config requires it).
-
-        Returns:
-            Deferred
         """
 
         user_id = requester.user.to_string()
@@ -148,7 +142,7 @@ class DirectoryHandler(BaseHandler):
                 # per alias creation rule?
                 raise SynapseError(403, "Not allowed to create alias")
 
-            can_create = await self.can_modify_alias(room_alias, user_id=user_id)
+            can_create = self.can_modify_alias(room_alias, user_id=user_id)
             if not can_create:
                 raise AuthError(
                     400,
@@ -158,7 +152,9 @@ class DirectoryHandler(BaseHandler):
 
         await self._create_association(room_alias, room_id, servers, creator=user_id)
 
-    async def delete_association(self, requester: Requester, room_alias: RoomAlias):
+    async def delete_association(
+        self, requester: Requester, room_alias: RoomAlias
+    ) -> str:
         """Remove an alias from the directory
 
         (this is only meant for human users; AS users should call
@@ -169,7 +165,7 @@ class DirectoryHandler(BaseHandler):
             room_alias
 
         Returns:
-            Deferred[unicode]: room id that the alias used to point to
+            room id that the alias used to point to
 
         Raises:
             NotFoundError: if the alias doesn't exist
@@ -191,7 +187,7 @@ class DirectoryHandler(BaseHandler):
         if not can_delete:
             raise AuthError(403, "You don't have permission to delete the alias.")
 
-        can_delete = await self.can_modify_alias(room_alias, user_id=user_id)
+        can_delete = self.can_modify_alias(room_alias, user_id=user_id)
         if not can_delete:
             raise SynapseError(
                 400,
@@ -208,8 +204,7 @@ class DirectoryHandler(BaseHandler):
 
         return room_id
 
-    @defer.inlineCallbacks
-    def delete_appservice_association(
+    async def delete_appservice_association(
         self, service: ApplicationService, room_alias: RoomAlias
     ):
         if not service.is_interested_in_alias(room_alias.to_string()):
@@ -218,29 +213,27 @@ class DirectoryHandler(BaseHandler):
                 "This application service has not reserved this kind of alias",
                 errcode=Codes.EXCLUSIVE,
             )
-        yield self._delete_association(room_alias)
+        await self._delete_association(room_alias)
 
-    @defer.inlineCallbacks
-    def _delete_association(self, room_alias: RoomAlias):
+    async def _delete_association(self, room_alias: RoomAlias):
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room alias must be local")
 
-        room_id = yield self.store.delete_room_alias(room_alias)
+        room_id = await self.store.delete_room_alias(room_alias)
 
         return room_id
 
-    @defer.inlineCallbacks
-    def get_association(self, room_alias: RoomAlias):
+    async def get_association(self, room_alias: RoomAlias):
         room_id = None
         if self.hs.is_mine(room_alias):
-            result = yield self.get_association_from_room_alias(room_alias)
+            result = await self.get_association_from_room_alias(room_alias)
 
             if result:
                 room_id = result.room_id
                 servers = result.servers
         else:
             try:
-                result = yield self.federation.make_query(
+                result = await self.federation.make_query(
                     destination=room_alias.domain,
                     query_type="directory",
                     args={"room_alias": room_alias.to_string()},
@@ -265,7 +258,7 @@ class DirectoryHandler(BaseHandler):
                 Codes.NOT_FOUND,
             )
 
-        users = yield self.state.get_current_users_in_room(room_id)
+        users = await self.state.get_current_users_in_room(room_id)
         extra_servers = {get_domain_from_id(u) for u in users}
         servers = set(extra_servers) | set(servers)
 
@@ -277,13 +270,12 @@ class DirectoryHandler(BaseHandler):
 
         return {"room_id": room_id, "servers": servers}
 
-    @defer.inlineCallbacks
-    def on_directory_query(self, args):
+    async def on_directory_query(self, args):
         room_alias = RoomAlias.from_string(args["room_alias"])
         if not self.hs.is_mine(room_alias):
             raise SynapseError(400, "Room Alias is not hosted on this homeserver")
 
-        result = yield self.get_association_from_room_alias(room_alias)
+        result = await self.get_association_from_room_alias(room_alias)
 
         if result is not None:
             return {"room_id": result.room_id, "servers": result.servers}
@@ -344,16 +336,15 @@ class DirectoryHandler(BaseHandler):
                 ratelimit=False,
             )
 
-    @defer.inlineCallbacks
-    def get_association_from_room_alias(self, room_alias: RoomAlias):
-        result = yield self.store.get_association_from_room_alias(room_alias)
+    async def get_association_from_room_alias(self, room_alias: RoomAlias):
+        result = await self.store.get_association_from_room_alias(room_alias)
         if not result:
             # Query AS to see if it exists
             as_handler = self.appservice_handler
-            result = yield as_handler.query_room_alias_exists(room_alias)
+            result = await as_handler.query_room_alias_exists(room_alias)
         return result
 
-    def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None):
+    def can_modify_alias(self, alias: RoomAlias, user_id: Optional[str] = None) -> bool:
         # Any application service "interested" in an alias they are regexing on
         # can modify the alias.
         # Users can only modify the alias if ALL the interested services have
@@ -366,12 +357,12 @@ class DirectoryHandler(BaseHandler):
         for service in interested_services:
             if user_id == service.sender:
                 # this user IS the app service so they can do whatever they like
-                return defer.succeed(True)
+                return True
             elif service.is_exclusive_alias(alias.to_string()):
                 # another service has an exclusive lock on this alias.
-                return defer.succeed(False)
+                return False
         # either no interested services, or no service with an exclusive lock
-        return defer.succeed(True)
+        return True
 
     async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str):
         """Determine whether a user can delete an alias.
@@ -459,8 +450,7 @@ class DirectoryHandler(BaseHandler):
 
         await self.store.set_room_is_public(room_id, making_public)
 
-    @defer.inlineCallbacks
-    def edit_published_appservice_room_list(
+    async def edit_published_appservice_room_list(
         self, appservice_id: str, network_id: str, room_id: str, visibility: str
     ):
         """Add or remove a room from the appservice/network specific public
@@ -475,7 +465,7 @@ class DirectoryHandler(BaseHandler):
         if visibility not in ["public", "private"]:
             raise SynapseError(400, "Invalid visibility setting")
 
-        yield self.store.set_room_is_public_appservice(
+        await self.store.set_room_is_public_appservice(
             room_id, appservice_id, network_id, visibility == "public"
         )
 
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index 774a252619..a7e60cbc26 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -17,8 +17,6 @@
 
 import logging
 
-from six import iteritems
-
 import attr
 from canonicaljson import encode_canonical_json, json
 from signedjson.key import decode_verify_key_bytes
@@ -135,7 +133,7 @@ class E2eKeysHandler(object):
         remote_queries_not_in_cache = {}
         if remote_queries:
             query_list = []
-            for user_id, device_ids in iteritems(remote_queries):
+            for user_id, device_ids in remote_queries.items():
                 if device_ids:
                     query_list.extend((user_id, device_id) for device_id in device_ids)
                 else:
@@ -145,9 +143,9 @@ class E2eKeysHandler(object):
                 user_ids_not_in_cache,
                 remote_results,
             ) = yield self.store.get_user_devices_from_cache(query_list)
-            for user_id, devices in iteritems(remote_results):
+            for user_id, devices in remote_results.items():
                 user_devices = results.setdefault(user_id, {})
-                for device_id, device in iteritems(devices):
+                for device_id, device in devices.items():
                     keys = device.get("keys", None)
                     device_display_name = device.get("device_display_name", None)
                     if keys:
@@ -446,9 +444,9 @@ class E2eKeysHandler(object):
             ",".join(
                 (
                     "%s for %s:%s" % (key_id, user_id, device_id)
-                    for user_id, user_keys in iteritems(json_result)
-                    for device_id, device_keys in iteritems(user_keys)
-                    for key_id, _ in iteritems(device_keys)
+                    for user_id, user_keys in json_result.items()
+                    for device_id, device_keys in user_keys.items()
+                    for key_id, _ in device_keys.items()
                 )
             ),
         )
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 9abaf13b8f..f55470a707 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six import iteritems
-
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -205,8 +203,8 @@ class E2eRoomKeysHandler(object):
             )
             to_insert = []  # batch the inserts together
             changed = False  # if anything has changed, we need to update the etag
-            for room_id, room in iteritems(room_keys["rooms"]):
-                for session_id, room_key in iteritems(room["sessions"]):
+            for room_id, room in room_keys["rooms"].items():
+                for session_id, room_key in room["sessions"].items():
                     if not isinstance(room_key["is_verified"], bool):
                         msg = (
                             "is_verified must be a boolean in keys for session %s in"
@@ -351,6 +349,7 @@ class E2eRoomKeysHandler(object):
                     raise
 
             res["count"] = yield self.store.count_e2e_room_keys(user_id, res["version"])
+            res["etag"] = str(res["etag"])
             return res
 
     @trace
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index d0b62f4cf2..4dbd8e1d98 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -19,12 +19,9 @@
 
 import itertools
 import logging
+from http import HTTPStatus
 from typing import Dict, Iterable, List, Optional, Sequence, Tuple
 
-import six
-from six import iteritems, itervalues
-from six.moves import http_client, zip
-
 import attr
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
@@ -33,7 +30,12 @@ from unpaddedbase64 import decode_base64
 from twisted.internet import defer
 
 from synapse import event_auth
-from synapse.api.constants import EventTypes, Membership, RejectedReason
+from synapse.api.constants import (
+    EventTypes,
+    Membership,
+    RejectedReason,
+    RoomEncryptionAlgorithms,
+)
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
@@ -374,6 +376,7 @@ class FederationHandler(BaseHandler):
 
                     room_version = await self.store.get_room_version_id(room_id)
                     state_map = await resolve_events_with_store(
+                        self.clock,
                         room_id,
                         room_version,
                         state_maps,
@@ -393,7 +396,7 @@ class FederationHandler(BaseHandler):
                     )
                     event_map.update(evs)
 
-                    state = [event_map[e] for e in six.itervalues(state_map)]
+                    state = [event_map[e] for e in state_map.values()]
                 except Exception:
                     logger.warning(
                         "[%s %s] Error attempting to resolve state at missing "
@@ -742,7 +745,10 @@ class FederationHandler(BaseHandler):
                 if device:
                     keys = device.get("keys", {}).get("keys", {})
 
-                    if event.content.get("algorithm") == "m.megolm.v1.aes-sha2":
+                    if (
+                        event.content.get("algorithm")
+                        == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
+                    ):
                         # For this algorithm we expect a curve25519 key.
                         key_name = "curve25519:%s" % (device_id,)
                         current_keys = [keys.get(key_name)]
@@ -1001,7 +1007,7 @@ class FederationHandler(BaseHandler):
             """
             joined_users = [
                 (state_key, int(event.depth))
-                for (e_type, state_key), event in iteritems(state)
+                for (e_type, state_key), event in state.items()
                 if e_type == EventTypes.Member and event.membership == Membership.JOIN
             ]
 
@@ -1091,16 +1097,16 @@ class FederationHandler(BaseHandler):
         states = dict(zip(event_ids, [s.state for s in states]))
 
         state_map = await self.store.get_events(
-            [e_id for ids in itervalues(states) for e_id in itervalues(ids)],
+            [e_id for ids in states.values() for e_id in ids.values()],
             get_prev_content=False,
         )
         states = {
             key: {
                 k: state_map[e_id]
-                for k, e_id in iteritems(state_dict)
+                for k, e_id in state_dict.items()
                 if e_id in state_map
             }
-            for key, state_dict in iteritems(states)
+            for key, state_dict in states.items()
         }
 
         for e_id, _ in sorted_extremeties_tuple:
@@ -1188,7 +1194,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.prev_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many prev_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
 
         if len(ev.auth_event_ids()) > 10:
             logger.warning(
@@ -1196,7 +1202,7 @@ class FederationHandler(BaseHandler):
                 ev.event_id,
                 len(ev.auth_event_ids()),
             )
-            raise SynapseError(http_client.BAD_REQUEST, "Too many auth_events")
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
 
     async def send_invite(self, target_host, event):
         """ Sends the invite to the remote server for signing.
@@ -1539,7 +1545,7 @@ class FederationHandler(BaseHandler):
 
         # block any attempts to invite the server notices mxid
         if event.state_key == self._server_notices_mxid:
-            raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+            raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
         # keep a record of the room version, if we don't yet know it.
         # (this may get overwritten if we later get a different room version in a
@@ -1725,7 +1731,7 @@ class FederationHandler(BaseHandler):
         state_groups = await self.state_store.get_state_groups(room_id, [event_id])
 
         if state_groups:
-            _, state = list(iteritems(state_groups)).pop()
+            _, state = list(state_groups.items()).pop()
             results = {(e.type, e.state_key): e for e in state}
 
             if event.is_state():
@@ -2088,7 +2094,7 @@ class FederationHandler(BaseHandler):
                     room_version, state_sets, event
                 )
                 current_state_ids = {
-                    k: e.event_id for k, e in iteritems(current_state_ids)
+                    k: e.event_id for k, e in current_state_ids.items()
                 }
             else:
                 current_state_ids = await self.state_handler.get_current_state_ids(
@@ -2104,7 +2110,7 @@ class FederationHandler(BaseHandler):
             # Now check if event pass auth against said current state
             auth_types = auth_types_for_event(event)
             current_state_ids = [
-                e for k, e in iteritems(current_state_ids) if k in auth_types
+                e for k, e in current_state_ids.items() if k in auth_types
             ]
 
             current_auth_events = await self.store.get_events(current_state_ids)
@@ -2420,7 +2426,7 @@ class FederationHandler(BaseHandler):
         else:
             event_key = None
         state_updates = {
-            k: a.event_id for k, a in iteritems(auth_events) if k != event_key
+            k: a.event_id for k, a in auth_events.items() if k != event_key
         }
 
         current_state_ids = await context.get_current_state_ids()
@@ -2431,7 +2437,7 @@ class FederationHandler(BaseHandler):
         prev_state_ids = await context.get_prev_state_ids()
         prev_state_ids = dict(prev_state_ids)
 
-        prev_state_ids.update({k: a.event_id for k, a in iteritems(auth_events)})
+        prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
 
         # create a new state group as a delta from the existing one.
         prev_group = context.state_group
diff --git a/synapse/handlers/groups_local.py b/synapse/handlers/groups_local.py
index ebe8d25bd8..7cb106e365 100644
--- a/synapse/handlers/groups_local.py
+++ b/synapse/handlers/groups_local.py
@@ -16,8 +16,6 @@
 
 import logging
 
-from six import iteritems
-
 from synapse.api.errors import HttpResponseException, RequestSendFailed, SynapseError
 from synapse.types import get_domain_from_id
 
@@ -227,7 +225,7 @@ class GroupsLocalWorkerHandler(object):
 
         results = {}
         failed_results = []
-        for destination, dest_user_ids in iteritems(destinations):
+        for destination, dest_user_ids in destinations.items():
             try:
                 r = await self.transport_client.bulk_get_publicised_groups(
                     destination, list(dest_user_ids)
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 649ca1f08a..665ad19b5d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -17,8 +17,6 @@
 import logging
 from typing import Optional, Tuple
 
-from six import iteritems, itervalues, string_types
-
 from canonicaljson import encode_canonical_json, json
 
 from twisted.internet import defer
@@ -246,7 +244,7 @@ class MessageHandler(object):
                 "avatar_url": profile.avatar_url,
                 "display_name": profile.display_name,
             }
-            for user_id, profile in iteritems(users_with_profile)
+            for user_id, profile in users_with_profile.items()
         }
 
     def maybe_schedule_expiry(self, event):
@@ -715,7 +713,7 @@ class EventCreationHandler(object):
 
             spam_error = self.spam_checker.check_event_for_spam(event)
             if spam_error:
-                if not isinstance(spam_error, string_types):
+                if not isinstance(spam_error, str):
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
@@ -881,7 +879,9 @@ class EventCreationHandler(object):
         """
         room_alias = RoomAlias.from_string(room_alias_str)
         try:
-            mapping = yield directory_handler.get_association(room_alias)
+            mapping = yield defer.ensureDeferred(
+                directory_handler.get_association(room_alias)
+            )
         except SynapseError as e:
             # Turn M_NOT_FOUND errors into M_BAD_ALIAS errors.
             if e.errcode == Codes.NOT_FOUND:
@@ -988,7 +988,7 @@ class EventCreationHandler(object):
 
                 state_to_include_ids = [
                     e_id
-                    for k, e_id in iteritems(current_state_ids)
+                    for k, e_id in current_state_ids.items()
                     if k[0] in self.room_invite_state_types
                     or k == (EventTypes.Member, event.sender)
                 ]
@@ -1002,7 +1002,7 @@ class EventCreationHandler(object):
                         "content": e.content,
                         "sender": e.sender,
                     }
-                    for e in itervalues(state_to_include)
+                    for e in state_to_include.values()
                 ]
 
                 invitee = UserID.from_string(event.state_key)
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index d7442c62a7..da06582d4b 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -15,9 +15,6 @@
 # limitations under the License.
 import logging
 
-from six import iteritems
-
-from twisted.internet import defer
 from twisted.python.failure import Failure
 
 from synapse.api.constants import EventTypes, Membership
@@ -99,8 +96,7 @@ class PaginationHandler(object):
                     job["longest_max_lifetime"],
                 )
 
-    @defer.inlineCallbacks
-    def purge_history_for_rooms_in_range(self, min_ms, max_ms):
+    async def purge_history_for_rooms_in_range(self, min_ms, max_ms):
         """Purge outdated events from rooms within the given retention range.
 
         If a default retention policy is defined in the server's configuration and its
@@ -139,13 +135,13 @@ class PaginationHandler(object):
             include_null,
         )
 
-        rooms = yield self.store.get_rooms_for_retention_period_in_range(
+        rooms = await self.store.get_rooms_for_retention_period_in_range(
             min_ms, max_ms, include_null
         )
 
         logger.debug("[purge] Rooms to purge: %s", rooms)
 
-        for room_id, retention_policy in iteritems(rooms):
+        for room_id, retention_policy in rooms.items():
             logger.info("[purge] Attempting to purge messages in room %s", room_id)
 
             if room_id in self._purges_in_progress_by_room:
@@ -167,9 +163,9 @@ class PaginationHandler(object):
             # Figure out what token we should start purging at.
             ts = self.clock.time_msec() - max_lifetime
 
-            stream_ordering = yield self.store.find_first_stream_ordering_after_ts(ts)
+            stream_ordering = await self.store.find_first_stream_ordering_after_ts(ts)
 
-            r = yield self.store.get_room_event_before_stream_ordering(
+            r = await self.store.get_room_event_before_stream_ordering(
                 room_id, stream_ordering,
             )
             if not r:
@@ -229,8 +225,7 @@ class PaginationHandler(object):
         )
         return purge_id
 
-    @defer.inlineCallbacks
-    def _purge_history(self, purge_id, room_id, token, delete_local_events):
+    async def _purge_history(self, purge_id, room_id, token, delete_local_events):
         """Carry out a history purge on a room.
 
         Args:
@@ -239,14 +234,11 @@ class PaginationHandler(object):
             token (str): topological token to delete events before
             delete_local_events (bool): True to delete local events as well as
                 remote ones
-
-        Returns:
-            Deferred
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
-            with (yield self.pagination_lock.write(room_id)):
-                yield self.storage.purge_events.purge_history(
+            with await self.pagination_lock.write(room_id):
+                await self.storage.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
             logger.info("[purge] complete")
@@ -284,9 +276,7 @@ class PaginationHandler(object):
             await self.store.get_room_version_id(room_id)
 
             # first check that we have no users in this room
-            joined = await defer.maybeDeferred(
-                self.store.is_host_joined, room_id, self._server_name
-            )
+            joined = await self.store.is_host_joined(room_id, self._server_name)
 
             if joined:
                 raise SynapseError(400, "Users are still joined to this room")
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 3594f3b00f..d2f25ae12a 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -25,9 +25,7 @@ The methods that define policy are:
 import abc
 import logging
 from contextlib import contextmanager
-from typing import Dict, Iterable, List, Set
-
-from six import iteritems, itervalues
+from typing import Dict, Iterable, List, Set, Tuple
 
 from prometheus_client import Counter
 from typing_extensions import ContextManager
@@ -170,14 +168,14 @@ class BasePresenceHandler(abc.ABC):
             for user_id in user_ids
         }
 
-        missing = [user_id for user_id, state in iteritems(states) if not state]
+        missing = [user_id for user_id, state in states.items() if not state]
         if missing:
             # There are things not in our in memory cache. Lets pull them out of
             # the database.
             res = await self.store.get_presence_for_users(missing)
             states.update(res)
 
-            missing = [user_id for user_id, state in iteritems(states) if not state]
+            missing = [user_id for user_id, state in states.items() if not state]
             if missing:
                 new = {
                     user_id: UserPresenceState.default(user_id) for user_id in missing
@@ -632,7 +630,7 @@ class PresenceHandler(BasePresenceHandler):
             await self._update_states(
                 [
                     prev_state.copy_and_replace(last_user_sync_ts=time_now_ms)
-                    for prev_state in itervalues(prev_states)
+                    for prev_state in prev_states.values()
                 ]
             )
             self.external_process_last_updated_ms.pop(process_id, None)
@@ -775,7 +773,9 @@ class PresenceHandler(BasePresenceHandler):
 
         return False
 
-    async def get_all_presence_updates(self, last_id, current_id, limit):
+    async def get_all_presence_updates(
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
         """
         Gets a list of presence update rows from between the given stream ids.
         Each row has:
@@ -787,10 +787,31 @@ class PresenceHandler(BasePresenceHandler):
         - last_user_sync_ts(int)
         - status_msg(int)
         - currently_active(int)
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
+
         # TODO(markjh): replicate the unpersisted changes.
         # This could use the in-memory stores for recent changes.
-        rows = await self.store.get_all_presence_updates(last_id, current_id, limit)
+        rows = await self.store.get_all_presence_updates(
+            instance_name, last_id, current_id, limit
+        )
         return rows
 
     def notify_new_event(self):
@@ -1087,7 +1108,7 @@ class PresenceEventSource(object):
             return (list(updates.values()), max_token)
         else:
             return (
-                [s for s in itervalues(updates) if s.state != PresenceState.OFFLINE],
+                [s for s in updates.values() if s.state != PresenceState.OFFLINE],
                 max_token,
             )
 
@@ -1323,11 +1344,11 @@ def get_interested_remotes(store, states, state_handler):
     # hosts in those rooms.
     room_ids_to_states, users_to_states = yield get_interested_parties(store, states)
 
-    for room_id, states in iteritems(room_ids_to_states):
+    for room_id, states in room_ids_to_states.items():
         hosts = yield state_handler.get_current_hosts_in_room(room_id)
         hosts_and_states.append((hosts, states))
 
-    for user_id, states in iteritems(users_to_states):
+    for user_id, states in users_to_states.items():
         host = get_domain_from_id(user_id)
         hosts_and_states.append(([host], states))
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 302efc1b9a..4b1e3073a8 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import raise_from
-
 from twisted.internet import defer
 
 from synapse.api.errors import (
@@ -84,7 +82,7 @@ class BaseProfileHandler(BaseHandler):
                 )
                 return result
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
@@ -135,7 +133,7 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
@@ -212,7 +210,7 @@ class BaseProfileHandler(BaseHandler):
                     ignore_backoff=True,
                 )
             except RequestSendFailed as e:
-                raise_from(SynapseError(502, "Failed to fetch profile"), e)
+                raise SynapseError(502, "Failed to fetch profile") from e
             except HttpResponseException as e:
                 raise e.to_synapse_error()
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 51979ea43e..78c3772ac1 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -17,7 +17,7 @@
 import logging
 
 from synapse import types
-from synapse.api.constants import MAX_USERID_LENGTH, LoginType
+from synapse.api.constants import MAX_USERID_LENGTH, EventTypes, JoinRules, LoginType
 from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
 from synapse.config.server import is_threepid_reserved
 from synapse.http.servlet import assert_params_in_dict
@@ -26,7 +26,8 @@ from synapse.replication.http.register import (
     ReplicationPostRegisterActionsServlet,
     ReplicationRegisterServlet,
 )
-from synapse.types import RoomAlias, RoomID, UserID, create_requester
+from synapse.storage.state import StateFilter
+from synapse.types import RoomAlias, UserID, create_requester
 from synapse.util.async_helpers import Linearizer
 
 from ._base import BaseHandler
@@ -270,51 +271,157 @@ class RegistrationHandler(BaseHandler):
 
         return user_id
 
-    async def _auto_join_rooms(self, user_id):
-        """Automatically joins users to auto join rooms - creating the room in the first place
-        if the user is the first to be created.
+    async def _create_and_join_rooms(self, user_id: str):
+        """
+        Create the auto-join rooms and join or invite the user to them.
+
+        This should only be called when the first "real" user registers.
 
         Args:
-            user_id(str): The user to join
+            user_id: The user to join
         """
-        # auto-join the user to any rooms we're supposed to dump them into
-        fake_requester = create_requester(user_id)
+        # Getting the handlers during init gives a dependency loop.
+        room_creation_handler = self.hs.get_room_creation_handler()
+        room_member_handler = self.hs.get_room_member_handler()
 
-        # try to create the room if we're the first real user on the server. Note
-        # that an auto-generated support or bot user is not a real user and will never be
-        # the user to create the room
-        should_auto_create_rooms = False
-        is_real_user = await self.store.is_real_user(user_id)
-        if self.hs.config.autocreate_auto_join_rooms and is_real_user:
-            count = await self.store.count_real_users()
-            should_auto_create_rooms = count == 1
-        for r in self.hs.config.auto_join_rooms:
+        # Generate a stub for how the rooms will be configured.
+        stub_config = {
+            "preset": self.hs.config.registration.autocreate_auto_join_room_preset,
+        }
+
+        # If the configuration providers a user ID to create rooms with, use
+        # that instead of the first user registered.
+        requires_join = False
+        if self.hs.config.registration.auto_join_user_id:
+            fake_requester = create_requester(
+                self.hs.config.registration.auto_join_user_id
+            )
+
+            # If the room requires an invite, add the user to the list of invites.
+            if self.hs.config.registration.auto_join_room_requires_invite:
+                stub_config["invite"] = [user_id]
+
+            # If the room is being created by a different user, the first user
+            # registered needs to join it. Note that in the case of an invitation
+            # being necessary this will occur after the invite was sent.
+            requires_join = True
+        else:
+            fake_requester = create_requester(user_id)
+
+        # Choose whether to federate the new room.
+        if not self.hs.config.registration.autocreate_auto_join_rooms_federated:
+            stub_config["creation_content"] = {"m.federate": False}
+
+        for r in self.hs.config.registration.auto_join_rooms:
             logger.info("Auto-joining %s to %s", user_id, r)
+
             try:
-                if should_auto_create_rooms:
-                    room_alias = RoomAlias.from_string(r)
-                    if self.hs.hostname != room_alias.domain:
-                        logger.warning(
-                            "Cannot create room alias %s, "
-                            "it does not match server domain",
-                            r,
-                        )
-                    else:
-                        # create room expects the localpart of the room alias
-                        room_alias_localpart = room_alias.localpart
-
-                        # getting the RoomCreationHandler during init gives a dependency
-                        # loop
-                        await self.hs.get_room_creation_handler().create_room(
-                            fake_requester,
-                            config={
-                                "preset": "public_chat",
-                                "room_alias_name": room_alias_localpart,
-                            },
+                room_alias = RoomAlias.from_string(r)
+
+                if self.hs.hostname != room_alias.domain:
+                    logger.warning(
+                        "Cannot create room alias %s, "
+                        "it does not match server domain",
+                        r,
+                    )
+                else:
+                    # A shallow copy is OK here since the only key that is
+                    # modified is room_alias_name.
+                    config = stub_config.copy()
+                    # create room expects the localpart of the room alias
+                    config["room_alias_name"] = room_alias.localpart
+
+                    info, _ = await room_creation_handler.create_room(
+                        fake_requester, config=config, ratelimit=False,
+                    )
+
+                    # If the room does not require an invite, but another user
+                    # created it, then ensure the first user joins it.
+                    if requires_join:
+                        await room_member_handler.update_membership(
+                            requester=create_requester(user_id),
+                            target=UserID.from_string(user_id),
+                            room_id=info["room_id"],
+                            # Since it was just created, there are no remote hosts.
+                            remote_room_hosts=[],
+                            action="join",
                             ratelimit=False,
                         )
+
+            except ConsentNotGivenError as e:
+                # Technically not necessary to pull out this error though
+                # moving away from bare excepts is a good thing to do.
+                logger.error("Failed to join new user to %r: %r", r, e)
+            except Exception as e:
+                logger.error("Failed to join new user to %r: %r", r, e)
+
+    async def _join_rooms(self, user_id: str):
+        """
+        Join or invite the user to the auto-join rooms.
+
+        Args:
+            user_id: The user to join
+        """
+        room_member_handler = self.hs.get_room_member_handler()
+
+        for r in self.hs.config.registration.auto_join_rooms:
+            logger.info("Auto-joining %s to %s", user_id, r)
+
+            try:
+                room_alias = RoomAlias.from_string(r)
+
+                if RoomAlias.is_valid(r):
+                    (
+                        room_id,
+                        remote_room_hosts,
+                    ) = await room_member_handler.lookup_room_alias(room_alias)
+                    room_id = room_id.to_string()
                 else:
-                    await self._join_user_to_room(fake_requester, r)
+                    raise SynapseError(
+                        400, "%s was not legal room ID or room alias" % (r,)
+                    )
+
+                # Calculate whether the room requires an invite or can be
+                # joined directly. Note that unless a join rule of public exists,
+                # it is treated as requiring an invite.
+                requires_invite = True
+
+                state = await self.store.get_filtered_current_state_ids(
+                    room_id, StateFilter.from_types([(EventTypes.JoinRules, "")])
+                )
+
+                event_id = state.get((EventTypes.JoinRules, ""))
+                if event_id:
+                    join_rules_event = await self.store.get_event(
+                        event_id, allow_none=True
+                    )
+                    if join_rules_event:
+                        join_rule = join_rules_event.content.get("join_rule", None)
+                        requires_invite = join_rule and join_rule != JoinRules.PUBLIC
+
+                # Send the invite, if necessary.
+                if requires_invite:
+                    await room_member_handler.update_membership(
+                        requester=create_requester(
+                            self.hs.config.registration.auto_join_user_id
+                        ),
+                        target=UserID.from_string(user_id),
+                        room_id=room_id,
+                        remote_room_hosts=remote_room_hosts,
+                        action="invite",
+                        ratelimit=False,
+                    )
+
+                # Send the join.
+                await room_member_handler.update_membership(
+                    requester=create_requester(user_id),
+                    target=UserID.from_string(user_id),
+                    room_id=room_id,
+                    remote_room_hosts=remote_room_hosts,
+                    action="join",
+                    ratelimit=False,
+                )
+
             except ConsentNotGivenError as e:
                 # Technically not necessary to pull out this error though
                 # moving away from bare excepts is a good thing to do.
@@ -322,6 +429,29 @@ class RegistrationHandler(BaseHandler):
             except Exception as e:
                 logger.error("Failed to join new user to %r: %r", r, e)
 
+    async def _auto_join_rooms(self, user_id: str):
+        """Automatically joins users to auto join rooms - creating the room in the first place
+        if the user is the first to be created.
+
+        Args:
+            user_id: The user to join
+        """
+        # auto-join the user to any rooms we're supposed to dump them into
+
+        # try to create the room if we're the first real user on the server. Note
+        # that an auto-generated support or bot user is not a real user and will never be
+        # the user to create the room
+        should_auto_create_rooms = False
+        is_real_user = await self.store.is_real_user(user_id)
+        if self.hs.config.registration.autocreate_auto_join_rooms and is_real_user:
+            count = await self.store.count_real_users()
+            should_auto_create_rooms = count == 1
+
+        if should_auto_create_rooms:
+            await self._create_and_join_rooms(user_id)
+        else:
+            await self._join_rooms(user_id)
+
     async def post_consent_actions(self, user_id):
         """A series of registration actions that can only be carried out once consent
         has been granted
@@ -392,30 +522,6 @@ class RegistrationHandler(BaseHandler):
         self._next_generated_user_id += 1
         return str(id)
 
-    async def _join_user_to_room(self, requester, room_identifier):
-        room_member_handler = self.hs.get_room_member_handler()
-        if RoomID.is_valid(room_identifier):
-            room_id = room_identifier
-        elif RoomAlias.is_valid(room_identifier):
-            room_alias = RoomAlias.from_string(room_identifier)
-            room_id, remote_room_hosts = await room_member_handler.lookup_room_alias(
-                room_alias
-            )
-            room_id = room_id.to_string()
-        else:
-            raise SynapseError(
-                400, "%s was not legal room ID or room alias" % (room_identifier,)
-            )
-
-        await room_member_handler.update_membership(
-            requester=requester,
-            target=requester.user,
-            room_id=room_id,
-            remote_room_hosts=remote_room_hosts,
-            action="join",
-            ratelimit=False,
-        )
-
     def check_registration_ratelimit(self, address):
         """A simple helper method to check whether the registration rate limit has been hit
         for a given IP address
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 61db3ccc43..950a84acd0 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -24,9 +24,12 @@ import string
 from collections import OrderedDict
 from typing import Tuple
 
-from six import iteritems, string_types
-
-from synapse.api.constants import EventTypes, JoinRules, RoomCreationPreset
+from synapse.api.constants import (
+    EventTypes,
+    JoinRules,
+    RoomCreationPreset,
+    RoomEncryptionAlgorithms,
+)
 from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion
 from synapse.events.utils import copy_power_levels_contents
@@ -56,31 +59,6 @@ FIVE_MINUTES_IN_MS = 5 * 60 * 1000
 
 
 class RoomCreationHandler(BaseHandler):
-
-    PRESETS_DICT = {
-        RoomCreationPreset.PRIVATE_CHAT: {
-            "join_rules": JoinRules.INVITE,
-            "history_visibility": "shared",
-            "original_invitees_have_ops": False,
-            "guest_can_join": True,
-            "power_level_content_override": {"invite": 0},
-        },
-        RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
-            "join_rules": JoinRules.INVITE,
-            "history_visibility": "shared",
-            "original_invitees_have_ops": True,
-            "guest_can_join": True,
-            "power_level_content_override": {"invite": 0},
-        },
-        RoomCreationPreset.PUBLIC_CHAT: {
-            "join_rules": JoinRules.PUBLIC,
-            "history_visibility": "shared",
-            "original_invitees_have_ops": False,
-            "guest_can_join": False,
-            "power_level_content_override": {},
-        },
-    }
-
     def __init__(self, hs):
         super(RoomCreationHandler, self).__init__(hs)
 
@@ -89,6 +67,39 @@ class RoomCreationHandler(BaseHandler):
         self.room_member_handler = hs.get_room_member_handler()
         self.config = hs.config
 
+        # Room state based off defined presets
+        self._presets_dict = {
+            RoomCreationPreset.PRIVATE_CHAT: {
+                "join_rules": JoinRules.INVITE,
+                "history_visibility": "shared",
+                "original_invitees_have_ops": False,
+                "guest_can_join": True,
+                "power_level_content_override": {"invite": 0},
+            },
+            RoomCreationPreset.TRUSTED_PRIVATE_CHAT: {
+                "join_rules": JoinRules.INVITE,
+                "history_visibility": "shared",
+                "original_invitees_have_ops": True,
+                "guest_can_join": True,
+                "power_level_content_override": {"invite": 0},
+            },
+            RoomCreationPreset.PUBLIC_CHAT: {
+                "join_rules": JoinRules.PUBLIC,
+                "history_visibility": "shared",
+                "original_invitees_have_ops": False,
+                "guest_can_join": False,
+                "power_level_content_override": {},
+            },
+        }
+
+        # Modify presets to selectively enable encryption by default per homeserver config
+        for preset_name, preset_config in self._presets_dict.items():
+            encrypted = (
+                preset_name
+                in self.config.encryption_enabled_by_default_for_room_presets
+            )
+            preset_config["encrypted"] = encrypted
+
         self._replication = hs.get_replication_data_handler()
 
         # linearizer to stop two upgrades happening at once
@@ -364,7 +375,7 @@ class RoomCreationHandler(BaseHandler):
         # map from event_id to BaseEvent
         old_room_state_events = await self.store.get_events(old_room_state_ids.values())
 
-        for k, old_event_id in iteritems(old_room_state_ids):
+        for k, old_event_id in old_room_state_ids.items():
             old_event = old_room_state_events.get(old_event_id)
             if old_event:
                 initial_state[k] = old_event.content
@@ -417,7 +428,7 @@ class RoomCreationHandler(BaseHandler):
         old_room_member_state_events = await self.store.get_events(
             old_room_member_state_ids.values()
         )
-        for k, old_event in iteritems(old_room_member_state_events):
+        for k, old_event in old_room_member_state_events.items():
             # Only transfer ban events
             if (
                 "membership" in old_event.content
@@ -582,7 +593,7 @@ class RoomCreationHandler(BaseHandler):
             "room_version", self.config.default_room_version.identifier
         )
 
-        if not isinstance(room_version_id, string_types):
+        if not isinstance(room_version_id, str):
             raise SynapseError(400, "room_version must be a string", Codes.BAD_JSON)
 
         room_version = KNOWN_ROOM_VERSIONS.get(room_version_id)
@@ -798,7 +809,7 @@ class RoomCreationHandler(BaseHandler):
             )
             return last_stream_id
 
-        config = RoomCreationHandler.PRESETS_DICT[preset_config]
+        config = self._presets_dict[preset_config]
 
         creator_id = creator.user.to_string()
 
@@ -888,6 +899,13 @@ class RoomCreationHandler(BaseHandler):
                 etype=etype, state_key=state_key, content=content
             )
 
+        if config["encrypted"]:
+            last_sent_stream_id = await send(
+                etype=EventTypes.RoomEncryption,
+                state_key="",
+                content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
+            )
+
         return last_sent_stream_id
 
     async def _generate_room_id(
diff --git a/synapse/handlers/room_list.py b/synapse/handlers/room_list.py
index 4cbc02b0d0..5e05be6181 100644
--- a/synapse/handlers/room_list.py
+++ b/synapse/handlers/room_list.py
@@ -17,8 +17,6 @@ import logging
 from collections import namedtuple
 from typing import Any, Dict, Optional
 
-from six import iteritems
-
 import msgpack
 from unpaddedbase64 import decode_base64, encode_base64
 
@@ -271,7 +269,7 @@ class RoomListHandler(BaseHandler):
         event_map = yield self.store.get_events(
             [
                 event_id
-                for key, event_id in iteritems(current_state_ids)
+                for key, event_id in current_state_ids.items()
                 if key[0]
                 in (
                     EventTypes.Create,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f7af982f0..27c479da9e 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,10 +17,9 @@
 
 import abc
 import logging
+from http import HTTPStatus
 from typing import Dict, Iterable, List, Optional, Tuple
 
-from six.moves import http_client
-
 from synapse import types
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import AuthError, Codes, SynapseError
@@ -361,7 +360,7 @@ class RoomMemberHandler(object):
         if effective_membership_state == Membership.INVITE:
             # block any attempts to invite the server notices mxid
             if target.to_string() == self._server_notices_mxid:
-                raise SynapseError(http_client.FORBIDDEN, "Cannot invite this user")
+                raise SynapseError(HTTPStatus.FORBIDDEN, "Cannot invite this user")
 
             block_invite = False
 
@@ -444,7 +443,7 @@ class RoomMemberHandler(object):
                 is_blocked = await self._is_server_notice_room(room_id)
                 if is_blocked:
                     raise SynapseError(
-                        http_client.FORBIDDEN,
+                        HTTPStatus.FORBIDDEN,
                         "You cannot reject this invite",
                         errcode=Codes.CANNOT_LEAVE_SERVER_NOTICE_ROOM,
                     )
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 6bdb24baff..4c7524493e 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -18,8 +18,6 @@ import itertools
 import logging
 from typing import Any, Dict, FrozenSet, List, Optional, Set, Tuple
 
-from six import iteritems, itervalues
-
 import attr
 from prometheus_client import Counter
 
@@ -390,7 +388,7 @@ class SyncHandler(object):
                 # result returned by the event source is poor form (it might cache
                 # the object)
                 room_id = event["room_id"]
-                event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+                event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
             receipt_key = since_token.receipt_key if since_token else "0"
@@ -408,7 +406,7 @@ class SyncHandler(object):
             for event in receipts:
                 room_id = event["room_id"]
                 # exclude room id, as above
-                event_copy = {k: v for (k, v) in iteritems(event) if k != "room_id"}
+                event_copy = {k: v for (k, v) in event.items() if k != "room_id"}
                 ephemeral_by_room.setdefault(room_id, []).append(event_copy)
 
         return now_token, ephemeral_by_room
@@ -454,7 +452,7 @@ class SyncHandler(object):
                     current_state_ids_map = await self.state.get_current_state_ids(
                         room_id
                     )
-                    current_state_ids = frozenset(itervalues(current_state_ids_map))
+                    current_state_ids = frozenset(current_state_ids_map.values())
 
                 recents = await filter_events_for_client(
                     self.storage,
@@ -509,7 +507,7 @@ class SyncHandler(object):
                     current_state_ids_map = await self.state.get_current_state_ids(
                         room_id
                     )
-                    current_state_ids = frozenset(itervalues(current_state_ids_map))
+                    current_state_ids = frozenset(current_state_ids_map.values())
 
                 loaded_recents = await filter_events_for_client(
                     self.storage,
@@ -909,7 +907,7 @@ class SyncHandler(object):
                     logger.debug("filtering state from %r...", state_ids)
                     state_ids = {
                         t: event_id
-                        for t, event_id in iteritems(state_ids)
+                        for t, event_id in state_ids.items()
                         if cache.get(t[1]) != event_id
                     }
                     logger.debug("...to %r", state_ids)
@@ -1430,7 +1428,7 @@ class SyncHandler(object):
         if since_token:
             for joined_sync in sync_result_builder.joined:
                 it = itertools.chain(
-                    joined_sync.timeline.events, itervalues(joined_sync.state)
+                    joined_sync.timeline.events, joined_sync.state.values()
                 )
                 for event in it:
                     if event.type == EventTypes.Member:
@@ -1505,7 +1503,7 @@ class SyncHandler(object):
         newly_left_rooms = []
         room_entries = []
         invited = []
-        for room_id, events in iteritems(mem_change_events_by_room_id):
+        for room_id, events in mem_change_events_by_room_id.items():
             logger.debug(
                 "Membership changes in %s: [%s]",
                 room_id,
@@ -1993,17 +1991,17 @@ def _calculate_state(
     event_id_to_key = {
         e: key
         for key, e in itertools.chain(
-            iteritems(timeline_contains),
-            iteritems(previous),
-            iteritems(timeline_start),
-            iteritems(current),
+            timeline_contains.items(),
+            previous.items(),
+            timeline_start.items(),
+            current.items(),
         )
     }
 
-    c_ids = set(itervalues(current))
-    ts_ids = set(itervalues(timeline_start))
-    p_ids = set(itervalues(previous))
-    tc_ids = set(itervalues(timeline_contains))
+    c_ids = set(current.values())
+    ts_ids = set(timeline_start.values())
+    p_ids = set(previous.values())
+    tc_ids = set(timeline_contains.values())
 
     # If we are lazyloading room members, we explicitly add the membership events
     # for the senders in the timeline into the state block returned by /sync,
@@ -2017,7 +2015,7 @@ def _calculate_state(
 
     if lazy_load_members:
         p_ids.difference_update(
-            e for t, e in iteritems(timeline_start) if t[0] == EventTypes.Member
+            e for t, e in timeline_start.items() if t[0] == EventTypes.Member
         )
 
     state_ids = ((c_ids | ts_ids) - p_ids) - tc_ids
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index c7bc14c623..6c7abaa578 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -15,9 +15,7 @@
 
 import logging
 from collections import namedtuple
-from typing import List
-
-from twisted.internet import defer
+from typing import List, Tuple
 
 from synapse.api.errors import AuthError, SynapseError
 from synapse.logging.context import run_in_background
@@ -115,8 +113,7 @@ class TypingHandler(object):
     def is_typing(self, member):
         return member.user_id in self._room_typing.get(member.room_id, [])
 
-    @defer.inlineCallbacks
-    def started_typing(self, target_user, auth_user, room_id, timeout):
+    async def started_typing(self, target_user, auth_user, room_id, timeout):
         target_user_id = target_user.to_string()
         auth_user_id = auth_user.to_string()
 
@@ -126,7 +123,7 @@ class TypingHandler(object):
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, target_user_id)
 
         logger.debug("%s has started typing in %s", target_user_id, room_id)
 
@@ -145,8 +142,7 @@ class TypingHandler(object):
 
         self._push_update(member=member, typing=True)
 
-    @defer.inlineCallbacks
-    def stopped_typing(self, target_user, auth_user, room_id):
+    async def stopped_typing(self, target_user, auth_user, room_id):
         target_user_id = target_user.to_string()
         auth_user_id = auth_user.to_string()
 
@@ -156,7 +152,7 @@ class TypingHandler(object):
         if target_user_id != auth_user_id:
             raise AuthError(400, "Cannot set another user's typing state")
 
-        yield self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, target_user_id)
 
         logger.debug("%s has stopped typing in %s", target_user_id, room_id)
 
@@ -164,12 +160,11 @@ class TypingHandler(object):
 
         self._stopped_typing(member)
 
-    @defer.inlineCallbacks
     def user_left_room(self, user, room_id):
         user_id = user.to_string()
         if self.is_mine_id(user_id):
             member = RoomMember(room_id=room_id, user_id=user_id)
-            yield self._stopped_typing(member)
+            self._stopped_typing(member)
 
     def _stopped_typing(self, member):
         if member.user_id not in self._room_typing.get(member.room_id, set()):
@@ -188,10 +183,9 @@ class TypingHandler(object):
 
         self._push_update_local(member=member, typing=typing)
 
-    @defer.inlineCallbacks
-    def _push_remote(self, member, typing):
+    async def _push_remote(self, member, typing):
         try:
-            users = yield self.state.get_current_users_in_room(member.room_id)
+            users = await self.state.get_current_users_in_room(member.room_id)
             self._member_last_federation_poke[member] = self.clock.time_msec()
 
             now = self.clock.time_msec()
@@ -215,8 +209,7 @@ class TypingHandler(object):
         except Exception:
             logger.exception("Error pushing typing notif to remotes")
 
-    @defer.inlineCallbacks
-    def _recv_edu(self, origin, content):
+    async def _recv_edu(self, origin, content):
         room_id = content["room_id"]
         user_id = content["user_id"]
 
@@ -231,7 +224,7 @@ class TypingHandler(object):
             )
             return
 
-        users = yield self.state.get_current_users_in_room(room_id)
+        users = await self.state.get_current_users_in_room(room_id)
         domains = {get_domain_from_id(u) for u in users}
 
         if self.server_name in domains:
@@ -259,14 +252,31 @@ class TypingHandler(object):
         )
 
     async def get_all_typing_updates(
-        self, last_id: int, current_id: int, limit: int
-    ) -> List[dict]:
-        """Get up to `limit` typing updates between the given tokens, earliest
-        updates first.
+        self, instance_name: str, last_id: int, current_id: int, limit: int
+    ) -> Tuple[List[Tuple[int, list]], int, bool]:
+        """Get updates for typing replication stream.
+
+        Args:
+            instance_name: The writer we want to fetch updates from. Unused
+                here since there is only ever one writer.
+            last_id: The token to fetch updates from. Exclusive.
+            current_id: The token to fetch updates up to. Inclusive.
+            limit: The requested limit for the number of rows to return. The
+                function may return more or fewer rows.
+
+        Returns:
+            A tuple consisting of: the updates, a token to use to fetch
+            subsequent updates, and whether we returned fewer rows than exists
+            between the requested tokens due to the limit.
+
+            The token returned can be used in a subsequent call to this
+            function to get further updatees.
+
+            The updates are a list of 2-tuples of stream ID and the row data
         """
 
         if last_id == current_id:
-            return []
+            return [], current_id, False
 
         changed_rooms = self._typing_stream_change_cache.get_all_entities_changed(
             last_id
@@ -280,9 +290,16 @@ class TypingHandler(object):
             serial = self._room_serials[room_id]
             if last_id < serial <= current_id:
                 typing = self._room_typing[room_id]
-                rows.append((serial, room_id, list(typing)))
+                rows.append((serial, [room_id, list(typing)]))
         rows.sort()
-        return rows[:limit]
+
+        limited = False
+        if len(rows) > limit:
+            rows = rows[:limit]
+            current_id = rows[-1][0]
+            limited = True
+
+        return rows, current_id, limited
 
     def get_current_token(self):
         return self._latest_room_serial
@@ -306,7 +323,7 @@ class TypingNotificationEventSource(object):
             "content": {"user_ids": list(typing)},
         }
 
-    def get_new_events(self, from_key, room_ids, **kwargs):
+    async def get_new_events(self, from_key, room_ids, **kwargs):
         with Measure(self.clock, "typing.get_new_events"):
             from_key = int(from_key)
             handler = self.get_typing_handler()
@@ -320,7 +337,7 @@ class TypingNotificationEventSource(object):
 
                 events.append(self._make_event_for(room_id))
 
-            return defer.succeed((events, handler._latest_room_serial))
+            return (events, handler._latest_room_serial)
 
     def get_current_key(self):
         return self.get_typing_handler()._latest_room_serial
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index 12423b909a..521b6d620d 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -15,8 +15,6 @@
 
 import logging
 
-from six import iteritems, iterkeys
-
 import synapse.metrics
 from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
@@ -289,7 +287,7 @@ class UserDirectoryHandler(StateDeltasHandler):
         users_with_profile = await self.state.get_current_users_in_room(room_id)
 
         # Remove every user from the sharing tables for that room.
-        for user_id in iterkeys(users_with_profile):
+        for user_id in users_with_profile.keys():
             await self.store.remove_user_who_share_room(user_id, room_id)
 
         # Then, re-add them to the tables.
@@ -298,7 +296,7 @@ class UserDirectoryHandler(StateDeltasHandler):
         # which when ran over an entire room, will result in the same values
         # being added multiple times. The batching upserts shouldn't make this
         # too bad, though.
-        for user_id, profile in iteritems(users_with_profile):
+        for user_id, profile in users_with_profile.items():
             await self._handle_new_user(room_id, user_id, profile)
 
     async def _handle_new_user(self, room_id, user_id, profile):