diff --git a/changelog.d/13487.misc b/changelog.d/13487.misc
new file mode 100644
index 0000000000..761adc8b05
--- /dev/null
+++ b/changelog.d/13487.misc
@@ -0,0 +1 @@
+Speed up creation of DM rooms.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e07cda133a..062f93bc67 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -63,6 +63,7 @@ from synapse.types import (
MutableStateMap,
Requester,
RoomAlias,
+ StateMap,
StreamToken,
UserID,
create_requester,
@@ -567,9 +568,17 @@ class EventCreationHandler:
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
+ state_map: Optional[StateMap[str]] = None,
+ for_batch: bool = False,
+ current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
- Given a dict from a client, create a new event.
+        Given a dict from a client, create a new event. If for_batch is True, the event
+        is created on top of the given prev_event_ids, and its event context is computed
+        from the state_map and current_state_group parameters, so both of those must be
+        provided in that case. The resulting event and context are suitable for being
+        batched up and bulk persisted to the database with other similarly created
+        events.
Creates an FrozenEvent object, filling out auth_events, prev_events,
etc.
@@ -612,16 +621,27 @@ class EventCreationHandler:
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
+
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
+
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
+ state_map: A state map of previously created events, used only when creating events
+ for batch persisting
+
+ for_batch: whether the event is being created for batch persisting to the db
+
+ current_state_group: the current state group, used only for creating events for
+ batch persisting
+
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
+
Returns:
Tuple of created event, Context
"""
@@ -693,6 +713,9 @@ class EventCreationHandler:
auth_event_ids=auth_event_ids,
state_event_ids=state_event_ids,
depth=depth,
+ state_map=state_map,
+ for_batch=for_batch,
+ current_state_group=current_state_group,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -707,10 +730,14 @@ class EventCreationHandler:
# federation as well as those created locally. As of room v3, aliases events
# can be created by users that are not in the room, therefore we have to
# tolerate them in event_auth.check().
- prev_state_ids = await context.get_prev_state_ids(
- StateFilter.from_types([(EventTypes.Member, None)])
- )
- prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
+ if for_batch:
+ assert state_map is not None
+ prev_event_id = state_map.get((EventTypes.Member, event.sender))
+ else:
+ prev_state_ids = await context.get_prev_state_ids(
+ StateFilter.from_types([(EventTypes.Member, None)])
+ )
+ prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
prev_event = (
await self.store.get_event(prev_event_id, allow_none=True)
if prev_event_id
@@ -1009,8 +1036,16 @@ class EventCreationHandler:
auth_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
+ state_map: Optional[StateMap[str]] = None,
+ for_batch: bool = False,
+ current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
- """Create a new event for a local client
+        """Create a new event for a local client. If for_batch is True, the event is
+        created on top of the given prev_event_ids, and its event context is computed
+        from the state_map and current_state_group parameters, so all of these must be
+        provided in that case. The resulting event and context are suitable for being
+        batched up and bulk persisted to the database with other similarly created
+        events.
Args:
builder:
@@ -1043,6 +1078,14 @@ class EventCreationHandler:
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
+ state_map: A state map of previously created events, used only when creating events
+ for batch persisting
+
+ for_batch: whether the event is being created for batch persisting to the db
+
+ current_state_group: the current state group, used only for creating events for
+ batch persisting
+
Returns:
Tuple of created event, context
"""
@@ -1095,64 +1138,76 @@ class EventCreationHandler:
builder.type == EventTypes.Create or prev_event_ids
), "Attempting to create a non-m.room.create event with no prev_events"
- event = await builder.build(
- prev_event_ids=prev_event_ids,
- auth_event_ids=auth_event_ids,
- depth=depth,
- )
+ if for_batch:
+ assert prev_event_ids is not None
+ assert state_map is not None
+ assert current_state_group is not None
+ auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
+ event = await builder.build(
+ prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
+ )
+ context = await self.state.compute_event_context_for_batched(
+ event, state_map, current_state_group
+ )
+ else:
+ event = await builder.build(
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ depth=depth,
+ )
- # Pass on the outlier property from the builder to the event
- # after it is created
- if builder.internal_metadata.outlier:
- event.internal_metadata.outlier = True
- context = EventContext.for_outlier(self._storage_controllers)
- elif (
- event.type == EventTypes.MSC2716_INSERTION
- and state_event_ids
- and builder.internal_metadata.is_historical()
- ):
- # Add explicit state to the insertion event so it has state to derive
- # from even though it's floating with no `prev_events`. The rest of
- # the batch can derive from this state and state_group.
- #
- # TODO(faster_joins): figure out how this works, and make sure that the
- # old state is complete.
- # https://github.com/matrix-org/synapse/issues/13003
- metadata = await self.store.get_metadata_for_events(state_event_ids)
-
- state_map_for_event: MutableStateMap[str] = {}
- for state_id in state_event_ids:
- data = metadata.get(state_id)
- if data is None:
- # We're trying to persist a new historical batch of events
- # with the given state, e.g. via
- # `RoomBatchSendEventRestServlet`. The state can be inferred
- # by Synapse or set directly by the client.
- #
- # Either way, we should have persisted all the state before
- # getting here.
- raise Exception(
- f"State event {state_id} not found in DB,"
- " Synapse should have persisted it before using it."
- )
+ # Pass on the outlier property from the builder to the event
+ # after it is created
+ if builder.internal_metadata.outlier:
+ event.internal_metadata.outlier = True
+ context = EventContext.for_outlier(self._storage_controllers)
+ elif (
+ event.type == EventTypes.MSC2716_INSERTION
+ and state_event_ids
+ and builder.internal_metadata.is_historical()
+ ):
+ # Add explicit state to the insertion event so it has state to derive
+ # from even though it's floating with no `prev_events`. The rest of
+ # the batch can derive from this state and state_group.
+ #
+ # TODO(faster_joins): figure out how this works, and make sure that the
+ # old state is complete.
+ # https://github.com/matrix-org/synapse/issues/13003
+ metadata = await self.store.get_metadata_for_events(state_event_ids)
+
+ state_map_for_event: MutableStateMap[str] = {}
+ for state_id in state_event_ids:
+ data = metadata.get(state_id)
+ if data is None:
+ # We're trying to persist a new historical batch of events
+ # with the given state, e.g. via
+ # `RoomBatchSendEventRestServlet`. The state can be inferred
+ # by Synapse or set directly by the client.
+ #
+ # Either way, we should have persisted all the state before
+ # getting here.
+ raise Exception(
+ f"State event {state_id} not found in DB,"
+ " Synapse should have persisted it before using it."
+ )
- if data.state_key is None:
- raise Exception(
- f"Trying to set non-state event {state_id} as state"
- )
+ if data.state_key is None:
+ raise Exception(
+ f"Trying to set non-state event {state_id} as state"
+ )
- state_map_for_event[(data.event_type, data.state_key)] = state_id
+ state_map_for_event[(data.event_type, data.state_key)] = state_id
- context = await self.state.compute_event_context(
- event,
- state_ids_before_event=state_map_for_event,
- # TODO(faster_joins): check how MSC2716 works and whether we can have
- # partial state here
- # https://github.com/matrix-org/synapse/issues/13003
- partial_state=False,
- )
- else:
- context = await self.state.compute_event_context(event)
+ context = await self.state.compute_event_context(
+ event,
+ state_ids_before_event=state_map_for_event,
+ # TODO(faster_joins): check how MSC2716 works and whether we can have
+ # partial state here
+ # https://github.com/matrix-org/synapse/issues/13003
+ partial_state=False,
+ )
+ else:
+ context = await self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 33e9a87002..09a1a82e6c 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -716,7 +716,7 @@ class RoomCreationHandler:
if (
self._server_notices_mxid is not None
- and requester.user.to_string() == self._server_notices_mxid
+ and user_id == self._server_notices_mxid
):
# allow the server notices mxid to create rooms
is_requester_admin = True
@@ -1042,7 +1042,9 @@ class RoomCreationHandler:
creator_join_profile: Optional[JsonDict] = None,
ratelimit: bool = True,
) -> Tuple[int, str, int]:
- """Sends the initial events into a new room.
+ """Sends the initial events into a new room. Sends the room creation, membership,
+ and power level events into the room sequentially, then creates and batches up the
+ rest of the events to persist as a batch to the DB.
`power_level_content_override` doesn't apply when initial state has
power level state event content.
@@ -1053,13 +1055,21 @@ class RoomCreationHandler:
"""
creator_id = creator.user.to_string()
-
event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
-
depth = 1
+ # the last event sent/persisted to the db
last_sent_event_id: Optional[str] = None
-
- def create(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
+ # the most recently created event
+ prev_event: List[str] = []
+        # a map of (event type, state key) -> event_id. We collect these mappings as events
+        # are created (but not persisted to the db) to determine state for future created
+        # events (as this info can't be pulled from the db)
+ state_map: MutableStateMap[str] = {}
+ # current_state_group of last event created. Used for computing event context of
+ # events to be batched
+ current_state_group = None
+
+ def create_event_dict(etype: str, content: JsonDict, **kwargs: Any) -> JsonDict:
e = {"type": etype, "content": content}
e.update(event_keys)
@@ -1067,32 +1077,52 @@ class RoomCreationHandler:
return e
- async def send(etype: str, content: JsonDict, **kwargs: Any) -> int:
- nonlocal last_sent_event_id
+ async def create_event(
+ etype: str,
+ content: JsonDict,
+ for_batch: bool,
+ **kwargs: Any,
+ ) -> Tuple[EventBase, synapse.events.snapshot.EventContext]:
nonlocal depth
+ nonlocal prev_event
- event = create(etype, content, **kwargs)
- logger.debug("Sending %s in new room", etype)
- # Allow these events to be sent even if the user is shadow-banned to
- # allow the room creation to complete.
- (
- sent_event,
- last_stream_id,
- ) = await self.event_creation_handler.create_and_send_nonmember_event(
+ event_dict = create_event_dict(etype, content, **kwargs)
+
+ new_event, new_context = await self.event_creation_handler.create_event(
creator,
- event,
+ event_dict,
+ prev_event_ids=prev_event,
+ depth=depth,
+ state_map=state_map,
+ for_batch=for_batch,
+ current_state_group=current_state_group,
+ )
+ depth += 1
+ prev_event = [new_event.event_id]
+ state_map[(new_event.type, new_event.state_key)] = new_event.event_id
+
+ return new_event, new_context
+
+ async def send(
+ event: EventBase,
+ context: synapse.events.snapshot.EventContext,
+ creator: Requester,
+ ) -> int:
+ nonlocal last_sent_event_id
+
+ ev = await self.event_creation_handler.handle_new_client_event(
+ requester=creator,
+ event=event,
+ context=context,
ratelimit=False,
ignore_shadow_ban=True,
- # Note: we don't pass state_event_ids here because this triggers
- # an additional query per event to look them up from the events table.
- prev_event_ids=[last_sent_event_id] if last_sent_event_id else [],
- depth=depth,
)
- last_sent_event_id = sent_event.event_id
- depth += 1
+ last_sent_event_id = ev.event_id
- return last_stream_id
+ # we know it was persisted, so must have a stream ordering
+ assert ev.internal_metadata.stream_ordering
+ return ev.internal_metadata.stream_ordering
try:
config = self._presets_dict[preset_config]
@@ -1102,9 +1132,13 @@ class RoomCreationHandler:
)
creation_content.update({"creator": creator_id})
- await send(etype=EventTypes.Create, content=creation_content)
+ creation_event, creation_context = await create_event(
+ EventTypes.Create, creation_content, False
+ )
logger.debug("Sending %s in new room", EventTypes.Member)
+ await send(creation_event, creation_context, creator)
+
# Room create event must exist at this point
assert last_sent_event_id is not None
member_event_id, _ = await self.room_member_handler.update_membership(
@@ -1119,14 +1153,22 @@ class RoomCreationHandler:
depth=depth,
)
last_sent_event_id = member_event_id
+ prev_event = [member_event_id]
+
+ # update the depth and state map here as the membership event has been created
+ # through a different code path
+ depth += 1
+ state_map[(EventTypes.Member, creator.user.to_string())] = member_event_id
# We treat the power levels override specially as this needs to be one
# of the first events that get sent into a room.
pl_content = initial_state.pop((EventTypes.PowerLevels, ""), None)
if pl_content is not None:
- last_sent_stream_id = await send(
- etype=EventTypes.PowerLevels, content=pl_content
+ power_event, power_context = await create_event(
+ EventTypes.PowerLevels, pl_content, False
)
+ current_state_group = power_context._state_group
+ last_sent_stream_id = await send(power_event, power_context, creator)
else:
power_level_content: JsonDict = {
"users": {creator_id: 100},
@@ -1169,47 +1211,68 @@ class RoomCreationHandler:
# apply those.
if power_level_content_override:
power_level_content.update(power_level_content_override)
-
- last_sent_stream_id = await send(
- etype=EventTypes.PowerLevels, content=power_level_content
+ pl_event, pl_context = await create_event(
+ EventTypes.PowerLevels,
+ power_level_content,
+ False,
)
+ current_state_group = pl_context._state_group
+ last_sent_stream_id = await send(pl_event, pl_context, creator)
+ events_to_send = []
if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.CanonicalAlias,
- content={"alias": room_alias.to_string()},
+ room_alias_event, room_alias_context = await create_event(
+ EventTypes.CanonicalAlias, {"alias": room_alias.to_string()}, True
)
+ current_state_group = room_alias_context._state_group
+ events_to_send.append((room_alias_event, room_alias_context))
if (EventTypes.JoinRules, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.JoinRules, content={"join_rule": config["join_rules"]}
+ join_rules_event, join_rules_context = await create_event(
+ EventTypes.JoinRules,
+ {"join_rule": config["join_rules"]},
+ True,
)
+ current_state_group = join_rules_context._state_group
+ events_to_send.append((join_rules_event, join_rules_context))
if (EventTypes.RoomHistoryVisibility, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.RoomHistoryVisibility,
- content={"history_visibility": config["history_visibility"]},
+ visibility_event, visibility_context = await create_event(
+ EventTypes.RoomHistoryVisibility,
+ {"history_visibility": config["history_visibility"]},
+ True,
)
+ current_state_group = visibility_context._state_group
+ events_to_send.append((visibility_event, visibility_context))
if config["guest_can_join"]:
if (EventTypes.GuestAccess, "") not in initial_state:
- last_sent_stream_id = await send(
- etype=EventTypes.GuestAccess,
- content={EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
+ guest_access_event, guest_access_context = await create_event(
+ EventTypes.GuestAccess,
+ {EventContentFields.GUEST_ACCESS: GuestAccess.CAN_JOIN},
+ True,
)
+ current_state_group = guest_access_context._state_group
+ events_to_send.append((guest_access_event, guest_access_context))
for (etype, state_key), content in initial_state.items():
- last_sent_stream_id = await send(
- etype=etype, state_key=state_key, content=content
+ event, context = await create_event(
+ etype, content, True, state_key=state_key
)
+ current_state_group = context._state_group
+ events_to_send.append((event, context))
if config["encrypted"]:
- last_sent_stream_id = await send(
- etype=EventTypes.RoomEncryption,
+ encryption_event, encryption_context = await create_event(
+ EventTypes.RoomEncryption,
+ {"algorithm": RoomEncryptionAlgorithms.DEFAULT},
+ True,
state_key="",
- content={"algorithm": RoomEncryptionAlgorithms.DEFAULT},
)
+ events_to_send.append((encryption_event, encryption_context))
+ for event, context in events_to_send:
+ last_sent_stream_id = await send(event, context, creator)
return last_sent_stream_id, last_sent_event_id, depth
def _generate_room_id(self) -> str:
diff --git a/synapse/state/__init__.py b/synapse/state/__init__.py
index 3787d35b24..6f3dd0463e 100644
--- a/synapse/state/__init__.py
+++ b/synapse/state/__init__.py
@@ -420,6 +420,69 @@ class StateHandler:
partial_state=partial_state,
)
+    async def compute_event_context_for_batched(
+        self,
+        event: EventBase,
+        state_ids_before_event: StateMap[str],
+        current_state_group: int,
+    ) -> EventContext:
+        """
+        Generate an event context for an event that has not yet been persisted to the
+        database. Intended for use with events that are created to be persisted in a batch.
+
+        Non-state events reuse `current_state_group` as-is. For state events a new state
+        group is stored on top of it, whose only delta is the event itself.
+
+        Args:
+            event: the event the context is being computed for
+            state_ids_before_event: a state map consisting of the state ids of the events
+                created prior to this event.
+            current_state_group: the current state group before the event.
+
+        Returns:
+            The context for the event, with `partial_state` set to False.
+        """
+        # if the event is not state, we are set: the state group is unchanged by it
+        if not event.is_state():
+            return EventContext.with_state(
+                storage=self._storage_controllers,
+                state_group_before_event=current_state_group,
+                state_group=current_state_group,
+                state_delta_due_to_event={},
+                prev_group=None,
+                delta_ids=None,
+                partial_state=False,
+            )
+
+        # otherwise, we'll need to create a new state group for after the event
+        key = (event.type, event.state_key)
+        # The state map is a required argument, so look up the event being replaced
+        # unconditionally rather than leaving `replaces` unbound behind a None check.
+        replaces = state_ids_before_event.get(key)
+        if replaces and replaces != event.event_id:
+            event.unsigned["replaces_state"] = replaces
+
+        delta_ids = {key: event.event_id}
+
+        state_group_after_event = (
+            await self._state_storage_controller.store_state_group(
+                event.event_id,
+                event.room_id,
+                prev_group=current_state_group,
+                delta_ids=delta_ids,
+                current_state_ids=None,
+            )
+        )
+
+        return EventContext.with_state(
+            storage=self._storage_controllers,
+            state_group=state_group_after_event,
+            state_group_before_event=current_state_group,
+            state_delta_due_to_event=delta_ids,
+            prev_group=current_state_group,
+            delta_ids=delta_ids,
+            partial_state=False,
+        )
+
@measure_func()
async def resolve_state_groups_for_events(
self, room_id: str, event_ids: Collection[str], await_full_state: bool = True
diff --git a/tests/rest/client/test_rooms.py b/tests/rest/client/test_rooms.py
index c7eb88d33f..e281aef779 100644
--- a/tests/rest/client/test_rooms.py
+++ b/tests/rest/client/test_rooms.py
@@ -710,7 +710,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
- self.assertEqual(44, channel.resource_usage.db_txn_count)
+ self.assertEqual(35, channel.resource_usage.db_txn_count)
def test_post_room_initial_state(self) -> None:
# POST with initial_state config key, expect new room id
@@ -723,7 +723,7 @@ class RoomsCreateTestCase(RoomBase):
self.assertEqual(HTTPStatus.OK, channel.code, channel.result)
self.assertTrue("room_id" in channel.json_body)
assert channel.resource_usage is not None
- self.assertEqual(50, channel.resource_usage.db_txn_count)
+ self.assertEqual(38, channel.resource_usage.db_txn_count)
def test_post_room_visibility_key(self) -> None:
# POST with visibility config key, expect new room id
|