summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers')
-rw-r--r--synapse/handlers/auth.py17
-rw-r--r--synapse/handlers/device.py17
-rw-r--r--synapse/handlers/directory.py34
-rw-r--r--synapse/handlers/event_auth.py9
-rw-r--r--synapse/handlers/events.py9
-rw-r--r--synapse/handlers/federation.py73
-rw-r--r--synapse/handlers/federation_event.py173
-rw-r--r--synapse/handlers/identity.py56
-rw-r--r--synapse/handlers/initial_sync.py6
-rw-r--r--synapse/handlers/message.py35
-rw-r--r--synapse/handlers/pagination.py2
-rw-r--r--synapse/handlers/presence.py3
-rw-r--r--synapse/handlers/register.py15
-rw-r--r--synapse/handlers/relations.py2
-rw-r--r--synapse/handlers/room.py11
-rw-r--r--synapse/handlers/room_member.py18
-rw-r--r--synapse/handlers/sync.py257
-rw-r--r--synapse/handlers/typing.py17
-rw-r--r--synapse/handlers/ui_auth/checkers.py21
19 files changed, 555 insertions, 220 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index bfa5535044..0327fc57a4 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -280,7 +280,7 @@ class AuthHandler:
         that it isn't stolen by re-authenticating them.
 
         Args:
-            requester: The user, as given by the access token
+            requester: The user making the request, according to the access token.
 
             request: The request sent by the client.
 
@@ -1435,20 +1435,25 @@ class AuthHandler:
             access_token: access token to be deleted
 
         """
-        user_info = await self.auth.get_user_by_access_token(access_token)
+        token = await self.store.get_user_by_access_token(access_token)
+        if not token:
+            # At this point, the token should already have been fetched once by
+            # the caller, so this should not happen, unless of a race condition
+            # between two delete requests
+            raise SynapseError(HTTPStatus.UNAUTHORIZED, "Unrecognised access token")
         await self.store.delete_access_token(access_token)
 
         # see if any modules want to know about this
         await self.password_auth_provider.on_logged_out(
-            user_id=user_info.user_id,
-            device_id=user_info.device_id,
+            user_id=token.user_id,
+            device_id=token.device_id,
             access_token=access_token,
         )
 
         # delete pushers associated with this access token
-        if user_info.token_id is not None:
+        if token.token_id is not None:
             await self.hs.get_pusherpool().remove_pushers_by_access_token(
-                user_info.user_id, (user_info.token_id,)
+                token.user_id, (token.token_id,)
             )
 
     async def delete_access_tokens_for_user(
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 1a8379854c..9c2c3a0e68 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -74,6 +74,7 @@ class DeviceWorkerHandler:
         self._state_storage = hs.get_storage_controllers().state
         self._auth_handler = hs.get_auth_handler()
         self.server_name = hs.hostname
+        self._msc3852_enabled = hs.config.experimental.msc3852_enabled
 
     @trace
     async def get_devices_by_user(self, user_id: str) -> List[JsonDict]:
@@ -309,6 +310,7 @@ class DeviceHandler(DeviceWorkerHandler):
         super().__init__(hs)
 
         self.federation_sender = hs.get_federation_sender()
+        self._storage_controllers = hs.get_storage_controllers()
 
         self.device_list_updater = DeviceListUpdater(hs, self)
 
@@ -693,8 +695,11 @@ class DeviceHandler(DeviceWorkerHandler):
 
                     # Ignore any users that aren't ours
                     if self.hs.is_mine_id(user_id):
-                        joined_user_ids = await self.store.get_users_in_room(room_id)
-                        hosts = {get_domain_from_id(u) for u in joined_user_ids}
+                        hosts = set(
+                            await self._storage_controllers.state.get_current_hosts_in_room(
+                                room_id
+                            )
+                        )
                         hosts.discard(self.server_name)
 
                     # Check if we've already sent this update to some hosts
@@ -747,7 +752,13 @@ def _update_device_from_client_ips(
     device: JsonDict, client_ips: Mapping[Tuple[str, str], Mapping[str, Any]]
 ) -> None:
     ip = client_ips.get((device["user_id"], device["device_id"]), {})
-    device.update({"last_seen_ts": ip.get("last_seen"), "last_seen_ip": ip.get("ip")})
+    device.update(
+        {
+            "last_seen_user_agent": ip.get("user_agent"),
+            "last_seen_ts": ip.get("last_seen"),
+            "last_seen_ip": ip.get("ip"),
+        }
+    )
 
 
 class DeviceListUpdater:
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index 09a7a4b238..7127d5aefc 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -30,7 +30,7 @@ from synapse.api.errors import (
 from synapse.appservice import ApplicationService
 from synapse.module_api import NOT_SPAM
 from synapse.storage.databases.main.directory import RoomAliasMapping
-from synapse.types import JsonDict, Requester, RoomAlias, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, RoomAlias
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -83,8 +83,9 @@ class DirectoryHandler:
         # TODO(erikj): Add transactions.
         # TODO(erikj): Check if there is a current association.
         if not servers:
-            users = await self.store.get_users_in_room(room_id)
-            servers = {get_domain_from_id(u) for u in users}
+            servers = await self._storage_controllers.state.get_current_hosts_in_room(
+                room_id
+            )
 
         if not servers:
             raise SynapseError(400, "Failed to get server list")
@@ -133,7 +134,7 @@ class DirectoryHandler:
         else:
             # Server admins are not subject to the same constraints as normal
             # users when creating an alias (e.g. being in the room).
-            is_admin = await self.auth.is_server_admin(requester.user)
+            is_admin = await self.auth.is_server_admin(requester)
 
             if (self.require_membership and check_membership) and not is_admin:
                 rooms_for_user = await self.store.get_rooms_for_user(user_id)
@@ -197,7 +198,7 @@ class DirectoryHandler:
         user_id = requester.user.to_string()
 
         try:
-            can_delete = await self._user_can_delete_alias(room_alias, user_id)
+            can_delete = await self._user_can_delete_alias(room_alias, requester)
         except StoreError as e:
             if e.code == 404:
                 raise NotFoundError("Unknown room alias")
@@ -287,8 +288,9 @@ class DirectoryHandler:
                 Codes.NOT_FOUND,
             )
 
-        users = await self.store.get_users_in_room(room_id)
-        extra_servers = {get_domain_from_id(u) for u in users}
+        extra_servers = await self._storage_controllers.state.get_current_hosts_in_room(
+            room_id
+        )
         servers_set = set(extra_servers) | set(servers)
 
         # If this server is in the list of servers, return it first.
@@ -400,7 +402,9 @@ class DirectoryHandler:
         # either no interested services, or no service with an exclusive lock
         return True
 
-    async def _user_can_delete_alias(self, alias: RoomAlias, user_id: str) -> bool:
+    async def _user_can_delete_alias(
+        self, alias: RoomAlias, requester: Requester
+    ) -> bool:
         """Determine whether a user can delete an alias.
 
         One of the following must be true:
