summary refs log tree commit diff
diff options
context:
space:
mode:
authorEric Eastwood <erice@element.io>2021-06-22 04:02:53 -0500
committerGitHub <noreply@github.com>2021-06-22 10:02:53 +0100
commit96f6293de51c2fcf530bb6ca3705cf596c19656f (patch)
tree0de10e4d1b6abb625984573dce136892dda7ca4a
parentImplement config option `sso.update_profile_information` (#10108) (diff)
downloadsynapse-96f6293de51c2fcf530bb6ca3705cf596c19656f.tar.xz
Add endpoints for backfilling history (MSC2716) (#9247)
Work on https://github.com/matrix-org/matrix-doc/pull/2716
-rw-r--r--changelog.d/9247.feature1
-rwxr-xr-xscripts-dev/complement.sh2
-rw-r--r--synapse/api/auth.py7
-rw-r--r--synapse/api/constants.py15
-rw-r--r--synapse/config/experimental.py3
-rw-r--r--synapse/events/__init__.py9
-rw-r--r--synapse/events/builder.py17
-rw-r--r--synapse/handlers/message.py104
-rw-r--r--synapse/handlers/room_member.py90
-rw-r--r--synapse/rest/client/v1/room.py288
-rw-r--r--synapse/storage/databases/main/event_federation.py50
-rw-r--r--tests/handlers/test_presence.py4
-rw-r--r--tests/replication/test_federation_sender_shard.py4
-rw-r--r--tests/storage/test_redaction.py13
14 files changed, 584 insertions, 23 deletions
diff --git a/changelog.d/9247.feature b/changelog.d/9247.feature
new file mode 100644
index 0000000000..c687acf102
--- /dev/null
+++ b/changelog.d/9247.feature
@@ -0,0 +1 @@
+Add experimental support for backfilling history into rooms ([MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716)).
diff --git a/scripts-dev/complement.sh b/scripts-dev/complement.sh
index 0043964673..ba060104c3 100755
--- a/scripts-dev/complement.sh
+++ b/scripts-dev/complement.sh
@@ -65,4 +65,4 @@ if [[ -n "$1" ]]; then
 fi
 
 # Run the tests!
-go test -v -tags synapse_blacklist,msc2946,msc3083 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
+go test -v -tags synapse_blacklist,msc2946,msc3083,msc2716 -count=1 $EXTRA_COMPLEMENT_ARGS ./tests
diff --git a/synapse/api/auth.py b/synapse/api/auth.py
index cf4333a923..edf1b918eb 100644
--- a/synapse/api/auth.py
+++ b/synapse/api/auth.py
@@ -92,11 +92,8 @@ class Auth:
     async def check_from_context(
         self, room_version: str, event, context, do_sig_check=True
     ) -> None:
-        prev_state_ids = await context.get_prev_state_ids()
-        auth_events_ids = self.compute_auth_events(
-            event, prev_state_ids, for_verification=True
-        )
-        auth_events_by_id = await self.store.get_events(auth_events_ids)
+        auth_event_ids = event.auth_event_ids()
+        auth_events_by_id = await self.store.get_events(auth_event_ids)
         auth_events = {(e.type, e.state_key): e for e in auth_events_by_id.values()}
 
         room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
diff --git a/synapse/api/constants.py b/synapse/api/constants.py
index 6c3958f7ab..414e4c019a 100644
--- a/synapse/api/constants.py
+++ b/synapse/api/constants.py
@@ -119,6 +119,9 @@ class EventTypes:
     SpaceChild = "m.space.child"
     SpaceParent = "m.space.parent"
 
+    MSC2716_INSERTION = "org.matrix.msc2716.insertion"
+    MSC2716_MARKER = "org.matrix.msc2716.marker"
+
 
 class ToDeviceEventTypes:
     RoomKeyRequest = "m.room_key_request"
@@ -185,6 +188,18 @@ class EventContentFields:
     # cf https://github.com/matrix-org/matrix-doc/pull/1772
     ROOM_TYPE = "type"
 
+    # Used on normal messages to indicate they were historically imported after the fact
+    MSC2716_HISTORICAL = "org.matrix.msc2716.historical"
+    # For "insertion" events
+    MSC2716_NEXT_CHUNK_ID = "org.matrix.msc2716.next_chunk_id"
+    # Used on normal message events to indicate where the chunk connects to
+    MSC2716_CHUNK_ID = "org.matrix.msc2716.chunk_id"
+    # For "marker" events
+    MSC2716_MARKER_INSERTION = "org.matrix.msc2716.marker.insertion"
+    MSC2716_MARKER_INSERTION_PREV_EVENTS = (
+        "org.matrix.msc2716.marker.insertion_prev_events"
+    )
+
 
 class RoomEncryptionAlgorithms:
     MEGOLM_V1_AES_SHA2 = "m.megolm.v1.aes-sha2"
diff --git a/synapse/config/experimental.py b/synapse/config/experimental.py
index 6ebce4b2f7..7fb1f7021f 100644
--- a/synapse/config/experimental.py
+++ b/synapse/config/experimental.py
@@ -29,3 +29,6 @@ class ExperimentalConfig(Config):
 
         # MSC3026 (busy presence state)
         self.msc3026_enabled = experimental.get("msc3026_enabled", False)  # type: bool
+
+        # MSC2716 (backfill existing history)
+        self.msc2716_enabled = experimental.get("msc2716_enabled", False)  # type: bool
diff --git a/synapse/events/__init__.py b/synapse/events/__init__.py
index c8b52cbc7a..0cb9c1cc1e 100644
--- a/synapse/events/__init__.py
+++ b/synapse/events/__init__.py
@@ -119,6 +119,7 @@ class _EventInternalMetadata:
     redacted = DictProperty("redacted")  # type: bool
     txn_id = DictProperty("txn_id")  # type: str
     token_id = DictProperty("token_id")  # type: str
+    historical = DictProperty("historical")  # type: bool
 
     # XXX: These are set by StreamWorkerStore._set_before_and_after.
     # I'm pretty sure that these are never persisted to the database, so shouldn't
@@ -204,6 +205,14 @@ class _EventInternalMetadata:
         """
         return self._dict.get("redacted", False)
 
+    def is_historical(self) -> bool:
+        """Whether this is a historical message.
+        This is used by the batchsend historical message endpoint and
+        is needed to and mark the event as backfilled and skip some checks
+        like push notifications.
+        """
+        return self._dict.get("historical", False)
+
 
 class EventBase(metaclass=abc.ABCMeta):
     @property
diff --git a/synapse/events/builder.py b/synapse/events/builder.py
index 5793553a88..81bf8615b7 100644
--- a/synapse/events/builder.py
+++ b/synapse/events/builder.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import logging
 from typing import Any, Dict, List, Optional, Tuple, Union
 
 import attr
@@ -33,6 +34,8 @@ from synapse.types import EventID, JsonDict
 from synapse.util import Clock
 from synapse.util.stringutils import random_string
 
+logger = logging.getLogger(__name__)
+
 
 @attr.s(slots=True, cmp=False, frozen=True)
 class EventBuilder:
@@ -100,6 +103,7 @@ class EventBuilder:
         self,
         prev_event_ids: List[str],
         auth_event_ids: Optional[List[str]],
+        depth: Optional[int] = None,
     ) -> EventBase:
         """Transform into a fully signed and hashed event
 
@@ -108,6 +112,9 @@ class EventBuilder:
             auth_event_ids: The event IDs to use as the auth events.
                 Should normally be set to None, which will cause them to be calculated
                 based on the room state at the prev_events.
+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.
 
         Returns:
             The signed and hashed event.
@@ -131,8 +138,14 @@ class EventBuilder:
             auth_events = auth_event_ids
             prev_events = prev_event_ids
 
-        old_depth = await self._store.get_max_depth_of(prev_event_ids)
-        depth = old_depth + 1
+        # Otherwise, progress the depth as normal
+        if depth is None:
+            (
+                _,
+                most_recent_prev_event_depth,
+            ) = await self._store.get_max_depth_of(prev_event_ids)
+
+            depth = most_recent_prev_event_depth + 1
 
         # we cap depth of generated events, to ensure that they are not
         # rejected by other servers (and so that they can be persisted in
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 4d2255bdf1..db12abd59d 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -482,6 +482,9 @@ class EventCreationHandler:
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
         require_consent: bool = True,
+        outlier: bool = False,
+        historical: bool = False,
+        depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
         Given a dict from a client, create a new event.
@@ -508,6 +511,14 @@ class EventCreationHandler:
 
             require_consent: Whether to check if the requester has
                 consented to the privacy policy.
+
+            outlier: Indicates whether the event is an `outlier`, i.e. if
+                it's from an arbitrary point and floating in the DAG as
+                opposed to being inline with the current DAG.
+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.
+
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
@@ -563,11 +574,36 @@ class EventCreationHandler:
         if txn_id is not None:
             builder.internal_metadata.txn_id = txn_id
 
+        builder.internal_metadata.outlier = outlier
+
+        builder.internal_metadata.historical = historical
+
+        # Strip down the auth_event_ids to only what we need to auth the event.
+        # For example, we don't need extra m.room.member that don't match event.sender
+        if auth_event_ids is not None:
+            temp_event = await builder.build(
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                depth=depth,
+            )
+            auth_events = await self.store.get_events_as_list(auth_event_ids)
+            # Create a StateMap[str]
+            auth_event_state_map = {
+                (e.type, e.state_key): e.event_id for e in auth_events
+            }
+            # Actually strip down and use the necessary auth events
+            auth_event_ids = self.auth.compute_auth_events(
+                event=temp_event,
+                current_state_ids=auth_event_state_map,
+                for_verification=False,
+            )
+
         event, context = await self.create_new_client_event(
             builder=builder,
             requester=requester,
             prev_event_ids=prev_event_ids,
             auth_event_ids=auth_event_ids,
+            depth=depth,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -724,9 +760,13 @@ class EventCreationHandler:
         self,
         requester: Requester,
         event_dict: dict,
+        prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
         ratelimit: bool = True,
         txn_id: Optional[str] = None,
         ignore_shadow_ban: bool = False,
+        outlier: bool = False,
+        depth: Optional[int] = None,
     ) -> Tuple[EventBase, int]:
         """
         Creates an event, then sends it.
@@ -736,10 +776,24 @@ class EventCreationHandler:
         Args:
             requester: The requester sending the event.
             event_dict: An entire event.
+            prev_event_ids:
+                The event IDs to use as the prev events.
+                Should normally be left as None to automatically request them
+                from the database.
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
             ratelimit: Whether to rate limit this send.
             txn_id: The transaction ID.
             ignore_shadow_ban: True if shadow-banned users should be allowed to
                 send this event.
+            outlier: Indicates whether the event is an `outlier`, i.e. if
+                it's from an arbitrary point and floating in the DAG as
+                opposed to being inline with the current DAG.
+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.
 
         Returns:
             The event, and its stream ordering (if deduplication happened,
@@ -779,7 +833,13 @@ class EventCreationHandler:
                     return event, event.internal_metadata.stream_ordering
 
             event, context = await self.create_event(
-                requester, event_dict, txn_id=txn_id
+                requester,
+                event_dict,
+                txn_id=txn_id,
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                outlier=outlier,
+                depth=depth,
             )
 
             assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
@@ -811,6 +871,7 @@ class EventCreationHandler:
         requester: Optional[Requester] = None,
         prev_event_ids: Optional[List[str]] = None,
         auth_event_ids: Optional[List[str]] = None,
+        depth: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """Create a new event for a local client
 
@@ -828,6 +889,10 @@ class EventCreationHandler:
                 Should normally be left as None, which will cause them to be calculated
                 based on the room state at the prev_events.
 
+            depth: Override the depth used to order the event in the DAG.
+                Should normally be set to None, which will cause the depth to be calculated
+                based on the prev_events.
+
         Returns:
             Tuple of created event, context
         """
@@ -851,9 +916,24 @@ class EventCreationHandler:
         ), "Attempting to create an event with no prev_events"
 
         event = await builder.build(
-            prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
+            prev_event_ids=prev_event_ids,
+            auth_event_ids=auth_event_ids,
+            depth=depth,
         )
-        context = await self.state.compute_event_context(event)
+
+        old_state = None
+
+        # Pass on the outlier property from the builder to the event
+        # after it is created
+        if builder.internal_metadata.outlier:
+            event.internal_metadata.outlier = builder.internal_metadata.outlier
+
+            # Calculate the state for outliers that pass in their own `auth_event_ids`
+            if auth_event_ids:
+                old_state = await self.store.get_events_as_list(auth_event_ids)
+
+        context = await self.state.compute_event_context(event, old_state=old_state)
+
         if requester:
             context.app_service = requester.app_service
 
@@ -1018,7 +1098,13 @@ class EventCreationHandler:
         the arguments.
         """
 
-        await self.action_generator.handle_push_actions_for_event(event, context)
+        # Skip push notification actions for historical messages
+        # because we don't want to notify people about old history back in time.
+        # The historical messages also do not have the proper `context.current_state_ids`
+        # and `state_groups` because they have `prev_events` that aren't persisted yet
+        # (historical messages persisted in reverse-chronological order).
+        if not event.internal_metadata.is_historical():
+            await self.action_generator.handle_push_actions_for_event(event, context)
 
         try:
             # If we're a worker we need to hit out to the master.
@@ -1317,13 +1403,21 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
+        # Mark any `m.historical` messages as backfilled so they don't appear
+        # in `/sync` and have the proper decrementing `stream_ordering` as we import
+        backfilled = False
+        if event.internal_metadata.is_historical():
+            backfilled = True
+
         # Note that this returns the event that was persisted, which may not be
         # the same as we passed in if it was deduplicated due transaction IDs.
         (
             event,
             event_pos,
             max_stream_token,
-        ) = await self.storage.persistence.persist_event(event, context=context)
+        ) = await self.storage.persistence.persist_event(
+            event, context=context, backfilled=backfilled
+        )
 
         if self._ephemeral_events_enabled:
             # If there's an expiry timestamp on the event, schedule its expiry.
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index a49a61a34c..1192591609 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -257,11 +257,42 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         room_id: str,
         membership: str,
         prev_event_ids: List[str],
+        auth_event_ids: Optional[List[str]] = None,
         txn_id: Optional[str] = None,
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
+        outlier: bool = False,
     ) -> Tuple[str, int]:
+        """
+        Internal membership update function to get an existing event or create
+        and persist a new event for the new membership change.
+
+        Args:
+            requester:
+            target:
+            room_id:
+            membership:
+            prev_event_ids: The event IDs to use as the prev events
+
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
+
+            txn_id:
+            ratelimit:
+            content:
+            require_consent:
+
+            outlier: Indicates whether the event is an `outlier`, i.e. if
+                it's from an arbitrary point and floating in the DAG as
+                opposed to being inline with the current DAG.
+
+        Returns:
+            Tuple of event ID and stream ordering position
+        """
+
         user_id = target.to_string()
 
         if content is None:
@@ -298,7 +329,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             },
             txn_id=txn_id,
             prev_event_ids=prev_event_ids,
+            auth_event_ids=auth_event_ids,
             require_consent=require_consent,
+            outlier=outlier,
         )
 
         prev_state_ids = await context.get_prev_state_ids()
@@ -399,6 +432,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
+        outlier: bool = False,
+        prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
         """Update a user's membership in a room.
 
@@ -413,6 +449,14 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             ratelimit: Whether to rate limit the request.
             content: The content of the created event.
             require_consent: Whether consent is required.
+            outlier: Indicates whether the event is an `outlier`, i.e. if
+                it's from an arbitrary point and floating in the DAG as
+                opposed to being inline with the current DAG.
+            prev_event_ids: The event IDs to use as the prev events
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
 
         Returns:
             A tuple of the new event ID and stream ID.
@@ -439,6 +483,9 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 ratelimit=ratelimit,
                 content=content,
                 require_consent=require_consent,
+                outlier=outlier,
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
             )
 
         return result
@@ -455,10 +502,36 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         ratelimit: bool = True,
         content: Optional[dict] = None,
         require_consent: bool = True,
+        outlier: bool = False,
+        prev_event_ids: Optional[List[str]] = None,
+        auth_event_ids: Optional[List[str]] = None,
     ) -> Tuple[str, int]:
         """Helper for update_membership.
 
         Assumes that the membership linearizer is already held for the room.
