summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/message.py175
-rw-r--r--synapse/handlers/room.py155
-rw-r--r--synapse/state/__init__.py63
3 files changed, 287 insertions, 106 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e07cda133a..062f93bc67 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -63,6 +63,7 @@ from synapse.types import (
     MutableStateMap,
     Requester,
     RoomAlias,
+    StateMap,
     StreamToken,
     UserID,
     create_requester,
@@ -567,9 +568,17 @@ class EventCreationHandler:
         outlier: bool = False,
         historical: bool = False,
         depth: Optional[int] = None,
+        state_map: Optional[StateMap[str]] = None,
+        for_batch: bool = False,
+        current_state_group: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
-        Given a dict from a client, create a new event.
+        Given a dict from a client, create a new event. If bool for_batch is true, will
+        create an event using the prev_event_ids, and will create an event context for
+        the event using the parameters state_map and current_state_group, thus these parameters
+        must be provided in this case if for_batch is True. The subsequently created event
+        and context are suitable for being batched up and bulk persisted to the database
+        with other similarly created events.
 
         Creates an FrozenEvent object, filling out auth_events, prev_events,
         etc.
@@ -612,16 +621,27 @@ class EventCreationHandler:
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
+
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
 
+            state_map: A state map of previously created events, used only when creating events
+                for batch persisting
+
+            for_batch: whether the event is being created for batch persisting to the db
+
+            current_state_group: the current state group, used only for creating events for
+                batch persisting
+
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
+
         Returns:
             Tuple of created event, Context
         """
@@ -693,6 +713,9 @@ class EventCreationHandler:
             auth_event_ids=auth_event_ids,
             state_event_ids=state_event_ids,
             depth=depth,
+            state_map=state_map,
+            for_batch=for_batch,
+            current_state_group=current_state_group,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -707,10 +730,14 @@ class EventCreationHandler:
             # federation as well as those created locally. As of room v3, aliases events
             # can be created by users that are not in the room, therefore we have to
             # tolerate them in event_auth.check().
-            prev_state_ids = await context.get_prev_state_ids(
-                StateFilter.from_types([(EventTypes.Member, None)])
-            )
-            prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
+            if for_batch:
+                assert state_map is not None
+                prev_event_id = state_map.get((EventTypes.Member, event.sender))
+            else:
+                prev_state_ids = await context.get_prev_state_ids(
+                    StateFilter.from_types([(EventTypes.Member, None)])
+                )
+                prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
             prev_event = (
                 await self.store.get_event(prev_event_id, allow_none=True)
                 if prev_event_id
@@ -1009,8 +1036,16 @@ class EventCreationHandler:
         auth_event_ids: Optional[List[str]] = None,
         state_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
+        state_map: Optional[StateMap[str]] = None,
+        for_batch: bool = False,
+        current_state_group: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
-        """Create a new event for a local client
+        """Create a new event for a local client. If bool for_batch is true, will
+        create an event using the prev_event_ids, and will create an event context for
+        the event using the parameters state_map and current_state_group, thus these parameters
+        must be provided in this case if for_batch is True. The subsequently created event
+        and context are suitable for being batched up and bulk persisted to the database
+        with other similarly created events.
 
         Args:
             builder:
@@ -1043,6 +1078,14 @@ class EventCreationHandler:
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
 
+            state_map: A state map of previously created events, used only when creating events
+                for batch persisting
+
+            for_batch: whether the event is being created for batch persisting to the db
+
+            current_state_group: the current state group, used only for creating events for
+                batch persisting
+
         Returns:
             Tuple of created event, context
         """
@@ -1095,64 +1138,76 @@ class EventCreationHandler:
                 builder.type == EventTypes.Create or prev_event_ids
             ), "Attempting to create a non-m.room.create event with no prev_events"
 
-        event = await builder.build(
-            prev_event_ids=prev_event_ids,
-            auth_event_ids=auth_event_ids,
-            depth=depth,
-        )
+        if for_batch:
+            assert prev_event_ids is not None
+            assert state_map is not None
+            assert current_state_group is not None
+            auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
+            event = await builder.build(
+                prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
+            )
+            context = await self.state.compute_event_context_for_batched(
+                event, state_map, current_state_group
+            )
+        else:
+            event = await builder.build(
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                depth=depth,
+            )
 
