summary refs log tree commit diff
path: root/synapse/rest/client/room_batch.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/rest/client/room_batch.py')
-rw-r--r--synapse/rest/client/room_batch.py339
1 files changed, 56 insertions, 283 deletions
diff --git a/synapse/rest/client/room_batch.py b/synapse/rest/client/room_batch.py
index 1dffcc3147..38ad4c2447 100644
--- a/synapse/rest/client/room_batch.py
+++ b/synapse/rest/client/room_batch.py
@@ -15,13 +15,12 @@
 import logging
 import re
 from http import HTTPStatus
-from typing import TYPE_CHECKING, Awaitable, List, Tuple
+from typing import TYPE_CHECKING, Awaitable, Tuple
 
 from twisted.web.server import Request
 
-from synapse.api.constants import EventContentFields, EventTypes
+from synapse.api.constants import EventContentFields
 from synapse.api.errors import AuthError, Codes, SynapseError
-from synapse.appservice import ApplicationService
 from synapse.http.server import HttpServer
 from synapse.http.servlet import (
     RestServlet,
@@ -32,7 +31,7 @@ from synapse.http.servlet import (
 )
 from synapse.http.site import SynapseRequest
 from synapse.rest.client.transactions import HttpTransactionCache
-from synapse.types import JsonDict, Requester, UserID, create_requester
+from synapse.types import JsonDict
 from synapse.util.stringutils import random_string
 
 if TYPE_CHECKING:
@@ -77,102 +76,12 @@ class RoomBatchSendEventRestServlet(RestServlet):
 
     def __init__(self, hs: "HomeServer"):
         super().__init__()
-        self.hs = hs
         self.store = hs.get_datastore()
-        self.state_store = hs.get_storage().state
         self.event_creation_handler = hs.get_event_creation_handler()
-        self.room_member_handler = hs.get_room_member_handler()
         self.auth = hs.get_auth()
+        self.room_batch_handler = hs.get_room_batch_handler()
         self.txns = HttpTransactionCache(hs)
 
-    async def _inherit_depth_from_prev_ids(self, prev_event_ids: List[str]) -> int:
-        (
-            most_recent_prev_event_id,
-            most_recent_prev_event_depth,
-        ) = await self.store.get_max_depth_of(prev_event_ids)
-
-        # We want to insert the historical event after the `prev_event` but before the successor event
-        #
-        # We inherit depth from the successor event instead of the `prev_event`
-        # because events returned from `/messages` are first sorted by `topological_ordering`
-        # which is just the `depth` and then tie-break with `stream_ordering`.
-        #
-        # We mark these inserted historical events as "backfilled" which gives them a
-        # negative `stream_ordering`. If we use the same depth as the `prev_event`,
-        # then our historical event will tie-break and be sorted before the `prev_event`
-        # when it should come after.
-        #
-        # We want to use the successor event depth so they appear after `prev_event` because
-        # it has a larger `depth` but before the successor event because the `stream_ordering`
-        # is negative before the successor event.
-        successor_event_ids = await self.store.get_successor_events(
-            [most_recent_prev_event_id]
-        )
-
-        # If we can't find any successor events, then it's a forward extremity of
-        # historical messages and we can just inherit from the previous historical
-        # event which we can already assume has the correct depth where we want
-        # to insert into.
-        if not successor_event_ids:
-            depth = most_recent_prev_event_depth
-        else:
-            (
-                _,
-                oldest_successor_depth,
-            ) = await self.store.get_min_depth_of(successor_event_ids)
-
-            depth = oldest_successor_depth
-
-        return depth
-
-    def _create_insertion_event_dict(
-        self, sender: str, room_id: str, origin_server_ts: int
-    ) -> JsonDict:
-        """Creates an event dict for an "insertion" event with the proper fields
-        and a random batch ID.
-
-        Args:
-            sender: The event author MXID
-            room_id: The room ID that the event belongs to
-            origin_server_ts: Timestamp when the event was sent
-
-        Returns:
-            The new event dictionary to insert.
-        """
-
-        next_batch_id = random_string(8)
-        insertion_event = {
-            "type": EventTypes.MSC2716_INSERTION,
-            "sender": sender,
-            "room_id": room_id,
-            "content": {
-                EventContentFields.MSC2716_NEXT_BATCH_ID: next_batch_id,
-                EventContentFields.MSC2716_HISTORICAL: True,
-            },
-            "origin_server_ts": origin_server_ts,
-        }
-
-        return insertion_event
-
-    async def _create_requester_for_user_id_from_app_service(
-        self, user_id: str, app_service: ApplicationService
-    ) -> Requester:
-        """Creates a new requester for the given user_id
-        and validates that the app service is allowed to control
-        the given user.
-
-        Args:
-            user_id: The author MXID that the app service is controlling
-            app_service: The app service that controls the user
-
-        Returns:
-            Requester object
-        """
-
-        await self.auth.validate_appservice_can_control_user_id(app_service, user_id)
-
-        return create_requester(user_id, app_service=app_service)
-
     async def on_POST(
         self, request: SynapseRequest, room_id: str
     ) -> Tuple[int, JsonDict]:
@@ -200,123 +109,62 @@ class RoomBatchSendEventRestServlet(RestServlet):
                 errcode=Codes.MISSING_PARAM,
             )
 
