summary refs log tree commit diff
path: root/synapse/handlers/federation.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/federation.py')
-rw-r--r--synapse/handlers/federation.py1927
1 files changed, 24 insertions, 1903 deletions
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index c0e13bdaac..daf1d3bfb3 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -17,23 +17,9 @@
 
 import itertools
 import logging
-from collections.abc import Container
 from http import HTTPStatus
-from typing import (
-    TYPE_CHECKING,
-    Collection,
-    Dict,
-    Iterable,
-    List,
-    Optional,
-    Sequence,
-    Set,
-    Tuple,
-    Union,
-)
+from typing import TYPE_CHECKING, Dict, Iterable, List, Optional, Tuple, Union
 
-import attr
-from prometheus_client import Counter
 from signedjson.key import decode_verify_key_bytes
 from signedjson.sign import verify_signed_json
 from unpaddedbase64 import decode_base64
@@ -41,19 +27,12 @@ from unpaddedbase64 import decode_base64
 from twisted.internet import defer
 
 from synapse import event_auth
-from synapse.api.constants import (
-    EventContentFields,
-    EventTypes,
-    Membership,
-    RejectedReason,
-    RoomEncryptionAlgorithms,
-)
+from synapse.api.constants import EventTypes, Membership, RejectedReason
 from synapse.api.errors import (
     AuthError,
     CodeMessageException,
     Codes,
     FederationDeniedError,
-    FederationError,
     HttpResponseException,
     NotFoundError,
     RequestSendFailed,
@@ -61,10 +40,10 @@ from synapse.api.errors import (
 )
 from synapse.api.room_versions import KNOWN_ROOM_VERSIONS, RoomVersion, RoomVersions
 from synapse.crypto.event_signing import compute_event_signature
-from synapse.event_auth import auth_types_for_event
 from synapse.events import EventBase
 from synapse.events.snapshot import EventContext
 from synapse.events.validator import EventValidator
+from synapse.federation.federation_client import InvalidResponseError
 from synapse.handlers._base import BaseHandler
 from synapse.http.servlet import assert_params_in_dict
 from synapse.logging.context import (
@@ -74,28 +53,14 @@ from synapse.logging.context import (
     run_in_background,
 )
 from synapse.logging.utils import log_function
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.replication.http.devices import ReplicationUserDevicesResyncRestServlet
 from synapse.replication.http.federation import (
     ReplicationCleanRoomRestServlet,
-    ReplicationFederationSendEventsRestServlet,
     ReplicationStoreRoomOnOutlierMembershipRestServlet,
 )
-from synapse.state import StateResolutionStore
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
-from synapse.types import (
-    JsonDict,
-    MutableStateMap,
-    PersistedEventPosition,
-    RoomStreamToken,
-    StateMap,
-    UserID,
-    get_domain_from_id,
-)
-from synapse.util.async_helpers import Linearizer, concurrently_execute
-from synapse.util.iterutils import batch_iter
+from synapse.types import JsonDict, StateMap, get_domain_from_id
+from synapse.util.async_helpers import Linearizer
 from synapse.util.retryutils import NotRetryingDestination
-from synapse.util.stringutils import shortstr
 from synapse.visibility import filter_events_for_server
 
 if TYPE_CHECKING:
@@ -103,50 +68,11 @@ if TYPE_CHECKING:
 
 logger = logging.getLogger(__name__)
 
-soft_failed_event_counter = Counter(
-    "synapse_federation_soft_failed_events_total",
-    "Events received over federation that we marked as soft_failed",
-)
-
-
-@attr.s(slots=True, frozen=True, auto_attribs=True)
-class _NewEventInfo:
-    """Holds information about a received event, ready for passing to _auth_and_persist_events
-
-    Attributes:
-        event: the received event
-
-        state: the state at that event, according to /state_ids from a remote
-           homeserver. Only populated for backfilled events which are going to be a
-           new backwards extremity.
-
-        claimed_auth_event_map: a map of (type, state_key) => event for the event's
-            claimed auth_events.
-
-            This can include events which have not yet been persisted, in the case that
-            we are backfilling a batch of events.
-
-            Note: May be incomplete: if we were unable to find all of the claimed auth
-            events. Also, treat the contents with caution: the events might also have
-            been rejected, might not yet have been authorized themselves, or they might
-            be in the wrong room.
-
-    """
-
-    event: EventBase
-    state: Optional[Sequence[EventBase]]
-    claimed_auth_event_map: StateMap[EventBase]
-
 
 class FederationHandler(BaseHandler):
-    """Handles events that originated from federation.
-    Responsible for:
-    a) handling received Pdus before handing them on as Events to the rest
-    of the homeserver (including auth and state conflict resolutions)
-    b) converting events that were produced by local clients that may need
-    to be sent to remote homeservers.
-    c) doing the necessary dances to invite remote users and join remote
-    rooms.
+    """Handles general incoming federation requests
+
+    Incoming events are *not* handled here, for which see FederationEventHandler.
     """
 
     def __init__(self, hs: "HomeServer"):
@@ -159,979 +85,35 @@ class FederationHandler(BaseHandler):
         self.state_store = self.storage.state
         self.federation_client = hs.get_federation_client()
         self.state_handler = hs.get_state_handler()
-        self._state_resolution_handler = hs.get_state_resolution_handler()
         self.server_name = hs.hostname
         self.keyring = hs.get_keyring()
-        self.action_generator = hs.get_action_generator()
         self.is_mine_id = hs.is_mine_id
         self.spam_checker = hs.get_spam_checker()
         self.event_creation_handler = hs.get_event_creation_handler()
         self._event_auth_handler = hs.get_event_auth_handler()
-        self._message_handler = hs.get_message_handler()
         self._server_notices_mxid = hs.config.server_notices_mxid
         self.config = hs.config
         self.http_client = hs.get_proxied_blacklisted_http_client()
-        self._instance_name = hs.get_instance_name()
         self._replication = hs.get_replication_data_handler()
+        self._federation_event_handler = hs.get_federation_event_handler()
 
-        self._send_events = ReplicationFederationSendEventsRestServlet.make_client(hs)
         self._clean_room_for_join_client = ReplicationCleanRoomRestServlet.make_client(
             hs
         )
 
         if hs.config.worker_app:
-            self._user_device_resync = (
-                ReplicationUserDevicesResyncRestServlet.make_client(hs)
-            )
             self._maybe_store_room_on_outlier_membership = (
                 ReplicationStoreRoomOnOutlierMembershipRestServlet.make_client(hs)
             )
         else:
-            self._device_list_updater = hs.get_device_handler().device_list_updater
             self._maybe_store_room_on_outlier_membership = (
                 self.store.maybe_store_room_on_outlier_membership
             )
 
-        # When joining a room we need to queue any events for that room up.
-        # For each room, a list of (pdu, origin) tuples.
-        self.room_queues: Dict[str, List[Tuple[EventBase, str]]] = {}
-        self._room_pdu_linearizer = Linearizer("fed_room_pdu")
-
         self._room_backfill = Linearizer("room_backfill")
 
         self.third_party_event_rules = hs.get_third_party_event_rules()
 
-        self._ephemeral_messages_enabled = hs.config.enable_ephemeral_messages
-
-    async def on_receive_pdu(
-        self, origin: str, pdu: EventBase, sent_to_us_directly: bool = False
-    ) -> None:
-        """Process a PDU received via a federation /send/ transaction, or
-        via backfill of missing prev_events
-
-        Args:
-            origin: server which initiated the /send/ transaction. Will
-                be used to fetch missing events or state.
-            pdu: received PDU
-            sent_to_us_directly: True if this event was pushed to us; False if
-                we pulled it as the result of a missing prev_event.
-        """
-
-        room_id = pdu.room_id
-        event_id = pdu.event_id
-
-        # We reprocess pdus when we have seen them only as outliers
-        existing = await self.store.get_event(
-            event_id, allow_none=True, allow_rejected=True
-        )
-
-        # FIXME: Currently we fetch an event again when we already have it
-        # if it has been marked as an outlier.
-        if existing:
-            if not existing.internal_metadata.is_outlier():
-                logger.info(
-                    "Ignoring received event %s which we have already seen", event_id
-                )
-                return
-            if pdu.internal_metadata.is_outlier():
-                logger.info(
-                    "Ignoring received outlier %s which we already have as an outlier",
-                    event_id,
-                )
-                return
-            logger.info("De-outliering event %s", event_id)
-
-        # do some initial sanity-checking of the event. In particular, make
-        # sure it doesn't have hundreds of prev_events or auth_events, which
-        # could cause a huge state resolution or cascade of event fetches.
-        try:
-            self._sanity_check_event(pdu)
-        except SynapseError as err:
-            logger.warning("Received event failed sanity checks")
-            raise FederationError("ERROR", err.code, err.msg, affected=pdu.event_id)
-
-        # If we are currently in the process of joining this room, then we
-        # queue up events for later processing.
-        if room_id in self.room_queues:
-            logger.info(
-                "Queuing PDU from %s for now: join in progress",
-                origin,
-            )
-            self.room_queues[room_id].append((pdu, origin))
-            return
-
-        # If we're not in the room just ditch the event entirely. This is
-        # probably an old server that has come back and thinks we're still in
-        # the room (or we've been rejoined to the room by a state reset).
-        #
-        # Note that if we were never in the room then we would have already
-        # dropped the event, since we wouldn't know the room version.
-        is_in_room = await self._event_auth_handler.check_host_in_room(
-            room_id, self.server_name
-        )
-        if not is_in_room:
-            logger.info(
-                "Ignoring PDU from %s as we're not in the room",
-                origin,
-            )
-            return None
-
-        state = None
-
-        # Check that the event passes auth based on the state at the event. This is
-        # done for events that are to be added to the timeline (non-outliers).
-        #
-        # Get missing pdus if necessary:
-        #  - Fetching any missing prev events to fill in gaps in the graph
-        #  - Fetching state if we have a hole in the graph
-        if not pdu.internal_metadata.is_outlier():
-            # We only backfill backwards to the min depth.
-            min_depth = await self.get_min_depth_for_context(pdu.room_id)
-
-            logger.debug("min_depth: %d", min_depth)
-
-            prevs = set(pdu.prev_event_ids())
-            seen = await self.store.have_events_in_timeline(prevs)
-
-            if min_depth is not None and pdu.depth < min_depth:
-                # This is so that we don't notify the user about this
-                # message, to work around the fact that some events will
-                # reference really really old events we really don't want to
-                # send to the clients.
-                pdu.internal_metadata.outlier = True
-            elif min_depth is not None and pdu.depth > min_depth:
-                missing_prevs = prevs - seen
-                if sent_to_us_directly and missing_prevs:
-                    # If we're missing stuff, ensure we only fetch stuff one
-                    # at a time.
-                    logger.info(
-                        "Acquiring room lock to fetch %d missing prev_events: %s",
-                        len(missing_prevs),
-                        shortstr(missing_prevs),
-                    )
-                    with (await self._room_pdu_linearizer.queue(pdu.room_id)):
-                        logger.info(
-                            "Acquired room lock to fetch %d missing prev_events",
-                            len(missing_prevs),
-                        )
-
-                        try:
-                            await self._get_missing_events_for_pdu(
-                                origin, pdu, prevs, min_depth
-                            )
-                        except Exception as e:
-                            raise Exception(
-                                "Error fetching missing prev_events for %s: %s"
-                                % (event_id, e)
-                            ) from e
-
-                        # Update the set of things we've seen after trying to
-                        # fetch the missing stuff
-                        seen = await self.store.have_events_in_timeline(prevs)
-
-                        if not prevs - seen:
-                            logger.info(
-                                "Found all missing prev_events",
-                            )
-
-            missing_prevs = prevs - seen
-            if missing_prevs:
-                # We've still not been able to get all of the prev_events for this event.
-                #
-                # In this case, we need to fall back to asking another server in the
-                # federation for the state at this event. That's ok provided we then
-                # resolve the state against other bits of the DAG before using it (which
-                # will ensure that you can't just take over a room by sending an event,
-                # withholding its prev_events, and declaring yourself to be an admin in
-                # the subsequent state request).
-                #
-                # Now, if we're pulling this event as a missing prev_event, then clearly
-                # this event is not going to become the only forward-extremity and we are
-                # guaranteed to resolve its state against our existing forward
-                # extremities, so that should be fine.
-                #
-                # On the other hand, if this event was pushed to us, it is possible for
-                # it to become the only forward-extremity in the room, and we would then
-                # trust its state to be the state for the whole room. This is very bad.
-                # Further, if the event was pushed to us, there is no excuse for us not to
-                # have all the prev_events. We therefore reject any such events.
-                #
-                # XXX this really feels like it could/should be merged with the above,
-                # but there is an interaction with min_depth that I'm not really
-                # following.
-
-                if sent_to_us_directly:
-                    logger.warning(
-                        "Rejecting: failed to fetch %d prev events: %s",
-                        len(missing_prevs),
-                        shortstr(missing_prevs),
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        (
-                            "Your server isn't divulging details about prev_events "
-                            "referenced in this event."
-                        ),
-                        affected=pdu.event_id,
-                    )
-
-                logger.info(
-                    "Event %s is missing prev_events %s: calculating state for a "
-                    "backwards extremity",
-                    event_id,
-                    shortstr(missing_prevs),
-                )
-
-                # Calculate the state after each of the previous events, and
-                # resolve them to find the correct state at the current event.
-                event_map = {event_id: pdu}
-                try:
-                    # Get the state of the events we know about
-                    ours = await self.state_store.get_state_groups_ids(room_id, seen)
-
-                    # state_maps is a list of mappings from (type, state_key) to event_id
-                    state_maps: List[StateMap[str]] = list(ours.values())
-
-                    # we don't need this any more, let's delete it.
-                    del ours
-
-                    # Ask the remote server for the states we don't
-                    # know about
-                    for p in missing_prevs:
-                        logger.info("Requesting state after missing prev_event %s", p)
-
-                        with nested_logging_context(p):
-                            # note that if any of the missing prevs share missing state or
-                            # auth events, the requests to fetch those events are deduped
-                            # by the get_pdu_cache in federation_client.
-                            remote_state = (
-                                await self._get_state_after_missing_prev_event(
-                                    origin, room_id, p
-                                )
-                            )
-
-                            remote_state_map = {
-                                (x.type, x.state_key): x.event_id for x in remote_state
-                            }
-                            state_maps.append(remote_state_map)
-
-                            for x in remote_state:
-                                event_map[x.event_id] = x
-
-                    room_version = await self.store.get_room_version_id(room_id)
-                    state_map = (
-                        await self._state_resolution_handler.resolve_events_with_store(
-                            room_id,
-                            room_version,
-                            state_maps,
-                            event_map,
-                            state_res_store=StateResolutionStore(self.store),
-                        )
-                    )
-
-                    # We need to give _process_received_pdu the actual state events
-                    # rather than event ids, so generate that now.
-
-                    # First though we need to fetch all the events that are in
-                    # state_map, so we can build up the state below.
-                    evs = await self.store.get_events(
-                        list(state_map.values()),
-                        get_prev_content=False,
-                        redact_behaviour=EventRedactBehaviour.AS_IS,
-                    )
-                    event_map.update(evs)
-
-                    state = [event_map[e] for e in state_map.values()]
-                except Exception:
-                    logger.warning(
-                        "Error attempting to resolve state at missing " "prev_events",
-                        exc_info=True,
-                    )
-                    raise FederationError(
-                        "ERROR",
-                        403,
-                        "We can't get valid state history.",
-                        affected=event_id,
-                    )
-
-        # A second round of checks for all events. Check that the event passes auth
-        # based on `auth_events`, this allows us to assert that the event would
-        # have been allowed at some point. If an event passes this check its OK
-        # for it to be used as part of a returned `/state` request, as either
-        # a) we received the event as part of the original join and so trust it, or
-        # b) we'll do a state resolution with existing state before it becomes
-        # part of the "current state", which adds more protection.
-        await self._process_received_pdu(origin, pdu, state=state)
-
-    async def _get_missing_events_for_pdu(
-        self, origin: str, pdu: EventBase, prevs: Set[str], min_depth: int
-    ) -> None:
-        """
-        Args:
-            origin: Origin of the pdu. Will be called to get the missing events
-            pdu: received pdu
-            prevs: List of event ids which we are missing
-            min_depth: Minimum depth of events to return.
-        """
-
-        room_id = pdu.room_id
-        event_id = pdu.event_id
-
-        seen = await self.store.have_events_in_timeline(prevs)
-
-        if not prevs - seen:
-            return
-
-        latest_list = await self.store.get_latest_event_ids_in_room(room_id)
-
-        # We add the prev events that we have seen to the latest
-        # list to ensure the remote server doesn't give them to us
-        latest = set(latest_list)
-        latest |= seen
-
-        logger.info(
-            "Requesting missing events between %s and %s",
-            shortstr(latest),
-            event_id,
-        )
-
-        # XXX: we set timeout to 10s to help workaround
-        # https://github.com/matrix-org/synapse/issues/1733.
-        # The reason is to avoid holding the linearizer lock
-        # whilst processing inbound /send transactions, causing
-        # FDs to stack up and block other inbound transactions
-        # which empirically can currently take up to 30 minutes.
-        #
-        # N.B. this explicitly disables retry attempts.
-        #
-        # N.B. this also increases our chances of falling back to
-        # fetching fresh state for the room if the missing event
-        # can't be found, which slightly reduces our security.
-        # it may also increase our DAG extremity count for the room,
-        # causing additional state resolution?  See #1760.
-        # However, fetching state doesn't hold the linearizer lock
-        # apparently.
-        #
-        # see https://github.com/matrix-org/synapse/pull/1744
-        #
-        # ----
-        #
-        # Update richvdh 2018/09/18: There are a number of problems with timing this
-        # request out aggressively on the client side:
-        #
-        # - it plays badly with the server-side rate-limiter, which starts tarpitting you
-        #   if you send too many requests at once, so you end up with the server carefully
-        #   working through the backlog of your requests, which you have already timed
-        #   out.
-        #
-        # - for this request in particular, we now (as of
-        #   https://github.com/matrix-org/synapse/pull/3456) reject any PDUs where the
-        #   server can't produce a plausible-looking set of prev_events - so we becone
-        #   much more likely to reject the event.
-        #
-        # - contrary to what it says above, we do *not* fall back to fetching fresh state
-        #   for the room if get_missing_events times out. Rather, we give up processing
-        #   the PDU whose prevs we are missing, which then makes it much more likely that
-        #   we'll end up back here for the *next* PDU in the list, which exacerbates the
-        #   problem.
-        #
-        # - the aggressive 10s timeout was introduced to deal with incoming federation
-        #   requests taking 8 hours to process. It's not entirely clear why that was going
-        #   on; certainly there were other issues causing traffic storms which are now
-        #   resolved, and I think in any case we may be more sensible about our locking
-        #   now. We're *certainly* more sensible about our logging.
-        #
-        # All that said: Let's try increasing the timeout to 60s and see what happens.
-
-        try:
-            missing_events = await self.federation_client.get_missing_events(
-                origin,
-                room_id,
-                earliest_events_ids=list(latest),
-                latest_events=[pdu],
-                limit=10,
-                min_depth=min_depth,
-                timeout=60000,
-            )
-        except (RequestSendFailed, HttpResponseException, NotRetryingDestination) as e:
-            # We failed to get the missing events, but since we need to handle
-            # the case of `get_missing_events` not returning the necessary
-            # events anyway, it is safe to simply log the error and continue.
-            logger.warning("Failed to get prev_events: %s", e)
-            return
-
-        logger.info("Got %d prev_events", len(missing_events))
-
-        # We want to sort these by depth so we process them and
-        # tell clients about them in order.
-        missing_events.sort(key=lambda x: x.depth)
-
-        for ev in missing_events:
-            logger.info("Handling received prev_event %s", ev)
-            with nested_logging_context(ev.event_id):
-                try:
-                    await self.on_receive_pdu(origin, ev, sent_to_us_directly=False)
-                except FederationError as e:
-                    if e.code == 403:
-                        logger.warning(
-                            "Received prev_event %s failed history check.",
-                            ev.event_id,
-                        )
-                    else:
-                        raise
-
-    async def _get_state_for_room(
-        self,
-        destination: str,
-        room_id: str,
-        event_id: str,
-    ) -> List[EventBase]:
-        """Requests all of the room state at a given event from a remote
-        homeserver.
-
-        Will also fetch any missing events reported in the `auth_chain_ids`
-        section of `/state_ids`.
-
-        Args:
-            destination: The remote homeserver to query for the state.
-            room_id: The id of the room we're interested in.
-            event_id: The id of the event we want the state at.
-
-        Returns:
-            A list of events in the state, not including the event itself.
-        """
-        (
-            state_event_ids,
-            auth_event_ids,
-        ) = await self.federation_client.get_room_state_ids(
-            destination, room_id, event_id=event_id
-        )
-
-        # Fetch the state events from the DB, and check we have the auth events.
-        event_map = await self.store.get_events(state_event_ids, allow_rejected=True)
-        auth_events_in_store = await self.store.have_seen_events(
-            room_id, auth_event_ids
-        )
-
-        # Check for missing events. We handle state and auth event seperately,
-        # as we want to pull the state from the DB, but we don't for the auth
-        # events. (Note: we likely won't use the majority of the auth chain, and
-        # it can be *huge* for large rooms, so it's worth ensuring that we don't
-        # unnecessarily pull it from the DB).
-        missing_state_events = set(state_event_ids) - set(event_map)
-        missing_auth_events = set(auth_event_ids) - set(auth_events_in_store)
-        if missing_state_events or missing_auth_events:
-            await self._get_events_and_persist(
-                destination=destination,
-                room_id=room_id,
-                events=missing_state_events | missing_auth_events,
-            )
-
-            if missing_state_events:
-                new_events = await self.store.get_events(
-                    missing_state_events, allow_rejected=True
-                )
-                event_map.update(new_events)
-
-                missing_state_events.difference_update(new_events)
-
-                if missing_state_events:
-                    logger.warning(
-                        "Failed to fetch missing state events for %s %s",
-                        event_id,
-                        missing_state_events,
-                    )
-
-            if missing_auth_events:
-                auth_events_in_store = await self.store.have_seen_events(
-                    room_id, missing_auth_events
-                )
-                missing_auth_events.difference_update(auth_events_in_store)
-
-                if missing_auth_events:
-                    logger.warning(
-                        "Failed to fetch missing auth events for %s %s",
-                        event_id,
-                        missing_auth_events,
-                    )
-
-        remote_state = list(event_map.values())
-
-        # check for events which were in the wrong room.
-        #
-        # this can happen if a remote server claims that the state or
-        # auth_events at an event in room A are actually events in room B
-
-        bad_events = [
-            (event.event_id, event.room_id)
-            for event in remote_state
-            if event.room_id != room_id
-        ]
-
-        for bad_event_id, bad_room_id in bad_events:
-            # This is a bogus situation, but since we may only discover it a long time
-            # after it happened, we try our best to carry on, by just omitting the
-            # bad events from the returned auth/state set.
-            logger.warning(
-                "Remote server %s claims event %s in room %s is an auth/state "
-                "event in room %s",
-                destination,
-                bad_event_id,
-                bad_room_id,
-                room_id,
-            )
-
-        if bad_events:
-            remote_state = [e for e in remote_state if e.room_id == room_id]
-
-        return remote_state
-
-    async def _get_state_after_missing_prev_event(
-        self,
-        destination: str,
-        room_id: str,
-        event_id: str,
-    ) -> List[EventBase]:
-        """Requests all of the room state at a given event from a remote homeserver.
-
-        Args:
-            destination: The remote homeserver to query for the state.
-            room_id: The id of the room we're interested in.
-            event_id: The id of the event we want the state at.
-
-        Returns:
-            A list of events in the state, including the event itself
-        """
-        # TODO: This function is basically the same as _get_state_for_room. Can
-        #   we make backfill() use it, rather than having two code paths? I think the
-        #   only difference is that backfill() persists the prev events separately.
-
-        (
-            state_event_ids,
-            auth_event_ids,
-        ) = await self.federation_client.get_room_state_ids(
-            destination, room_id, event_id=event_id
-        )
-
-        logger.debug(
-            "state_ids returned %i state events, %i auth events",
-            len(state_event_ids),
-            len(auth_event_ids),
-        )
-
-        # start by just trying to fetch the events from the store
-        desired_events = set(state_event_ids)
-        desired_events.add(event_id)
-        logger.debug("Fetching %i events from cache/store", len(desired_events))
-        fetched_events = await self.store.get_events(
-            desired_events, allow_rejected=True
-        )
-
-        missing_desired_events = desired_events - fetched_events.keys()
-        logger.debug(
-            "We are missing %i events (got %i)",
-            len(missing_desired_events),
-            len(fetched_events),
-        )
-
-        # We probably won't need most of the auth events, so let's just check which
-        # we have for now, rather than thrashing the event cache with them all
-        # unnecessarily.
-
-        # TODO: we probably won't actually need all of the auth events, since we
-        #   already have a bunch of the state events. It would be nice if the
-        #   federation api gave us a way of finding out which we actually need.
-
-        missing_auth_events = set(auth_event_ids) - fetched_events.keys()
-        missing_auth_events.difference_update(
-            await self.store.have_seen_events(room_id, missing_auth_events)
-        )
-        logger.debug("We are also missing %i auth events", len(missing_auth_events))
-
-        missing_events = missing_desired_events | missing_auth_events
-        logger.debug("Fetching %i events from remote", len(missing_events))
-        await self._get_events_and_persist(
-            destination=destination, room_id=room_id, events=missing_events
-        )
-
-        # we need to make sure we re-load from the database to get the rejected
-        # state correct.
-        fetched_events.update(
-            await self.store.get_events(missing_desired_events, allow_rejected=True)
-        )
-
-        # check for events which were in the wrong room.
-        #
-        # this can happen if a remote server claims that the state or
-        # auth_events at an event in room A are actually events in room B
-
-        bad_events = [
-            (event_id, event.room_id)
-            for event_id, event in fetched_events.items()
-            if event.room_id != room_id
-        ]
-
-        for bad_event_id, bad_room_id in bad_events:
-            # This is a bogus situation, but since we may only discover it a long time
-            # after it happened, we try our best to carry on, by just omitting the
-            # bad events from the returned state set.
-            logger.warning(
-                "Remote server %s claims event %s in room %s is an auth/state "
-                "event in room %s",
-                destination,
-                bad_event_id,
-                bad_room_id,
-                room_id,
-            )
-
-            del fetched_events[bad_event_id]
-
-        # if we couldn't get the prev event in question, that's a problem.
-        remote_event = fetched_events.get(event_id)
-        if not remote_event:
-            raise Exception("Unable to get missing prev_event %s" % (event_id,))
-
-        # missing state at that event is a warning, not a blocker
-        # XXX: this doesn't sound right? it means that we'll end up with incomplete
-        #   state.
-        failed_to_fetch = desired_events - fetched_events.keys()
-        if failed_to_fetch:
-            logger.warning(
-                "Failed to fetch missing state events for %s %s",
-                event_id,
-                failed_to_fetch,
-            )
-
-        remote_state = [
-            fetched_events[e_id] for e_id in state_event_ids if e_id in fetched_events
-        ]
-
-        if remote_event.is_state() and remote_event.rejected_reason is None:
-            remote_state.append(remote_event)
-
-        return remote_state
-
-    async def _process_received_pdu(
-        self,
-        origin: str,
-        event: EventBase,
-        state: Optional[Iterable[EventBase]],
-    ) -> None:
-        """Called when we have a new pdu. We need to do auth checks and put it
-        through the StateHandler.
-
-        Args:
-            origin: server sending the event
-
-            event: event to be persisted
-
-            state: Normally None, but if we are handling a gap in the graph
-                (ie, we are missing one or more prev_events), the resolved state at the
-                event
-        """
-        logger.debug("Processing event: %s", event)
-
-        try:
-            context = await self.state_handler.compute_event_context(
-                event, old_state=state
-            )
-            await self._auth_and_persist_event(origin, event, context, state=state)
-        except AuthError as e:
-            raise FederationError("ERROR", e.code, e.msg, affected=event.event_id)
-
-        # For encrypted messages we check that we know about the sending device,
-        # if we don't then we mark the device cache for that user as stale.
-        if event.type == EventTypes.Encrypted:
-            device_id = event.content.get("device_id")
-            sender_key = event.content.get("sender_key")
-
-            cached_devices = await self.store.get_cached_devices_for_user(event.sender)
-
-            resync = False  # Whether we should resync device lists.
-
-            device = None
-            if device_id is not None:
-                device = cached_devices.get(device_id)
-                if device is None:
-                    logger.info(
-                        "Received event from remote device not in our cache: %s %s",
-                        event.sender,
-                        device_id,
-                    )
-                    resync = True
-
-            # We also check if the `sender_key` matches what we expect.
-            if sender_key is not None:
-                # Figure out what sender key we're expecting. If we know the
-                # device and recognize the algorithm then we can work out the
-                # exact key to expect. Otherwise check it matches any key we
-                # have for that device.
-
-                current_keys: Container[str] = []
-
-                if device:
-                    keys = device.get("keys", {}).get("keys", {})
-
-                    if (
-                        event.content.get("algorithm")
-                        == RoomEncryptionAlgorithms.MEGOLM_V1_AES_SHA2
-                    ):
-                        # For this algorithm we expect a curve25519 key.
-                        key_name = "curve25519:%s" % (device_id,)
-                        current_keys = [keys.get(key_name)]
-                    else:
-                        # We don't know understand the algorithm, so we just
-                        # check it matches a key for the device.
-                        current_keys = keys.values()
-                elif device_id:
-                    # We don't have any keys for the device ID.
-                    pass
-                else:
-                    # The event didn't include a device ID, so we just look for
-                    # keys across all devices.
-                    current_keys = [
-                        key
-                        for device in cached_devices.values()
-                        for key in device.get("keys", {}).get("keys", {}).values()
-                    ]
-
-                # We now check that the sender key matches (one of) the expected
-                # keys.
-                if sender_key not in current_keys:
-                    logger.info(
-                        "Received event from remote device with unexpected sender key: %s %s: %s",
-                        event.sender,
-                        device_id or "<no device_id>",
-                        sender_key,
-                    )
-                    resync = True
-
-            if resync:
-                run_as_background_process(
-                    "resync_device_due_to_pdu", self._resync_device, event.sender
-                )
-
-        await self._handle_marker_event(origin, event)
-
-    async def _handle_marker_event(self, origin: str, marker_event: EventBase):
-        """Handles backfilling the insertion event when we receive a marker
-        event that points to one.
-
-        Args:
-            origin: Origin of the event. Will be called to get the insertion event
-            marker_event: The event to process
-        """
-
-        if marker_event.type != EventTypes.MSC2716_MARKER:
-            # Not a marker event
-            return
-
-        if marker_event.rejected_reason is not None:
-            # Rejected event
-            return
-
-        # Skip processing a marker event if the room version doesn't
-        # support it.
-        room_version = await self.store.get_room_version(marker_event.room_id)
-        if not room_version.msc2716_historical:
-            return
-
-        logger.debug("_handle_marker_event: received %s", marker_event)
-
-        insertion_event_id = marker_event.content.get(
-            EventContentFields.MSC2716_MARKER_INSERTION
-        )
-
-        if insertion_event_id is None:
-            # Nothing to retrieve then (invalid marker)
-            return
-
-        logger.debug(
-            "_handle_marker_event: backfilling insertion event %s", insertion_event_id
-        )
-
-        await self._get_events_and_persist(
-            origin,
-            marker_event.room_id,
-            [insertion_event_id],
-        )
-
-        insertion_event = await self.store.get_event(
-            insertion_event_id, allow_none=True
-        )
-        if insertion_event is None:
-            logger.warning(
-                "_handle_marker_event: server %s didn't return insertion event %s for marker %s",
-                origin,
-                insertion_event_id,
-                marker_event.event_id,
-            )
-            return
-
-        logger.debug(
-            "_handle_marker_event: succesfully backfilled insertion event %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
-        await self.store.insert_insertion_extremity(
-            insertion_event_id, marker_event.room_id
-        )
-
-        logger.debug(
-            "_handle_marker_event: insertion extremity added for %s from marker event %s",
-            insertion_event,
-            marker_event,
-        )
-
-    async def _resync_device(self, sender: str) -> None:
-        """We have detected that the device list for the given user may be out
-        of sync, so we try and resync them.
-        """
-
-        try:
-            await self.store.mark_remote_user_device_cache_as_stale(sender)
-
-            # Immediately attempt a resync in the background
-            if self.config.worker_app:
-                await self._user_device_resync(user_id=sender)
-            else:
-                await self._device_list_updater.user_device_resync(sender)
-        except Exception:
-            logger.exception("Failed to resync device for %s", sender)
-
-    @log_function
-    async def backfill(
-        self, dest: str, room_id: str, limit: int, extremities: List[str]
-    ) -> List[EventBase]:
-        """Trigger a backfill request to `dest` for the given `room_id`
-
-        This will attempt to get more events from the remote. If the other side
-        has no new events to offer, this will return an empty list.
-
-        As the events are received, we check their signatures, and also do some
-        sanity-checking on them. If any of the backfilled events are invalid,
-        this method throws a SynapseError.
-
-        TODO: make this more useful to distinguish failures of the remote
-        server from invalid events (there is probably no point in trying to
-        re-fetch invalid events from every other HS in the room.)
-        """
-        if dest == self.server_name:
-            raise SynapseError(400, "Can't backfill from self.")
-
-        events = await self.federation_client.backfill(
-            dest, room_id, limit=limit, extremities=extremities
-        )
-
-        if not events:
-            return []
-
-        # ideally we'd sanity check the events here for excess prev_events etc,
-        # but it's hard to reject events at this point without completely
-        # breaking backfill in the same way that it is currently broken by
-        # events whose signature we cannot verify (#3121).
-        #
-        # So for now we accept the events anyway. #3124 tracks this.
-        #
-        # for ev in events:
-        #     self._sanity_check_event(ev)
-
-        # Don't bother processing events we already have.
-        seen_events = await self.store.have_events_in_timeline(
-            {e.event_id for e in events}
-        )
-
-        events = [e for e in events if e.event_id not in seen_events]
-
-        if not events:
-            return []
-
-        event_map = {e.event_id: e for e in events}
-
-        event_ids = {e.event_id for e in events}
-
-        # build a list of events whose prev_events weren't in the batch.
-        # (XXX: this will include events whose prev_events we already have; that doesn't
-        # sound right?)
-        edges = [ev.event_id for ev in events if set(ev.prev_event_ids()) - event_ids]
-
-        logger.info("backfill: Got %d events with %d edges", len(events), len(edges))
-
-        # For each edge get the current state.
-
-        state_events = {}
-        events_to_state = {}
-        for e_id in edges:
-            state = await self._get_state_for_room(
-                destination=dest,
-                room_id=room_id,
-                event_id=e_id,
-            )
-            state_events.update({s.event_id: s for s in state})
-            events_to_state[e_id] = state
-
-        required_auth = {
-            a_id
-            for event in events + list(state_events.values())
-            for a_id in event.auth_event_ids()
-        }
-        auth_events = await self.store.get_events(required_auth, allow_rejected=True)
-        auth_events.update(
-            {e_id: event_map[e_id] for e_id in required_auth if e_id in event_map}
-        )
-
-        ev_infos = []
-
-        # Step 1: persist the events in the chunk we fetched state for (i.e.
-        # the backwards extremities), with custom auth events and state
-        for e_id in events_to_state:
-            # For paranoia we ensure that these events are marked as
-            # non-outliers
-            ev = event_map[e_id]
-            assert not ev.internal_metadata.is_outlier()
-
-            ev_infos.append(
-                _NewEventInfo(
-                    event=ev,
-                    state=events_to_state[e_id],
-                    claimed_auth_event_map={
-                        (
-                            auth_events[a_id].type,
-                            auth_events[a_id].state_key,
-                        ): auth_events[a_id]
-                        for a_id in ev.auth_event_ids()
-                        if a_id in auth_events
-                    },
-                )
-            )
-
-        if ev_infos:
-            await self._auth_and_persist_events(
-                dest, room_id, ev_infos, backfilled=True
-            )
-
-        # Step 2: Persist the rest of the events in the chunk one by one
-        events.sort(key=lambda e: e.depth)
-
-        for event in events:
-            if event in events_to_state:
-                continue
-
-            # For paranoia we ensure that these events are marked as
-            # non-outliers
-            assert not event.internal_metadata.is_outlier()
-
-            context = await self.state_handler.compute_event_context(event)
-
-            # We store these one at a time since each event depends on the
-            # previous to work out the state.
-            # TODO: We can probably do something more clever here.
-            await self._auth_and_persist_event(dest, event, context, backfilled=True)
-
-        return events
-
     async def maybe_backfill(
         self, room_id: str, current_depth: int, limit: int
     ) -> bool:
@@ -1326,14 +308,14 @@ class FederationHandler(BaseHandler):
             # TODO: Should we try multiple of these at a time?
             for dom in domains:
                 try:
-                    await self.backfill(
+                    await self._federation_event_handler.backfill(
                         dom, room_id, limit=100, extremities=extremities
                     )
                     # If this succeeded then we probably already have the
                     # appropriate stuff.
                     # TODO: We can probably do something more intelligent here.
                     return True
-                except SynapseError as e:
+                except (SynapseError, InvalidResponseError) as e:
                     logger.info("Failed to backfill from %s because %s", dom, e)
                     continue
                 except HttpResponseException as e:
@@ -1417,115 +399,6 @@ class FederationHandler(BaseHandler):
 
         return False
 
-    async def _get_events_and_persist(
-        self, destination: str, room_id: str, events: Iterable[str]
-    ) -> None:
-        """Fetch the given events from a server, and persist them as outliers.
-
-        This function *does not* recursively get missing auth events of the
-        newly fetched events. Callers must include in the `events` argument
-        any missing events from the auth chain.
-
-        Logs a warning if we can't find the given event.
-        """
-
-        room_version = await self.store.get_room_version(room_id)
-
-        event_map: Dict[str, EventBase] = {}
-
-        async def get_event(event_id: str):
-            with nested_logging_context(event_id):
-                try:
-                    event = await self.federation_client.get_pdu(
-                        [destination],
-                        event_id,
-                        room_version,
-                        outlier=True,
-                    )
-                    if event is None:
-                        logger.warning(
-                            "Server %s didn't return event %s",
-                            destination,
-                            event_id,
-                        )
-                        return
-
-                    event_map[event.event_id] = event
-
-                except Exception as e:
-                    logger.warning(
-                        "Error fetching missing state/auth event %s: %s %s",
-                        event_id,
-                        type(e),
-                        e,
-                    )
-
-        await concurrently_execute(get_event, events, 5)
-
-        # Make a map of auth events for each event. We do this after fetching
-        # all the events as some of the events' auth events will be in the list
-        # of requested events.
-
-        auth_events = [
-            aid
-            for event in event_map.values()
-            for aid in event.auth_event_ids()
-            if aid not in event_map
-        ]
-        persisted_events = await self.store.get_events(
-            auth_events,
-            allow_rejected=True,
-        )
-
-        event_infos = []
-        for event in event_map.values():
-            auth = {}
-            for auth_event_id in event.auth_event_ids():
-                ae = persisted_events.get(auth_event_id) or event_map.get(auth_event_id)
-                if ae:
-                    auth[(ae.type, ae.state_key)] = ae
-                else:
-                    logger.info("Missing auth event %s", auth_event_id)
-
-            event_infos.append(_NewEventInfo(event, None, auth))
-
-        if event_infos:
-            await self._auth_and_persist_events(
-                destination,
-                room_id,
-                event_infos,
-            )
-
-    def _sanity_check_event(self, ev: EventBase) -> None:
-        """
-        Do some early sanity checks of a received event
-
-        In particular, checks it doesn't have an excessive number of
-        prev_events or auth_events, which could cause a huge state resolution
-        or cascade of event fetches.
-
-        Args:
-            ev: event to be checked
-
-        Raises:
-            SynapseError if the event does not pass muster
-        """
-        if len(ev.prev_event_ids()) > 20:
-            logger.warning(
-                "Rejecting event %s which has %i prev_events",
-                ev.event_id,
-                len(ev.prev_event_ids()),
-            )
-            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many prev_events")
-
-        if len(ev.auth_event_ids()) > 10:
-            logger.warning(
-                "Rejecting event %s which has %i auth_events",
-                ev.event_id,
-                len(ev.auth_event_ids()),
-            )
-            raise SynapseError(HTTPStatus.BAD_REQUEST, "Too many auth_events")
-
     async def send_invite(self, target_host: str, event: EventBase) -> EventBase:
         """Sends the invite to the remote server for signing.
 
@@ -1591,9 +464,9 @@ class FederationHandler(BaseHandler):
         # This shouldn't happen, because the RoomMemberHandler has a
         # linearizer lock which only allows one operation per user per room
         # at a time - so this is just paranoia.
-        assert room_id not in self.room_queues
+        assert room_id not in self._federation_event_handler.room_queues
 
-        self.room_queues[room_id] = []
+        self._federation_event_handler.room_queues[room_id] = []
 
         await self._clean_room_for_join(room_id)
 
@@ -1667,8 +540,8 @@ class FederationHandler(BaseHandler):
             logger.debug("Finished joining %s to %s", joinee, room_id)
             return event.event_id, max_stream_id
         finally:
-            room_queue = self.room_queues[room_id]
-            del self.room_queues[room_id]
+            room_queue = self._federation_event_handler.room_queues[room_id]
+            del self._federation_event_handler.room_queues[room_id]
 
             # we don't need to wait for the queued events to be processed -
             # it's just a best-effort thing at this point. We do want to do
@@ -1744,7 +617,7 @@ class FederationHandler(BaseHandler):
         event.unsigned["knock_room_state"] = stripped_room_state["knock_state_events"]
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify(
+        stream_id = await self._federation_event_handler.persist_events_and_notify(
             event.room_id, [(event, context)]
         )
         return event.event_id, stream_id
@@ -1764,7 +637,7 @@ class FederationHandler(BaseHandler):
                     p,
                 )
                 with nested_logging_context(p.event_id):
-                    await self.on_receive_pdu(origin, p, sent_to_us_directly=True)
+                    await self._federation_event_handler.on_receive_pdu(origin, p)
             except Exception as e:
                 logger.warning(
                     "Error handling queued PDU %s from %s: %s", p.event_id, origin, e
@@ -1857,7 +730,7 @@ class FederationHandler(BaseHandler):
             raise
 
         # Ensure the user can even join the room.
-        await self._check_join_restrictions(context, event)
+        await self._federation_event_handler.check_join_restrictions(context, event)
 
         # The remote hasn't signed it yet, obviously. We'll do the full checks
         # when we get the event back in `on_send_join_request`
@@ -1934,7 +807,9 @@ class FederationHandler(BaseHandler):
         )
 
         context = await self.state_handler.compute_event_context(event)
-        await self.persist_events_and_notify(event.room_id, [(event, context)])
+        await self._federation_event_handler.persist_events_and_notify(
+            event.room_id, [(event, context)]
+        )
 
         return event
 
@@ -1961,7 +836,7 @@ class FederationHandler(BaseHandler):
         await self.federation_client.send_leave(host_list, event)
 
         context = await self.state_handler.compute_event_context(event)
-        stream_id = await self.persist_events_and_notify(
+        stream_id = await self._federation_event_handler.persist_events_and_notify(
             event.room_id, [(event, context)]
         )
 
@@ -2104,116 +979,6 @@ class FederationHandler(BaseHandler):
 
         return event
 
-    @log_function
-    async def on_send_membership_event(
-        self, origin: str, event: EventBase
-    ) -> Tuple[EventBase, EventContext]:
-        """
-        We have received a join/leave/knock event for a room via send_join/leave/knock.
-
-        Verify that event and send it into the room on the remote homeserver's behalf.
-
-        This is quite similar to on_receive_pdu, with the following principal
-        differences:
-          * only membership events are permitted (and only events with
-            sender==state_key -- ie, no kicks or bans)
-          * *We* send out the event on behalf of the remote server.
-          * We enforce the membership restrictions of restricted rooms.
-          * Rejected events result in an exception rather than being stored.
-
-        There are also other differences, however it is not clear if these are by
-        design or omission. In particular, we do not attempt to backfill any missing
-        prev_events.
-
-        Args:
-            origin: The homeserver of the remote (joining/invited/knocking) user.
-            event: The member event that has been signed by the remote homeserver.
-
-        Returns:
-            The event and context of the event after inserting it into the room graph.
-
-        Raises:
-            SynapseError if the event is not accepted into the room
-        """
-        logger.debug(
-            "on_send_membership_event: Got event: %s, signatures: %s",
-            event.event_id,
-            event.signatures,
-        )
-
-        if get_domain_from_id(event.sender) != origin:
-            logger.info(
-                "Got send_membership request for user %r from different origin %s",
-                event.sender,
-                origin,
-            )
-            raise SynapseError(403, "User not from origin", Codes.FORBIDDEN)
-
-        if event.sender != event.state_key:
-            raise SynapseError(400, "state_key and sender must match", Codes.BAD_JSON)
-
-        assert not event.internal_metadata.outlier
-
-        # Send this event on behalf of the other server.
-        #
-        # The remote server isn't a full participant in the room at this point, so
-        # may not have an up-to-date list of the other homeservers participating in
-        # the room, so we send it on their behalf.
-        event.internal_metadata.send_on_behalf_of = origin
-
-        context = await self.state_handler.compute_event_context(event)
-        context = await self._check_event_auth(origin, event, context)
-        if context.rejected:
-            raise SynapseError(
-                403, f"{event.membership} event was rejected", Codes.FORBIDDEN
-            )
-
-        # for joins, we need to check the restrictions of restricted rooms
-        if event.membership == Membership.JOIN:
-            await self._check_join_restrictions(context, event)
-
-        # for knock events, we run the third-party event rules. It's not entirely clear
-        # why we don't do this for other sorts of membership events.
-        if event.membership == Membership.KNOCK:
-            event_allowed, _ = await self.third_party_event_rules.check_event_allowed(
-                event, context
-            )
-            if not event_allowed:
-                logger.info("Sending of knock %s forbidden by third-party rules", event)
-                raise SynapseError(
-                    403, "This event is not allowed in this context", Codes.FORBIDDEN
-                )
-
-        # all looks good, we can persist the event.
-        await self._run_push_actions_and_persist_event(event, context)
-        return event, context
-
-    async def _check_join_restrictions(
-        self, context: EventContext, event: EventBase
-    ) -> None:
-        """Check that restrictions in restricted join rules are matched
-
-        Called when we receive a join event via send_join.
-
-        Raises an auth error if the restrictions are not matched.
-        """
-        prev_state_ids = await context.get_prev_state_ids()
-
-        # Check if the user is already in the room or invited to the room.
-        user_id = event.state_key
-        prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
-        prev_member_event = None
-        if prev_member_event_id:
-            prev_member_event = await self.store.get_event(prev_member_event_id)
-
-        # Check if the member should be allowed access via membership in a space.
-        await self._event_auth_handler.check_restricted_join_rules(
-            prev_state_ids,
-            event.room_version,
-            user_id,
-            prev_member_event,
-        )
-
     async def get_state_for_pdu(self, room_id: str, event_id: str) -> List[EventBase]:
         """Returns the state at the event. i.e. not including said event."""
 
@@ -2314,131 +1079,6 @@ class FederationHandler(BaseHandler):
         else:
             return None
 
-    async def get_min_depth_for_context(self, context: str) -> int:
-        return await self.store.get_min_depth(context)
-
-    async def _auth_and_persist_event(
-        self,
-        origin: str,
-        event: EventBase,
-        context: EventContext,
-        state: Optional[Iterable[EventBase]] = None,
-        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
-        backfilled: bool = False,
-    ) -> None:
-        """
-        Process an event by performing auth checks and then persisting to the database.
-
-        Args:
-            origin: The host the event originates from.
-            event: The event itself.
-            context:
-                The event context.
-
-            state:
-                The state events used to check the event for soft-fail. If this is
-                not provided the current state events will be used.
-
-            claimed_auth_event_map:
-                A map of (type, state_key) => event for the event's claimed auth_events.
-                Possibly incomplete, and possibly including events that are not yet
-                persisted, or authed, or in the right room.
-
-                Only populated where we may not already have persisted these events -
-                for example, when populating outliers.
-
-            backfilled: True if the event was backfilled.
-        """
-        context = await self._check_event_auth(
-            origin,
-            event,
-            context,
-            state=state,
-            claimed_auth_event_map=claimed_auth_event_map,
-            backfilled=backfilled,
-        )
-
-        await self._run_push_actions_and_persist_event(event, context, backfilled)
-
-    async def _run_push_actions_and_persist_event(
-        self, event: EventBase, context: EventContext, backfilled: bool = False
-    ):
-        """Run the push actions for a received event, and persist it.
-
-        Args:
-            event: The event itself.
-            context: The event context.
-            backfilled: True if the event was backfilled.
-        """
-        try:
-            if (
-                not event.internal_metadata.is_outlier()
-                and not backfilled
-                and not context.rejected
-            ):
-                await self.action_generator.handle_push_actions_for_event(
-                    event, context
-                )
-
-            await self.persist_events_and_notify(
-                event.room_id, [(event, context)], backfilled=backfilled
-            )
-        except Exception:
-            run_in_background(
-                self.store.remove_push_actions_from_staging, event.event_id
-            )
-            raise
-
-    async def _auth_and_persist_events(
-        self,
-        origin: str,
-        room_id: str,
-        event_infos: Collection[_NewEventInfo],
-        backfilled: bool = False,
-    ) -> None:
-        """Creates the appropriate contexts and persists events. The events
-        should not depend on one another, e.g. this should be used to persist
-        a bunch of outliers, but not a chunk of individual events that depend
-        on each other for state calculations.
-
-        Notifies about the events where appropriate.
-        """
-
-        if not event_infos:
-            return
-
-        async def prep(ev_info: _NewEventInfo):
-            event = ev_info.event
-            with nested_logging_context(suffix=event.event_id):
-                res = await self.state_handler.compute_event_context(
-                    event, old_state=ev_info.state
-                )
-                res = await self._check_event_auth(
-                    origin,
-                    event,
-                    res,
-                    state=ev_info.state,
-                    claimed_auth_event_map=ev_info.claimed_auth_event_map,
-                    backfilled=backfilled,
-                )
-            return res
-
-        contexts = await make_deferred_yieldable(
-            defer.gatherResults(
-                [run_in_background(prep, ev_info) for ev_info in event_infos],
-                consumeErrors=True,
-            )
-        )
-
-        await self.persist_events_and_notify(
-            room_id,
-            [
-                (ev_info.event, context)
-                for ev_info, context in zip(event_infos, contexts)
-            ],
-            backfilled=backfilled,
-        )
-
     async def _persist_auth_tree(
         self,
         origin: str,
@@ -2536,7 +1176,7 @@ class FederationHandler(BaseHandler):
                 events_to_context[e.event_id].rejected = RejectedReason.AUTH_ERROR
 
         if auth_events or state:
-            await self.persist_events_and_notify(
+            await self._federation_event_handler.persist_events_and_notify(
                 room_id,
                 [
                     (e, events_to_context[e.event_id])
@@ -2548,108 +1188,10 @@ class FederationHandler(BaseHandler):
             event, old_state=state
         )
 
-        return await self.persist_events_and_notify(
+        return await self._federation_event_handler.persist_events_and_notify(
             room_id, [(event, new_event_context)]
         )
 
-    async def _check_for_soft_fail(
-        self,
-        event: EventBase,
-        state: Optional[Iterable[EventBase]],
-        backfilled: bool,
-        origin: str,
-    ) -> None:
-        """Checks if we should soft fail the event; if so, marks the event as
-        such.
-
-        Args:
-            event
-            state: The state at the event if we don't have all the event's prev events
-            backfilled: Whether the event is from backfill
-            origin: The host the event originates from.
-        """
-        # For new (non-backfilled and non-outlier) events we check if the event
-        # passes auth based on the current state. If it doesn't then we
-        # "soft-fail" the event.
-        if backfilled or event.internal_metadata.is_outlier():
-            return
-
-        extrem_ids_list = await self.store.get_latest_event_ids_in_room(event.room_id)
-        extrem_ids = set(extrem_ids_list)
-        prev_event_ids = set(event.prev_event_ids())
-
-        if extrem_ids == prev_event_ids:
-            # If they're the same then the current state is the same as the
-            # state at the event, so no point rechecking auth for soft fail.
-            return
-
-        room_version = await self.store.get_room_version_id(event.room_id)
-        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-        # Calculate the "current state".
-        if state is not None:
-            # If we're explicitly given the state then we won't have all the
-            # prev events, and so we have a gap in the graph. In this case
-            # we want to be a little careful as we might have been down for
-            # a while and have an incorrect view of the current state,
-            # however we still want to do checks as gaps are easy to
-            # maliciously manufacture.
-            #
-            # So we use a "current state" that is actually a state
-            # resolution across the current forward extremities and the
-            # given state at the event. This should correctly handle cases
-            # like bans, especially with state res v2.
-
-            state_sets_d = await self.state_store.get_state_groups(
-                event.room_id, extrem_ids
-            )
-            state_sets: List[Iterable[EventBase]] = list(state_sets_d.values())
-            state_sets.append(state)
-            current_states = await self.state_handler.resolve_events(
-                room_version, state_sets, event
-            )
-            current_state_ids: StateMap[str] = {
-                k: e.event_id for k, e in current_states.items()
-            }
-        else:
-            current_state_ids = await self.state_handler.get_current_state_ids(
-                event.room_id, latest_event_ids=extrem_ids
-            )
-
-        logger.debug(
-            "Doing soft-fail check for %s: state %s",
-            event.event_id,
-            current_state_ids,
-        )
-
-        # Now check if event pass auth against said current state
-        auth_types = auth_types_for_event(room_version_obj, event)
-        current_state_ids_list = [
-            e for k, e in current_state_ids.items() if k in auth_types
-        ]
-
-        auth_events_map = await self.store.get_events(current_state_ids_list)
-        current_auth_events = {
-            (e.type, e.state_key): e for e in auth_events_map.values()
-        }
-
-        try:
-            event_auth.check(room_version_obj, event, auth_events=current_auth_events)
-        except AuthError as e:
-            logger.warning(
-                "Soft-failing %r (from %s) because %s",
-                event,
-                e,
-                origin,
-                extra={
-                    "room_id": event.room_id,
-                    "mxid": event.sender,
-                    "hs": origin,
-                },
-            )
-            soft_failed_event_counter.inc()
-            event.internal_metadata.soft_failed = True
-
     async def on_get_missing_events(
         self,
         origin: str,
@@ -2678,334 +1220,6 @@ class FederationHandler(BaseHandler):
 
         return missing_events
 
-    async def _check_event_auth(
-        self,
-        origin: str,
-        event: EventBase,
-        context: EventContext,
-        state: Optional[Iterable[EventBase]] = None,
-        claimed_auth_event_map: Optional[StateMap[EventBase]] = None,
-        backfilled: bool = False,
-    ) -> EventContext:
-        """
-        Checks whether an event should be rejected (for failing auth checks).
-
-        Args:
-            origin: The host the event originates from.
-            event: The event itself.
-            context:
-                The event context.
-
-            state:
-                The state events used to check the event for soft-fail. If this is
-                not provided the current state events will be used.
-
-            claimed_auth_event_map:
-                A map of (type, state_key) => event for the event's claimed auth_events.
-                Possibly incomplete, and possibly including events that are not yet
-                persisted, or authed, or in the right room.
-
-                Only populated where we may not already have persisted these events -
-                for example, when populating outliers, or the state for a backwards
-                extremity.
-
-            backfilled: True if the event was backfilled.
-
-        Returns:
-            The updated context object.
-        """
-        room_version = await self.store.get_room_version_id(event.room_id)
-        room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-        if claimed_auth_event_map:
-            # if we have a copy of the auth events from the event, use that as the
-            # basis for auth.
-            auth_events = claimed_auth_event_map
-        else:
-            # otherwise, we calculate what the auth events *should* be, and use that
-            prev_state_ids = await context.get_prev_state_ids()
-            auth_events_ids = self._event_auth_handler.compute_auth_events(
-                event, prev_state_ids, for_verification=True
-            )
-            auth_events_x = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events_x.values()}
-
-        try:
-            (
-                context,
-                auth_events_for_auth,
-            ) = await self._update_auth_events_and_context_for_auth(
-                origin, event, context, auth_events
-            )
-        except Exception:
-            # We don't really mind if the above fails, so lets not fail
-            # processing if it does. However, it really shouldn't fail so
-            # let's still log as an exception since we'll still want to fix
-            # any bugs.
-            logger.exception(
-                "Failed to double check auth events for %s with remote. "
-                "Ignoring failure and continuing processing of event.",
-                event.event_id,
-            )
-            auth_events_for_auth = auth_events
-
-        try:
-            event_auth.check(room_version_obj, event, auth_events=auth_events_for_auth)
-        except AuthError as e:
-            logger.warning("Failed auth resolution for %r because %s", event, e)
-            context.rejected = RejectedReason.AUTH_ERROR
-
-        if not context.rejected:
-            await self._check_for_soft_fail(event, state, backfilled, origin=origin)
-
-        if event.type == EventTypes.GuestAccess and not context.rejected:
-            await self.maybe_kick_guest_users(event)
-
-        # If we are going to send this event over federation we precaclculate
-        # the joined hosts.
-        if event.internal_metadata.get_send_on_behalf_of():
-            await self.event_creation_handler.cache_joined_hosts_for_event(
-                event, context
-            )
-
-        return context
-
-    async def _update_auth_events_and_context_for_auth(
-        self,
-        origin: str,
-        event: EventBase,
-        context: EventContext,
-        input_auth_events: StateMap[EventBase],
-    ) -> Tuple[EventContext, StateMap[EventBase]]:
-        """Helper for _check_event_auth. See there for docs.
-
-        Checks whether a given event has the expected auth events. If it
-        doesn't then we talk to the remote server to compare state to see if
-        we can come to a consensus (e.g. if one server missed some valid
-        state).
-
-        This attempts to resolve any potential divergence of state between
-        servers, but is not essential and so failures should not block further
-        processing of the event.
-
-        Args:
-            origin:
-            event:
-            context:
-
-            input_auth_events:
-                Map from (event_type, state_key) to event
-
-                Normally, our calculated auth_events based on the state of the room
-                at the event's position in the DAG, though occasionally (eg if the
-                event is an outlier), may be the auth events claimed by the remote
-                server.
-
-        Returns:
-            updated context, updated auth event map
-        """
-        # take a copy of input_auth_events before we modify it.
-        auth_events: MutableStateMap[EventBase] = dict(input_auth_events)
-
-        event_auth_events = set(event.auth_event_ids())
-
-        # missing_auth is the set of the event's auth_events which we don't yet have
-        # in auth_events.
-        missing_auth = event_auth_events.difference(
-            e.event_id for e in auth_events.values()
-        )
-
-        # if we have missing events, we need to fetch those events from somewhere.
-        #
-        # we start by checking if they are in the store, and then try calling /event_auth/.
-        if missing_auth:
-            have_events = await self.store.have_seen_events(event.room_id, missing_auth)
-            logger.debug("Events %s are in the store", have_events)
-            missing_auth.difference_update(have_events)
-
-        if missing_auth:
-            # If we don't have all the auth events, we need to get them.
-            logger.info("auth_events contains unknown events: %s", missing_auth)
-            try:
-                try:
-                    remote_auth_chain = await self.federation_client.get_event_auth(
-                        origin, event.room_id, event.event_id
-                    )
-                except RequestSendFailed as e1:
-                    # The other side isn't around or doesn't implement the
-                    # endpoint, so lets just bail out.
-                    logger.info("Failed to get event auth from remote: %s", e1)
-                    return context, auth_events
-
-                seen_remotes = await self.store.have_seen_events(
-                    event.room_id, [e.event_id for e in remote_auth_chain]
-                )
-
-                for e in remote_auth_chain:
-                    if e.event_id in seen_remotes:
-                        continue
-
-                    if e.event_id == event.event_id:
-                        continue
-
-                    try:
-                        auth_ids = e.auth_event_ids()
-                        auth = {
-                            (e.type, e.state_key): e
-                            for e in remote_auth_chain
-                            if e.event_id in auth_ids or e.type == EventTypes.Create
-                        }
-                        e.internal_metadata.outlier = True
-
-                        logger.debug(
-                            "_check_event_auth %s missing_auth: %s",
-                            event.event_id,
-                            e.event_id,
-                        )
-                        missing_auth_event_context = (
-                            await self.state_handler.compute_event_context(e)
-                        )
-                        await self._auth_and_persist_event(
-                            origin,
-                            e,
-                            missing_auth_event_context,
-                            claimed_auth_event_map=auth,
-                        )
-
-                        if e.event_id in event_auth_events:
-                            auth_events[(e.type, e.state_key)] = e
-                    except AuthError:
-                        pass
-
-            except Exception:
-                logger.exception("Failed to get auth chain")
-
-        if event.internal_metadata.is_outlier():
-            # XXX: given that, for an outlier, we'll be working with the
-            # event's *claimed* auth events rather than those we calculated:
-            # (a) is there any point in this test, since different_auth below will
-            # obviously be empty
-            # (b) alternatively, why don't we do it earlier?
-            logger.info("Skipping auth_event fetch for outlier")
-            return context, auth_events
-
-        different_auth = event_auth_events.difference(
-            e.event_id for e in auth_events.values()
-        )
-
-        if not different_auth:
-            return context, auth_events
-
-        logger.info(
-            "auth_events refers to events which are not in our calculated auth "
-            "chain: %s",
-            different_auth,
-        )
-
-        # XXX: currently this checks for redactions but I'm not convinced that is
-        # necessary?
-        different_events = await self.store.get_events_as_list(different_auth)
-
-        for d in different_events:
-            if d.room_id != event.room_id:
-                logger.warning(
-                    "Event %s refers to auth_event %s which is in a different room",
-                    event.event_id,
-                    d.event_id,
-                )
-
-                # don't attempt to resolve the claimed auth events against our own
-                # in this case: just use our own auth events.
-                #
-                # XXX: should we reject the event in this case? It feels like we should,
-                # but then shouldn't we also do so if we've failed to fetch any of the
-                # auth events?
-                return context, auth_events
-
-        # now we state-resolve between our own idea of the auth events, and the remote's
-        # idea of them.
-
-        local_state = auth_events.values()
-        remote_auth_events = dict(auth_events)
-        remote_auth_events.update({(d.type, d.state_key): d for d in different_events})
-        remote_state = remote_auth_events.values()
-
-        room_version = await self.store.get_room_version_id(event.room_id)
-        new_state = await self.state_handler.resolve_events(
-            room_version, (local_state, remote_state), event
-        )
-
-        logger.info(
-            "After state res: updating auth_events with new state %s",
-            {
-                (d.type, d.state_key): d.event_id
-                for d in new_state.values()
-                if auth_events.get((d.type, d.state_key)) != d
-            },
-        )
-
-        auth_events.update(new_state)
-
-        context = await self._update_context_for_auth_events(
-            event, context, auth_events
-        )
-
-        return context, auth_events
-
-    async def _update_context_for_auth_events(
-        self, event: EventBase, context: EventContext, auth_events: StateMap[EventBase]
-    ) -> EventContext:
-        """Update the state_ids in an event context after auth event resolution,
-        storing the changes as a new state group.
-
-        Args:
-            event: The event we're handling the context for
-
-            context: initial event context
-
-            auth_events: Events to update in the event context.
-
-        Returns:
-            new event context
-        """
-        # exclude the state key of the new event from the current_state in the context.
-        if event.is_state():
-            event_key: Optional[Tuple[str, str]] = (event.type, event.state_key)
-        else:
-            event_key = None
-        state_updates = {
-            k: a.event_id for k, a in auth_events.items() if k != event_key
-        }
-
-        current_state_ids = await context.get_current_state_ids()
-        current_state_ids = dict(current_state_ids)  # type: ignore
-
-        current_state_ids.update(state_updates)
-
-        prev_state_ids = await context.get_prev_state_ids()
-        prev_state_ids = dict(prev_state_ids)
-
-        prev_state_ids.update({k: a.event_id for k, a in auth_events.items()})
-
-        # create a new state group as a delta from the existing one.
-        prev_group = context.state_group
-        state_group = await self.state_store.store_state_group(
-            event.event_id,
-            event.room_id,
-            prev_group=prev_group,
-            delta_ids=state_updates,
-            current_state_ids=current_state_ids,
-        )
-
-        return EventContext.with_state(
-            state_group=state_group,
-            state_group_before_event=context.state_group_before_event,
-            current_state_ids=current_state_ids,
-            prev_state_ids=prev_state_ids,
-            prev_group=prev_group,
-            delta_ids=state_updates,
-        )
-
     async def construct_auth_difference(
         self, local_auth: Iterable[EventBase], remote_auth: Iterable[EventBase]
     ) -> Dict:
@@ -3392,99 +1606,6 @@ class FederationHandler(BaseHandler):
         if "valid" not in response or not response["valid"]:
             raise AuthError(403, "Third party certificate was invalid")
 
-    async def persist_events_and_notify(
-        self,
-        room_id: str,
-        event_and_contexts: Sequence[Tuple[EventBase, EventContext]],
-        backfilled: bool = False,
-    ) -> int:
-        """Persists events and tells the notifier/pushers about them, if
-        necessary.
-
-        Args:
-            room_id: The room ID of events being persisted.
-            event_and_contexts: Sequence of events with their associated
-                context that should be persisted. All events must belong to
-                the same room.
-            backfilled: Whether these events are a result of
-                backfilling or not
-
-        Returns:
-            The stream ID after which all events have been persisted.
-        """
-        if not event_and_contexts:
-            return self.store.get_current_events_token()
-
-        instance = self.config.worker.events_shard_config.get_instance(room_id)
-        if instance != self._instance_name:
-            # Limit the number of events sent over replication. We choose 200
-            # here as that is what we default to in `max_request_body_size(..)`
-            for batch in batch_iter(event_and_contexts, 200):
-                result = await self._send_events(
-                    instance_name=instance,
-                    store=self.store,
-                    room_id=room_id,
-                    event_and_contexts=batch,
-                    backfilled=backfilled,
-                )
-            return result["max_stream_id"]
-        else:
-            assert self.storage.persistence
-
-            # Note that this returns the events that were persisted, which may not be
-            # the same as were passed in if some were deduplicated due to transaction IDs.
-            events, max_stream_token = await self.storage.persistence.persist_events(
-                event_and_contexts, backfilled=backfilled
-            )
-
-            if self._ephemeral_messages_enabled:
-                for event in events:
-                    # If there's an expiry timestamp on the event, schedule its expiry.
-                    self._message_handler.maybe_schedule_expiry(event)
-
-            if not backfilled:  # Never notify for backfilled events
-                for event in events:
-                    await self._notify_persisted_event(event, max_stream_token)
-
-            return max_stream_token.stream
-
-    async def _notify_persisted_event(
-        self, event: EventBase, max_stream_token: RoomStreamToken
-    ) -> None:
-        """Checks to see if notifier/pushers should be notified about the
-        event or not.
-
-        Args:
-            event:
-            max_stream_id: The max_stream_id returned by persist_events
-        """
-
-        extra_users = []
-        if event.type == EventTypes.Member:
-            target_user_id = event.state_key
-
-            # We notify for memberships if its an invite for one of our
-            # users
-            if event.internal_metadata.is_outlier():
-                if event.membership != Membership.INVITE:
-                    if not self.is_mine_id(target_user_id):
-                        return
-
-            target_user = UserID.from_string(target_user_id)
-            extra_users.append(target_user)
-        elif event.internal_metadata.is_outlier():
-            return
-
-        # the event has been persisted so it should have a stream ordering.
-        assert event.internal_metadata.stream_ordering
-
-        event_pos = PersistedEventPosition(
-            self._instance_name, event.internal_metadata.stream_ordering
-        )
-        self.notifier.on_new_room_event(
-            event, event_pos, max_stream_token, extra_users=extra_users
-        )
-
     async def _clean_room_for_join(self, room_id: str) -> None:
         """Called to clean up any data in DB for a given room, ready for the
         server to join the room.