summary refs log tree commit diff
path: root/synapse/handlers
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--synapse/handlers/account.py11
-rw-r--r--synapse/handlers/appservice.py4
-rw-r--r--synapse/handlers/deactivate_account.py12
-rw-r--r--synapse/handlers/device.py4
-rw-r--r--synapse/handlers/directory.py6
-rw-r--r--synapse/handlers/events.py3
-rw-r--r--synapse/handlers/federation.py121
-rw-r--r--synapse/handlers/initial_sync.py14
-rw-r--r--synapse/handlers/message.py71
-rw-r--r--synapse/handlers/oidc.py18
-rw-r--r--synapse/handlers/pagination.py25
-rw-r--r--synapse/handlers/presence.py6
-rw-r--r--synapse/handlers/profile.py6
-rw-r--r--synapse/handlers/receipts.py2
-rw-r--r--synapse/handlers/relations.py271
-rw-r--r--synapse/handlers/room.py13
-rw-r--r--synapse/handlers/room_batch.py112
-rw-r--r--synapse/handlers/room_member.py33
-rw-r--r--synapse/handlers/room_summary.py15
-rw-r--r--synapse/handlers/search.py3
-rw-r--r--synapse/handlers/sso.py8
-rw-r--r--synapse/handlers/sync.py39
-rw-r--r--synapse/handlers/typing.py9
-rw-r--r--synapse/handlers/user_directory.py4
24 files changed, 579 insertions, 231 deletions
diff --git a/synapse/handlers/account.py b/synapse/handlers/account.py
index d5badf635b..c05a14304c 100644
--- a/synapse/handlers/account.py
+++ b/synapse/handlers/account.py
@@ -26,6 +26,10 @@ class AccountHandler:
         self._main_store = hs.get_datastores().main
         self._is_mine = hs.is_mine
         self._federation_client = hs.get_federation_client()
+        self._use_account_validity_in_account_status = (
+            hs.config.server.use_account_validity_in_account_status
+        )
+        self._account_validity_handler = hs.get_account_validity_handler()
 
     async def get_account_statuses(
         self,
@@ -106,6 +110,13 @@ class AccountHandler:
                 "deactivated": userinfo.is_deactivated,
             }
 
