diff options
Diffstat (limited to 'synapse/rest/client')
-rw-r--r-- | synapse/rest/client/v1/room.py | 112 |
1 files changed, 89 insertions, 23 deletions
diff --git a/synapse/rest/client/v1/room.py b/synapse/rest/client/v1/room.py index 92ebe838fd..9c58e3689e 100644 --- a/synapse/rest/client/v1/room.py +++ b/synapse/rest/client/v1/room.py @@ -349,6 +349,35 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): return depth + def _create_insertion_event_dict( + self, sender: str, room_id: str, origin_server_ts: int + ): + """Creates an event dict for an "insertion" event with the proper fields + and a random chunk ID. + + Args: + sender: The event author MXID + room_id: The room ID that the event belongs to + origin_server_ts: Timestamp when the event was sent + + Returns: + Tuple of event ID and stream ordering position + """ + + next_chunk_id = random_string(8) + insertion_event = { + "type": EventTypes.MSC2716_INSERTION, + "sender": sender, + "room_id": room_id, + "content": { + EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id, + EventContentFields.MSC2716_HISTORICAL: True, + }, + "origin_server_ts": origin_server_ts, + } + + return insertion_event + async def on_POST(self, request, room_id): requester = await self.auth.get_user_by_req(request, allow_guest=False) @@ -449,37 +478,68 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): events_to_create = body["events"] - # If provided, connect the chunk to the last insertion point - # The chunk ID passed in comes from the chunk_id in the - # "insertion" event from the previous chunk. + prev_event_ids = prev_events_from_query + inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query) + + # Figure out which chunk to connect to. If they passed in + # chunk_id_from_query let's use it. The chunk ID passed in comes + # from the chunk_id in the "insertion" event from the previous chunk. + last_event_in_chunk = events_to_create[-1] + chunk_id_to_connect_to = chunk_id_from_query + base_insertion_event = None if chunk_id_from_query: - last_event_in_chunk = events_to_create[-1] - last_event_in_chunk["content"][ - EventContentFields.MSC2716_CHUNK_ID - ] = chunk_id_from_query + # TODO: Verify the chunk_id_from_query corresponds to an insertion event + pass + # Otherwise, create an insertion event to act as a starting point. + # + # We don't always have an insertion event to start hanging more history + # off of (ideally there would be one in the main DAG, but that's not the + # case if we're wanting to add history to e.g. existing rooms without + # an insertion event), in which case we just create a new insertion event + # that can then get pointed to by a "marker" event later. + else: + base_insertion_event_dict = self._create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + origin_server_ts=last_event_in_chunk["origin_server_ts"], + ) + base_insertion_event_dict["prev_events"] = prev_event_ids.copy() - # Add an "insertion" event to the start of each chunk (next to the oldest + ( + base_insertion_event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + requester, + base_insertion_event_dict, + prev_event_ids=base_insertion_event_dict.get("prev_events"), + auth_event_ids=auth_event_ids, + historical=True, + depth=inherited_depth, + ) + + chunk_id_to_connect_to = base_insertion_event["content"][ + EventContentFields.MSC2716_NEXT_CHUNK_ID + ] + + # Connect this current chunk to the insertion event from the previous chunk + last_event_in_chunk["content"][ + EventContentFields.MSC2716_CHUNK_ID + ] = chunk_id_to_connect_to + + # Add an "insertion" event to the start of each chunk (next to the oldest-in-time # event in the chunk) so the next chunk can be connected to this one. - next_chunk_id = random_string(64) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": requester.user.to_string(), - "content": { - EventContentFields.MSC2716_NEXT_CHUNK_ID: next_chunk_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, + insertion_event = self._create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, # Since the insertion event is put at the start of the chunk, - # where the oldest event is, copy the origin_server_ts from + # where the oldest-in-time event is, copy the origin_server_ts from # the first event we're inserting - "origin_server_ts": events_to_create[0]["origin_server_ts"], - } + origin_server_ts=events_to_create[0]["origin_server_ts"], + ) # Prepend the insertion event to the start of the chunk events_to_create = [insertion_event] + events_to_create - inherited_depth = await self.inherit_depth_from_prev_ids(prev_events_from_query) - event_ids = [] - prev_event_ids = prev_events_from_query events_to_persist = [] for ev in events_to_create: assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) @@ -533,10 +593,16 @@ class RoomBatchSendEventRestServlet(TransactionRestServlet): context=context, ) + # Add the base_insertion_event to the bottom of the list we return + if base_insertion_event is not None: + event_ids.append(base_insertion_event.event_id) + return 200, { "state_events": auth_event_ids, "events": event_ids, - "next_chunk_id": next_chunk_id, + "next_chunk_id": insertion_event["content"][ + EventContentFields.MSC2716_NEXT_CHUNK_ID + ], } def on_GET(self, request, room_id): |