-        # Pass on the outlier property from the builder to the event
-        # after it is created
-        if builder.internal_metadata.outlier:
-            event.internal_metadata.outlier = True
-            context = EventContext.for_outlier(self._storage_controllers)
-        elif (
-            event.type == EventTypes.MSC2716_INSERTION
-            and state_event_ids
-            and builder.internal_metadata.is_historical()
-        ):
-            # Add explicit state to the insertion event so it has state to derive
-            # from even though it's floating with no `prev_events`. The rest of
-            # the batch can derive from this state and state_group.
-            #
-            # TODO(faster_joins): figure out how this works, and make sure that the
-            #   old state is complete.
-            #   https://github.com/matrix-org/synapse/issues/13003
-            metadata = await self.store.get_metadata_for_events(state_event_ids)
-
-            state_map_for_event: MutableStateMap[str] = {}
-            for state_id in state_event_ids:
-                data = metadata.get(state_id)
-                if data is None:
-                    # We're trying to persist a new historical batch of events
-                    # with the given state, e.g. via
-                    # `RoomBatchSendEventRestServlet`. The state can be inferred
-                    # by Synapse or set directly by the client.
-                    #
-                    # Either way, we should have persisted all the state before
-                    # getting here.
-                    raise Exception(
-                        f"State event {state_id} not found in DB,"
-                        " Synapse should have persisted it before using it."
-                    )
+            # Pass on the outlier property from the builder to the event
+            # after it is created
+            if builder.internal_metadata.outlier:
+                event.internal_metadata.outlier = True
+                context = EventContext.for_outlier(self._storage_controllers)
+            elif (
+                event.type == EventTypes.MSC2716_INSERTION
+                and state_event_ids
+                and builder.internal_metadata.is_historical()
+            ):
+                # Add explicit state to the insertion event so it has state to derive
+                # from even though it's floating with no `prev_events`. The rest of
+                # the batch can derive from this state and state_group.
+                #
+                # TODO(faster_joins): figure out how this works, and make sure that the
+                #   old state is complete.
+                #   https://github.com/matrix-org/synapse/issues/13003
+                metadata = await self.store.get_metadata_for_events(state_event_ids)
+
+                state_map_for_event: MutableStateMap[str] = {}
+                for state_id in state_event_ids:
+                    data = metadata.get(state_id)
+                    if data is None:
+                        # We're trying to persist a new historical batch of events
+                        # with the given state, e.g. via
+                        # `RoomBatchSendEventRestServlet`. The state can be inferred
+                        # by Synapse or set directly by the client.
+                        #
+                        # Either way, we should have persisted all the state before
+                        # getting here.
+                        raise Exception(
+                            f"State event {state_id} not found in DB,"
+                            " Synapse should have persisted it before using it."
+                        )
 
-                if data.state_key is None:
-                    raise Exception(
-                        f"Trying to set non-state event {state_id} as state"
-                    )
+                    if data.state_key is None:
+                        raise Exception(
+                            f"Trying to set non-state event {state_id} as state"
+                        )
 
-                state_map_for_event[(data.event_type, data.state_key)] = state_id
+                    state_map_for_event[(data.event_type, data.state_key)] = state_id
 
-            context = await self.state.compute_event_context(
-                event,
-                state_ids_before_event=state_map_for_event,
-                # TODO(faster_joins): check how MSC2716 works and whether we can have
-                #   partial state here
-                #   https://github.com/matrix-org/synapse/issues/13003
-                partial_state=False,
-            )
-        else:
-            context = await self.state.compute_event_context(event)
+                context = await self.state.compute_event_context(
+                    event,
+                    state_ids_before_event=state_map_for_event,
+                    # TODO(faster_joins): check how MSC2716 works and whether we can have
+                    #   partial state here
+                    #   https://github.com/matrix-org/synapse/issues/13003
+                    partial_state=False,
+                )
+            else:
+                context = await self.state.compute_event_context(event)
 
         if requester:
             context.app_service = requester.app_service
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 33e9a87002..09a1a82e6c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -716,7 +716,7 @@ class RoomCreationHandler:
 
         if (
             self._server_notices_mxid is not None
-            and requester.user.to_string() == self._server_notices_mxid
+            and user_id == self._server_notices_mxid
         ):
             # allow the server notices mxid to create rooms
             is_requester_admin = True
@@ -1042,7 +1042,9 @@ class RoomCreationHandler:
         creator_join_profile: Optional[JsonDict] = None,
         ratelimit: bool = True,
     ) -> Tuple[int, str, int]:
-        """Sends the initial events into a new room.
+        """Sends the initial events into a new room. Sends the room creation, membership,
+        and power level events into the room sequentially, then creates and batches up the
+        rest of the events to persist as a batch to the DB.
 
         `power_level_content_override` doesn't apply when initial state has
         power level state event content.