+            if self._use_account_validity_in_account_status:
+                status[
+                    "org.matrix.expired"
+                ] = await self._account_validity_handler.is_user_expired(
+                    user_id.to_string()
+                )
+
         return status
 
     async def _get_remote_account_statuses(
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index e6461cc3c9..bd913e524e 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -571,7 +571,7 @@ class ApplicationServicesHandler:
         room_alias_str = room_alias.to_string()
         services = self.store.get_app_services()
         alias_query_services = [
-            s for s in services if (s.is_interested_in_alias(room_alias_str))
+            s for s in services if (s.is_room_alias_in_namespace(room_alias_str))
         ]
         for alias_service in alias_query_services:
             is_known_alias = await self.appservice_api.query_alias(
@@ -660,7 +660,7 @@ class ApplicationServicesHandler:
         # inside of a list comprehension anymore.
         interested_list = []
         for s in services:
-            if await s.is_interested(event, self.store):
+            if await s.is_interested_in_event(event.event_id, event, self.store):
                 interested_list.append(s)
 
         return interested_list
diff --git a/synapse/handlers/deactivate_account.py b/synapse/handlers/deactivate_account.py
index 76ae768e6e..816e1a6d79 100644
--- a/synapse/handlers/deactivate_account.py
+++ b/synapse/handlers/deactivate_account.py
@@ -17,7 +17,7 @@ from typing import TYPE_CHECKING, Optional
 
 from synapse.api.errors import SynapseError
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import Requester, UserID, create_requester
+from synapse.types import Codes, Requester, UserID, create_requester
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -42,6 +42,7 @@ class DeactivateAccountHandler:
 
         # Flag that indicates whether the process to part users from rooms is running
         self._user_parter_running = False
+        self._third_party_rules = hs.get_third_party_event_rules()
 
         # Start the user parter loop so it can resume parting users from rooms where
         # it left off (if it has work left to do).
@@ -74,6 +75,15 @@ class DeactivateAccountHandler:
         Returns:
             True if identity server supports removing threepids, otherwise False.
         """
+
+        # Check if this user can be deactivated
+        if not await self._third_party_rules.check_can_deactivate_user(
+            user_id, by_admin
+        ):
+            raise SynapseError(
+                403, "Deactivation of this user is forbidden", Codes.FORBIDDEN
+            )
+
         # FIXME: Theoretically there is a race here wherein user resets
         # password using threepid.
 
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index 934b5bd734..d5ccaa0c37 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -371,7 +371,6 @@ class DeviceHandler(DeviceWorkerHandler):
                 log_kv(
                     {"reason": "User doesn't have device id.", "device_id": device_id}
                 )
-                pass
             else:
                 raise
 
@@ -414,7 +413,6 @@ class DeviceHandler(DeviceWorkerHandler):
                 # no match
                 set_tag("error", True)
                 set_tag("reason", "User doesn't have that device id.")
-                pass
             else:
                 raise
 
@@ -506,7 +504,7 @@ class DeviceHandler(DeviceWorkerHandler):
                 "Sending device list update notif for %r to: %r", user_id, hosts
             )
             for host in hosts:
-                self.federation_sender.send_device_messages(host)
+                self.federation_sender.send_device_messages(host, immediate=False)
                 log_kv({"message": "sent device update to host", "host": host})
 
     async def notify_user_signature_update(
diff --git a/synapse/handlers/directory.py b/synapse/handlers/directory.py
index b7064c6624..33d827a45b 100644
--- a/synapse/handlers/directory.py
+++ b/synapse/handlers/directory.py
@@ -119,7 +119,7 @@ class DirectoryHandler:
 
         service = requester.app_service
         if service:
-            if not service.is_interested_in_alias(room_alias_str):
+            if not service.is_room_alias_in_namespace(room_alias_str):
                 raise SynapseError(
                     400,
                     "This application service has not reserved this kind of alias.",
@@ -221,7 +221,7 @@ class DirectoryHandler:
     async def delete_appservice_association(
         self, service: ApplicationService, room_alias: RoomAlias
     ) -> None:
-        if not service.is_interested_in_alias(room_alias.to_string()):
+        if not service.is_room_alias_in_namespace(room_alias.to_string()):
             raise SynapseError(
                 400,
                 "This application service has not reserved this kind of alias",
@@ -376,7 +376,7 @@ class DirectoryHandler:
         # non-exclusive locks on the alias (or there are no interested services)
         services = self.store.get_app_services()
         interested_services = [
-            s for s in services if s.is_interested_in_alias(alias.to_string())
+            s for s in services if s.is_room_alias_in_namespace(alias.to_string())
         ]
 
         for service in interested_services:
diff --git a/synapse/handlers/events.py b/synapse/handlers/events.py
index 97e75e60c3..d2ccb5c5d3 100644
--- a/synapse/handlers/events.py
+++ b/synapse/handlers/events.py
@@ -19,6 +19,7 @@ from typing import TYPE_CHECKING, Iterable, List, Optional
 from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import AuthError, SynapseError
 from synapse.events import EventBase
+from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.presence import format_user_presence_state
 from synapse.streams.config import PaginationConfig
 from synapse.types import JsonDict, UserID
@@ -120,7 +121,7 @@ class EventStreamHandler:
             chunks = self._event_serializer.serialize_events(
                 events,
                 time_now,
-                as_client_event=as_client_event,
+                config=SerializeEventConfig(as_client_event=as_client_event),
             )
 
             chunk = {
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index eb03a5accb..350ec9c03a 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -23,8 +23,6 @@ from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
 
-from twisted.internet import defer
-
 from synapse import event_auth
 from synapse.api.constants import EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
@@ -45,11 +43,7 @@ from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.http.servlet import assert_params_in_dict
-from synapse.logging.context import (
-    make_deferred_yieldable,
-    nested_logging_context,
-    preserve_fn,
-)
+from synapse.logging.context import nested_logging_context
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
@@ -355,56 +349,8 @@ class FederationHandler:
         if success:
             return True
 
-        # Huh, well *those* domains didn't work out. Lets try some domains
-        # from the time.
-
-        tried_domains = set(likely_domains)
-        tried_domains.add(self.server_name)
-
-        event_ids = list(extremities.keys())
-
-        logger.debug("calling resolve_state_groups in _maybe_backfill")
-        resolve = preserve_fn(self.state_handler.resolve_state_groups_for_events)
-        states_list = await make_deferred_yieldable(
-            defer.gatherResults(
-                [resolve(room_id, [e]) for e in event_ids], consumeErrors=True
-            )
-        )
-
-        # A map from event_id to state map of event_ids.
-        state_ids: Dict[str, StateMap[str]] = dict(
-            zip(event_ids, [s.state for s in states_list])
-        )
-
-        state_map = await self.store.get_events(
-            [e_id for ids in state_ids.values() for e_id in ids.values()],
-            get_prev_content=False,
-        )
-
-        # A map from event_id to state map of events.
-        state_events: Dict[str, StateMap[EventBase]] = {
-            key: {
-                k: state_map[e_id]
-                for k, e_id in state_dict.items()
-                if e_id in state_map
-            }
-            for key, state_dict in state_ids.items()
-        }
-
-        for e_id in event_ids:
-            likely_extremeties_domains = get_domains_from_state(state_events[e_id])
-
-            success = await try_backfill(
-                [
-                    dom
-                    for dom, _ in likely_extremeties_domains
-                    if dom not in tried_domains
-                ]
-            )
-            if success:
-                return True
-
-            tried_domains.update(dom for dom, _ in likely_extremeties_domains)
+        # TODO: we could also try servers which were previously in the room, but
+        #   are no longer.
 
         return False
 
@@ -1004,54 +950,35 @@ class FederationHandler:
 
         return event
 
-    async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
-        """Returns the state at the event. i.e. not including said event."""
-
-        event = await self.store.get_event(event_id, check_room_id=room_id)
-
-        state_groups = await self.state_store.get_state_groups(room_id, [event_id])
-
-        if state_groups:
-            _, state = list(state_groups.items()).pop()
-            results = {(e.type, e.state_key): e for e in state}
-
-            if event.is_state():
-                # Get previous state
-                if "replaces_state" in event.unsigned:
-                    prev_id = event.unsigned["replaces_state"]
-                    if prev_id != event.event_id:
-                        prev_event = await self.store.get_event(prev_id)
-                        results[(event.type, event.state_key)] = prev_event
-                else:
-                    del results[(event.type, event.state_key)]
-
-            res = list(results.values())
-            return res
-        else:
-            return []
-
     async def get_state_ids_for_pdu(self, room_id: str, event_id: str) -> List[str]:
         """Returns the state at the event. i.e. not including said event."""
         event = await self.store.get_event(event_id, check_room_id=room_id)
+        if event.internal_metadata.outlier:
+            raise NotFoundError("State not known at event %s" % (event_id,))
 
         state_groups = await self.state_store.get_state_groups_ids(room_id, [event_id])
 
-        if state_groups:
-            _, state = list(state_groups.items()).pop()
-            results = state
+        # get_state_groups_ids should return exactly one result
+        assert len(state_groups) == 1
 
-            if event.is_state():
-                # Get previous state
-                if "replaces_state" in event.unsigned:
-                    prev_id = event.unsigned["replaces_state"]
-                    if prev_id != event.event_id:
-                        results[(event.type, event.state_key)] = prev_id
-                else:
-                    results.pop((event.type, event.state_key), None)
+        state_map = next(iter(state_groups.values()))
 
-            return list(results.values())
-        else:
-            return []
+        state_key = event.get_state_key()
+        if state_key is not None:
+            # the event was not rejected (get_event raises a NotFoundError for rejected
+            # events) so the state at the event should include the event itself.
+            assert (
+                state_map.get((event.type, state_key)) == event.event_id
+            ), "State at event did not include event itself"
+
+            # ... but we need the state *before* that event
+            if "replaces_state" in event.unsigned:
+                prev_id = event.unsigned["replaces_state"]
+                state_map[(event.type, state_key)] = prev_id
+            else:
+                del state_map[(event.type, state_key)]
+
+        return list(state_map.values())
 
     async def on_backfill_request(
         self, origin: str, room_id: str, pdu_list: List[str], limit: int
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index 344f20f37c..a7db8feb57 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -18,6 +18,7 @@ from typing import TYPE_CHECKING, List, Optional, Tuple, cast
 from synapse.api.constants import EduTypes, EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.events import EventBase
+from synapse.events.utils import SerializeEventConfig
 from synapse.events.validator import EventValidator
 from synapse.handlers.presence import format_user_presence_state
 from synapse.handlers.receipts import ReceiptEventSource
@@ -152,10 +153,13 @@ class InitialSyncHandler:
 
         public_room_ids = await self.store.get_public_room_ids()
 
-        limit = pagin_config.limit
-        if limit is None:
+        if pagin_config.limit is not None:
+            limit = pagin_config.limit
+        else:
             limit = 10
 
+        serializer_options = SerializeEventConfig(as_client_event=as_client_event)
+
         async def handle_room(event: RoomsForUser) -> None:
             d: JsonDict = {
                 "room_id": event.room_id,
@@ -173,7 +177,7 @@ class InitialSyncHandler:
                 d["invite"] = self._event_serializer.serialize_event(
                     invite_event,
                     time_now,
-                    as_client_event=as_client_event,
+                    config=serializer_options,
                 )
 
             rooms_ret.append(d)
@@ -225,7 +229,7 @@ class InitialSyncHandler:
                         self._event_serializer.serialize_events(
                             messages,
                             time_now=time_now,
-                            as_client_event=as_client_event,
+                            config=serializer_options,
                         )
                     ),
                     "start": await start_token.to_string(self.store),
@@ -235,7 +239,7 @@ class InitialSyncHandler:
                 d["state"] = self._event_serializer.serialize_events(
                     current_state.values(),
                     time_now=time_now,
-                    as_client_event=as_client_event,
+                    config=serializer_options,
                 )
 
                 account_data_events = []
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 61cb133ef2..1c4fb4360a 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -493,6 +493,7 @@ class EventCreationHandler:
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
         outlier: bool = False,
         historical: bool = False,
@@ -527,6 +528,15 @@ class EventCreationHandler:
 
                 If non-None, prev_event_ids must also be provided.
 
+            state_event_ids:
+                The full state at a given event. This is used particularly by the MSC2716
+                /batch_send endpoint. One use case is with insertion events which float at
+                the beginning of a historical batch and don't have any `prev_events` to
+                derive from; we add all of these state events as the explicit state so the
+                rest of the historical batch can inherit the same state and state_group.
+                This should normally be left as None, which will cause the auth_event_ids
+                to be calculated based on the room state at the prev_events.
+
             require_consent: Whether to check if the requester has
                 consented to the privacy policy.
 
@@ -612,6 +622,7 @@ class EventCreationHandler:
             allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
+            state_event_ids=state_event_ids,
             depth=depth,
         )
 
@@ -772,6 +783,7 @@ class EventCreationHandler:
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
         ratelimit: bool = True,
         txn_id: Optional[str] = None,
         ignore_shadow_ban: bool = False,
@@ -801,6 +813,14 @@ class EventCreationHandler:
                 based on the room state at the prev_events.
 
                 If non-None, prev_event_ids must also be provided.
+            state_event_ids:
+                The full state at a given event. This is used particularly by the MSC2716
+                /batch_send endpoint. One use case is with insertion events which float at
+                the beginning of a historical batch and don't have any `prev_events` to
+                derive from; we add all of these state events as the explicit state so the
+                rest of the historical batch can inherit the same state and state_group.
+                This should normally be left as None, which will cause the auth_event_ids
+                to be calculated based on the room state at the prev_events.
             ratelimit: Whether to rate limit this send.
             txn_id: The transaction ID.
             ignore_shadow_ban: True if shadow-banned users should be allowed to
@@ -856,8 +876,10 @@ class EventCreationHandler:
                 requester,
                 event_dict,
                 txn_id=txn_id,
+                allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
+                state_event_ids=state_event_ids,
                 outlier=outlier,
                 historical=historical,
                 depth=depth,
@@ -893,6 +915,7 @@ class EventCreationHandler:
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
@@ -915,6 +938,15 @@ class EventCreationHandler:
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.
 
+            state_event_ids:
+                The full state at a given event. This is used particularly by the MSC2716
+                /batch_send endpoint. One use case is with insertion events which float at
+                the beginning of a historical batch and don't have any `prev_events` to
+                derive from; we add all of these state events as the explicit state so the
+                rest of the historical batch can inherit the same state and state_group.
+                This should normally be left as None, which will cause the auth_event_ids
+                to be calculated based on the room state at the prev_events.
+
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
@@ -922,31 +954,26 @@ class EventCreationHandler:
         Returns:
             Tuple of created event, context
         """
-        # Strip down the auth_event_ids to only what we need to auth the event.
+        # Strip down the state_event_ids to only what we need to auth the event.
         # For example, we don't need extra m.room.member that don't match event.sender
-        full_state_ids_at_event = None
-        if auth_event_ids is not None:
-            # If auth events are provided, prev events must be also.
+        if state_event_ids is not None:
+            # Do a quick check to make sure that prev_event_ids is present to
+            # make the type-checking around `builder.build` happy.
             # prev_event_ids could be an empty array though.
             assert prev_event_ids is not None
 
-            # Copy the full auth state before it stripped down
-            full_state_ids_at_event = auth_event_ids.copy()
-
             temp_event = await builder.build(
                 prev_event_ids=prev_event_ids,
-                auth_event_ids=auth_event_ids,
+                auth_event_ids=state_event_ids,
                 depth=depth,
             )
-            auth_events = await self.store.get_events_as_list(auth_event_ids)
+            state_events = await self.store.get_events_as_list(state_event_ids)
             # Create a StateMap[str]
-            auth_event_state_map = {
-                (e.type, e.state_key): e.event_id for e in auth_events
-            }
-            # Actually strip down and use the necessary auth events
+            state_map = {(e.type, e.state_key): e.event_id for e in state_events}
+            # Actually strip down and only use the necessary auth events
             auth_event_ids = self._event_auth_handler.compute_auth_events(
                 event=temp_event,
-                current_state_ids=auth_event_state_map,
+                current_state_ids=state_map,
                 for_verification=False,
             )
 
@@ -989,12 +1016,16 @@ class EventCreationHandler:
             context = EventContext.for_outlier()
         elif (
             event.type == EventTypes.MSC2716_INSERTION
-            and full_state_ids_at_event
+            and state_event_ids
             and builder.internal_metadata.is_historical()
         ):
+            # Add explicit state to the insertion event so it has state to derive
+            # from even though it's floating with no `prev_events`. The rest of
+            # the batch can derive from this state and state_group.
+            #
             # TODO(faster_joins): figure out how this works, and make sure that the
             #   old state is complete.
-            old_state = await self.store.get_events_as_list(full_state_ids_at_event)
+            old_state = await self.store.get_events_as_list(state_event_ids)
             context = await self.state.compute_event_context(event, old_state=old_state)
         else:
             context = await self.state.compute_event_context(event)
@@ -1069,6 +1100,9 @@ class EventCreationHandler:
         if relation_type == RelationTypes.ANNOTATION:
             aggregation_key = relation["key"]
 
+            if len(aggregation_key) > 500:
+                raise SynapseError(400, "Aggregation key is too long")
+
             already_exists = await self.store.has_user_annotated_event(
                 relates_to, event.type, aggregation_key, event.sender
             )
@@ -1076,7 +1110,10 @@ class EventCreationHandler:
                 raise SynapseError(400, "Can't send same reaction twice")
 
         # Don't attempt to start a thread if the parent event is a relation.
-        elif relation_type == RelationTypes.THREAD:
+        elif (
+            relation_type == RelationTypes.THREAD
+            or relation_type == RelationTypes.UNSTABLE_THREAD
+        ):
             if await self.store.event_includes_relation(relates_to):
                 raise SynapseError(
                     400, "Cannot start threads from an event with a relation"
diff --git a/synapse/handlers/oidc.py b/synapse/handlers/oidc.py
index 593a2aac66..724b9cfcb4 100644
--- a/synapse/handlers/oidc.py
+++ b/synapse/handlers/oidc.py
@@ -45,6 +45,7 @@ from synapse.types import JsonDict, UserID, map_username_to_mxid_localpart
 from synapse.util import Clock, json_decoder
 from synapse.util.caches.cached_call import RetryOnExceptionCachedCall
 from synapse.util.macaroons import get_value_from_macaroon, satisfy_expiry
+from synapse.util.templates import _localpart_from_email_filter
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -1228,6 +1229,7 @@ class OidcSessionData:
 
 class UserAttributeDict(TypedDict):
     localpart: Optional[str]
+    confirm_localpart: bool
     display_name: Optional[str]
     emails: List[str]
 
@@ -1307,6 +1309,11 @@ def jinja_finalize(thing: Any) -> Any:
 
 
 env = Environment(finalize=jinja_finalize)
+env.filters.update(
+    {
+        "localpart_from_email": _localpart_from_email_filter,
+    }
+)
 
 
 @attr.s(slots=True, frozen=True, auto_attribs=True)
@@ -1316,6 +1323,7 @@ class JinjaOidcMappingConfig:
     display_name_template: Optional[Template]
     email_template: Optional[Template]
     extra_attributes: Dict[str, Template]
+    confirm_localpart: bool = False
 
 
 class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
@@ -1357,12 +1365,17 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
                         "invalid jinja template", path=["extra_attributes", key]
                     ) from e
 
+        confirm_localpart = config.get("confirm_localpart") or False
+        if not isinstance(confirm_localpart, bool):
+            raise ConfigError("must be a bool", path=["confirm_localpart"])
+
         return JinjaOidcMappingConfig(
             subject_claim=subject_claim,
             localpart_template=localpart_template,
             display_name_template=display_name_template,
             email_template=email_template,
             extra_attributes=extra_attributes,
+            confirm_localpart=confirm_localpart,
         )
 
     def get_remote_user_id(self, userinfo: UserInfo) -> str:
@@ -1398,7 +1411,10 @@ class JinjaOidcMappingProvider(OidcMappingProvider[JinjaOidcMappingConfig]):
             emails.append(email)
 
         return UserAttributeDict(
-            localpart=localpart, display_name=display_name, emails=emails
+            localpart=localpart,
+            display_name=display_name,
+            emails=emails,
+            confirm_localpart=self._config.confirm_localpart,
         )
 
     async def get_extra_attributes(self, userinfo: UserInfo, token: Token) -> JsonDict:
diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 5c01a426ff..876b879483 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 import logging
-from typing import TYPE_CHECKING, Any, Collection, Dict, List, Optional, Set
+from typing import TYPE_CHECKING, Collection, Dict, List, Optional, Set
 
 import attr
 
@@ -22,6 +22,7 @@ from twisted.python.failure import Failure
 from synapse.api.constants import EventTypes, Membership
 from synapse.api.errors import SynapseError
 from synapse.api.filtering import Filter
+from synapse.events.utils import SerializeEventConfig
 from synapse.handlers.room import ShutdownRoomResponse
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.storage.state import StateFilter
@@ -133,6 +134,7 @@ class PaginationHandler:
         self.clock = hs.get_clock()
         self._server_name = hs.hostname
         self._room_shutdown_handler = hs.get_room_shutdown_handler()
+        self._relations_handler = hs.get_relations_handler()
 
         self.pagination_lock = ReadWriteLock()
         # IDs of rooms in which there currently an active purge *or delete* operation.
@@ -349,7 +351,7 @@ class PaginationHandler:
         """
         self._purges_in_progress_by_room.add(room_id)
         try:
-            with await self.pagination_lock.write(room_id):
+            async with self.pagination_lock.write(room_id):
                 await self.storage.purge_events.purge_history(
                     room_id, token, delete_local_events
                 )
@@ -405,7 +407,7 @@ class PaginationHandler:
             room_id: room to be purged
             force: set true to skip checking for joined users.
         """
-        with await self.pagination_lock.write(room_id):
+        async with self.pagination_lock.write(room_id):
             # first check that we have no users in this room
             if not force:
                 joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -421,7 +423,7 @@ class PaginationHandler:
         pagin_config: PaginationConfig,
         as_client_event: bool = True,
         event_filter: Optional[Filter] = None,
-    ) -> Dict[str, Any]:
+    ) -> JsonDict:
         """Get messages in a room.
 
         Args:
@@ -430,6 +432,7 @@ class PaginationHandler:
             pagin_config: The pagination config rules to apply, if any.
             as_client_event: True to get events in client-server format.
             event_filter: Filter to apply to results or None
+
         Returns:
             Pagination API results
         """
