summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2022-09-20 18:04:30 -0500
committerEric Eastwood <erice@element.io>2022-09-20 18:04:30 -0500
commit05e511368bb65f08e700bba141a702595436e14d (patch)
tree98f408d69c51288e81bf3ba16c69c960e5e27794 /synapse/handlers
parentRemove debug logs (diff)
parentMerge branch 'develop' into madlittlemods/11850-migrate-to-opentelemetry (diff)
downloadsynapse-05e511368bb65f08e700bba141a702595436e14d.tar.xz
Merge branch 'madlittlemods/11850-migrate-to-opentelemetry' into madlittlemods/13356-messages-investigation-scratch-v1
Conflicts:
	poetry.lock
	synapse/handlers/federation.py
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/admin.py1
-rw-r--r--synapse/handlers/auth.py51
-rw-r--r--synapse/handlers/device.py31
-rw-r--r--synapse/handlers/directory.py34
-rw-r--r--synapse/handlers/e2e_keys.py69
-rw-r--r--synapse/handlers/event_auth.py9
-rw-r--r--synapse/handlers/events.py9
-rw-r--r--synapse/handlers/federation.py64
-rw-r--r--synapse/handlers/federation_event.py37
-rw-r--r--synapse/handlers/identity.py198
-rw-r--r--synapse/handlers/initial_sync.py6
-rw-r--r--synapse/handlers/message.py31
-rw-r--r--synapse/handlers/pagination.py43
-rw-r--r--synapse/handlers/presence.py3
-rw-r--r--synapse/handlers/push_rules.py5
-rw-r--r--synapse/handlers/receipts.py29
-rw-r--r--synapse/handlers/register.py15
-rw-r--r--synapse/handlers/relations.py2
-rw-r--r--synapse/handlers/room.py45
-rw-r--r--synapse/handlers/room_member.py33
-rw-r--r--synapse/handlers/room_summary.py1
-rw-r--r--synapse/handlers/sync.py24
-rw-r--r--synapse/handlers/typing.py17
-rw-r--r--synapse/handlers/ui_auth/checkers.py21
24 files changed, 317 insertions, 461 deletions
diff --git a/synapse/handlers/admin.py b/synapse/handlers/admin.py
index d4fe7df533..cf9f19608a 100644
--- a/synapse/handlers/admin.py
+++ b/synapse/handlers/admin.py
@@ -70,6 +70,7 @@ class AdminHandler:
             "appservice_id",
             "consent_server_notice_sent",
             "consent_version",
+            "consent_ts",
             "user_type",
             "is_guest",
         }
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index bfa5535044..eacd631ee0 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -63,7 +63,6 @@ from synapse.http.server import finish_request, respond_with_html
 from synapse.http.site import SynapseRequest
 from synapse.logging.context import defer_to_thread
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.storage.roommember import ProfileInfo
 from synapse.types import JsonDict, Requester, UserID
 from synapse.util import stringutils as stringutils
 from synapse.util.async_helpers import delay_cancellation, maybe_awaitable
@@ -280,7 +279,7 @@ class AuthHandler:
         that it isn't stolen by re-authenticating them.
 
         Args:
-            requester: The user, as given by the access token
+            requester: The user making the request, according to the access token.
 
             request: The request sent by the client.
 
@@ -1435,20 +1434,25 @@ class AuthHandler:
             access_token: access token to be deleted
 
         """
-        user_info = await self.auth.get_user_by_access_token(access_token)
+        token = await self.store.get_user_by_access_token(access_token)
+        if not token:
+            # At this point, the token should already have been fetched once by
+            # the caller, so this should not happen, unless of a race condition
+            # between two delete requests
+            raise SynapseError(HTTPStatus.UNAUTHORIZED, "Unrecognised access token")
         await self.store.delete_access_token(access_token)
 
         # see if any modules want to know about this
         await self.password_auth_provider.on_logged_out(
-            user_id=user_info.user_id,
-            device_id=user_info.device_id,
+            user_id=token.user_id,
+            device_id=token.device_id,
             access_token=access_token,
         )
 
         # delete pushers associated with this access token
-        if user_info.token_id is not None:
+        if token.token_id is not None:
             await self.hs.get_pusherpool().remove_pushers_by_access_token(
-                user_info.user_id, (user_info.token_id,)
+                token.user_id, (token.token_id,)
             )
 
     async def delete_access_tokens_for_user(
@@ -1682,41 +1686,10 @@ class AuthHandler:
             respond_with_html(request, 403, self._sso_account_deactivated_template)
             return
 
-        profile = await self.store.get_profileinfo(
+        user_profile_data = await self.store.get_profileinfo(
             UserID.from_string(registered_user_id).localpart
         )
 
-        self._complete_sso_login(
-            registered_user_id,
-            auth_provider_id,
-            request,
-            client_redirect_url,
-            extra_attributes,
-            new_user=new_user,
-            user_profile_data=profile,
-            auth_provider_session_id=auth_provider_session_id,
-        )
-
-    def _complete_sso_login(
-        self,
-        registered_user_id: str,
-        auth_provider_id: str,
-        request: Request,
-        client_redirect_url: str,
-        extra_attributes: Optional[JsonDict] = None,
-        new_user: bool = False,
-        user_profile_data: Optional[ProfileInfo] = None,
-        auth_provider_session_id: Optional[str] = None,
-    ) -> None:
-        """
-        The synchronous portion of complete_sso_login.
-
-        This exists purely for backwards compatibility of synapse.module_api.ModuleApi.
-        """
-
-        if user_profile_data is None:
-            user_profile_data = ProfileInfo(None, None)
-
         # Store any extra attributes which will be passed in the login response.
         # Note that this is per-user so it may overwrite a previous value, this
         # is considered OK since the newest SSO attributes should be most valid.
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 659ee0ef5e..8e9c98db6c 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -45,13 +45,13 @@ from synapse.types import (
     JsonDict,
     StreamKeyType,
     StreamToken,
-    UserID,
     get_domain_from_id,
     get_verify_key_from_cross_signing_key,
 )
 from synapse.util import stringutils
 from synapse.util.async_helpers import Linearizer
 from synapse.util.caches.expiringcache import ExpiringCache
+from synapse.util.cancellation import cancellable
 from synapse.util.metrics import measure_func
 from synapse.util.retryutils import NotRetryingDestination
 
@@ -74,6 +74,7 @@ class DeviceWorkerHandler:
         self._state_storage = hs.get_storage_controllers().state
         self._auth_handler = hs.get_auth_handler()
         self.server_name = hs.hostname
+        self._msc3852_enabled = hs.config.experimental.msc3852_enabled
 
     @trace
     async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
@@ -123,6 +124,7 @@ class DeviceWorkerHandler:
 
         return device
 
+    @cancellable
     async def get_device_changes_in_shared_rooms(
         self, user_id: str, room_ids: Collection[str], from_token: StreamToken
     ) -> Collection[str]:
@@ -162,6 +164,7 @@ class DeviceWorkerHandler:
 
     @trace
     @measure_func("device.get_user_ids_changed")
+    @cancellable
     async def get_user_ids_changed(
         self, user_id: str, from_token: StreamToken
     ) -> JsonDict:
@@ -309,6 +312,7 @@ class DeviceHandler(DeviceWorkerHandler):
         super().__init__(hs)
 
         self.federation_sender = hs.get_federation_sender()
+        self._storage_controllers = hs.get_storage_controllers()
 
         self.device_list_updater = DeviceListUpdater(hs, self)
 
@@ -319,8 +323,6 @@ class DeviceHandler(DeviceWorkerHandler):
             self.device_list_updater.incoming_device_list_update,
         )
 
-        hs.get_distributor().observe("user_left_room", self.user_left_room)
-
         # Whether `_handle_new_device_update_async` is currently processing.
         self._handle_new_device_update_is_processing = False
 
@@ -564,14 +566,6 @@ class DeviceHandler(DeviceWorkerHandler):
             StreamKeyType.DEVICE_LIST, position, users=[from_user_id]
         )
 
-    async def user_left_room(self, user: UserID, room_id: str) -> None:
-        user_id = user.to_string()
-        room_ids = await self.store.get_rooms_for_user(user_id)
-        if not room_ids:
-            # We no longer share rooms with this user, so we'll no longer
-            # receive device updates. Mark this in DB.
-            await self.store.mark_remote_user_device_list_as_unsubscribed(user_id)
-
     async def store_dehydrated_device(
         self,
         user_id: str,
@@ -693,8 +687,11 @@ class DeviceHandler(DeviceWorkerHandler):
 
                     # Ignore any users that aren't ours
                     if self.hs.is_mine_id(user_id):
-                        joined_user_ids = await self.store.get_users_in_room(room_id)
-                        hosts = {get_domain_from_id(u) for u in joined_user_ids}
+                        hosts = set(
+                            await self._storage_controllers.state.get_current_hosts_in_room(
+                                room_id
+                            )
+                        )
                         hosts.discard(self.server_name)
 
                     # Check if we've already sent this update to some hosts
@@ -747,7 +744,13 @@ def _update_device_from_client_ips(
     device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
 ) -> None:
     ip = client_ips.get((device["user_id"], device["device_id"]), {})
-    device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
+    device.update(
+        {
+            "last_seen_user_agent": ip.get("user_agent"),
+            "last_seen_ts": ip.get("last_seen"),
+            "last_seen_ip": ip.get("ip"),
+        }
+    )
 
 
 class DeviceListUpdater:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 09a7a4b238..7127d5aefc 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -30,7 +30,7 @@ from synapse.api.errors import (
 from synapse.appservice import ApplicationService
 from synapse.module_api import NOT_SPAM
 from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, RoomAlias
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -83,8 +83,9 @@ class DirectoryHandler:
         # TODO(erikj): Add transactions.
         # TODO(erikj): Check if there is a current association.
         if not servers:
-            users = await self.store.get_users_in_room(room_id)
-            servers = {get_domain_from_id(u) for u in users}
+            servers = await self._storage_controllers.state.get_current_hosts_in_room(
+                room_id
+            )
 
         if not servers:
             raise SynapseError(400, "Failed to get server list")
@@ -133,7 +134,7 @@ class DirectoryHandler:
         else:
             # Server admins are not subject to the same constraints as normal
             # users when creating an alias (e.g. being in the room).
-            is_admin = await self.auth.is_server_admin(requester.user)
+            is_admin = await self.auth.is_server_admin(requester)
 
             if (self.require_membership and check_membership) and not is_admin:
                 rooms_for_user = await self.store.get_rooms_for_user(user_id)
@@ -197,7 +198,7 @@ class DirectoryHandler:
         user_id = requester.user.to_string()
 
         try:
-            can_delete = await self._user_can_delete_alias(room_alias, user_id)
+            can_delete = await self._user_can_delete_alias(room_alias, requester)
         except StoreError as e:
             if e.code == 404:
                 raise NotFoundError("Unknown room alias")
@@ -287,8 +288,9 @@ class DirectoryHandler:
                 Codes.NOT_FOUND,
             )
 
-        users = await self.store.get_users_in_room(room_id)
-        extra_servers = {get_domain_from_id(u) for u in users}
+        extra_servers = await self._storage_controllers.state.get_current_hosts_in_room(
+            room_id
+        )
         servers_set = set(extra_servers) | set(servers)
 
         # If this server is in the list of servers, return it first.
@@ -400,7 +402,9 @@ class DirectoryHandler:
         # either no interested services, or no service with an exclusive lock
         return True
 
-    async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
+    async def _user_can_delete_alias(
+        self, alias: RoomAlias, requester: Requester
+    ) -> bool:
         """Determine whether a user can delete an alias.
 
         One of the following must be true:
@@ -413,7 +417,7 @@ class DirectoryHandler:
         """
         creator = await self.store.get_room_alias_creator(alias.to_string())
 
-        if creator == user_id:
+        if creator == requester.user.to_string():
             return True
 
         # Resolve the alias to the corresponding room.
@@ -422,9 +426,7 @@ class DirectoryHandler:
         if not room_id:
             return False
 
-        return await self.auth.check_can_change_room_list(
-            room_id, UserID.from_string(user_id)
-        )
+        return await self.auth.check_can_change_room_list(room_id, requester)
 
     async def edit_published_room_list(
         self, requester: Requester, room_id: str, visibility: str
@@ -463,7 +465,7 @@ class DirectoryHandler:
             raise SynapseError(400, "Unknown room")
 
         can_change_room_list = await self.auth.check_can_change_room_list(
-            room_id, requester.user
+            room_id, requester
         )
         if not can_change_room_list:
             raise AuthError(
@@ -528,10 +530,8 @@ class DirectoryHandler:
         Get a list of the aliases that currently point to this room on this server
         """
         # allow access to server admins and current members of the room
-        is_admin = await self.auth.is_server_admin(requester.user)
+        is_admin = await self.auth.is_server_admin(requester)
         if not is_admin:
-            await self.auth.check_user_in_room_or_world_readable(
-                room_id, requester.user.to_string()
-            )
+            await self.auth.check_user_in_room_or_world_readable(room_id, requester)
 
         return await self.store.get_aliases_for_room(room_id)
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index a3692f00d9..cf788a4a86 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -37,7 +37,8 @@ from synapse.types import (
     get_verify_key_from_cross_signing_key,
 )
 from synapse.util import json_decoder, unwrapFirstError
-from synapse.util.async_helpers import Linearizer
+from synapse.util.async_helpers import Linearizer, delay_cancellation
+from synapse.util.cancellation import cancellable
 from synapse.util.retryutils import NotRetryingDestination
 
 if TYPE_CHECKING:
@@ -91,6 +92,7 @@ class E2eKeysHandler:
         )
 
     @trace
+    @cancellable
     async def query_devices(
         self,
         query_body: JsonDict,
@@ -173,6 +175,35 @@ class E2eKeysHandler:
                     user_ids_not_in_cache,
                     remote_results,
                 ) = await self.store.get_user_devices_from_cache(query_list)
+
+                # Check that the homeserver still shares a room with all cached users.
+                # Note that this check may be slightly racy when a remote user leaves a
+                # room after we have fetched their cached device list. In the worst case
+                # we will do extra federation queries for devices that we had cached.
+                cached_users = set(remote_results.keys())
+                valid_cached_users = (
+                    await self.store.get_users_server_still_shares_room_with(
+                        remote_results.keys()
+                    )
+                )
+                invalid_cached_users = cached_users - valid_cached_users
+                if invalid_cached_users:
+                    # Fix up results. If we get here, it means there was either a bug in
+                    # device list tracking, or we hit the race mentioned above.
+                    # TODO: In practice, this path is hit fairly often in existing
+                    #       deployments when clients query the keys of departed remote
+                    #       users. A background update to mark the appropriate device
+                    #       lists as unsubscribed is needed.
+                    #       https://github.com/matrix-org/synapse/issues/13651
+                    # Note that this currently introduces a failure mode when clients
+                    # are trying to decrypt old messages from a remote user whose
+                    # homeserver is no longer available. We may want to consider falling
+                    # back to the cached data when we fail to retrieve a device list
+                    # over federation for such remote users.
+                    user_ids_not_in_cache.update(invalid_cached_users)
+                    for invalid_user_id in invalid_cached_users:
+                        remote_results.pop(invalid_user_id)
+
                 for user_id, devices in remote_results.items():
                     user_devices = results.setdefault(user_id, {})
                     for device_id, device in devices.items():
