diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e07cda133a..062f93bc67 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -63,6 +63,7 @@ from synapse.types import (
MutableStateMap,
Requester,
RoomAlias,
+ StateMap,
StreamToken,
UserID,
create_requester,
@@ -567,9 +568,17 @@ class EventCreationHandler:
outlier: bool = False,
historical: bool = False,
depth: Optional[int] = None,
+ state_map: Optional[StateMap[str]] = None,
+ for_batch: bool = False,
+ current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
"""
- Given a dict from a client, create a new event.
+ Given a dict from a client, create a new event. If bool for_batch is true, will
+ create an event using the prev_event_ids, and will create an event context for
+ the event using the parameters state_map and current_state_group, thus these parameters
+ must be provided in this case if for_batch is True. The subsequently created event
+ and context are suitable for being batched up and bulk persisted to the database
+ with other similarly created events.
Creates an FrozenEvent object, filling out auth_events, prev_events,
etc.
@@ -612,16 +621,27 @@ class EventCreationHandler:
outlier: Indicates whether the event is an `outlier`, i.e. if
it's from an arbitrary point and floating in the DAG as
opposed to being inline with the current DAG.
+
historical: Indicates whether the message is being inserted
back in time around some existing events. This is used to skip
a few checks and mark the event as backfilled.
+
depth: Override the depth used to order the event in the DAG.
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
+ state_map: A state map of previously created events, used only when creating events
+ for batch persisting
+
+ for_batch: whether the event is being created for batch persisting to the db
+
+ current_state_group: the current state group, used only for creating events for
+ batch persisting
+
Raises:
ResourceLimitError if server is blocked to some resource being
exceeded
+
Returns:
Tuple of created event, Context
"""
@@ -693,6 +713,9 @@ class EventCreationHandler:
auth_event_ids=auth_event_ids,
state_event_ids=state_event_ids,
depth=depth,
+ state_map=state_map,
+ for_batch=for_batch,
+ current_state_group=current_state_group,
)
# In an ideal world we wouldn't need the second part of this condition. However,
@@ -707,10 +730,14 @@ class EventCreationHandler:
# federation as well as those created locally. As of room v3, aliases events
# can be created by users that are not in the room, therefore we have to
# tolerate them in event_auth.check().
- prev_state_ids = await context.get_prev_state_ids(
- StateFilter.from_types([(EventTypes.Member, None)])
- )
- prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
+ if for_batch:
+ assert state_map is not None
+ prev_event_id = state_map.get((EventTypes.Member, event.sender))
+ else:
+ prev_state_ids = await context.get_prev_state_ids(
+ StateFilter.from_types([(EventTypes.Member, None)])
+ )
+ prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
prev_event = (
await self.store.get_event(prev_event_id, allow_none=True)
if prev_event_id
@@ -1009,8 +1036,16 @@ class EventCreationHandler:
auth_event_ids: Optional[List[str]] = None,
state_event_ids: Optional[List[str]] = None,
depth: Optional[int] = None,
+ state_map: Optional[StateMap[str]] = None,
+ for_batch: bool = False,
+ current_state_group: Optional[int] = None,
) -> Tuple[EventBase, EventContext]:
- """Create a new event for a local client
+ """Create a new event for a local client. If bool for_batch is true, will
+ create an event using the prev_event_ids, and will create an event context for
+ the event using the parameters state_map and current_state_group, thus these parameters
+ must be provided in this case if for_batch is True. The subsequently created event
+ and context are suitable for being batched up and bulk persisted to the database
+ with other similarly created events.
Args:
builder:
@@ -1043,6 +1078,14 @@ class EventCreationHandler:
Should normally be set to None, which will cause the depth to be calculated
based on the prev_events.
+ state_map: A state map of previously created events, used only when creating events
+ for batch persisting
+
+ for_batch: whether the event is being created for batch persisting to the db
+
+ current_state_group: the current state group, used only for creating events for
+ batch persisting
+
Returns:
Tuple of created event, context
"""
@@ -1095,64 +1138,76 @@ class EventCreationHandler:
builder.type == EventTypes.Create or prev_event_ids
), "Attempting to create a non-m.room.create event with no prev_events"
- event = await builder.build(
- prev_event_ids=prev_event_ids,
- auth_event_ids=auth_event_ids,
- depth=depth,
- )
+ if for_batch:
+ assert prev_event_ids is not None
+ assert state_map is not None
+ assert current_state_group is not None
+ auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
+ event = await builder.build(
+ prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
+ )
+ context = await self.state.compute_event_context_for_batched(
+ event, state_map, current_state_group
+ )
+ else:
+ event = await builder.build(
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ depth=depth,
+ )
- # Pass on the outlier property from the builder to the event
- # after it is created
- if builder.internal_metadata.outlier:
- event.internal_metadata.outlier = True
- context = EventContext.for_outlier(self._storage_controllers)
- elif (
- event.type == EventTypes.MSC2716_INSERTION
- and state_event_ids
- and builder.internal_metadata.is_historical()
- ):
- # Add explicit state to the insertion event so it has state to derive
- # from even though it's floating with no `prev_events`. The rest of
- # the batch can derive from this state and state_group.
- #
- # TODO(faster_joins): figure out how this works, and make sure that the
- # old state is complete.
- # https://github.com/matrix-org/synapse/issues/13003
- metadata = await self.store.get_metadata_for_events(state_event_ids)
-
- state_map_for_event: MutableStateMap[str] = {}
- for state_id in state_event_ids:
- data = metadata.get(state_id)
- if data is None:
- # We're trying to persist a new historical batch of events
- # with the given state, e.g. via
- # `RoomBatchSendEventRestServlet`. The state can be inferred
- # by Synapse or set directly by the client.
- #
- # Either way, we should have persisted all the state before
- # getting here.
- raise Exception(
- f"State event {state_id} not found in DB,"
- " Synapse should have persisted it before using it."
- )
+ # Pass on the outlier property from the builder to the event
+ # after it is created
+ if builder.internal_metadata.outlier:
+ event.internal_metadata.outlier = True
+ context = EventContext.for_outlier(self._storage_controllers)
+ elif (
+ event.type == EventTypes.MSC2716_INSERTION
+ and state_event_ids
+ and builder.internal_metadata.is_historical()
+ ):
+ # Add explicit state to the insertion event so it has state to derive
+ # from even though it's floating with no `prev_events`. The rest of
+ # the batch can derive from this state and state_group.
+ #
+ # TODO(faster_joins): figure out how this works, and make sure that the
+ # old state is complete.
+ # https://github.com/matrix-org/synapse/issues/13003
+ metadata = await self.store.get_metadata_for_events(state_event_ids)
+
+ state_map_for_event: MutableStateMap[str] = {}
+ for state_id in state_event_ids:
+ data = metadata.get(state_id)
+ if data is None:
+ # We're trying to persist a new historical batch of events
+ # with the given state, e.g. via
+ # `RoomBatchSendEventRestServlet`. The state can be inferred
+ # by Synapse or set directly by the client.
+ #
+ # Either way, we should have persisted all the state before
+ # getting here.
+ raise Exception(
+ f"State event {state_id} not found in DB,"
+ " Synapse should have persisted it before using it."
+ )
- if data.state_key is None:
- raise Exception(
- f"Trying to set non-state event {state_id} as state"
- )
+ if data.state_key is None:
+ raise Exception(
+ f"Trying to set non-state event {state_id} as state"
+ )
- state_map_for_event[(data.event_type, data.state_key)] = state_id
+ state_map_for_event[(data.event_type, data.state_key)] = state_id
- context = await self.state.compute_event_context(
- event,
- state_ids_before_event=state_map_for_event,
- # TODO(faster_joins): check how MSC2716 works and whether we can have
- # partial state here
- # https://github.com/matrix-org/synapse/issues/13003
- partial_state=False,
- )
- else:
- context = await self.state.compute_event_context(event)
+ context = await self.state.compute_event_context(
+ event,
+ state_ids_before_event=state_map_for_event,
+ # TODO(faster_joins): check how MSC2716 works and whether we can have
+ # partial state here
+ # https://github.com/matrix-org/synapse/issues/13003
+ partial_state=False,
+ )
+ else:
+ context = await self.state.compute_event_context(event)
if requester:
context.app_service = requester.app_service
|