@@ -413,7 +417,7 @@ class DirectoryHandler:
         """
         creator = await self.store.get_room_alias_creator(alias.to_string())
 
-        if creator == user_id:
+        if creator == requester.user.to_string():
             return True
 
         # Resolve the alias to the corresponding room.
@@ -422,9 +426,7 @@ class DirectoryHandler:
         if not room_id:
             return False
 
-        return await self.auth.check_can_change_room_list(
-            room_id, UserID.from_string(user_id)
-        )
+        return await self.auth.check_can_change_room_list(room_id, requester)
 
     async def edit_published_room_list(
         self, requester: Requester, room_id: str, visibility: str
@@ -463,7 +465,7 @@ class DirectoryHandler:
             raise SynapseError(400, "Unknown room")
 
         can_change_room_list = await self.auth.check_can_change_room_list(
-            room_id, requester.user
+            room_id, requester
         )
         if not can_change_room_list:
             raise AuthError(
@@ -528,10 +530,8 @@ class DirectoryHandler:
         Get a list of the aliases that currently point to this room on this server
         """
         # allow access to server admins and current members of the room
-        is_admin = await self.auth.is_server_admin(requester.user)
+        is_admin = await self.auth.is_server_admin(requester)
         if not is_admin:
-            await self.auth.check_user_in_room_or_world_readable(
-                room_id, requester.user.to_string()
-            )
+            await self.auth.check_user_in_room_or_world_readable(room_id, requester)
 
         return await self.store.get_aliases_for_room(room_id)
diff --git a/synapse/handlers/event_auth.py b/synapse/handlers/event_auth.py
index a2dd9c7efa..c3ddc5d182 100644
--- a/synapse/handlers/event_auth.py
+++ b/synapse/handlers/event_auth.py
@@ -129,12 +129,9 @@ class EventAuthHandler:
         else:
             users = {}
 
-        # Find the user with the highest power level.
-        users_in_room = await self._store.get_users_in_room(room_id)
-        # Only interested in local users.
-        local_users_in_room = [
-            u for u in users_in_room if get_domain_from_id(u) == self._server_name
-        ]
+        # Find the user with the highest power level (only interested in local
+        # users).
+        local_users_in_room = await self._store.get_local_users_in_room(room_id)
         chosen_user = max(
             local_users_in_room,
             key=lambda user: users.get(user, users_default_level),
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index ac13340d3a..949b69cb41 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -151,7 +151,7 @@ class EventHandler:
         """Retrieve a single specified event.
 
         Args:
-            user: The user requesting the event
+            user: The local user requesting the event
             room_id: The expected room id. We'll return None if the
                 event's room does not match.
             event_id: The event ID to obtain.
@@ -173,8 +173,11 @@ class EventHandler:
         if not event:
             return None
 
-        users = await self.store.get_users_in_room(event.room_id)
-        is_peeking = user.to_string() not in users
+        is_user_in_room = await self.store.check_local_user_in_room(
+            user_id=user.to_string(), room_id=event.room_id
+        )
+        # The user is peeking if they aren't in the room already
+        is_peeking = not is_user_in_room
 
         filtered = await filter_events_for_client(
             self._storage_controllers, user.to_string(), [event], is_peeking=is_peeking
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 5042236742..e151962055 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -32,6 +32,7 @@ from typing import (
 )
 
 import attr
+from prometheus_client import Histogram
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -59,7 +60,7 @@ from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import SynapseTags, set_tag, tag_args, trace
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.module_api import NOT_SPAM
 from synapse.replication.http.federation import (
@@ -79,6 +80,29 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
+# Added to debug performance and track progress on optimizations
+backfill_processing_before_timer = Histogram(
+    "synapse_federation_backfill_processing_before_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        0.1,
+        0.5,
+        1.0,
+        2.5,
+        5.0,
+        7.5,
+        10.0,
+        15.0,
+        20.0,
+        30.0,
+        40.0,
+        60.0,
+        80.0,
+        "+Inf",
+    ),
+)
+
 
 def get_domains_from_state(state: StateMap[EventBase]) -> List[Tuple[str, int]]:
     """Get joined domains from state
@@ -138,6 +162,7 @@ class FederationHandler:
     def __init__(self, hs: "HomeServer"):
         self.hs = hs
 
+        self.clock = hs.get_clock()
         self.store = hs.get_datastores().main
         self._storage_controllers = hs.get_storage_controllers()
         self._state_storage_controller = self._storage_controllers.state
@@ -197,12 +222,39 @@ class FederationHandler:
                 return. This is used as part of the heuristic to decide if we
                 should back paginate.
         """
+        # Starting the processing time here so we can include the room backfill
+        # linearizer lock queue in the timing
+        processing_start_time = self.clock.time_msec()
+
         async with self._room_backfill.queue(room_id):
-            return await self._maybe_backfill_inner(room_id, current_depth, limit)
+            return await self._maybe_backfill_inner(
+                room_id,
+                current_depth,
+                limit,
+                processing_start_time=processing_start_time,
+            )
 
     async def _maybe_backfill_inner(
-        self, room_id: str, current_depth: int, limit: int
+        self,
+        room_id: str,
+        current_depth: int,
+        limit: int,
+        *,
+        processing_start_time: int,
     ) -> bool:
+        """
+        Checks whether the `current_depth` is at or approaching any backfill
+        points in the room and if so, will backfill. We only care about
+        checking backfill points that happened before the `current_depth`
+        (meaning less than or equal to the `current_depth`).
+
+        Args:
+            room_id: The room to backfill in.
+            current_depth: The depth to check at for any upcoming backfill points.
+            limit: The max number of events to request from the remote federated server.
+            processing_start_time: The time when `maybe_backfill` started
+                processing. Only used for timing.
+        """
         backwards_extremities = [
             _BackfillPoint(event_id, depth, _BackfillPointType.BACKWARDS_EXTREMITY)
             for event_id, depth in await self.store.get_oldest_event_ids_with_depth_in_room(
@@ -370,6 +422,14 @@ class FederationHandler:
         logger.debug(
             "_maybe_backfill_inner: extremities_to_request %s", extremities_to_request
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request",
+            str(extremities_to_request),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "extremities_to_request.length",
+            str(len(extremities_to_request)),
+        )
 
         # Now we need to decide which hosts to hit first.
 
@@ -425,6 +485,11 @@ class FederationHandler:
 
             return False
 
+        processing_end_time = self.clock.time_msec()
+        backfill_processing_before_timer.observe(
+            (processing_end_time - processing_start_time) / 1000
+        )
+
         success = await try_backfill(likely_domains)
         if success:
             return True
@@ -1081,6 +1146,8 @@ class FederationHandler:
 
         return event
 
+    @trace
+    @tag_args
     async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
         """Returns the state at the event. i.e. not including said event."""
         event = await self.store.get_event(event_id, check_room_id=room_id)
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index 8968b705d4..048c4111f6 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -29,7 +29,7 @@ from typing import (
     Tuple,
 )
 
-from prometheus_client import Counter
+from prometheus_client import Counter, Histogram
 
 from synapse import event_auth
 from synapse.api.constants import (
@@ -59,7 +59,13 @@ from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.logging.context import nested_logging_context
-from synapse.logging.opentracing import trace
+from synapse.logging.opentracing import (
+    SynapseTags,
+    set_tag,
+    start_active_span,
+    tag_args,
+    trace,
+)
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
@@ -92,6 +98,36 @@ soft_failed_event_counter = Counter(
     "Events received over federation that we marked as soft_failed",
 )
 
+# Added to debug performance and track progress on optimizations
+backfill_processing_after_timer = Histogram(
+    "synapse_federation_backfill_processing_after_time_seconds",
+    "sec",
+    [],
+    buckets=(
+        0.1,
+        0.25,
+        0.5,
+        1.0,
+        2.5,
+        5.0,
+        7.5,
+        10.0,
+        15.0,
+        20.0,
+        25.0,
+        30.0,
+        40.0,
+        50.0,
+        60.0,
+        80.0,
+        100.0,
+        120.0,
+        150.0,
+        180.0,
+        "+Inf",
+    ),
+)
+
 
 class FederationEventHandler:
     """Handles events that originated from federation.
@@ -410,6 +446,7 @@ class FederationEventHandler:
             prev_member_event,
         )
 