@@ -208,22 +239,26 @@ class E2eKeysHandler:
                     r[user_id] = remote_queries[user_id]
 
             # Now fetch any devices that we don't have in our cache
+            # TODO It might make sense to propagate cancellations into the
+            #      deferreds which are querying remote homeservers.
             await make_deferred_yieldable(
-                defer.gatherResults(
-                    [
-                        run_in_background(
-                            self._query_devices_for_destination,
-                            results,
-                            cross_signing_keys,
-                            failures,
-                            destination,
-                            queries,
-                            timeout,
-                        )
-                        for destination, queries in remote_queries_not_in_cache.items()
-                    ],
-                    consumeErrors=True,
-                ).addErrback(unwrapFirstError)
+                delay_cancellation(
+                    defer.gatherResults(
+                        [
+                            run_in_background(
+                                self._query_devices_for_destination,
+                                results,
+                                cross_signing_keys,
+                                failures,
+                                destination,
+                                queries,
+                                timeout,
+                            )
+                            for destination, queries in remote_queries_not_in_cache.items()
+                        ],
+                        consumeErrors=True,
+                    ).addErrback(unwrapFirstError)
+                )
             )
 
             ret = {"device_keys": results, "failures": failures}
@@ -347,6 +382,7 @@ class E2eKeysHandler:
 
         return
 
+    @cancellable
     async def get_cross_signing_keys_from_cache(
         self, query: Iterable[str], from_user_id: Optional[str]
     ) -> Dict[str, Dict[str, dict]]:
@@ -393,6 +429,7 @@ class E2eKeysHandler:
         }
 
     @trace
+    @cancellable
     async def query_local_devices(
         self, query: Mapping[str, Optional[List[str]]]
     ) -> Dict[str, Dict[str, dict]]:
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index a2dd9c7efa..c3ddc5d182 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -129,12 +129,9 @@ class EventAuthHandler:
         else:
             users = {}
 
