Diffstat (limited to 'synapse/handlers')
-rw-r--r--  synapse/handlers/auth.py                28
-rw-r--r--  synapse/handlers/federation.py        1927
-rw-r--r--  synapse/handlers/federation_event.py  1825
-rw-r--r--  synapse/handlers/initial_sync.py         2
-rw-r--r--  synapse/handlers/presence.py             5
-rw-r--r--  synapse/handlers/register.py            18
-rw-r--r--  synapse/handlers/room_member.py         17
-rw-r--r--  synapse/handlers/room_summary.py        76
-rw-r--r--  synapse/handlers/sso.py                  2
-rw-r--r--  synapse/handlers/sync.py               198
-rw-r--r--  synapse/handlers/ui_auth/__init__.py     5
-rw-r--r--  synapse/handlers/ui_auth/checkers.py    75
12 files changed, 2126 insertions, 2052 deletions
diff --git a/synapse/handlers/auth.py b/synapse/handlers/auth.py
index 161b3c933c..34725324a6 100644
--- a/synapse/handlers/auth.py
+++ b/synapse/handlers/auth.py
@@ -627,23 +627,28 @@ class AuthHandler(BaseHandler):
 
     async def add_oob_auth(
         self, stagetype: str, authdict: Dict[str, Any], clientip: str
-    ) -> bool:
+    ) -> None:
         """
         Adds the result of out-of-band authentication into an existing auth
         session. Currently used for adding the result of fallback auth.
+
+        Raises:
+            LoginError if the stagetype is unknown or the session is missing.
+            LoginError is raised by check_auth if authentication fails.
         """
         if stagetype not in self.checkers:
-            raise LoginError(400, "", Codes.MISSING_PARAM)
+            raise LoginError(
+                400, f"Unknown UIA stage type: {stagetype}", Codes.INVALID_PARAM
+            )
         if "session" not in authdict:
-            raise LoginError(400, "", Codes.MISSING_PARAM)
+            raise LoginError(400, "Missing session ID", Codes.MISSING_PARAM)
 
+        # If authentication fails a LoginError is raised. Otherwise, store
+        # the successful result.
         result = await self.checkers[stagetype].check_auth(authdict, clientip)
-        if result:
-            await self.store.mark_ui_auth_stage_complete(
-                authdict["session"], stagetype, result
-            )
-            return True
-        return False
+        await self.store.mark_ui_auth_stage_complete(
+            authdict["session"], stagetype, result
+        )
 
     def get_session_id(self, clientdict: Dict[str, Any]) -> Optional[str]:
         """
@@ -1459,6 +1464,10 @@ class AuthHandler(BaseHandler):
         )
 
         await self.store.user_delete_threepid(user_id, medium, address)
+        if medium == "email":
+            await self.store.delete_pusher_by_app_id_pushkey_user_id(
+                app_id="m.email", pushkey=address, user_id=user_id
+            )
         return result
 
     async def hash(self, password: str) -> str:
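
A hedged sketch of what the new `medium == "email"` branch accomplishes (the store calls are taken verbatim from the hunk above; the surrounding context is assumed): Synapse email pushers use the fixed app_id "m.email" with the email address as the pushkey, so deleting the threepid alone would leave a pusher pointed at a removed address:

    # Sketch, assuming user_id/medium/address as in the hunk above.
    await self.store.user_delete_threepid(user_id, "email", address)
    # New behaviour: also drop the email pusher keyed on the removed
    # address, so no further notifications are sent to it.
    await self.store.delete_pusher_by_app_id_pushkey_user_id(
        app_id="m.email", pushkey=address, user_id=user_id
    )
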
@@ -1727,7 +1736,6 @@ class AuthHandler(BaseHandler):
 
 @attr.s(slots=True)
 class MacaroonGenerator:
-
     hs = attr.ib()
 
     def generate_guest_access_token(self, user_id: str) -> str:
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c0e13bdaac..daf1d3bfb3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -17,23 +17,9 @@
 
 import itertools
 import logging
-from collections.abc import Container
 from http import HTTPStatus
-from typing import (
-    TYPE_CHECKING,
-    Collection,
-    Dict,
-    Iterable,
-    List,
-    Optional,
-    Sequence,
-    Set,
-    Tuple,
-    Union,
-)
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
 
-import attr
-from prometheus_client import Counter
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -41,19 +27,12 @@ from unpaddedbase64 import decode_base64
 from twisted.internet import defer
 
 from synapse import event_auth
-from synapse.api.constants import (
-    EventContentFields,
-    EventTypes,
-    Membership,
-    RejectedReason,
-    RoomEncryptionAlgorithms,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
     Codes,
     FederationDeniedError,
-    FederationError,
     HttpResponseException,
     NotFoundError,
     RequestSendFailed,
@@ -61,10 +40,10 @@ from synapse.api.errors import (
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
-from synapse.event_auth import auth_types_for_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
+from synapse.federation.federation_client import InvalidResponseError
 from synapse.handlers._base import BaseHandler
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import (
@@ -74,28 +53,14 @@ from synapse.logging.context import (
     run_in_background,
 )
 from synapse.logging.utils import log_function
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
-    ReplicationFederationSendEventsRestServlet,
     ReplicationStoreRoomOnOutlierMembershipRestServlet,
 )
-from synapse.state import StateResolutionStore
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.types import (
-    JsonDict,
-    MutableStateMap,
-    PersistedEventPosition,
-    RoomStreamToken,
-    StateMap,
-    UserID,
-    get_domain_from_id,
-)
-from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.types import JsonDict, StateMap, get_domain_from_id
+from synapse.util.async_helpers import Linearizer
 from synapse.util.retryutils import NotRetryingDestination
-from synapse.util.stringutils import shortstr
 from synapse.visibility import filter_events_for_server
 
 if TYPE_CHECKING:
@@ -103,50 +68,11 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
-soft_failed_event_counter = Counter(
-    "synapse_federation_soft_failed_events_total",
-    "Events received over federation that we marked as soft_failed",
-)
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _NewEventInfo:
-    """Holds information about a received event, ready for passing to _auth_and_persist_events
-
-    Attributes:
-        event: the received event
-
-        state: the state at that event, according to /state_ids from a remote
-           homeserver. Only populated for backfilled events which are going to be a
-           new backwards extremity.
-
-        claimed_auth_event_map: a map of (type, state_key) => event for the event's
-            claimed auth_events.
-
-            This can include events which have not yet been persisted, in the case that
-            we are backfilling a batch of events.
-
-            Note: May be incomplete: if we were unable to find all of the claimed auth
-            events. Also, treat the contents with caution: the events might also have
-            been rejected, might not yet have been authorized themselves, or they might
-            be in the wrong room.
-
-    """
-
-    event: EventBase
-    state: Optional[Sequence[EventBase]]
-    claimed_auth_event_map: StateMap[EventBase]
-
 
 class FederationHandler(BaseHandler):
-    """Handles events that originated from federation.
-    Responsible for:
-    a) handling received Pdus before handing them on as Events to the rest
-    of the homeserver (including auth and state conflict resolutions)
-    b) converting events that were produced by local clients that may need
-    to be sent to remote homeservers.
-    c) doing the necessary dances to invite remote users and join remote
-    rooms.
+    """Handles general incoming federation requests
+
+    Incoming events are *not* handled here, for which see FederationEventHandler.
     """
 
     def __init__(self, hs: "HomeServer"):
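
A short sketch of the resulting division of labour, using only accessors that appear elsewhere in this diff (`hs.get_federation_event_handler()`, `on_receive_pdu`, `persist_events_and_notify`); illustrative, not a new API:

    # Inbound PDU processing now lives on FederationEventHandler ...
    federation_event_handler = hs.get_federation_event_handler()
    await federation_event_handler.on_receive_pdu(origin, pdu)

    # ... as does persisting-and-notifying, which the join/leave/knock
    # dances remaining on FederationHandler delegate to.
    await federation_event_handler.persist_events_and_notify(
        event.room_id, [(event, context)]
    )
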
@@ -159,979 +85,35 @@ class FederationHandler(BaseHandler):
         self.state_store = self.storage.state
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
-        self._state_resolution_handler = hs.get_state_resolution_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
-        self.action_generator = hs.get_action_generator()
         self.is_mine_id = hs.is_mine_id
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._event_auth_handler = hs.get_event_auth_handler()
-        self._message_handler = hs.get_message_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
         self.http_client = hs.get_proxied_blacklisted_http_client()
-        self._instance_name = hs.get_instance_name()
         self._replication = hs.get_replication_data_handler()
+        self._federation_event_handler = hs.get_federation_event_handler()
 
-        self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
         )
 
         if hs.config.worker_app:
-            self._user_device_resync = (
-                ReplicationUserDevicesResyncRestServlet.make_client(hs)
-            )
             self._maybe_store_room_on_outlier_membership = (
                 ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs)
             )
         else:
-            self._device_list_updater = hs.get_device_handler().device_list_updater
             self._maybe_store_room_on_outlier_membership = (
                 self.store.maybe_store_room_on_outlier_membership
             )
 
-        # When joining a room we need to queue any events for that room up.
-        # For each room, a list of (pdu, origin) tuples.
-        self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {}
-        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
-
         self._room_backfill = Linearizer("room_backfill")
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
-        self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
-
-    async def on_receive_pdu(
-        self, origin: str, pdu: EventBase, sent_to_us_directly: bool = False
-    ) -> None:
-        """Process a PDU received via a federation /send/ transaction, or
-        via backfill of missing prev_events
-
-        Args:
-            origin: server which initiated the /send/ transaction. Will
-                be used to fetch missing events or state.
-            pdu: received PDU
-            sent_to_us_directly: True if this event was pushed to us; False if
-                we pulled it as the result of a missing prev_event.
-        """
-
-        room_id = pdu.room_id
-        event_id = pdu.event_id
-
-        # We reprocess pdus when we have seen them only as outliers
-        existing = await self.store.get_event(
-            event_id, allow_none=True, allow_rejected=True
-        )
-
-        # FIXME: Currently we fetch an event again when we already have it
-        # if it has been marked as an outlier.
-        if existing:
-            if not existing.internal_metadata.is_outlier():
-                logger.info(
-                    "Ignoring received event %s which we have already seen", event_id
-                )
-                return
-            if pdu.internal_metadata.is_outlier():
-                logger.info(
-                    "Ignoring received outlier %s which we already have as an outlier",
-                    event_id,
-                )
-                return
-            logger.info("De-outliering event %s", event_id)
-
-        # do some initial sanity-checking of the event. In particular, make
-        # sure it doesn't have hundreds of prev_events or auth_events, which
-        # could cause a huge state resolution or cascade of event fetches.
-        try:
-            self._sanity_check_event(pdu)
-        except SynapseError as err:
-            logger.warning("Received event failed sanity checks")
-            raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
-
-        # If we are currently in the process of joining this room, then we
-        # queue up events for later processing.
-        if room_id in self.room_queues:
-            logger.info(
-                "Queuing PDU from %s for now: join in progress",
-                origin,
-            )
-            self.room_queues[room_id].append((pdu, origin))
-            return
-
-        # If we're not in the room just ditch the event entirely. This is
-        # probably an old server that has come back and thinks we're still in
-        # the room (or we've been rejoined to the room by a state reset).
-        #
-        # Note that if we were never in the room then we would have already
-        # dropped the event, since we wouldn't know the room version.
-        is_in_room = await self._event_auth_handler.check_host_in_room(
-            room_id, self.server_name
-        )
-        if not is_in_room:
-            logger.info(
-                "Ignoring PDU from %s as we're not in the room",
-                origin,
-            )
-            return None
-
-        state = None
-
-        # Check that the event passes auth based on the state at the event. This is
-        # done for events that are to be added to the timeline (non-outliers).
-        #
-        # Get missing pdus if necessary:
-        #  - Fetching any missing prev events to fill in gaps in the graph
-        #  - Fetching state if we have a hole in the graph
-        if not pdu.internal_metadata.is_outlier():
-            # We only backfill backwards to the min depth.
-            min_depth = await self.get_min_depth_for_context(pdu.room_id)
-
-            logger.debug("min_depth: %d", min_depth)
-
-            prevs = set(pdu.prev_event_ids())
-            seen = await self.store.have_events_in_timeline(prevs)
-
-            if min_depth is not None and pdu.depth < min_depth:
-                # This is so that we don't notify the user about this
-                # message, to work around the fact that some events will
-                # reference really really old events we really don't want to
-                # send to the clients.
-                pdu.internal_metadata.outlier = True
-            elif min_depth is not None and pdu.depth > min_depth:
-                missing_prevs = prevs - seen
-                if sent_to_us_directly and missing_prevs:
-                    # If we're missing stuff, ensure we only fetch stuff one
-                    # at a time.
-                    logger.info(
-                        "Acquiring room lock to fetch %d missing prev_events: %s",
-                        len(missing_prevs),
-                        shortstr(missing_prevs),
-                    )
-                    with (await self._room_pdu_linearizer.queue(pdu.room_id)):
-                        logger.info(
-                            "Acquired room lock to fetch %d missing prev_events",
-                            len(missing_prevs),
-                        )
-
-                        try:
-                            await self._get_missing_events_for_pdu(
-                                origin, pdu, prevs, min_depth
-                            )
-                        except Exception as e:
-                            raise Exception(
-                                "Error fetching missing prev_events for %s: %s"
-                                % (event_id, e)
-                            ) from e
-
-                        # Update the set of things we've seen after trying to
-                        # fetch the missing stuff
-                        seen = await self.store.have_events_in_timeline(prevs)
-
-                        if not prevs - seen:
-                            logger.info(
-                                "Found all missing prev_events",
-                            )
-
-            missing_prevs = prevs - seen
-            if missing_prevs:
-                # We've still not been able to get all of the prev_events for this event.
-                #
-                # In this case, we need to fall back to asking another server in the
-                # federation for the state at this event. That's ok provided we then
-                # resolve the state against other bits of the DAG before using it (which
-                # will ensure that you can't just take over a room by sending an event,
-                # withholding its prev_events, and declaring yourself to be an admin in
-                # the subsequent state request).
-                #
-                # Now, if we're pulling this event as a missing prev_event, then clearly
-                # this event is not going to become the only forward-extremity and we are
-                # guaranteed to resolve its state against our existing forward
-                # extremities, so that should be fine.
-                #
-                # On the other hand, if this event was pushed to us, it is possible for
-                # it to become the only forward-extremity in the room, and we would then
-                # trust its state to be the state for the whole room. This is very bad.
-                # Further, if the event was pushed to us, there is no excuse for us not to
-                # have all the prev_events. We therefore reject any such events.
-                #
-                # XXX this really feels like it could/should be merged with the above,
-                # but there is an interaction with min_depth that I'm not really
-                # following.
-
-                if sent_to_us_directly:
-                    logger.warning(
-                        "Rejecting: failed to fetch %d prev events: %s",
-                        len(missing_prevs),
-                        shortstr(missing_prevs),
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        (
-                            "Your server isn't divulging details about prev_events "
-                            "referenced in this event."
-                        ),
-                        affected=pdu.event_id,
-                    )
-
-                logger.info(
-                    "Event %s is missing prev_events %s: calculating state for a "
-                    "backwards extremity",
-                    event_id,
-                    shortstr(missing_prevs),
-                )
-
-                # Calculate the state after each of the previous events, and
-                # resolve them to find the correct state at the current event.
-                event_map = {event_id: pdu}
-                try:
-                    # Get the state of the events we know about
-                    ours = await self.state_store.get_state_groups_ids(room_id, seen)
-
-                    # state_maps is a list of mappings from (type, state_key) to event_id
-                    state_maps: List[StateMap[str]] = list(ours.values())
-
-                    # we don't need this any more, let's delete it.
-                    del ours
-
-                    # Ask the remote server for the states we don't
-                    # know about
-                    for p in missing_prevs:
-                        logger.info("Requesting state after missing prev_event %s", p)
-
-                        with nested_logging_context(p):
-                            # note that if any of the missing prevs share missing state or
-                            # auth events, the requests to fetch those events are deduped
-                            # by the get_pdu_cache in federation_client.
-                            remote_state = (
-                                await self._get_state_after_missing_prev_event(
-                                    origin, room_id, p
-                                )
-                            )
-
-                            remote_state_map = {
-                                (x.type, x.state_key): x.event_id for x in remote_state
-                            }
-                            state_maps.append(remote_state_map)
-
-                            for x in remote_state:
-                                event_map[x.event_id] = x
-
-                    room_version = await self.store.get_room_version_id(room_id)
-                    state_map = (
-                        await self._state_resolution_handler.resolve_events_with_store(
-                            room_id,
-                            room_version,
-                            state_maps,
-                            event_map,
-                            state_res_store=StateResolutionStore(self.store),
-                        )
-                    )
-
-                    # We need to give _process_received_pdu the actual state events
-                    # rather than event ids, so generate that now.
-
-                    # First though we need to fetch all the events that are in
-                    # state_map, so we can build up the state below.
-                    evs = await self.store.get_events(
-                        list(state_map.values()),
-                        get_prev_content=False,
-                        redact_behaviour=EventRedactBehaviour.AS_IS,
-                    )
-                    event_map.update(evs)
-
-                    state = [event_map[e] for e in state_map.values()]
-                except Exception:
-                    logger.warning(
-                        "Error attempting to resolve state at missing " "prev_events",
-                        exc_info=True,
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        "We can't get valid state history.",
-                        affected=event_id,
-                    )
-
-        # A second round of checks for all events. Check that the event passes auth
-        # based on `auth_events`, this allows us to assert that the event would
-        # have been allowed at some point. If an event passes this check its OK
-        # for it to be used as part of a returned `/state` request, as either
-        # a) we received the event as part of the original join and so trust it, or
-        # b) we'll do a state resolution with existing state before it becomes
-        # part of the "current state", which adds more protection.
-        await self._process_received_pdu(origin, pdu, state=state)
-
-    async def _get_missing_events_for_pdu(
-        self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
-    ) -> None:
-        """
-        Args:
-            origin: Origin of the pdu. Will be called to get the missing events
-            pdu: received pdu
-            prevs: List of event ids which we are missing
-            min_depth: Minimum depth of events to return.
-        """
-
-        room_id = pdu.room_id
-        event_id = pdu.event_id
-
-        seen = await self.store.have_events_in_timeline(prevs)
-
-        if not prevs - seen:
-            return
-
-        latest_list = await self.store.get_latest_event_ids_in_room(room_id)
-
-        # We add the prev events that we have seen to the latest
-        # list to ensure the remote server doesn't give them to us
-        latest = set(latest_list)
-        latest |= seen
-
-        logger.info(
-            "Requesting missing events between %s and %s",
-            shortstr(latest),
-            event_id,
-        )
-
-        # XXX: we set timeout to 10s to help workaround
-        # https://github.com/matrix-org/synapse/issues/1733.
-        # The reason is to avoid holding the linearizer lock
-        # whilst processing inbound /send transactions, causing
-        # FDs to stack up and block other inbound transactions
-        # which empirically can currently take up to 30 minutes.
-        #
-        # N.B. this explicitly disables retry attempts.
-        #
-        # N.B. this also increases our chances of falling back to
-        # fetching fresh state for the room if the missing event
-        # can't be found, which slightly reduces our security.
-        # it may also increase our DAG extremity count for the room,
-        # causing additional state resolution?  See #1760.
-        # However, fetching state doesn't hold the linearizer lock
-        # apparently.
-        #
-        # see https://github.com/matrix-org/synapse/pull/1744
-        #
-        # ----
-        #
-        # Update richvdh 2018/09/18: There are a number of problems with timing this
-        # request out aggressively on the client side:
-        #
-        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
-        #   if you send too many requests at once, so you end up with the server carefully
-        #   working through the backlog of your requests, which you have already timed
-        #   out.
-        #
-        # - for this request in particular, we now (as of
-        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
-        #   server can't produce a plausible-looking set of prev_events - so we become
-        #   much more likely to reject the event.
-        #
-        # - contrary to what it says above, we do *not* fall back to fetching fresh state
-        #   for the room if get_missing_events times out. Rather, we give up processing
-        #   the PDU whose prevs we are missing, which then makes it much more likely that
-        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
-        #   problem.
-        #
-        # - the aggressive 10s timeout was introduced to deal with incoming federation
-        #   requests taking 8 hours to process. It's not entirely clear why that was going
-        #   on; certainly there were other issues causing traffic storms which are now
-        #   resolved, and I think in any case we may be more sensible about our locking
-        #   now. We're *certainly* more sensible about our logging.
-        #
-        # All that said: Let's try increasing the timeout to 60s and see what happens.
-
-        try:
-            missing_events = await self.federation_client.get_missing_events(
-                origin,
-                room_id,
-                earliest_events_ids=list(latest),
-                latest_events=[pdu],
-                limit=10,
-                min_depth=min_depth,
-                timeout=60000,
-            )
-        except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e:
-            # We failed to get the missing events, but since we need to handle
-            # the case of `get_missing_events` not returning the necessary
-            # events anyway, it is safe to simply log the error and continue.
-            logger.warning("Failed to get prev_events: %s", e)
-            return
-
-        logger.info("Got %d prev_events", len(missing_events))
-
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        missing_events.sort(key=lambda x: x.depth)
-
-        for ev in missing_events:
-            logger.info("Handling received prev_event %s", ev)
-            with nested_logging_context(ev.event_id):
-                try:
-                    await self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
-                except FederationError as e:
-                    if e.code == 403:
-                        logger.warning(
-                            "Received prev_event %s failed history check.",
-                            ev.event_id,
-                        )
-                    else:
-                        raise
-
-    async def _get_state_for_room(
-        self,
-        destination: str,
-        room_id: str,
-        event_id: str,
-    ) -> List[EventBase]:
-        """Requests all of the room state at a given event from a remote
-        homeserver.
-
-        Will also fetch any missing events reported in the `auth_chain_ids`
-        section of `/state_ids`.
-
-        Args:
-            destination: The remote homeserver to query for the state.
-            room_id: The id of the room we're interested in.
-            event_id: The id of the event we want the state at.
-
-        Returns:
-            A list of events in the state, not including the event itself.
-        """
-        (
-            state_event_ids,
-            auth_event_ids,
-        ) = await self.federation_client.get_room_state_ids(
-            destination, room_id, event_id=event_id
-        )
-
-        # Fetch the state events from the DB, and check we have the auth events.
-        event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
-        auth_events_in_store = await self.store.have_seen_events(
-            room_id, auth_event_ids
-        )
-
-        # Check for missing events. We handle state and auth events separately,
-        # as we want to pull the state from the DB, but we don't for the auth
-        # events. (Note: we likely won't use the majority of the auth chain, and
-        # it can be *huge* for large rooms, so it's worth ensuring that we don't
-        # unnecessarily pull it from the DB).
-        missing_state_events = set(state_event_ids) - set(event_map)
-        missing_auth_events = set(auth_event_ids) - set(auth_events_in_store)
-        if missing_state_events or missing_auth_events:
-            await self._get_events_and_persist(
-                destination=destination,
-                room_id=room_id,
-                events=missing_state_events | missing_auth_events,
-            )
-
-            if missing_state_events:
-                new_events = await self.store.get_events(
-                    missing_state_events, allow_rejected=True
-                )
-                event_map.update(new_events)
-
-                missing_state_events.difference_update(new_events)
-
-                if missing_state_events:
-                    logger.warning(
-                        "Failed to fetch missing state events for %s %s",
-                        event_id,
-                        missing_state_events,
-                    )
-
-            if missing_auth_events:
-                auth_events_in_store = await self.store.have_seen_events(
-                    room_id, missing_auth_events
-                )
-                missing_auth_events.difference_update(auth_events_in_store)
-
-                if missing_auth_events:
-                    logger.warning(
-                        "Failed to fetch missing auth events for %s %s",
-                        event_id,
-                        missing_auth_events,
-                    )
-
-        remote_state = list(event_map.values())
-
-        # check for events which were in the wrong room.
-        #
-        # this can happen if a remote server claims that the state or
-        # auth_events at an event in room A are actually events in room B
-
-        bad_events = [
-            (event.event_id, event.room_id)
-            for event in remote_state
-            if event.room_id != room_id
-        ]
-
-        for bad_event_id, bad_room_id in bad_events:
-            # This is a bogus situation, but since we may only discover it a long time
-            # after it happened, we try our best to carry on, by just omitting the
-            # bad events from the returned auth/state set.
-            logger.warning(
-                "Remote server %s claims event %s in room %s is an auth/state "
-                "event in room %s",
-                destination,
-                bad_event_id,
-                bad_room_id,
-                room_id,
-            )
-
-        if bad_events:
-            remote_state = [e for e in remote_state if e.room_id == room_id]
-
-        return remote_state
-
-    async def _get_state_after_missing_prev_event(
-        self,
-        destination: str,
-        room_id: str,
-        event_id: str,
-    ) -> List[EventBase]:
-        """Requests all of the room state at a given event from a remote homeserver.
-
-        Args:
-            destination: The remote homeserver to query for the state.
-            room_id: The id of the room we're interested in.
-            event_id: The id of the event we want the state at.
-
-        Returns:
-            A list of events in the state, including the event itself
-        """
-        # TODO: This function is basically the same as _get_state_for_room. Can
-        #   we make backfill() use it, rather than having two code paths? I think the
-        #   only difference is that backfill() persists the prev events separately.
-
-        (
-            state_event_ids,
-            auth_event_ids,
-        ) = await self.federation_client.get_room_state_ids(
-            destination, room_id, event_id=event_id
-        )
-
-        logger.debug(
-            "state_ids returned %i state events, %i auth events",
-            len(state_event_ids),
-            len(auth_event_ids),
-        )
-
-        # start by just trying to fetch the events from the store
-        desired_events = set(state_event_ids)
-        desired_events.add(event_id)
-        logger.debug("Fetching %i events from cache/store", len(desired_events))
-        fetched_events = await self.store.get_events(
-            desired_events, allow_rejected=True
-        )
-
-        missing_desired_events = desired_events - fetched_events.keys()
-        logger.debug(
-            "We are missing %i events (got %i)",
-            len(missing_desired_events),
-            len(fetched_events),
-        )
-
-        # We probably won't need most of the auth events, so let's just check which
-        # we have for now, rather than thrashing the event cache with them all
-        # unnecessarily.
-
-        # TODO: we probably won't actually need all of the auth events, since we
-        #   already have a bunch of the state events. It would be nice if the
-        #   federation api gave us a way of finding out which we actually need.
-
-        missing_auth_events = set(auth_event_ids) - fetched_events.keys()
-        missing_auth_events.difference_update(
-            await self.store.have_seen_events(room_id, missing_auth_events)
-        )
-        logger.debug("We are also missing %i auth events", len(missing_auth_events))
-
-        missing_events = missing_desired_events | missing_auth_events
-        logger.debug("Fetching %i events from remote", len(missing_events))
-        await self._get_events_and_persist(
-            destination=destination, room_id=room_id, events=missing_events
-        )
-
-        # we need to make sure we re-load from the database to get the rejected
-        # state correct.
-        fetched_events.update(
-            await self.store.get_events(missing_desired_events, allow_rejected=True)
-        )
-
-        # check for events which were in the wrong room.
-        #
-        # this can happen if a remote server claims that the state or
-        # auth_events at an event in room A are actually events in room B
-
-        bad_events = [
-            (event_id, event.room_id)
-            for event_id, event in fetched_events.items()
-            if event.room_id != room_id
-        ]
-
-        for bad_event_id, bad_room_id in bad_events:
-            # This is a bogus situation, but since we may only discover it a long time
-            # after it happened, we try our best to carry on, by just omitting the
-            # bad events from the returned state set.
-            logger.warning(
-                "Remote server %s claims event %s in room %s is an auth/state "
-                "event in room %s",
-                destination,
-                bad_event_id,
-                bad_room_id,
-                room_id,
-            )
-
-            del fetched_events[bad_event_id]
-
-        # if we couldn't get the prev event in question, that's a problem.
-        remote_event = fetched_events.get(event_id)
-        if not remote_event:
-            raise Exception("Unable to get missing prev_event %s" % (event_id,))
-
-        # missing state at that event is a warning, not a blocker
-        # XXX: this doesn't sound right? it means that we'll end up with incomplete
-        #   state.
-        failed_to_fetch = desired_events - fetched_events.keys()
-        if failed_to_fetch:
-            logger.warning(
-                "Failed to fetch missing state events for %s %s",
-                event_id,
-                failed_to_fetch,
-            )
-
-        remote_state = [
-            fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
-        ]
-
-        if remote_event.is_state() and remote_event.rejected_reason is None:
-            remote_state.append(remote_event)
-
-        return remote_state
-
-    async def _process_received_pdu(
-        self,
-        origin: str,
-        event: EventBase,
-        state: Optional[Iterable[EventBase]],
-    ) -> None:
-        """Called when we have a new pdu. We need to do auth checks and put it
-        through the StateHandler.
-
-        Args:
-            origin: server sending the event
-
-            event: event to be persisted
-
-            state: Normally None, but if we are handling a gap in the graph
-                (ie, we are missing one or more prev_events), the resolved state at the
-                event
-        """
-        logger.debug("Processing event: %s", event)
-
-        try:
-            context = await self.state_handler.compute_event_context(
-                event, old_state=state
-            )
-            await self._auth_and_persist_event(origin, event, context, state=state)
-        except AuthError as e:
-            raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
-
-        # For encrypted messages we check that we know about the sending device,
-        # if we don't then we mark the device cache for that user as stale.
-        if event.type == EventTypes.Encrypted:
-            device_id = event.content.get("device_id")
-            sender_key = event.content.get("sender_key")
-
-            cached_devices = await self.store.get_cached_devices_for_user(event.sender)
-
-            resync = False  # Whether we should resync device lists.
-
-            device = None
-            if device_id is not None:
-                device = cached_devices.get(device_id)
-                if device is None:
-                    logger.info(
-                        "Received event from remote device not in our cache: %s %s",
-                        event.sender,
-                        device_id,
-                    )
-                    resync = True
-
-            # We also check if the `sender_key` matches what we expect.
-            if sender_key is not None:
-                # Figure out what sender key we're expecting. If we know the
-                # device and recognize the algorithm then we can work out the
-                # exact key to expect. Otherwise check it matches any key we
-                # have for that device.
-
-                current_keys: Container[str] = []
-
-                if device:
-                    keys = device.get("keys", {}).get("keys", {})
-
-                    if (
-                        event.content.get("algorithm")
-                        == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
-                    ):
-                        # For this algorithm we expect a curve25519 key.
-                        key_name = "curve25519:%s" % (device_id,)
-                        current_keys = [keys.get(key_name)]
-                    else:
-                        # We don't understand the algorithm, so we just
-                        # check it matches a key for the device.
-                        current_keys = keys.values()
-                elif device_id:
-                    # We don't have any keys for the device ID.
-                    pass
-                else:
-                    # The event didn't include a device ID, so we just look for
-                    # keys across all devices.
-                    current_keys = [
-                        key
-                        for device in cached_devices.values()
-                        for key in device.get("keys", {}).get("keys", {}).values()
-                    ]
-
-                # We now check that the sender key matches (one of) the expected
-                # keys.
-                if sender_key not in current_keys:
-                    logger.info(
-                        "Received event from remote device with unexpected sender key: %s %s: %s",
-                        event.sender,
-                        device_id or "<no device_id>",
-                        sender_key,
-                    )
-                    resync = True
-
-            if resync:
-                run_as_background_process(
-                    "resync_device_due_to_pdu", self._resync_device, event.sender
-                )
-
-        await self._handle_marker_event(origin, event)
-
-    async def _handle_marker_event(self, origin: str, marker_event: EventBase):
-        """Handles backfilling the insertion event when we receive a marker
-        event that points to one.
-
-        Args:
-            origin: Origin of the event. Will be called to get the insertion event
-            marker_event: The event to process
-        """
-
-        if marker_event.type != EventTypes.MSC2716_MARKER:
-            # Not a marker event
-            return
-
-        if marker_event.rejected_reason is not None:
-            # Rejected event
-            return
-
-        # Skip processing a marker event if the room version doesn't
-        # support it.
-        room_version = await self.store.get_room_version(marker_event.room_id)
-        if not room_version.msc2716_historical:
-            return
-
-        logger.debug("_handle_marker_event: received %s", marker_event)
-
-        insertion_event_id = marker_event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION
-        )
-
-        if insertion_event_id is None:
-            # Nothing to retrieve then (invalid marker)
-            return
-
-        logger.debug(
-            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
-        )
-
-        await self._get_events_and_persist(
-            origin,
-            marker_event.room_id,
-            [insertion_event_id],
-        )
-
-        insertion_event = await self.store.get_event(
-            insertion_event_id, allow_none=True
-        )
-        if insertion_event is None:
-            logger.warning(
-                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
-                origin,
-                insertion_event_id,
-                marker_event.event_id,
-            )
-            return
-
-        logger.debug(
-            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
-        await self.store.insert_insertion_extremity(
-            insertion_event_id, marker_event.room_id
-        )
-
-        logger.debug(
-            "_handle_marker_event: insertion extremity added for %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
-    async def _resync_device(self, sender: str) -> None:
-        """We have detected that the device list for the given user may be out
-        of sync, so we try and resync them.
-        """
-
-        try:
-            await self.store.mark_remote_user_device_cache_as_stale(sender)
-
-            # Immediately attempt a resync in the background
-            if self.config.worker_app:
-                await self._user_device_resync(user_id=sender)
-            else:
-                await self._device_list_updater.user_device_resync(sender)
-        except Exception:
-            logger.exception("Failed to resync device for %s", sender)
-
-    @log_function
-    async def backfill(
-        self, dest: str, room_id: str, limit: int, extremities: List[str]
-    ) -> List[EventBase]:
-        """Trigger a backfill request to `dest` for the given `room_id`
-
-        This will attempt to get more events from the remote. If the other side
-        has no new events to offer, this will return an empty list.
-
-        As the events are received, we check their signatures, and also do some
-        sanity-checking on them. If any of the backfilled events are invalid,
-        this method throws a SynapseError.
-
-        TODO: make this more useful to distinguish failures of the remote
-        server from invalid events (there is probably no point in trying to
-        re-fetch invalid events from every other HS in the room.)
-        """
-        if dest == self.server_name:
-            raise SynapseError(400, "Can't backfill from self.")
-
-        events = await self.federation_client.backfill(
-            dest, room_id, limit=limit, extremities=extremities
-        )
-
-        if not events:
-            return []
-
-        # ideally we'd sanity check the events here for excess prev_events etc,
-        # but it's hard to reject events at this point without completely
-        # breaking backfill in the same way that it is currently broken by
-        # events whose signature we cannot verify (#3121).
-        #
-        # So for now we accept the events anyway. #3124 tracks this.
-        #
-        # for ev in events:
-        #     self._sanity_check_event(ev)
-
-        # Don't bother processing events we already have.
-        seen_events = await self.store.have_events_in_timeline(
-            {e.event_id for e in events}
-        )
-
-        events = [e for e in events if e.event_id not in seen_events]
-
-        if not events:
-            return []
-
-        event_map = {e.event_id: e for e in events}
-
-        event_ids = {e.event_id for e in events}
-
-        # build a list of events whose prev_events weren't in the batch.
-        # (XXX: this will include events whose prev_events we already have; that doesn't
-        # sound right?)
-        edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids]
-
-        logger.info("backfill: Got %d events with %d edges", len(events), len(edges))
-
-        # For each edge get the current state.
-
-        state_events = {}
-        events_to_state = {}
-        for e_id in edges:
-            state = await self._get_state_for_room(
-                destination=dest,
-                room_id=room_id,
-                event_id=e_id,
-            )
-            state_events.update({s.event_id: s for s in state})
-            events_to_state[e_id] = state
-
-        required_auth = {
-            a_id
-            for event in events + list(state_events.values())
-            for a_id in event.auth_event_ids()
-        }
-        auth_events = await self.store.get_events(required_auth, allow_rejected=True)
-        auth_events.update(
-            {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
-        )
-
-        ev_infos = []
-
-        # Step 1: persist the events in the chunk we fetched state for (i.e.
-        # the backwards extremities), with custom auth events and state
-        for e_id in events_to_state:
-            # For paranoia we ensure that these events are marked as
-            # non-outliers
-            ev = event_map[e_id]
-            assert not ev.internal_metadata.is_outlier()
-
-            ev_infos.append(
-                _NewEventInfo(
-                    event=ev,
-                    state=events_to_state[e_id],
-                    claimed_auth_event_map={
-                        (
-                            auth_events[a_id].type,
-                            auth_events[a_id].state_key,
-                        ): auth_events[a_id]
-                        for a_id in ev.auth_event_ids()
-                        if a_id in auth_events
-                    },
-                )
-            )
-
-        if ev_infos:
-            await self._auth_and_persist_events(
-                dest, room_id, ev_infos, backfilled=True
-            )
-
-        # Step 2: Persist the rest of the events in the chunk one by one
-        events.sort(key=lambda e: e.depth)
-
-        for event in events:
-            if event in events_to_state:
-                continue
-
-            # For paranoia we ensure that these events are marked as
-            # non-outliers
-            assert not event.internal_metadata.is_outlier()
-
-            context = await self.state_handler.compute_event_context(event)
-
-            # We store these one at a time since each event depends on the
-            # previous to work out the state.
-            # TODO: We can probably do something more clever here.
-            await self._auth_and_persist_event(dest, event, context, backfilled=True)
-
-        return events
-
     async def maybe_backfill(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
@@ -1326,14 +308,14 @@ class FederationHandler(BaseHandler):
             # TODO: Should we try multiple of these at a time?
             for dom in domains:
                 try:
-                    await self.backfill(
+                    await self._federation_event_handler.backfill(
                         dom, room_id, limit=100, extremities=extremities
                     )
                     # If this succeeded then we probably already have the
                     # appropriate stuff.
                     # TODO: We can probably do something more intelligent here.
                     return True
-                except SynapseError as e:
+                except (SynapseError, InvalidResponseError) as e:
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
                 except HttpResponseException as e:
@@ -1417,115 +399,6 @@ class FederationHandler(BaseHandler):
 
         return False
 
-    async def _get_events_and_persist(
-        self, destination: str, room_id: str, events: Iterable[str]
-    ) -> None:
-        """Fetch the given events from a server, and persist them as outliers.
-
-        This function *does not* recursively get missing auth events of the
-        newly fetched events. Callers must include in the `events` argument
-        any missing events from the auth chain.
-
-        Logs a warning if we can't find the given event.
-        """
-
-        room_version = await self.store.get_room_version(room_id)
-
-        event_map: Dict[str, EventBase] = {}
-
-        async def get_event(event_id: str):
-            with nested_logging_context(event_id):
-                try:
-                    event = await self.federation_client.get_pdu(
-                        [destination],
-                        event_id,
-                        room_version,
-                        outlier=True,
-                    )
-                    if event is None:
-                        logger.warning(
-                            "Server %s didn't return event %s",
-                            destination,
-                            event_id,
-                        )
-                        return
-
-                    event_map[event.event_id] = event
-
-                except Exception as e:
-                    logger.warning(
-                        "Error fetching missing state/auth event %s: %s %s",
-                        event_id,
-                        type(e),
-                        e,
-                    )
-
-        await concurrently_execute(get_event, events, 5)
-
-        # Make a map of auth events for each event. We do this after fetching
-        # all the events as some of the events' auth events will be in the list
-        # of requested events.
-
-        auth_events = [
-            aid
-            for event in event_map.values()
-            for aid in event.auth_event_ids()
-            if aid not in event_map
-        ]
-        persisted_events = await self.store.get_events(
-            auth_events,
-            allow_rejected=True,
-        )
-
-        event_infos = []
-        for event in event_map.values():
-            auth = {}
-            for auth_event_id in event.auth_event_ids():
-                ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
-                if ae:
-                    auth[(ae.type, ae.state_key)] = ae
-                else:
-                    logger.info("Missing auth event %s", auth_event_id)
-
-            event_infos.append(_NewEventInfo(event, None, auth))
-
-        if event_infos:
-            await self._auth_and_persist_events(
-                destination,
-                room_id,
-                event_infos,
-            )
-
-    def _sanity_check_event(self, ev: EventBase) -> None:
-        """
-        Do some early sanity checks of a received event
-
-        In particular, checks it doesn't have an excessive number of
-        prev_events or auth_events, which could cause a huge state resolution
-        or cascade of event fetches.
-
-        Args:
-            ev: event to be checked
-
-        Raises:
-            SynapseError if the event does not pass muster
-        """
-        if len(ev.prev_event_ids()) > 20:
-            logger.warning(
-                "Rejecting event %s which has %i prev_events",
-                ev.event_id,
-                len(ev.prev_event_ids()),
-            )
-            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
-
-        if len(ev.auth_event_ids()) > 10:
-            logger.warning(
-                "Rejecting event %s which has %i auth_events",
-                ev.event_id,
-                len(ev.auth_event_ids()),
-            )
-            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
-
     async def send_invite(self, target_host: str, event: EventBase) -> EventBase:
         """Sends the invite to the remote server for signing.
 
@@ -1591,9 +464,9 @@ class FederationHandler(BaseHandler):
         # This shouldn't happen, because the RoomMemberHandler has a
         # linearizer lock which only allows one operation per user per room
         # at a time - so this is just paranoia.
-        assert room_id not in self.room_queues
+        assert room_id not in self._federation_event_handler.room_queues
 
-        self.room_queues[room_id] = []
+        self._federation_event_handler.room_queues[room_id] = []
 
         await self._clean_room_for_join(room_id)
 
@@ -1667,8 +540,8 @@ class FederationHandler(BaseHandler):
             logger.debug("Finished joining %s to %s", joinee, room_id)
             return event.event_id, max_stream_id
         finally:
-            room_queue = self.room_queues[room_id]
-            del self.room_queues[room_id]
+            room_queue = self._federation_event_handler.room_queues[room_id]
+            del self._federation_event_handler.room_queues[room_id]
 
             # we don't need to wait for the queued events to be processed -
             # it's just a best-effort thing at this point. We do want to do
@@ -1744,7 +617,7 @@ class FederationHandler(BaseHandler):
         event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify(
+        stream_id = await self._federation_event_handler.persist_events_and_notify(
             event.room_id, [(event, context)]
         )
         return event.event_id, stream_id
@@ -1764,7 +637,7 @@ class FederationHandler(BaseHandler):
                     p,
                 )
                 with nested_logging_context(p.event_id):
-                    await self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+                    await self._federation_event_handler.on_receive_pdu(origin, p)
             except Exception as e:
                 logger.warning(
                     "Error handling queued PDU %s from %s: %s", p.event_id, origin, e
@@ -1857,7 +730,7 @@ class FederationHandler(BaseHandler):
             raise
 
         # Ensure the user can even join the room.
-        await self._check_join_restrictions(context, event)
+        await self._federation_event_handler.check_join_restrictions(context, event)
 
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
@@ -1934,7 +807,9 @@ class FederationHandler(BaseHandler):
         )
 
         context = await self.state_handler.compute_event_context(event)
-        await self.persist_events_and_notify(event.room_id, [(event, context)])
+        await self._federation_event_handler.persist_events_and_notify(
+            event.room_id, [(event, context)]
+        )
 
         return event
 
@@ -1961,7 +836,7 @@ class FederationHandler(BaseHandler):
         await self.federation_client.send_leave(host_list, event)
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify(
+        stream_id = await self._federation_event_handler.persist_events_and_notify(
             event.room_id, [(event, context)]
         )
 
@@ -2104,116 +979,6 @@ class FederationHandler(BaseHandler):
 
         return event
 
-    @log_function
-    async def on_send_membership_event(
-        self, origin: str, event: EventBase
-    ) -> Tuple[EventBase, EventContext]:
-        """
-        We have received a join/leave/knock event for a room via send_join/leave/knock.
-
-        Verify that event and send it into the room on the remote homeserver's behalf.
-
-        This is quite similar to on_receive_pdu, with the following principal
-        differences:
-          * only membership events are permitted (and only events with
-            sender==state_key -- ie, no kicks or bans)
-          * *We* send out the event on behalf of the remote server.
-          * We enforce the membership restrictions of restricted rooms.
-          * Rejected events result in an exception rather than being stored.
-
-        There are also other differences, however it is not clear if these are by
-        design or omission. In particular, we do not attempt to backfill any missing
-        prev_events.
-
-        Args:
-            origin: The homeserver of the remote (joining/invited/knocking) user.
-            event: The member event that has been signed by the remote homeserver.
-
-        Returns:
-            The event and context of the event after inserting it into the room graph.
-
-        Raises:
-            SynapseError if the event is not accepted into the room
-        """
-        logger.debug(
-            "on_send_membership_event: Got event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        if get_domain_from_id(event.sender) != origin:
-            logger.info(
-                "Got send_membership request for user %r from different origin %s",
-                event.sender,
-                origin,
-            )
-            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
-
-        if event.sender != event.state_key:
-            raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON)
-
-        assert not event.internal_metadata.outlier
-
-        # Send this event on behalf of the other server.
-        #
-        # The remote server isn't a full participant in the room at this point, so
-        # may not have an up-to-date list of the other homeservers participating in
-        # the room, so we send it on their behalf.
-        event.internal_metadata.send_on_behalf_of = origin
-
-        context = await self.state_handler.compute_event_context(event)
-        context = await self._check_event_auth(origin, event, context)
-        if context.rejected:
-            raise SynapseError(
-                403, f"{event.membership} event was rejected", Codes.FORBIDDEN
-            )
-
-        # for joins, we need to check the restrictions of restricted rooms
-        if event.membership == Membership.JOIN:
-            await self._check_join_restrictions(context, event)
-
-        # for knock events, we run the third-party event rules. It's not entirely clear
-        # why we don't do this for other sorts of membership events.
-        if event.membership == Membership.KNOCK:
-            event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
-                event, context
-            )
-            if not event_allowed:
-                logger.info("Sending of knock %s forbidden by third-party rules", event)
-                raise SynapseError(
-                    403, "This event is not allowed in this context", Codes.FORBIDDEN
-                )
-
-        # all looks good, we can persist the event.
-        await self._run_push_actions_and_persist_event(event, context)
-        return event, context
-
-    async def _check_join_restrictions(
-        self, context: EventContext, event: EventBase
-    ) -> None:
-        """Check that restrictions in restricted join rules are matched
-
-        Called when we receive a join event via send_join.
-
-        Raises an auth error if the restrictions are not matched.
-        """
-        prev_state_ids = await context.get_prev_state_ids()
-
-        # Check if the user is already in the room or invited to the room.
-        user_id = event.state_key
-        prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
-        prev_member_event = None
-        if prev_member_event_id:
-            prev_member_event = await self.store.get_event(prev_member_event_id)
-
-        # Check if the member should be allowed access via membership in a space.
-        await self._event_auth_handler.check_restricted_join_rules(
-            prev_state_ids,
-            event.room_version,
-            user_id,
-            prev_member_event,
-        )
-
     async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
         """Returns the state at the event. i.e. not including said event."""
 
@@ -2314,131 +1079,6 @@ class FederationHandler(BaseHandler):
         else:
             return None
 
-    async def get_min_depth_for_context(self, context: str) -> int:
-        return await self.store.get_min_depth(context)
-
-    async def _auth_and_persist_event(
-        self,
-        origin: str,
-        event: EventBase,
-        context: EventContext,
-        state: Optional[Iterable[EventBase]] = None,
-        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
-        backfilled: bool = False,
-    ) -> None:
-        """
-        Process an event by performing auth checks and then persisting to the database.
-
-        Args:
-            origin: The host the event originates from.
-            event: The event itself.
-            context:
-                The event context.
-
-            state:
-                The state events used to check the event for soft-fail. If this is
-                not provided the current state events will be used.
-
-            claimed_auth_event_map:
-                A map of (type, state_key) => event for the event's claimed auth_events.
-                Possibly incomplete, and possibly including events that are not yet
-                persisted, or authed, or in the right room.
-
-                Only populated where we may not already have persisted these events -
-                for example, when populating outliers.
-
-            backfilled: True if the event was backfilled.
-        """
-        context = await self._check_event_auth(
-            origin,
-            event,
-            context,
-            state=state,
-            claimed_auth_event_map=claimed_auth_event_map,
-            backfilled=backfilled,
-        )
-
-        await self._run_push_actions_and_persist_event(event, context, backfilled)
-
-    async def _run_push_actions_and_persist_event(
-        self, event: EventBase, context: EventContext, backfilled: bool = False
-    ):
-        """Run the push actions for a received event, and persist it.
-
-        Args:
-            event: The event itself.
-            context: The event context.
-            backfilled: True if the event was backfilled.
-        """
-        try:
-            if (
-                not event.internal_metadata.is_outlier()
-                and not backfilled
-                and not context.rejected
-            ):
-                await self.action_generator.handle_push_actions_for_event(
-                    event, context
-                )
-
-            await self.persist_events_and_notify(
-                event.room_id, [(event, context)], backfilled=backfilled
-            )
-        except Exception:
-            run_in_background(
-                self.store.remove_push_actions_from_staging, event.event_id
-            )
-            raise
-
-    async def _auth_and_persist_events(
-        self,
-        origin: str,
-        room_id: str,
-        event_infos: Collection[_NewEventInfo],
-        backfilled: bool = False,
-    ) -> None:
-        """Creates the appropriate contexts and persists events. The events
-        should not depend on one another, e.g. this should be used to persist
-        a bunch of outliers, but not a chunk of individual events that depend
-        on each other for state calculations.
-
-        Notifies about the events where appropriate.
-        """
-
-        if not event_infos:
-            return
-
-        async def prep(ev_info: _NewEventInfo):
-            event = ev_info.event
-            with nested_logging_context(suffix=event.event_id):
-                res = await self.state_handler.compute_event_context(
-                    event, old_state=ev_info.state
-                )
-                res = await self._check_event_auth(
-                    origin,
-                    event,
-                    res,
-                    state=ev_info.state,
-                    claimed_auth_event_map=ev_info.claimed_auth_event_map,
-                    backfilled=backfilled,
-                )
-            return res
-
-        contexts = await make_deferred_yieldable(
-            defer.gatherResults(
-                [run_in_background(prep, ev_info) for ev_info in event_infos],
-                consumeErrors=True,
-            )
-        )
-
-        await self.persist_events_and_notify(
-            room_id,
-            [
-                (ev_info.event, context)
-                for ev_info, context in zip(event_infos, contexts)
-            ],
-            backfilled=backfilled,
-        )
-
     async def _persist_auth_tree(
         self,
         origin: str,
@@ -2536,7 +1176,7 @@ class FederationHandler(BaseHandler):
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
         if auth_events or state:
-            await self.persist_events_and_notify(
+            await self._federation_event_handler.persist_events_and_notify(
                 room_id,
                 [
                     (e, events_to_context[e.event_id])
@@ -2548,108 +1188,10 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        return await self.persist_events_and_notify(
+        return await self._federation_event_handler.persist_events_and_notify(
             room_id, [(event, new_event_context)]
         )
 
-    async def _check_for_soft_fail(
-        self,
-        event: EventBase,
-        state: Optional[Iterable[EventBase]],
-        backfilled: bool,
-        origin: str,
-    ) -> None:
-        """Checks if we should soft fail the event; if so, marks the event as
-        such.
-
-        Args:
-            event
-            state: The state at the event if we don't have all the event's prev events
-            backfilled: Whether the event is from backfill
-            origin: The host the event originates from.
-        """
-        # For new (non-backfilled and non-outlier) events we check if the event
-        # passes auth based on the current state. If it doesn't then we
-        # "soft-fail" the event.
-        if backfilled or event.internal_metadata.is_outlier():
-            return
-
-        extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id)
-        extrem_ids = set(extrem_ids_list)
-        prev_event_ids = set(event.prev_event_ids())
-
-        if extrem_ids == prev_event_ids:
-            # If they're the same then the current state is the same as the
-            # state at the event, so no point rechecking auth for soft fail.
-            return
-
-        room_version = await self.store.get_room_version_id(event.room_id)
-        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-        # Calculate the "current state".
-        if state is not None:
-            # If we're explicitly given the state then we won't have all the
-            # prev events, and so we have a gap in the graph. In this case
-            # we want to be a little careful as we might have been down for
-            # a while and have an incorrect view of the current state,
-            # however we still want to do checks as gaps are easy to
-            # maliciously manufacture.
-            #
-            # So we use a "current state" that is actually a state
-            # resolution across the current forward extremities and the
-            # given state at the event. This should correctly handle cases
-            # like bans, especially with state res v2.
-
-            state_sets_d = await self.state_store.get_state_groups(
-                event.room_id, extrem_ids
-            )
-            state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
-            state_sets.append(state)
-            current_states = await self.state_handler.resolve_events(
-                room_version, state_sets, event
-            )
-            current_state_ids: StateMap[str] = {
-                k: e.event_id for k, e in current_states.items()
-            }
-        else:
-            current_state_ids = await self.state_handler.get_current_state_ids(
-                event.room_id, latest_event_ids=extrem_ids
-            )
-
-        logger.debug(
-            "Doing soft-fail check for %s: state %s",
-            event.event_id,
-            current_state_ids,
-        )
-
-        # Now check if event pass auth against said current state
-        auth_types = auth_types_for_event(room_version_obj, event)
-        current_state_ids_list = [
-            e for k, e in current_state_ids.items() if k in auth_types
-        ]
-
-        auth_events_map = await self.store.get_events(current_state_ids_list)
-        current_auth_events = {
-            (e.type, e.state_key): e for e in auth_events_map.values()
-        }
-
-        try:
-            event_auth.check(room_version_obj, event, auth_events=current_auth_events)
-        except AuthError as e:
-            logger.warning(
-                "Soft-failing %r (from %s) because %s",
-                event,
-                origin,
-                e,
-                extra={
-                    "room_id": event.room_id,
-                    "mxid": event.sender,
-                    "hs": origin,
-                },
-            )
-            soft_failed_event_counter.inc()
-            event.internal_metadata.soft_failed = True
-
     async def on_get_missing_events(
         self,
         origin: str,
@@ -2678,334 +1220,6 @@ class FederationHandler(BaseHandler):
 
         return missing_events
 
-    async def _check_event_auth(
-        self,
-        origin: str,
-        event: EventBase,
-        context: EventContext,
-        state: Optional[Iterable[EventBase]] = None,
-        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
-        backfilled: bool = False,
-    ) -> EventContext:
-        """
-        Checks whether an event should be rejected (for failing auth checks).
-
-        Args:
-            origin: The host the event originates from.
-            event: The event itself.
-            context:
-                The event context.
-
-            state:
-                The state events used to check the event for soft-fail. If this is
-                not provided the current state events will be used.
-
-            claimed_auth_event_map:
-                A map of (type, state_key) => event for the event's claimed auth_events.
-                Possibly incomplete, and possibly including events that are not yet
-                persisted, or authed, or in the right room.
-
-                Only populated where we may not already have persisted these events -
-                for example, when populating outliers, or the state for a backwards
-                extremity.
-
-            backfilled: True if the event was backfilled.
-
-        Returns:
-            The updated context object.
-        """
-        room_version = await self.store.get_room_version_id(event.room_id)
-        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-        if claimed_auth_event_map:
-            # if we have a copy of the auth events from the event, use that as the
-            # basis for auth.
-            auth_events = claimed_auth_event_map
-        else:
-            # otherwise, we calculate what the auth events *should* be, and use that
-            prev_state_ids = await context.get_prev_state_ids()
-            auth_events_ids = self._event_auth_handler.compute_auth_events(
-                event, prev_state_ids, for_verification=True
-            )
-            auth_events_x = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
-
-        try:
-            (
-                context,
-                auth_events_for_auth,
-            ) = await self._update_auth_events_and_context_for_auth(
-                origin, event, context, auth_events
-            )
-        except Exception:
-            # We don't really mind if the above fails, so lets not fail
-            # processing if it does. However, it really shouldn't fail so
-            # let's still log as an exception since we'll still want to fix
-            # any bugs.
-            logger.exception(
-                "Failed to double check auth events for %s with remote. "
-                "Ignoring failure and continuing processing of event.",
-                event.event_id,
-            )
-            auth_events_for_auth = auth_events
-
-        try:
-            event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
-        except AuthError as e:
-            logger.warning("Failed auth resolution for %r because %s", event, e)
-            context.rejected = RejectedReason.AUTH_ERROR
-
-        if not context.rejected:
-            await self._check_for_soft_fail(event, state, backfilled, origin=origin)
-
-        if event.type == EventTypes.GuestAccess and not context.rejected:
-            await self.maybe_kick_guest_users(event)
-
-        # If we are going to send this event over federation we precalculate
-        # the joined hosts.
-        if event.internal_metadata.get_send_on_behalf_of():
-            await self.event_creation_handler.cache_joined_hosts_for_event(
-                event, context
-            )
-
-        return context
-
-    async def _update_auth_events_and_context_for_auth(
-        self,
-        origin: str,
-        event: EventBase,
-        context: EventContext,
-        input_auth_events: StateMap[EventBase],
-    ) -> Tuple[EventContext, StateMap[EventBase]]:
-        """Helper for _check_event_auth. See there for docs.
-
-        Checks whether a given event has the expected auth events. If it
-        doesn't then we talk to the remote server to compare state to see if
-        we can come to a consensus (e.g. if one server missed some valid
-        state).
-
-        This attempts to resolve any potential divergence of state between
-        servers, but is not essential and so failures should not block further
-        processing of the event.
-
-        Args:
-            origin:
-            event:
-            context:
-
-            input_auth_events:
-                Map from (event_type, state_key) to event
-
-                Normally, our calculated auth_events based on the state of the room
-                at the event's position in the DAG, though occasionally (eg if the
-                event is an outlier), may be the auth events claimed by the remote
-                server.
-
-        Returns:
-            updated context, updated auth event map
-        """
-        # take a copy of input_auth_events before we modify it.
-        auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
-
-        event_auth_events = set(event.auth_event_ids())
-
-        # missing_auth is the set of the event's auth_events which we don't yet have
-        # in auth_events.
-        missing_auth = event_auth_events.difference(
-            e.event_id for e in auth_events.values()
-        )
-
-        # if we have missing events, we need to fetch those events from somewhere.
-        #
-        # we start by checking if they are in the store, and then try calling /event_auth/.
-        if missing_auth:
-            have_events = await self.store.have_seen_events(event.room_id, missing_auth)
-            logger.debug("Events %s are in the store", have_events)
-            missing_auth.difference_update(have_events)
-
-        if missing_auth:
-            # If we don't have all the auth events, we need to get them.
-            logger.info("auth_events contains unknown events: %s", missing_auth)
-            try:
-                try:
-                    remote_auth_chain = await self.federation_client.get_event_auth(
-                        origin, event.room_id, event.event_id
-                    )
-                except RequestSendFailed as e1:
-                    # The other side isn't around or doesn't implement the
-                    # endpoint, so lets just bail out.
-                    logger.info("Failed to get event auth from remote: %s", e1)
-                    return context, auth_events
-
-                seen_remotes = await self.store.have_seen_events(
-                    event.room_id, [e.event_id for e in remote_auth_chain]
-                )
-
-                for e in remote_auth_chain:
-                    if e.event_id in seen_remotes:
-                        continue
-
-                    if e.event_id == event.event_id:
-                        continue
-
-                    try:
-                        auth_ids = e.auth_event_ids()
-                        auth = {
-                            (e.type, e.state_key): e
-                            for e in remote_auth_chain
-                            if e.event_id in auth_ids or e.type == EventTypes.Create
-                        }
-                        e.internal_metadata.outlier = True
-
-                        logger.debug(
-                            "_check_event_auth %s missing_auth: %s",
-                            event.event_id,
-                            e.event_id,
-                        )
-                        missing_auth_event_context = (
-                            await self.state_handler.compute_event_context(e)
-                        )
-                        await self._auth_and_persist_event(
-                            origin,
-                            e,
-                            missing_auth_event_context,
-                            claimed_auth_event_map=auth,
-                        )
-
-                        if e.event_id in event_auth_events:
-                            auth_events[(e.type, e.state_key)] = e
-                    except AuthError:
-                        pass
-
-            except Exception:
-                logger.exception("Failed to get auth chain")
-
-        if event.internal_metadata.is_outlier():
-            # XXX: given that, for an outlier, we'll be working with the
-            # event's *claimed* auth events rather than those we calculated:
-            # (a) is there any point in this test, since different_auth below will
-            # obviously be empty
-            # (b) alternatively, why don't we do it earlier?
-            logger.info("Skipping auth_event fetch for outlier")
-            return context, auth_events
-
-        different_auth = event_auth_events.difference(
-            e.event_id for e in auth_events.values()
-        )
-
-        if not different_auth:
-            return context, auth_events
-
-        logger.info(
-            "auth_events refers to events which are not in our calculated auth "
-            "chain: %s",
-            different_auth,
-        )
-
-        # XXX: currently this checks for redactions but I'm not convinced that is
-        # necessary?
-        different_events = await self.store.get_events_as_list(different_auth)
-
-        for d in different_events:
-            if d.room_id != event.room_id:
-                logger.warning(
-                    "Event %s refers to auth_event %s which is in a different room",
-                    event.event_id,
-                    d.event_id,
-                )
-
-                # don't attempt to resolve the claimed auth events against our own
-                # in this case: just use our own auth events.
-                #
-                # XXX: should we reject the event in this case? It feels like we should,
-                # but then shouldn't we also do so if we've failed to fetch any of the
-                # auth events?
-                return context, auth_events
-
-        # now we state-resolve between our own idea of the auth events, and the remote's
-        # idea of them.
-
-        local_state = auth_events.values()
-        remote_auth_events = dict(auth_events)
-        remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
-        remote_state = remote_auth_events.values()
-
-        room_version = await self.store.get_room_version_id(event.room_id)
-        new_state = await self.state_handler.resolve_events(
-            room_version, (local_state, remote_state), event
-        )
-
-        logger.info(
-            "After state res: updating auth_events with new state %s",
-            {
-                (d.type, d.state_key): d.event_id
-                for d in new_state.values()
-                if auth_events.get((d.type, d.state_key)) != d
-            },
-        )
-
-        auth_events.update(new_state)
-
-        context = await self._update_context_for_auth_events(
-            event, context, auth_events
-        )
-
-        return context, auth_events
-
-    async def _update_context_for_auth_events(
-        self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
-    ) -> EventContext:
-        """Update the state_ids in an event context after auth event resolution,
-        storing the changes as a new state group.
-
-        Args:
-            event: The event we're handling the context for
-
-            context: initial event context
-
-            auth_events: Events to update in the event context.
-
-        Returns:
-            new event context
-        """
-        # exclude the state key of the new event from the current_state in the context.
-        if event.is_state():
-            event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
-        else:
-            event_key = None
-        state_updates = {
-            k: a.event_id for k, a in auth_events.items() if k != event_key
-        }
-
-        current_state_ids = await context.get_current_state_ids()
-        current_state_ids = dict(current_state_ids)  # type: ignore
-
-        current_state_ids.update(state_updates)
-
-        prev_state_ids = await context.get_prev_state_ids()
-        prev_state_ids = dict(prev_state_ids)
-
-        prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
-
-        # create a new state group as a delta from the existing one.
-        prev_group = context.state_group
-        state_group = await self.state_store.store_state_group(
-            event.event_id,
-            event.room_id,
-            prev_group=prev_group,
-            delta_ids=state_updates,
-            current_state_ids=current_state_ids,
-        )
-
-        return EventContext.with_state(
-            state_group=state_group,
-            state_group_before_event=context.state_group_before_event,
-            current_state_ids=current_state_ids,
-            prev_state_ids=prev_state_ids,
-            prev_group=prev_group,
-            delta_ids=state_updates,
-        )
-
     async def construct_auth_difference(
         self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase]
     ) -> Dict:
@@ -3392,99 +1606,6 @@ class FederationHandler(BaseHandler):
         if "valid" not in response or not response["valid"]:
             raise AuthError(403, "Third party certificate was invalid")
 
-    async def persist_events_and_notify(
-        self,
-        room_id: str,
-        event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
-        backfilled: bool = False,
-    ) -> int:
-        """Persists events and tells the notifier/pushers about them, if
-        necessary.
-
-        Args:
-            room_id: The room ID of events being persisted.
-            event_and_contexts: Sequence of events with their associated
-                context that should be persisted. All events must belong to
-                the same room.
-            backfilled: Whether these events are a result of
-                backfilling or not
-
-        Returns:
-            The stream ID after which all events have been persisted.
-        """
-        if not event_and_contexts:
-            return self.store.get_current_events_token()
-
-        instance = self.config.worker.events_shard_config.get_instance(room_id)
-        if instance != self._instance_name:
-            # Limit the number of events sent over replication. We choose 200
-            # here as that is what we default to in `max_request_body_size(..)`
-            for batch in batch_iter(event_and_contexts, 200):
-                result = await self._send_events(
-                    instance_name=instance,
-                    store=self.store,
-                    room_id=room_id,
-                    event_and_contexts=batch,
-                    backfilled=backfilled,
-                )
-            return result["max_stream_id"]
-        else:
-            assert self.storage.persistence
-
-            # Note that this returns the events that were persisted, which may not be
-            # the same as were passed in if some were deduplicated due to transaction IDs.
-            events, max_stream_token = await self.storage.persistence.persist_events(
-                event_and_contexts, backfilled=backfilled
-            )
-
-            if self._ephemeral_messages_enabled:
-                for event in events:
-                    # If there's an expiry timestamp on the event, schedule its expiry.
-                    self._message_handler.maybe_schedule_expiry(event)
-
-            if not backfilled:  # Never notify for backfilled events
-                for event in events:
-                    await self._notify_persisted_event(event, max_stream_token)
-
-            return max_stream_token.stream
-
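Replication sends above are chunked with `batch_iter` in groups of 200. Synapse's real helper lives in synapse.util.iterutils; a generic stand-in with the same semantics, for illustration only:

import itertools
from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def batch_iter(iterable: Iterable[T], size: int) -> Iterator[Tuple[T, ...]]:
    # Yield fixed-size tuples from the iterable until it is exhausted;
    # the final batch may be shorter than `size`.
    it = iter(iterable)
    while True:
        batch = tuple(itertools.islice(it, size))
        if not batch:
            return
        yield batch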
-    async def _notify_persisted_event(
-        self, event: EventBase, max_stream_token: RoomStreamToken
-    ) -> None:
-        """Checks to see if notifier/pushers should be notified about the
-        event or not.
-
-        Args:
-            event:
-            max_stream_token: The max stream token returned by persist_events
-        """
-
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-
-            # We notify for memberships if it's an invite for one of our
-            # users
-            if event.internal_metadata.is_outlier():
-                if event.membership != Membership.INVITE:
-                    if not self.is_mine_id(target_user_id):
-                        return
-
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-        elif event.internal_metadata.is_outlier():
-            return
-
-        # the event has been persisted so it should have a stream ordering.
-        assert event.internal_metadata.stream_ordering
-
-        event_pos = PersistedEventPosition(
-            self._instance_name, event.internal_metadata.stream_ordering
-        )
-        self.notifier.on_new_room_event(
-            event, event_pos, max_stream_token, extra_users=extra_users
-        )
-
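The triple-nested outlier check above is easier to read flattened: for an outlier membership event, notification is skipped only when the event is neither an invite nor about a local user. A hedged flattened reading of that logic, using plain values:

def should_notify_outlier_member(membership: str, is_mine_user: bool) -> bool:
    # Mirrors the nested early-return above: an outlier membership event
    # is notified if it is an invite, or if it concerns one of our users.
    return membership == "invite" or is_mine_user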
     async def _clean_room_for_join(self, room_id: str) -> None:
         """Called to clean up any data in DB for a given room, ready for the
         server to join the room.
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
new file mode 100644
index 0000000000..9f055f00cf
--- /dev/null
+++ b/synapse/handlers/federation_event.py
@@ -0,0 +1,1825 @@
+# Copyright 2021 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from http import HTTPStatus
+from typing import (
+    TYPE_CHECKING,
+    Collection,
+    Container,
+    Dict,
+    Iterable,
+    List,
+    Optional,
+    Sequence,
+    Set,
+    Tuple,
+)
+
+import attr
+from prometheus_client import Counter
+
+from twisted.internet import defer
+
+from synapse import event_auth
+from synapse.api.constants import (
+    EventContentFields,
+    EventTypes,
+    Membership,
+    RejectedReason,
+    RoomEncryptionAlgorithms,
+)
+from synapse.api.errors import (
+    AuthError,
+    Codes,
+    FederationError,
+    HttpResponseException,
+    RequestSendFailed,
+    SynapseError,
+)
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.event_auth import auth_types_for_event
+from synapse.events import EventBase
+from synapse.events.snapshot import EventContext
+from synapse.federation.federation_client import InvalidResponseError
+from synapse.handlers._base import BaseHandler
+from synapse.logging.context import (
+    make_deferred_yieldable,
+    nested_logging_context,
+    run_in_background,
+)
+from synapse.logging.utils import log_function
+from synapse.metrics.background_process_metrics import run_as_background_process
+from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
+from synapse.replication.http.federation import (
+    ReplicationFederationSendEventsRestServlet,
+)
+from synapse.state import StateResolutionStore
+from synapse.storage.databases.main.events_worker import EventRedactBehaviour
+from synapse.types import (
+    MutableStateMap,
+    PersistedEventPosition,
+    RoomStreamToken,
+    StateMap,
+    UserID,
+    get_domain_from_id,
+)
+from synapse.util.async_helpers import Linearizer, concurrently_execute
+from synapse.util.iterutils import batch_iter
+from synapse.util.retryutils import NotRetryingDestination
+from synapse.util.stringutils import shortstr
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+
+
+logger = logging.getLogger(__name__)
+
+soft_failed_event_counter = Counter(
+    "synapse_federation_soft_failed_events_total",
+    "Events received over federation that we marked as soft_failed",
+)
+
+
+@attr.s(slots=True, frozen=True, auto_attribs=True)
+class _NewEventInfo:
+    """Holds information about a received event, ready for passing to _auth_and_persist_events
+
+    Attributes:
+        event: the received event
+
+        claimed_auth_event_map: a map of (type, state_key) => event for the event's
+            claimed auth_events.
+
+            This can include events which have not yet been persisted, in the case that
+            we are backfilling a batch of events.
+
+            Note: May be incomplete if we were unable to find all of the claimed auth
+            events. Also, treat the contents with caution: the events might have
+            been rejected, might not yet have been authorized themselves, or they might
+            be in the wrong room.
+
+    """
+
+    event: EventBase
+    claimed_auth_event_map: StateMap[EventBase]
+
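A note on the decorator above: `slots=True, frozen=True, auto_attribs=True` makes attrs generate an immutable, slotted value class directly from the type annotations. A generic illustration (not Synapse code):

import attr

@attr.s(slots=True, frozen=True, auto_attribs=True)
class Point:
    x: int
    y: int

p = Point(x=1, y=2)
# p.x = 3  # would raise attr.exceptions.FrozenInstanceError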
+
+class FederationEventHandler(BaseHandler):
+    """Handles events that originated from federation.
+
+    Responsible for handling incoming events and passing them on to the rest
+    of the homeserver (including auth and state conflict resolution)
+    """
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.store = hs.get_datastore()
+        self.storage = hs.get_storage()
+        self.state_store = self.storage.state
+
+        self.state_handler = hs.get_state_handler()
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self._event_auth_handler = hs.get_event_auth_handler()
+        self._message_handler = hs.get_message_handler()
+        self.action_generator = hs.get_action_generator()
+        self._state_resolution_handler = hs.get_state_resolution_handler()
+
+        self.federation_client = hs.get_federation_client()
+        self.third_party_event_rules = hs.get_third_party_event_rules()
+
+        self.is_mine_id = hs.is_mine_id
+        self._instance_name = hs.get_instance_name()
+
+        self.config = hs.config
+        self._ephemeral_messages_enabled = hs.config.server.enable_ephemeral_messages
+
+        self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
+        if hs.config.worker_app:
+            self._user_device_resync = (
+                ReplicationUserDevicesResyncRestServlet.make_client(hs)
+            )
+        else:
+            self._device_list_updater = hs.get_device_handler().device_list_updater
+
+        # When joining a room we need to queue any events for that room up.
+        # For each room, a list of (pdu, origin) tuples.
+        # TODO: replace this with something more elegant, probably based around the
+        # federation event staging area.
+        self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {}
+
+        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
+
+    async def on_receive_pdu(self, origin: str, pdu: EventBase) -> None:
+        """Process a PDU received via a federation /send/ transaction
+
+        Args:
+            origin: server which initiated the /send/ transaction. Will
+                be used to fetch missing events or state.
+            pdu: received PDU
+        """
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        # We reprocess pdus when we have seen them only as outliers
+        existing = await self.store.get_event(
+            event_id, allow_none=True, allow_rejected=True
+        )
+
+        # FIXME: Currently we fetch an event again when we already have it
+        # if it has been marked as an outlier.
+        if existing:
+            if not existing.internal_metadata.is_outlier():
+                logger.info(
+                    "Ignoring received event %s which we have already seen", event_id
+                )
+                return
+            if pdu.internal_metadata.is_outlier():
+                logger.info(
+                    "Ignoring received outlier %s which we already have as an outlier",
+                    event_id,
+                )
+                return
+            logger.info("De-outliering event %s", event_id)
+
+        # do some initial sanity-checking of the event. In particular, make
+        # sure it doesn't have hundreds of prev_events or auth_events, which
+        # could cause a huge state resolution or cascade of event fetches.
+        try:
+            self._sanity_check_event(pdu)
+        except SynapseError as err:
+            logger.warning("Received event failed sanity checks")
+            raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
+
+        # If we are currently in the process of joining this room, then we
+        # queue up events for later processing.
+        if room_id in self.room_queues:
+            logger.info(
+                "Queuing PDU from %s for now: join in progress",
+                origin,
+            )
+            self.room_queues[room_id].append((pdu, origin))
+            return
+
+        # If we're not in the room just ditch the event entirely. This is
+        # probably an old server that has come back and thinks we're still in
+        # the room (or we've been rejoined to the room by a state reset).
+        #
+        # Note that if we were never in the room then we would have already
+        # dropped the event, since we wouldn't know the room version.
+        is_in_room = await self._event_auth_handler.check_host_in_room(
+            room_id, self.server_name
+        )
+        if not is_in_room:
+            logger.info(
+                "Ignoring PDU from %s as we're not in the room",
+                origin,
+            )
+            return None
+
+        # Check that the event passes auth based on the state at the event. This is
+        # done for events that are to be added to the timeline (non-outliers).
+        #
+        # Get missing pdus if necessary:
+        #  - Fetching any missing prev events to fill in gaps in the graph
+        #  - Fetching state if we have a hole in the graph
+        if not pdu.internal_metadata.is_outlier():
+            prevs = set(pdu.prev_event_ids())
+            seen = await self.store.have_events_in_timeline(prevs)
+            missing_prevs = prevs - seen
+
+            if missing_prevs:
+                # We only backfill backwards to the min depth.
+                min_depth = await self.get_min_depth_for_context(pdu.room_id)
+                logger.debug("min_depth: %d", min_depth)
+
+                if min_depth is not None and pdu.depth > min_depth:
+                    # If we're missing stuff, ensure we only fetch stuff one
+                    # at a time.
+                    logger.info(
+                        "Acquiring room lock to fetch %d missing prev_events: %s",
+                        len(missing_prevs),
+                        shortstr(missing_prevs),
+                    )
+                    with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+                        logger.info(
+                            "Acquired room lock to fetch %d missing prev_events",
+                            len(missing_prevs),
+                        )
+
+                        try:
+                            await self._get_missing_events_for_pdu(
+                                origin, pdu, prevs, min_depth
+                            )
+                        except Exception as e:
+                            raise Exception(
+                                "Error fetching missing prev_events for %s: %s"
+                                % (event_id, e)
+                            ) from e
+
+                    # Update the set of things we've seen after trying to
+                    # fetch the missing stuff
+                    seen = await self.store.have_events_in_timeline(prevs)
+                    missing_prevs = prevs - seen
+
+                    if not missing_prevs:
+                        logger.info("Found all missing prev_events")
+
+                if missing_prevs:
+                    # since this event was pushed to us, it is possible for it to
+                    # become the only forward-extremity in the room, and we would then
+                    # trust its state to be the state for the whole room. This is very
+                    # bad. Further, if the event was pushed to us, there is no excuse
+                    # for us not to have all the prev_events. (XXX: apart from
+                    # min_depth?)
+                    #
+                    # We therefore reject any such events.
+                    logger.warning(
+                        "Rejecting: failed to fetch %d prev events: %s",
+                        len(missing_prevs),
+                        shortstr(missing_prevs),
+                    )
+                    raise FederationError(
+                        "ERROR",
+                        403,
+                        (
+                            "Your server isn't divulging details about prev_events "
+                            "referenced in this event."
+                        ),
+                        affected=pdu.event_id,
+                    )
+
+        await self._process_received_pdu(origin, pdu, state=None)
+
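The `with (await self._room_pdu_linearizer.queue(room_id))` pattern above serializes missing-event fetches per room. Synapse's Linearizer is Twisted-based; a rough asyncio analogue of the same idea, with a hypothetical fetch step:

import asyncio
from typing import Dict

_room_locks: Dict[str, asyncio.Lock] = {}

async def fetch_missing_for_room(room_id: str) -> None:
    # Only one fetch of missing prev_events runs per room at a time.
    lock = _room_locks.setdefault(room_id, asyncio.Lock())
    async with lock:
        ...  # fetch and process the missing events here (hypothetical)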
+    @log_function
+    async def on_send_membership_event(
+        self, origin: str, event: EventBase
+    ) -> Tuple[EventBase, EventContext]:
+        """
+        We have received a join/leave/knock event for a room via send_join/leave/knock.
+
+        Verify that event and send it into the room on the remote homeserver's behalf.
+
+        This is quite similar to on_receive_pdu, with the following principal
+        differences:
+          * only membership events are permitted (and only events with
+            sender==state_key -- ie, no kicks or bans)
+          * *We* send out the event on behalf of the remote server.
+          * We enforce the membership restrictions of restricted rooms.
+          * Rejected events result in an exception rather than being stored.
+
+        There are also other differences; however, it is not clear whether these are by
+        design or omission. In particular, we do not attempt to backfill any missing
+        prev_events.
+
+        Args:
+            origin: The homeserver of the remote (joining/invited/knocking) user.
+            event: The member event that has been signed by the remote homeserver.
+
+        Returns:
+            The event and context of the event after inserting it into the room graph.
+
+        Raises:
+            SynapseError if the event is not accepted into the room
+        """
+        logger.debug(
+            "on_send_membership_event: Got event: %s, signatures: %s",
+            event.event_id,
+            event.signatures,
+        )
+
+        if get_domain_from_id(event.sender) != origin:
+            logger.info(
+                "Got send_membership request for user %r from different origin %s",
+                event.sender,
+                origin,
+            )
+            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
+
+        if event.sender != event.state_key:
+            raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON)
+
+        assert not event.internal_metadata.outlier
+
+        # Send this event on behalf of the other server.
+        #
+        # The remote server isn't a full participant in the room at this point, so
+        # may not have an up-to-date list of the other homeservers participating in
+        # the room, so we send it on their behalf.
+        event.internal_metadata.send_on_behalf_of = origin
+
+        context = await self.state_handler.compute_event_context(event)
+        context = await self._check_event_auth(origin, event, context)
+        if context.rejected:
+            raise SynapseError(
+                403, f"{event.membership} event was rejected", Codes.FORBIDDEN
+            )
+
+        # for joins, we need to check the restrictions of restricted rooms
+        if event.membership == Membership.JOIN:
+            await self.check_join_restrictions(context, event)
+
+        # for knock events, we run the third-party event rules. It's not entirely clear
+        # why we don't do this for other sorts of membership events.
+        if event.membership == Membership.KNOCK:
+            event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
+                event, context
+            )
+            if not event_allowed:
+                logger.info("Sending of knock %s forbidden by third-party rules", event)
+                raise SynapseError(
+                    403, "This event is not allowed in this context", Codes.FORBIDDEN
+                )
+
+        # all looks good, we can persist the event.
+        await self._run_push_actions_and_persist_event(event, context)
+        return event, context
+
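The first check in this method compares the server part of the sender's user ID against the requesting origin. A minimal sketch of that comparison, assuming well-formed @localpart:domain IDs (Synapse's real get_domain_from_id also validates the ID format):

def get_domain_from_id(user_id: str) -> str:
    # Everything after the first colon is the server name.
    return user_id.split(":", 1)[1]

def check_sender_origin(sender: str, origin: str) -> None:
    if get_domain_from_id(sender) != origin:
        raise PermissionError(f"User {sender!r} is not from origin {origin}")

check_sender_origin("@alice:remote.example", "remote.example")  # passes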
+    async def check_join_restrictions(
+        self, context: EventContext, event: EventBase
+    ) -> None:
+        """Check that restrictions in restricted join rules are matched
+
+        Called when we receive a join event via send_join.
+
+        Raises an auth error if the restrictions are not matched.
+        """
+        prev_state_ids = await context.get_prev_state_ids()
+
+        # Check if the user is already in the room or invited to the room.
+        user_id = event.state_key
+        prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+        prev_member_event = None
+        if prev_member_event_id:
+            prev_member_event = await self.store.get_event(prev_member_event_id)
+
+        # Check if the member should be allowed access via membership in a space.
+        await self._event_auth_handler.check_restricted_join_rules(
+            prev_state_ids,
+            event.room_version,
+            user_id,
+            prev_member_event,
+        )
+
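State maps here are keyed by (event_type, state_key), so the membership lookup above is a plain dict access. A tiny illustration with hypothetical event IDs:

prev_state_ids = {
    ("m.room.create", ""): "$create_event",
    ("m.room.member", "@alice:remote.example"): "$alice_invite",
}

user_id = "@alice:remote.example"
prev_member_event_id = prev_state_ids.get(("m.room.member", user_id))
assert prev_member_event_id == "$alice_invite"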
+    @log_function
+    async def backfill(
+        self, dest: str, room_id: str, limit: int, extremities: List[str]
+    ) -> None:
+        """Trigger a backfill request to `dest` for the given `room_id`
+
+        This will attempt to get more events from the remote. If the other side
+        has no new events to offer, this does nothing.
+
+        As the events are received, we check their signatures, and also do some
+        sanity-checking on them. If any of the backfilled events are invalid,
+        this method throws a SynapseError.
+
+        We might also raise an InvalidResponseError if the response from the remote
+        server is just bogus.
+
+        TODO: make this more useful to distinguish failures of the remote
+        server from invalid events (there is probably no point in trying to
+        re-fetch invalid events from every other HS in the room.)
+        """
+        if dest == self.server_name:
+            raise SynapseError(400, "Can't backfill from self.")
+
+        events = await self.federation_client.backfill(
+            dest, room_id, limit=limit, extremities=extremities
+        )
+
+        if not events:
+            return
+
+        # if there are any events in the wrong room, the remote server is buggy and
+        # should not be trusted.
+        for ev in events:
+            if ev.room_id != room_id:
+                raise InvalidResponseError(
+                    f"Remote server {dest} returned event {ev.event_id} which is in "
+                    f"room {ev.room_id}, when we were backfilling in {room_id}"
+                )
+
+        await self._process_pulled_events(dest, events, backfilled=True)
+
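A sketch of how the entry point above might be invoked from an async context; the destination, room ID, and extremity are hypothetical values for illustration:

async def backfill_once(handler: "FederationEventHandler") -> None:
    await handler.backfill(
        dest="other.example.org",             # hypothetical remote homeserver
        room_id="!room:example.org",          # hypothetical room
        limit=100,
        extremities=["$oldest_known_event"],  # hypothetical backwards extremity
    )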
+    async def _get_missing_events_for_pdu(
+        self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
+    ) -> None:
+        """
+        Args:
+            origin: Origin of the pdu. We will ask this server for the missing events.
+            pdu: received pdu
+            prevs: Set of event IDs which we are missing
+            min_depth: Minimum depth of events to return.
+        """
+
+        room_id = pdu.room_id
+        event_id = pdu.event_id
+
+        seen = await self.store.have_events_in_timeline(prevs)
+
+        if not prevs - seen:
+            return
+
+        latest_list = await self.store.get_latest_event_ids_in_room(room_id)
+
+        # We add the prev events that we have seen to the latest
+        # list to ensure the remote server doesn't give them to us
+        latest = set(latest_list)
+        latest |= seen
+
+        logger.info(
+            "Requesting missing events between %s and %s",
+            shortstr(latest),
+            event_id,
+        )
+
+        # XXX: we set timeout to 10s to help workaround
+        # https://github.com/matrix-org/synapse/issues/1733.
+        # The reason is to avoid holding the linearizer lock
+        # whilst processing inbound /send transactions, causing
+        # FDs to stack up and block other inbound transactions
+        # which empirically can currently take up to 30 minutes.
+        #
+        # N.B. this explicitly disables retry attempts.
+        #
+        # N.B. this also increases our chances of falling back to
+        # fetching fresh state for the room if the missing event
+        # can't be found, which slightly reduces our security.
+        # it may also increase our DAG extremity count for the room,
+        # causing additional state resolution?  See #1760.
+        # However, fetching state doesn't hold the linearizer lock
+        # apparently.
+        #
+        # see https://github.com/matrix-org/synapse/pull/1744
+        #
+        # ----
+        #
+        # Update richvdh 2018/09/18: There are a number of problems with timing this
+        # request out aggressively on the client side:
+        #
+        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
+        #   if you send too many requests at once, so you end up with the server carefully
+        #   working through the backlog of your requests, which you have already timed
+        #   out.
+        #
+        # - for this request in particular, we now (as of
+        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
+        #   server can't produce a plausible-looking set of prev_events - so we become
+        #   much more likely to reject the event.
+        #
+        # - contrary to what it says above, we do *not* fall back to fetching fresh state
+        #   for the room if get_missing_events times out. Rather, we give up processing
+        #   the PDU whose prevs we are missing, which then makes it much more likely that
+        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
+        #   problem.
+        #
+        # - the aggressive 10s timeout was introduced to deal with incoming federation
+        #   requests taking 8 hours to process. It's not entirely clear why that was going
+        #   on; certainly there were other issues causing traffic storms which are now
+        #   resolved, and I think in any case we may be more sensible about our locking
+        #   now. We're *certainly* more sensible about our logging.
+        #
+        # All that said: Let's try increasing the timeout to 60s and see what happens.
+
+        try:
+            missing_events = await self.federation_client.get_missing_events(
+                origin,
+                room_id,
+                earliest_events_ids=list(latest),
+                latest_events=[pdu],
+                limit=10,
+                min_depth=min_depth,
+                timeout=60000,
+            )
+        except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e:
+            # We failed to get the missing events, but since we need to handle
+            # the case of `get_missing_events` not returning the necessary
+            # events anyway, it is safe to simply log the error and continue.
+            logger.warning("Failed to get prev_events: %s", e)
+            return
+
+        logger.info("Got %d prev_events", len(missing_events))
+        await self._process_pulled_events(origin, missing_events, backfilled=False)
+
+    async def _process_pulled_events(
+        self, origin: str, events: Iterable[EventBase], backfilled: bool
+    ) -> None:
+        """Process a batch of events we have pulled from a remote server
+
+        Pulls in any events required to auth the events, persists the received events,
+        and notifies clients, if appropriate.
+
+        Assumes the events have already had their signatures and hashes checked.
+
+        Args:
+            origin: The server we received these events from
+            events: The received events.
+            backfilled: True if this is part of a historical batch of events (inhibits
+                notification to clients, and validation of device keys.)
+        """
+
+        # We want to sort these by depth so we process them and
+        # tell clients about them in order.
+        sorted_events = sorted(events, key=lambda x: x.depth)
+
+        for ev in sorted_events:
+            with nested_logging_context(ev.event_id):
+                await self._process_pulled_event(origin, ev, backfilled=backfilled)
+
+    async def _process_pulled_event(
+        self, origin: str, event: EventBase, backfilled: bool
+    ) -> None:
+        """Process a single event that we have pulled from a remote server
+
+        Pulls in any events required to auth the event, persists the received event,
+        and notifies clients, if appropriate.
+
+        Assumes the event has already had its signatures and hashes checked.
+
+        This is broadly equivalent to on_receive_pdu, but applies somewhat different
+        logic in the case that we are missing prev_events (in particular, it just
+        requests the state at that point, rather than triggering a get_missing_events) -
+        so is appropriate when we have pulled the event from a remote server, rather
+        than having it pushed to us.
+
+        Args:
+            origin: The server we received this event from
+            event: The received event
+            backfilled: True if this is part of a historical batch of events (inhibits
+                notification to clients, and validation of device keys.)
+        """
+        logger.info("Processing pulled event %s", event)
+
+        # these should not be outliers.
+        assert not event.internal_metadata.is_outlier()
+
+        event_id = event.event_id
+
+        existing = await self.store.get_event(
+            event_id, allow_none=True, allow_rejected=True
+        )
+        if existing:
+            if not existing.internal_metadata.is_outlier():
+                logger.info(
+                    "Ignoring received event %s which we have already seen",
+                    event_id,
+                )
+                return
+            logger.info("De-outliering event %s", event_id)
+
+        try:
+            self._sanity_check_event(event)
+        except SynapseError as err:
+            logger.warning("Event %s failed sanity check: %s", event_id, err)
+            return
+
+        try:
+            state = await self._resolve_state_at_missing_prevs(origin, event)
+            await self._process_received_pdu(
+                origin, event, state=state, backfilled=backfilled
+            )
+        except FederationError as e:
+            if e.code == 403:
+                logger.warning("Pulled event %s failed history check.", event_id)
+            else:
+                raise
+
+    async def _resolve_state_at_missing_prevs(
+        self, dest: str, event: EventBase
+    ) -> Optional[Iterable[EventBase]]:
+        """Calculate the state at an event with missing prev_events.
+
+        This is used when we have pulled a batch of events from a remote server, and
+        still don't have all the prev_events.
+
+        If we already have all the prev_events for `event`, this method does nothing.
+
+        Otherwise, the missing prevs become new backwards extremities, and we fall back
+        to asking the remote server for the state after each missing `prev_event`,
+        and resolving across them.
+
+        That's ok provided we then resolve the state against other bits of the DAG
+        before using it - in other words, that the received event `event` is not going
+        to become the only forwards_extremity in the room (which ensures that you
+        can't just take over a room by sending an event, withholding its prev_events,
+        and declaring yourself to be an admin in the subsequent state request).
+
+        In short: we should only call this method if `event` has been *pulled*
+        as part of a batch of missing prev events, or similar.
+
+        Args:
+            dest: the remote server to ask for state at the missing prevs. Typically,
+                this will be the server we got `event` from.
+            event: an event to check for missing prevs.
+
+        Returns:
+            if we already had all the prev events, `None`. Otherwise, returns a list of
+            the events in the state at `event`.
+        """
+        room_id = event.room_id
+        event_id = event.event_id
+
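+        # Work out which of the prev_events we already have in our timeline
+        # (i.e. which we have already persisted as non-outliers).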
+        prevs = set(event.prev_event_ids())
+        seen = await self.store.have_events_in_timeline(prevs)
+        missing_prevs = prevs - seen
+
+        if not missing_prevs:
+            return None
+
+        logger.info(
+            "Event %s is missing prev_events %s: calculating state for a "
+            "backwards extremity",
+            event_id,
+            shortstr(missing_prevs),
+        )
+        # Calculate the state after each of the previous events, and
+        # resolve them to find the correct state at the current event.
+        event_map = {event_id: event}
+        try:
+            # Get the state of the events we know about
+            ours = await self.state_store.get_state_groups_ids(room_id, seen)
+
+            # state_maps is a list of mappings from (type, state_key) to event_id
+            state_maps: List[StateMap[str]] = list(ours.values())
+
+            # we don't need this any more, let's delete it.
+            del ours
+
+            # Ask the remote server for the states we don't
+            # know about
+            for p in missing_prevs:
+                logger.info("Requesting state after missing prev_event %s", p)
+
+                with nested_logging_context(p):
+                    # note that if any of the missing prevs share missing state or
+                    # auth events, the requests to fetch those events are deduped
+                    # by the get_pdu_cache in federation_client.
+                    remote_state = await self._get_state_after_missing_prev_event(
+                        dest, room_id, p
+                    )
+
+                    remote_state_map = {
+                        (x.type, x.state_key): x.event_id for x in remote_state
+                    }
+                    state_maps.append(remote_state_map)
+
+                    for x in remote_state:
+                        event_map[x.event_id] = x
+
+            room_version = await self.store.get_room_version_id(room_id)
+            state_map = await self._state_resolution_handler.resolve_events_with_store(
+                room_id,
+                room_version,
+                state_maps,
+                event_map,
+                state_res_store=StateResolutionStore(self.store),
+            )
+
+            # We need to give _process_received_pdu the actual state events
+            # rather than event ids, so generate that now.
+
+            # First though we need to fetch all the events that are in
+            # state_map, so we can build up the state below.
+            evs = await self.store.get_events(
+                list(state_map.values()),
+                get_prev_content=False,
+                redact_behaviour=EventRedactBehaviour.AS_IS,
+            )
+            event_map.update(evs)
+
+            state = [event_map[e] for e in state_map.values()]
+        except Exception:
+            logger.warning(
+                "Error attempting to resolve state at missing prev_events",
+                exc_info=True,
+            )
+            raise FederationError(
+                "ERROR",
+                403,
+                "We can't get valid state history.",
+                affected=event_id,
+            )
+        return state
+
+    async def _get_state_after_missing_prev_event(
+        self,
+        destination: str,
+        room_id: str,
+        event_id: str,
+    ) -> List[EventBase]:
+        """Requests all of the room state at a given event from a remote homeserver.
+
+        Args:
+            destination: The remote homeserver to query for the state.
+            room_id: The id of the room we're interested in.
+            event_id: The id of the event we want the state at.
+
+        Returns:
+            A list of events in the state, including the event itself
+        """
+        (
+            state_event_ids,
+            auth_event_ids,
+        ) = await self.federation_client.get_room_state_ids(
+            destination, room_id, event_id=event_id
+        )
+
+        logger.debug(
+            "state_ids returned %i state events, %i auth events",
+            len(state_event_ids),
+            len(auth_event_ids),
+        )
+
+        # start by just trying to fetch the events from the store
+        desired_events = set(state_event_ids)
+        desired_events.add(event_id)
+        logger.debug("Fetching %i events from cache/store", len(desired_events))
+        fetched_events = await self.store.get_events(
+            desired_events, allow_rejected=True
+        )
+
+        missing_desired_events = desired_events - fetched_events.keys()
+        logger.debug(
+            "We are missing %i events (got %i)",
+            len(missing_desired_events),
+            len(fetched_events),
+        )
+
+        # We probably won't need most of the auth events, so let's just check which
+        # we have for now, rather than thrashing the event cache with them all
+        # unnecessarily.
+
+        # TODO: we probably won't actually need all of the auth events, since we
+        #   already have a bunch of the state events. It would be nice if the
+        #   federation api gave us a way of finding out which we actually need.
+
+        missing_auth_events = set(auth_event_ids) - fetched_events.keys()
+        missing_auth_events.difference_update(
+            await self.store.have_seen_events(room_id, missing_auth_events)
+        )
+        logger.debug("We are also missing %i auth events", len(missing_auth_events))
+
+        missing_events = missing_desired_events | missing_auth_events
+        logger.debug("Fetching %i events from remote", len(missing_events))
+        await self._get_events_and_persist(
+            destination=destination, room_id=room_id, events=missing_events
+        )
+
+        # we need to make sure we re-load from the database to get the rejected
+        # state correct.
+        fetched_events.update(
+            await self.store.get_events(missing_desired_events, allow_rejected=True)
+        )
+
+        # check for events which were in the wrong room.
+        #
+        # this can happen if a remote server claims that the state or
+        # auth_events at an event in room A are actually events in room B
+
+        bad_events = [
+            (event_id, event.room_id)
+            for event_id, event in fetched_events.items()
+            if event.room_id != room_id
+        ]
+
+        for bad_event_id, bad_room_id in bad_events:
+            # This is a bogus situation, but since we may only discover it a long time
+            # after it happened, we try our best to carry on, by just omitting the
+            # bad events from the returned state set.
+            logger.warning(
+                "Remote server %s claims event %s in room %s is an auth/state "
+                "event in room %s",
+                destination,
+                bad_event_id,
+                bad_room_id,
+                room_id,
+            )
+
+            del fetched_events[bad_event_id]
+
+        # if we couldn't get the prev event in question, that's a problem.
+        remote_event = fetched_events.get(event_id)
+        if not remote_event:
+            raise Exception("Unable to get missing prev_event %s" % (event_id,))
+
+        # missing state at that event is a warning, not a blocker
+        # XXX: this doesn't sound right? it means that we'll end up with incomplete
+        #   state.
+        failed_to_fetch = desired_events - fetched_events.keys()
+        if failed_to_fetch:
+            logger.warning(
+                "Failed to fetch missing state events for %s %s",
+                event_id,
+                failed_to_fetch,
+            )
+
+        remote_state = [
+            fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
+        ]
+
+        if remote_event.is_state() and remote_event.rejected_reason is None:
+            remote_state.append(remote_event)
+
+        return remote_state
+
+    async def _process_received_pdu(
+        self,
+        origin: str,
+        event: EventBase,
+        state: Optional[Iterable[EventBase]],
+        backfilled: bool = False,
+    ) -> None:
+        """Called when we have a new pdu. We need to do auth checks and put it
+        through the StateHandler.
+
+        Args:
+            origin: server sending the event
+
+            event: event to be persisted
+
+            state: Normally None, but if we are handling a gap in the graph
+                (ie, we are missing one or more prev_events), the resolved state at the
+                event
+
+            backfilled: True if this is part of a historical batch of events (inhibits
+                notification to clients, and validation of device keys.)
+        """
+        logger.debug("Processing event: %s", event)
+
+        try:
+            context = await self.state_handler.compute_event_context(
+                event, old_state=state
+            )
+            await self._auth_and_persist_event(
+                origin, event, context, state=state, backfilled=backfilled
+            )
+        except AuthError as e:
+            raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
+
+        if backfilled:
+            return
+
+        # For encrypted messages we check that we know about the sending device,
+        # if we don't then we mark the device cache for that user as stale.
+        if event.type == EventTypes.Encrypted:
+            device_id = event.content.get("device_id")
+            sender_key = event.content.get("sender_key")
+
+            cached_devices = await self.store.get_cached_devices_for_user(event.sender)
+
+            resync = False  # Whether we should resync device lists.
+
+            device = None
+            if device_id is not None:
+                device = cached_devices.get(device_id)
+                if device is None:
+                    logger.info(
+                        "Received event from remote device not in our cache: %s %s",
+                        event.sender,
+                        device_id,
+                    )
+                    resync = True
+
+            # We also check if the `sender_key` matches what we expect.
+            if sender_key is not None:
+                # Figure out what sender key we're expecting. If we know the
+                # device and recognize the algorithm then we can work out the
+                # exact key to expect. Otherwise check it matches any key we
+                # have for that device.
+
+                current_keys: Container[str] = []
+
+                if device:
+                    keys = device.get("keys", {}).get("keys", {})
+
+                    if (
+                        event.content.get("algorithm")
+                        == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
+                    ):
+                        # For this algorithm we expect a curve25519 key.
+                        key_name = "curve25519:%s" % (device_id,)
+                        current_keys = [keys.get(key_name)]
+                    else:
+                        # We don't understand the algorithm, so we just
+                        # check it matches a key for the device.
+                        current_keys = keys.values()
+                elif device_id:
+                    # We don't have any keys for the device ID.
+                    pass
+                else:
+                    # The event didn't include a device ID, so we just look for
+                    # keys across all devices.
+                    current_keys = [
+                        key
+                        for device in cached_devices.values()
+                        for key in device.get("keys", {}).get("keys", {}).values()
+                    ]
+
+                # We now check that the sender key matches (one of) the expected
+                # keys.
+                if sender_key not in current_keys:
+                    logger.info(
+                        "Received event from remote device with unexpected sender key: %s %s: %s",
+                        event.sender,
+                        device_id or "<no device_id>",
+                        sender_key,
+                    )
+                    resync = True
+
+            if resync:
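+                # Kick the resync off in the background: we don't wait for it,
+                # since the event can be processed without the fresh device list.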
+                run_as_background_process(
+                    "resync_device_due_to_pdu",
+                    self._resync_device,
+                    event.sender,
+                )
+
+        await self._handle_marker_event(origin, event)
+
+    async def _resync_device(self, sender: str) -> None:
+        """We have detected that the device list for the given user may be out
+        of sync, so we try to resync it.
+        """
+
+        try:
+            await self.store.mark_remote_user_device_cache_as_stale(sender)
+
+            # Immediately attempt a resync in the background
+            if self.config.worker_app:
+                await self._user_device_resync(user_id=sender)
+            else:
+                await self._device_list_updater.user_device_resync(sender)
+        except Exception:
+            logger.exception("Failed to resync device for %s", sender)
+
+    async def _handle_marker_event(self, origin: str, marker_event: EventBase) -> None:
+        """Handles backfilling the insertion event when we receive a marker
+        event that points to one.
+
+        Args:
+            origin: Origin of the marker event; we will ask this server for the insertion event
+            marker_event: The event to process
+        """
+
+        if marker_event.type != EventTypes.MSC2716_MARKER:
+            # Not a marker event
+            return
+
+        if marker_event.rejected_reason is not None:
+            # Rejected event
+            return
+
+        # Skip processing a marker event if the room version doesn't
+        # support it.
+        room_version = await self.store.get_room_version(marker_event.room_id)
+        if not room_version.msc2716_historical:
+            return
+
+        logger.debug("_handle_marker_event: received %s", marker_event)
+
+        insertion_event_id = marker_event.content.get(
+            EventContentFields.MSC2716_MARKER_INSERTION
+        )
+
+        if insertion_event_id is None:
+            # Nothing to retrieve then (invalid marker)
+            return
+
+        logger.debug(
+            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
+        )
+
+        await self._get_events_and_persist(
+            origin,
+            marker_event.room_id,
+            [insertion_event_id],
+        )
+
+        insertion_event = await self.store.get_event(
+            insertion_event_id, allow_none=True
+        )
+        if insertion_event is None:
+            logger.warning(
+                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
+                origin,
+                insertion_event_id,
+                marker_event.event_id,
+            )
+            return
+
+        logger.debug(
+            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
+        await self.store.insert_insertion_extremity(
+            insertion_event_id, marker_event.room_id
+        )
+
+        logger.debug(
+            "_handle_marker_event: insertion extremity added for %s from marker event %s",
+            insertion_event,
+            marker_event,
+        )
+
+    async def _get_events_and_persist(
+        self, destination: str, room_id: str, events: Iterable[str]
+    ) -> None:
+        """Fetch the given events from a server, and persist them as outliers.
+
+        This function *does not* recursively get missing auth events of the
+        newly fetched events. Callers must include in the `events` argument
+        any missing events from the auth chain.
+
+        Logs a warning if we can't find the given event.
+        """
+
+        room_version = await self.store.get_room_version(room_id)
+
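+        # The events we manage to fetch from the remote, keyed by event ID.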
+        event_map: Dict[str, EventBase] = {}
+
+        async def get_event(event_id: str) -> None:
+            with nested_logging_context(event_id):
+                try:
+                    event = await self.federation_client.get_pdu(
+                        [destination],
+                        event_id,
+                        room_version,
+                        outlier=True,
+                    )
+                    if event is None:
+                        logger.warning(
+                            "Server %s didn't return event %s",
+                            destination,
+                            event_id,
+                        )
+                        return
+
+                    event_map[event.event_id] = event
+
+                except Exception as e:
+                    logger.warning(
+                        "Error fetching missing state/auth event %s: %s %s",
+                        event_id,
+                        type(e),
+                        e,
+                    )
+
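+        # Fetch the events from the remote, running up to five `get_event`
+        # calls concurrently.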
+        await concurrently_execute(get_event, events, 5)
+
+        # Make a map of auth events for each event. We do this after fetching
+        # all the events as some of the events' auth events will be in the list
+        # of requested events.
+
+        auth_events = [
+            aid
+            for event in event_map.values()
+            for aid in event.auth_event_ids()
+            if aid not in event_map
+        ]
+        persisted_events = await self.store.get_events(
+            auth_events,
+            allow_rejected=True,
+        )
+
+        event_infos = []
+        for event in event_map.values():
+            auth = {}
+            for auth_event_id in event.auth_event_ids():
+                ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
+                if ae:
+                    auth[(ae.type, ae.state_key)] = ae
+                else:
+                    logger.info("Missing auth event %s", auth_event_id)
+
+            event_infos.append(_NewEventInfo(event, auth))
+
+        if event_infos:
+            await self._auth_and_persist_events(
+                destination,
+                room_id,
+                event_infos,
+            )
+
+    async def _auth_and_persist_events(
+        self,
+        origin: str,
+        room_id: str,
+        event_infos: Collection[_NewEventInfo],
+    ) -> None:
+        """Creates the appropriate contexts and persists events. The events
+        should not depend on one another, e.g. this should be used to persist
+        a bunch of outliers, but not a chunk of individual events that depend
+        on each other for state calculations.
+
+        Notifies about the events where appropriate.
+        """
+
+        if not event_infos:
+            return
+
+        async def prep(ev_info: _NewEventInfo):
+            event = ev_info.event
+            with nested_logging_context(suffix=event.event_id):
+                res = await self.state_handler.compute_event_context(event)
+                res = await self._check_event_auth(
+                    origin,
+                    event,
+                    res,
+                    claimed_auth_event_map=ev_info.claimed_auth_event_map,
+                )
+            return res
+
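+        # Compute the context and run the auth checks for each event, with the
+        # events processed in parallel (one background task per event).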
+        contexts = await make_deferred_yieldable(
+            defer.gatherResults(
+                [run_in_background(prep, ev_info) for ev_info in event_infos],
+                consumeErrors=True,
+            )
+        )
+
+        await self.persist_events_and_notify(
+            room_id,
+            [
+                (ev_info.event, context)
+                for ev_info, context in zip(event_infos, contexts)
+            ],
+        )
+
+    async def _auth_and_persist_event(
+        self,
+        origin: str,
+        event: EventBase,
+        context: EventContext,
+        state: Optional[Iterable[EventBase]] = None,
+        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
+        backfilled: bool = False,
+    ) -> None:
+        """
+        Process an event by performing auth checks and then persisting to the database.
+
+        Args:
+            origin: The host the event originates from.
+            event: The event itself.
+            context:
+                The event context.
+
+            state:
+                The state events used to check the event for soft-fail. If this is
+                not provided the current state events will be used.
+
+            claimed_auth_event_map:
+                A map of (type, state_key) => event for the event's claimed auth_events.
+                Possibly incomplete, and possibly including events that are not yet
+                persisted, or authed, or in the right room.
+
+                Only populated where we may not already have persisted these events -
+                for example, when populating outliers.
+
+            backfilled: True if the event was backfilled.
+        """
+        context = await self._check_event_auth(
+            origin,
+            event,
+            context,
+            state=state,
+            claimed_auth_event_map=claimed_auth_event_map,
+            backfilled=backfilled,
+        )
+
+        await self._run_push_actions_and_persist_event(event, context, backfilled)
+
+    async def _check_event_auth(
+        self,
+        origin: str,
+        event: EventBase,
+        context: EventContext,
+        state: Optional[Iterable[EventBase]] = None,
+        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
+        backfilled: bool = False,
+    ) -> EventContext:
+        """
+        Checks whether an event should be rejected (for failing auth checks).
+
+        Args:
+            origin: The host the event originates from.
+            event: The event itself.
+            context:
+                The event context.
+
+            state:
+                The state events used to check the event for soft-fail. If this is
+                not provided the current state events will be used.
+
+            claimed_auth_event_map:
+                A map of (type, state_key) => event for the event's claimed auth_events.
+                Possibly incomplete, and possibly including events that are not yet
+                persisted, or authed, or in the right room.
+
+                Only populated where we may not already have persisted these events -
+                for example, when populating outliers, or the state for a backwards
+                extremity.
+
+            backfilled: True if the event was backfilled.
+
+        Returns:
+            The updated context object.
+        """
+        room_version = await self.store.get_room_version_id(event.room_id)
+        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+        if claimed_auth_event_map:
+            # if we have a copy of the auth events from the event, use that as the
+            # basis for auth.
+            auth_events = claimed_auth_event_map
+        else:
+            # otherwise, we calculate what the auth events *should* be, and use that
+            prev_state_ids = await context.get_prev_state_ids()
+            auth_events_ids = self._event_auth_handler.compute_auth_events(
+                event, prev_state_ids, for_verification=True
+            )
+            auth_events_x = await self.store.get_events(auth_events_ids)
+            auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
+
+        try:
+            (
+                context,
+                auth_events_for_auth,
+            ) = await self._update_auth_events_and_context_for_auth(
+                origin, event, context, auth_events
+            )
+        except Exception:
+            # We don't really mind if the above fails, so let's not fail
+            # processing if it does. However, it really shouldn't fail so
+            # let's still log as an exception since we'll still want to fix
+            # any bugs.
+            logger.exception(
+                "Failed to double check auth events for %s with remote. "
+                "Ignoring failure and continuing processing of event.",
+                event.event_id,
+            )
+            auth_events_for_auth = auth_events
+
+        try:
+            event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
+        except AuthError as e:
+            logger.warning("Failed auth resolution for %r because %s", event, e)
+            context.rejected = RejectedReason.AUTH_ERROR
+
+        if not context.rejected:
+            await self._check_for_soft_fail(event, state, backfilled, origin=origin)
+
+        if event.type == EventTypes.GuestAccess and not context.rejected:
+            await self.maybe_kick_guest_users(event)
+
+        # If we are going to send this event over federation we precalculate
+        # the joined hosts.
+        if event.internal_metadata.get_send_on_behalf_of():
+            await self.event_creation_handler.cache_joined_hosts_for_event(
+                event, context
+            )
+
+        return context
+
+    async def _check_for_soft_fail(
+        self,
+        event: EventBase,
+        state: Optional[Iterable[EventBase]],
+        backfilled: bool,
+        origin: str,
+    ) -> None:
+        """Checks if we should soft fail the event; if so, marks the event as
+        such.
+
+        Args:
+            event
+            state: The state at the event if we don't have all the event's prev events
+            backfilled: Whether the event is from backfill
+            origin: The host the event originates from.
+        """
+        # For new (non-backfilled and non-outlier) events we check if the event
+        # passes auth based on the current state. If it doesn't then we
+        # "soft-fail" the event.
+        if backfilled or event.internal_metadata.is_outlier():
+            return
+
+        extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id)
+        extrem_ids = set(extrem_ids_list)
+        prev_event_ids = set(event.prev_event_ids())
+
+        if extrem_ids == prev_event_ids:
+            # If they're the same then the current state is the same as the
+            # state at the event, so no point rechecking auth for soft fail.
+            return
+
+        room_version = await self.store.get_room_version_id(event.room_id)
+        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+        # Calculate the "current state".
+        if state is not None:
+            # If we're explicitly given the state then we won't have all the
+            # prev events, and so we have a gap in the graph. In this case
+            # we want to be a little careful as we might have been down for
+            # a while and have an incorrect view of the current state,
+            # however we still want to do checks as gaps are easy to
+            # maliciously manufacture.
+            #
+            # So we use a "current state" that is actually a state
+            # resolution across the current forward extremities and the
+            # given state at the event. This should correctly handle cases
+            # like bans, especially with state res v2.
+
+            state_sets_d = await self.state_store.get_state_groups(
+                event.room_id, extrem_ids
+            )
+            state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
+            state_sets.append(state)
+            current_states = await self.state_handler.resolve_events(
+                room_version, state_sets, event
+            )
+            current_state_ids: StateMap[str] = {
+                k: e.event_id for k, e in current_states.items()
+            }
+        else:
+            current_state_ids = await self.state_handler.get_current_state_ids(
+                event.room_id, latest_event_ids=extrem_ids
+            )
+
+        logger.debug(
+            "Doing soft-fail check for %s: state %s",
+            event.event_id,
+            current_state_ids,
+        )
+
+        # Now check if the event passes auth against said current state
+        auth_types = auth_types_for_event(room_version_obj, event)
+        current_state_ids_list = [
+            e for k, e in current_state_ids.items() if k in auth_types
+        ]
+
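+        # Fetch the subset of the current state which is relevant to auth,
+        # keyed by (type, state_key).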
+        auth_events_map = await self.store.get_events(current_state_ids_list)
+        current_auth_events = {
+            (e.type, e.state_key): e for e in auth_events_map.values()
+        }
+
+        try:
+            event_auth.check(room_version_obj, event, auth_events=current_auth_events)
+        except AuthError as e:
+            logger.warning(
+                "Soft-failing %r (from %s) because %s",
+                event,
+                origin,
+                e,
+                extra={
+                    "room_id": event.room_id,
+                    "mxid": event.sender,
+                    "hs": origin,
+                },
+            )
+            soft_failed_event_counter.inc()
+            event.internal_metadata.soft_failed = True
+
+    async def _update_auth_events_and_context_for_auth(
+        self,
+        origin: str,
+        event: EventBase,
+        context: EventContext,
+        input_auth_events: StateMap[EventBase],
+    ) -> Tuple[EventContext, StateMap[EventBase]]:
+        """Helper for _check_event_auth. See there for docs.
+
+        Checks whether a given event has the expected auth events. If it
+        doesn't then we talk to the remote server to compare state to see if
+        we can come to a consensus (e.g. if one server missed some valid
+        state).
+
+        This attempts to resolve any potential divergence of state between
+        servers, but is not essential and so failures should not block further
+        processing of the event.
+
+        Args:
+            origin:
+            event:
+            context:
+
+            input_auth_events:
+                Map from (event_type, state_key) to event
+
+                Normally, our calculated auth_events based on the state of the room
+                at the event's position in the DAG, though occasionally (eg if the
+                event is an outlier), may be the auth events claimed by the remote
+                server.
+
+        Returns:
+            updated context, updated auth event map
+        """
+        # take a copy of input_auth_events before we modify it.
+        auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
+
+        event_auth_events = set(event.auth_event_ids())
+
+        # missing_auth is the set of the event's auth_events which we don't yet have
+        # in auth_events.
+        missing_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
+
+        # if we have missing events, we need to fetch those events from somewhere.
+        #
+        # we start by checking if they are in the store, and then try calling /event_auth/.
+        if missing_auth:
+            have_events = await self.store.have_seen_events(event.room_id, missing_auth)
+            logger.debug("Events %s are in the store", have_events)
+            missing_auth.difference_update(have_events)
+
+        if missing_auth:
+            # If we don't have all the auth events, we need to get them.
+            logger.info("auth_events contains unknown events: %s", missing_auth)
+            try:
+                try:
+                    remote_auth_chain = await self.federation_client.get_event_auth(
+                        origin, event.room_id, event.event_id
+                    )
+                except RequestSendFailed as e1:
+                    # The other side isn't around or doesn't implement the
+                    # endpoint, so lets just bail out.
+                    logger.info("Failed to get event auth from remote: %s", e1)
+                    return context, auth_events
+
+                seen_remotes = await self.store.have_seen_events(
+                    event.room_id, [e.event_id for e in remote_auth_chain]
+                )
+
+                for e in remote_auth_chain:
+                    if e.event_id in seen_remotes:
+                        continue
+
+                    if e.event_id == event.event_id:
+                        continue
+
+                    try:
+                        auth_ids = e.auth_event_ids()
+                        auth = {
+                            (e.type, e.state_key): e
+                            for e in remote_auth_chain
+                            if e.event_id in auth_ids or e.type == EventTypes.Create
+                        }
+                        e.internal_metadata.outlier = True
+
+                        logger.debug(
+                            "_check_event_auth %s missing_auth: %s",
+                            event.event_id,
+                            e.event_id,
+                        )
+                        missing_auth_event_context = (
+                            await self.state_handler.compute_event_context(e)
+                        )
+                        await self._auth_and_persist_event(
+                            origin,
+                            e,
+                            missing_auth_event_context,
+                            claimed_auth_event_map=auth,
+                        )
+
+                        if e.event_id in event_auth_events:
+                            auth_events[(e.type, e.state_key)] = e
+                    except AuthError:
+                        pass
+
+            except Exception:
+                logger.exception("Failed to get auth chain")
+
+        if event.internal_metadata.is_outlier():
+            # XXX: given that, for an outlier, we'll be working with the
+            # event's *claimed* auth events rather than those we calculated:
+            # (a) is there any point in this test, since different_auth below will
+            # obviously be empty
+            # (b) alternatively, why don't we do it earlier?
+            logger.info("Skipping auth_event fetch for outlier")
+            return context, auth_events
+
+        different_auth = event_auth_events.difference(
+            e.event_id for e in auth_events.values()
+        )
+
+        if not different_auth:
+            return context, auth_events
+
+        logger.info(
+            "auth_events refers to events which are not in our calculated auth "
+            "chain: %s",
+            different_auth,
+        )
+
+        # XXX: currently this checks for redactions but I'm not convinced that is
+        # necessary?
+        different_events = await self.store.get_events_as_list(different_auth)
+
+        for d in different_events:
+            if d.room_id != event.room_id:
+                logger.warning(
+                    "Event %s refers to auth_event %s which is in a different room",
+                    event.event_id,
+                    d.event_id,
+                )
+
+                # don't attempt to resolve the claimed auth events against our own
+                # in this case: just use our own auth events.
+                #
+                # XXX: should we reject the event in this case? It feels like we should,
+                # but then shouldn't we also do so if we've failed to fetch any of the
+                # auth events?
+                return context, auth_events
+
+        # now we state-resolve between our own idea of the auth events, and the remote's
+        # idea of them.
+
+        local_state = auth_events.values()
+        remote_auth_events = dict(auth_events)
+        remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
+        remote_state = remote_auth_events.values()
+
+        room_version = await self.store.get_room_version_id(event.room_id)
+        new_state = await self.state_handler.resolve_events(
+            room_version, (local_state, remote_state), event
+        )
+
+        logger.info(
+            "After state res: updating auth_events with new state %s",
+            {
+                (d.type, d.state_key): d.event_id
+                for d in new_state.values()
+                if auth_events.get((d.type, d.state_key)) != d
+            },
+        )
+
+        auth_events.update(new_state)
+
+        context = await self._update_context_for_auth_events(
+            event, context, auth_events
+        )
+
+        return context, auth_events
+
+    async def _update_context_for_auth_events(
+        self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
+    ) -> EventContext:
+        """Update the state_ids in an event context after auth event resolution,
+        storing the changes as a new state group.
+
+        Args:
+            event: The event we're handling the context for
+
+            context: initial event context
+
+            auth_events: Events to update in the event context.
+
+        Returns:
+            new event context
+        """
+        # exclude the state key of the new event from the current_state in the context.
+        if event.is_state():
+            event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
+        else:
+            event_key = None
+        state_updates = {
+            k: a.event_id for k, a in auth_events.items() if k != event_key
+        }
+
+        current_state_ids = await context.get_current_state_ids()
+        current_state_ids = dict(current_state_ids)  # type: ignore
+
+        current_state_ids.update(state_updates)
+
+        prev_state_ids = await context.get_prev_state_ids()
+        prev_state_ids = dict(prev_state_ids)
+
+        prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
+
+        # create a new state group as a delta from the existing one.
+        prev_group = context.state_group
+        state_group = await self.state_store.store_state_group(
+            event.event_id,
+            event.room_id,
+            prev_group=prev_group,
+            delta_ids=state_updates,
+            current_state_ids=current_state_ids,
+        )
+
+        return EventContext.with_state(
+            state_group=state_group,
+            state_group_before_event=context.state_group_before_event,
+            current_state_ids=current_state_ids,
+            prev_state_ids=prev_state_ids,
+            prev_group=prev_group,
+            delta_ids=state_updates,
+        )
+
+    async def _run_push_actions_and_persist_event(
+        self, event: EventBase, context: EventContext, backfilled: bool = False
+    ) -> None:
+        """Run the push actions for a received event, and persist it.
+
+        Args:
+            event: The event itself.
+            context: The event context.
+            backfilled: True if the event was backfilled.
+        """
+        try:
+            if (
+                not event.internal_metadata.is_outlier()
+                and not backfilled
+                and not context.rejected
+                and (await self.store.get_min_depth(event.room_id)) <= event.depth
+            ):
+                await self.action_generator.handle_push_actions_for_event(
+                    event, context
+                )
+
+            await self.persist_events_and_notify(
+                event.room_id, [(event, context)], backfilled=backfilled
+            )
+        except Exception:
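+            # Persisting failed: remove any push actions we staged for the
+            # event (in the background) before re-raising.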
+            run_in_background(
+                self.store.remove_push_actions_from_staging, event.event_id
+            )
+            raise
+
+    async def persist_events_and_notify(
+        self,
+        room_id: str,
+        event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
+        backfilled: bool = False,
+    ) -> int:
+        """Persists events and tells the notifier/pushers about them, if
+        necessary.
+
+        Args:
+            room_id: The room ID of events being persisted.
+            event_and_contexts: Sequence of events with their associated
+                context that should be persisted. All events must belong to
+                the same room.
+            backfilled: Whether these events are a result of
+                backfilling or not
+
+        Returns:
+            The stream ID after which all events have been persisted.
+        """
+        if not event_and_contexts:
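+            # Nothing to persist: return the current position of the events
+            # stream.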
+            return self.store.get_current_events_token()
+
+        instance = self.config.worker.events_shard_config.get_instance(room_id)
+        if instance != self._instance_name:
+            # Limit the number of events sent over replication. We choose 200
+            # here as that is what we default to in `max_request_body_size(..)`
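+            # (`batch_iter` chunks an iterable into tuples of at most the given
+            # size, e.g. batch_iter("abcde", 2) yields ("a", "b"), ("c", "d"),
+            # ("e",).)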
+            for batch in batch_iter(event_and_contexts, 200):
+                result = await self._send_events(
+                    instance_name=instance,
+                    store=self.store,
+                    room_id=room_id,
+                    event_and_contexts=batch,
+                    backfilled=backfilled,
+                )
+            return result["max_stream_id"]
+        else:
+            assert self.storage.persistence
+
+            # Note that this returns the events that were persisted, which may not be
+            # the same as those passed in if some were deduplicated due to transaction IDs.
+            events, max_stream_token = await self.storage.persistence.persist_events(
+                event_and_contexts, backfilled=backfilled
+            )
+
+            if self._ephemeral_messages_enabled:
+                for event in events:
+                    # If there's an expiry timestamp on the event, schedule its expiry.
+                    self._message_handler.maybe_schedule_expiry(event)
+
+            if not backfilled:  # Never notify for backfilled events
+                for event in events:
+                    await self._notify_persisted_event(event, max_stream_token)
+
+            return max_stream_token.stream
+
+    async def _notify_persisted_event(
+        self, event: EventBase, max_stream_token: RoomStreamToken
+    ) -> None:
+        """Checks to see if notifier/pushers should be notified about the
+        event or not.
+
+        Args:
+            event:
+            max_stream_token: The max_stream_id returned by persist_events
+        """
+
+        extra_users = []
+        if event.type == EventTypes.Member:
+            target_user_id = event.state_key
+
+            # We notify for memberships if it's an invite for one of our
+            # users
+            if event.internal_metadata.is_outlier():
+                if event.membership != Membership.INVITE:
+                    if not self.is_mine_id(target_user_id):
+                        return
+
+            target_user = UserID.from_string(target_user_id)
+            extra_users.append(target_user)
+        elif event.internal_metadata.is_outlier():
+            return
+
+        # the event has been persisted so it should have a stream ordering.
+        assert event.internal_metadata.stream_ordering
+
+        event_pos = PersistedEventPosition(
+            self._instance_name, event.internal_metadata.stream_ordering
+        )
+        self.notifier.on_new_room_event(
+            event, event_pos, max_stream_token, extra_users=extra_users
+        )
+
+    def _sanity_check_event(self, ev: EventBase) -> None:
+        """
+        Do some early sanity checks of a received event
+
+        In particular, checks it doesn't have an excessive number of
+        prev_events or auth_events, which could cause a huge state resolution
+        or cascade of event fetches.
+
+        Args:
+            ev: event to be checked
+
+        Raises:
+            SynapseError if the event does not pass muster
+        """
+        if len(ev.prev_event_ids()) > 20:
+            logger.warning(
+                "Rejecting event %s which has %i prev_events",
+                ev.event_id,
+                len(ev.prev_event_ids()),
+            )
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
+
+        if len(ev.auth_event_ids()) > 10:
+            logger.warning(
+                "Rejecting event %s which has %i auth_events",
+                ev.event_id,
+                len(ev.auth_event_ids()),
+            )
+            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
+
+    async def get_min_depth_for_context(self, context: str) -> int:
+        return await self.store.get_min_depth(context)
diff --git a/synapse/handlers/initial_sync.py b/synapse/handlers/initial_sync.py
index e1c544a3c9..4e8f7f1d85 100644
--- a/synapse/handlers/initial_sync.py
+++ b/synapse/handlers/initial_sync.py
@@ -151,7 +151,7 @@ class InitialSyncHandler(BaseHandler):
             limit = 10
 
         async def handle_room(event: RoomsForUser):
-            d = {
+            d: JsonDict = {
                 "room_id": event.room_id,
                 "membership": event.membership,
                 "visibility": (
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index 7ca14e1d84..4418d63df7 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -353,6 +353,11 @@ class BasePresenceHandler(abc.ABC):
         # otherwise would not do).
         await self.set_state(UserID.from_string(user_id), state, force_notify=True)
 
+    async def is_visible(self, observed_user: UserID, observer_user: UserID) -> bool:
+        raise NotImplementedError(
+            "Attempting to check presence on a non-presence worker."
+        )
+
 
 class _NullContextManager(ContextManager[None]):
     """A context manager which does nothing."""
diff --git a/synapse/handlers/register.py b/synapse/handlers/register.py
index 8cf614136e..0ed59d757b 100644
--- a/synapse/handlers/register.py
+++ b/synapse/handlers/register.py
@@ -56,6 +56,22 @@ login_counter = Counter(
 )
 
 
+def init_counters_for_auth_provider(auth_provider_id: str) -> None:
+    """Ensure the prometheus counters for the given auth provider are initialised
+
+    This fixes a problem where the counters are not reported for a given auth provider
+    until the user first logs in/registers.
+    """
+    for is_guest in (True, False):
+        login_counter.labels(guest=is_guest, auth_provider=auth_provider_id)
+        for shadow_banned in (True, False):
+            registration_counter.labels(
+                guest=is_guest,
+                shadow_banned=shadow_banned,
+                auth_provider=auth_provider_id,
+            )
+
+
 class LoginDict(TypedDict):
     device_id: str
     access_token: str
@@ -96,6 +112,8 @@ class RegistrationHandler(BaseHandler):
         self.session_lifetime = hs.config.session_lifetime
         self.access_token_lifetime = hs.config.access_token_lifetime
 
+        init_counters_for_auth_provider("")
+
     async def check_username(
         self,
         localpart: str,
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index ba13196218..401b84aad1 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -36,6 +36,7 @@ from synapse.api.ratelimiting import Ratelimiter
 from synapse.event_auth import get_named_level, get_power_level_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
+from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
 from synapse.types import (
     JsonDict,
     Requester,
@@ -79,7 +80,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         self.account_data_handler = hs.get_account_data_handler()
         self.event_auth_handler = hs.get_event_auth_handler()
 
-        self.member_linearizer = Linearizer(name="member")
+        self.member_linearizer: Linearizer = Linearizer(name="member")
 
         self.clock = hs.get_clock()
         self.spam_checker = hs.get_spam_checker()
@@ -556,6 +557,20 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             content.pop("displayname", None)
             content.pop("avatar_url", None)
 
+        if len(content.get("displayname") or "") > MAX_DISPLAYNAME_LEN:
+            raise SynapseError(
+                400,
+                f"Displayname is too long (max {MAX_DISPLAYNAME_LEN})",
+                errcode=Codes.BAD_JSON,
+            )
+
+        if len(content.get("avatar_url") or "") > MAX_AVATAR_URL_LEN:
+            raise SynapseError(
+                400,
+                f"Avatar URL is too long (max {MAX_AVATAR_URL_LEN})",
+                errcode=Codes.BAD_JSON,
+            )
+
         effective_membership_state = action
         if action in ["kick", "unban"]:
             effective_membership_state = "leave"
diff --git a/synapse/handlers/room_summary.py b/synapse/handlers/room_summary.py
index ac6cfc0da9..906985c754 100644
--- a/synapse/handlers/room_summary.py
+++ b/synapse/handlers/room_summary.py
@@ -28,12 +28,11 @@ from synapse.api.constants import (
     Membership,
     RoomTypes,
 )
-from synapse.api.errors import AuthError, Codes, NotFoundError, SynapseError
+from synapse.api.errors import AuthError, Codes, NotFoundError, StoreError, SynapseError
 from synapse.events import EventBase
 from synapse.events.utils import format_event_for_client_v2
 from synapse.types import JsonDict
 from synapse.util.caches.response_cache import ResponseCache
-from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
     from synapse.server import HomeServer
@@ -76,6 +75,9 @@ class _PaginationSession:
 
 
 class RoomSummaryHandler:
+    # A unique key used for pagination sessions for the room hierarchy endpoint.
+    _PAGINATION_SESSION_TYPE = "room_hierarchy_pagination"
+
     # The time a pagination session remains valid for.
     _PAGINATION_SESSION_VALIDITY_PERIOD_MS = 5 * 60 * 1000
 
@@ -87,12 +89,6 @@ class RoomSummaryHandler:
         self._server_name = hs.hostname
         self._federation_client = hs.get_federation_client()
 
-        # A map of query information to the current pagination state.
-        #
-        # TODO Allow for multiple workers to share this data.
-        # TODO Expire pagination tokens.
-        self._pagination_sessions: Dict[_PaginationKey, _PaginationSession] = {}
-
         # If a user tries to fetch the same page multiple times in quick succession,
         # only process the first attempt and return its result to subsequent requests.
         self._pagination_response_cache: ResponseCache[
@@ -102,21 +98,6 @@ class RoomSummaryHandler:
             "get_room_hierarchy",
         )
 
-    def _expire_pagination_sessions(self):
-        """Expire pagination session which are old."""
-        expire_before = (
-            self._clock.time_msec() - self._PAGINATION_SESSION_VALIDITY_PERIOD_MS
-        )
-        to_expire = []
-
-        for key, value in self._pagination_sessions.items():
-            if value.creation_time_ms < expire_before:
-                to_expire.append(key)
-
-        for key in to_expire:
-            logger.debug("Expiring pagination session id %s", key)
-            del self._pagination_sessions[key]
-
     async def get_space_summary(
         self,
         requester: str,
@@ -327,18 +308,29 @@ class RoomSummaryHandler:
 
         # If this is continuing a previous session, pull the persisted data.
         if from_token:
-            self._expire_pagination_sessions()
+            try:
+                pagination_session = await self._store.get_session(
+                    session_type=self._PAGINATION_SESSION_TYPE,
+                    session_id=from_token,
+                )
+            except StoreError:
+                raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
 
-            pagination_key = _PaginationKey(
-                requested_room_id, suggested_only, max_depth, from_token
-            )
-            if pagination_key not in self._pagination_sessions:
+            # If the requester, room ID, suggested-only, or max depth were modified,
+            # the session is invalid.
+            if (
+                requester != pagination_session["requester"]
+                or requested_room_id != pagination_session["room_id"]
+                or suggested_only != pagination_session["suggested_only"]
+                or max_depth != pagination_session["max_depth"]
+            ):
                 raise SynapseError(400, "Unknown pagination token", Codes.INVALID_PARAM)
 
             # Load the previous state.
-            pagination_session = self._pagination_sessions[pagination_key]
-            room_queue = pagination_session.room_queue
-            processed_rooms = pagination_session.processed_rooms
+            room_queue = [
+                _RoomQueueEntry(*fields) for fields in pagination_session["room_queue"]
+            ]
+            processed_rooms = set(pagination_session["processed_rooms"])
         else:
             # The queue of rooms to process, the next room is last on the stack.
             room_queue = [_RoomQueueEntry(requested_room_id, ())]
@@ -456,13 +448,21 @@ class RoomSummaryHandler:
 
         # If there's additional data, generate a pagination token (and persist state).
         if room_queue:
-            next_batch = random_string(24)
-            result["next_batch"] = next_batch
-            pagination_key = _PaginationKey(
-                requested_room_id, suggested_only, max_depth, next_batch
-            )
-            self._pagination_sessions[pagination_key] = _PaginationSession(
-                self._clock.time_msec(), room_queue, processed_rooms
+            result["next_batch"] = await self._store.create_session(
+                session_type=self._PAGINATION_SESSION_TYPE,
+                value={
+                    # Information which must be identical across pagination.
+                    "requester": requester,
+                    "room_id": requested_room_id,
+                    "suggested_only": suggested_only,
+                    "max_depth": max_depth,
+                    # The stored state.
+                    "room_queue": [
+                        attr.astuple(room_entry) for room_entry in room_queue
+                    ],
+                    "processed_rooms": list(processed_rooms),
+                },
+                expiry_ms=self._PAGINATION_SESSION_VALIDITY_PERIOD_MS,
             )
 
         return result
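With this change the pagination state for the room hierarchy endpoint lives in the database rather than in a per-process dict, so the removed TODOs fall away: any worker can load the session, and expiry_ms lets the store discard stale ones. A minimal sketch of the round-trip, with a toy in-memory store standing in for the real sessions table (its interface mirrors the create_session/get_session calls above; the real implementation persists to the database and is shared across workers):

    import secrets
    import time

    from synapse.api.errors import StoreError

    class ToySessionStore:
        """Illustrative stand-in for the sessions table."""

        def __init__(self) -> None:
            self._sessions: dict = {}

        async def create_session(
            self, session_type: str, value: dict, expiry_ms: int
        ) -> str:
            session_id = secrets.token_urlsafe(18)
            expires_at = time.monotonic() + expiry_ms / 1000
            self._sessions[(session_type, session_id)] = (value, expires_at)
            return session_id

        async def get_session(self, session_type: str, session_id: str) -> dict:
            try:
                value, expires_at = self._sessions[(session_type, session_id)]
            except KeyError:
                raise StoreError(404, "Unknown session")
            if time.monotonic() > expires_at:
                raise StoreError(404, "Session expired")
            return value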
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 1b855a685c..0e6ebb574e 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -37,6 +37,7 @@ from twisted.web.server import Request
 from synapse.api.constants import LoginType
 from synapse.api.errors import Codes, NotFoundError, RedirectException, SynapseError
 from synapse.config.sso import SsoAttributeRequirement
+from synapse.handlers.register import init_counters_for_auth_provider
 from synapse.handlers.ui_auth import UIAuthSessionDataConstants
 from synapse.http import get_request_user_agent
 from synapse.http.server import respond_with_html, respond_with_redirect
@@ -213,6 +214,7 @@ class SsoHandler:
         p_id = p.idp_id
         assert p_id not in self._identity_providers
         self._identity_providers[p_id] = p
+        init_counters_for_auth_provider(p_id)
 
     def get_identity_providers(self) -> Mapping[str, SsoIdentityProvider]:
         """Get the configured identity providers"""
diff --git a/synapse/handlers/sync.py b/synapse/handlers/sync.py
index 590642f510..86c3c7f0df 100644
--- a/synapse/handlers/sync.py
+++ b/synapse/handlers/sync.py
@@ -1,5 +1,4 @@
-# Copyright 2015, 2016 OpenMarket Ltd
-# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2015-2021 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -31,6 +30,8 @@ from prometheus_client import Counter
 
 from synapse.api.constants import AccountDataTypes, EventTypes, Membership
 from synapse.api.filtering import FilterCollection
+from synapse.api.presence import UserPresenceState
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
 from synapse.events import EventBase
 from synapse.logging.context import current_context
 from synapse.logging.opentracing import SynapseTags, log_kv, set_tag, start_active_span
@@ -86,20 +87,20 @@ LAZY_LOADED_MEMBERS_CACHE_MAX_SIZE = 100
 SyncRequestKey = Tuple[Any, ...]
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class SyncConfig:
-    user = attr.ib(type=UserID)
-    filter_collection = attr.ib(type=FilterCollection)
-    is_guest = attr.ib(type=bool)
-    request_key = attr.ib(type=SyncRequestKey)
-    device_id = attr.ib(type=Optional[str])
+    user: UserID
+    filter_collection: FilterCollection
+    is_guest: bool
+    request_key: SyncRequestKey
+    device_id: Optional[str]
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class TimelineBatch:
-    prev_batch = attr.ib(type=StreamToken)
-    events = attr.ib(type=List[EventBase])
-    limited = attr.ib(type=bool)
+    prev_batch: StreamToken
+    events: List[EventBase]
+    limited: bool
 
     def __bool__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -113,16 +114,16 @@ class TimelineBatch:
 # if there are updates for it, which we check after the instance has been created.
 # This should not be a big deal because we update the notification counts afterwards as
 # well anyway.
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
 class JoinedSyncResult:
-    room_id = attr.ib(type=str)
-    timeline = attr.ib(type=TimelineBatch)
-    state = attr.ib(type=StateMap[EventBase])
-    ephemeral = attr.ib(type=List[JsonDict])
-    account_data = attr.ib(type=List[JsonDict])
-    unread_notifications = attr.ib(type=JsonDict)
-    summary = attr.ib(type=Optional[JsonDict])
-    unread_count = attr.ib(type=int)
+    room_id: str
+    timeline: TimelineBatch
+    state: StateMap[EventBase]
+    ephemeral: List[JsonDict]
+    account_data: List[JsonDict]
+    unread_notifications: JsonDict
+    summary: Optional[JsonDict]
+    unread_count: int
 
     def __bool__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -138,12 +139,12 @@ class JoinedSyncResult:
         )
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class ArchivedSyncResult:
-    room_id = attr.ib(type=str)
-    timeline = attr.ib(type=TimelineBatch)
-    state = attr.ib(type=StateMap[EventBase])
-    account_data = attr.ib(type=List[JsonDict])
+    room_id: str
+    timeline: TimelineBatch
+    state: StateMap[EventBase]
+    account_data: List[JsonDict]
 
     def __bool__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -152,37 +153,37 @@ class ArchivedSyncResult:
         return bool(self.timeline or self.state or self.account_data)
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class InvitedSyncResult:
-    room_id = attr.ib(type=str)
-    invite = attr.ib(type=EventBase)
+    room_id: str
+    invite: EventBase
 
     def __bool__(self) -> bool:
         """Invited rooms should always be reported to the client"""
         return True
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class KnockedSyncResult:
-    room_id = attr.ib(type=str)
-    knock = attr.ib(type=EventBase)
+    room_id: str
+    knock: EventBase
 
     def __bool__(self) -> bool:
         """Knocked rooms should always be reported to the client"""
         return True
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class GroupsSyncResult:
-    join = attr.ib(type=JsonDict)
-    invite = attr.ib(type=JsonDict)
-    leave = attr.ib(type=JsonDict)
+    join: JsonDict
+    invite: JsonDict
+    leave: JsonDict
 
     def __bool__(self) -> bool:
         return bool(self.join or self.invite or self.leave)
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class DeviceLists:
     """
     Attributes:
@@ -190,27 +191,27 @@ class DeviceLists:
         left: List of user_ids whose devices we no longer track
     """
 
-    changed = attr.ib(type=Collection[str])
-    left = attr.ib(type=Collection[str])
+    changed: Collection[str]
+    left: Collection[str]
 
     def __bool__(self) -> bool:
         return bool(self.changed or self.left)
 
 
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
 class _RoomChanges:
     """The set of room entries to include in the sync, plus the set of joined
     and left room IDs since last sync.
     """
 
-    room_entries = attr.ib(type=List["RoomSyncResultBuilder"])
-    invited = attr.ib(type=List[InvitedSyncResult])
-    knocked = attr.ib(type=List[KnockedSyncResult])
-    newly_joined_rooms = attr.ib(type=List[str])
-    newly_left_rooms = attr.ib(type=List[str])
+    room_entries: List["RoomSyncResultBuilder"]
+    invited: List[InvitedSyncResult]
+    knocked: List[KnockedSyncResult]
+    newly_joined_rooms: List[str]
+    newly_left_rooms: List[str]
 
 
-@attr.s(slots=True, frozen=True)
+@attr.s(slots=True, frozen=True, auto_attribs=True)
 class SyncResult:
     """
     Attributes:
@@ -230,18 +231,18 @@ class SyncResult:
         groups: Group updates, if any
     """
 
-    next_batch = attr.ib(type=StreamToken)
-    presence = attr.ib(type=List[JsonDict])
-    account_data = attr.ib(type=List[JsonDict])
-    joined = attr.ib(type=List[JoinedSyncResult])
-    invited = attr.ib(type=List[InvitedSyncResult])
-    knocked = attr.ib(type=List[KnockedSyncResult])
-    archived = attr.ib(type=List[ArchivedSyncResult])
-    to_device = attr.ib(type=List[JsonDict])
-    device_lists = attr.ib(type=DeviceLists)
-    device_one_time_keys_count = attr.ib(type=JsonDict)
-    device_unused_fallback_key_types = attr.ib(type=List[str])
-    groups = attr.ib(type=Optional[GroupsSyncResult])
+    next_batch: StreamToken
+    presence: List[UserPresenceState]
+    account_data: List[JsonDict]
+    joined: List[JoinedSyncResult]
+    invited: List[InvitedSyncResult]
+    knocked: List[KnockedSyncResult]
+    archived: List[ArchivedSyncResult]
+    to_device: List[JsonDict]
+    device_lists: DeviceLists
+    device_one_time_keys_count: JsonDict
+    device_unused_fallback_key_types: List[str]
+    groups: Optional[GroupsSyncResult]
 
     def __bool__(self) -> bool:
         """Make the result appear empty if there are no updates. This is used
@@ -701,7 +702,7 @@ class SyncHandler:
         name_id = state_ids.get((EventTypes.Name, ""))
         canonical_alias_id = state_ids.get((EventTypes.CanonicalAlias, ""))
 
-        summary = {}
+        summary: JsonDict = {}
         empty_ms = MemberSummary([], 0)
 
         # TODO: only send these when they change.
@@ -1843,6 +1844,9 @@ class SyncHandler:
         knocked = []
 
         for event in room_list:
+            if event.room_version_id not in KNOWN_ROOM_VERSIONS:
+                continue
+
             if event.membership == Membership.JOIN:
                 room_entries.append(
                     RoomSyncResultBuilder(
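The guard added above makes sync skip membership rows for rooms whose version the server does not implement, so a response never references a room the server cannot interpret. The same filter as a standalone function, with names taken from the hunk:

    from synapse.api.room_versions import KNOWN_ROOM_VERSIONS

    def drop_unknown_room_versions(room_list):
        # Each entry carries the room's version alongside the membership,
        # as in the loop above.
        return [e for e in room_list if e.room_version_id in KNOWN_ROOM_VERSIONS]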
@@ -2076,21 +2080,23 @@ class SyncHandler:
         # If the membership's stream ordering is after the given stream
         # ordering, we need to go and work out if the user was in the room
         # before.
-        for room_id, event_pos in joined_rooms:
-            if not event_pos.persisted_after(room_key):
-                joined_room_ids.add(room_id)
+        for joined_room in joined_rooms:
+            if not joined_room.event_pos.persisted_after(room_key):
+                joined_room_ids.add(joined_room.room_id)
                 continue
 
-            logger.info("User joined room after current token: %s", room_id)
+            logger.info("User joined room after current token: %s", joined_room.room_id)
 
             extrems = (
                 await self.store.get_forward_extremities_for_room_at_stream_ordering(
-                    room_id, event_pos.stream
+                    joined_room.room_id, joined_room.event_pos.stream
                 )
             )
-            users_in_room = await self.state.get_current_users_in_room(room_id, extrems)
+            users_in_room = await self.state.get_current_users_in_room(
+                joined_room.room_id, extrems
+            )
             if user_id in users_in_room:
-                joined_room_ids.add(room_id)
+                joined_room_ids.add(joined_room.room_id)
 
         return frozenset(joined_room_ids)
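The loop now reads joined_room.room_id and joined_room.event_pos instead of unpacking a tuple, which implies the storage method yields a structured entry. A sketch of such a container under assumed names (the real class lives in the storage layer):

    import attr

    @attr.s(slots=True, frozen=True, auto_attribs=True)
    class JoinedRoomEntry:
        room_id: str
        # Exposes .stream and .persisted_after(), as used above.
        event_pos: "PersistedEventPosition"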
 
@@ -2160,7 +2166,7 @@ def _calculate_state(
     return {event_id_to_key[e]: e for e in state_ids}
 
 
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
 class SyncResultBuilder:
     """Used to help build up a new SyncResult for a user
 
@@ -2172,33 +2178,33 @@ class SyncResultBuilder:
         joined_room_ids: List of rooms the user is joined to
 
         # The following mirror the fields in a sync response
-        presence (list)
-        account_data (list)
-        joined (list[JoinedSyncResult])
-        invited (list[InvitedSyncResult])
-        knocked (list[KnockedSyncResult])
-        archived (list[ArchivedSyncResult])
-        groups (GroupsSyncResult|None)
-        to_device (list)
+        presence
+        account_data
+        joined
+        invited
+        knocked
+        archived
+        groups
+        to_device
     """
 
-    sync_config = attr.ib(type=SyncConfig)
-    full_state = attr.ib(type=bool)
-    since_token = attr.ib(type=Optional[StreamToken])
-    now_token = attr.ib(type=StreamToken)
-    joined_room_ids = attr.ib(type=FrozenSet[str])
+    sync_config: SyncConfig
+    full_state: bool
+    since_token: Optional[StreamToken]
+    now_token: StreamToken
+    joined_room_ids: FrozenSet[str]
 
-    presence = attr.ib(type=List[JsonDict], default=attr.Factory(list))
-    account_data = attr.ib(type=List[JsonDict], default=attr.Factory(list))
-    joined = attr.ib(type=List[JoinedSyncResult], default=attr.Factory(list))
-    invited = attr.ib(type=List[InvitedSyncResult], default=attr.Factory(list))
-    knocked = attr.ib(type=List[KnockedSyncResult], default=attr.Factory(list))
-    archived = attr.ib(type=List[ArchivedSyncResult], default=attr.Factory(list))
-    groups = attr.ib(type=Optional[GroupsSyncResult], default=None)
-    to_device = attr.ib(type=List[JsonDict], default=attr.Factory(list))
+    presence: List[UserPresenceState] = attr.Factory(list)
+    account_data: List[JsonDict] = attr.Factory(list)
+    joined: List[JoinedSyncResult] = attr.Factory(list)
+    invited: List[InvitedSyncResult] = attr.Factory(list)
+    knocked: List[KnockedSyncResult] = attr.Factory(list)
+    archived: List[ArchivedSyncResult] = attr.Factory(list)
+    groups: Optional[GroupsSyncResult] = None
+    to_device: List[JsonDict] = attr.Factory(list)
 
 
-@attr.s(slots=True)
+@attr.s(slots=True, auto_attribs=True)
 class RoomSyncResultBuilder:
     """Stores information needed to create either a `JoinedSyncResult` or
     `ArchivedSyncResult`.
@@ -2214,10 +2220,10 @@ class RoomSyncResultBuilder:
         upto_token: Latest point to return events from.
     """
 
-    room_id = attr.ib(type=str)
-    rtype = attr.ib(type=str)
-    events = attr.ib(type=Optional[List[EventBase]])
-    newly_joined = attr.ib(type=bool)
-    full_state = attr.ib(type=bool)
-    since_token = attr.ib(type=Optional[StreamToken])
-    upto_token = attr.ib(type=StreamToken)
+    room_id: str
+    rtype: str
+    events: Optional[List[EventBase]]
+    newly_joined: bool
+    full_state: bool
+    since_token: Optional[StreamToken]
+    upto_token: StreamToken
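All of the attrs classes in sync.py move from explicit attr.ib(type=...) fields to auto_attribs, where the annotation itself declares the field. The two spellings are equivalent; note that mutable defaults still need attr.Factory so each instance gets its own list:

    import attr
    from typing import List

    @attr.s(slots=True)
    class Old:
        items = attr.ib(type=List[str], default=attr.Factory(list))

    @attr.s(slots=True, auto_attribs=True)
    class New:
        items: List[str] = attr.Factory(list)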
diff --git a/synapse/handlers/ui_auth/__init__.py b/synapse/handlers/ui_auth/__init__.py
index 4c3b669fae..13b0c61d2e 100644
--- a/synapse/handlers/ui_auth/__init__.py
+++ b/synapse/handlers/ui_auth/__init__.py
@@ -34,3 +34,8 @@ class UIAuthSessionDataConstants:
     # used by validate_user_via_ui_auth to store the mxid of the user we are validating
     # for.
     REQUEST_USER_ID = "request_user_id"
+
+    # used during registration to store the registration token that was used
+    # (if one was required) so that:
+    # - we can prevent a token from being used twice by the same session
+    # - we can 'use up' the token once registration has successfully completed
+    REGISTRATION_TOKEN = "org.matrix.msc3231.login.registration_token"
diff --git a/synapse/handlers/ui_auth/checkers.py b/synapse/handlers/ui_auth/checkers.py
index 5414ce77d8..d3828dec6b 100644
--- a/synapse/handlers/ui_auth/checkers.py
+++ b/synapse/handlers/ui_auth/checkers.py
@@ -49,7 +49,7 @@ class UserInteractiveAuthChecker:
             clientip: The IP address of the client.
 
         Raises:
-            SynapseError if authentication failed
+            LoginError if authentication failed.
 
         Returns:
             The result of authentication (to pass back to the client?)
@@ -131,7 +131,9 @@ class RecaptchaAuthChecker(UserInteractiveAuthChecker):
             )
             if resp_body["success"]:
                 return True
-        raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+        raise LoginError(
+            401, "Captcha authentication failed", errcode=Codes.UNAUTHORIZED
+        )
 
 
 class _BaseThreepidAuthChecker:
@@ -191,7 +193,9 @@ class _BaseThreepidAuthChecker:
             raise AssertionError("Unrecognized threepid medium: %s" % (medium,))
 
         if not threepid:
-            raise LoginError(401, "", errcode=Codes.UNAUTHORIZED)
+            raise LoginError(
+                401, "Unable to get validated threepid", errcode=Codes.UNAUTHORIZED
+            )
 
         if threepid["medium"] != medium:
             raise LoginError(
@@ -237,11 +241,76 @@ class MsisdnAuthChecker(UserInteractiveAuthChecker, _BaseThreepidAuthChecker):
         return await self._check_threepid("msisdn", authdict)
 
 
+class RegistrationTokenAuthChecker(UserInteractiveAuthChecker):
+    AUTH_TYPE = LoginType.REGISTRATION_TOKEN
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+        self.hs = hs
+        self._enabled = bool(hs.config.registration_requires_token)
+        self.store = hs.get_datastore()
+
+    def is_enabled(self) -> bool:
+        return self._enabled
+
+    async def check_auth(self, authdict: dict, clientip: str) -> Any:
+        if "token" not in authdict:
+            raise LoginError(400, "Missing registration token", Codes.MISSING_PARAM)
+        if not isinstance(authdict["token"], str):
+            raise LoginError(
+                400, "Registration token must be a string", Codes.INVALID_PARAM
+            )
+        if "session" not in authdict:
+            raise LoginError(400, "Missing UIA session", Codes.MISSING_PARAM)
+
+        # Get these here to avoid cyclic dependencies
+        from synapse.handlers.ui_auth import UIAuthSessionDataConstants
+
+        auth_handler = self.hs.get_auth_handler()
+
+        session = authdict["session"]
+        token = authdict["token"]
+
+        # If the LoginType.REGISTRATION_TOKEN stage has already been completed,
+        # return early to avoid incrementing `pending` again.
+        stored_token = await auth_handler.get_session_data(
+            session, UIAuthSessionDataConstants.REGISTRATION_TOKEN
+        )
+        if stored_token:
+            if token != stored_token:
+                raise LoginError(
+                    400, "Registration token has changed", Codes.INVALID_PARAM
+                )
+            else:
+                return token
+
+        if await self.store.registration_token_is_valid(token):
+            # Increment the pending counter, so that if the token has limited
+            # uses it can't be used up by someone else in the meantime.
+            await self.store.set_registration_token_pending(token)
+            # Store the token in the UIA session, so that once registration
+            # is complete `completed` can be incremented.
+            await auth_handler.set_session_data(
+                session,
+                UIAuthSessionDataConstants.REGISTRATION_TOKEN,
+                token,
+            )
+            # The token will be stored as the result of the authentication stage
+            # in ui_auth_sessions_credentials. This allows the pending counter
+            # for tokens to be decremented when expired sessions are deleted.
+            return token
+        else:
+            raise LoginError(
+                401, "Invalid registration token", errcode=Codes.UNAUTHORIZED
+            )
+
+
 INTERACTIVE_AUTH_CHECKERS = [
     DummyAuthChecker,
     TermsAuthChecker,
     RecaptchaAuthChecker,
     EmailIdentityAuthChecker,
     MsisdnAuthChecker,
+    RegistrationTokenAuthChecker,
 ]
 """A list of UserInteractiveAuthChecker classes"""