+    @trace
     async def process_remote_join(
         self,
         origin: str,
@@ -597,20 +634,21 @@ class FederationEventHandler:
         if not events:
             return
 
-        # if there are any events in the wrong room, the remote server is buggy and
-        # should not be trusted.
-        for ev in events:
-            if ev.room_id != room_id:
-                raise InvalidResponseError(
-                    f"Remote server {dest} returned event {ev.event_id} which is in "
-                    f"room {ev.room_id}, when we were backfilling in {room_id}"
-                )
+        with backfill_processing_after_timer.time():
+            # if there are any events in the wrong room, the remote server is buggy and
+            # should not be trusted.
+            for ev in events:
+                if ev.room_id != room_id:
+                    raise InvalidResponseError(
+                        f"Remote server {dest} returned event {ev.event_id} which is in "
+                        f"room {ev.room_id}, when we were backfilling in {room_id}"
+                    )
 
-        await self._process_pulled_events(
-            dest,
-            events,
-            backfilled=True,
-        )
+            await self._process_pulled_events(
+                dest,
+                events,
+                backfilled=True,
+            )
 
     @trace
     async def _get_missing_events_for_pdu(
@@ -715,7 +753,7 @@ class FederationEventHandler:
 
     @trace
     async def _process_pulled_events(
-        self, origin: str, events: Iterable[EventBase], backfilled: bool
+        self, origin: str, events: Collection[EventBase], backfilled: bool
     ) -> None:
         """Process a batch of events we have pulled from a remote server
 
@@ -730,6 +768,15 @@ class FederationEventHandler:
             backfilled: True if this is part of a historical batch of events (inhibits
                 notification to clients, and validation of device keys.)
         """
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str([event.event_id for event in events]),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(events)),
+        )
+        set_tag(SynapseTags.FUNC_ARG_PREFIX + "backfilled", str(backfilled))
         logger.debug(
             "processing pulled backfilled=%s events=%s",
             backfilled,
@@ -753,6 +800,7 @@ class FederationEventHandler:
                 await self._process_pulled_event(origin, ev, backfilled=backfilled)
 
     @trace
+    @tag_args
     async def _process_pulled_event(
         self, origin: str, event: EventBase, backfilled: bool
     ) -> None:
@@ -854,6 +902,7 @@ class FederationEventHandler:
             else:
                 raise
 
+    @trace
     async def _compute_event_context_with_maybe_missing_prevs(
         self, dest: str, event: EventBase
     ) -> EventContext:
@@ -970,6 +1019,8 @@ class FederationEventHandler:
             event, state_ids_before_event=state_map, partial_state=partial_state
         )
 
+    @trace
+    @tag_args
     async def _get_state_ids_after_missing_prev_event(
         self,
         destination: str,
@@ -1009,10 +1060,10 @@ class FederationEventHandler:
         logger.debug("Fetching %i events from cache/store", len(desired_events))
         have_events = await self._store.have_seen_events(room_id, desired_events)
 
-        missing_desired_events = desired_events - have_events
+        missing_desired_event_ids = desired_events - have_events
         logger.debug(
             "We are missing %i events (got %i)",
-            len(missing_desired_events),
+            len(missing_desired_event_ids),
             len(have_events),
         )
 
@@ -1024,13 +1075,30 @@ class FederationEventHandler:
         #   already have a bunch of the state events. It would be nice if the
         #   federation api gave us a way of finding out which we actually need.
 
-        missing_auth_events = set(auth_event_ids) - have_events
-        missing_auth_events.difference_update(
-            await self._store.have_seen_events(room_id, missing_auth_events)
+        missing_auth_event_ids = set(auth_event_ids) - have_events
+        missing_auth_event_ids.difference_update(
+            await self._store.have_seen_events(room_id, missing_auth_event_ids)
         )
-        logger.debug("We are also missing %i auth events", len(missing_auth_events))
+        logger.debug("We are also missing %i auth events", len(missing_auth_event_ids))
 
-        missing_events = missing_desired_events | missing_auth_events
+        missing_event_ids = missing_desired_event_ids | missing_auth_event_ids
+
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids",
+            str(missing_auth_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_auth_event_ids.length",
+            str(len(missing_auth_event_ids)),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids",
+            str(missing_desired_event_ids),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "missing_desired_event_ids.length",
+            str(len(missing_desired_event_ids)),
+        )
 
         # Making an individual request for each of 1000s of events has a lot of
         # overhead. On the other hand, we don't really want to fetch all of the events
@@ -1041,13 +1109,13 @@ class FederationEventHandler:
         #
         # TODO: might it be better to have an API which lets us do an aggregate event
         #   request
-        if (len(missing_events) * 10) >= len(auth_event_ids) + len(state_event_ids):
+        if (len(missing_event_ids) * 10) >= len(auth_event_ids) + len(state_event_ids):
             logger.debug("Requesting complete state from remote")
             await self._get_state_and_persist(destination, room_id, event_id)
         else:
-            logger.debug("Fetching %i events from remote", len(missing_events))
+            logger.debug("Fetching %i events from remote", len(missing_event_ids))
             await self._get_events_and_persist(
-                destination=destination, room_id=room_id, event_ids=missing_events
+                destination=destination, room_id=room_id, event_ids=missing_event_ids
             )
 
         # We now need to fill out the state map, which involves fetching the
@@ -1104,6 +1172,14 @@ class FederationEventHandler:
                 event_id,
                 failed_to_fetch,
             )
+            set_tag(
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch",
+                str(failed_to_fetch),
+            )
+            set_tag(
+                SynapseTags.RESULT_PREFIX + "failed_to_fetch.length",
+                str(len(failed_to_fetch)),
+            )
 
         if remote_event.is_state() and remote_event.rejected_reason is None:
             state_map[
@@ -1112,6 +1188,8 @@ class FederationEventHandler:
 
         return state_map
 
+    @trace
+    @tag_args
     async def _get_state_and_persist(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1133,6 +1211,7 @@ class FederationEventHandler:
                 destination=destination, room_id=room_id, event_ids=(event_id,)
             )
 
+    @trace
     async def _process_received_pdu(
         self,
         origin: str,
@@ -1283,6 +1362,7 @@ class FederationEventHandler:
         except Exception:
             logger.exception("Failed to resync device for %s", sender)
 
+    @trace
     async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
         """Handles backfilling the insertion event when we receive a marker
         event that points to one.
@@ -1314,7 +1394,7 @@ class FederationEventHandler:
         logger.debug("_handle_marker_event: received %s", marker_event)
 
         insertion_event_id = marker_event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION
+            EventContentFields.MSC2716_INSERTION_EVENT_REFERENCE
         )
 
         if insertion_event_id is None:
@@ -1414,6 +1494,8 @@ class FederationEventHandler:
 
         return event_from_response
 
+    @trace
+    @tag_args
     async def _get_events_and_persist(
         self, destination: str, room_id: str, event_ids: Collection[str]
     ) -> None:
@@ -1459,6 +1541,7 @@ class FederationEventHandler:
         logger.info("Fetched %i events of %i requested", len(events), len(event_ids))
         await self._auth_and_persist_outliers(room_id, events)
 
+    @trace
     async def _auth_and_persist_outliers(
         self, room_id: str, events: Iterable[EventBase]
     ) -> None:
@@ -1477,6 +1560,16 @@ class FederationEventHandler:
         """
         event_map = {event.event_id: event for event in events}
 
+        event_ids = event_map.keys()
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids",
+            str(event_ids),
+        )
+        set_tag(
+            SynapseTags.FUNC_ARG_PREFIX + "event_ids.length",
+            str(len(event_ids)),
+        )
+
         # filter out any events we have already seen. This might happen because
         # the events were eagerly pushed to us (eg, during a room join), or because
         # another thread has raced against us since we decided to request the event.
@@ -1593,6 +1686,7 @@ class FederationEventHandler:
             backfilled=True,
         )
 
+    @trace
     async def _check_event_auth(
         self, origin: Optional[str], event: EventBase, context: EventContext
     ) -> None:
@@ -1631,6 +1725,14 @@ class FederationEventHandler:
         claimed_auth_events = await self._load_or_fetch_auth_events_for_event(
             origin, event
         )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events",
+            str([ev.event_id for ev in claimed_auth_events]),
+        )
+        set_tag(
+            SynapseTags.RESULT_PREFIX + "claimed_auth_events.length",
+            str(len(claimed_auth_events)),
+        )
 
         # ... and check that the event passes auth at those auth events.
         # https://spec.matrix.org/v1.3/server-server-api/#checks-performed-on-receipt-of-a-pdu:
@@ -1728,6 +1830,7 @@ class FederationEventHandler:
             )
             context.rejected = RejectedReason.AUTH_ERROR
 
+    @trace
     async def _maybe_kick_guest_users(self, event: EventBase) -> None:
         if event.type != EventTypes.GuestAccess:
             return
@@ -1935,6 +2038,8 @@ class FederationEventHandler:
         # instead we raise an AuthError, which will make the caller ignore it.
         raise AuthError(code=HTTPStatus.FORBIDDEN, msg="Auth events could not be found")
 
+    @trace
+    @tag_args
     async def _get_remote_auth_chain_for_event(
         self, destination: str, room_id: str, event_id: str
     ) -> None:
@@ -1963,6 +2068,7 @@ class FederationEventHandler:
 
         await self._auth_and_persist_outliers(room_id, remote_auth_events)
 
+    @trace
     async def _run_push_actions_and_persist_event(
         self, event: EventBase, context: EventContext, backfilled: bool = False
     ) -> None:
@@ -2071,8 +2177,17 @@ class FederationEventHandler:
                     self._message_handler.maybe_schedule_expiry(event)
 
             if not backfilled:  # Never notify for backfilled events
-                for event in events:
-                    await self._notify_persisted_event(event, max_stream_token)
+                with start_active_span("notify_persisted_events"):
+                    set_tag(
+                        SynapseTags.RESULT_PREFIX + "event_ids",
+                        str([ev.event_id for ev in events]),
+                    )
+                    set_tag(
+                        SynapseTags.RESULT_PREFIX + "event_ids.length",
+                        str(len(events)),
+                    )
+                    for event in events:
+                        await self._notify_persisted_event(event, max_stream_token)
 
             return max_stream_token.stream
 
diff --git a/synapse/handlers/identity.py b/synapse/handlers/identity.py
index e5afe84df9..9571d461c8 100644
--- a/synapse/handlers/identity.py
+++ b/synapse/handlers/identity.py
@@ -26,7 +26,6 @@ from synapse.api.errors import (
     SynapseError,
 )
 from synapse.api.ratelimiting import Ratelimiter
-from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.http import RequestTimedOutError
 from synapse.http.client import SimpleHttpClient
 from synapse.http.site import SynapseRequest
@@ -416,48 +415,6 @@ class IdentityHandler:
 
         return session_id
 