-        # Find the user with the highest power level.
-        users_in_room = await self._store.get_users_in_room(room_id)
-        # Only interested in local users.
-        local_users_in_room = [
-            u for u in users_in_room if get_domain_from_id(u) == self._server_name
-        ]
+        # Find the user with the highest power level (only interested in local
+        # users).
+        local_users_in_room = await self._store.get_local_users_in_room(room_id)
         chosen_user = max(
             local_users_in_room,
             key=lambda user: users.get(user, users_default_level),
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index ac13340d3a..949b69cb41 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -151,7 +151,7 @@ class EventHandler:
         """Retrieve a single specified event.
 
         Args:
-            user: The user requesting the event
+            user: The local user requesting the event
             room_id: The expected room id. We'll return None if the
                 event's room does not match.
             event_id: The event ID to obtain.
@@ -173,8 +173,11 @@ class EventHandler:
         if not event:
             return None
 
-        users = await self.store.get_users_in_room(event.room_id)
-        is_peeking = user.to_string() not in users
+        is_user_in_room = await self.store.check_local_user_in_room(
+            user_id=user.to_string(), room_id=event.room_id
+        )
+        # The user is peeking if they aren't in the room already
+        is_peeking = not is_user_in_room
 
         filtered = await filter_events_for_client(
             self._storage_controllers, user.to_string(), [event], is_peeking=is_peeking
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index ad4e09c8f8..377ac04f8c 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -76,7 +76,7 @@ from synapse.replication.http.federation import (
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
-from synapse.types import JsonDict, StateMap, get_domain_from_id
+from synapse.types import JsonDict, get_domain_from_id
 from synapse.util.async_helpers import Linearizer
 from synapse.util.retryutils import NotRetryingDestination
 from synapse.visibility import filter_events_for_server
@@ -92,9 +92,14 @@ backfill_processing_before_timer = Histogram(
     "sec",
     [],
     buckets=(
+        0.1,
+        0.5,
         1.0,
+        2.5,
         5.0,
+        7.5,
         10.0,
+        15.0,
         20.0,
         30.0,
         40.0,
@@ -105,37 +110,6 @@ backfill_processing_before_timer = Histogram(
 )
 
 
-def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
-    """Get joined domains from state
-
-    Args:
-        state: State map from type/state key to event.
-
-    Returns:
-        Returns a list of servers with the lowest depth of their joins.
-            Sorted by lowest depth first.
-    """
-    joined_users = [
-        (state_key, int(event.depth))
-        for (e_type, state_key), event in state.items()
-        if e_type == EventTypes.Member and event.membership == Membership.JOIN
-    ]
-
-    joined_domains: Dict[str, int] = {}
-    for u, d in joined_users:
-        try:
-            dom = get_domain_from_id(u)
-            old_d = joined_domains.get(dom)
-            if old_d:
-                joined_domains[dom] = min(d, old_d)
-            else:
-                joined_domains[dom] = d
-        except Exception:
-            pass
-
-    return sorted(joined_domains.items(), key=lambda d: d[1])
-
-
 class _BackfillPointType(Enum):
     # a regular backwards extremity (ie, an event which we don't yet have, but which
     # is referred to by other events in the DAG)
@@ -436,25 +410,21 @@ class FederationHandler:
             str(len(extremities_to_request)),
         )
 
-        # Now we need to decide which hosts to hit first.
-
-        # First we try hosts that are already in the room
-        # TODO: HEURISTIC ALERT.
-
         with start_active_span("getting likely_domains"):
-            curr_state = await self._storage_controllers.state.get_current_state(
-                room_id
+            # Now we need to decide which hosts to hit first.
+            # First we try hosts that are already in the room.
+            # TODO: HEURISTIC ALERT.
+            likely_domains = (
+                await self._storage_controllers.state.get_current_hosts_in_room(room_id)
             )
 
-            curr_domains = get_domains_from_state(curr_state)
-
-            likely_domains = [
-                domain for domain, depth in curr_domains if domain != self.server_name
-            ]
-
-        async def try_backfill(domains: List[str]) -> bool:
+        async def try_backfill(domains: Collection[str]) -> bool:
             # TODO: Should we try multiple of these at a time?
             for dom in domains:
+                # We don't want to ask our own server for information we don't have
+                if dom == self.server_name:
+                    continue
+
                 try:
                     await self._federation_event_handler.backfill(
                         dom, room_id, limit=100, extremities=extremities_to_request
@@ -495,7 +465,7 @@ class FederationHandler:
 
         processing_end_time = self.clock.time_msec()
         backfill_processing_before_timer.observe(
-            (processing_start_time - processing_end_time) / 1000
+            (processing_end_time - processing_start_time) / 1000
         )
 
         success = await try_backfill(likely_domains)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index c50c3a2387..c87925aa51 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -104,15 +104,25 @@ backfill_processing_after_timer = Histogram(
     "sec",
     [],
     buckets=(
+        0.1,
+        0.25,
+        0.5,
         1.0,
+        2.5,
         5.0,
+        7.5,
         10.0,
+        15.0,
         20.0,
+        25.0,
         30.0,
         40.0,
+        50.0,
         60.0,
         80.0,
+        100.0,
         120.0,
+        150.0,
         180.0,
         "+Inf",
     ),
@@ -852,7 +862,15 @@ class FederationEventHandler:
             self._sanity_check_event(event)
         except SynapseError as err:
             logger.warning("Event %s failed sanity check: %s", event_id, err)
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(err)
+            )
             return
+        except Exception as exc:
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(exc)
+            )
+            raise exc
 
         try:
             try:
@@ -887,10 +905,19 @@ class FederationEventHandler:
                     backfilled=backfilled,
                 )
         except FederationError as e:
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(e)
+            )
+
             if e.code == 403:
                 logger.warning("Pulled event %s failed history check.", event_id)
             else:
                 raise
+        except Exception as exc:
+            await self._store.record_event_failed_pull_attempt(
+                event.room_id, event_id, str(exc)
+            )
+            raise exc
 
     @trace
     async def _compute_event_context_with_maybe_missing_prevs(
@@ -1031,6 +1058,14 @@ class FederationEventHandler:
             InvalidResponseError: if the remote homeserver's response contains fields
                 of the wrong type.
         """
+
+        # It would be better if we could query the difference from our known
+        # state to the given `event_id` so the sending server doesn't have to
+        # send as much and we don't have to process as many events. For example
+        # in a room like #matrix:matrix.org, we get 200k events (77k state_events, 122k
+        # auth_events) from this call.
+        #
+        # Tracked by https://github.com/matrix-org/synapse/issues/13618
         (
             state_event_ids,
             auth_event_ids,
@@ -1389,7 +1424,7 @@ class FederationEventHandler:
         logger.debug("_handle_marker_event: received %s", marker_event)
 
         insertion_event_id = marker_event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION
+            EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
         )
 
         if insertion_event_id is None:
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index e5afe84df9..93d09e9939 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -26,7 +26,6 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.api.ratelimiting import Ratelimiter
-from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.http import RequestTimedOutError
 from synapse.http.client import SimpleHttpClient
 from synapse.http.site import SynapseRequest
@@ -416,48 +415,6 @@ class IdentityHandler:
 
         return session_id
 
-    async def request_email_token(
-        self,
-        id_server: str,
-        email: str,
-        client_secret: str,
-        send_attempt: int,
-        next_link: Optional[str] = None,
-    ) -> JsonDict:
-        """
-        Request an external server send an email on our behalf for the purposes of threepid
-        validation.
-
-        Args:
-            id_server: The identity server to proxy to
-            email: The email to send the message to
-            client_secret: The unique client_secret sends by the user
-            send_attempt: Which attempt this is
-            next_link: A link to redirect the user to once they submit the token
-
-        Returns:
-            The json response body from the server
-        """
-        params = {
-            "email": email,
-            "client_secret": client_secret,
-            "send_attempt": send_attempt,
-        }
-        if next_link:
-            params["next_link"] = next_link
-
-        try:
-            data = await self.http_client.post_json_get_json(
-                id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
-                params,
-            )
-            return data
-        except HttpResponseException as e:
-            logger.info("Proxied requestToken failed: %r", e)
-            raise e.to_synapse_error()
-        except RequestTimedOutError:
-            raise SynapseError(500, "Timed out contacting identity server")
-
     async def requestMsisdnToken(
         self,
         id_server: str,
@@ -531,18 +488,7 @@ class IdentityHandler:
         validation_session = None
 
         # Try to validate as email
-        if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
-            # Remote emails will only be used if a valid identity server is provided.
-            assert (
-                self.hs.config.registration.account_threepid_delegate_email is not None
-            )
-
-            # Ask our delegated email identity server
-            validation_session = await self.threepid_from_creds(
-                self.hs.config.registration.account_threepid_delegate_email,
-                threepid_creds,
-            )
-        elif self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
+        if self.hs.config.email.can_verify_email:
             # Get a validated session matching these details
             validation_session = await self.store.get_threepid_validation_session(
                 "email", client_secret, sid=sid, validated=True
@@ -592,11 +538,7 @@ class IdentityHandler:
             raise SynapseError(400, "Error contacting the identity server")
 
     async def lookup_3pid(
-        self,
-        id_server: str,
-        medium: str,
-        address: str,
-        id_access_token: Optional[str] = None,
+        self, id_server: str, medium: str, address: str, id_access_token: str
     ) -> Optional[str]:
         """Looks up a 3pid in the passed identity server.
 
@@ -611,60 +553,15 @@ class IdentityHandler:
         Returns:
             the matrix ID of the 3pid, or None if it is not recognized.
         """
-        if id_access_token is not None:
-            try:
-                results = await self._lookup_3pid_v2(
-                    id_server, id_access_token, medium, address
-                )
-                return results
-
-            except Exception as e:
-                # Catch HttpResponseExcept for a non-200 response code
-                # Check if this identity server does not know about v2 lookups
-                if isinstance(e, HttpResponseException) and e.code == 404:
-                    # This is an old identity server that does not yet support v2 lookups
-                    logger.warning(
-                        "Attempted v2 lookup on v1 identity server %s. Falling "
-                        "back to v1",
-                        id_server,
-                    )
-                else:
-                    logger.warning("Error when looking up hashing details: %s", e)
-                    return None
-
-        return await self._lookup_3pid_v1(id_server, medium, address)
-
-    async def _lookup_3pid_v1(
-        self, id_server: str, medium: str, address: str
-    ) -> Optional[str]:
-        """Looks up a 3pid in the passed identity server using v1 lookup.
-
-        Args:
-            id_server: The server name (including port, if required)
-                of the identity server to use.
-            medium: The type of the third party identifier (e.g. "email").
-            address: The third party identifier (e.g. "foo@example.com").
 
-        Returns:
-            the matrix ID of the 3pid, or None if it is not recognized.
-        """
         try:
-            data = await self.blacklisting_http_client.get_json(
-                "%s%s/_matrix/identity/api/v1/lookup" % (id_server_scheme, id_server),
-                {"medium": medium, "address": address},
+            results = await self._lookup_3pid_v2(
+                id_server, id_access_token, medium, address
             )
-
-            if "mxid" in data:
-                # note: we used to verify the identity server's signature here, but no longer
-                # require or validate it. See the following for context:
-                # https://github.com/matrix-org/synapse/issues/5253#issuecomment-666246950
-                return data["mxid"]
-        except RequestTimedOutError:
-            raise SynapseError(500, "Timed out contacting identity server")
-        except OSError as e:
-            logger.warning("Error from v1 identity server lookup: %s" % (e,))
-
-        return None
+            return results
+        except Exception as e:
+            logger.warning("Error when looking up hashing details: %s", e)
+            return None
 
     async def _lookup_3pid_v2(
         self, id_server: str, id_access_token: str, medium: str, address: str
@@ -793,7 +690,7 @@ class IdentityHandler:
         room_type: Optional[str],
         inviter_display_name: str,
         inviter_avatar_url: str,
-        id_access_token: Optional[str] = None,
+        id_access_token: str,
     ) -> Tuple[str, List[Dict[str, str]], Dict[str, str], str]:
         """
         Asks an identity server for a third party invite.
@@ -814,7 +711,7 @@ class IdentityHandler:
             inviter_display_name: The current display name of the
                 inviter.
             inviter_avatar_url: The URL of the inviter's avatar.
-            id_access_token (str|None): The access token to authenticate to the identity
+            id_access_token (str): The access token to authenticate to the identity
                 server with
 
         Returns:
@@ -846,71 +743,24 @@ class IdentityHandler:
             invite_config["org.matrix.web_client_location"] = self._web_client_location
 
         # Add the identity service access token to the JSON body and use the v2
-        # Identity Service endpoints if id_access_token is present
+        # Identity Service endpoints
         data = None
-        base_url = "%s%s/_matrix/identity" % (id_server_scheme, id_server)
 
-        if id_access_token:
-            key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % (
-                id_server_scheme,
-                id_server,
-            )
+        key_validity_url = "%s%s/_matrix/identity/v2/pubkey/isvalid" % (
+            id_server_scheme,
+            id_server,
+        )
 
-            # Attempt a v2 lookup
-            url = base_url + "/v2/store-invite"
-            try:
-                data = await self.blacklisting_http_client.post_json_get_json(
-                    url,
-                    invite_config,
-                    {"Authorization": create_id_access_token_header(id_access_token)},
-                )
-            except RequestTimedOutError:
-                raise SynapseError(500, "Timed out contacting identity server")
-            except HttpResponseException as e:
-                if e.code != 404:
-                    logger.info("Failed to POST %s with JSON: %s", url, e)
-                    raise e
-
-        if data is None:
-            key_validity_url = "%s%s/_matrix/identity/api/v1/pubkey/isvalid" % (
-                id_server_scheme,
-                id_server,
+        url = "%s%s/_matrix/identity/v2/store-invite" % (id_server_scheme, id_server)
+        try:
+            data = await self.blacklisting_http_client.post_json_get_json(
+                url,
+                invite_config,
+                {"Authorization": create_id_access_token_header(id_access_token)},
             )
-            url = base_url + "/api/v1/store-invite"
-
-            try:
-                data = await self.blacklisting_http_client.post_json_get_json(
-                    url, invite_config
-                )
-            except RequestTimedOutError:
-                raise SynapseError(500, "Timed out contacting identity server")
-            except HttpResponseException as e:
-                logger.warning(
-                    "Error trying to call /store-invite on %s%s: %s",
-                    id_server_scheme,
-                    id_server,
-                    e,
-                )
-
-            if data is None:
-                # Some identity servers may only support application/x-www-form-urlencoded
-                # types. This is especially true with old instances of Sydent, see
-                # https://github.com/matrix-org/sydent/pull/170
-                try:
-                    data = await self.blacklisting_http_client.post_urlencoded_get_json(
-                        url, invite_config
-                    )
-                except HttpResponseException as e:
-                    logger.warning(
-                        "Error calling /store-invite on %s%s with fallback "
-                        "encoding: %s",
-                        id_server_scheme,
-                        id_server,
-                        e,
-                    )
-                    raise e
-
-        # TODO: Check for success
+        except RequestTimedOutError:
+            raise SynapseError(500, "Timed out contacting identity server")
+
         token = data["token"]
         public_keys = data.get("public_keys", [])
         if "public_key" in data:
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 6484e47e5f..860c82c110 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -309,18 +309,18 @@ class InitialSyncHandler:
         if blocked:
             raise SynapseError(403, "This room has been blocked on this server")
 
-        user_id = requester.user.to_string()
-
         (
             membership,
             member_event_id,
         ) = await self.auth.check_user_in_room_or_world_readable(
             room_id,
-            user_id,
+            requester,
             allow_departed_users=True,
         )
         is_peeking = member_event_id is None
 
+        user_id = requester.user.to_string()
+
         if membership == Membership.JOIN:
             result = await self._room_initial_sync_joined(
                 user_id, room_id, pagin_config, membership, is_peeking
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index a7af04f6f3..10b5dad030 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -104,7 +104,7 @@ class MessageHandler:
 
     async def get_room_data(
         self,
-        user_id: str,
+        requester: Requester,
         room_id: str,
         event_type: str,
         state_key: str,
@@ -112,7 +112,7 @@ class MessageHandler:
         """Get data from a room.
 
         Args:
-            user_id
+            requester: The user who did the request.
             room_id
             event_type
             state_key
@@ -125,7 +125,7 @@ class MessageHandler:
             membership,
             membership_event_id,
         ) = await self.auth.check_user_in_room_or_world_readable(
-            room_id, user_id, allow_departed_users=True
+            room_id, requester, allow_departed_users=True
         )
 
         if membership == Membership.JOIN:
@@ -161,11 +161,10 @@ class MessageHandler:
 
     async def get_state_events(
         self,
-        user_id: str,
+        requester: Requester,
         room_id: str,
         state_filter: Optional[StateFilter] = None,
         at_token: Optional[StreamToken] = None,
-        is_guest: bool = False,
     ) -> List[dict]:
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
@@ -174,14 +173,13 @@ class MessageHandler:
         visible.
 
         Args:
-            user_id: The user requesting state events.
+            requester: The user requesting state events.
             room_id: The room ID to get all state events from.
             state_filter: The state filter used to fetch state from the database.
             at_token: the stream token of the at which we are requesting
                 the stats. If the user is not allowed to view the state as of that
                 stream token, we raise a 403 SynapseError. If None, returns the current
                 state based on the current_state_events table.
-            is_guest: whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         Raises:
@@ -191,6 +189,7 @@ class MessageHandler:
             members of this room.
         """
         state_filter = state_filter or StateFilter.all()
+        user_id = requester.user.to_string()
 
         if at_token:
             last_event_id = (
@@ -223,7 +222,7 @@ class MessageHandler:
                 membership,
                 membership_event_id,
             ) = await self.auth.check_user_in_room_or_world_readable(
-                room_id, user_id, allow_departed_users=True
+                room_id, requester, allow_departed_users=True
             )
 
             if membership == Membership.JOIN:
@@ -317,12 +316,11 @@ class MessageHandler:
         Returns:
             A dict of user_id to profile info
         """
-        user_id = requester.user.to_string()
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
             membership, _ = await self.auth.check_user_in_room_or_world_readable(
-                room_id, user_id, allow_departed_users=True
+                room_id, requester, allow_departed_users=True
             )
             if membership != Membership.JOIN:
                 raise SynapseError(
@@ -340,7 +338,10 @@ class MessageHandler:
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
-        if requester.app_service and user_id not in users_with_profile:
+        if (
+            requester.app_service
+            and requester.user.to_string() not in users_with_profile
+        ):
             for uid in users_with_profile:
                 if requester.app_service.is_interested_in_user(uid):
                     break
@@ -751,18 +752,12 @@ class EventCreationHandler:
         if builder.type == EventTypes.Member:
             membership = builder.content.get("membership", None)
             if membership == Membership.JOIN:
-                return await self._is_server_notices_room(builder.room_id)
+                return await self.store.is_server_notice_room(builder.room_id)
             elif membership == Membership.LEAVE:
                 # the user is always allowed to leave (but not kick people)
                 return builder.state_key == requester.user.to_string()
         return False
 
-    async def _is_server_notices_room(self, room_id: str) -> bool:
-        if self.config.servernotices.server_notices_mxid is None:
-            return False
-        user_ids = await self.store.get_users_in_room(room_id)
-        return self.config.servernotices.server_notices_mxid in user_ids
-
     async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
         """Check if a user has accepted the privacy policy
 
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 14ba19d4f8..d865ee6e73 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -26,6 +26,7 @@ from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.room import ShutdownRoomResponse
 from synapse.logging.tracing import trace
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.rest.admin._base import assert_user_is_admin
 from synapse.storage.state import StateFilter
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, Requester, StreamKeyType
@@ -159,11 +160,9 @@ class PaginationHandler:
         self._retention_allowed_lifetime_max = (
             hs.config.retention.retention_allowed_lifetime_max
         )
+        self._is_master = hs.config.worker.worker_app is None
 
-        if (
-            hs.config.worker.run_background_tasks
-            and hs.config.retention.retention_enabled
-        ):
+        if hs.config.retention.retention_enabled and self._is_master:
             # Run the purge jobs described in the configuration file.
             for job in hs.config.retention.retention_purge_jobs:
                 logger.info("Setting up purge job with config: %s", job)
@@ -425,6 +424,7 @@ class PaginationHandler:
         pagin_config: PaginationConfig,
         as_client_event: bool = True,
         event_filter: Optional[Filter] = None,
+        use_admin_priviledge: bool = False,
     ) -> JsonDict:
         """Get messages in a room.
 
@@ -434,10 +434,16 @@ class PaginationHandler:
             pagin_config: The pagination config rules to apply, if any.
             as_client_event: True to get events in client-server format.
             event_filter: Filter to apply to results or None
+            use_admin_priviledge: if `True`, return all events, regardless
+                of whether `user` has access to them. To be used **ONLY**
+                from the admin API.
 
         Returns:
             Pagination API results
         """
+        if use_admin_priviledge:
+            await assert_user_is_admin(self.auth, requester)
+
         user_id = requester.user.to_string()
 
         if pagin_config.from_token:
@@ -460,12 +466,14 @@ class PaginationHandler:
         room_token = from_token.room_key
 
         async with self.pagination_lock.read(room_id):
-            (
-                membership,
-                member_event_id,
-            ) = await self.auth.check_user_in_room_or_world_readable(
-                room_id, user_id, allow_departed_users=True
-            )
+            (membership, member_event_id) = (None, None)
+            if not use_admin_priviledge:
+                (
+                    membership,
+                    member_event_id,
+                ) = await self.auth.check_user_in_room_or_world_readable(
+                    room_id, requester, allow_departed_users=True
+                )
 
             if pagin_config.direction == "b":
                 # if we're going backwards, we might need to backfill. This
@@ -477,7 +485,7 @@ class PaginationHandler:
                         room_id, room_token.stream
                     )
 
-                if membership == Membership.LEAVE:
+                if not use_admin_priviledge and membership == Membership.LEAVE:
                     # If they have left the room then clamp the token to be before
                     # they left the room, to save the effort of loading from the
                     # database.
@@ -530,12 +538,13 @@ class PaginationHandler:
         if event_filter:
             events = await event_filter.filter(events)
 
-        events = await filter_events_for_client(
-            self._storage_controllers,
-            user_id,
-            events,
-            is_peeking=(member_event_id is None),
-        )
+        if not use_admin_priviledge:
+            events = await filter_events_for_client(
+                self._storage_controllers,
+                user_id,
+                events,
+                is_peeking=(member_event_id is None),
+            )
 
         # if after the filter applied there are no more events
         # return immediately - but there might be more in next_token batch
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 741504ba9f..4e575ffbaa 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2051,8 +2051,7 @@ async def get_interested_remotes(
     )
 
     for room_id, states in room_ids_to_states.items():
-        user_ids = await store.get_users_in_room(room_id)
-        hosts = {get_domain_from_id(user_id) for user_id in user_ids}
+        hosts = await store.get_current_hosts_in_room(room_id)
         for host in hosts:
             hosts_and_states.setdefault(host, set()).update(states)
 
diff --git a/synapse/handlers/push_rules.py b/synapse/handlers/push_rules.py
index 2599160bcc..1219672a59 100644
--- a/synapse/handlers/push_rules.py
+++ b/synapse/handlers/push_rules.py
@@ -16,14 +16,17 @@ from typing import TYPE_CHECKING, List, Optional, Union
 import attr
 
 from synapse.api.errors import SynapseError, UnrecognizedRequestError
-from synapse.push.baserules import BASE_RULE_IDS
 from synapse.storage.push_rule import RuleNotFoundException
+from synapse.synapse_rust.push import get_base_rule_ids
 from synapse.types import JsonDict
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
 
 
+BASE_RULE_IDS = get_base_rule_ids()
+
+
 @attr.s(slots=True, frozen=True, auto_attribs=True)
 class RuleSpec:
     scope: str
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index d4a866b346..d2bdb9c8be 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -163,10 +163,7 @@ class ReceiptsHandler:
         if not is_new:
             return
 
-        if self.federation_sender and receipt_type not in (
-            ReceiptTypes.READ_PRIVATE,
-            ReceiptTypes.UNSTABLE_READ_PRIVATE,
-        ):
+        if self.federation_sender and receipt_type != ReceiptTypes.READ_PRIVATE:
             await self.federation_sender.send_read_receipt(receipt)
 
 
@@ -206,38 +203,24 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
             for event_id, orig_event_content in room.get("content", {}).items():
                 event_content = orig_event_content
                 # If there are private read receipts, additional logic is necessary.
-                if (
-                    ReceiptTypes.READ_PRIVATE in event_content
-                    or ReceiptTypes.UNSTABLE_READ_PRIVATE in event_content
-                ):
+                if ReceiptTypes.READ_PRIVATE in event_content:
                     # Make a copy without private read receipts to avoid leaking
                     # other user's private read receipts..
                     event_content = {
                         receipt_type: receipt_value
                         for receipt_type, receipt_value in event_content.items()
-                        if receipt_type
-                        not in (
-                            ReceiptTypes.READ_PRIVATE,
-                            ReceiptTypes.UNSTABLE_READ_PRIVATE,
-                        )
+                        if receipt_type != ReceiptTypes.READ_PRIVATE
                     }
 
                     # Copy the current user's private read receipt from the
                     # original content, if it exists.
-                    user_private_read_receipt = orig_event_content.get(
-                        ReceiptTypes.READ_PRIVATE, {}
-                    ).get(user_id, None)
+                    user_private_read_receipt = orig_event_content[
+                        ReceiptTypes.READ_PRIVATE
+                    ].get(user_id, None)
                     if user_private_read_receipt:
                         event_content[ReceiptTypes.READ_PRIVATE] = {
                             user_id: user_private_read_receipt
                         }
-                    user_unstable_private_read_receipt = orig_event_content.get(
-                        ReceiptTypes.UNSTABLE_READ_PRIVATE, {}
-                    ).get(user_id, None)
-                    if user_unstable_private_read_receipt:
-                        event_content[ReceiptTypes.UNSTABLE_READ_PRIVATE] = {
-                            user_id: user_unstable_private_read_receipt
-                        }
 
                 # Include the event if there is at least one non-private read
                 # receipt or the current user has a private read receipt.
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c77d181722..20ec22105a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -29,7 +29,13 @@ from synapse.api.constants import (
     JoinRules,
     LoginType,
 )
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    ConsentNotGivenError,
+    InvalidClientTokenError,
+    SynapseError,
+)
 from synapse.appservice import ApplicationService
 from synapse.config.server import is_threepid_reserved
 from synapse.http.servlet import assert_params_in_dict
@@ -180,10 +186,7 @@ class RegistrationHandler:
                 )
             if guest_access_token:
                 user_data = await self.auth.get_user_by_access_token(guest_access_token)
-                if (
-                    not user_data.is_guest
-                    or UserID.from_string(user_data.user_id).localpart != localpart
-                ):
+                if not user_data.is_guest or user_data.user.localpart != localpart:
                     raise AuthError(
                         403,
                         "Cannot register taken user ID without valid guest "
@@ -618,7 +621,7 @@ class RegistrationHandler:
         user_id = user.to_string()
         service = self.store.get_app_service_by_token(as_token)
         if not service:
-            raise AuthError(403, "Invalid application service token.")
+            raise InvalidClientTokenError()
         if not service.is_interested_in_user(user_id):
             raise SynapseError(
                 400,
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 70497353a3..48cc9c1ac5 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -103,7 +103,7 @@ class RelationsHandler:
 
         # TODO Properly handle a user leaving a room.
         (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
-            room_id, user_id, allow_departed_users=True
+            room_id, requester, allow_departed_users=True
         )
 
         # This gets the original event and checks that a) the event exists and
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 55395457c3..33e9a87002 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -19,6 +19,7 @@ import math
 import random
 import string
 from collections import OrderedDict
+from http import HTTPStatus
 from typing import (
     TYPE_CHECKING,
     Any,
@@ -60,7 +61,6 @@ from synapse.event_auth import validate_event_for_room_version
 from synapse.events import EventBase
 from synapse.events.utils import copy_and_fixup_power_levels_contents
 from synapse.federation.federation_client import InvalidResponseError
-from synapse.handlers.federation import get_domains_from_state
 from synapse.handlers.relations import BundledAggregations
 from synapse.module_api import NOT_SPAM
 from synapse.rest.admin._base import assert_user_is_admin
@@ -705,8 +705,8 @@ class RoomCreationHandler:
                 was, requested, `room_alias`. Secondly, the stream_id of the
                 last persisted event.
         Raises:
-            SynapseError if the room ID couldn't be stored, or something went
-            horribly wrong.
+            SynapseError if the room ID couldn't be stored, 3pid invitation config
+            validation failed, or something went horribly wrong.
             ResourceLimitError if server is blocked to some resource being
             exceeded
         """
@@ -721,7 +721,7 @@ class RoomCreationHandler:
             # allow the server notices mxid to create rooms
             is_requester_admin = True
         else:
-            is_requester_admin = await self.auth.is_server_admin(requester.user)
+            is_requester_admin = await self.auth.is_server_admin(requester)
 
         # Let the third party rules modify the room creation config if needed, or abort
         # the room creation entirely with an exception.
@@ -732,6 +732,19 @@ class RoomCreationHandler:
         invite_3pid_list = config.get("invite_3pid", [])
         invite_list = config.get("invite", [])
 
+        # validate each entry for correctness
+        for invite_3pid in invite_3pid_list:
+            if not all(
+                key in invite_3pid
+                for key in ("medium", "address", "id_server", "id_access_token")
+            ):
+                raise SynapseError(
+                    HTTPStatus.BAD_REQUEST,
+                    "all of `medium`, `address`, `id_server` and `id_access_token` "
+                    "are required when making a 3pid invite",
+                    Codes.MISSING_PARAM,
+                )
+
         if not is_requester_admin:
             spam_check = await self.spam_checker.user_may_create_room(user_id)
             if spam_check != NOT_SPAM:
@@ -979,7 +992,7 @@ class RoomCreationHandler:
 
         for invite_3pid in invite_3pid_list:
             id_server = invite_3pid["id_server"]
-            id_access_token = invite_3pid.get("id_access_token")  # optional
+            id_access_token = invite_3pid["id_access_token"]
             address = invite_3pid["address"]
             medium = invite_3pid["medium"]
             # Note that do_3pid_invite can raise a  ShadowBanError, but this was
@@ -1279,13 +1292,16 @@ class RoomContextHandler:
         """
         user = requester.user
         if use_admin_priviledge:
-            await assert_user_is_admin(self.auth, requester.user)
+            await assert_user_is_admin(self.auth, requester)
 
         before_limit = math.floor(limit / 2.0)
         after_limit = limit - before_limit
 
-        users = await self.store.get_users_in_room(room_id)
-        is_peeking = user.to_string() not in users
+        is_user_in_room = await self.store.check_local_user_in_room(
+            user_id=user.to_string(), room_id=room_id
+        )
+        # The user is peeking if they aren't in the room already
+        is_peeking = not is_user_in_room
 
         async def filter_evts(events: List[EventBase]) -> List[EventBase]:
             if use_admin_priviledge:
@@ -1459,17 +1475,16 @@ class TimestampLookupHandler:
                 timestamp,
             )
 
-            # Find other homeservers from the given state in the room
-            curr_state = await self._storage_controllers.state.get_current_state(
-                room_id
+            likely_domains = (
+                await self._storage_controllers.state.get_current_hosts_in_room(room_id)
             )
-            curr_domains = get_domains_from_state(curr_state)
-            likely_domains = [
-                domain for domain, depth in curr_domains if domain != self.server_name
-            ]
 
             # Loop through each homeserver candidate until we get a succesful response
             for domain in likely_domains:
+                # We don't want to ask our own server for information we don't have
+                if domain == self.server_name:
+                    continue
+
                 try:
                     remote_response = await self.federation_client.timestamp_to_event(
                         domain, room_id, timestamp, direction
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0f95d4127e..e0d0a8941c 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -179,7 +179,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         """Try and join a room that this server is not in
 
         Args:
-            requester
+            requester: The user making the request, according to the access token.
             remote_room_hosts: List of servers that can be used to join via.
             room_id: Room that we are trying to join
             user: User who is trying to join
@@ -689,7 +689,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 errcode=Codes.BAD_JSON,
             )
 
-        if "avatar_url" in content:
+        if "avatar_url" in content and content.get("avatar_url") is not None:
             if not await self.profile_handler.check_avatar_size_and_mime_type(
                 content["avatar_url"],
             ):
@@ -744,7 +744,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 is_requester_admin = True
 
             else:
-                is_requester_admin = await self.auth.is_server_admin(requester.user)
+                is_requester_admin = await self.auth.is_server_admin(requester)
 
             if not is_requester_admin:
                 if self.config.server.block_non_admin_invites:
@@ -837,7 +837,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 old_membership == Membership.INVITE
                 and effective_membership_state == Membership.LEAVE
             ):
-                is_blocked = await self._is_server_notice_room(room_id)
+                is_blocked = await self.store.is_server_notice_room(room_id)
                 if is_blocked:
                     raise SynapseError(
                         HTTPStatus.FORBIDDEN,
@@ -868,7 +868,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 bypass_spam_checker = True
 
             else:
-                bypass_spam_checker = await self.auth.is_server_admin(requester.user)
+                bypass_spam_checker = await self.auth.is_server_admin(requester)
 
             inviter = await self._get_inviter(target.to_string(), room_id)
             if (
@@ -1382,7 +1382,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         id_server: str,
         requester: Requester,
         txn_id: Optional[str],
-        id_access_token: Optional[str] = None,
+        id_access_token: str,
         prev_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
     ) -> Tuple[str, int]:
@@ -1397,7 +1397,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             requester: The user making the request.
             txn_id: The transaction ID this is part of, or None if this is not
                 part of a transaction.
-            id_access_token: The optional identity server access token.
+            id_access_token: Identity server access token.
             depth: Override the depth used to order the event in the DAG.
             prev_event_ids: The event IDs to use as the prev events
                 Should normally be set to None, which will cause the depth to be calculated
@@ -1410,7 +1410,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             ShadowBanError if the requester has been shadow-banned.
         """
         if self.config.server.block_non_admin_invites:
-            is_requester_admin = await self.auth.is_server_admin(requester.user)
+            is_requester_admin = await self.auth.is_server_admin(requester)
             if not is_requester_admin:
                 raise SynapseError(
                     403, "Invites have been disabled on this server", Codes.FORBIDDEN
@@ -1494,7 +1494,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         room_id: str,
         user: UserID,
         txn_id: Optional[str],
-        id_access_token: Optional[str] = None,
+        id_access_token: str,
         prev_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, int]:
@@ -1617,12 +1617,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
 
         return False
 
-    async def _is_server_notice_room(self, room_id: str) -> bool:
-        if self._server_notices_mxid is None:
-            return False
-        user_ids = await self.store.get_users_in_room(room_id)
-        return self._server_notices_mxid in user_ids
-
 
 class RoomMemberMasterHandler(RoomMemberHandler):
     def __init__(self, hs: "HomeServer"):
@@ -1693,7 +1687,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             check_complexity
             and self.hs.config.server.limit_remote_rooms.admins_can_join
         ):
-            check_complexity = not await self.auth.is_server_admin(user)
+            check_complexity = not await self.store.is_server_admin(user)
 
         if check_complexity:
             # Fetch the room complexity
@@ -1923,8 +1917,11 @@ class RoomMemberMasterHandler(RoomMemberHandler):
         ]:
             raise SynapseError(400, "User %s in room %s" % (user_id, room_id))
 
-        if membership:
-            await self.store.forget(user_id, room_id)
+        # In normal case this call is only required if `membership` is not `None`.
+        # But: After the last member had left the room, the background update
+        # `_background_remove_left_rooms` is deleting rows related to this room from
+        # the table `current_state_events` and `get_current_state_events` is `None`.
+        await self.store.forget(user_id, room_id)
 
 
 def get_users_which_can_issue_invite(auth_events: StateMap[EventBase]) -> List[str]:
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 732b0310bc..ebd445adca 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -453,7 +453,6 @@ class RoomSummaryHandler:
                 "type": e.type,
                 "state_key": e.state_key,
                 "content": e.content,
-                "room_id": e.room_id,
                 "sender": e.sender,
                 "origin_server_ts": e.origin_server_ts,
             }
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 2991967226..f7fd6d7933 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -15,6 +15,7 @@ import itertools
 import logging
 from typing import (
     TYPE_CHECKING,
+    AbstractSet,
     Any,
     Collection,
     Dict,
@@ -1418,10 +1419,10 @@ class SyncHandler:
     async def _generate_sync_entry_for_device_list(
         self,
         sync_result_builder: "SyncResultBuilder",
-        newly_joined_rooms: Set[str],
-        newly_joined_or_invited_or_knocked_users: Set[str],
-        newly_left_rooms: Set[str],
-        newly_left_users: Set[str],
+        newly_joined_rooms: AbstractSet[str],
+        newly_joined_or_invited_or_knocked_users: AbstractSet[str],
+        newly_left_rooms: AbstractSet[str],
+        newly_left_users: AbstractSet[str],
     ) -> DeviceListUpdates:
         """Generate the DeviceListUpdates section of sync
 
@@ -1439,8 +1440,7 @@ class SyncHandler:
         user_id = sync_result_builder.sync_config.user.to_string()
         since_token = sync_result_builder.since_token
 
-        # We're going to mutate these fields, so lets copy them rather than
-        # assume they won't get used later.
+        # Take a copy since these fields will be mutated later.
         newly_joined_or_invited_or_knocked_users = set(
             newly_joined_or_invited_or_knocked_users
         )
@@ -1640,8 +1640,8 @@ class SyncHandler:
     async def _generate_sync_entry_for_presence(
         self,
         sync_result_builder: "SyncResultBuilder",
-        newly_joined_rooms: Set[str],
-        newly_joined_or_invited_users: Set[str],
+        newly_joined_rooms: AbstractSet[str],
+        newly_joined_or_invited_users: AbstractSet[str],
     ) -> None:
         """Generates the presence portion of the sync response. Populates the
         `sync_result_builder` with the result.
@@ -1699,7 +1699,7 @@ class SyncHandler:
         self,
         sync_result_builder: "SyncResultBuilder",
         account_data_by_room: Dict[str, Dict[str, JsonDict]],
-    ) -> Tuple[Set[str], Set[str], Set[str], Set[str]]:
+    ) -> Tuple[AbstractSet[str], AbstractSet[str], AbstractSet[str], AbstractSet[str]]:
         """Generates the rooms portion of the sync response. Populates the
         `sync_result_builder` with the result.
 
@@ -2426,10 +2426,10 @@ class SyncHandler:
                     joined_room.room_id, joined_room.event_pos.stream
                 )
             )
-            users_in_room = await self.state.get_current_users_in_room(
+            user_ids_in_room = await self.state.get_current_user_ids_in_room(
                 joined_room.room_id, extrems
             )
-            if user_id in users_in_room:
+            if user_id in user_ids_in_room:
                 joined_room_ids.add(joined_room.room_id)
 
         return frozenset(joined_room_ids)
@@ -2539,7 +2539,7 @@ class SyncResultBuilder:
     archived: List[ArchivedSyncResult] = attr.Factory(list)
     to_device: List[JsonDict] = attr.Factory(list)
 
-    def calculate_user_changes(self) -> Tuple[Set[str], Set[str]]:
+    def calculate_user_changes(self) -> Tuple[AbstractSet[str], AbstractSet[str]]:
         """Work out which other users have joined or left rooms we are joined to.
 
         This data only is only useful for an incremental sync.
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 27aa0d3126..a4cd8b8f0c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,7 +26,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.replication.tcp.streams import TypingStream
 from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -253,12 +253,11 @@ class TypingWriterHandler(FollowerTypingHandler):
         self, target_user: UserID, requester: Requester, room_id: str, timeout: int
     ) -> None:
         target_user_id = target_user.to_string()
-        auth_user_id = requester.user.to_string()
 
         if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this homeserver")
 
-        if target_user_id != auth_user_id:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's typing state")
 
         if requester.shadow_banned:
@@ -266,7 +265,7 @@ class TypingWriterHandler(FollowerTypingHandler):
             await self.clock.sleep(random.randint(1, 10))
             raise ShadowBanError()
 
-        await self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, requester)
 
         logger.debug("%s has started typing in %s", target_user_id, room_id)
 
@@ -289,12 +288,11 @@ class TypingWriterHandler(FollowerTypingHandler):
         self, target_user: UserID, requester: Requester, room_id: str
     ) -> None:
         target_user_id = target_user.to_string()
-        auth_user_id = requester.user.to_string()
 
         if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this homeserver")
 
-        if target_user_id != auth_user_id:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's typing state")
 
         if requester.shadow_banned:
@@ -302,7 +300,7 @@ class TypingWriterHandler(FollowerTypingHandler):
             await self.clock.sleep(random.randint(1, 10))
             raise ShadowBanError()
 
-        await self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, requester)
 
         logger.debug("%s has stopped typing in %s", target_user_id, room_id)
 
@@ -364,8 +362,9 @@ class TypingWriterHandler(FollowerTypingHandler):
             )
             return
 
-        users = await self.store.get_users_in_room(room_id)
-        domains = {get_domain_from_id(u) for u in users}
+        domains = await self._storage_controllers.state.get_current_hosts_in_room(
+            room_id
+        )
 
         if self.server_name in domains:
             logger.info("Got typing update from %s: %r", user_id, content)
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 05cebb5d4d..a744d68c64 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -19,7 +19,6 @@ from twisted.web.client import PartialDownloadError
 
 from synapse.api.constants import LoginType
 from synapse.api.errors import Codes, LoginError, SynapseError
-from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.util import json_decoder
 
 if TYPE_CHECKING:
@@ -153,7 +152,7 @@ class _BaseThreepidAuthChecker:
 
         logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
 
-        # msisdns are currently always ThreepidBehaviour.REMOTE
+        # msisdns are currently always verified via the IS
         if medium == "msisdn":
             if not self.hs.config.registration.account_threepid_delegate_msisdn:
                 raise SynapseError(
@@ -164,18 +163,7 @@ class _BaseThreepidAuthChecker:
                 threepid_creds,
             )
         elif medium == "email":
-            if (
-                self.hs.config.email.threepid_behaviour_email
-                == ThreepidBehaviour.REMOTE
-            ):
-                assert self.hs.config.registration.account_threepid_delegate_email
-                threepid = await identity_handler.threepid_from_creds(
-                    self.hs.config.registration.account_threepid_delegate_email,
-                    threepid_creds,
-                )
-            elif (
-                self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
-            ):
+            if self.hs.config.email.can_verify_email:
                 threepid = None
                 row = await self.store.get_threepid_validation_session(
                     medium,
@@ -227,10 +215,7 @@ class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChec
         _BaseThreepidAuthChecker.__init__(self, hs)
 
     def is_enabled(self) -> bool:
-        return self.hs.config.email.threepid_behaviour_email in (
-            ThreepidBehaviour.REMOTE,
-            ThreepidBehaviour.LOCAL,
-        )
+        return self.hs.config.email.can_verify_email
 
     async def check_auth(self, authdict: dict, clientip: str) -> Any:
         return await self._check_threepid("email", authdict)