+
+        Args:
+            requester:
+            target:
+            room_id:
+            action:
+            txn_id:
+            remote_room_hosts:
+            third_party_signed:
+            ratelimit:
+            content:
+            require_consent:
+            outlier: Indicates whether the event is an `outlier`, i.e. if
+                it's from an arbitrary point and floating in the DAG as
+                opposed to being inline with the current DAG.
+            prev_event_ids: The event IDs to use as the prev events
+            auth_event_ids:
+                The event ids to use as the auth_events for the new event.
+                Should normally be left as None, which will cause them to be calculated
+                based on the room state at the prev_events.
+
+        Returns:
+            A tuple of the new event ID and stream ID.
         """
         content_specified = bool(content)
         if content is None:
@@ -543,6 +616,21 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             if block_invite:
                 raise SynapseError(403, "Invites have been disabled on this server")
 
+        if prev_event_ids:
+            return await self._local_membership_update(
+                requester=requester,
+                target=target,
+                room_id=room_id,
+                membership=effective_membership_state,
+                txn_id=txn_id,
+                ratelimit=ratelimit,
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                content=content,
+                require_consent=require_consent,
+                outlier=outlier,
+            )
+
         latest_event_ids = await self.store.get_prev_events_for_room(room_id)
 
         current_state_ids = await self.state_handler.get_current_state_ids(
@@ -732,8 +820,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             txn_id=txn_id,
             ratelimit=ratelimit,
             prev_event_ids=latest_event_ids,
+            auth_event_ids=auth_event_ids,
             content=content,
             require_consent=require_consent,
+            outlier=outlier,
         )
 
     async def transfer_room_state_on_room_upgrade(
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py
index 16d087ea60..92ebe838fd 100644
--- a/synapse/rest/client/v1/room.py
+++ b/synapse/rest/client/v1/room.py
@@ -19,7 +19,7 @@ import re
 from typing import TYPE_CHECKING, Dict, List, Optional, Tuple
 from urllib import parse as urlparse
 
-from synapse.api.constants import EventTypes, Membership
+from synapse.api.constants import EventContentFields, EventTypes, Membership
 from synapse.api.errors import (
     AuthError,
     Codes,
@@ -266,6 +266,288 @@ class RoomSendEventRestServlet(TransactionRestServlet):
         )
 
 
+class RoomBatchSendEventRestServlet(TransactionRestServlet):
+    """
+    API endpoint which can insert a chunk of events historically back in time
+    next to the given `prev_event`.
+
+    `chunk_id` comes from `next_chunk_id `in the response of the batch send
+    endpoint and is derived from the "insertion" events added to each chunk.
+    It's not required for the first batch send.
+
+    `state_events_at_start` is used to define the historical state events
+    needed to auth the events like join events. These events will float
+    outside of the normal DAG as outlier's and won't be visible in the chat
+    history which also allows us to insert multiple chunks without having a bunch
+    of `@mxid joined the room` noise between each chunk.
+
+    `events` is chronological chunk/list of events you want to insert.
+    There is a reverse-chronological constraint on chunks so once you insert
+    some messages, you can only insert older ones after that.
+    tldr; Insert chunks from your most recent history -> oldest history.
+
+    POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event=<eventID>&chunk_id=<chunkID>
+    {
+        "events": [ ... ],
+        "state_events_at_start": [ ... ]
+    }
+    """
+
+    PATTERNS = (
+        re.compile(
+            "^/_matrix/client/unstable/org.matrix.msc2716"
+            "/rooms/(?P<room_id>[^/]*)/batch_send$"
+        ),
+    )
+
+    def __init__(self, hs):
+        super().__init__(hs)
+        self.hs = hs
+        self.store = hs.get_datastore()
+        self.state_store = hs.get_storage().state
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.room_member_handler = hs.get_room_member_handler()
+        self.auth = hs.get_auth()
+
+    async def inherit_depth_from_prev_ids(self, prev_event_ids) -> int:
+        (
+            most_recent_prev_event_id,
+            most_recent_prev_event_depth,
+        ) = await self.store.get_max_depth_of(prev_event_ids)
+
+        # We want to insert the historical event after the `prev_event` but before the successor event
+        #
+        # We inherit depth from the successor event instead of the `prev_event`
+        # because events returned from `/messages` are first sorted by `topological_ordering`
+        # which is just the `depth` and then tie-break with `stream_ordering`.
+        #
+        # We mark these inserted historical events as "backfilled" which gives them a
+        # negative `stream_ordering`. If we use the same depth as the `prev_event`,
+        # then our historical event will tie-break and be sorted before the `prev_event`
+        # when it should come after.
+        #
+        # We want to use the successor event depth so they appear after `prev_event` because
+        # it has a larger `depth` but before the successor event because the `stream_ordering`
+        # is negative before the successor event.
+        successor_event_ids = await self.store.get_successor_events(
+            [most_recent_prev_event_id]
+        )
+
+        # If we can't find any successor events, then it's a forward extremity of
+        # historical messages and we can just inherit from the previous historical
+        # event which we can already assume has the correct depth where we want
+        # to insert into.
+        if not successor_event_ids:
+            depth = most_recent_prev_event_depth
+        else:
+            (
+                _,
+                oldest_successor_depth,
+            ) = await self.store.get_min_depth_of(successor_event_ids)
+
+            depth = oldest_successor_depth
+
+        return depth
+
+    async def on_POST(self, request, room_id):
+        requester = await self.auth.get_user_by_req(request, allow_guest=False)
+
+        if not requester.app_service:
+            raise AuthError(
+                403,
+                "Only application services can use the /batchsend endpoint",
+            )
+
+        body = parse_json_object_from_request(request)
+        assert_params_in_dict(body, ["state_events_at_start", "events"])
+
+        prev_events_from_query = parse_strings_from_args(request.args, "prev_event")
+        chunk_id_from_query = parse_string(request, "chunk_id", default=None)
+
+        if prev_events_from_query is None:
+            raise SynapseError(
+                400,
+                "prev_event query parameter is required when inserting historical messages back in time",
+                errcode=Codes.MISSING_PARAM,
+            )
+
+        # For the event we are inserting next to (`prev_events_from_query`),
+        # find the most recent auth events (derived from state events) that
+        # allowed that message to be sent. We will use that as a base
+        # to auth our historical messages against.
+        (
+            most_recent_prev_event_id,
+            _,
+        ) = await self.store.get_max_depth_of(prev_events_from_query)
+        # mapping from (type, state_key) -> state_event_id
+        prev_state_map = await self.state_store.get_state_ids_for_event(
+            most_recent_prev_event_id
+        )
+        # List of state event ID's
+        prev_state_ids = list(prev_state_map.values())
+        auth_event_ids = prev_state_ids
+
+        for state_event in body["state_events_at_start"]:
+            assert_params_in_dict(
+                state_event, ["type", "origin_server_ts", "content", "sender"]
+            )
+
+            logger.debug(
+                "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s",
+                state_event,
+                auth_event_ids,
+            )
+
+            event_dict = {
+                "type": state_event["type"],
+                "origin_server_ts": state_event["origin_server_ts"],
+                "content": state_event["content"],
+                "room_id": room_id,
+                "sender": state_event["sender"],
+                "state_key": state_event["state_key"],
+            }
+
+            # Make the state events float off on their own
+            fake_prev_event_id = "$" + random_string(43)
+
+            # TODO: This is pretty much the same as some other code to handle inserting state in this file
+            if event_dict["type"] == EventTypes.Member:
+                membership = event_dict["content"].get("membership", None)
+                event_id, _ = await self.room_member_handler.update_membership(
+                    requester,
+                    target=UserID.from_string(event_dict["state_key"]),
+                    room_id=room_id,
+                    action=membership,
+                    content=event_dict["content"],
+                    outlier=True,
+                    prev_event_ids=[fake_prev_event_id],
+                    # Make sure to use a copy of this list because we modify it
+                    # later in the loop here. Otherwise it will be the same
+                    # reference and also update in the event when we append later.
+                    auth_event_ids=auth_event_ids.copy(),
+                )
+            else:
+                # TODO: Add some complement tests that adds state that is not member joins
+                # and will use this code path. Maybe we only want to support join state events
+                # and can get rid of this `else`?
+                (
+                    event,
+                    _,
+                ) = await self.event_creation_handler.create_and_send_nonmember_event(
+                    requester,
+                    event_dict,
+                    outlier=True,
+                    prev_event_ids=[fake_prev_event_id],
+                    # Make sure to use a copy of this list because we modify it
+                    # later in the loop here. Otherwise it will be the same
+                    # reference and also update in the event when we append later.
+                    auth_event_ids=auth_event_ids.copy(),
+                )
+                event_id = event.event_id
+
+            auth_event_ids.append(event_id)
+
+        events_to_create = body["events"]
+
+        # If provided, connect the chunk to the last insertion point
+        # The chunk ID passed in comes from the chunk_id in the
+        # "insertion" event from the previous chunk.
+        if chunk_id_from_query:
+            last_event_in_chunk = events_to_create[-1]
+            last_event_in_chunk["content"][
+                EventContentFields.MSC2716_CHUNK_ID
+            ] = chunk_id_from_query
+
+        # Add an "insertion" event to the start of each chunk (next to the oldest
+        # event in the chunk) so the next chunk can be connected to this one.
+        next_chunk_id = random_string(64)
+        insertion_event = {
+            "type": EventTypes.MSC2716_INSERTION,
+            "sender": requester.user.to_string(),
+            "content": {
+                EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
+                EventContentFields.MSC2716_HISTORICAL: True,
+            },
+            # Since the insertion event is put at the start of the chunk,
+            # where the oldest event is, copy the origin_server_ts from
+            # the first event we're inserting
+            "origin_server_ts": events_to_create[0]["origin_server_ts"],
+        }
+        # Prepend the insertion event to the start of the chunk
+        events_to_create = [insertion_event] + events_to_create
+
+        inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query)
+
+        event_ids = []
+        prev_event_ids = prev_events_from_query
+        events_to_persist = []
+        for ev in events_to_create:
+            assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
+
+            # Mark all events as historical
+            # This has important semantics within the Synapse internals to backfill properly
+            ev["content"][EventContentFields.MSC2716_HISTORICAL] = True
+
+            event_dict = {
+                "type": ev["type"],
+                "origin_server_ts": ev["origin_server_ts"],
+                "content": ev["content"],
+                "room_id": room_id,
+                "sender": ev["sender"],  # requester.user.to_string(),
+                "prev_events": prev_event_ids.copy(),
+            }
+
+            event, context = await self.event_creation_handler.create_event(
+                requester,
+                event_dict,
+                prev_event_ids=event_dict.get("prev_events"),
+                auth_event_ids=auth_event_ids,
+                historical=True,
+                depth=inherited_depth,
+            )
+            logger.debug(
+                "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
+                event,
+                prev_event_ids,
+                auth_event_ids,
+            )
+
+            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+                event.sender,
+            )
+
+            events_to_persist.append((event, context))
+            event_id = event.event_id
+
+            event_ids.append(event_id)
+            prev_event_ids = [event_id]
+
+        # Persist events in reverse-chronological order so they have the
+        # correct stream_ordering as they are backfilled (which decrements).
+        # Events are sorted by (topological_ordering, stream_ordering)
+        # where topological_ordering is just depth.
+        for (event, context) in reversed(events_to_persist):
+            ev = await self.event_creation_handler.handle_new_client_event(
+                requester=requester,
+                event=event,
+                context=context,
+            )
+
+        return 200, {
+            "state_events": auth_event_ids,
+            "events": event_ids,
+            "next_chunk_id": next_chunk_id,
+        }
+
+    def on_GET(self, request, room_id):
+        return 501, "Not implemented"
+
+    def on_PUT(self, request, room_id):
+        return self.txns.fetch_or_execute_request(
+            request, self.on_POST, request, room_id
+        )
+
+
 # TODO: Needs unit testing for room ID + alias joins
 class JoinRoomAliasServlet(TransactionRestServlet):
     def __init__(self, hs):
@@ -1054,6 +1336,8 @@ class RoomSpaceSummaryRestServlet(RestServlet):
 
 
 def register_servlets(hs: "HomeServer", http_server, is_worker=False):
+    msc2716_enabled = hs.config.experimental.msc2716_enabled
+
     RoomStateEventRestServlet(hs).register(http_server)
     RoomMemberListRestServlet(hs).register(http_server)
     JoinedRoomMemberListRestServlet(hs).register(http_server)
@@ -1061,6 +1345,8 @@ def register_servlets(hs: "HomeServer", http_server, is_worker=False):
     JoinRoomAliasServlet(hs).register(http_server)
     RoomMembershipRestServlet(hs).register(http_server)
     RoomSendEventRestServlet(hs).register(http_server)
+    if msc2716_enabled:
+        RoomBatchSendEventRestServlet(hs).register(http_server)
     PublicRoomListRestServlet(hs).register(http_server)
     RoomStateRestServlet(hs).register(http_server)
     RoomRedactEventRestServlet(hs).register(http_server)
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index ff81d5cd17..c0ea445550 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -16,6 +16,7 @@ import logging
 from queue import Empty, PriorityQueue
 from typing import Collection, Dict, Iterable, List, Set, Tuple
 
+from synapse.api.constants import MAX_DEPTH
 from synapse.api.errors import StoreError
 from synapse.events import EventBase
 from synapse.metrics.background_process_metrics import wrap_as_background_process
@@ -670,8 +671,8 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
 
         return dict(txn)
 
-    async def get_max_depth_of(self, event_ids: List[str]) -> int:
-        """Returns the max depth of a set of event IDs
+    async def get_max_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
+        """Returns the event ID and depth for the event that has the max depth from a set of event IDs
 
         Args:
             event_ids: The event IDs to calculate the max depth of.
@@ -680,14 +681,53 @@ class EventFederationWorkerStore(EventsWorkerStore, SignatureWorkerStore, SQLBas
             table="events",
             column="event_id",
             iterable=event_ids,
-            retcols=("depth",),
+            retcols=(
+                "event_id",
+                "depth",
+            ),
             desc="get_max_depth_of",
         )
 
         if not rows:
-            return 0
+            return None, 0
         else:
-            return max(row["depth"] for row in rows)
+            max_depth_event_id = ""
+            current_max_depth = 0
+            for row in rows:
+                if row["depth"] > current_max_depth:
+                    max_depth_event_id = row["event_id"]
+                    current_max_depth = row["depth"]
+
+            return max_depth_event_id, current_max_depth
+
+    async def get_min_depth_of(self, event_ids: List[str]) -> Tuple[str, int]:
+        """Returns the event ID and depth for the event that has the min depth from a set of event IDs
+
+        Args:
+            event_ids: The event IDs to calculate the max depth of.
+        """
+        rows = await self.db_pool.simple_select_many_batch(
+            table="events",
+            column="event_id",
+            iterable=event_ids,
+            retcols=(
+                "event_id",
+                "depth",
+            ),
+            desc="get_min_depth_of",
+        )
+
+        if not rows:
+            return None, 0
+        else:
+            min_depth_event_id = ""
+            current_min_depth = MAX_DEPTH
+            for row in rows:
+                if row["depth"] < current_min_depth:
+                    min_depth_event_id = row["event_id"]
+                    current_min_depth = row["depth"]
+
+            return min_depth_event_id, current_min_depth
 
     async def get_prev_events_for_room(self, room_id: str) -> List[str]:
         """