-    async def request_email_token(
-        self,
-        id_server: str,
-        email: str,
-        client_secret: str,
-        send_attempt: int,
-        next_link: Optional[str] = None,
-    ) -> JsonDict:
-        """
-        Request an external server send an email on our behalf for the purposes of threepid
-        validation.
-
-        Args:
-            id_server: The identity server to proxy to
-            email: The email to send the message to
-            client_secret: The unique client_secret sends by the user
-            send_attempt: Which attempt this is
-            next_link: A link to redirect the user to once they submit the token
-
-        Returns:
-            The json response body from the server
-        """
-        params = {
-            "email": email,
-            "client_secret": client_secret,
-            "send_attempt": send_attempt,
-        }
-        if next_link:
-            params["next_link"] = next_link
-
-        try:
-            data = await self.http_client.post_json_get_json(
-                id_server + "/_matrix/identity/api/v1/validate/email/requestToken",
-                params,
-            )
-            return data
-        except HttpResponseException as e:
-            logger.info("Proxied requestToken failed: %r", e)
-            raise e.to_synapse_error()
-        except RequestTimedOutError:
-            raise SynapseError(500, "Timed out contacting identity server")
-
     async def requestMsisdnToken(
         self,
         id_server: str,
@@ -531,18 +488,7 @@ class IdentityHandler:
         validation_session = None
 
         # Try to validate as email
-        if self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.REMOTE:
-            # Remote emails will only be used if a valid identity server is provided.
-            assert (
-                self.hs.config.registration.account_threepid_delegate_email is not None
-            )
-
-            # Ask our delegated email identity server
-            validation_session = await self.threepid_from_creds(
-                self.hs.config.registration.account_threepid_delegate_email,
-                threepid_creds,
-            )
-        elif self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL:
+        if self.hs.config.email.can_verify_email:
             # Get a validated session matching these details
             validation_session = await self.store.get_threepid_validation_session(
                 "email", client_secret, sid=sid, validated=True
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 6484e47e5f..860c82c110 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -309,18 +309,18 @@ class InitialSyncHandler:
         if blocked:
             raise SynapseError(403, "This room has been blocked on this server")
 
-        user_id = requester.user.to_string()
-
         (
             membership,
             member_event_id,
         ) = await self.auth.check_user_in_room_or_world_readable(
             room_id,
-            user_id,
+            requester,
             allow_departed_users=True,
         )
         is_peeking = member_event_id is None
 
+        user_id = requester.user.to_string()
+
         if membership == Membership.JOIN:
             result = await self._room_initial_sync_joined(
                 user_id, room_id, pagin_config, membership, is_peeking
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index d77f2c2fb6..ec7c73a758 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -104,7 +104,7 @@ class MessageHandler:
 
     async def get_room_data(
         self,
-        user_id: str,
+        requester: Requester,
         room_id: str,
         event_type: str,
         state_key: str,
@@ -112,7 +112,7 @@ class MessageHandler:
         """Get data from a room.
 
         Args:
-            user_id
+            requester: The user who did the request.
             room_id
             event_type
             state_key
@@ -125,7 +125,7 @@ class MessageHandler:
             membership,
             membership_event_id,
         ) = await self.auth.check_user_in_room_or_world_readable(
-            room_id, user_id, allow_departed_users=True
+            room_id, requester, allow_departed_users=True
         )
 
         if membership == Membership.JOIN:
@@ -161,11 +161,10 @@ class MessageHandler:
 
     async def get_state_events(
         self,
-        user_id: str,
+        requester: Requester,
         room_id: str,
         state_filter: Optional[StateFilter] = None,
         at_token: Optional[StreamToken] = None,
-        is_guest: bool = False,
     ) -> List[dict]:
         """Retrieve all state events for a given room. If the user is
         joined to the room then return the current state. If the user has
@@ -174,14 +173,13 @@ class MessageHandler:
         visible.
 
         Args:
-            user_id: The user requesting state events.
+            requester: The user requesting state events.
             room_id: The room ID to get all state events from.
             state_filter: The state filter used to fetch state from the database.
             at_token: the stream token of the at which we are requesting
                 the stats. If the user is not allowed to view the state as of that
                 stream token, we raise a 403 SynapseError. If None, returns the current
                 state based on the current_state_events table.
-            is_guest: whether this user is a guest
         Returns:
             A list of dicts representing state events. [{}, {}, {}]
         Raises:
@@ -191,6 +189,7 @@ class MessageHandler:
             members of this room.
         """
         state_filter = state_filter or StateFilter.all()
+        user_id = requester.user.to_string()
 
         if at_token:
             last_event_id = (
@@ -223,7 +222,7 @@ class MessageHandler:
                 membership,
                 membership_event_id,
             ) = await self.auth.check_user_in_room_or_world_readable(
-                room_id, user_id, allow_departed_users=True
+                room_id, requester, allow_departed_users=True
             )
 
             if membership == Membership.JOIN:
@@ -317,12 +316,11 @@ class MessageHandler:
         Returns:
             A dict of user_id to profile info
         """
-        user_id = requester.user.to_string()
         if not requester.app_service:
             # We check AS auth after fetching the room membership, as it
             # requires us to pull out all joined members anyway.
             membership, _ = await self.auth.check_user_in_room_or_world_readable(
-                room_id, user_id, allow_departed_users=True
+                room_id, requester, allow_departed_users=True
             )
             if membership != Membership.JOIN:
                 raise SynapseError(
@@ -331,12 +329,19 @@ class MessageHandler:
                     msg="Getting joined members while not being a current member of the room is forbidden.",
                 )
 
-        users_with_profile = await self.store.get_users_in_room_with_profiles(room_id)
+        users_with_profile = (
+            await self._state_storage_controller.get_users_in_room_with_profiles(
+                room_id
+            )
+        )
 
         # If this is an AS, double check that they are allowed to see the members.
         # This can either be because the AS user is in the room or because there
         # is a user in the room that the AS is "interested in"
-        if requester.app_service and user_id not in users_with_profile:
+        if (
+            requester.app_service
+            and requester.user.to_string() not in users_with_profile
+        ):
             for uid in users_with_profile:
                 if requester.app_service.is_interested_in_user(uid):
                     break
@@ -847,8 +852,10 @@ class EventCreationHandler:
     async def _is_server_notices_room(self, room_id: str) -> bool:
         if self.config.servernotices.server_notices_mxid is None:
             return False
-        user_ids = await self.store.get_users_in_room(room_id)
-        return self.config.servernotices.server_notices_mxid in user_ids
+        is_server_notices_room = await self.store.check_local_user_in_room(
+            user_id=self.config.servernotices.server_notices_mxid, room_id=room_id
+        )
+        return is_server_notices_room
 
     async def assert_accepted_privacy_policy(self, requester: Requester) -> None:
         """Check if a user has accepted the privacy policy
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index e1e34e3b16..74e944bce7 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -464,7 +464,7 @@ class PaginationHandler:
                 membership,
                 member_event_id,
             ) = await self.auth.check_user_in_room_or_world_readable(
-                room_id, user_id, allow_departed_users=True
+                room_id, requester, allow_departed_users=True
             )
 
             if pagin_config.direction == "b":
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 741504ba9f..4e575ffbaa 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -2051,8 +2051,7 @@ async def get_interested_remotes(
     )
 
     for room_id, states in room_ids_to_states.items():
-        user_ids = await store.get_users_in_room(room_id)
-        hosts = {get_domain_from_id(user_id) for user_id in user_ids}
+        hosts = await store.get_current_hosts_in_room(room_id)
         for host in hosts:
             hosts_and_states.setdefault(host, set()).update(states)
 
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index c77d181722..20ec22105a 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -29,7 +29,13 @@ from synapse.api.constants import (
     JoinRules,
     LoginType,
 )
-from synapse.api.errors import AuthError, Codes, ConsentNotGivenError, SynapseError
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    ConsentNotGivenError,
+    InvalidClientTokenError,
+    SynapseError,
+)
 from synapse.appservice import ApplicationService
 from synapse.config.server import is_threepid_reserved
 from synapse.http.servlet import assert_params_in_dict
@@ -180,10 +186,7 @@ class RegistrationHandler:
                 )
             if guest_access_token:
                 user_data = await self.auth.get_user_by_access_token(guest_access_token)
-                if (
-                    not user_data.is_guest
-                    or UserID.from_string(user_data.user_id).localpart != localpart
-                ):
+                if not user_data.is_guest or user_data.user.localpart != localpart:
                     raise AuthError(
                         403,
                         "Cannot register taken user ID without valid guest "
@@ -618,7 +621,7 @@ class RegistrationHandler:
         user_id = user.to_string()
         service = self.store.get_app_service_by_token(as_token)
         if not service:
-            raise AuthError(403, "Invalid application service token.")
+            raise InvalidClientTokenError()
         if not service.is_interested_in_user(user_id):
             raise SynapseError(
                 400,
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
index 72d25df8c8..28d7093f08 100644
--- a/synapse/handlers/relations.py
+++ b/synapse/handlers/relations.py
@@ -103,7 +103,7 @@ class RelationsHandler:
 
         # TODO Properly handle a user leaving a room.
         (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
-            room_id, user_id, allow_departed_users=True
+            room_id, requester, allow_departed_users=True
         )
 
         # This gets the original event and checks that a) the event exists and
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index c5f7e1b286..0970e88ad5 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -722,7 +722,7 @@ class RoomCreationHandler:
             # allow the server notices mxid to create rooms
             is_requester_admin = True
         else:
-            is_requester_admin = await self.auth.is_server_admin(requester.user)
+            is_requester_admin = await self.auth.is_server_admin(requester)
 
         # Let the third party rules modify the room creation config if needed, or abort
         # the room creation entirely with an exception.
@@ -1358,13 +1358,16 @@ class RoomContextHandler:
         """
         user = requester.user
         if use_admin_priviledge:
-            await assert_user_is_admin(self.auth, requester.user)
+            await assert_user_is_admin(self.auth, requester)
 
         before_limit = math.floor(limit / 2.0)
         after_limit = limit - before_limit
 
-        users = await self.store.get_users_in_room(room_id)
-        is_peeking = user.to_string() not in users
+        is_user_in_room = await self.store.check_local_user_in_room(
+            user_id=user.to_string(), room_id=room_id
+        )
+        # The user is peeking if they aren't in the room already
+        is_peeking = not is_user_in_room
 
         async def filter_evts(events: List[EventBase]) -> List[EventBase]:
             if use_admin_priviledge:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 70dc69c809..709682622f 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -179,7 +179,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         """Try and join a room that this server is not in
 
         Args:
-            requester
+            requester: The user making the request, according to the access token.
             remote_room_hosts: List of servers that can be used to join via.
             room_id: Room that we are trying to join
             user: User who is trying to join
@@ -689,7 +689,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 errcode=Codes.BAD_JSON,
             )
 
-        if "avatar_url" in content:
+        if "avatar_url" in content and content.get("avatar_url") is not None:
             if not await self.profile_handler.check_avatar_size_and_mime_type(
                 content["avatar_url"],
             ):
@@ -744,7 +744,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 is_requester_admin = True
 
             else:
-                is_requester_admin = await self.auth.is_server_admin(requester.user)
+                is_requester_admin = await self.auth.is_server_admin(requester)
 
             if not is_requester_admin:
                 if self.config.server.block_non_admin_invites:
@@ -868,7 +868,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 bypass_spam_checker = True
 
             else:
-                bypass_spam_checker = await self.auth.is_server_admin(requester.user)
+                bypass_spam_checker = await self.auth.is_server_admin(requester)
 
             inviter = await self._get_inviter(target.to_string(), room_id)
             if (
@@ -1410,7 +1410,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             ShadowBanError if the requester has been shadow-banned.
         """
         if self.config.server.block_non_admin_invites:
-            is_requester_admin = await self.auth.is_server_admin(requester.user)
+            is_requester_admin = await self.auth.is_server_admin(requester)
             if not is_requester_admin:
                 raise SynapseError(
                     403, "Invites have been disabled on this server", Codes.FORBIDDEN
@@ -1620,8 +1620,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
     async def _is_server_notice_room(self, room_id: str) -> bool:
         if self._server_notices_mxid is None:
             return False
-        user_ids = await self.store.get_users_in_room(room_id)
-        return self._server_notices_mxid in user_ids
+        is_server_notices_room = await self.store.check_local_user_in_room(
+            user_id=self._server_notices_mxid, room_id=room_id
+        )
+        return is_server_notices_room
 
 
 class RoomMemberMasterHandler(RoomMemberHandler):
@@ -1693,7 +1695,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             check_complexity
             and self.hs.config.server.limit_remote_rooms.admins_can_join
         ):
-            check_complexity = not await self.auth.is_server_admin(user)
+            check_complexity = not await self.store.is_server_admin(user)
 
         if check_complexity:
             # Fetch the room complexity
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 3ca01391c9..2d95b1fa24 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -16,9 +16,11 @@ import logging
 from typing import (
     TYPE_CHECKING,
     Any,
+    Collection,
     Dict,
     FrozenSet,
     List,
+    Mapping,
     Optional,
     Sequence,
     Set,
@@ -517,10 +519,17 @@ class SyncHandler:
                 # ensure that we always include current state in the timeline
                 current_state_ids: FrozenSet[str] = frozenset()
                 if any(e.is_state() for e in recents):
+                    # FIXME(faster_joins): We use the partial state here as
+                    # we don't want to block `/sync` on finishing a lazy join.
+                    # Which should be fine once
+                    # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+                    # since we shouldn't reach here anymore?
+                    # Note that we use the current state as a whitelist for filtering
+                    # `recents`, so partial state is only a problem when a membership
+                    # event turns up in `recents` but has not made it into the current
+                    # state.
                     current_state_ids_map = (
-                        await self._state_storage_controller.get_current_state_ids(
-                            room_id
-                        )
+                        await self.store.get_partial_current_state_ids(room_id)
                     )
                     current_state_ids = frozenset(current_state_ids_map.values())
 
@@ -589,7 +598,13 @@ class SyncHandler:
                 if any(e.is_state() for e in loaded_recents):
                     # FIXME(faster_joins): We use the partial state here as
                     # we don't want to block `/sync` on finishing a lazy join.
-                    # Is this the correct way of doing it?
+                    # Which should be fine once
+                    # https://github.com/matrix-org/synapse/issues/12989 is resolved,
+                    # since we shouldn't reach here anymore?
+                    # Note that we use the current state as a whitelist for filtering
+                    # `loaded_recents`, so partial state is only a problem when a
+                    # membership event turns up in `loaded_recents` but has not made it
+                    # into the current state.
                     current_state_ids_map = (
                         await self.store.get_partial_current_state_ids(room_id)
                     )
@@ -637,7 +652,10 @@ class SyncHandler:
         )
 
     async def get_state_after_event(
-        self, event_id: str, state_filter: Optional[StateFilter] = None
+        self,
+        event_id: str,
+        state_filter: Optional[StateFilter] = None,
+        await_full_state: bool = True,
     ) -> StateMap[str]:
         """
         Get the room state after the given event
@@ -645,9 +663,14 @@ class SyncHandler:
         Args:
             event_id: event of interest
             state_filter: The state filter used to fetch state from the database.
+            await_full_state: if `True`, will block if we do not yet have complete state
+                at the event and `state_filter` is not satisfied by partial state.
+                Defaults to `True`.
         """
         state_ids = await self._state_storage_controller.get_state_ids_for_event(
-            event_id, state_filter=state_filter or StateFilter.all()
+            event_id,
+            state_filter=state_filter or StateFilter.all(),
+            await_full_state=await_full_state,
         )
 
         # using get_metadata_for_events here (instead of get_event) sidesteps an issue
@@ -670,6 +693,7 @@ class SyncHandler:
         room_id: str,
         stream_position: StreamToken,
         state_filter: Optional[StateFilter] = None,
+        await_full_state: bool = True,
     ) -> StateMap[str]:
         """Get the room state at a particular stream position
 
@@ -677,6 +701,9 @@ class SyncHandler:
             room_id: room for which to get state
             stream_position: point at which to get state
             state_filter: The state filter used to fetch state from the database.
+            await_full_state: if `True`, will block if we do not yet have complete state
+                at the last event in the room before `stream_position` and
+                `state_filter` is not satisfied by partial state. Defaults to `True`.
         """
         # FIXME: This gets the state at the latest event before the stream ordering,
         # which might not be the same as the "current state" of the room at the time
@@ -688,7 +715,9 @@ class SyncHandler:
 
         if last_event_id:
             state = await self.get_state_after_event(
-                last_event_id, state_filter=state_filter or StateFilter.all()
+                last_event_id,
+                state_filter=state_filter or StateFilter.all(),
+                await_full_state=await_full_state,
             )
 
         else:
@@ -891,7 +920,15 @@ class SyncHandler:
         with Measure(self.clock, "compute_state_delta"):
             # The memberships needed for events in the timeline.
             # Only calculated when `lazy_load_members` is on.
-            members_to_fetch = None
+            members_to_fetch: Optional[Set[str]] = None
+
+            # A dictionary mapping user IDs to the first event in the timeline sent by
+            # them. Only calculated when `lazy_load_members` is on.
+            first_event_by_sender_map: Optional[Dict[str, EventBase]] = None
+
+            # The contribution to the room state from state events in the timeline.
+            # Only contains the last event for any given state key.
+            timeline_state: StateMap[str]
 
             lazy_load_members = sync_config.filter_collection.lazy_load_members()
             include_redundant_members = (
@@ -902,10 +939,23 @@ class SyncHandler:
                 # We only request state for the members needed to display the
                 # timeline:
 
-                members_to_fetch = {
-                    event.sender  # FIXME: we also care about invite targets etc.
-                    for event in batch.events
-                }
+                timeline_state = {}
+
+                members_to_fetch = set()
+                first_event_by_sender_map = {}
+                for event in batch.events:
+                    # Build the map from user IDs to the first timeline event they sent.
+                    if event.sender not in first_event_by_sender_map:
+                        first_event_by_sender_map[event.sender] = event
+
+                    # We need the event's sender, unless their membership was in a
+                    # previous timeline event.
+                    if (EventTypes.Member, event.sender) not in timeline_state:
+                        members_to_fetch.add(event.sender)
+                    # FIXME: we also care about invite targets etc.
+
+                    if event.is_state():
+                        timeline_state[(event.type, event.state_key)] = event.event_id
 
                 if full_state:
                     # always make sure we LL ourselves so we know we're in the room
@@ -915,16 +965,21 @@ class SyncHandler:
                     members_to_fetch.add(sync_config.user.to_string())
 
                 state_filter = StateFilter.from_lazy_load_member_list(members_to_fetch)
+
+                # We are happy to use partial state to compute the `/sync` response.
+                # Since partial state may not include the lazy-loaded memberships we
+                # require, we fix up the state response afterwards with memberships from
+                # auth events.
+                await_full_state = False
             else:
-                state_filter = StateFilter.all()
+                timeline_state = {
+                    (event.type, event.state_key): event.event_id
+                    for event in batch.events
+                    if event.is_state()
+                }
 
-            # The contribution to the room state from state events in the timeline.
-            # Only contains the last event for any given state key.
-            timeline_state = {
-                (event.type, event.state_key): event.event_id
-                for event in batch.events
-                if event.is_state()
-            }
+                state_filter = StateFilter.all()
+                await_full_state = True
 
             # Now calculate the state to return in the sync response for the room.
             # This is more or less the change in state between the end of the previous
@@ -936,19 +991,26 @@ class SyncHandler:
                 if batch:
                     state_at_timeline_end = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[-1].event_id, state_filter=state_filter
+                            batch.events[-1].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
 
                     state_at_timeline_start = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[0].event_id, state_filter=state_filter
+                            batch.events[0].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
 
                 else:
                     state_at_timeline_end = await self.get_state_at(
-                        room_id, stream_position=now_token, state_filter=state_filter
+                        room_id,
+                        stream_position=now_token,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
                     )
 
                     state_at_timeline_start = state_at_timeline_end
@@ -964,14 +1026,19 @@ class SyncHandler:
                 if batch:
                     state_at_timeline_start = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[0].event_id, state_filter=state_filter
+                            batch.events[0].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
                 else:
                     # We can get here if the user has ignored the senders of all
                     # the recent events.
                     state_at_timeline_start = await self.get_state_at(
-                        room_id, stream_position=now_token, state_filter=state_filter
+                        room_id,
+                        stream_position=now_token,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
                     )
 
                 # for now, we disable LL for gappy syncs - see
@@ -993,20 +1060,28 @@ class SyncHandler:
                 # is indeed the case.
                 assert since_token is not None
                 state_at_previous_sync = await self.get_state_at(
-                    room_id, stream_position=since_token, state_filter=state_filter
+                    room_id,
+                    stream_position=since_token,
+                    state_filter=state_filter,
+                    await_full_state=await_full_state,
                 )
 
                 if batch:
                     state_at_timeline_end = (
                         await self._state_storage_controller.get_state_ids_for_event(
-                            batch.events[-1].event_id, state_filter=state_filter
+                            batch.events[-1].event_id,
+                            state_filter=state_filter,
+                            await_full_state=await_full_state,
                         )
                     )
                 else:
                     # We can get here if the user has ignored the senders of all
                     # the recent events.
                     state_at_timeline_end = await self.get_state_at(
-                        room_id, stream_position=now_token, state_filter=state_filter
+                        room_id,
+                        stream_position=now_token,
+                        state_filter=state_filter,
+                        await_full_state=await_full_state,
                     )
 
                 state_ids = _calculate_state(
@@ -1036,8 +1111,23 @@ class SyncHandler:
                                 (EventTypes.Member, member)
                                 for member in members_to_fetch
                             ),
+                            await_full_state=False,
                         )
 
+            # If we only have partial state for the room, `state_ids` may be missing the
+            # memberships we wanted. We attempt to find some by digging through the auth
+            # events of timeline events.
+            if lazy_load_members and await self.store.is_partial_state_room(room_id):
+                assert members_to_fetch is not None
+                assert first_event_by_sender_map is not None
+
+                additional_state_ids = (
+                    await self._find_missing_partial_state_memberships(
+                        room_id, members_to_fetch, first_event_by_sender_map, state_ids
+                    )
+                )
+                state_ids = {**state_ids, **additional_state_ids}
+
             # At this point, if `lazy_load_members` is enabled, `state_ids` includes
             # the memberships of all event senders in the timeline. This is because we
             # may not have sent the memberships in a previous sync.
@@ -1086,6 +1176,99 @@ class SyncHandler:
             if e.type != EventTypes.Aliases  # until MSC2261 or alternative solution
         }
 
+    async def _find_missing_partial_state_memberships(
+        self,
+        room_id: str,
+        members_to_fetch: Collection[str],
+        events_with_membership_auth: Mapping[str, EventBase],
+        found_state_ids: StateMap[str],
+    ) -> StateMap[str]:
+        """Finds missing memberships from a set of auth events and returns them as a
+        state map.
+
+        Args:
+            room_id: The partial state room to find the remaining memberships for.
+            members_to_fetch: The memberships to find.
+            events_with_membership_auth: A mapping from user IDs to events whose auth
+                events are known to contain their membership.
+            found_state_ids: A dict from (type, state_key) -> state_event_id, containing
+                memberships that have been previously found. Entries in
+                `members_to_fetch` that have a membership in `found_state_ids` are
+                ignored.
+
+        Returns:
+            A dict from ("m.room.member", state_key) -> state_event_id, containing the
+            memberships missing from `found_state_ids`.
+
+        Raises:
+            KeyError: if `events_with_membership_auth` does not have an entry for a
+                missing membership. Memberships in `found_state_ids` do not need an
+                entry in `events_with_membership_auth`.
+        """
+        additional_state_ids: MutableStateMap[str] = {}
+
+        # Tracks the missing members for logging purposes.
+        missing_members = set()
+
+        # Identify memberships missing from `found_state_ids` and pick out the auth
+        # events in which to look for them.
+        auth_event_ids: Set[str] = set()
+        for member in members_to_fetch:
+            if (EventTypes.Member, member) in found_state_ids:
+                continue
+
+            missing_members.add(member)
+            event_with_membership_auth = events_with_membership_auth[member]
+            auth_event_ids.update(event_with_membership_auth.auth_event_ids())
+
+        auth_events = await self.store.get_events(auth_event_ids)
+
+        # Run through the missing memberships once more, picking out the memberships
+        # from the pile of auth events we have just fetched.
+        for member in members_to_fetch:
+            if (EventTypes.Member, member) in found_state_ids:
+                continue
+
+            event_with_membership_auth = events_with_membership_auth[member]
+
+            # Dig through the auth events to find the desired membership.
+            for auth_event_id in event_with_membership_auth.auth_event_ids():
+                # We only store events once we have all their auth events,
+                # so the auth event must be in the pile we have just
+                # fetched.
+                auth_event = auth_events[auth_event_id]
+
+                if (
+                    auth_event.type == EventTypes.Member
+                    and auth_event.state_key == member
+                ):
+                    missing_members.remove(member)
+                    additional_state_ids[
+                        (EventTypes.Member, member)
+                    ] = auth_event.event_id
+                    break
+
+        if missing_members:
+            # There really shouldn't be any missing memberships now. Either:
+            #  * we couldn't find an auth event, which shouldn't happen because we do
+            #    not persist events with persisting their auth events first, or
+            #  * the set of auth events did not contain a membership we wanted, which
+            #    means our caller didn't compute the events in `members_to_fetch`
+            #    correctly, or we somehow accepted an event whose auth events were
+            #    dodgy.
+            logger.error(
+                "Failed to find memberships for %s in partial state room "
+                "%s in the auth events of %s.",
+                missing_members,
+                room_id,
+                [
+                    events_with_membership_auth[member].event_id
+                    for member in missing_members
+                ],
+            )
+
+        return additional_state_ids
+
     async def unread_notifs_for_room_id(
         self, room_id: str, sync_config: SyncConfig
     ) -> NotifCounts:
@@ -1730,7 +1913,11 @@ class SyncHandler:
                 continue
 
             if room_id in sync_result_builder.joined_room_ids or has_join:
-                old_state_ids = await self.get_state_at(room_id, since_token)
+                old_state_ids = await self.get_state_at(
+                    room_id,
+                    since_token,
+                    state_filter=StateFilter.from_types([(EventTypes.Member, user_id)]),
+                )
                 old_mem_ev_id = old_state_ids.get((EventTypes.Member, user_id), None)
                 old_mem_ev = None
                 if old_mem_ev_id:
@@ -1756,7 +1943,13 @@ class SyncHandler:
                     newly_left_rooms.append(room_id)
                 else:
                     if not old_state_ids:
-                        old_state_ids = await self.get_state_at(room_id, since_token)
+                        old_state_ids = await self.get_state_at(
+                            room_id,
+                            since_token,
+                            state_filter=StateFilter.from_types(
+                                [(EventTypes.Member, user_id)]
+                            ),
+                        )
                         old_mem_ev_id = old_state_ids.get(
                             (EventTypes.Member, user_id), None
                         )
@@ -2228,10 +2421,10 @@ class SyncHandler:
                     joined_room.room_id, joined_room.event_pos.stream
                 )
             )