+        # Verify the batch_id_from_query corresponds to an actual insertion event
+        # and have the batch connected.
+        if batch_id_from_query:
+            corresponding_insertion_event_id = (
+                await self.store.get_insertion_event_by_batch_id(
+                    room_id, batch_id_from_query
+                )
+            )
+            if corresponding_insertion_event_id is None:
+                raise SynapseError(
+                    HTTPStatus.BAD_REQUEST,
+                    "No insertion event corresponds to the given ?batch_id",
+                    errcode=Codes.INVALID_PARAM,
+                )
+
         # For the event we are inserting next to (`prev_event_ids_from_query`),
         # find the most recent auth events (derived from state events) that
         # allowed that message to be sent. We will use that as a base
         # to auth our historical messages against.
-        (
-            most_recent_prev_event_id,
-            _,
-        ) = await self.store.get_max_depth_of(prev_event_ids_from_query)
-        # mapping from (type, state_key) -> state_event_id
-        prev_state_map = await self.state_store.get_state_ids_for_event(
-            most_recent_prev_event_id
+        auth_event_ids = await self.room_batch_handler.get_most_recent_auth_event_ids_from_event_id_list(
+            prev_event_ids_from_query
         )
-        # List of state event ID's
-        prev_state_ids = list(prev_state_map.values())
-        auth_event_ids = prev_state_ids
-
-        state_event_ids_at_start = []
-        for state_event in body["state_events_at_start"]:
-            assert_params_in_dict(
-                state_event, ["type", "origin_server_ts", "content", "sender"]
-            )
 
-            logger.debug(
-                "RoomBatchSendEventRestServlet inserting state_event=%s, auth_event_ids=%s",
-                state_event,
-                auth_event_ids,
+        # Create and persist all of the state events that float off on their own
+        # before the batch. These will most likely be all of the invite/member
+        # state events used to auth the upcoming historical messages.
+        state_event_ids_at_start = (
+            await self.room_batch_handler.persist_state_events_at_start(
+                state_events_at_start=body["state_events_at_start"],
+                room_id=room_id,
+                initial_auth_event_ids=auth_event_ids,
+                app_service_requester=requester,
             )
+        )
+        # Update our ongoing auth event ID list with all of the new state we
+        # just created
+        auth_event_ids.extend(state_event_ids_at_start)
 
-            event_dict = {
-                "type": state_event["type"],
-                "origin_server_ts": state_event["origin_server_ts"],
-                "content": state_event["content"],
-                "room_id": room_id,
-                "sender": state_event["sender"],
-                "state_key": state_event["state_key"],
-            }
-
-            # Mark all events as historical
-            event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
-
-            # Make the state events float off on their own
-            fake_prev_event_id = "$" + random_string(43)
-
-            # TODO: This is pretty much the same as some other code to handle inserting state in this file
-            if event_dict["type"] == EventTypes.Member:
-                membership = event_dict["content"].get("membership", None)
-                event_id, _ = await self.room_member_handler.update_membership(
-                    await self._create_requester_for_user_id_from_app_service(
-                        state_event["sender"], requester.app_service
-                    ),
-                    target=UserID.from_string(event_dict["state_key"]),
-                    room_id=room_id,
-                    action=membership,
-                    content=event_dict["content"],
-                    outlier=True,
-                    prev_event_ids=[fake_prev_event_id],
-                    # Make sure to use a copy of this list because we modify it
-                    # later in the loop here. Otherwise it will be the same
-                    # reference and also update in the event when we append later.
-                    auth_event_ids=auth_event_ids.copy(),
-                )
-            else:
-                # TODO: Add some complement tests that adds state that is not member joins
-                # and will use this code path. Maybe we only want to support join state events
-                # and can get rid of this `else`?
-                (
-                    event,
-                    _,
-                ) = await self.event_creation_handler.create_and_send_nonmember_event(
-                    await self._create_requester_for_user_id_from_app_service(
-                        state_event["sender"], requester.app_service
-                    ),
-                    event_dict,
-                    outlier=True,
-                    prev_event_ids=[fake_prev_event_id],
-                    # Make sure to use a copy of this list because we modify it
-                    # later in the loop here. Otherwise it will be the same
-                    # reference and also update in the event when we append later.
-                    auth_event_ids=auth_event_ids.copy(),
-                )
-                event_id = event.event_id
-
-            state_event_ids_at_start.append(event_id)
-            auth_event_ids.append(event_id)
-
-        events_to_create = body["events"]
-
-        inherited_depth = await self._inherit_depth_from_prev_ids(
+        inherited_depth = await self.room_batch_handler.inherit_depth_from_prev_ids(
             prev_event_ids_from_query
         )
 
+        events_to_create = body["events"]
+
         # Figure out which batch to connect to. If they passed in
         # batch_id_from_query let's use it. The batch ID passed in comes
         # from the batch_id in the "insertion" event from the previous batch.
         last_event_in_batch = events_to_create[-1]
-        batch_id_to_connect_to = batch_id_from_query
         base_insertion_event = None
         if batch_id_from_query:
+            batch_id_to_connect_to = batch_id_from_query
             #  All but the first base insertion event should point at a fake
             #  event, which causes the HS to ask for the state at the start of
             #  the batch later.
+            fake_prev_event_id = "$" + random_string(43)
             prev_event_ids = [fake_prev_event_id]
-
-            # Verify the batch_id_from_query corresponds to an actual insertion event
-            # and have the batch connected.
-            corresponding_insertion_event_id = (
-                await self.store.get_insertion_event_by_batch_id(
-                    room_id, batch_id_from_query
-                )
-            )
-            if corresponding_insertion_event_id is None:
-                raise SynapseError(
-                    HTTPStatus.BAD_REQUEST,
-                    "No insertion event corresponds to the given ?batch_id",
-                    errcode=Codes.INVALID_PARAM,
-                )
-            pass
         # Otherwise, create an insertion event to act as a starting point.
         #
         # We don't always have an insertion event to start hanging more history
@@ -327,10 +175,12 @@ class RoomBatchSendEventRestServlet(RestServlet):
         else:
             prev_event_ids = prev_event_ids_from_query
 
-            base_insertion_event_dict = self._create_insertion_event_dict(
-                sender=requester.user.to_string(),
-                room_id=room_id,
-                origin_server_ts=last_event_in_batch["origin_server_ts"],
+            base_insertion_event_dict = (
+                self.room_batch_handler.create_insertion_event_dict(
+                    sender=requester.user.to_string(),
+                    room_id=room_id,
+                    origin_server_ts=last_event_in_batch["origin_server_ts"],
+                )
             )
             base_insertion_event_dict["prev_events"] = prev_event_ids.copy()
 
@@ -338,7 +188,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
                 base_insertion_event,
                 _,
             ) = await self.event_creation_handler.create_and_send_nonmember_event(
-                await self._create_requester_for_user_id_from_app_service(
+                await self.room_batch_handler.create_requester_for_user_id_from_app_service(
                     base_insertion_event_dict["sender"],
                     requester.app_service,
                 ),
@@ -353,92 +203,17 @@ class RoomBatchSendEventRestServlet(RestServlet):
                 EventContentFields.MSC2716_NEXT_BATCH_ID
             ]
 
-        # Connect this current batch to the insertion event from the previous batch
-        batch_event = {
-            "type": EventTypes.MSC2716_BATCH,
-            "sender": requester.user.to_string(),
-            "room_id": room_id,
-            "content": {
-                EventContentFields.MSC2716_BATCH_ID: batch_id_to_connect_to,
-                EventContentFields.MSC2716_HISTORICAL: True,
-            },
-            # Since the batch event is put at the end of the batch,
-            # where the newest-in-time event is, copy the origin_server_ts from
-            # the last event we're inserting
-            "origin_server_ts": last_event_in_batch["origin_server_ts"],
-        }
-        # Add the batch event to the end of the batch (newest-in-time)
-        events_to_create.append(batch_event)
-
-        # Add an "insertion" event to the start of each batch (next to the oldest-in-time
-        # event in the batch) so the next batch can be connected to this one.
-        insertion_event = self._create_insertion_event_dict(
-            sender=requester.user.to_string(),
+        # Create and persist all of the historical events as well as insertion
+        # and batch meta events to make the batch navigable in the DAG.
+        event_ids, next_batch_id = await self.room_batch_handler.handle_batch_of_events(
+            events_to_create=events_to_create,
             room_id=room_id,
-            # Since the insertion event is put at the start of the batch,
-            # where the oldest-in-time event is, copy the origin_server_ts from
-            # the first event we're inserting
-            origin_server_ts=events_to_create[0]["origin_server_ts"],
+            batch_id_to_connect_to=batch_id_to_connect_to,
+            initial_prev_event_ids=prev_event_ids,
+            inherited_depth=inherited_depth,
+            auth_event_ids=auth_event_ids,
+            app_service_requester=requester,
         )
-        # Prepend the insertion event to the start of the batch (oldest-in-time)
-        events_to_create = [insertion_event] + events_to_create
-
-        event_ids = []
-        events_to_persist = []
-        for ev in events_to_create:
-            assert_params_in_dict(ev, ["type", "origin_server_ts", "content", "sender"])
-
-            event_dict = {
-                "type": ev["type"],
-                "origin_server_ts": ev["origin_server_ts"],
-                "content": ev["content"],
-                "room_id": room_id,
-                "sender": ev["sender"],  # requester.user.to_string(),
-                "prev_events": prev_event_ids.copy(),
-            }
-
-            # Mark all events as historical
-            event_dict["content"][EventContentFields.MSC2716_HISTORICAL] = True
-
-            event, context = await self.event_creation_handler.create_event(
-                await self._create_requester_for_user_id_from_app_service(
-                    ev["sender"], requester.app_service
-                ),
-                event_dict,
-                prev_event_ids=event_dict.get("prev_events"),
-                auth_event_ids=auth_event_ids,
-                historical=True,
-                depth=inherited_depth,
-            )
-            logger.debug(
-                "RoomBatchSendEventRestServlet inserting event=%s, prev_event_ids=%s, auth_event_ids=%s",
-                event,
-                prev_event_ids,
-                auth_event_ids,
-            )
-
-            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
-                event.sender,
-            )
-
-            events_to_persist.append((event, context))
-            event_id = event.event_id
-
-            event_ids.append(event_id)
-            prev_event_ids = [event_id]
-
-        # Persist events in reverse-chronological order so they have the
-        # correct stream_ordering as they are backfilled (which decrements).
-        # Events are sorted by (topological_ordering, stream_ordering)
-        # where topological_ordering is just depth.
-        for (event, context) in reversed(events_to_persist):
-            ev = await self.event_creation_handler.handle_new_client_event(
-                await self._create_requester_for_user_id_from_app_service(
-                    event["sender"], requester.app_service
-                ),
-                event=event,
-                context=context,
-            )
 
         insertion_event_id = event_ids[0]
         batch_event_id = event_ids[-1]
@@ -447,9 +222,7 @@ class RoomBatchSendEventRestServlet(RestServlet):
         response_dict = {
             "state_event_ids": state_event_ids_at_start,
             "event_ids": historical_event_ids,
-            "next_batch_id": insertion_event["content"][
-                EventContentFields.MSC2716_NEXT_BATCH_ID
-            ],
+            "next_batch_id": next_batch_id,
             "insertion_event_id": insertion_event_id,
             "batch_event_id": batch_event_id,
         }