diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c66aefe2c4..07aadf3f3c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,6 +16,7 @@
# limitations under the License.
import logging
import random
+from http import HTTPStatus
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
from canonicaljson import encode_canonical_json
@@ -1461,6 +1462,39 @@ class EventCreationHandler:
if prev_state_ids:
raise AuthError(403, "Changing the room create event is forbidden")
+ if event.type == EventTypes.MSC2716_INSERTION:
+ room_version = await self.store.get_room_version_id(event.room_id)
+ room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+ create_event = await self.store.get_create_event_for_room(event.room_id)
+ room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
+
+ # Only check an insertion event if the room version
+ # supports it or the event is from the room creator.
+ if room_version_obj.msc2716_historical or (
+ self.config.experimental.msc2716_enabled
+ and event.sender == room_creator
+ ):
+ next_batch_id = event.content.get(
+ EventContentFields.MSC2716_NEXT_BATCH_ID
+ )
+ conflicting_insertion_event_id = (
+ await self.store.get_insertion_event_by_batch_id(
+ event.room_id, next_batch_id
+ )
+ )
+ if conflicting_insertion_event_id is not None:
+ # The current insertion event that we're processing is invalid
+ # because an insertion event already exists in the room with the
+ # same next_batch_id. We can't allow multiple because the batch
+ # pointing will get weird, e.g. we can't determine which insertion
+ # event the batch event is pointing to.
+ raise SynapseError(
+ HTTPStatus.BAD_REQUEST,
+ "Another insertion event already exists with the same next_batch_id",
+ errcode=Codes.INVALID_PARAM,
+ )
+
# Mark any `m.historical` messages as backfilled so they don't appear
# in `/sync` and have the proper decrementing `stream_ordering` as we import
backfilled = False
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index bf14ec384e..1dffcc3147 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -306,11 +306,13 @@ class RoomBatchSendEventRestServlet(RestServlet):
# Verify the batch_id_from_query corresponds to an actual insertion event
# and have the batch connected.
corresponding_insertion_event_id = (
- await self.store.get_insertion_event_by_batch_id(batch_id_from_query)
+ await self.store.get_insertion_event_by_batch_id(
+ room_id, batch_id_from_query
+ )
)
if corresponding_insertion_event_id is None:
raise SynapseError(
- 400,
+ HTTPStatus.BAD_REQUEST,
"No insertion event corresponds to the given ?batch_id",
errcode=Codes.INVALID_PARAM,
)
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index a383388757..300a563c9e 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -18,7 +18,9 @@ from synapse.storage._base import SQLBaseStore
class RoomBatchStore(SQLBaseStore):
- async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
+ async def get_insertion_event_by_batch_id(
+ self, room_id: str, batch_id: str
+ ) -> Optional[str]:
"""Retrieve a insertion event ID.
Args:
@@ -30,7 +32,7 @@ class RoomBatchStore(SQLBaseStore):
"""
return await self.db_pool.simple_select_one_onecol(
table="insertion_events",
- keyvalues={"next_batch_id": batch_id},
+ keyvalues={"room_id": room_id, "next_batch_id": batch_id},
retcol="event_id",
allow_none=True,
)
|