-            users_in_room = await self.state.get_current_users_in_room(
+            user_ids_in_room = await self.state.get_current_user_ids_in_room(
                 joined_room.room_id, extrems
             )
-            if user_id in users_in_room:
+            if user_id in user_ids_in_room:
                 joined_room_ids.add(joined_room.room_id)
 
         return frozenset(joined_room_ids)
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 27aa0d3126..a4cd8b8f0c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -26,7 +26,7 @@ from synapse.metrics.background_process_metrics import (
 )
 from synapse.replication.tcp.streams import TypingStream
 from synapse.streams import EventSource
-from synapse.types import JsonDict, Requester, StreamKeyType, UserID, get_domain_from_id
+from synapse.types import JsonDict, Requester, StreamKeyType, UserID
 from synapse.util.caches.stream_change_cache import StreamChangeCache
 from synapse.util.metrics import Measure
 from synapse.util.wheel_timer import WheelTimer
@@ -253,12 +253,11 @@ class TypingWriterHandler(FollowerTypingHandler):
         self, target_user: UserID, requester: Requester, room_id: str, timeout: int
     ) -> None:
         target_user_id = target_user.to_string()
-        auth_user_id = requester.user.to_string()
 
         if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this homeserver")
 
-        if target_user_id != auth_user_id:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's typing state")
 
         if requester.shadow_banned:
@@ -266,7 +265,7 @@ class TypingWriterHandler(FollowerTypingHandler):
             await self.clock.sleep(random.randint(1, 10))
             raise ShadowBanError()
 
-        await self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, requester)
 
         logger.debug("%s has started typing in %s", target_user_id, room_id)
 
@@ -289,12 +288,11 @@ class TypingWriterHandler(FollowerTypingHandler):
         self, target_user: UserID, requester: Requester, room_id: str
     ) -> None:
         target_user_id = target_user.to_string()
-        auth_user_id = requester.user.to_string()
 
         if not self.is_mine_id(target_user_id):
             raise SynapseError(400, "User is not hosted on this homeserver")
 
-        if target_user_id != auth_user_id:
+        if target_user != requester.user:
             raise AuthError(400, "Cannot set another user's typing state")
 
         if requester.shadow_banned:
@@ -302,7 +300,7 @@ class TypingWriterHandler(FollowerTypingHandler):
             await self.clock.sleep(random.randint(1, 10))
             raise ShadowBanError()
 
-        await self.auth.check_user_in_room(room_id, target_user_id)
+        await self.auth.check_user_in_room(room_id, requester)
 
         logger.debug("%s has stopped typing in %s", target_user_id, room_id)
 
@@ -364,8 +362,9 @@ class TypingWriterHandler(FollowerTypingHandler):
             )
             return
 