diff --git a/tests/handlers/test_presence.py b/tests/handlers/test_presence.py
index d90a9fec91..dfb9b3a0fa 100644
--- a/tests/handlers/test_presence.py
+++ b/tests/handlers/test_presence.py
@@ -863,7 +863,9 @@ class PresenceJoinTestCase(unittest.HomeserverTestCase):
             self.store.get_latest_event_ids_in_room(room_id)
         )
 
-        event = self.get_success(builder.build(prev_event_ids, None))
+        event = self.get_success(
+            builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
+        )
 
         self.get_success(self.federation_handler.on_receive_pdu(hostname, event))
 
diff --git a/tests/replication/test_federation_sender_shard.py b/tests/replication/test_federation_sender_shard.py
index 48ab3aa4e3..584da58371 100644
--- a/tests/replication/test_federation_sender_shard.py
+++ b/tests/replication/test_federation_sender_shard.py
@@ -224,7 +224,9 @@ class FederationSenderTestCase(BaseMultiWorkerStreamTestCase):
         }
 
         builder = factory.for_room_version(room_version, event_dict)
-        join_event = self.get_success(builder.build(prev_event_ids, None))
+        join_event = self.get_success(
+            builder.build(prev_event_ids=prev_event_ids, auth_event_ids=None)
+        )
 
         self.get_success(federation.on_send_join_request(remote_server, join_event))
         self.replicate()
diff --git a/tests/storage/test_redaction.py b/tests/storage/test_redaction.py
index bb31ab756d..dbacce4380 100644
--- a/tests/storage/test_redaction.py
+++ b/tests/storage/test_redaction.py
@@ -232,9 +232,14 @@ class RedactionTestCase(unittest.HomeserverTestCase):
                 self._base_builder = base_builder
                 self._event_id = event_id
 
-            async def build(self, prev_event_ids, auth_event_ids):
+            async def build(
+                self,
+                prev_event_ids,
+                auth_event_ids,
+                depth: Optional[int] = None,
+            ):
                 built_event = await self._base_builder.build(
-                    prev_event_ids, auth_event_ids
+                    prev_event_ids=prev_event_ids, auth_event_ids=auth_event_ids
                 )
 
                 built_event._event_id = self._event_id
@@ -251,6 +256,10 @@ class RedactionTestCase(unittest.HomeserverTestCase):
             def type(self):
                 return self._base_builder.type
 
+            @property
+            def internal_metadata(self):
+                return self._base_builder.internal_metadata
+
         event_1, context_1 = self.get_success(
             self.event_creation_handler.create_new_client_event(
                 EventIdManglingBuilder(