@@ -1053,13 +1055,21 @@ class RoomCreationHandler:
         """
 
         creator_id = creator.user.to_string()
-
         event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
-
         depth = 1
+        # the last event sent/persisted to the db
         last_sent_event_id: Optional[str] = None
-
-        def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
+        # the most recently created event
+        prev_event: List[str] = []
+        # a map of event types, state keys -> event_ids. We collect these mappings this as events are
+        # created (but not persisted to the db) to determine state for future created events
+        # (as this info can't be pulled from the db)
+        state_map: MutableStateMap[str] = {}
+        # current_state_group of last event created. Used for computing event context of
+        # events to be batched
+        current_state_group = None
+
+        def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
             e = {"type": etype, "content": content}
 
             e.update(event_keys)
@@ -1067,32 +1077,52 @@ class RoomCreationHandler:
 
             return e
 
-        async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
-            nonlocal last_sent_event_id
+        async def create_event(
+            etype: str,
+            content: JsonDict,
+            for_batch: bool,
+            **kwargs: Any,
+        ) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
             nonlocal depth
+            nonlocal prev_event
 
-            event = create(etype, content, **kwargs)
-            logger.debug("Sending %s in new room", etype)
-            # Allow these events to be sent even if the user is shadow-banned to
-            # allow the room creation to complete.
-            (
-                sent_event,
-                last_stream_id,
-            ) = await self.event_creation_handler.create_and_send_nonmember_event(
+            event_dict = create_event_dict(etype, content, **kwargs)
+
+            new_event, new_context = await self.event_creation_handler.create_event(
                 creator,
-                event,
+                event_dict,
+                prev_event_ids=prev_event,
+                depth=depth,
+                state_map=state_map,
+                for_batch=for_batch,
+                current_state_group=current_state_group,
+            )
+            depth += 1
+            prev_event = [new_event.event_id]
+            state_map[(new_event.type, new_event.state_key)] = new_event.event_id
+
+            return new_event, new_context
+
+        async def send(
+            event: EventBase,
+            context: synapse.events.snapshot.EventContext,
+            creator: Requester,
+        ) -> int:
+            nonlocal last_sent_event_id
+
+            ev = await self.event_creation_handler.handle_new_client_event(
+                requester=creator,
+                event=event,
+                context=context,
                 ratelimit=False,
                 ignore_shadow_ban=True,
-                # Note: we don't pass state_event_ids here because this triggers
-                # an additional query per event to look them up from the events table.
-                prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
-                depth=depth,
             )
 
-            last_sent_event_id = sent_event.event_id
-            depth += 1
+            last_sent_event_id = ev.event_id
 
-            return last_stream_id
+            # we know it was persisted, so must have a stream ordering
+            assert ev.internal_metadata.stream_ordering
+            return ev.internal_metadata.stream_ordering
 
         try:
             config = self._presets_dict[preset_config]
@@ -1102,9 +1132,13 @@ class RoomCreationHandler:
             )
 
         creation_content.update({"creator": creator_id})
-        await send(etype=EventTypes.Create, content=creation_content)
+        creation_event, creation_context = await create_event(
+            EventTypes.Create, creation_content, False
+        )
 
         logger.debug("Sending %s in new room", EventTypes.Member)
+        await send(creation_event, creation_context, creator)
+
         # Room create event must exist at this point
         assert last_sent_event_id is not None
         member_event_id, _ = await self.room_member_handler.update_membership(
@@ -1119,14 +1153,22 @@ class RoomCreationHandler:
             depth=depth,
         )
         last_sent_event_id = member_event_id
+        prev_event = [member_event_id]
+
+        # update the depth and state map here as the membership event has been created
+        # through a different code path
+        depth += 1
+        state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
 
         # We treat the power levels override specially as this needs to be one
         # of the first events that get sent into a room.
         pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
         if pl_content is not None:
-            last_sent_stream_id = await send(
-                etype=EventTypes.PowerLevels, content=pl_content
+            power_event, power_context = await create_event(
+                EventTypes.PowerLevels, pl_content, False
             )
+            current_state_group = power_context._state_group
+            last_sent_stream_id = await send(power_event, power_context, creator)
         else:
             power_level_content: JsonDict = {
                 "users": {creator_id: 100},
@@ -1169,47 +1211,68 @@ class RoomCreationHandler:
             # apply those.
             if power_level_content_override:
                 power_level_content.update(power_level_content_override)
-
-            last_sent_stream_id = await send(
-                etype=EventTypes.PowerLevels, content=power_level_content
+            pl_event, pl_context = await create_event(
+                EventTypes.PowerLevels,
+                power_level_content,
+                False,
             )
+            current_state_group = pl_context._state_group
+            last_sent_stream_id = await send(pl_event, pl_context, creator)
 
+        events_to_send = []
         if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
-            last_sent_stream_id = await send(
-                etype=EventTypes.CanonicalAlias,
-                content={"alias": room_alias.to_string()},
+            room_alias_event, room_alias_context = await create_event(
+                EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
             )
+            current_state_group = room_alias_context._state_group
+            events_to_send.append((room_alias_event, room_alias_context))
 
         if (EventTypes.JoinRules, "") not in initial_state:
-            last_sent_stream_id = await send(
-                etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
+            join_rules_event, join_rules_context = await create_event(
+                EventTypes.JoinRules,
+                {"join_rule": config["join_rules"]},
+                True,
             )
+            current_state_group = join_rules_context._state_group
+            events_to_send.append((join_rules_event, join_rules_context))
 
         if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
-            last_sent_stream_id = await send(
-                etype=EventTypes.RoomHistoryVisibility,
-                content={"history_visibility": config["history_visibility"]},
+            visibility_event, visibility_context = await create_event(
+                EventTypes.RoomHistoryVisibility,
+                {"history_visibility": config["history_visibility"]},
+                True,
             )
+            current_state_group = visibility_context._state_group
+            events_to_send.append((visibility_event, visibility_context))
 
         if config["guest_can_join"]:
             if (EventTypes.GuestAccess, "") not in initial_state:
-                last_sent_stream_id = await send(
-                    etype=EventTypes.GuestAccess,
-                    content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
+                guest_access_event, guest_access_context = await create_event(
+                    EventTypes.GuestAccess,
+                    {EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
+                    True,
                 )
+                current_state_group = guest_access_context._state_group
+                events_to_send.append((guest_access_event, guest_access_context))
 
         for (etype, state_key), content in initial_state.items():
-            last_sent_stream_id = await send(
-                etype=etype, state_key=state_key, content=content
+            event, context = await create_event(
+                etype, content, True, state_key=state_key
             )
+            current_state_group = context._state_group
+            events_to_send.append((event, context))
 
         if config["encrypted"]:
-            last_sent_stream_id = await send(
-                etype=EventTypes.RoomEncryption,
+            encryption_event, encryption_context = await create_event(
+                EventTypes.RoomEncryption,
+                {"algorithm": RoomEncryptionAlgorithms.DEFAULT},
+                True,
                 state_key="",
-                content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
             )
+            events_to_send.append((encryption_event, encryption_context))
 
+        for event, context in events_to_send:
+            last_sent_stream_id = await send(event, context, creator)
         return last_sent_stream_id, last_sent_event_id, depth
 
     def _generate_room_id(self) -> str:
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 3787d35b24..6f3dd0463e 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -420,6 +420,69 @@ class StateHandler:
             partial_state=partial_state,
         )
 
+    async def compute_event_context_for_batched(
+        self,
+        event: EventBase,
+        state_ids_before_event: StateMap[str],
+        current_state_group: int,
+    ) -> EventContext:
+        """
+        Generate an event context for an event that has not yet been persisted to the
+        database. Intended for use with events that are created to be persisted in a batch.
+        Args:
+            event: the event the context is being computed for
+            state_ids_before_event: a state map consisting of the state ids of the events
+            created prior to this event.
+            current_state_group: the current state group before the event.
+        """
+        state_group_before_event_prev_group = None
+        deltas_to_state_group_before_event = None
+
+        state_group_before_event = current_state_group
+
+        # if the event is not state, we are set
+        if not event.is_state():
+            return EventContext.with_state(
+                storage=self._storage_controllers,
+                state_group_before_event=state_group_before_event,
+                state_group=state_group_before_event,
+                state_delta_due_to_event={},
+                prev_group=state_group_before_event_prev_group,
+                delta_ids=deltas_to_state_group_before_event,
+                partial_state=False,
+            )
+
+        # otherwise, we'll need to create a new state group for after the event
+        key = (event.type, event.state_key)
+
+        if state_ids_before_event is not None:
+            replaces = state_ids_before_event.get(key)
+
+        if replaces and replaces != event.event_id:
+            event.unsigned["replaces_state"] = replaces
+
+        delta_ids = {key: event.event_id}
+
+        state_group_after_event = (
+            await self._state_storage_controller.store_state_group(
+                event.event_id,
+                event.room_id,
+                prev_group=state_group_before_event,
+                delta_ids=delta_ids,
+                current_state_ids=None,
+            )
+        )
+
+        return EventContext.with_state(
+            storage=self._storage_controllers,
+            state_group=state_group_after_event,
+            state_group_before_event=state_group_before_event,
+            state_delta_due_to_event=delta_ids,
+            prev_group=state_group_before_event,
+            delta_ids=delta_ids,
+            partial_state=False,
+        )
+
     @measure_func()
     async def resolve_state_groups_for_events(
         self, room_id: str, event_ids: Collection[str], await_full_state: bool = True