summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/__init__.py33
-rw-r--r--synapse/handlers/account_data.py14
-rw-r--r--synapse/handlers/account_validity.py3
-rw-r--r--synapse/handlers/appservice.py11
-rw-r--r--synapse/handlers/auth.py11
-rw-r--r--synapse/handlers/deactivate_account.py13
-rw-r--r--synapse/handlers/device.py171
-rw-r--r--synapse/handlers/devicemessage.py25
-rw-r--r--synapse/handlers/directory.py12
-rw-r--r--synapse/handlers/e2e_keys.py16
-rw-r--r--synapse/handlers/federation.py12
-rw-r--r--synapse/handlers/initial_sync.py10
-rw-r--r--synapse/handlers/message.py231
-rw-r--r--synapse/handlers/oidc_handler.py11
-rw-r--r--synapse/handlers/pagination.py4
-rw-r--r--synapse/handlers/password_policy.py10
-rw-r--r--synapse/handlers/profile.py29
-rw-r--r--synapse/handlers/read_marker.py10
-rw-r--r--synapse/handlers/register.py2
-rw-r--r--synapse/handlers/room.py21
-rw-r--r--synapse/handlers/room_member.py120
-rw-r--r--synapse/handlers/stats.py2
-rw-r--r--synapse/handlers/sync.py40
-rw-r--r--synapse/handlers/ui_auth/checkers.py2
24 files changed, 497 insertions, 316 deletions
diff --git a/synapse/handlers/__init__.py b/synapse/handlers/__init__.py
index 286f0054be..bfebb0f644 100644
--- a/synapse/handlers/__init__.py
+++ b/synapse/handlers/__init__.py
@@ -12,36 +12,3 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
-from .admin import AdminHandler
-from .directory import DirectoryHandler
-from .federation import FederationHandler
-from .identity import IdentityHandler
-from .search import SearchHandler
-
-
-class Handlers:
-
-    """ Deprecated. A collection of handlers.
-
-    At some point most of the classes whose name ended "Handler" were
-    accessed through this class.
-
-    However this makes it painful to unit test the handlers and to run cut
-    down versions of synapse that only use specific handlers because using a
-    single handler required creating all of the handlers. So some of the
-    handlers have been lifted out of the Handlers object and are now accessed
-    directly through the homeserver object itself.
-
-    Any new handlers should follow the new pattern of being accessed through
-    the homeserver object and should not be added to the Handlers object.
-
-    The remaining handlers should be moved out of the handlers object.
-    """
-
-    def __init__(self, hs):
-        self.federation_handler = FederationHandler(hs)
-        self.directory_handler = DirectoryHandler(hs)
-        self.admin_handler = AdminHandler(hs)
-        self.identity_handler = IdentityHandler(hs)
-        self.search_handler = SearchHandler(hs)
diff --git a/synapse/handlers/account_data.py b/synapse/handlers/account_data.py
index 9112a0ab86..341135822e 100644
--- a/synapse/handlers/account_data.py
+++ b/synapse/handlers/account_data.py
@@ -12,16 +12,24 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+from typing import TYPE_CHECKING, List, Tuple
+
+from synapse.types import JsonDict, UserID
+
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
 
 
 class AccountDataEventSource:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.store = hs.get_datastore()
 
-    def get_current_key(self, direction="f"):
+    def get_current_key(self, direction: str = "f") -> int:
         return self.store.get_max_account_data_stream_id()
 
-    async def get_new_events(self, user, from_key, **kwargs):
+    async def get_new_events(
+        self, user: UserID, from_key: int, **kwargs
+    ) -> Tuple[List[JsonDict], int]:
         user_id = user.to_string()
         last_stream_id = from_key
 
diff --git a/synapse/handlers/account_validity.py b/synapse/handlers/account_validity.py
index 4caf6d591a..f33044e97a 100644
--- a/synapse/handlers/account_validity.py
+++ b/synapse/handlers/account_validity.py
@@ -70,7 +70,8 @@ class AccountValidityHandler:
                     "send_renewals", self._send_renewal_emails
                 )
 
-            self.clock.looping_call(send_emails, 30 * 60 * 1000)
+            if hs.config.run_background_tasks:
+                self.clock.looping_call(send_emails, 30 * 60 * 1000)
 
     async def _send_renewal_emails(self):
         """Gets the list of users whose account is expiring in the amount of time
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 9d4e87dad6..c8d5e58035 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -27,6 +27,7 @@ from synapse.metrics import (
     event_processing_loop_room_count,
 )
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.types import RoomStreamToken
 from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
@@ -47,15 +48,17 @@ class ApplicationServicesHandler:
         self.current_max = 0
         self.is_processing = False
 
-    async def notify_interested_services(self, current_id):
+    async def notify_interested_services(self, max_token: RoomStreamToken):
         """Notifies (pushes) all application services interested in this event.
 
         Pushing is done asynchronously, so this method won't block for any
         prolonged length of time.
-
-        Args:
-            current_id(int): The current maximum ID.
         """
+        # We just use the minimum stream ordering and ignore the vector clock
+        # component. This is safe to do as long as we *always* ignore the vector
+        # clock components.
+        current_id = max_token.stream
+
         services = self.store.get_app_services()
         if not services or not self.notify_appservices:
             return
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 00eae92052..1d1ddc2245 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -164,7 +164,14 @@ class AuthHandler(BaseHandler):
 
         self.bcrypt_rounds = hs.config.bcrypt_rounds
 
+        # we can't use hs.get_module_api() here, because to do so will create an
+        # import loop.
+        #
+        # TODO: refactor this class to separate the lower-level stuff that
+        #   ModuleApi can use from the higher-level stuff that uses ModuleApi, as
+        #   better way to break the loop
         account_handler = ModuleApi(hs, self)
+
         self.password_providers = [
             module(config=config, account_handler=account_handler)
             for module, config in hs.config.password_providers
@@ -212,7 +219,7 @@ class AuthHandler(BaseHandler):
         self._clock = self.hs.get_clock()
 
         # Expire old UI auth sessions after a period of time.
-        if hs.config.worker_app is None:
+        if hs.config.run_background_tasks:
             self._clock.looping_call(
                 run_as_background_process,
                 5 * 60 * 1000,
@@ -1073,7 +1080,7 @@ class AuthHandler(BaseHandler):
         if medium == "email":
             address = canonicalise_email(address)
 
-        identity_handler = self.hs.get_handlers().identity_handler
+        identity_handler = self.hs.get_identity_handler()
         result = await identity_handler.try_unbind_threepid(
             user_id, {"medium": medium, "address": address, "id_server": id_server}
         )
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 0635ad5708..4efe6c530a 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -14,7 +14,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Optional
+from typing import TYPE_CHECKING, Optional
 
 from synapse.api.errors import SynapseError
 from synapse.metrics.background_process_metrics import run_as_background_process
@@ -22,19 +22,22 @@ from synapse.types import UserID, create_requester
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class DeactivateAccountHandler(BaseHandler):
     """Handler which deals with deactivating user accounts."""
 
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.hs = hs
         self._auth_handler = hs.get_auth_handler()
         self._device_handler = hs.get_device_handler()
         self._room_member_handler = hs.get_room_member_handler()
-        self._identity_handler = hs.get_handlers().identity_handler
+        self._identity_handler = hs.get_identity_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
 
         # Flag that indicates whether the process to part users from rooms is running
@@ -42,7 +45,7 @@ class DeactivateAccountHandler(BaseHandler):
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
-        if hs.config.worker_app is None:
+        if hs.config.run_background_tasks:
             hs.get_reactor().callWhenRunning(self._start_user_parting)
 
         self._account_validity_enabled = hs.config.account_validity.enabled
@@ -137,7 +140,7 @@ class DeactivateAccountHandler(BaseHandler):
 
         return identity_server_supports_unbinding
 
-    async def _reject_pending_invites_for_user(self, user_id: str):
+    async def _reject_pending_invites_for_user(self, user_id: str) -> None:
         """Reject pending invites addressed to a given user ID.
 
         Args:
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index b9d9098104..debb1b4f29 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -1,7 +1,7 @@
 # -*- coding: utf-8 -*-
 # Copyright 2016 OpenMarket Ltd
 # Copyright 2019 New Vector Ltd