-        users = await self.store.get_users_in_room(room_id)
-        domains = {get_domain_from_id(u) for u in users}
+        domains = await self._storage_controllers.state.get_current_hosts_in_room(
+            room_id
+        )
 
         if self.server_name in domains:
             logger.info("Got typing update from %s: %r", user_id, content)
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 05cebb5d4d..a744d68c64 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -19,7 +19,6 @@ from twisted.web.client import PartialDownloadError
 
 from synapse.api.constants import LoginType
 from synapse.api.errors import Codes, LoginError, SynapseError
-from synapse.config.emailconfig import ThreepidBehaviour
 from synapse.util import json_decoder
 
 if TYPE_CHECKING:
@@ -153,7 +152,7 @@ class _BaseThreepidAuthChecker:
 
         logger.info("Getting validated threepid. threepidcreds: %r", (threepid_creds,))
 
-        # msisdns are currently always ThreepidBehaviour.REMOTE
+        # msisdns are currently always verified via the IS
         if medium == "msisdn":
             if not self.hs.config.registration.account_threepid_delegate_msisdn:
                 raise SynapseError(
@@ -164,18 +163,7 @@ class _BaseThreepidAuthChecker:
                 threepid_creds,
             )
         elif medium == "email":
-            if (
-                self.hs.config.email.threepid_behaviour_email
-                == ThreepidBehaviour.REMOTE
-            ):
-                assert self.hs.config.registration.account_threepid_delegate_email
-                threepid = await identity_handler.threepid_from_creds(
-                    self.hs.config.registration.account_threepid_delegate_email,
-                    threepid_creds,
-                )
-            elif (
-                self.hs.config.email.threepid_behaviour_email == ThreepidBehaviour.LOCAL
-            ):
+            if self.hs.config.email.can_verify_email:
                 threepid = None
                 row = await self.store.get_threepid_validation_session(
                     medium,
@@ -227,10 +215,7 @@ class EmailIdentityAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChec
         _BaseThreepidAuthChecker.__init__(self, hs)
 
     def is_enabled(self) -> bool:
-        return self.hs.config.email.threepid_behaviour_email in (
-            ThreepidBehaviour.REMOTE,
-            ThreepidBehaviour.LOCAL,
-        )
+        return self.hs.config.email.can_verify_email
 
     async def check_auth(self, authdict: dict, clientip: str) -> Any:
         return await self._check_threepid("email", authdict)