diff options
author | Eric Eastwood <erice@element.io> | 2021-10-08 18:35:00 -0500 |
---|---|---|
committer | GitHub <noreply@github.com> | 2021-10-08 18:35:00 -0500 |
commit | a7d22c36dbbbdd396aeb8938b57b5fd7edb689f3 (patch) | |
tree | 9ed85aacdb70bd64e8d8ff95c9bb5be568914ab8 /synapse | |
parent | Autodiscover oEmbed endpoint from returned HTML (#10822) (diff) | |
download | synapse-a7d22c36dbbbdd396aeb8938b57b5fd7edb689f3.tar.xz |
Refactor MSC2716 `/batch_send` endpoint into separate handler functions (#10974)
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/room_batch.py | 423 | ||||
-rw-r--r-- | synapse/rest/client/room_batch.py | 339 | ||||
-rw-r--r-- | synapse/server.py | 5 |
3 files changed, 484 insertions, 283 deletions
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py new file mode 100644 index 0000000000..51dd4e7555 --- /dev/null +++ b/synapse/handlers/room_batch.py @@ -0,0 +1,423 @@ +import logging +from typing import TYPE_CHECKING, List, Tuple + +from synapse.api.constants import EventContentFields, EventTypes +from synapse.appservice import ApplicationService +from synapse.http.servlet import assert_params_in_dict +from synapse.types import JsonDict, Requester, UserID, create_requester +from synapse.util.stringutils import random_string + +if TYPE_CHECKING: + from synapse.server import HomeServer + +logger = logging.getLogger(__name__) + + +class RoomBatchHandler: + def __init__(self, hs: "HomeServer"): + self.hs = hs + self.store = hs.get_datastore() + self.state_store = hs.get_storage().state + self.event_creation_handler = hs.get_event_creation_handler() + self.room_member_handler = hs.get_room_member_handler() + self.auth = hs.get_auth() + + async def inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: + """Finds the depth which would sort it after the most-recent + prev_event_id but before the successors of those events. If no + successors are found, we assume it's an historical extremity part of the + current batch and use the same depth of the prev_event_ids. + + Args: + prev_event_ids: List of prev event IDs + + Returns: + Inherited depth + """ + ( + most_recent_prev_event_id, + most_recent_prev_event_depth, + ) = await self.store.get_max_depth_of(prev_event_ids) + + # We want to insert the historical event after the `prev_event` but before the successor event + # + # We inherit depth from the successor event instead of the `prev_event` + # because events returned from `/messages` are first sorted by `topological_ordering` + # which is just the `depth` and then tie-break with `stream_ordering`. + # + # We mark these inserted historical events as "backfilled" which gives them a + # negative `stream_ordering`. If we use the same depth as the `prev_event`, + # then our historical event will tie-break and be sorted before the `prev_event` + # when it should come after. + # + # We want to use the successor event depth so they appear after `prev_event` because + # it has a larger `depth` but before the successor event because the `stream_ordering` + # is negative before the successor event. + successor_event_ids = await self.store.get_successor_events( + [most_recent_prev_event_id] + ) + + # If we can't find any successor events, then it's a forward extremity of + # historical messages and we can just inherit from the previous historical + # event which we can already assume has the correct depth where we want + # to insert into. + if not successor_event_ids: + depth = most_recent_prev_event_depth + else: + ( + _, + oldest_successor_depth, + ) = await self.store.get_min_depth_of(successor_event_ids) + + depth = oldest_successor_depth + + return depth + + def create_insertion_event_dict( + self, sender: str, room_id: str, origin_server_ts: int + ) -> JsonDict: + """Creates an event dict for an "insertion" event with the proper fields + and a random batch ID. + + Args: + sender: The event author MXID + room_id: The room ID that the event belongs to + origin_server_ts: Timestamp when the event was sent + + Returns: + The new event dictionary to insert. + """ + + next_batch_id = random_string(8) + insertion_event = { + "type": EventTypes.MSC2716_INSERTION, + "sender": sender, + "room_id": room_id, + "content": { + EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id, + EventContentFields.MSC2716_HISTORICAL: True, + }, + "origin_server_ts": origin_server_ts, + } + + return insertion_event + + async def create_requester_for_user_id_from_app_service( + self, user_id: str, app_service: ApplicationService + ) -> Requester: + """Creates a new requester for the given user_id + and validates that the app service is allowed to control + the given user. + + Args: + user_id: The author MXID that the app service is controlling + app_service: The app service that controls the user + + Returns: + Requester object + """ + + await self.auth.validate_appservice_can_control_user_id(app_service, user_id) + + return create_requester(user_id, app_service=app_service) + + async def get_most_recent_auth_event_ids_from_event_id_list( + self, event_ids: List[str] + ) -> List[str]: + """Find the most recent auth event ids (derived from state events) that + allowed that message to be sent. We will use this as a base + to auth our historical messages against. + + Args: + event_ids: List of event ID's to look at + + Returns: + List of event ID's + """ + + ( + most_recent_prev_event_id, + _, + ) = await self.store.get_max_depth_of(event_ids) + # mapping from (type, state_key) -> state_event_id + prev_state_map = await self.state_store.get_state_ids_for_event( + most_recent_prev_event_id + ) + # List of state event ID's + prev_state_ids = list(prev_state_map.values()) + auth_event_ids = prev_state_ids + + return auth_event_ids + + async def persist_state_events_at_start( + self, + state_events_at_start: List[JsonDict], + room_id: str, + initial_auth_event_ids: List[str], + app_service_requester: Requester, + ) -> List[str]: + """Takes all `state_events_at_start` event dictionaries and creates/persists + them as floating state events which don't resolve into the current room state. + They are floating because they reference a fake prev_event which doesn't connect + to the normal DAG at all. + + Args: + state_events_at_start: + room_id: Room where you want the events persisted in. + initial_auth_event_ids: These will be the auth_events for the first + state event created. Each event created afterwards will be + added to the list of auth events for the next state event + created. + app_service_requester: The requester of an application service. + + Returns: + List of state event ID's we just persisted + """ + assert app_service_requester.app_service + + state_event_ids_at_start = [] + auth_event_ids = initial_auth_event_ids.copy() + for state_event in state_events_at_start: + assert_params_in_dict( + state_event, ["type", "origin_server_ts", "content", "sender"] + ) + + logger.debug( + "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s", + state_event, + auth_event_ids, + ) + + event_dict = { + "type": state_event["type"], + "origin_server_ts": state_event["origin_server_ts"], + "content": state_event["content"], + "room_id": room_id, + "sender": state_event["sender"], + "state_key": state_event["state_key"], + } + + # Mark all events as historical + event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True + + # Make the state events float off on their own so we don't have a + # bunch of `@mxid joined the room` noise between each batch + fake_prev_event_id = "$" + random_string(43) + + # TODO: This is pretty much the same as some other code to handle inserting state in this file + if event_dict["type"] == EventTypes.Member: + membership = event_dict["content"].get("membership", None) + event_id, _ = await self.room_member_handler.update_membership( + await self.create_requester_for_user_id_from_app_service( + state_event["sender"], app_service_requester.app_service + ), + target=UserID.from_string(event_dict["state_key"]), + room_id=room_id, + action=membership, + content=event_dict["content"], + outlier=True, + prev_event_ids=[fake_prev_event_id], + # Make sure to use a copy of this list because we modify it + # later in the loop here. Otherwise it will be the same + # reference and also update in the event when we append later. + auth_event_ids=auth_event_ids.copy(), + ) + else: + # TODO: Add some complement tests that adds state that is not member joins + # and will use this code path. Maybe we only want to support join state events + # and can get rid of this `else`? + ( + event, + _, + ) = await self.event_creation_handler.create_and_send_nonmember_event( + await self.create_requester_for_user_id_from_app_service( + state_event["sender"], app_service_requester.app_service + ), + event_dict, + outlier=True, + prev_event_ids=[fake_prev_event_id], + # Make sure to use a copy of this list because we modify it + # later in the loop here. Otherwise it will be the same + # reference and also update in the event when we append later. + auth_event_ids=auth_event_ids.copy(), + ) + event_id = event.event_id + + state_event_ids_at_start.append(event_id) + auth_event_ids.append(event_id) + + return state_event_ids_at_start + + async def persist_historical_events( + self, + events_to_create: List[JsonDict], + room_id: str, + initial_prev_event_ids: List[str], + inherited_depth: int, + auth_event_ids: List[str], + app_service_requester: Requester, + ) -> List[str]: + """Create and persists all events provided sequentially. Handles the + complexity of creating events in chronological order so they can + reference each other by prev_event but still persists in + reverse-chronoloical order so they have the correct + (topological_ordering, stream_ordering) and sort correctly from + /messages. + + Args: + events_to_create: List of historical events to create in JSON + dictionary format. + room_id: Room where you want the events persisted in. + initial_prev_event_ids: These will be the prev_events for the first + event created. Each event created afterwards will point to the + previous event created. + inherited_depth: The depth to create the events at (you will + probably by calling inherit_depth_from_prev_ids(...)). + auth_event_ids: Define which events allow you to create the given + event in the room. + app_service_requester: The requester of an application service. + + Returns: + List of persisted event IDs + """ + assert app_service_requester.app_service + + prev_event_ids = initial_prev_event_ids.copy() + + event_ids = [] + events_to_persist = [] + for ev in events_to_create: + assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) + + event_dict = { + "type": ev["type"], + "origin_server_ts": ev["origin_server_ts"], + "content": ev["content"], + "room_id": room_id, + "sender": ev["sender"], # requester.user.to_string(), + "prev_events": prev_event_ids.copy(), + } + + # Mark all events as historical + event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True + + event, context = await self.event_creation_handler.create_event( + await self.create_requester_for_user_id_from_app_service( + ev["sender"], app_service_requester.app_service + ), + event_dict, + prev_event_ids=event_dict.get("prev_events"), + auth_event_ids=auth_event_ids, + historical=True, + depth=inherited_depth, + ) + logger.debug( + "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s", + event, + prev_event_ids, + auth_event_ids, + ) + + assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( + event.sender, + ) + + events_to_persist.append((event, context)) + event_id = event.event_id + + event_ids.append(event_id) + prev_event_ids = [event_id] + + # Persist events in reverse-chronological order so they have the + # correct stream_ordering as they are backfilled (which decrements). + # Events are sorted by (topological_ordering, stream_ordering) + # where topological_ordering is just depth. + for (event, context) in reversed(events_to_persist): + await self.event_creation_handler.handle_new_client_event( + await self.create_requester_for_user_id_from_app_service( + event["sender"], app_service_requester.app_service + ), + event=event, + context=context, + ) + + return event_ids + + async def handle_batch_of_events( + self, + events_to_create: List[JsonDict], + room_id: str, + batch_id_to_connect_to: str, + initial_prev_event_ids: List[str], + inherited_depth: int, + auth_event_ids: List[str], + app_service_requester: Requester, + ) -> Tuple[List[str], str]: + """ + Handles creating and persisting all of the historical events as well + as insertion and batch meta events to make the batch navigable in the DAG. + + Args: + events_to_create: List of historical events to create in JSON + dictionary format. + room_id: Room where you want the events created in. + batch_id_to_connect_to: The batch_id from the insertion event you + want this batch to connect to. + initial_prev_event_ids: These will be the prev_events for the first + event created. Each event created afterwards will point to the + previous event created. + inherited_depth: The depth to create the events at (you will + probably by calling inherit_depth_from_prev_ids(...)). + auth_event_ids: Define which events allow you to create the given + event in the room. + app_service_requester: The requester of an application service. + + Returns: + Tuple containing a list of created events and the next_batch_id + """ + + # Connect this current batch to the insertion event from the previous batch + last_event_in_batch = events_to_create[-1] + batch_event = { + "type": EventTypes.MSC2716_BATCH, + "sender": app_service_requester.user.to_string(), + "room_id": room_id, + "content": { + EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, + EventContentFields.MSC2716_HISTORICAL: True, + }, + # Since the batch event is put at the end of the batch, + # where the newest-in-time event is, copy the origin_server_ts from + # the last event we're inserting + "origin_server_ts": last_event_in_batch["origin_server_ts"], + } + # Add the batch event to the end of the batch (newest-in-time) + events_to_create.append(batch_event) + + # Add an "insertion" event to the start of each batch (next to the oldest-in-time + # event in the batch) so the next batch can be connected to this one. + insertion_event = self.create_insertion_event_dict( + sender=app_service_requester.user.to_string(), + room_id=room_id, + # Since the insertion event is put at the start of the batch, + # where the oldest-in-time event is, copy the origin_server_ts from + # the first event we're inserting + origin_server_ts=events_to_create[0]["origin_server_ts"], + ) + next_batch_id = insertion_event["content"][ + EventContentFields.MSC2716_NEXT_BATCH_ID + ] + # Prepend the insertion event to the start of the batch (oldest-in-time) + events_to_create = [insertion_event] + events_to_create + + # Create and persist all of the historical events + event_ids = await self.persist_historical_events( + events_to_create=events_to_create, + room_id=room_id, + initial_prev_event_ids=initial_prev_event_ids, + inherited_depth=inherited_depth, + auth_event_ids=auth_event_ids, + app_service_requester=app_service_requester, + ) + + return event_ids, next_batch_id diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py index 1dffcc3147..38ad4c2447 100644 --- a/synapse/rest/client/room_batch.py +++ b/synapse/rest/client/room_batch.py @@ -15,13 +15,12 @@ import logging import re from http import HTTPStatus -from typing import TYPE_CHECKING, Awaitable, List, Tuple +from typing import TYPE_CHECKING, Awaitable, Tuple from twisted.web.server import Request -from synapse.api.constants import EventContentFields, EventTypes +from synapse.api.constants import EventContentFields from synapse.api.errors import AuthError, Codes, SynapseError -from synapse.appservice import ApplicationService from synapse.http.server import HttpServer from synapse.http.servlet import ( RestServlet, @@ -32,7 +31,7 @@ from synapse.http.servlet import ( ) from synapse.http.site import SynapseRequest from synapse.rest.client.transactions import HttpTransactionCache -from synapse.types import JsonDict, Requester, UserID, create_requester +from synapse.types import JsonDict from synapse.util.stringutils import random_string if TYPE_CHECKING: @@ -77,102 +76,12 @@ class RoomBatchSendEventRestServlet(RestServlet): def __init__(self, hs: "HomeServer"): super().__init__() - self.hs = hs self.store = hs.get_datastore() - self.state_store = hs.get_storage().state self.event_creation_handler = hs.get_event_creation_handler() - self.room_member_handler = hs.get_room_member_handler() self.auth = hs.get_auth() + self.room_batch_handler = hs.get_room_batch_handler() self.txns = HttpTransactionCache(hs) - async def _inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int: - ( - most_recent_prev_event_id, - most_recent_prev_event_depth, - ) = await self.store.get_max_depth_of(prev_event_ids) - - # We want to insert the historical event after the `prev_event` but before the successor event - # - # We inherit depth from the successor event instead of the `prev_event` - # because events returned from `/messages` are first sorted by `topological_ordering` - # which is just the `depth` and then tie-break with `stream_ordering`. - # - # We mark these inserted historical events as "backfilled" which gives them a - # negative `stream_ordering`. If we use the same depth as the `prev_event`, - # then our historical event will tie-break and be sorted before the `prev_event` - # when it should come after. - # - # We want to use the successor event depth so they appear after `prev_event` because - # it has a larger `depth` but before the successor event because the `stream_ordering` - # is negative before the successor event. - successor_event_ids = await self.store.get_successor_events( - [most_recent_prev_event_id] - ) - - # If we can't find any successor events, then it's a forward extremity of - # historical messages and we can just inherit from the previous historical - # event which we can already assume has the correct depth where we want - # to insert into. - if not successor_event_ids: - depth = most_recent_prev_event_depth - else: - ( - _, - oldest_successor_depth, - ) = await self.store.get_min_depth_of(successor_event_ids) - - depth = oldest_successor_depth - - return depth - - def _create_insertion_event_dict( - self, sender: str, room_id: str, origin_server_ts: int - ) -> JsonDict: - """Creates an event dict for an "insertion" event with the proper fields - and a random batch ID. - - Args: - sender: The event author MXID - room_id: The room ID that the event belongs to - origin_server_ts: Timestamp when the event was sent - - Returns: - The new event dictionary to insert. - """ - - next_batch_id = random_string(8) - insertion_event = { - "type": EventTypes.MSC2716_INSERTION, - "sender": sender, - "room_id": room_id, - "content": { - EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id, - EventContentFields.MSC2716_HISTORICAL: True, - }, - "origin_server_ts": origin_server_ts, - } - - return insertion_event - - async def _create_requester_for_user_id_from_app_service( - self, user_id: str, app_service: ApplicationService - ) -> Requester: - """Creates a new requester for the given user_id - and validates that the app service is allowed to control - the given user. - - Args: - user_id: The author MXID that the app service is controlling - app_service: The app service that controls the user - - Returns: - Requester object - """ - - await self.auth.validate_appservice_can_control_user_id(app_service, user_id) - - return create_requester(user_id, app_service=app_service) - async def on_POST( self, request: SynapseRequest, room_id: str ) -> Tuple[int, JsonDict]: @@ -200,123 +109,62 @@ class RoomBatchSendEventRestServlet(RestServlet): errcode=Codes.MISSING_PARAM, ) + # Verify the batch_id_from_query corresponds to an actual insertion event + # and have the batch connected. + if batch_id_from_query: + corresponding_insertion_event_id = ( + await self.store.get_insertion_event_by_batch_id( + room_id, batch_id_from_query + ) + ) + if corresponding_insertion_event_id is None: + raise SynapseError( + HTTPStatus.BAD_REQUEST, + "No insertion event corresponds to the given ?batch_id", + errcode=Codes.INVALID_PARAM, + ) + # For the event we are inserting next to (`prev_event_ids_from_query`), # find the most recent auth events (derived from state events) that # allowed that message to be sent. We will use that as a base # to auth our historical messages against. - ( - most_recent_prev_event_id, - _, - ) = await self.store.get_max_depth_of(prev_event_ids_from_query) - # mapping from (type, state_key) -> state_event_id - prev_state_map = await self.state_store.get_state_ids_for_event( - most_recent_prev_event_id + auth_event_ids = await self.room_batch_handler.get_most_recent_auth_event_ids_from_event_id_list( + prev_event_ids_from_query ) - # List of state event ID's - prev_state_ids = list(prev_state_map.values()) - auth_event_ids = prev_state_ids - - state_event_ids_at_start = [] - for state_event in body["state_events_at_start"]: - assert_params_in_dict( - state_event, ["type", "origin_server_ts", "content", "sender"] - ) - logger.debug( - "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s", - state_event, - auth_event_ids, + # Create and persist all of the state events that float off on their own + # before the batch. These will most likely be all of the invite/member + # state events used to auth the upcoming historical messages. + state_event_ids_at_start = ( + await self.room_batch_handler.persist_state_events_at_start( + state_events_at_start=body["state_events_at_start"], + room_id=room_id, + initial_auth_event_ids=auth_event_ids, + app_service_requester=requester, ) + ) + # Update our ongoing auth event ID list with all of the new state we + # just created + auth_event_ids.extend(state_event_ids_at_start) - event_dict = { - "type": state_event["type"], - "origin_server_ts": state_event["origin_server_ts"], - "content": state_event["content"], - "room_id": room_id, - "sender": state_event["sender"], - "state_key": state_event["state_key"], - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - # Make the state events float off on their own - fake_prev_event_id = "$" + random_string(43) - - # TODO: This is pretty much the same as some other code to handle inserting state in this file - if event_dict["type"] == EventTypes.Member: - membership = event_dict["content"].get("membership", None) - event_id, _ = await self.room_member_handler.update_membership( - await self._create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service - ), - target=UserID.from_string(event_dict["state_key"]), - room_id=room_id, - action=membership, - content=event_dict["content"], - outlier=True, - prev_event_ids=[fake_prev_event_id], - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - auth_event_ids=auth_event_ids.copy(), - ) - else: - # TODO: Add some complement tests that adds state that is not member joins - # and will use this code path. Maybe we only want to support join state events - # and can get rid of this `else`? - ( - event, - _, - ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self._create_requester_for_user_id_from_app_service( - state_event["sender"], requester.app_service - ), - event_dict, - outlier=True, - prev_event_ids=[fake_prev_event_id], - # Make sure to use a copy of this list because we modify it - # later in the loop here. Otherwise it will be the same - # reference and also update in the event when we append later. - auth_event_ids=auth_event_ids.copy(), - ) - event_id = event.event_id - - state_event_ids_at_start.append(event_id) - auth_event_ids.append(event_id) - - events_to_create = body["events"] - - inherited_depth = await self._inherit_depth_from_prev_ids( + inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids( prev_event_ids_from_query ) + events_to_create = body["events"] + # Figure out which batch to connect to. If they passed in # batch_id_from_query let's use it. The batch ID passed in comes # from the batch_id in the "insertion" event from the previous batch. last_event_in_batch = events_to_create[-1] - batch_id_to_connect_to = batch_id_from_query base_insertion_event = None if batch_id_from_query: + batch_id_to_connect_to = batch_id_from_query # All but the first base insertion event should point at a fake # event, which causes the HS to ask for the state at the start of # the batch later. + fake_prev_event_id = "$" + random_string(43) prev_event_ids = [fake_prev_event_id] - - # Verify the batch_id_from_query corresponds to an actual insertion event - # and have the batch connected. - corresponding_insertion_event_id = ( - await self.store.get_insertion_event_by_batch_id( - room_id, batch_id_from_query - ) - ) - if corresponding_insertion_event_id is None: - raise SynapseError( - HTTPStatus.BAD_REQUEST, - "No insertion event corresponds to the given ?batch_id", - errcode=Codes.INVALID_PARAM, - ) - pass # Otherwise, create an insertion event to act as a starting point. # # We don't always have an insertion event to start hanging more history @@ -327,10 +175,12 @@ class RoomBatchSendEventRestServlet(RestServlet): else: prev_event_ids = prev_event_ids_from_query - base_insertion_event_dict = self._create_insertion_event_dict( - sender=requester.user.to_string(), - room_id=room_id, - origin_server_ts=last_event_in_batch["origin_server_ts"], + base_insertion_event_dict = ( + self.room_batch_handler.create_insertion_event_dict( + sender=requester.user.to_string(), + room_id=room_id, + origin_server_ts=last_event_in_batch["origin_server_ts"], + ) ) base_insertion_event_dict["prev_events"] = prev_event_ids.copy() @@ -338,7 +188,7 @@ class RoomBatchSendEventRestServlet(RestServlet): base_insertion_event, _, ) = await self.event_creation_handler.create_and_send_nonmember_event( - await self._create_requester_for_user_id_from_app_service( + await self.room_batch_handler.create_requester_for_user_id_from_app_service( base_insertion_event_dict["sender"], requester.app_service, ), @@ -353,92 +203,17 @@ class RoomBatchSendEventRestServlet(RestServlet): EventContentFields.MSC2716_NEXT_BATCH_ID ] - # Connect this current batch to the insertion event from the previous batch - batch_event = { - "type": EventTypes.MSC2716_BATCH, - "sender": requester.user.to_string(), - "room_id": room_id, - "content": { - EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to, - EventContentFields.MSC2716_HISTORICAL: True, - }, - # Since the batch event is put at the end of the batch, - # where the newest-in-time event is, copy the origin_server_ts from - # the last event we're inserting - "origin_server_ts": last_event_in_batch["origin_server_ts"], - } - # Add the batch event to the end of the batch (newest-in-time) - events_to_create.append(batch_event) - - # Add an "insertion" event to the start of each batch (next to the oldest-in-time - # event in the batch) so the next batch can be connected to this one. - insertion_event = self._create_insertion_event_dict( - sender=requester.user.to_string(), + # Create and persist all of the historical events as well as insertion + # and batch meta events to make the batch navigable in the DAG. + event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events( + events_to_create=events_to_create, room_id=room_id, - # Since the insertion event is put at the start of the batch, - # where the oldest-in-time event is, copy the origin_server_ts from - # the first event we're inserting - origin_server_ts=events_to_create[0]["origin_server_ts"], + batch_id_to_connect_to=batch_id_to_connect_to, + initial_prev_event_ids=prev_event_ids, + inherited_depth=inherited_depth, + auth_event_ids=auth_event_ids, + app_service_requester=requester, ) - # Prepend the insertion event to the start of the batch (oldest-in-time) - events_to_create = [insertion_event] + events_to_create - - event_ids = [] - events_to_persist = [] - for ev in events_to_create: - assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"]) - - event_dict = { - "type": ev["type"], - "origin_server_ts": ev["origin_server_ts"], - "content": ev["content"], - "room_id": room_id, - "sender": ev["sender"], # requester.user.to_string(), - "prev_events": prev_event_ids.copy(), - } - - # Mark all events as historical - event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True - - event, context = await self.event_creation_handler.create_event( - await self._create_requester_for_user_id_from_app_service( - ev["sender"], requester.app_service - ), - event_dict, - prev_event_ids=event_dict.get("prev_events"), - auth_event_ids=auth_event_ids, - historical=True, - depth=inherited_depth, - ) - logger.debug( - "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s", - event, - prev_event_ids, - auth_event_ids, - ) - - assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( - event.sender, - ) - - events_to_persist.append((event, context)) - event_id = event.event_id - - event_ids.append(event_id) - prev_event_ids = [event_id] - - # Persist events in reverse-chronological order so they have the - # correct stream_ordering as they are backfilled (which decrements). - # Events are sorted by (topological_ordering, stream_ordering) - # where topological_ordering is just depth. - for (event, context) in reversed(events_to_persist): - ev = await self.event_creation_handler.handle_new_client_event( - await self._create_requester_for_user_id_from_app_service( - event["sender"], requester.app_service - ), - event=event, - context=context, - ) insertion_event_id = event_ids[0] batch_event_id = event_ids[-1] @@ -447,9 +222,7 @@ class RoomBatchSendEventRestServlet(RestServlet): response_dict = { "state_event_ids": state_event_ids_at_start, "event_ids": historical_event_ids, - "next_batch_id": insertion_event["content"][ - EventContentFields.MSC2716_NEXT_BATCH_ID - ], + "next_batch_id": next_batch_id, "insertion_event_id": insertion_event_id, "batch_event_id": batch_event_id, } diff --git a/synapse/server.py b/synapse/server.py index 0783df41d4..5bc045d615 100644 --- a/synapse/server.py +++ b/synapse/server.py @@ -97,6 +97,7 @@ from synapse.handlers.room import ( RoomCreationHandler, RoomShutdownHandler, ) +from synapse.handlers.room_batch import RoomBatchHandler from synapse.handlers.room_list import RoomListHandler from synapse.handlers.room_member import RoomMemberHandler, RoomMemberMasterHandler from synapse.handlers.room_member_worker import RoomMemberWorkerHandler @@ -438,6 +439,10 @@ class HomeServer(metaclass=abc.ABCMeta): return RoomCreationHandler(self) @cache_in_self + def get_room_batch_handler(self) -> RoomBatchHandler: + return RoomBatchHandler(self) + + @cache_in_self def get_room_shutdown_handler(self) -> RoomShutdownHandler: return RoomShutdownHandler(self) |