-# Copyright 2019 The Matrix.org Foundation C.I.C.
+# Copyright 2019,2020 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import Any, Dict, List, Optional
+from typing import TYPE_CHECKING, Any, Dict, Iterable, List, Optional, Set, Tuple
 
 from synapse.api import errors
 from synapse.api.constants import EventTypes
@@ -29,7 +29,10 @@ from synapse.api.errors import (
 from synapse.logging.opentracing import log_kv, set_tag, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.types import (
+    Collection,
+    JsonDict,
     StreamToken,
+    UserID,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
 )
@@ -41,13 +44,16 @@ from synapse.util.retryutils import NotRetryingDestination
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 MAX_DEVICE_DISPLAY_NAME_LEN = 100
 
 
 class DeviceWorkerHandler(BaseHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.hs = hs
@@ -105,7 +111,9 @@ class DeviceWorkerHandler(BaseHandler):
 
     @trace
     @measure_func("device.get_user_ids_changed")
-    async def get_user_ids_changed(self, user_id: str, from_token: StreamToken):
+    async def get_user_ids_changed(
+        self, user_id: str, from_token: StreamToken
+    ) -> JsonDict:
         """Get list of users that have had the devices updated, or have newly
         joined a room, that `user_id` may be interested in.
         """
@@ -221,8 +229,8 @@ class DeviceWorkerHandler(BaseHandler):
             possibly_joined = possibly_changed & users_who_share_room
             possibly_left = (possibly_changed | possibly_left) - users_who_share_room
         else:
-            possibly_joined = []
-            possibly_left = []
+            possibly_joined = set()
+            possibly_left = set()
 
         result = {"changed": list(possibly_joined), "left": list(possibly_left)}
 
@@ -230,7 +238,7 @@ class DeviceWorkerHandler(BaseHandler):
 
         return result
 
-    async def on_federation_query_user_devices(self, user_id):
+    async def on_federation_query_user_devices(self, user_id: str) -> JsonDict:
         stream_id, devices = await self.store.get_e2e_device_keys_for_federation_query(
             user_id
         )
@@ -249,7 +257,7 @@ class DeviceWorkerHandler(BaseHandler):
 
 
 class DeviceHandler(DeviceWorkerHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
 
         self.federation_sender = hs.get_federation_sender()
@@ -264,7 +272,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         hs.get_distributor().observe("user_left_room", self.user_left_room)
 
-    def _check_device_name_length(self, name: str):
+    def _check_device_name_length(self, name: Optional[str]):
         """
         Checks whether a device name is longer than the maximum allowed length.
 
@@ -283,8 +291,11 @@ class DeviceHandler(DeviceWorkerHandler):
             )
 
     async def check_device_registered(
-        self, user_id, device_id, initial_device_display_name=None
-    ):
+        self,
+        user_id: str,
+        device_id: Optional[str],
+        initial_device_display_name: Optional[str] = None,
+    ) -> str:
         """
         If the given device has not been registered, register it with the
         supplied display name.
@@ -292,12 +303,11 @@ class DeviceHandler(DeviceWorkerHandler):
         If no device_id is supplied, we make one up.
 
         Args:
-            user_id (str):  @user:id
-            device_id (str | None): device id supplied by client
-            initial_device_display_name (str | None): device display name from
-                 client
+            user_id:  @user:id
+            device_id: device id supplied by client
+            initial_device_display_name: device display name from client
         Returns:
-            str: device id (generated if none was supplied)
+            device id (generated if none was supplied)
         """
 
         self._check_device_name_length(initial_device_display_name)
@@ -316,15 +326,15 @@ class DeviceHandler(DeviceWorkerHandler):
         # times in case of a clash.
         attempts = 0
         while attempts < 5:
-            device_id = stringutils.random_string(10).upper()
+            new_device_id = stringutils.random_string(10).upper()
             new_device = await self.store.store_device(
                 user_id=user_id,
-                device_id=device_id,
+                device_id=new_device_id,
                 initial_device_display_name=initial_device_display_name,
             )
             if new_device:
-                await self.notify_device_update(user_id, [device_id])
-                return device_id
+                await self.notify_device_update(user_id, [new_device_id])
+                return new_device_id
             attempts += 1
 
         raise errors.StoreError(500, "Couldn't generate a device ID.")
@@ -433,7 +443,9 @@ class DeviceHandler(DeviceWorkerHandler):
 
     @trace
     @measure_func("notify_device_update")
-    async def notify_device_update(self, user_id, device_ids):
+    async def notify_device_update(
+        self, user_id: str, device_ids: Collection[str]
+    ) -> None:
         """Notify that a user's device(s) has changed. Pokes the notifier, and
         remote servers if the user is local.
         """
@@ -445,7 +457,7 @@ class DeviceHandler(DeviceWorkerHandler):
             user_id
         )
 
-        hosts = set()
+        hosts = set()  # type: Set[str]
         if self.hs.is_mine_id(user_id):
             hosts.update(get_domain_from_id(u) for u in users_who_share_room)
             hosts.discard(self.server_name)
@@ -497,7 +509,7 @@ class DeviceHandler(DeviceWorkerHandler):
 
         self.notifier.on_new_event("device_list_key", position, users=[from_user_id])
 
-    async def user_left_room(self, user, room_id):
+    async def user_left_room(self, user: UserID, room_id: str) -> None:
         user_id = user.to_string()
         room_ids = await self.store.get_rooms_for_user(user_id)
         if not room_ids:
@@ -505,8 +517,89 @@ class DeviceHandler(DeviceWorkerHandler):
             # receive device updates. Mark this in DB.
             await self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
 
+    async def store_dehydrated_device(
+        self,
+        user_id: str,
+        device_data: JsonDict,
+        initial_device_display_name: Optional[str] = None,
+    ) -> str:
+        """Store a dehydrated device for a user.  If the user had a previous
+        dehydrated device, it is removed.
+
+        Args:
+            user_id: the user that we are storing the device for
+            device_data: the dehydrated device information
+            initial_device_display_name: The display name to use for the device
+        Returns:
+            device id of the dehydrated device
+        """
+        device_id = await self.check_device_registered(
+            user_id, None, initial_device_display_name,
+        )
+        old_device_id = await self.store.store_dehydrated_device(
+            user_id, device_id, device_data
+        )
+        if old_device_id is not None:
+            await self.delete_device(user_id, old_device_id)
+        return device_id
+
+    async def get_dehydrated_device(
+        self, user_id: str
+    ) -> Optional[Tuple[str, JsonDict]]:
+        """Retrieve the information for a dehydrated device.
+
+        Args:
+            user_id: the user whose dehydrated device we are looking for
+        Returns:
+            a tuple whose first item is the device ID, and the second item is
+            the dehydrated device information
+        """
+        return await self.store.get_dehydrated_device(user_id)
+
+    async def rehydrate_device(
+        self, user_id: str, access_token: str, device_id: str
+    ) -> dict:
+        """Process a rehydration request from the user.
+
+        Args:
+            user_id: the user who is rehydrating the device
+            access_token: the access token used for the request
+            device_id: the ID of the device that will be rehydrated
+        Returns:
+            a dict containing {"success": True}
+        """
+        success = await self.store.remove_dehydrated_device(user_id, device_id)
+
+        if not success:
+            raise errors.NotFoundError()
+
+        # If the dehydrated device was successfully deleted (the device ID
+        # matched the stored dehydrated device), then modify the access
+        # token to use the dehydrated device's ID and copy the old device
+        # display name to the dehydrated device, and destroy the old device
+        # ID
+        old_device_id = await self.store.set_device_for_access_token(
+            access_token, device_id
+        )
+        old_device = await self.store.get_device(user_id, old_device_id)
+        await self.store.update_device(user_id, device_id, old_device["display_name"])
+        # can't call self.delete_device because that will clobber the
+        # access token so call the storage layer directly
+        await self.store.delete_device(user_id, old_device_id)
+        await self.store.delete_e2e_keys_by_device(
+            user_id=user_id, device_id=old_device_id
+        )
 
-def _update_device_from_client_ips(device, client_ips):
+        # tell everyone that the old device is gone and that the dehydrated
+        # device has a new display name
+        await self.notify_device_update(user_id, [old_device_id, device_id])
+
+        return {"success": True}
+
+
+def _update_device_from_client_ips(
+    device: Dict[str, Any], client_ips: Dict[Tuple[str, str], Dict[str, Any]]
+) -> None:
     ip = client_ips.get((device["user_id"], device["device_id"]), {})
     device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
 
@@ -514,7 +607,7 @@ def _update_device_from_client_ips(device, client_ips):
 class DeviceListUpdater:
     "Handles incoming device list updates from federation and updates the DB"
 
-    def __init__(self, hs, device_handler):
+    def __init__(self, hs: "HomeServer", device_handler: DeviceHandler):
         self.store = hs.get_datastore()
         self.federation = hs.get_federation_client()
         self.clock = hs.get_clock()
@@ -523,7 +616,9 @@ class DeviceListUpdater:
         self._remote_edu_linearizer = Linearizer(name="remote_device_list")
 
         # user_id -> list of updates waiting to be handled.
-        self._pending_updates = {}
+        self._pending_updates = (
+            {}
+        )  # type: Dict[str, List[Tuple[str, str, Iterable[str], JsonDict]]]
 
         # Recently seen stream ids. We don't bother keeping these in the DB,
         # but they're useful to have them about to reduce the number of spurious
@@ -546,7 +641,9 @@ class DeviceListUpdater:
         )
 
     @trace
-    async def incoming_device_list_update(self, origin, edu_content):
+    async def incoming_device_list_update(
+        self, origin: str, edu_content: JsonDict
+    ) -> None:
         """Called on incoming device list update from federation. Responsible
         for parsing the EDU and adding to pending updates list.
         """
@@ -607,7 +704,7 @@ class DeviceListUpdater:
         await self._handle_device_updates(user_id)
 
     @measure_func("_incoming_device_list_update")
-    async def _handle_device_updates(self, user_id):
+    async def _handle_device_updates(self, user_id: str) -> None:
         "Actually handle pending updates."
 
         with (await self._remote_edu_linearizer.queue(user_id)):
@@ -655,7 +752,9 @@ class DeviceListUpdater:
                     stream_id for _, stream_id, _, _ in pending_updates
                 )
 
-    async def _need_to_do_resync(self, user_id, updates):
+    async def _need_to_do_resync(
+        self, user_id: str, updates: Iterable[Tuple[str, str, Iterable[str], JsonDict]]
+    ) -> bool:
         """Given a list of updates for a user figure out if we need to do a full
         resync, or whether we have enough data that we can just apply the delta.
         """
@@ -686,7 +785,7 @@ class DeviceListUpdater:
         return False
 
     @trace
-    async def _maybe_retry_device_resync(self):
+    async def _maybe_retry_device_resync(self) -> None:
         """Retry to resync device lists that are out of sync, except if another retry is
         in progress.
         """
@@ -729,7 +828,7 @@ class DeviceListUpdater:
 
     async def user_device_resync(
         self, user_id: str, mark_failed_as_stale: bool = True
-    ) -> Optional[dict]:
+    ) -> Optional[JsonDict]:
         """Fetches all devices for a user and updates the device cache with them.
 
         Args:
@@ -753,7 +852,7 @@ class DeviceListUpdater:
                 # it later.
                 await self.store.mark_remote_user_device_cache_as_stale(user_id)
 
-            return
+            return None
         except (RequestSendFailed, HttpResponseException) as e:
             logger.warning(
                 "Failed to handle device list update for %s: %s", user_id, e,
@@ -770,12 +869,12 @@ class DeviceListUpdater:
             # next time we get a device list update for this user_id.
             # This makes it more likely that the device lists will
             # eventually become consistent.
-            return
+            return None
         except FederationDeniedError as e:
             set_tag("error", True)
             log_kv({"reason": "FederationDeniedError"})
             logger.info(e)
-            return
+            return None
         except Exception as e:
             set_tag("error", True)
             log_kv(
@@ -788,7 +887,7 @@ class DeviceListUpdater:
                 # it later.
                 await self.store.mark_remote_user_device_cache_as_stale(user_id)
 
-            return
+            return None
         log_kv({"result": result})
         stream_id = result["stream_id"]
         devices = result["devices"]
@@ -849,7 +948,7 @@ class DeviceListUpdater:
         user_id: str,
         master_key: Optional[Dict[str, Any]],
         self_signing_key: Optional[Dict[str, Any]],
-    ) -> list:
+    ) -> List[str]:
         """Process the given new master and self-signing key for the given remote user.
 
         Args:
diff --git a/synapse/handlers/devicemessage.py b/synapse/handlers/devicemessage.py
index 64ef7f63ab..9cac5a8463 100644
--- a/synapse/handlers/devicemessage.py
+++ b/synapse/handlers/devicemessage.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import Any, Dict
+from typing import TYPE_CHECKING, Any, Dict
 
 from synapse.api.errors import SynapseError
 from synapse.logging.context import run_in_background
@@ -24,18 +24,22 @@ from synapse.logging.opentracing import (
     set_tag,
     start_active_span,
 )
-from synapse.types import UserID, get_domain_from_id
+from synapse.types import JsonDict, UserID, get_domain_from_id
 from synapse.util import json_encoder
 from synapse.util.stringutils import random_string
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
+
 logger = logging.getLogger(__name__)
 
 
 class DeviceMessageHandler:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         """
         Args:
-            hs (synapse.server.HomeServer): server
+            hs: server
         """
         self.store = hs.get_datastore()
         self.notifier = hs.get_notifier()
@@ -48,7 +52,7 @@ class DeviceMessageHandler:
 
         self._device_list_updater = hs.get_device_handler().device_list_updater
 
-    async def on_direct_to_device_edu(self, origin, content):
+    async def on_direct_to_device_edu(self, origin: str, content: JsonDict) -> None:
         local_messages = {}
         sender_user_id = content["sender"]
         if origin != get_domain_from_id(sender_user_id):
@@ -95,7 +99,7 @@ class DeviceMessageHandler:
         message_type: str,
         sender_user_id: str,
         by_device: Dict[str, Dict[str, Any]],
-    ):
+    ) -> None:
         """Checks inbound device messages for unknown remote devices, and if
         found marks the remote cache for the user as stale.
         """
@@ -138,11 +142,16 @@ class DeviceMessageHandler:
                 self._device_list_updater.user_device_resync, sender_user_id
             )
 
-    async def send_device_message(self, sender_user_id, message_type, messages):
+    async def send_device_message(
+        self,
+        sender_user_id: str,
+        message_type: str,
+        messages: Dict[str, Dict[str, JsonDict]],
+    ) -> None:
         set_tag("number_of_messages", len(messages))
         set_tag("sender", sender_user_id)
         local_messages = {}
-        remote_messages = {}
+        remote_messages = {}  # type: Dict[str, Dict[str, Dict[str, JsonDict]]]
         for user_id, by_device in messages.items():
             # we use UserID.from_string to catch invalid user ids
             if self.is_mine(UserID.from_string(user_id)):
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 62aa9a2da8..ad5683d251 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -46,6 +46,7 @@ class DirectoryHandler(BaseHandler):
         self.config = hs.config
         self.enable_room_list_search = hs.config.enable_room_list_search
         self.require_membership = hs.config.require_membership_for_aliases
+        self.third_party_event_rules = hs.get_third_party_event_rules()
 
         self.federation = hs.get_federation_client()
         hs.get_federation_registry().register_query_handler(
@@ -383,7 +384,7 @@ class DirectoryHandler(BaseHandler):
         """
         creator = await self.store.get_room_alias_creator(alias.to_string())
 
-        if creator is not None and creator == user_id:
+        if creator == user_id:
             return True
 
         # Resolve the alias to the corresponding room.
@@ -454,6 +455,15 @@ class DirectoryHandler(BaseHandler):
                 # per alias creation rule?
                 raise SynapseError(403, "Not allowed to publish room")
 
+            # Check if publishing is blocked by a third party module
+            allowed_by_third_party_rules = await (
+                self.third_party_event_rules.check_visibility_can_be_modified(
+                    room_id, visibility
+                )
+            )
+            if not allowed_by_third_party_rules:
+                raise SynapseError(403, "Not allowed to publish room")
+
         await self.store.set_room_is_public(room_id, making_public)
 
     async def edit_published_appservice_room_list(
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index dd40fd1299..611742ae72 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -496,6 +496,22 @@ class E2eKeysHandler:
             log_kv(
                 {"message": "Did not update one_time_keys", "reason": "no keys given"}
             )
+        fallback_keys = keys.get("org.matrix.msc2732.fallback_keys", None)
+        if fallback_keys and isinstance(fallback_keys, dict):
+            log_kv(
+                {
+                    "message": "Updating fallback_keys for device.",
+                    "user_id": user_id,
+                    "device_id": device_id,
+                }
+            )
+            await self.store.set_e2e_fallback_keys(user_id, device_id, fallback_keys)
+        elif fallback_keys:
+            log_kv({"message": "Did not update fallback_keys", "reason": "not a dict"})
+        else:
+            log_kv(
+                {"message": "Did not update fallback_keys", "reason": "no keys given"}
+            )
 
         # the device should have been registered already, but it may have been
         # deleted due to a race with a DELETE request. Or we may be using an
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 1a8144405a..455acd7669 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -2966,17 +2966,20 @@ class FederationHandler(BaseHandler):
             return result["max_stream_id"]
         else:
             assert self.storage.persistence
-            max_stream_token = await self.storage.persistence.persist_events(
+
+            # Note that this returns the events that were persisted, which may not be
+            # the same as were passed in if some were deduplicated due to transaction IDs.
+            events, max_stream_token = await self.storage.persistence.persist_events(
                 event_and_contexts, backfilled=backfilled
             )
 
             if self._ephemeral_messages_enabled:
-                for (event, context) in event_and_contexts:
+                for event in events:
                     # If there's an expiry timestamp on the event, schedule its expiry.
                     self._message_handler.maybe_schedule_expiry(event)
 
             if not backfilled:  # Never notify for backfilled events
-                for event, _ in event_and_contexts:
+                for event in events:
                     await self._notify_persisted_event(event, max_stream_token)
 
             return max_stream_token.stream
@@ -3008,6 +3011,9 @@ class FederationHandler(BaseHandler):
         elif event.internal_metadata.is_outlier():
             return
 
+        # the event has been persisted so it should have a stream ordering.
+        assert event.internal_metadata.stream_ordering
+
         event_pos = PersistedEventPosition(
             self._instance_name, event.internal_metadata.stream_ordering
         )
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 39a85801c1..98075f48d2 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -14,7 +14,7 @@
 # limitations under the License.
 
 import logging
-from typing import TYPE_CHECKING
+from typing import TYPE_CHECKING, Optional, Tuple
 
 from twisted.internet import defer
 
@@ -47,12 +47,14 @@ class InitialSyncHandler(BaseHandler):
         self.state = hs.get_state_handler()
         self.clock = hs.get_clock()
         self.validator = EventValidator()
-        self.snapshot_cache = ResponseCache(hs, "initial_sync_cache")
+        self.snapshot_cache = ResponseCache(
+            hs, "initial_sync_cache"
+        )  # type: ResponseCache[Tuple[str, Optional[StreamToken], Optional[StreamToken], str, Optional[int], bool, bool]]
         self._event_serializer = hs.get_event_client_serializer()
         self.storage = hs.get_storage()
         self.state_store = self.storage.state
 
-    def snapshot_all_rooms(
+    async def snapshot_all_rooms(
         self,
         user_id: str,
         pagin_config: PaginationConfig,
@@ -84,7 +86,7 @@ class InitialSyncHandler(BaseHandler):
             include_archived,
         )
 
-        return self.snapshot_cache.wrap(
+        return await self.snapshot_cache.wrap(
             key,
             self._snapshot_all_rooms,
             user_id,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index ee271e85e5..f18f882596 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -59,6 +59,7 @@ from synapse.visibility import filter_events_for_client
 from ._base import BaseHandler
 
 if TYPE_CHECKING:
+    from synapse.events.third_party_rules import ThirdPartyEventRules
     from synapse.server import HomeServer
 
 logger = logging.getLogger(__name__)
@@ -393,27 +394,31 @@ class EventCreationHandler:
         self.action_generator = hs.get_action_generator()
 
         self.spam_checker = hs.get_spam_checker()
-        self.third_party_event_rules = hs.get_third_party_event_rules()
+        self.third_party_event_rules = (
+            self.hs.get_third_party_event_rules()
+        )  # type: ThirdPartyEventRules
 
         self._block_events_without_consent_error = (
             self.config.block_events_without_consent_error
         )
 
+        # we need to construct a ConsentURIBuilder here, as it checks that the necessary
+        # config options, but *only* if we have a configuration for which we are
+        # going to need it.
+        if self._block_events_without_consent_error:
+            self._consent_uri_builder = ConsentURIBuilder(self.config)
+
         # Rooms which should be excluded from dummy insertion. (For instance,
         # those without local users who can send events into the room).
         #
         # map from room id to time-of-last-attempt.
         #
         self._rooms_to_exclude_from_dummy_event_insertion = {}  # type: Dict[str, int]
-
-        # we need to construct a ConsentURIBuilder here, as it checks that the necessary
-        # config options, but *only* if we have a configuration for which we are
-        # going to need it.
-        if self._block_events_without_consent_error:
-            self._consent_uri_builder = ConsentURIBuilder(self.config)
+        # The number of forward extremeities before a dummy event is sent.
+        self._dummy_events_threshold = hs.config.dummy_events_threshold
 
         if (
-            not self.config.worker_app
+            self.config.run_background_tasks
             and self.config.cleanup_extremities_with_dummy_events
         ):
             self.clock.looping_call(
@@ -428,15 +433,13 @@ class EventCreationHandler:
 
         self._ephemeral_events_enabled = hs.config.enable_ephemeral_messages
 
-        self._dummy_events_threshold = hs.config.dummy_events_threshold
-
     async def create_event(
         self,
         requester: Requester,
         event_dict: dict,
-        token_id: Optional[str] = None,
         txn_id: Optional[str] = None,
         prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
     ) -> Tuple[EventBase, EventContext]:
         """
@@ -450,13 +453,18 @@ class EventCreationHandler:
         Args:
             requester
             event_dict: An entire event
-            token_id
             txn_id
             prev_event_ids:
                 the forward extremities to use as the prev_events for the
                 new event.
 
                 If None, they will be requested from the database.
+
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
+
             require_consent: Whether to check if the requester has
                 consented to the privacy policy.
         Raises:
@@ -508,14 +516,17 @@ class EventCreationHandler:
         if require_consent and not is_exempt:
             await self.assert_accepted_privacy_policy(requester)
 
-        if token_id is not None:
-            builder.internal_metadata.token_id = token_id
+        if requester.access_token_id is not None:
+            builder.internal_metadata.token_id = requester.access_token_id
 
         if txn_id is not None:
             builder.internal_metadata.txn_id = txn_id
 
         event, context = await self.create_new_client_event(
-            builder=builder, requester=requester, prev_event_ids=prev_event_ids,
+            builder=builder,
+            requester=requester,
+            prev_event_ids=prev_event_ids,
+            auth_event_ids=auth_event_ids,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -635,59 +646,6 @@ class EventCreationHandler:
         msg = self._block_events_without_consent_error % {"consent_uri": consent_uri}
         raise ConsentNotGivenError(msg=msg, consent_uri=consent_uri)
 
-    async def send_nonmember_event(
-        self,
-        requester: Requester,
-        event: EventBase,
-        context: EventContext,
-        ratelimit: bool = True,
-        ignore_shadow_ban: bool = False,
-    ) -> int:
-        """
-        Persists and notifies local clients and federation of an event.
-
-        Args:
-            requester: The requester sending the event.
-            event: The event to send.
-            context: The context of the event.
-            ratelimit: Whether to rate limit this send.
-            ignore_shadow_ban: True if shadow-banned users should be allowed to
-                send this event.
-
-        Return:
-            The stream_id of the persisted event.
-
-        Raises:
-            ShadowBanError if the requester has been shadow-banned.
-        """
-        if event.type == EventTypes.Member:
-            raise SynapseError(
-                500, "Tried to send member event through non-member codepath"
-            )
-
-        if not ignore_shadow_ban and requester.shadow_banned:
-            # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
-            raise ShadowBanError()
-
-        user = UserID.from_string(event.sender)
-
-        assert self.hs.is_mine(user), "User must be our own: %s" % (user,)
-
-        if event.is_state():
-            prev_event = await self.deduplicate_state_event(event, context)
-            if prev_event is not None:
-                logger.info(
-                    "Not bothering to persist state event %s duplicated by %s",
-                    event.event_id,
-                    prev_event.event_id,
-                )
-                return await self.store.get_stream_id_for_event(prev_event.event_id)
-
-        return await self.handle_new_client_event(
-            requester=requester, event=event, context=context, ratelimit=ratelimit
-        )
-
     async def deduplicate_state_event(
         self, event: EventBase, context: EventContext
     ) -> Optional[EventBase]:
@@ -728,7 +686,7 @@ class EventCreationHandler:
         """
         Creates an event, then sends it.
 
-        See self.create_event and self.send_nonmember_event.
+        See self.create_event and self.handle_new_client_event.
 
         Args:
             requester: The requester sending the event.
@@ -738,9 +696,19 @@ class EventCreationHandler:
             ignore_shadow_ban: True if shadow-banned users should be allowed to
                 send this event.
 
+        Returns:
+            The event, and its stream ordering (if deduplication happened,
+            the previous, duplicate event).
+
         Raises:
             ShadowBanError if the requester has been shadow-banned.
         """
+
+        if event_dict["type"] == EventTypes.Member:
+            raise SynapseError(
+                500, "Tried to send member event through non-member codepath"
+            )
+
         if not ignore_shadow_ban and requester.shadow_banned:
             # We randomly sleep a bit just to annoy the requester.
             await self.clock.sleep(random.randint(1, 10))
@@ -752,8 +720,25 @@ class EventCreationHandler:
         # extremities to pile up, which in turn leads to state resolution
         # taking longer.
         with (await self.limiter.queue(event_dict["room_id"])):
+            if txn_id and requester.access_token_id:
+                existing_event_id = await self.store.get_event_id_from_transaction_id(
+                    event_dict["room_id"],
+                    requester.user.to_string(),
+                    requester.access_token_id,
+                    txn_id,
+                )
+                if existing_event_id:
+                    event = await self.store.get_event(existing_event_id)
+                    # we know it was persisted, so must have a stream ordering
+                    assert event.internal_metadata.stream_ordering
+                    return event, event.internal_metadata.stream_ordering
+
             event, context = await self.create_event(
-                requester, event_dict, token_id=requester.access_token_id, txn_id=txn_id
+                requester, event_dict, txn_id=txn_id
+            )
+
+            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+                event.sender,
             )
 
             spam_error = self.spam_checker.check_event_for_spam(event)
@@ -762,14 +747,17 @@ class EventCreationHandler:
                     spam_error = "Spam is not permitted here"
                 raise SynapseError(403, spam_error, Codes.FORBIDDEN)
 
-            stream_id = await self.send_nonmember_event(
-                requester,
-                event,
-                context,
+            ev = await self.handle_new_client_event(
+                requester=requester,
+                event=event,
+                context=context,
                 ratelimit=ratelimit,
                 ignore_shadow_ban=ignore_shadow_ban,
             )
-        return event, stream_id
+
+        # we know it was persisted, so must have a stream ordering
+        assert ev.internal_metadata.stream_ordering
+        return ev, ev.internal_metadata.stream_ordering
 
     @measure_func("create_new_client_event")
     async def create_new_client_event(
@@ -777,6 +765,7 @@ class EventCreationHandler:
         builder: EventBuilder,
         requester: Optional[Requester] = None,
         prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
@@ -789,6 +778,11 @@ class EventCreationHandler:
 
                 If None, they will be requested from the database.
 
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
+
         Returns:
             Tuple of created event, context
         """
@@ -810,7 +804,9 @@ class EventCreationHandler:
             builder.type == EventTypes.Create or len(prev_event_ids) > 0
         ), "Attempting to create an event with no prev_events"
 
-        event = await builder.build(prev_event_ids=prev_event_ids)
+        event = await builder.build(
+            prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+        )
         context = await self.state.compute_event_context(event)
         if requester:
             context.app_service = requester.app_service
@@ -843,8 +839,11 @@ class EventCreationHandler:
         context: EventContext,
         ratelimit: bool = True,
         extra_users: List[UserID] = [],
-    ) -> int:
-        """Processes a new event. This includes checking auth, persisting it,
+        ignore_shadow_ban: bool = False,
+    ) -> EventBase:
+        """Processes a new event.
+
+        This includes deduplicating, checking auth, persisting,
         notifying users, sending to remote servers, etc.
 
         If called from a worker will hit out to the master process for final
@@ -857,10 +856,39 @@ class EventCreationHandler:
             ratelimit
             extra_users: Any extra users to notify about event
 
+            ignore_shadow_ban: True if shadow-banned users should be allowed to
+                send this event.
+
         Return:
-            The stream_id of the persisted event.
+            If the event was deduplicated, the previous, duplicate, event. Otherwise,
+            `event`.
+
+        Raises:
+            ShadowBanError if the requester has been shadow-banned.
         """
 
+        # we don't apply shadow-banning to membership events here. Invites are blocked
+        # higher up the stack, and we allow shadow-banned users to send join and leave
+        # events as normal.
+        if (
+            event.type != EventTypes.Member
+            and not ignore_shadow_ban
+            and requester.shadow_banned
+        ):
+            # We randomly sleep a bit just to annoy the requester.
+            await self.clock.sleep(random.randint(1, 10))
+            raise ShadowBanError()
+
+        if event.is_state():
+            prev_event = await self.deduplicate_state_event(event, context)
+            if prev_event is not None:
+                logger.info(
+                    "Not bothering to persist state event %s duplicated by %s",
+                    event.event_id,
+                    prev_event.event_id,
+                )
+                return prev_event
+
         if event.is_state() and (event.type, event.state_key) == (
             EventTypes.Create,
             "",
@@ -914,14 +942,24 @@ class EventCreationHandler:
                     extra_users=extra_users,
                 )
                 stream_id = result["stream_id"]
-                event.internal_metadata.stream_ordering = stream_id
-                return stream_id
-
-            stream_id = await self.persist_and_notify_client_event(
+                event_id = result["event_id"]
+                if event_id != event.event_id:
+                    # If we get a different event back then it means that its
+                    # been de-duplicated, so we replace the given event with the
+                    # one already persisted.
+                    event = await self.store.get_event(event_id)
+                else:
+                    # If we newly persisted the event then we need to update its
+                    # stream_ordering entry manually (as it was persisted on
+                    # another worker).
+                    event.internal_metadata.stream_ordering = stream_id
+                return event
+
+            event = await self.persist_and_notify_client_event(
                 requester, event, context, ratelimit=ratelimit, extra_users=extra_users
             )
 
-            return stream_id
+            return event
         except Exception:
             # Ensure that we actually remove the entries in the push actions
             # staging area, if we calculated them.
@@ -966,11 +1004,16 @@ class EventCreationHandler:
         context: EventContext,
         ratelimit: bool = True,
         extra_users: List[UserID] = [],
-    ) -> int:
+    ) -> EventBase:
         """Called when we have fully built the event, have already
         calculated the push actions for the event, and checked auth.
 
         This should only be run on the instance in charge of persisting events.
+
+        Returns:
+            The persisted event. This may be different than the given event if
+            it was de-duplicated (e.g. because we had already persisted an
+            event with the same transaction ID.)
         """
         assert self.storage.persistence is not None
         assert self._events_shard_config.should_handle(
@@ -1018,7 +1061,7 @@ class EventCreationHandler:
 
             # Check the alias is currently valid (if it has changed).
             room_alias_str = event.content.get("alias", None)
-            directory_handler = self.hs.get_handlers().directory_handler
+            directory_handler = self.hs.get_directory_handler()
             if room_alias_str and room_alias_str != original_alias:
                 await self._validate_canonical_alias(
                     directory_handler, room_alias_str, event.room_id
@@ -1044,7 +1087,7 @@ class EventCreationHandler:
                         directory_handler, alias_str, event.room_id
                     )
 
-        federation_handler = self.hs.get_handlers().federation_handler
+        federation_handler = self.hs.get_federation_handler()
 
         if event.type == EventTypes.Member:
             if event.content["membership"] == Membership.INVITE:
@@ -1138,9 +1181,13 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
-        event_pos, max_stream_token = await self.storage.persistence.persist_event(
-            event, context=context
-        )
+        # Note that this returns the event that was persisted, which may not be
+        # the same as we passed in if it was deduplicated due transaction IDs.
+        (
+            event,
+            event_pos,
+            max_stream_token,
+        ) = await self.storage.persistence.persist_event(event, context=context)
 
         if self._ephemeral_events_enabled:
             # If there's an expiry timestamp on the event, schedule its expiry.
@@ -1161,7 +1208,7 @@ class EventCreationHandler:
             # matters as sometimes presence code can take a while.
             run_in_background(self._bump_active_time, requester.user)
 
-        return event_pos.stream
+        return event
 
     async def _bump_active_time(self, user: UserID) -> None:
         try:
@@ -1232,7 +1279,7 @@ class EventCreationHandler:
 
                 # Since this is a dummy-event it is OK if it is sent by a
                 # shadow-banned user.
-                await self.send_nonmember_event(
+                await self.handle_new_client_event(
                     requester, event, context, ratelimit=False, ignore_shadow_ban=True,
                 )
                 return True
diff --git a/synapse/handlers/oidc_handler.py b/synapse/handlers/oidc_handler.py
index 19cd652675..05ac86e697 100644
--- a/synapse/handlers/oidc_handler.py
+++ b/synapse/handlers/oidc_handler.py
@@ -96,6 +96,7 @@ class OidcHandler:
         self.hs = hs
         self._callback_url = hs.config.oidc_callback_url  # type: str
         self._scopes = hs.config.oidc_scopes  # type: List[str]
+        self._user_profile_method = hs.config.oidc_user_profile_method  # type: str
         self._client_auth = ClientAuth(
             hs.config.oidc_client_id,
             hs.config.oidc_client_secret,
@@ -196,11 +197,11 @@ class OidcHandler:
                     % (m["response_types_supported"],)
                 )
 
-        # If the openid scope was not requested, we need a userinfo endpoint to fetch user infos
+        # Ensure there's a userinfo endpoint to fetch from if it is required.
         if self._uses_userinfo:
             if m.get("userinfo_endpoint") is None:
                 raise ValueError(
-                    'provider has no "userinfo_endpoint", even though it is required because the "openid" scope is not requested'
+                    'provider has no "userinfo_endpoint", even though it is required'
                 )
         else:
             # If we're not using userinfo, we need a valid jwks to validate the ID token
@@ -220,8 +221,10 @@ class OidcHandler:
         ``access_token`` with the ``userinfo_endpoint``.
         """
 
-        # Maybe that should be user-configurable and not inferred?
-        return "openid" not in self._scopes
+        return (
+            "openid" not in self._scopes
+            or self._user_profile_method == "userinfo_endpoint"
+        )
 
     async def load_metadata(self) -> OpenIDProviderMetadata:
         """Load and validate the provider metadata.
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 2c2a633938..426b58da9e 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -92,7 +92,7 @@ class PaginationHandler:
         self._retention_allowed_lifetime_min = hs.config.retention_allowed_lifetime_min
         self._retention_allowed_lifetime_max = hs.config.retention_allowed_lifetime_max
 
-        if hs.config.retention_enabled:
+        if hs.config.run_background_tasks and hs.config.retention_enabled:
             # Run the purge jobs described in the configuration file.
             for job in hs.config.retention_purge_jobs:
                 logger.info("Setting up purge job with config: %s", job)
@@ -383,7 +383,7 @@ class PaginationHandler:
                             "room_key", leave_token
                         )
 
-                await self.hs.get_handlers().federation_handler.maybe_backfill(
+                await self.hs.get_federation_handler().maybe_backfill(
                     room_id, curr_topo, limit=pagin_config.limit,
                 )
 
diff --git a/synapse/handlers/password_policy.py b/synapse/handlers/password_policy.py
index 88e2f87200..6c635cc31b 100644
--- a/synapse/handlers/password_policy.py
+++ b/synapse/handlers/password_policy.py
@@ -16,14 +16,18 @@
 
 import logging
 import re
+from typing import TYPE_CHECKING
 
 from synapse.api.errors import Codes, PasswordRefusedError
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class PasswordPolicyHandler:
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         self.policy = hs.config.password_policy
         self.enabled = hs.config.password_policy_enabled
 
@@ -33,11 +37,11 @@ class PasswordPolicyHandler:
         self.regexp_uppercase = re.compile("[A-Z]")
         self.regexp_lowercase = re.compile("[a-z]")
 
-    def validate_password(self, password):
+    def validate_password(self, password: str) -> None:
         """Checks whether a given password complies with the server's policy.
 
         Args:
-            password (str): The password to check against the server's policy.
+            password: The password to check against the server's policy.
 
         Raises:
             PasswordRefusedError: The password doesn't comply with the server's policy.
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 5453e6dfc8..b784938755 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -35,14 +35,16 @@ MAX_DISPLAYNAME_LEN = 256
 MAX_AVATAR_URL_LEN = 1000
 
 
-class BaseProfileHandler(BaseHandler):
+class ProfileHandler(BaseHandler):
     """Handles fetching and updating user profile information.
 
-    BaseProfileHandler can be instantiated directly on workers and will
-    delegate to master when necessary. The master process should use the
-    subclass MasterProfileHandler
+    ProfileHandler can be instantiated directly on workers and will
+    delegate to master when necessary.
     """
 
+    PROFILE_UPDATE_MS = 60 * 1000
+    PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
+
     def __init__(self, hs):
         super().__init__(hs)
 
@@ -53,6 +55,11 @@ class BaseProfileHandler(BaseHandler):
 
         self.user_directory_handler = hs.get_user_directory_handler()
 
+        if hs.config.run_background_tasks:
+            self.clock.looping_call(
+                self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
+            )
+
     async def get_profile(self, user_id):
         target_user = UserID.from_string(user_id)
 
@@ -363,20 +370,6 @@ class BaseProfileHandler(BaseHandler):
                 raise SynapseError(403, "Profile isn't available", Codes.FORBIDDEN)
             raise
 
-
-class MasterProfileHandler(BaseProfileHandler):
-    PROFILE_UPDATE_MS = 60 * 1000
-    PROFILE_UPDATE_EVERY_MS = 24 * 60 * 60 * 1000
-
-    def __init__(self, hs):
-        super().__init__(hs)
-
-        assert hs.config.worker_app is None
-
-        self.clock.looping_call(
-            self._start_update_remote_profile_cache, self.PROFILE_UPDATE_MS
-        )
-
     def _start_update_remote_profile_cache(self):
         return run_as_background_process(
             "Update remote profile", self._update_remote_profile_cache
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index c32f314a1c..a7550806e6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -14,23 +14,29 @@
 # limitations under the License.
 
 import logging
+from typing import TYPE_CHECKING
 
 from synapse.util.async_helpers import Linearizer
 
 from ._base import BaseHandler
 
+if TYPE_CHECKING:
+    from synapse.app.homeserver import HomeServer
+
 logger = logging.getLogger(__name__)
 
 
 class ReadMarkerHandler(BaseHandler):
-    def __init__(self, hs):
+    def __init__(self, hs: "HomeServer"):
         super().__init__(hs)
         self.server_name = hs.config.server_name
         self.store = hs.get_datastore()
         self.read_marker_linearizer = Linearizer(name="read_marker")
         self.notifier = hs.get_notifier()
 
-    async def received_client_read_marker(self, room_id, user_id, event_id):
+    async def received_client_read_marker(
+        self, room_id: str, user_id: str, event_id: str
+    ) -> None:
         """Updates the read marker for a given user in a given room if the event ID given
         is ahead in the stream relative to the current read marker.
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 538f4b2a61..a6f1d21674 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -48,7 +48,7 @@ class RegistrationHandler(BaseHandler):
         self._auth_handler = hs.get_auth_handler()
         self.profile_handler = hs.get_profile_handler()
         self.user_directory_handler = hs.get_user_directory_handler()
-        self.identity_handler = self.hs.get_handlers().identity_handler
+        self.identity_handler = self.hs.get_identity_handler()
         self.ratelimiter = hs.get_registration_ratelimiter()
         self.macaroon_gen = hs.get_macaroon_generator()
         self._server_notices_mxid = hs.config.server_notices_mxid
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index d5f7c78edf..ec300d8877 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -120,7 +120,7 @@ class RoomCreationHandler(BaseHandler):
         # subsequent requests
         self._upgrade_response_cache = ResponseCache(
             hs, "room_upgrade", timeout_ms=FIVE_MINUTES_IN_MS
-        )
+        )  # type: ResponseCache[Tuple[str, str]]
         self._server_notices_mxid = hs.config.server_notices_mxid
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
@@ -185,6 +185,7 @@ class RoomCreationHandler(BaseHandler):
             ShadowBanError if the requester is shadow-banned.
         """
         user_id = requester.user.to_string()
+        assert self.hs.is_mine_id(user_id), "User must be our own: %s" % (user_id,)
 
         # start by allocating a new room id
         r = await self.store.get_room(old_room_id)
@@ -213,7 +214,6 @@ class RoomCreationHandler(BaseHandler):
                     "replacement_room": new_room_id,
                 },
             },
-            token_id=requester.access_token_id,
         )
         old_room_version = await self.store.get_room_version_id(old_room_id)
         await self.auth.check_from_context(
@@ -229,8 +229,8 @@ class RoomCreationHandler(BaseHandler):
         )
 
         # now send the tombstone
-        await self.event_creation_handler.send_nonmember_event(
-            requester, tombstone_event, tombstone_context
+        await self.event_creation_handler.handle_new_client_event(
+            requester=requester, event=tombstone_event, context=tombstone_context,
         )
 
         old_room_state = await tombstone_context.get_current_state_ids()
@@ -681,7 +681,16 @@ class RoomCreationHandler(BaseHandler):
             creator_id=user_id, is_public=is_public, room_version=room_version,
         )
 
-        directory_handler = self.hs.get_handlers().directory_handler
+        # Check whether this visibility value is blocked by a third party module
+        allowed_by_third_party_rules = await (
+            self.third_party_event_rules.check_visibility_can_be_modified(
+                room_id, visibility
+            )
+        )
+        if not allowed_by_third_party_rules:
+            raise SynapseError(403, "Room visibility value not allowed.")
+
+        directory_handler = self.hs.get_directory_handler()
         if room_alias:
             await directory_handler.create_association(
                 requester=requester,
@@ -962,8 +971,6 @@ class RoomCreationHandler(BaseHandler):
             try:
                 random_string = stringutils.random_string(18)
                 gen_room_id = RoomID(random_string, self.hs.hostname).to_string()
-                if isinstance(gen_room_id, bytes):
-                    gen_room_id = gen_room_id.decode("utf-8")
                 await self.store.store_room(
                     room_id=gen_room_id,
                     room_creator_user_id=creator_id,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8feba8c90a..ec784030e9 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -17,12 +17,10 @@ import abc
 import logging
 import random
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple, Union
-
-from unpaddedbase64 import encode_base64
+from typing import TYPE_CHECKING, Iterable, List, Optional, Tuple
 
 from synapse import types
-from synapse.api.constants import MAX_DEPTH, EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -31,12 +29,8 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.api.ratelimiting import Ratelimiter
-from synapse.api.room_versions import EventFormatVersions
-from synapse.crypto.event_signing import compute_event_reference_hash
 from synapse.events import EventBase
-from synapse.events.builder import create_local_event_from_event_dict
 from synapse.events.snapshot import EventContext
-from synapse.events.validator import EventValidator
 from synapse.storage.roommember import RoomsForUser
 from synapse.types import JsonDict, Requester, RoomAlias, RoomID, StateMap, UserID
 from synapse.util.async_helpers import Linearizer
@@ -64,9 +58,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         self.state_handler = hs.get_state_handler()
         self.config = hs.config
 
-        self.federation_handler = hs.get_handlers().federation_handler
-        self.directory_handler = hs.get_handlers().directory_handler
-        self.identity_handler = hs.get_handlers().identity_handler
+        self.federation_handler = hs.get_federation_handler()
+        self.directory_handler = hs.get_directory_handler()
+        self.identity_handler = hs.get_identity_handler()
         self.registration_handler = hs.get_registration_handler()
         self.profile_handler = hs.get_profile_handler()
         self.event_creation_handler = hs.get_event_creation_handler()
@@ -171,6 +165,17 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         if requester.is_guest:
             content["kind"] = "guest"
 
+        # Check if we already have an event with a matching transaction ID. (We
+        # do this check just before we persist an event as well, but may as well
+        # do it up front for efficiency.)
+        if txn_id and requester.access_token_id:
+            existing_event_id = await self.store.get_event_id_from_transaction_id(
+                room_id, requester.user.to_string(), requester.access_token_id, txn_id,
+            )
+            if existing_event_id:
+                event_pos = await self.store.get_position_for_event(existing_event_id)
+                return existing_event_id, event_pos.stream
+
         event, context = await self.event_creation_handler.create_event(
             requester,
             {
@@ -182,21 +187,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 # For backwards compatibility:
                 "membership": membership,
             },
-            token_id=requester.access_token_id,
             txn_id=txn_id,
             prev_event_ids=prev_event_ids,
             require_consent=require_consent,
         )
 
-        # Check if this event matches the previous membership event for the user.
-        duplicate = await self.event_creation_handler.deduplicate_state_event(
-            event, context
-        )
-        if duplicate is not None:
-            # Discard the new event since this membership change is a no-op.
-            _, stream_id = await self.store.get_event_ordering(duplicate.event_id)
-            return duplicate.event_id, stream_id
-
         prev_state_ids = await context.get_prev_state_ids()
 
         prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
@@ -221,7 +216,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                         retry_after_ms=int(1000 * (time_allowed - time_now_s))
                     )
 
-        stream_id = await self.event_creation_handler.handle_new_client_event(
+        result_event = await self.event_creation_handler.handle_new_client_event(
             requester, event, context, extra_users=[target], ratelimit=ratelimit,
         )
 
@@ -231,7 +226,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 if prev_member_event.membership == Membership.JOIN:
                     await self._user_left_room(target, room_id)
 
-        return event.event_id, stream_id
+        # we know it was persisted, so should have a stream ordering
+        assert result_event.internal_metadata.stream_ordering
+        return result_event.event_id, result_event.internal_metadata.stream_ordering
 
     async def copy_room_tags_and_direct_to_room(
         self, old_room_id, new_room_id, user_id
@@ -247,7 +244,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         user_account_data, _ = await self.store.get_account_data_for_user(user_id)
 
         # Copy direct message state if applicable
-        direct_rooms = user_account_data.get("m.direct", {})
+        direct_rooms = user_account_data.get(AccountDataTypes.DIRECT, {})
 
         # Check which key this room is under
         if isinstance(direct_rooms, dict):
@@ -258,7 +255,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
                     # Save back to user's m.direct account data
                     await self.store.add_account_data_for_user(
-                        user_id, "m.direct", direct_rooms
+                        user_id, AccountDataTypes.DIRECT, direct_rooms
                     )
                     break
 
@@ -441,12 +438,12 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 same_membership = old_membership == effective_membership_state
                 same_sender = requester.user.to_string() == old_state.sender
                 if same_sender and same_membership and same_content:
-                    _, stream_id = await self.store.get_event_ordering(
-                        old_state.event_id
-                    )
+                    # duplicate event.
+                    # we know it was persisted, so must have a stream ordering.
+                    assert old_state.internal_metadata.stream_ordering
                     return (
                         old_state.event_id,
-                        stream_id,
+                        old_state.internal_metadata.stream_ordering,
                     )
 
             if old_membership in ["ban", "leave"] and action == "kick":
@@ -642,7 +639,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
     async def send_membership_event(
         self,
-        requester: Requester,
+        requester: Optional[Requester],
         event: EventBase,
         context: EventContext,
         ratelimit: bool = True,
@@ -672,12 +669,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         else:
             requester = types.create_requester(target_user)
 
-        prev_event = await self.event_creation_handler.deduplicate_state_event(
-            event, context
-        )
-        if prev_event is not None:
-            return
-
         prev_state_ids = await context.get_prev_state_ids()
         if event.membership == Membership.JOIN:
             if requester.is_guest:
@@ -692,7 +683,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             if is_blocked:
                 raise SynapseError(403, "This room has been blocked on this server")
 
-        await self.event_creation_handler.handle_new_client_event(
+        event = await self.event_creation_handler.handle_new_client_event(
             requester, event, context, extra_users=[target_user], ratelimit=ratelimit
         )
 
@@ -1135,31 +1126,10 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         room_id = invite_event.room_id
         target_user = invite_event.state_key
-        room_version = await self.store.get_room_version(room_id)
 
         content["membership"] = Membership.LEAVE
 
-        # the auth events for the new event are the same as that of the invite, plus
-        # the invite itself.
-        #
-        # the prev_events are just the invite.
-        invite_hash = invite_event.event_id  # type: Union[str, Tuple]
-        if room_version.event_format == EventFormatVersions.V1:
-            alg, h = compute_event_reference_hash(invite_event)
-            invite_hash = (invite_event.event_id, {alg: encode_base64(h)})
-
-        auth_events = tuple(invite_event.auth_events) + (invite_hash,)
-        prev_events = (invite_hash,)
-
-        # we cap depth of generated events, to ensure that they are not
-        # rejected by other servers (and so that they can be persisted in
-        # the db)
-        depth = min(invite_event.depth + 1, MAX_DEPTH)
-
         event_dict = {
-            "depth": depth,
-            "auth_events": auth_events,
-            "prev_events": prev_events,
             "type": EventTypes.Member,
             "room_id": room_id,
             "sender": target_user,
@@ -1167,28 +1137,30 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             "state_key": target_user,
         }
 
-        event = create_local_event_from_event_dict(
-            clock=self.clock,
-            hostname=self.hs.hostname,
-            signing_key=self.hs.signing_key,
-            room_version=room_version,
-            event_dict=event_dict,
+        # the auth events for the new event are the same as that of the invite, plus
+        # the invite itself.
+        #
+        # the prev_events are just the invite.
+        prev_event_ids = [invite_event.event_id]
+        auth_event_ids = invite_event.auth_event_ids() + prev_event_ids
+
+        event, context = await self.event_creation_handler.create_event(
+            requester,
+            event_dict,
+            txn_id=txn_id,
+            prev_event_ids=prev_event_ids,
+            auth_event_ids=auth_event_ids,
         )
         event.internal_metadata.outlier = True
         event.internal_metadata.out_of_band_membership = True
-        if txn_id is not None:
-            event.internal_metadata.txn_id = txn_id
-        if requester.access_token_id is not None:
-            event.internal_metadata.token_id = requester.access_token_id
-
-        EventValidator().validate_new(event, self.config)
 
-        context = await self.state_handler.compute_event_context(event)
-        context.app_service = requester.app_service
-        stream_id = await self.event_creation_handler.handle_new_client_event(
+        result_event = await self.event_creation_handler.handle_new_client_event(
             requester, event, context, extra_users=[UserID.from_string(target_user)],
         )
-        return event.event_id, stream_id
+        # we know it was persisted, so must have a stream ordering
+        assert result_event.internal_metadata.stream_ordering
+
+        return result_event.event_id, result_event.internal_metadata.stream_ordering
 
     async def _user_left_room(self, target: UserID, room_id: str) -> None:
         """Implements RoomMemberHandler._user_left_room
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 249ffe2a55..dc62b21c06 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -49,7 +49,7 @@ class StatsHandler:
         # Guard to ensure we only process deltas one at a time
         self._is_processing = False
 
-        if hs.config.stats_enabled:
+        if self.stats_enabled and hs.config.run_background_tasks:
             self.notifier.add_replication_callback(self.notify_new_event)
 
             # We kick this off so that we don't have to wait for a change before
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index bfe2583002..a306631094 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -21,7 +21,7 @@ from typing import TYPE_CHECKING, Any, Dict, FrozenSet, List, Optional, Set, Tup
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.api.filtering import FilterCollection
 from synapse.events import EventBase
 from synapse.logging.context import current_context
@@ -87,7 +87,7 @@ class SyncConfig:
 class TimelineBatch:
     prev_batch = attr.ib(type=StreamToken)
     events = attr.ib(type=List[EventBase])
-    limited = attr.ib(bool)
+    limited = attr.ib(type=bool)
 
     def __bool__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -201,6 +201,8 @@ class SyncResult:
         device_lists: List of user_ids whose devices have changed
         device_one_time_keys_count: Dict of algorithm to count for one time keys
             for this device
+        device_unused_fallback_key_types: List of key types that have an unused fallback
+            key
         groups: Group updates, if any
     """
 
@@ -213,6 +215,7 @@ class SyncResult:
     to_device = attr.ib(type=List[JsonDict])
     device_lists = attr.ib(type=DeviceLists)
     device_one_time_keys_count = attr.ib(type=JsonDict)
+    device_unused_fallback_key_types = attr.ib(type=List[str])
     groups = attr.ib(type=Optional[GroupsSyncResult])
 
     def __bool__(self) -> bool:
@@ -240,7 +243,9 @@ class SyncHandler:
         self.presence_handler = hs.get_presence_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
-        self.response_cache = ResponseCache(hs, "sync")
+        self.response_cache = ResponseCache(
+            hs, "sync"
+        )  # type: ResponseCache[Tuple[Any, ...]]
         self.state = hs.get_state_handler()
         self.auth = hs.get_auth()
         self.storage = hs.get_storage()
@@ -457,8 +462,13 @@ class SyncHandler:
                 recents = []
 
             if not limited or block_all_timeline:
+                prev_batch_token = now_token
+                if recents:
+                    room_key = recents[0].internal_metadata.before
+                    prev_batch_token = now_token.copy_and_replace("room_key", room_key)
+
                 return TimelineBatch(
-                    events=recents, prev_batch=now_token, limited=False
+                    events=recents, prev_batch=prev_batch_token, limited=False
                 )
 
             filtering_factor = 2
@@ -1014,10 +1024,14 @@ class SyncHandler:
         logger.debug("Fetching OTK data")
         device_id = sync_config.device_id
         one_time_key_counts = {}  # type: JsonDict
+        unused_fallback_key_types = []  # type: List[str]
         if device_id:
             one_time_key_counts = await self.store.count_e2e_one_time_keys(
                 user_id, device_id
             )
+            unused_fallback_key_types = await self.store.get_e2e_unused_fallback_key_types(
+                user_id, device_id
+            )
 
         logger.debug("Fetching group data")
         await self._generate_sync_entry_for_groups(sync_result_builder)
@@ -1041,6 +1055,7 @@ class SyncHandler:
             device_lists=device_lists,
             groups=sync_result_builder.groups,
             device_one_time_keys_count=one_time_key_counts,
+            device_unused_fallback_key_types=unused_fallback_key_types,
             next_batch=sync_result_builder.now_token,
         )
 
@@ -1378,13 +1393,16 @@ class SyncHandler:
                         return set(), set(), set(), set()
 
         ignored_account_data = await self.store.get_global_account_data_by_type_for_user(
-            "m.ignored_user_list", user_id=user_id
+            AccountDataTypes.IGNORED_USER_LIST, user_id=user_id
         )
 
+        # If there is ignored users account data and it matches the proper type,
+        # then use it.
+        ignored_users = frozenset()  # type: FrozenSet[str]
         if ignored_account_data:
-            ignored_users = ignored_account_data.get("ignored_users", {}).keys()
-        else:
-            ignored_users = frozenset()
+            ignored_users_data = ignored_account_data.get("ignored_users", {})
+            if isinstance(ignored_users_data, dict):
+                ignored_users = frozenset(ignored_users_data.keys())
 
         if since_token:
             room_changes = await self._get_rooms_changed(
@@ -1478,7 +1496,7 @@ class SyncHandler:
         return False
 
     async def _get_rooms_changed(
-        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
     ) -> _RoomChanges:
         """Gets the the changes that have happened since the last sync.
         """
@@ -1690,7 +1708,7 @@ class SyncHandler:
         return _RoomChanges(room_entries, invited, newly_joined_rooms, newly_left_rooms)
 
     async def _get_all_rooms(
-        self, sync_result_builder: "SyncResultBuilder", ignored_users: Set[str]
+        self, sync_result_builder: "SyncResultBuilder", ignored_users: FrozenSet[str]
     ) -> _RoomChanges:
         """Returns entries for all rooms for the user.
 
@@ -1764,7 +1782,7 @@ class SyncHandler:
     async def _generate_room_entry(
         self,
         sync_result_builder: "SyncResultBuilder",
-        ignored_users: Set[str],
+        ignored_users: FrozenSet[str],
         room_builder: "RoomSyncResultBuilder",
         ephemeral: List[JsonDict],
         tags: Optional[Dict[str, Dict[str, Any]]],
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 9146dc1a3b..3d66bf305e 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -143,7 +143,7 @@ class _BaseThreepidAuthChecker:
 
         threepid_creds = authdict["threepid_creds"]
 
-        identity_handler = self.hs.get_handlers().identity_handler
+        identity_handler = self.hs.get_identity_handler()
 
         logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))