diff options
author | Eric Eastwood <erice@element.io> | 2021-09-28 21:23:16 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-09-28 21:23:16 -0500 |
commit | 9fd057b8c5a8c5748e7d8137d1485c38abd9602f (patch) | |
tree | cc5217c451f43f2cd61da15f851e3d0c7fab664d /synapse | |
parent | Update utility code to handle C implementations of frozendict (#10902) (diff) | |
download | synapse-9fd057b8c5a8c5748e7d8137d1485c38abd9602f.tar.xz |
Ensure `(room_id, next_batch_id)` is unique to avoid cross-talk/conflicts between batches (MSC2716) (#10877)
Part of [MSC2716](https://github.com/matrix-org/matrix-doc/pull/2716) Part of https://github.com/matrix-org/synapse/issues/10737
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/message.py | 34 | ||||
-rw-r--r-- | synapse/rest/client/room_batch.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/room_batch.py | 6 |
3 files changed, 42 insertions, 4 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index c66aefe2c4..07aadf3f3c 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -16,6 +16,7 @@ # limitations under the License. import logging import random +from http import HTTPStatus from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Tuple from canonicaljson import encode_canonical_json @@ -1461,6 +1462,39 @@ class EventCreationHandler: if prev_state_ids: raise AuthError(403, "Changing the room create event is forbidden") + if event.type == EventTypes.MSC2716_INSERTION: + room_version = await self.store.get_room_version_id(event.room_id) + room_version_obj = KNOWN_ROOM_VERSIONS[room_version] + + create_event = await self.store.get_create_event_for_room(event.room_id) + room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR) + + # Only check an insertion event if the room version + # supports it or the event is from the room creator. + if room_version_obj.msc2716_historical or ( + self.config.experimental.msc2716_enabled + and event.sender == room_creator + ): + next_batch_id = event.content.get( + EventContentFields.MSC2716_NEXT_BATCH_ID + ) + conflicting_insertion_event_id = ( + await self.store.get_insertion_event_by_batch_id( + event.room_id, next_batch_id + ) + ) + if conflicting_insertion_event_id is not None: + # The current insertion event that we're processing is invalid + # because an insertion event already exists in the room with the + # same next_batch_id. We can't allow multiple because the batch + # pointing will get weird, e.g. we can't determine which insertion + # event the batch event is pointing to. + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "Another insertion event already exists with the same next_batch_id", + errcode=Codes.INVALID_PARAM, + ) + # Mark any `m.historical` messages as backfilled so they don't appear # in `/sync` and have the proper decrementing `stream_ordering` as we import backfilled = False diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index bf14ec384e..1dffcc3147 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -306,11 +306,13 @@ class RoomBatchSendEventRestServlet(RestServlet): # Verify the batch_id_from_query corresponds to an actual insertion event # and have the batch connected. corresponding_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id(batch_id_from_query) + await self.store.get_insertion_event_by_batch_id( + room_id, batch_id_from_query + ) ) if corresponding_insertion_event_id is None: raise SynapseError( - 400, + HTTPStatus.BAD_REQUEST, "No insertion event corresponds to the given ?batch_id", errcode=Codes.INVALID_PARAM, ) diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py index a383388757..300a563c9e 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py @@ -18,7 +18,9 @@ from synapse.storage._base import SQLBaseStore class RoomBatchStore(SQLBaseStore): - async def get_insertion_event_by_batch_id(self, batch_id: str) -> Optional[str]: + async def get_insertion_event_by_batch_id( + self, room_id: str, batch_id: str + ) -> Optional[str]: """Retrieve a insertion event ID. Args: @@ -30,7 +32,7 @@ class RoomBatchStore(SQLBaseStore): """ return await self.db_pool.simple_select_one_onecol( table="insertion_events", - keyvalues={"next_batch_id": batch_id}, + keyvalues={"room_id": room_id, "next_batch_id": batch_id}, retcol="event_id", allow_none=True, ) |