diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index ed96978448..bf14ec384e 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -14,6 +14,7 @@
import logging
import re
+from http import HTTPStatus
from typing import TYPE_CHECKING, Awaitable, List, Tuple
from twisted.web.server import Request
@@ -42,25 +43,25 @@ logger = logging.getLogger(__name__)
class RoomBatchSendEventRestServlet(RestServlet):
"""
- API endpoint which can insert a chunk of events historically back in time
+ API endpoint which can insert a batch of events historically back in time
next to the given `prev_event`.
- `chunk_id` comes from `next_chunk_id `in the response of the batch send
- endpoint and is derived from the "insertion" events added to each chunk.
+ `batch_id` comes from `next_batch_id` in the response of the batch send
+ endpoint and is derived from the "insertion" events added to each batch.
It's not required for the first batch send.
`state_events_at_start` is used to define the historical state events
needed to auth the events like join events. These events will float
- outside of the normal DAG as outlier's and won't be visible in the chat
+ outside of the normal DAG as outliers and won't be visible in the chat
- history which also allows us to insert multiple chunks without having a bunch
- of `@mxid joined the room` noise between each chunk.
+ history which also allows us to insert multiple batches without having a bunch
+ of `@mxid joined the room` noise between each batch.
- `events` is chronological chunk/list of events you want to insert.
- There is a reverse-chronological constraint on chunks so once you insert
+ `events` is a chronological list of events you want to insert.
+ There is a reverse-chronological constraint on batches so once you insert
some messages, you can only insert older ones after that.
- tldr; Insert chunks from your most recent history -> oldest history.
+ tldr; Insert batches from your most recent history -> oldest history.
- POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event=<eventID>&chunk_id=<chunkID>
+ POST /_matrix/client/unstable/org.matrix.msc2716/rooms/<roomID>/batch_send?prev_event_id=<eventID>&batch_id=<batchID>
{
"events": [ ... ],
"state_events_at_start": [ ... ]
@@ -128,7 +129,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
self, sender: str, room_id: str, origin_server_ts: int
) -> JsonDict:
"""Creates an event dict for an "insertion" event with the proper fields
- and a random chunk ID.
+ and a random batch ID.
Args:
sender: The event author MXID
@@ -139,13 +140,13 @@ class RoomBatchSendEventRestServlet(RestServlet):
The new event dictionary to insert.
"""
- next_chunk_id = random_string(8)
+ next_batch_id = random_string(8)
insertion_event = {
"type": EventTypes.MSC2716_INSERTION,
"sender": sender,
"room_id": room_id,
"content": {
- EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id,
+ EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
EventContentFields.MSC2716_HISTORICAL: True,
},
"origin_server_ts": origin_server_ts,
@@ -179,7 +180,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
if not requester.app_service:
raise AuthError(
- 403,
+ HTTPStatus.FORBIDDEN,
"Only application services can use the /batchsend endpoint",
)
@@ -187,24 +188,26 @@ class RoomBatchSendEventRestServlet(RestServlet):
assert_params_in_dict(body, ["state_events_at_start", "events"])
assert request.args is not None
- prev_events_from_query = parse_strings_from_args(request.args, "prev_event")
- chunk_id_from_query = parse_string(request, "chunk_id")
+ prev_event_ids_from_query = parse_strings_from_args(
+ request.args, "prev_event_id"
+ )
+ batch_id_from_query = parse_string(request, "batch_id")
- if prev_events_from_query is None:
+ if prev_event_ids_from_query is None:
raise SynapseError(
- 400,
+ HTTPStatus.BAD_REQUEST,
"prev_event query parameter is required when inserting historical messages back in time",
errcode=Codes.MISSING_PARAM,
)
- # For the event we are inserting next to (`prev_events_from_query`),
+ # For the event we are inserting next to (`prev_event_ids_from_query`),
# find the most recent auth events (derived from state events) that
# allowed that message to be sent. We will use that as a base
# to auth our historical messages against.
(
most_recent_prev_event_id,
_,
- ) = await self.store.get_max_depth_of(prev_events_from_query)
+ ) = await self.store.get_max_depth_of(prev_event_ids_from_query)
# mapping from (type, state_key) -> state_event_id
prev_state_map = await self.state_store.get_state_ids_for_event(
most_recent_prev_event_id
@@ -213,7 +216,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
prev_state_ids = list(prev_state_map.values())
auth_event_ids = prev_state_ids
- state_events_at_start = []
+ state_event_ids_at_start = []
for state_event in body["state_events_at_start"]:
assert_params_in_dict(
state_event, ["type", "origin_server_ts", "content", "sender"]
@@ -279,27 +282,38 @@ class RoomBatchSendEventRestServlet(RestServlet):
)
event_id = event.event_id
- state_events_at_start.append(event_id)
+ state_event_ids_at_start.append(event_id)
auth_event_ids.append(event_id)
events_to_create = body["events"]
inherited_depth = await self._inherit_depth_from_prev_ids(
- prev_events_from_query
+ prev_event_ids_from_query
)
- # Figure out which chunk to connect to. If they passed in
- # chunk_id_from_query let's use it. The chunk ID passed in comes
- # from the chunk_id in the "insertion" event from the previous chunk.
- last_event_in_chunk = events_to_create[-1]
- chunk_id_to_connect_to = chunk_id_from_query
+ # Figure out which batch to connect to. If they passed in
+ # batch_id_from_query let's use it. The batch ID passed in comes
+ # from the batch_id in the "insertion" event from the previous batch.
+ last_event_in_batch = events_to_create[-1]
+ batch_id_to_connect_to = batch_id_from_query
base_insertion_event = None
- if chunk_id_from_query:
+ if batch_id_from_query:
# All but the first base insertion event should point at a fake
# event, which causes the HS to ask for the state at the start of
- # the chunk later.
+ # the batch later.
prev_event_ids = [fake_prev_event_id]
- # TODO: Verify the chunk_id_from_query corresponds to an insertion event
+
+ # Verify the batch_id_from_query corresponds to an actual insertion event
+ # so that this batch can be connected to it.
+ corresponding_insertion_event_id = (
+ await self.store.get_insertion_event_by_batch_id(batch_id_from_query)
+ )
+ if corresponding_insertion_event_id is None:
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "No insertion event corresponds to the given ?batch_id",
+ errcode=Codes.INVALID_PARAM,
+ )
pass
# Otherwise, create an insertion event to act as a starting point.
#
@@ -309,12 +323,12 @@ class RoomBatchSendEventRestServlet(RestServlet):
# an insertion event), in which case we just create a new insertion event
# that can then get pointed to by a "marker" event later.
else:
- prev_event_ids = prev_events_from_query
+ prev_event_ids = prev_event_ids_from_query
base_insertion_event_dict = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
- origin_server_ts=last_event_in_chunk["origin_server_ts"],
+ origin_server_ts=last_event_in_batch["origin_server_ts"],
)
base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
@@ -333,38 +347,38 @@ class RoomBatchSendEventRestServlet(RestServlet):
depth=inherited_depth,
)
- chunk_id_to_connect_to = base_insertion_event["content"][
- EventContentFields.MSC2716_NEXT_CHUNK_ID
+ batch_id_to_connect_to = base_insertion_event["content"][
+ EventContentFields.MSC2716_NEXT_BATCH_ID
]
- # Connect this current chunk to the insertion event from the previous chunk
- chunk_event = {
- "type": EventTypes.MSC2716_CHUNK,
+ # Connect this current batch to the insertion event from the previous batch
+ batch_event = {
+ "type": EventTypes.MSC2716_BATCH,
"sender": requester.user.to_string(),
"room_id": room_id,
"content": {
- EventContentFields.MSC2716_CHUNK_ID: chunk_id_to_connect_to,
+ EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to,
EventContentFields.MSC2716_HISTORICAL: True,
},
- # Since the chunk event is put at the end of the chunk,
+ # Since the batch event is put at the end of the batch,
# where the newest-in-time event is, copy the origin_server_ts from
# the last event we're inserting
- "origin_server_ts": last_event_in_chunk["origin_server_ts"],
+ "origin_server_ts": last_event_in_batch["origin_server_ts"],
}
- # Add the chunk event to the end of the chunk (newest-in-time)
- events_to_create.append(chunk_event)
+ # Add the batch event to the end of the batch (newest-in-time)
+ events_to_create.append(batch_event)
- # Add an "insertion" event to the start of each chunk (next to the oldest-in-time
- # event in the chunk) so the next chunk can be connected to this one.
+ # Add an "insertion" event to the start of each batch (next to the oldest-in-time
+ # event in the batch) so the next batch can be connected to this one.
insertion_event = self._create_insertion_event_dict(
sender=requester.user.to_string(),
room_id=room_id,
- # Since the insertion event is put at the start of the chunk,
+ # Since the insertion event is put at the start of the batch,
# where the oldest-in-time event is, copy the origin_server_ts from
# the first event we're inserting
origin_server_ts=events_to_create[0]["origin_server_ts"],
)
- # Prepend the insertion event to the start of the chunk (oldest-in-time)
+ # Prepend the insertion event to the start of the batch (oldest-in-time)
events_to_create = [insertion_event] + events_to_create
event_ids = []
@@ -424,20 +438,26 @@ class RoomBatchSendEventRestServlet(RestServlet):
context=context,
)
- # Add the base_insertion_event to the bottom of the list we return
- if base_insertion_event is not None:
- event_ids.append(base_insertion_event.event_id)
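+ # events_to_create is ordered [insertion event, *historical events, batch event]
+ # (the insertion event is prepended and the batch event appended above), so the
+ # persisted event_ids can be split by position.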
+ insertion_event_id = event_ids[0]
+ batch_event_id = event_ids[-1]
+ historical_event_ids = event_ids[1:-1]
- return 200, {
- "state_events": state_events_at_start,
- "events": event_ids,
- "next_chunk_id": insertion_event["content"][
- EventContentFields.MSC2716_NEXT_CHUNK_ID
+ response_dict = {
+ "state_event_ids": state_event_ids_at_start,
+ "event_ids": historical_event_ids,
+ "next_batch_id": insertion_event["content"][
+ EventContentFields.MSC2716_NEXT_BATCH_ID
],
+ "insertion_event_id": insertion_event_id,
+ "batch_event_id": batch_event_id,
}
+ if base_insertion_event is not None:
+ response_dict["base_insertion_event_id"] = base_insertion_event.event_id
+
+ return HTTPStatus.OK, response_dict
def on_GET(self, request: Request, room_id: str) -> Tuple[int, str]:
- return 501, "Not implemented"
+ return HTTPStatus.NOT_IMPLEMENTED, "Not implemented"
def on_PUT(
self, request: SynapseRequest, room_id: str