@@ -447,7 +450,7 @@ class PaginationHandler:
 
         room_token = from_token.room_key
 
-        with await self.pagination_lock.read(room_id):
+        async with self.pagination_lock.read(room_id):
             (
                 membership,
                 member_event_id,
@@ -537,17 +540,21 @@ class PaginationHandler:
                 state_dict = await self.store.get_events(list(state_ids.values()))
                 state = state_dict.values()
 
-        aggregations = await self.store.get_bundled_aggregations(events, user_id)
+        aggregations = await self._relations_handler.get_bundled_aggregations(
+            events, user_id
+        )
 
         time_now = self.clock.time_msec()
 
+        serialize_options = SerializeEventConfig(as_client_event=as_client_event)
+
         chunk = {
             "chunk": (
                 self._event_serializer.serialize_events(
                     events,
                     time_now,
+                    config=serialize_options,
                     bundle_aggregations=aggregations,
-                    as_client_event=as_client_event,
                 )
             ),
             "start": await from_token.to_string(self.store),
@@ -556,7 +563,7 @@ class PaginationHandler:
 
         if state:
             chunk["state"] = self._event_serializer.serialize_events(
-                state, time_now, as_client_event=as_client_event
+                state, time_now, config=serialize_options
             )
 
         return chunk
@@ -612,7 +619,7 @@ class PaginationHandler:
 
         self._purges_in_progress_by_room.add(room_id)
         try:
-            with await self.pagination_lock.write(room_id):
+            async with self.pagination_lock.write(room_id):
                 self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
                 self._delete_by_id[
                     delete_id
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index c155098bee..34d9411bbf 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -267,7 +267,6 @@ class BasePresenceHandler(abc.ABC):
             is_syncing: Whether or not the user is now syncing
             sync_time_msec: Time in ms when the user was last syncing
         """
-        pass
 
     async def update_external_syncs_clear(self, process_id: str) -> None:
         """Marks all users that had been marked as syncing by a given process
@@ -277,7 +276,6 @@ class BasePresenceHandler(abc.ABC):
 
         This is a no-op when presence is handled by a different worker.
         """
-        pass
 
     async def process_replication_rows(
         self, stream_name: str, instance_name: str, token: int, rows: list
@@ -424,13 +422,13 @@ class WorkerPresenceHandler(BasePresenceHandler):
 
     async def _on_shutdown(self) -> None:
         if self._presence_enabled:
-            self.hs.get_tcp_replication().send_command(
+            self.hs.get_replication_command_handler().send_command(
                 ClearUserSyncsCommand(self.instance_id)
             )
 
     def send_user_sync(self, user_id: str, is_syncing: bool, last_sync_ms: int) -> None:
         if self._presence_enabled:
-            self.hs.get_tcp_replication().send_user_sync(
+            self.hs.get_replication_command_handler().send_user_sync(
                 self.instance_id, user_id, is_syncing, last_sync_ms
             )
 
diff --git a/synapse/handlers/profile.py b/synapse/handlers/profile.py
index 6554c0d3c2..239b0aa744 100644
--- a/synapse/handlers/profile.py
+++ b/synapse/handlers/profile.py
@@ -336,12 +336,18 @@ class ProfileHandler:
         """Check that the size and content type of the avatar at the given MXC URI are
         within the configured limits.
 
+        If the given `mxc` is empty, no checks are performed. (Users are always able to
+        unset their avatar.)
+
         Args:
             mxc: The MXC URI at which the avatar can be found.
 
         Returns:
              A boolean indicating whether the file can be allowed to be set as an avatar.
         """
+        if mxc == "":
+            return True
+
         if not self.max_avatar_size and not self.allowed_avatar_mimetypes:
             return True
 
diff --git a/synapse/handlers/receipts.py b/synapse/handlers/receipts.py
index b4132c353a..6250bb3bdf 100644
--- a/synapse/handlers/receipts.py
+++ b/synapse/handlers/receipts.py
@@ -269,7 +269,7 @@ class ReceiptEventSource(EventSource[int, JsonDict]):
         # Then filter down to rooms that the AS can read
         events = []
         for room_id, event in rooms_to_events.items():
-            if not await service.matches_user_in_member_list(room_id, self.store):
+            if not await service.is_interested_in_room(room_id, self.store):
                 continue
 
             events.append(event)
diff --git a/synapse/handlers/relations.py b/synapse/handlers/relations.py
new file mode 100644
index 0000000000..73217d135d
--- /dev/null
+++ b/synapse/handlers/relations.py
@@ -0,0 +1,271 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+import logging
+from typing import TYPE_CHECKING, Dict, Iterable, Optional, cast
+
+import attr
+from frozendict import frozendict
+
+from synapse.api.constants import RelationTypes
+from synapse.api.errors import SynapseError
+from synapse.events import EventBase
+from synapse.types import JsonDict, Requester, StreamToken
+from synapse.visibility import filter_events_for_client
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+    from synapse.storage.databases.main import DataStore
+
+
+logger = logging.getLogger(__name__)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _ThreadAggregation:
+    # The latest event in the thread.
+    latest_event: EventBase
+    # The latest edit to the latest event in the thread.
+    latest_edit: Optional[EventBase]
+    # The total number of events in the thread.
+    count: int
+    # True if the current user has sent an event to the thread.
+    current_user_participated: bool
+
+
+@attr.s(slots=True, auto_attribs=True)
+class BundledAggregations:
+    """
+    The bundled aggregations for an event.
+
+    Some values require additional processing during serialization.
+    """
+
+    annotations: Optional[JsonDict] = None
+    references: Optional[JsonDict] = None
+    replace: Optional[EventBase] = None
+    thread: Optional[_ThreadAggregation] = None
+
+    def __bool__(self) -> bool:
+        return bool(self.annotations or self.references or self.replace or self.thread)
+
+
+class RelationsHandler:
+    def __init__(self, hs: "HomeServer"):
+        self._main_store = hs.get_datastores().main
+        self._storage = hs.get_storage()
+        self._auth = hs.get_auth()
+        self._clock = hs.get_clock()
+        self._event_handler = hs.get_event_handler()
+        self._event_serializer = hs.get_event_client_serializer()
+
+    async def get_relations(
+        self,
+        requester: Requester,
+        event_id: str,
+        room_id: str,
+        relation_type: Optional[str] = None,
+        event_type: Optional[str] = None,
+        aggregation_key: Optional[str] = None,
+        limit: int = 5,
+        direction: str = "b",
+        from_token: Optional[StreamToken] = None,
+        to_token: Optional[StreamToken] = None,
+    ) -> JsonDict:
+        """Get related events of a event, ordered by topological ordering.
+
+        TODO Accept a PaginationConfig instead of individual pagination parameters.
+
+        Args:
+            requester: The user requesting the relations.
+            event_id: Fetch events that relate to this event ID.
+            room_id: The room the event belongs to.
+            relation_type: Only fetch events with this relation type, if given.
+            event_type: Only fetch events with this event type, if given.
+            aggregation_key: Only fetch events with this aggregation key, if given.
+            limit: Only fetch the most recent `limit` events.
+            direction: Whether to fetch the most recent first (`"b"`) or the
+                oldest first (`"f"`).
+            from_token: Fetch rows from the given token, or from the start if None.
+            to_token: Fetch rows up to the given token, or up to the end if None.
+
+        Returns:
+            The pagination chunk.
+        """
+
+        user_id = requester.user.to_string()
+
+        # TODO Properly handle a user leaving a room.
+        (_, member_event_id) = await self._auth.check_user_in_room_or_world_readable(
+            room_id, user_id, allow_departed_users=True
+        )
+
+        # This gets the original event and checks that a) the event exists and
+        # b) the user is allowed to view it.
+        event = await self._event_handler.get_event(requester.user, room_id, event_id)
+        if event is None:
+            raise SynapseError(404, "Unknown parent event.")
+
+        pagination_chunk = await self._main_store.get_relations_for_event(
+            event_id=event_id,
+            event=event,
+            room_id=room_id,
+            relation_type=relation_type,
+            event_type=event_type,
+            aggregation_key=aggregation_key,
+            limit=limit,
+            direction=direction,
+            from_token=from_token,
+            to_token=to_token,
+        )
+
+        events = await self._main_store.get_events_as_list(
+            [c["event_id"] for c in pagination_chunk.chunk]
+        )
+
+        events = await filter_events_for_client(
+            self._storage, user_id, events, is_peeking=(member_event_id is None)
+        )
+
+        now = self._clock.time_msec()
+        # Do not bundle aggregations when retrieving the original event because
+        # we want the content before relations are applied to it.
+        original_event = self._event_serializer.serialize_event(
+            event, now, bundle_aggregations=None
+        )
+        # The relations returned for the requested event do include their
+        # bundled aggregations.
+        aggregations = await self.get_bundled_aggregations(
+            events, requester.user.to_string()
+        )
+        serialized_events = self._event_serializer.serialize_events(
+            events, now, bundle_aggregations=aggregations
+        )
+
+        return_value = await pagination_chunk.to_dict(self._main_store)
+        return_value["chunk"] = serialized_events
+        return_value["original_event"] = original_event
+
+        return return_value
+
+    async def _get_bundled_aggregation_for_event(
+        self, event: EventBase, user_id: str
+    ) -> Optional[BundledAggregations]:
+        """Generate bundled aggregations for an event.
+
+        Note that this does not use a cache, but depends on cached methods.
+
+        Args:
+            event: The event to calculate bundled aggregations for.
+            user_id: The user requesting the bundled aggregations.
+
+        Returns:
+            The bundled aggregations for an event, if bundled aggregations are
+            enabled and the event can have bundled aggregations.
+        """
+
+        # Do not bundle aggregations for an event which represents an edit or an
+        # annotation. It does not make sense for them to have related events.
+        relates_to = event.content.get("m.relates_to")
+        if isinstance(relates_to, (dict, frozendict)):
+            relation_type = relates_to.get("rel_type")
+            if relation_type in (RelationTypes.ANNOTATION, RelationTypes.REPLACE):
+                return None
+
+        event_id = event.event_id
+        room_id = event.room_id
+
+        # The bundled aggregations to include, a mapping of relation type to a
+        # type-specific value. Some types include the direct return type here
+        # while others need more processing during serialization.
+        aggregations = BundledAggregations()
+
+        annotations = await self._main_store.get_aggregation_groups_for_event(
+            event_id, room_id
+        )
+        if annotations.chunk:
+            aggregations.annotations = await annotations.to_dict(
+                cast("DataStore", self)
+            )
+
+        references = await self._main_store.get_relations_for_event(
+            event_id, event, room_id, RelationTypes.REFERENCE, direction="f"
+        )
+        if references.chunk:
+            aggregations.references = await references.to_dict(cast("DataStore", self))
+
+        # Store the bundled aggregations in the event metadata for later use.
+        return aggregations
+
+    async def get_bundled_aggregations(
+        self, events: Iterable[EventBase], user_id: str
+    ) -> Dict[str, BundledAggregations]:
+        """Generate bundled aggregations for events.
+
+        Args:
+            events: The iterable of events to calculate bundled aggregations for.
+            user_id: The user requesting the bundled aggregations.
+
+        Returns:
+            A map of event ID to the bundled aggregation for the event. Not all
+            events may have bundled aggregations in the results.
+        """
+        # De-duplicate events by ID to handle the same event requested multiple times.
+        #
+        # State events do not get bundled aggregations.
+        events_by_id = {
+            event.event_id: event for event in events if not event.is_state()
+        }
+
+        # event ID -> bundled aggregation in non-serialized form.
+        results: Dict[str, BundledAggregations] = {}
+
+        # Fetch other relations per event.
+        for event in events_by_id.values():
+            event_result = await self._get_bundled_aggregation_for_event(event, user_id)
+            if event_result:
+                results[event.event_id] = event_result
+
+        # Fetch any edits (but not for redacted events).
+        edits = await self._main_store.get_applicable_edits(
+            [
+                event_id
+                for event_id, event in events_by_id.items()
+                if not event.internal_metadata.is_redacted()
+            ]
+        )
+        for event_id, edit in edits.items():
+            results.setdefault(event_id, BundledAggregations()).replace = edit
+
+        # Fetch thread summaries.
+        summaries = await self._main_store.get_thread_summaries(events_by_id.keys())
+        # Only fetch participated for a limited selection based on what had
+        # summaries.
+        participated = await self._main_store.get_threads_participated(
+            [event_id for event_id, summary in summaries.items() if summary], user_id
+        )
+        for event_id, summary in summaries.items():
+            if summary:
+                thread_count, latest_thread_event, edit = summary
+                results.setdefault(
+                    event_id, BundledAggregations()
+                ).thread = _ThreadAggregation(
+                    latest_event=latest_thread_event,
+                    latest_edit=edit,
+                    count=thread_count,
+                    # If there's a thread summary it must also exist in the
+                    # participated dictionary.
+                    current_user_participated=participated[event_id],
+                )
+
+        return results
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 7b965b4b96..092e185c99 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -60,8 +60,8 @@ from synapse.events import EventBase
 from synapse.events.utils import copy_power_levels_contents
 from synapse.federation.federation_client import InvalidResponseError
 from synapse.handlers.federation import get_domains_from_state
+from synapse.handlers.relations import BundledAggregations
 from synapse.rest.admin._base import assert_user_is_admin
-from synapse.storage.databases.main.relations import BundledAggregations
 from synapse.storage.state import StateFilter
 from synapse.streams import EventSource
 from synapse.types import (
@@ -1118,6 +1118,7 @@ class RoomContextHandler:
         self.store = hs.get_datastores().main
         self.storage = hs.get_storage()
         self.state_store = self.storage.state
+        self._relations_handler = hs.get_relations_handler()
 
     async def get_event_context(
         self,
@@ -1190,7 +1191,7 @@ class RoomContextHandler:
         event = filtered[0]
 
         # Fetch the aggregations.
-        aggregations = await self.store.get_bundled_aggregations(
+        aggregations = await self._relations_handler.get_bundled_aggregations(
             itertools.chain(events_before, (event,), events_after),
             user.to_string(),
         )
@@ -1475,6 +1476,7 @@ class RoomShutdownHandler:
         self.room_member_handler = hs.get_room_member_handler()
         self._room_creation_handler = hs.get_room_creation_handler()
         self._replication = hs.get_replication_data_handler()
+        self._third_party_rules = hs.get_third_party_event_rules()
         self.event_creation_handler = hs.get_event_creation_handler()
         self.store = hs.get_datastores().main
 
@@ -1548,6 +1550,13 @@ class RoomShutdownHandler:
         if not RoomID.is_valid(room_id):
             raise SynapseError(400, "%s is not a legal room ID" % (room_id,))
 
+        if not await self._third_party_rules.check_can_shutdown_room(
+            requester_user_id, room_id
+        ):
+            raise SynapseError(
+                403, "Shutdown of this room is forbidden", Codes.FORBIDDEN
+            )
+
         # Action the block first (even if the room doesn't exist yet)
         if block:
             # This will work even if the room is already blocked, but that is
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index abbf7b7b27..a0255bd143 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -121,12 +121,11 @@ class RoomBatchHandler:
 
         return create_requester(user_id, app_service=app_service)
 
-    async def get_most_recent_auth_event_ids_from_event_id_list(
+    async def get_most_recent_full_state_ids_from_event_id_list(
         self, event_ids: List[str]
     ) -> List[str]:
-        """Find the most recent auth event ids (derived from state events) that
-        allowed that message to be sent. We will use this as a base
-        to auth our historical messages against.
+        """Find the most recent event_id and grab the full state at that event.
+        We will use this as a base to auth our historical messages against.
 
         Args:
             event_ids: List of event ID's to look at
@@ -136,38 +135,37 @@ class RoomBatchHandler:
         """
 
         (
-            most_recent_prev_event_id,
+            most_recent_event_id,
             _,
         ) = await self.store.get_max_depth_of(event_ids)
         # mapping from (type, state_key) -> state_event_id
         prev_state_map = await self.state_store.get_state_ids_for_event(
-            most_recent_prev_event_id
+            most_recent_event_id
         )
         # List of state event ID's
-        prev_state_ids = list(prev_state_map.values())
-        auth_event_ids = prev_state_ids
+        full_state_ids = list(prev_state_map.values())
 
-        return auth_event_ids
+        return full_state_ids
 
     async def persist_state_events_at_start(
         self,
         state_events_at_start: List[JsonDict],
         room_id: str,
-        initial_auth_event_ids: List[str],
+        initial_state_event_ids: List[str],
         app_service_requester: Requester,
     ) -> List[str]:
         """Takes all `state_events_at_start` event dictionaries and creates/persists
-        them as floating state events which don't resolve into the current room state.
-        They are floating because they reference a fake prev_event which doesn't connect
-        to the normal DAG at all.
+        them in a floating state event chain which don't resolve into the current room
+        state. They are floating because they reference no prev_events and are marked
+        as outliers which disconnects them from the normal DAG.
 
         Args:
             state_events_at_start:
             room_id: Room where you want the events persisted in.
-            initial_auth_event_ids: These will be the auth_events for the first
-                state event created. Each event created afterwards will be
-                added to the list of auth events for the next state event
-                created.
+            initial_state_event_ids:
+                The base set of state for the historical batch which the floating
+                state chain will derive from. This should probably be the state
+                from the `prev_event` defined by `/batch_send?prev_event_id=$abc`.
             app_service_requester: The requester of an application service.
 
         Returns:
@@ -176,7 +174,7 @@ class RoomBatchHandler:
         assert app_service_requester.app_service
 
         state_event_ids_at_start = []
-        auth_event_ids = initial_auth_event_ids.copy()
+        state_event_ids = initial_state_event_ids.copy()
 
         # Make the state events float off on their own by specifying no
         # prev_events for the first one in the chain so we don't have a bunch of
@@ -189,9 +187,7 @@ class RoomBatchHandler:
             )
 
             logger.debug(
-                "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s",
-                state_event,
-                auth_event_ids,
+                "RoomBatchSendEventRestServlet inserting state_event=%s", state_event
             )
 
             event_dict = {
@@ -217,16 +213,26 @@ class RoomBatchHandler:
                     room_id=room_id,
                     action=membership,
                     content=event_dict["content"],
+                    # Mark as an outlier to disconnect it from the normal DAG
+                    # and not show up between batches of history.
                     outlier=True,
                     historical=True,
-                    # Only the first event in the chain should be floating.
+                    # Only the first event in the state chain should be floating.
                     # The rest should hang off each other in a chain.
                     allow_no_prev_events=index == 0,
                     prev_event_ids=prev_event_ids_for_state_chain,
+                    # Since each state event is marked as an outlier, the
+                    # `EventContext.for_outlier()` won't have any `state_ids`
+                    # set and therefore can't derive any state even though the
+                    # prev_events are set. Also since the first event in the
+                    # state chain is floating with no `prev_events`, it can't
+                    # derive state from anywhere automatically. So we need to
+                    # set some state explicitly.
+                    #
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
-                    auth_event_ids=auth_event_ids.copy(),
+                    state_event_ids=state_event_ids.copy(),
                 )
             else:
                 # TODO: Add some complement tests that adds state that is not member joins
@@ -240,21 +246,31 @@ class RoomBatchHandler:
                         state_event["sender"], app_service_requester.app_service
                     ),
                     event_dict,
+                    # Mark as an outlier to disconnect it from the normal DAG
+                    # and not show up between batches of history.
                     outlier=True,
                     historical=True,
-                    # Only the first event in the chain should be floating.
+                    # Only the first event in the state chain should be floating.
                     # The rest should hang off each other in a chain.
                     allow_no_prev_events=index == 0,
                     prev_event_ids=prev_event_ids_for_state_chain,
+                    # Since each state event is marked as an outlier, the
+                    # `EventContext.for_outlier()` won't have any `state_ids`
+                    # set and therefore can't derive any state even though the
+                    # prev_events are set. Also since the first event in the
+                    # state chain is floating with no `prev_events`, it can't
+                    # derive state from anywhere automatically. So we need to
+                    # set some state explicitly.
+                    #
                     # Make sure to use a copy of this list because we modify it
                     # later in the loop here. Otherwise it will be the same
                     # reference and also update in the event when we append later.
-                    auth_event_ids=auth_event_ids.copy(),
+                    state_event_ids=state_event_ids.copy(),
                 )
                 event_id = event.event_id
 
             state_event_ids_at_start.append(event_id)
-            auth_event_ids.append(event_id)
+            state_event_ids.append(event_id)
             # Connect all the state in a floating chain
             prev_event_ids_for_state_chain = [event_id]
 
@@ -265,7 +281,7 @@ class RoomBatchHandler:
         events_to_create: List[JsonDict],
         room_id: str,
         inherited_depth: int,
-        auth_event_ids: List[str],
+        initial_state_event_ids: List[str],
         app_service_requester: Requester,
     ) -> List[str]:
         """Create and persists all events provided sequentially. Handles the
@@ -281,8 +297,10 @@ class RoomBatchHandler:
             room_id: Room where you want the events persisted in.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
-            auth_event_ids: Define which events allow you to create the given
-                event in the room.
+            initial_state_event_ids:
+                This is used to set explicit state for the insertion event at
+                the start of the historical batch since it's floating with no
+                prev_events to derive state from automatically.
             app_service_requester: The requester of an application service.
 
         Returns:
@@ -290,6 +308,11 @@ class RoomBatchHandler:
         """
         assert app_service_requester.app_service
 
+        # We expect the first event in a historical batch to be an insertion event
+        assert events_to_create[0]["type"] == EventTypes.MSC2716_INSERTION
+        # We expect the last event in a historical batch to be an batch event
+        assert events_to_create[-1]["type"] == EventTypes.MSC2716_BATCH
+
         # Make the historical event chain float off on its own by specifying no
         # prev_events for the first event in the chain which causes the HS to
         # ask for the state at the start of the batch later.
@@ -321,11 +344,16 @@ class RoomBatchHandler:
                     ev["sender"], app_service_requester.app_service
                 ),
                 event_dict,
-                # Only the first event in the chain should be floating.
-                # The rest should hang off each other in a chain.
+                # Only the first event (which is the insertion event) in the
+                # chain should be floating. The rest should hang off each other
+                # in a chain.
                 allow_no_prev_events=index == 0,
                 prev_event_ids=event_dict.get("prev_events"),
-                auth_event_ids=auth_event_ids,
+                # Since the first event (which is the insertion event) in the
+                # chain is floating with no `prev_events`, it can't derive state
+                # from anywhere automatically. So we need to set some state
+                # explicitly.
+                state_event_ids=initial_state_event_ids if index == 0 else None,
                 historical=True,
                 depth=inherited_depth,
             )
@@ -343,10 +371,9 @@ class RoomBatchHandler:
             )
 
             logger.debug(
-                "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
+                "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s",
                 event,
                 prev_event_ids,
-                auth_event_ids,
             )
 
             events_to_persist.append((event, context))
@@ -376,12 +403,12 @@ class RoomBatchHandler:
         room_id: str,
         batch_id_to_connect_to: str,
         inherited_depth: int,
-        auth_event_ids: List[str],
+        initial_state_event_ids: List[str],
         app_service_requester: Requester,
     ) -> Tuple[List[str], str]:
         """
-        Handles creating and persisting all of the historical events as well
-        as insertion and batch meta events to make the batch navigable in the DAG.
+        Handles creating and persisting all of the historical events as well as
+        insertion and batch meta events to make the batch navigable in the DAG.
 
         Args:
             events_to_create: List of historical events to create in JSON
@@ -391,8 +418,13 @@ class RoomBatchHandler:
                 want this batch to connect to.
             inherited_depth: The depth to create the events at (you will
                 probably by calling inherit_depth_from_prev_ids(...)).
-            auth_event_ids: Define which events allow you to create the given
-                event in the room.
+            initial_state_event_ids:
+                This is used to set explicit state for the insertion event at
+                the start of the historical batch since it's floating with no
+                prev_events to derive state from automatically. This should
+                probably be the state from the `prev_event` defined by
+                `/batch_send?prev_event_id=$abc` plus the outcome of
+                `persist_state_events_at_start`
             app_service_requester: The requester of an application service.
 
         Returns:
@@ -438,7 +470,7 @@ class RoomBatchHandler:
             events_to_create=events_to_create,
             room_id=room_id,
             inherited_depth=inherited_depth,
-            auth_event_ids=auth_event_ids,
+            initial_state_event_ids=initial_state_event_ids,
             app_service_requester=app_service_requester,
         )
 
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a582837cf0..a33fa34aa8 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -272,6 +272,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
         txn_id: Optional[str] = None,
         ratelimit: bool = True,
         content: Optional[dict] = None,
@@ -298,6 +299,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 The event ids to use as the auth_events for the new event.
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.
+            state_event_ids:
+                The full state at a given event. This is used particularly by the MSC2716
+                /batch_send endpoint. One use case is the historical `state_events_at_start`;
+                since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
+                have any `state_ids` set and therefore can't derive any state even though the
+                prev_events are set so we need to set them ourself via this argument.
+                This should normally be left as None, which will cause the auth_event_ids
+                to be calculated based on the room state at the prev_events.
 
             txn_id:
             ratelimit:
@@ -353,6 +362,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             allow_no_prev_events=allow_no_prev_events,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
+            state_event_ids=state_event_ids,
             require_consent=require_consent,
             outlier=outlier,
             historical=historical,
@@ -456,6 +466,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
         """Update a user's membership in a room.
 
@@ -487,6 +498,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 The event ids to use as the auth_events for the new event.
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.
+            state_event_ids:
+                The full state at a given event. This is used particularly by the MSC2716
+                /batch_send endpoint. One use case is the historical `state_events_at_start`;
+                since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
+                have any `state_ids` set and therefore can't derive any state even though the
+                prev_events are set so we need to set them ourself via this argument.
+                This should normally be left as None, which will cause the auth_event_ids
+                to be calculated based on the room state at the prev_events.
 
         Returns:
             A tuple of the new event ID and stream ID.
@@ -526,6 +545,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                     allow_no_prev_events=allow_no_prev_events,
                     prev_event_ids=prev_event_ids,
                     auth_event_ids=auth_event_ids,
+                    state_event_ids=state_event_ids,
                 )
 
         return result
@@ -548,6 +568,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         allow_no_prev_events: bool = False,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        state_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
         """Helper for update_membership.
 
@@ -581,6 +602,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 The event ids to use as the auth_events for the new event.
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.
+            state_event_ids:
+                The full state at a given event. This is used particularly by the MSC2716
+                /batch_send endpoint. One use case is the historical `state_events_at_start`;
+                since each is marked as an `outlier`, the `EventContext.for_outlier()` won't
+                have any `state_ids` set and therefore can't derive any state even though the
+                prev_events are set so we need to set them ourself via this argument.
+                This should normally be left as None, which will cause the auth_event_ids
+                to be calculated based on the room state at the prev_events.
 
         Returns:
             A tuple of the new event ID and stream ID.
@@ -708,6 +737,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 allow_no_prev_events=allow_no_prev_events,
                 prev_event_ids=prev_event_ids,
                 auth_event_ids=auth_event_ids,
+                state_event_ids=state_event_ids,
                 content=content,
                 require_consent=require_consent,
                 outlier=outlier,
@@ -932,6 +962,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             ratelimit=ratelimit,
             prev_event_ids=latest_event_ids,
             auth_event_ids=auth_event_ids,
+            state_event_ids=state_event_ids,
             content=content,
             require_consent=require_consent,
             outlier=outlier,
@@ -1736,8 +1767,8 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             txn_id=txn_id,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
+            outlier=True,
         )
-        event.internal_metadata.outlier = True
         event.internal_metadata.out_of_band_membership = True
 
         result_event = await self.event_creation_handler.handle_new_client_event(
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index 3979cbba71..486145f48a 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -295,7 +295,7 @@ class RoomSummaryHandler:
             # inaccessible to the requesting user.
             if room_entry:
                 # Add the room (including the stripped m.space.child events).
-                rooms_result.append(room_entry.as_json())
+                rooms_result.append(room_entry.as_json(for_client=True))
 
                 # If this room is not at the max-depth, check if there are any
                 # children to process.
@@ -843,14 +843,25 @@ class _RoomEntry:
     # This may not include all children.
     children_state_events: Sequence[JsonDict] = ()
 
-    def as_json(self) -> JsonDict:
+    def as_json(self, for_client: bool = False) -> JsonDict:
         """
         Returns a JSON dictionary suitable for the room hierarchy endpoint.
 
         It returns the room summary including the stripped m.space.child events
         as a sub-key.
+
+        Args:
+            for_client: If true, any server-server only fields are stripped from
+                the result.
+
         """
         result = dict(self.room)
+
+        # Before returning to the client, remove the allowed_room_ids key, if it
+        # exists.
+        if for_client:
+            result.pop("allowed_room_ids", False)
+
         result["children_state"] = self.children_state_events
         return result
 
diff --git a/synapse/handlers/search.py b/synapse/handlers/search.py
index aa16e417eb..30eddda65f 100644
--- a/synapse/handlers/search.py
+++ b/synapse/handlers/search.py
@@ -54,6 +54,7 @@ class SearchHandler:
         self.clock = hs.get_clock()
         self.hs = hs
         self._event_serializer = hs.get_event_client_serializer()
+        self._relations_handler = hs.get_relations_handler()
         self.storage = hs.get_storage()
         self.state_store = self.storage.state
         self.auth = hs.get_auth()
@@ -354,7 +355,7 @@ class SearchHandler:
 
         aggregations = None
         if self._msc3666_enabled:
-            aggregations = await self.store.get_bundled_aggregations(
+            aggregations = await self._relations_handler.get_bundled_aggregations(
                 # Generate an iterable of EventBase for all the events that will be
                 # returned, including contextual events.
                 itertools.chain(
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index ff5b5169ca..4f02a060d9 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -132,6 +132,7 @@ class UserAttributes:
     # if `None`, the mapper has not picked a userid, and the user should be prompted to
     # enter one.
     localpart: Optional[str]
+    confirm_localpart: bool = False
     display_name: Optional[str] = None
     emails: Collection[str] = attr.Factory(list)
 
@@ -561,9 +562,10 @@ class SsoHandler:
         # Must provide either attributes or session, not both
         assert (attributes is not None) != (session is not None)
 
-        if (attributes and attributes.localpart is None) or (
-            session and session.chosen_localpart is None
-        ):
+        if (
+            attributes
+            and (attributes.localpart is None or attributes.confirm_localpart is True)
+        ) or (session and session.chosen_localpart is None):
             return b"/_synapse/client/pick_username/account_details"
         elif self._consent_at_registration and not (
             session and session.terms_accepted_version
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 0aa3052fd6..6c569cfb1c 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -28,16 +28,16 @@ from typing import (
 import attr
 from prometheus_client import Counter
 
-from synapse.api.constants import AccountDataTypes, EventTypes, Membership, ReceiptTypes
+from synapse.api.constants import EventTypes, Membership, ReceiptTypes
 from synapse.api.filtering import FilterCollection
 from synapse.api.presence import UserPresenceState
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
+from synapse.handlers.relations import BundledAggregations
 from synapse.logging.context import current_context
 from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
 from synapse.push.clientformat import format_push_rules_for_user
 from synapse.storage.databases.main.event_push_actions import NotifCounts
-from synapse.storage.databases.main.relations import BundledAggregations
 from synapse.storage.roommember import MemberSummary
 from synapse.storage.state import StateFilter
 from synapse.types import (
@@ -269,6 +269,7 @@ class SyncHandler:
         self.store = hs.get_datastores().main
         self.notifier = hs.get_notifier()
         self.presence_handler = hs.get_presence_handler()
+        self._relations_handler = hs.get_relations_handler()
         self.event_sources = hs.get_event_sources()
         self.clock = hs.get_clock()
         self.state = hs.get_state_handler()
@@ -638,8 +639,10 @@ class SyncHandler:
         # as clients will have all the necessary information.
         bundled_aggregations = None
         if limited or newly_joined_room:
-            bundled_aggregations = await self.store.get_bundled_aggregations(
-                recents, sync_config.user.to_string()
+            bundled_aggregations = (
+                await self._relations_handler.get_bundled_aggregations(
+                    recents, sync_config.user.to_string()
+                )
             )
 
         return TimelineBatch(
@@ -1601,7 +1604,7 @@ class SyncHandler:
                         return set(), set(), set(), set()
 
         # 3. Work out which rooms need reporting in the sync response.
-        ignored_users = await self._get_ignored_users(user_id)
+        ignored_users = await self.store.ignored_users(user_id)
         if since_token:
             room_changes = await self._get_rooms_changed(
                 sync_result_builder, ignored_users
@@ -1627,7 +1630,6 @@ class SyncHandler:
             logger.debug("Generating room entry for %s", room_entry.room_id)
             await self._generate_room_entry(
                 sync_result_builder,
-                ignored_users,
                 room_entry,
                 ephemeral=ephemeral_by_room.get(room_entry.room_id, []),
                 tags=tags_by_room.get(room_entry.room_id),
@@ -1657,29 +1659,6 @@ class SyncHandler:
             newly_left_users,
         )
 
-    async def _get_ignored_users(self, user_id: str) -> FrozenSet[str]:
-        """Retrieve the users ignored by the given user from their global account_data.
-
-        Returns an empty set if
-        - there is no global account_data entry for ignored_users
-        - there is such an entry, but it's not a JSON object.
-        """
-        # TODO: Can we `SELECT ignored_user_id FROM ignored_users WHERE ignorer_user_id=?;` instead?
-        ignored_account_data = (
-            await self.store.get_global_account_data_by_type_for_user(
-                user_id=user_id, data_type=AccountDataTypes.IGNORED_USER_LIST
-            )
-        )
-
-        # If there is ignored users account data and it matches the proper type,
-        # then use it.
-        ignored_users: FrozenSet[str] = frozenset()
-        if ignored_account_data:
-            ignored_users_data = ignored_account_data.get("ignored_users", {})
-            if isinstance(ignored_users_data, dict):
-                ignored_users = frozenset(ignored_users_data.keys())
-        return ignored_users
-
     async def _have_rooms_changed(
         self, sync_result_builder: "SyncResultBuilder"
     ) -> bool:
@@ -2022,7 +2001,6 @@ class SyncHandler:
     async def _generate_room_entry(
         self,
         sync_result_builder: "SyncResultBuilder",
-        ignored_users: FrozenSet[str],
         room_builder: "RoomSyncResultBuilder",
         ephemeral: List[JsonDict],
         tags: Optional[Dict[str, Dict[str, Any]]],
@@ -2051,7 +2029,6 @@ class SyncHandler:
 
         Args:
             sync_result_builder
-            ignored_users: Set of users ignored by user.
             room_builder
             ephemeral: List of new ephemeral events for room
             tags: List of *all* tags for room, or None if there has been
diff --git a/synapse/handlers/typing.py b/synapse/handlers/typing.py
index 843c68eb0f..6854428b7c 100644
--- a/synapse/handlers/typing.py
+++ b/synapse/handlers/typing.py
@@ -160,8 +160,9 @@ class FollowerTypingHandler:
         """Should be called whenever we receive updates for typing stream."""
 
         if self._latest_room_serial > token:
-            # The master has gone backwards. To prevent inconsistent data, just
-            # clear everything.
+            # The typing worker has gone backwards (e.g. it may have restarted).
+            # To prevent inconsistent data, just clear everything.
+            logger.info("Typing handler stream went backwards; resetting")
             self._reset()
 
         # Set the latest serial token to whatever the server gave us.
@@ -486,9 +487,7 @@ class TypingNotificationEventSource(EventSource[int, JsonDict]):
                 if handler._room_serials[room_id] <= from_key:
                     continue
 
-                if not await service.matches_user_in_member_list(
-                    room_id, self._main_store
-                ):
+                if not await service.is_interested_in_room(room_id, self._main_store):
                     continue
 
                 events.append(self._make_event_for(room_id))
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d27ed2be6a..048fd4bb82 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -19,8 +19,8 @@ import synapse.metrics
 from synapse.api.constants import EventTypes, HistoryVisibility, JoinRules, Membership
 from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
 from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.storage.databases.main.user_directory import SearchResult
 from synapse.storage.roommember import ProfileInfo
-from synapse.types import JsonDict
 from synapse.util.metrics import Measure
 
 if TYPE_CHECKING:
@@ -78,7 +78,7 @@ class UserDirectoryHandler(StateDeltasHandler):
 
     async def search_users(
         self, user_id: str, search_term: str, limit: int
-    ) -> JsonDict:
+    ) -> SearchResult:
         """Searches for users in directory
 
         Returns: