summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/10877.feature1
-rw-r--r--synapse/handlers/message.py34
-rw-r--r--synapse/rest/client/room_batch.py6
-rw-r--r--synapse/storage/databases/main/room_batch.py6
4 files changed, 43 insertions, 4 deletions
diff --git a/changelog.d/10877.feature b/changelog.d/10877.feature
new file mode 100644
index 0000000000..06a246c108
--- /dev/null
+++ b/changelog.d/10877.feature
@@ -0,0 +1 @@
+Ensure `(room_id, next_batch_id)` is unique across [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) insertion events in rooms to avoid cross-talk/conflicts between batches.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index c66aefe2c4..07aadf3f3c 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -16,6 +16,7 @@
 # limitations under the License.
 import logging
 import random
+from http import HTTPStatus
 from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple
 
 from canonicaljson import encode_canonical_json
@@ -1461,6 +1462,39 @@ class EventCreationHandler:
             if prev_state_ids:
                 raise AuthError(403, "Changing the room create event is forbidden")
 
+        if event.type == EventTypes.MSC2716_INSERTION:
+            room_version = await self.store.get_room_version_id(event.room_id)
+            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+            create_event = await self.store.get_create_event_for_room(event.room_id)
+            room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
+
+            # Only check an insertion event if the room version
+            # supports it or the event is from the room creator.
+            if room_version_obj.msc2716_historical or (
+                self.config.experimental.msc2716_enabled
+                and event.sender == room_creator
+            ):
+                next_batch_id = event.content.get(
+                    EventContentFields.MSC2716_NEXT_BATCH_ID
+                )
+                conflicting_insertion_event_id = (
+                    await self.store.get_insertion_event_by_batch_id(
+                        event.room_id, next_batch_id
+                    )
+                )
+                if conflicting_insertion_event_id is not None:
+                    # The current insertion event that we're processing is invalid
+                    # because an insertion event already exists in the room with the
+                    # same next_batch_id. We can't allow multiple because the batch
+                    # pointing will get weird, e.g. we can't determine which insertion
+                    # event the batch event is pointing to.
+                    raise SynapseError(
+                        HTTPStatus.BAD_REQUEST,
+                        "Another insertion event already exists with the same next_batch_id",
+                        errcode=Codes.INVALID_PARAM,
+                    )
+
         # Mark any `m.historical` messages as backfilled so they don't appear
         # in `/sync` and have the proper decrementing `stream_ordering` as we import
         backfilled = False
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index bf14ec384e..1dffcc3147 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -306,11 +306,13 @@ class RoomBatchSendEventRestServlet(RestServlet):
             # Verify the batch_id_from_query corresponds to an actual insertion event
             # and have the batch connected.
             corresponding_insertion_event_id = (
-                await self.store.get_insertion_event_by_batch_id(batch_id_from_query)
+                await self.store.get_insertion_event_by_batch_id(
+                    room_id, batch_id_from_query
+                )
             )
             if corresponding_insertion_event_id is None:
                 raise SynapseError(
-                    400,
+                    HTTPStatus.BAD_REQUEST,
                     "No insertion event corresponds to the given ?batch_id",
                     errcode=Codes.INVALID_PARAM,
                 )
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index a383388757..300a563c9e 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -18,7 +18,9 @@ from synapse.storage._base import SQLBaseStore
 
 
 class RoomBatchStore(SQLBaseStore):
-    async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]:
+    async def get_insertion_event_by_batch_id(
+        self, room_id: str, batch_id: str
+    ) -> Optional[str]:
         """Retrieve a insertion event ID.
 
         Args:
@@ -30,7 +32,7 @@ class RoomBatchStore(SQLBaseStore):
         """
         return await self.db_pool.simple_select_one_onecol(
             table="insertion_events",
-            keyvalues={"next_batch_id": batch_id},
+            keyvalues={"room_id": room_id, "next_batch_id": batch_id},
             retcol="event_id",
             allow_none=True,
         )