diff options
Diffstat (limited to 'synapse/rest/client/room_batch.py')
-rw-r--r-- | synapse/rest/client/room_batch.py | 339 |
1 files changed, 56 insertions, 283 deletions
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 1dffcc3147..38ad4c2447 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -15,13 +15,12 @@ import logging import re from http import HTTPStatus -from typing import TYPE_CHECKING, Awaitable, List, Tuple +from typing import TYPE_CHECKING, Awaitable, Tuple from twisted.web.server import Request -from synapse.api.constants import EventContentFields, EventTypes +from synapse.api.constants import EventContentFields from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.appservice import ApplicationService from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, @@ -32,7 +31,7 @@ from synapse.http.servlet import ( ) from synapse.http.site import SynapseRequest from synapse.rest.client.transactions import HttpTransactionCache -from synapse.types import JsonDict, Requester, UserID, create_requester +from synapse.types import JsonDict from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -77,102 +76,12 @@ class RoomBatchSendEventRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() - self.hs = hs self.store = hs.get_datastore() - self.state_store = hs.get_storage().state self.event_creation_handler = hs.get_event_creation_handler() - self.room_member_handler = hs.get_room_member_handler() self.auth = hs.get_auth() + self.room_batch_handler = hs.get_room_batch_handler() self.txns = HttpTransactionCache(hs) - async def _inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: - ( - most_recent_prev_event_id, - most_recent_prev_event_depth, - ) = await self.store.get_max_depth_of(prev_event_ids) - - # We want to insert the historical event after the `prev_event` but before the successor event - # - # We inherit depth from the successor event instead of the `prev_event` - # because events returned from `/messages` are first sorted by `topological_ordering` - # which is just the `depth` and then tie-break with `stream_ordering`. - # - # We mark these inserted historical events as "backfilled" which gives them a - # negative `stream_ordering`. If we use the same depth as the `prev_event`, - # then our historical event will tie-break and be sorted before the `prev_event` - # when it should come after. - # - # We want to use the successor event depth so they appear after `prev_event` because - # it has a larger `depth` but before the successor event because the `stream_ordering` - # is negative before the successor event. - successor_event_ids = await self.store.get_successor_events( - [most_recent_prev_event_id] - ) - - # If we can't find any successor events, then it's a forward extremity of - # historical messages and we can just inherit from the previous historical - # event which we can already assume has the correct depth where we want - # to insert into. - if not successor_event_ids: - depth = most_recent_prev_event_depth - else: - ( - _, - oldest_successor_depth, - ) = await self.store.get_min_depth_of(successor_event_ids) - - depth = oldest_successor_depth - - return depth - - def _create_insertion_event_dict( - self, sender: str, room_id: str, origin_server_ts: int - ) -> JsonDict: - """Creates an event dict for an "insertion" event with the proper fields - and a random batch ID. - - Args: - sender: The event author MXID - room_id: The room ID that the event belongs to - origin_server_ts: Timestamp when the event was sent - - Returns: - The new event dictionary to insert. - """ - - next_batch_id = random_string(8) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": sender, - "room_id": room_id, - "content": { - EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, - "origin_server_ts": origin_server_ts, - } - - return insertion_event - - async def _create_requester_for_user_id_from_app_service( - self, user_id: str, app_service: ApplicationService - ) -> Requester: - """Creates a new requester for the given user_id - and validates that the app service is allowed to control - the given user. - - Args: - user_id: The author MXID that the app service is controlling - app_service: The app service that controls the user - - Returns: - Requester object - """ - - await self.auth.validate_appservice_can_control_user_id(app_service, user_id) - - return create_requester(user_id, app_service=app_service) - async def on_POST( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: @@ -200,123 +109,62 @@ class RoomBatchSendEventRestServlet(RestServlet): errcode=Codes.MISSING_PARAM, ) + # Verify the batch_id_from_query corresponds to an actual insertion event + # and have the batch connected. + if batch_id_from_query: + corresponding_insertion_event_id = ( + await self.store.get_insertion_event_by_batch_id( + room_id, batch_id_from_query + ) + ) + if corresponding_insertion_event_id is None: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "No insertion event corresponds to the given ?batch_id", + errcode=Codes.INVALID_PARAM, + ) + # For the event we are inserting next to (`prev_event_ids_from_query`), # find the most recent auth events (derived from state events) that # allowed that message to be sent. We will use that as a base # to auth our historical messages against. - ( - most_recent_prev_event_id, - _, - ) = await self.store.get_max_depth_of(prev_event_ids_from_query) - # mapping from (type, state_key) -> state_event_id - prev_state_map = await self.state_store.get_state_ids_for_event( - most_recent_prev_event_id + auth_event_ids = await self.room_batch_handler.get_most_recent_auth_event_ids_from_event_id_list( + prev_event_ids_from_query ) - # List of state event ID's - prev_state_ids = list(prev_state_map.values()) - auth_event_ids = prev_state_ids - - state_event_ids_at_start = [] - for state_event in body["state_events_at_start"]: - assert_params_in_dict( - state_event, ["type", "origin_server_ts", "content", "sender"] - ) - logger.debug( - "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s", - state_event, - auth_event_ids, + # Create and persist all of the state events that float off on their own + # before the batch. These will most likely be all of the invite/member + # state events used to auth the upcoming historical messages. + state_event_ids_at_start = ( + await self.room_batch_handler.persist_state_events_at_start( + state_events_at_start=body["state_events_at_start"], + room_id=room_id, + initial_auth_event_ids=auth_event_ids, + app_service_requester=requester, ) + ) + # Update our ongoing auth event ID list with all of the new state we + # just created + auth_event_ids.extend(state_event_ids_at_start) - event_dict = { - "type": state_event["type"], - "origin_server_ts": state_event["origin_server_ts"], - "content": state_event["content"], - "room_id": room_id, - "sender": state_event["sender"], - "state_key": state_event["state_key"], - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - # Make the state events float off on their own - fake_prev_event_id = "$" + random_string(43) - - # TODO: This is pretty much the same as some other code to handle inserting state in this file - if event_dict["type"] == EventTypes.Member: - membership = event_dict["content"].get("membership", None) - event_id, _ = await self.room_member_handler.update_membership( - await self._create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service - ), - target=UserID.from_string(event_dict["state_key"]), - room_id=room_id, - action=membership, - content=event_dict["content"], - outlier=True, - prev_event_ids=[fake_prev_event_id], - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - auth_event_ids=auth_event_ids.copy(), - ) - else: - # TODO: Add some complement tests that adds state that is not member joins - # and will use this code path. Maybe we only want to support join state events - # and can get rid of this `else`? - ( - event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self._create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service - ), - event_dict, - outlier=True, - prev_event_ids=[fake_prev_event_id], - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - auth_event_ids=auth_event_ids.copy(), - ) - event_id = event.event_id - - state_event_ids_at_start.append(event_id) - auth_event_ids.append(event_id) - - events_to_create = body["events"] - - inherited_depth = await self._inherit_depth_from_prev_ids( + inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids( prev_event_ids_from_query ) + events_to_create = body["events"] + # Figure out which batch to connect to. If they passed in # batch_id_from_query let's use it. The batch ID passed in comes # from the batch_id in the "insertion" event from the previous batch. last_event_in_batch = events_to_create[-1] - batch_id_to_connect_to = batch_id_from_query base_insertion_event = None if batch_id_from_query: + batch_id_to_connect_to = batch_id_from_query # All but the first base insertion event should point at a fake # event, which causes the HS to ask for the state at the start of # the batch later. + fake_prev_event_id = "$" + random_string(43) prev_event_ids = [fake_prev_event_id] - - # Verify the batch_id_from_query corresponds to an actual insertion event - # and have the batch connected. - corresponding_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id( - room_id, batch_id_from_query - ) - ) - if corresponding_insertion_event_id is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "No insertion event corresponds to the given ?batch_id", - errcode=Codes.INVALID_PARAM, - ) - pass # Otherwise, create an insertion event to act as a starting point. # # We don't always have an insertion event to start hanging more history @@ -327,10 +175,12 @@ class RoomBatchSendEventRestServlet(RestServlet): else: prev_event_ids = prev_event_ids_from_query - base_insertion_event_dict = self._create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - origin_server_ts=last_event_in_batch["origin_server_ts"], + base_insertion_event_dict = ( + self.room_batch_handler.create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + origin_server_ts=last_event_in_batch["origin_server_ts"], + ) ) base_insertion_event_dict["prev_events"] = prev_event_ids.copy() @@ -338,7 +188,7 @@ class RoomBatchSendEventRestServlet(RestServlet): base_insertion_event, _, ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self._create_requester_for_user_id_from_app_service( + await self.room_batch_handler.create_requester_for_user_id_from_app_service( base_insertion_event_dict["sender"], requester.app_service, ), @@ -353,92 +203,17 @@ class RoomBatchSendEventRestServlet(RestServlet): EventContentFields.MSC2716_NEXT_BATCH_ID ] - # Connect this current batch to the insertion event from the previous batch - batch_event = { - "type": EventTypes.MSC2716_BATCH, - "sender": requester.user.to_string(), - "room_id": room_id, - "content": { - EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, - EventContentFields.MSC2716_HISTORICAL: True, - }, - # Since the batch event is put at the end of the batch, - # where the newest-in-time event is, copy the origin_server_ts from - # the last event we're inserting - "origin_server_ts": last_event_in_batch["origin_server_ts"], - } - # Add the batch event to the end of the batch (newest-in-time) - events_to_create.append(batch_event) - - # Add an "insertion" event to the start of each batch (next to the oldest-in-time - # event in the batch) so the next batch can be connected to this one. - insertion_event = self._create_insertion_event_dict( - sender=requester.user.to_string(), + # Create and persist all of the historical events as well as insertion + # and batch meta events to make the batch navigable in the DAG. + event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events( + events_to_create=events_to_create, room_id=room_id, - # Since the insertion event is put at the start of the batch, - # where the oldest-in-time event is, copy the origin_server_ts from - # the first event we're inserting - origin_server_ts=events_to_create[0]["origin_server_ts"], + batch_id_to_connect_to=batch_id_to_connect_to, + initial_prev_event_ids=prev_event_ids, + inherited_depth=inherited_depth, + auth_event_ids=auth_event_ids, + app_service_requester=requester, ) - # Prepend the insertion event to the start of the batch (oldest-in-time) - events_to_create = [insertion_event] + events_to_create - - event_ids = [] - events_to_persist = [] - for ev in events_to_create: - assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) - - event_dict = { - "type": ev["type"], - "origin_server_ts": ev["origin_server_ts"], - "content": ev["content"], - "room_id": room_id, - "sender": ev["sender"], # requester.user.to_string(), - "prev_events": prev_event_ids.copy(), - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - event, context = await self.event_creation_handler.create_event( - await self._create_requester_for_user_id_from_app_service( - ev["sender"], requester.app_service - ), - event_dict, - prev_event_ids=event_dict.get("prev_events"), - auth_event_ids=auth_event_ids, - historical=True, - depth=inherited_depth, - ) - logger.debug( - "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s", - event, - prev_event_ids, - auth_event_ids, - ) - - assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( - event.sender, - ) - - events_to_persist.append((event, context)) - event_id = event.event_id - - event_ids.append(event_id) - prev_event_ids = [event_id] - - # Persist events in reverse-chronological order so they have the - # correct stream_ordering as they are backfilled (which decrements). - # Events are sorted by (topological_ordering, stream_ordering) - # where topological_ordering is just depth. - for (event, context) in reversed(events_to_persist): - ev = await self.event_creation_handler.handle_new_client_event( - await self._create_requester_for_user_id_from_app_service( - event["sender"], requester.app_service - ), - event=event, - context=context, - ) insertion_event_id = event_ids[0] batch_event_id = event_ids[-1] @@ -447,9 +222,7 @@ class RoomBatchSendEventRestServlet(RestServlet): response_dict = { "state_event_ids": state_event_ids_at_start, "event_ids": historical_event_ids, - "next_batch_id": insertion_event["content"][ - EventContentFields.MSC2716_NEXT_BATCH_ID - ], + "next_batch_id": next_batch_id, "insertion_event_id": insertion_event_id, "batch_event_id": batch_event_id, } |