summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
authorShay <hillerys@element.io>2022-09-28 02:39:03 -0700
committerGitHub <noreply@github.com>2022-09-28 10:39:03 +0100
commita2cf66a94d5dfd9d6496ac3e48ec9a22f17be69a (patch)
treebe6567b1d3c8e253df717bddf0a543b147fa39e9 /synapse/handlers/message.py
parentFix `have_seen_event` cache not being invalidated (#13863) (diff)
downloadsynapse-a2cf66a94d5dfd9d6496ac3e48ec9a22f17be69a.tar.xz
Prepatory work for batching events to send (#13487)
This PR begins work on batching up events during the creation of a room. The PR splits out the creation and sending/persisting of the events. The first three events in the creation of the room-creating the room, joining the creator to the room, and the power levels event are sent sequentially, while the subsequent events are created and collected to be sent at the end of the function. This is currently done by appending them to a list and then iterating over the list to send, the next step (after this PR) would be to send and persist the collected events as a batch.
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py175
1 files changed, 115 insertions, 60 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index e07cda133a..062f93bc67 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -63,6 +63,7 @@ from synapse.types import (
     MutableStateMap,
     Requester,
     RoomAlias,
+    StateMap,
     StreamToken,
     UserID,
     create_requester,
@@ -567,9 +568,17 @@ class EventCreationHandler:
         outlier: bool = False,
         historical: bool = False,
         depth: Optional[int] = None,
+        state_map: Optional[StateMap[str]] = None,
+        for_batch: bool = False,
+        current_state_group: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
         """
-        Given a dict from a client, create a new event.
+        Given a dict from a client, create a new event. If bool for_batch is true, will
+        create an event using the prev_event_ids, and will create an event context for
+        the event using the parameters state_map and current_state_group, thus these parameters
+        must be provided in this case if for_batch is True. The subsequently created event
+        and context are suitable for being batched up and bulk persisted to the database
+        with other similarly created events.
 
         Creates an FrozenEvent object, filling out auth_events, prev_events,
         etc.
@@ -612,16 +621,27 @@ class EventCreationHandler:
             outlier: Indicates whether the event is an `outlier`, i.e. if
                 it's from an arbitrary point and floating in the DAG as
                 opposed to being inline with the current DAG.
+
             historical: Indicates whether the message is being inserted
                 back in time around some existing events. This is used to skip
                 a few checks and mark the event as backfilled.
+
             depth: Override the depth used to order the event in the DAG.
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
 
+            state_map: A state map of previously created events, used only when creating events
+                for batch persisting
+
+            for_batch: whether the event is being created for batch persisting to the db
+
+            current_state_group: the current state group, used only for creating events for
+                batch persisting
+
         Raises:
             ResourceLimitError if server is blocked to some resource being
             exceeded
+
         Returns:
             Tuple of created event, Context
         """
@@ -693,6 +713,9 @@ class EventCreationHandler:
             auth_event_ids=auth_event_ids,
             state_event_ids=state_event_ids,
             depth=depth,
+            state_map=state_map,
+            for_batch=for_batch,
+            current_state_group=current_state_group,
         )
 
         # In an ideal world we wouldn't need the second part of this condition. However,
@@ -707,10 +730,14 @@ class EventCreationHandler:
             # federation as well as those created locally. As of room v3, aliases events
             # can be created by users that are not in the room, therefore we have to
             # tolerate them in event_auth.check().
-            prev_state_ids = await context.get_prev_state_ids(
-                StateFilter.from_types([(EventTypes.Member, None)])
-            )
-            prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
+            if for_batch:
+                assert state_map is not None
+                prev_event_id = state_map.get((EventTypes.Member, event.sender))
+            else:
+                prev_state_ids = await context.get_prev_state_ids(
+                    StateFilter.from_types([(EventTypes.Member, None)])
+                )
+                prev_event_id = prev_state_ids.get((EventTypes.Member, event.sender))
             prev_event = (
                 await self.store.get_event(prev_event_id, allow_none=True)
                 if prev_event_id
@@ -1009,8 +1036,16 @@ class EventCreationHandler:
         auth_event_ids: Optional[List[str]] = None,
         state_event_ids: Optional[List[str]] = None,
         depth: Optional[int] = None,
+        state_map: Optional[StateMap[str]] = None,
+        for_batch: bool = False,
+        current_state_group: Optional[int] = None,
     ) -> Tuple[EventBase, EventContext]:
-        """Create a new event for a local client
+        """Create a new event for a local client. If bool for_batch is true, will
+        create an event using the prev_event_ids, and will create an event context for
+        the event using the parameters state_map and current_state_group, thus these parameters
+        must be provided in this case if for_batch is True. The subsequently created event
+        and context are suitable for being batched up and bulk persisted to the database
+        with other similarly created events.
 
         Args:
             builder:
@@ -1043,6 +1078,14 @@ class EventCreationHandler:
                 Should normally be set to None, which will cause the depth to be calculated
                 based on the prev_events.
 
+            state_map: A state map of previously created events, used only when creating events
+                for batch persisting
+
+            for_batch: whether the event is being created for batch persisting to the db
+
+            current_state_group: the current state group, used only for creating events for
+                batch persisting
+
         Returns:
             Tuple of created event, context
         """
@@ -1095,64 +1138,76 @@ class EventCreationHandler:
                 builder.type == EventTypes.Create or prev_event_ids
             ), "Attempting to create a non-m.room.create event with no prev_events"
 
-        event = await builder.build(
-            prev_event_ids=prev_event_ids,
-            auth_event_ids=auth_event_ids,
-            depth=depth,
-        )
+        if for_batch:
+            assert prev_event_ids is not None
+            assert state_map is not None
+            assert current_state_group is not None
+            auth_ids = self._event_auth_handler.compute_auth_events(builder, state_map)
+            event = await builder.build(
+                prev_event_ids=prev_event_ids, auth_event_ids=auth_ids, depth=depth
+            )
+            context = await self.state.compute_event_context_for_batched(
+                event, state_map, current_state_group
+            )
+        else:
+            event = await builder.build(
+                prev_event_ids=prev_event_ids,
+                auth_event_ids=auth_event_ids,
+                depth=depth,
+            )
 
-        # Pass on the outlier property from the builder to the event
-        # after it is created
-        if builder.internal_metadata.outlier:
-            event.internal_metadata.outlier = True
-            context = EventContext.for_outlier(self._storage_controllers)
-        elif (
-            event.type == EventTypes.MSC2716_INSERTION
-            and state_event_ids
-            and builder.internal_metadata.is_historical()
-        ):
-            # Add explicit state to the insertion event so it has state to derive
-            # from even though it's floating with no `prev_events`. The rest of
-            # the batch can derive from this state and state_group.
-            #
-            # TODO(faster_joins): figure out how this works, and make sure that the
-            #   old state is complete.
-            #   https://github.com/matrix-org/synapse/issues/13003
-            metadata = await self.store.get_metadata_for_events(state_event_ids)
-
-            state_map_for_event: MutableStateMap[str] = {}
-            for state_id in state_event_ids:
-                data = metadata.get(state_id)
-                if data is None:
-                    # We're trying to persist a new historical batch of events
-                    # with the given state, e.g. via
-                    # `RoomBatchSendEventRestServlet`. The state can be inferred
-                    # by Synapse or set directly by the client.
-                    #
-                    # Either way, we should have persisted all the state before
-                    # getting here.
-                    raise Exception(
-                        f"State event {state_id} not found in DB,"
-                        " Synapse should have persisted it before using it."
-                    )
+            # Pass on the outlier property from the builder to the event
+            # after it is created
+            if builder.internal_metadata.outlier:
+                event.internal_metadata.outlier = True
+                context = EventContext.for_outlier(self._storage_controllers)
+            elif (
+                event.type == EventTypes.MSC2716_INSERTION
+                and state_event_ids
+                and builder.internal_metadata.is_historical()
+            ):
+                # Add explicit state to the insertion event so it has state to derive
+                # from even though it's floating with no `prev_events`. The rest of
+                # the batch can derive from this state and state_group.
+                #
+                # TODO(faster_joins): figure out how this works, and make sure that the
+                #   old state is complete.
+                #   https://github.com/matrix-org/synapse/issues/13003
+                metadata = await self.store.get_metadata_for_events(state_event_ids)
+
+                state_map_for_event: MutableStateMap[str] = {}
+                for state_id in state_event_ids:
+                    data = metadata.get(state_id)
+                    if data is None:
+                        # We're trying to persist a new historical batch of events
+                        # with the given state, e.g. via
+                        # `RoomBatchSendEventRestServlet`. The state can be inferred
+                        # by Synapse or set directly by the client.
+                        #
+                        # Either way, we should have persisted all the state before
+                        # getting here.
+                        raise Exception(
+                            f"State event {state_id} not found in DB,"
+                            " Synapse should have persisted it before using it."
+                        )
 
-                if data.state_key is None:
-                    raise Exception(
-                        f"Trying to set non-state event {state_id} as state"
-                    )
+                    if data.state_key is None:
+                        raise Exception(
+                            f"Trying to set non-state event {state_id} as state"
+                        )
 
-                state_map_for_event[(data.event_type, data.state_key)] = state_id
+                    state_map_for_event[(data.event_type, data.state_key)] = state_id
 
-            context = await self.state.compute_event_context(
-                event,
-                state_ids_before_event=state_map_for_event,
-                # TODO(faster_joins): check how MSC2716 works and whether we can have
-                #   partial state here
-                #   https://github.com/matrix-org/synapse/issues/13003
-                partial_state=False,
-            )
-        else:
-            context = await self.state.compute_event_context(event)
+                context = await self.state.compute_event_context(
+                    event,
+                    state_ids_before_event=state_map_for_event,
+                    # TODO(faster_joins): check how MSC2716 works and whether we can have
+                    #   partial state here
+                    #   https://github.com/matrix-org/synapse/issues/13003
+                    partial_state=False,
+                )
+            else:
+                context = await self.state.compute_event_context(event)
 
         if requester:
             context.app_service = requester.app_service