summary refs log tree commit diff
diff options
context:
space:
mode:
authorShay <hillerys@element.io>2022-09-28 03:11:48 -0700
committerGitHub <noreply@github.com>2022-09-28 10:11:48 +0000
commit8ab16a92edd675453c78cfd9974081e374b0f998 (patch)
tree57cb2dae52dbb6e3e09a18e1fb7e6038afcc4d20
parentPrepatory work for batching events to send (#13487) (diff)
downloadsynapse-8ab16a92edd675453c78cfd9974081e374b0f998.tar.xz
Persist CreateRoom events to DB in a batch (#13800)
-rw-r--r--changelog.d/13800.misc1
-rw-r--r--synapse/handlers/message.py663
-rw-r--r--synapse/handlers/room.py21
-rw-r--r--synapse/handlers/room_batch.py3
-rw-r--r--synapse/handlers/room_member.py11
-rw-r--r--synapse/replication/http/__init__.py2
-rw-r--r--synapse/replication/http/send_event.py4
-rw-r--r--synapse/replication/http/send_events.py171
-rw-r--r--tests/handlers/test_message.py10
-rw-r--r--tests/handlers/test_register.py4
-rw-r--r--tests/storage/test_event_chain.py8
-rw-r--r--tests/unittest.py4
12 files changed, 563 insertions, 339 deletions
diff --git a/changelog.d/13800.misc b/changelog.d/13800.misc
new file mode 100644
index 0000000000..761adc8b05
--- /dev/null
+++ b/changelog.d/13800.misc
@@ -0,0 +1 @@
+Speed up creation of DM rooms.
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 062f93bc67..00e7645ba5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -56,11 +56,13 @@ from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import (
     MutableStateMap,
+    PersistedEventPosition,
     Requester,
     RoomAlias,
     StateMap,
@@ -493,6 +495,7 @@ class EventCreationHandler:
             self.membership_types_to_include_profile_data_in.add(Membership.INVITE)
 
         self.send_event = ReplicationSendEventRestServlet.make_client(hs)
+        self.send_events = ReplicationSendEventsRestServlet.make_client(hs)
 
         self.request_ratelimiter = hs.get_request_ratelimiter()
 
@@ -1016,8 +1019,7 @@ class EventCreationHandler:
 
             ev = await self.handle_new_client_event(
                 requester=requester,
-                event=event,
-                context=context,
+                events_and_context=[(event, context)],
                 ratelimit=ratelimit,
                 ignore_shadow_ban=ignore_shadow_ban,
             )
@@ -1293,13 +1295,13 @@ class EventCreationHandler:
     async def handle_new_client_event(
         self,
         requester: Requester,
-        event: EventBase,
-        context: EventContext,
+        events_and_context: List[Tuple[EventBase, EventContext]],
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
         ignore_shadow_ban: bool = False,
     ) -> EventBase:
-        """Processes a new event.
+        """Processes new events. Please note that if batch persisting events, an error in
+        handling any one of these events will result in all of the events being dropped.
 
         This includes deduplicating, checking auth, persisting,
         notifying users, sending to remote servers, etc.
@@ -1309,8 +1311,7 @@ class EventCreationHandler:
 
         Args:
             requester
-            event
-            context
+            events_and_context: A list of one or more tuples of event, context to be persisted
             ratelimit
             extra_users: Any extra users to notify about event
 
@@ -1328,62 +1329,63 @@ class EventCreationHandler:
         """
         extra_users = extra_users or []
 
-        # we don't apply shadow-banning to membership events here. Invites are blocked
-        # higher up the stack, and we allow shadow-banned users to send join and leave
-        # events as normal.
-        if (
-            event.type != EventTypes.Member
-            and not ignore_shadow_ban
-            and requester.shadow_banned
-        ):
-            # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
-            raise ShadowBanError()
+        for event, context in events_and_context:
+            # we don't apply shadow-banning to membership events here. Invites are blocked
+            # higher up the stack, and we allow shadow-banned users to send join and leave
+            # events as normal.
+            if (
+                event.type != EventTypes.Member
+                and not ignore_shadow_ban
+                and requester.shadow_banned
+            ):
+                # We randomly sleep a bit just to annoy the requester.
+                await self.clock.sleep(random.randint(1, 10))
+                raise ShadowBanError()
 
-        if event.is_state():
-            prev_event = await self.deduplicate_state_event(event, context)
-            if prev_event is not None:
-                logger.info(
-                    "Not bothering to persist state event %s duplicated by %s",
-                    event.event_id,
-                    prev_event.event_id,
-                )
-                return prev_event
+            if event.is_state():
+                prev_event = await self.deduplicate_state_event(event, context)
+                if prev_event is not None:
+                    logger.info(
+                        "Not bothering to persist state event %s duplicated by %s",
+                        event.event_id,
+                        prev_event.event_id,
+                    )
+                    return prev_event
 
-        if event.internal_metadata.is_out_of_band_membership():
-            # the only sort of out-of-band-membership events we expect to see here are
-            # invite rejections and rescinded knocks that we have generated ourselves.
-            assert event.type == EventTypes.Member
-            assert event.content["membership"] == Membership.LEAVE
-        else:
-            try:
-                validate_event_for_room_version(event)
-                await self._event_auth_handler.check_auth_rules_from_context(
-                    event, context
-                )
-            except AuthError as err:
-                logger.warning("Denying new event %r because %s", event, err)
-                raise err
+            if event.internal_metadata.is_out_of_band_membership():
+                # the only sort of out-of-band-membership events we expect to see here are
+                # invite rejections and rescinded knocks that we have generated ourselves.
+                assert event.type == EventTypes.Member
+                assert event.content["membership"] == Membership.LEAVE
+            else:
+                try:
+                    validate_event_for_room_version(event)
+                    await self._event_auth_handler.check_auth_rules_from_context(
+                        event, context
+                    )
+                except AuthError as err:
+                    logger.warning("Denying new event %r because %s", event, err)
+                    raise err
 
-        # Ensure that we can round trip before trying to persist in db
-        try:
-            dump = json_encoder.encode(event.content)
-            json_decoder.decode(dump)
-        except Exception:
-            logger.exception("Failed to encode content: %r", event.content)
-            raise
+            # Ensure that we can round trip before trying to persist in db
+            try:
+                dump = json_encoder.encode(event.content)
+                json_decoder.decode(dump)
+            except Exception:
+                logger.exception("Failed to encode content: %r", event.content)
+                raise
 
         # We now persist the event (and update the cache in parallel, since we
         # don't want to block on it).
+        event, context = events_and_context[0]
         try:
             result, _ = await make_deferred_yieldable(
                 gather_results(
                     (
                         run_in_background(
-                            self._persist_event,
+                            self._persist_events,
                             requester=requester,
-                            event=event,
-                            context=context,
+                            events_and_context=events_and_context,
                             ratelimit=ratelimit,
                             extra_users=extra_users,
                         ),
@@ -1407,45 +1409,47 @@ class EventCreationHandler:
 
         return result
 
-    async def _persist_event(
+    async def _persist_events(
         self,
         requester: Requester,
-        event: EventBase,
-        context: EventContext,
+        events_and_context: List[Tuple[EventBase, EventContext]],
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
     ) -> EventBase:
-        """Actually persists the event. Should only be called by
+        """Actually persists new events. Should only be called by
         `handle_new_client_event`, and see its docstring for documentation of
-        the arguments.
+        the arguments. Please note that if batch persisting events, an error in
+        handling any one of these events will result in all of the events being dropped.
 
         PartialStateConflictError: if attempting to persist a partial state event in
             a room that has been un-partial stated.
         """
 
-        # Skip push notification actions for historical messages
-        # because we don't want to notify people about old history back in time.
-        # The historical messages also do not have the proper `context.current_state_ids`
-        # and `state_groups` because they have `prev_events` that aren't persisted yet
-        # (historical messages persisted in reverse-chronological order).
-        if not event.internal_metadata.is_historical():
-            with opentracing.start_active_span("calculate_push_actions"):
-                await self._bulk_push_rule_evaluator.action_for_event_by_user(
-                    event, context
-                )
+        for event, context in events_and_context:
+            # Skip push notification actions for historical messages
+            # because we don't want to notify people about old history back in time.
+            # The historical messages also do not have the proper `context.current_state_ids`
+            # and `state_groups` because they have `prev_events` that aren't persisted yet
+            # (historical messages persisted in reverse-chronological order).
+            if not event.internal_metadata.is_historical():
+                with opentracing.start_active_span("calculate_push_actions"):
+                    await self._bulk_push_rule_evaluator.action_for_event_by_user(
+                        event, context
+                    )
 
         try:
             # If we're a worker we need to hit out to the master.
-            writer_instance = self._events_shard_config.get_instance(event.room_id)
+            first_event, _ = events_and_context[0]
+            writer_instance = self._events_shard_config.get_instance(
+                first_event.room_id
+            )
             if writer_instance != self._instance_name:
                 try:
-                    result = await self.send_event(
+                    result = await self.send_events(
                         instance_name=writer_instance,
-                        event_id=event.event_id,
+                        events_and_context=events_and_context,
                         store=self.store,
                         requester=requester,
-                        event=event,
-                        context=context,
                         ratelimit=ratelimit,
                         extra_users=extra_users,
                     )
@@ -1455,6 +1459,11 @@ class EventCreationHandler:
                     raise
                 stream_id = result["stream_id"]
                 event_id = result["event_id"]
+
+                # If we batch persisted events we return the last persisted event, otherwise
+                # we return the one event that was persisted
+                event, _ = events_and_context[-1]
+
                 if event_id != event.event_id:
                     # If we get a different event back then it means that its
                     # been de-duplicated, so we replace the given event with the
@@ -1467,15 +1476,19 @@ class EventCreationHandler:
                     event.internal_metadata.stream_ordering = stream_id
                 return event
 
-            event = await self.persist_and_notify_client_event(
-                requester, event, context, ratelimit=ratelimit, extra_users=extra_users
+            event = await self.persist_and_notify_client_events(
+                requester,
+                events_and_context,
+                ratelimit=ratelimit,
+                extra_users=extra_users,
             )
 
             return event
         except Exception:
-            # Ensure that we actually remove the entries in the push actions
-            # staging area, if we calculated them.
-            await self.store.remove_push_actions_from_staging(event.event_id)
+            for event, _ in events_and_context:
+                # Ensure that we actually remove the entries in the push actions
+                # staging area, if we calculated them.
+                await self.store.remove_push_actions_from_staging(event.event_id)
             raise
 
     async def cache_joined_hosts_for_event(
@@ -1569,23 +1582,26 @@ class EventCreationHandler:
                 Codes.BAD_ALIAS,
             )
 
-    async def persist_and_notify_client_event(
+    async def persist_and_notify_client_events(
         self,
         requester: Requester,
-        event: EventBase,
-        context: EventContext,
+        events_and_context: List[Tuple[EventBase, EventContext]],
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
     ) -> EventBase:
-        """Called when we have fully built the event, have already
-        calculated the push actions for the event, and checked auth.
+        """Called when we have fully built the events, have already
+        calculated the push actions for the events, and checked auth.
 
         This should only be run on the instance in charge of persisting events.
 
+        Please note that if batch persisting events, an error in
+        handling any one of these events will result in all of the events being dropped.
+
         Returns:
-            The persisted event. This may be different than the given event if
-            it was de-duplicated (e.g. because we had already persisted an
-            event with the same transaction ID.)
+            The persisted event, if one event is passed in, or the last event in the
+            list in the case of batch persisting. If only one event was persisted, the
+            returned event may be different than the given event if it was de-duplicated
+            (e.g. because we had already persisted an event with the same transaction ID.)
 
         Raises:
             PartialStateConflictError: if attempting to persist a partial state event in
@@ -1593,277 +1609,297 @@ class EventCreationHandler:
         """
         extra_users = extra_users or []
 
-        assert self._storage_controllers.persistence is not None
-        assert self._events_shard_config.should_handle(
-            self._instance_name, event.room_id
-        )
+        for event, context in events_and_context:
+            assert self._events_shard_config.should_handle(
+                self._instance_name, event.room_id
+            )
 
-        if ratelimit:
-            # We check if this is a room admin redacting an event so that we
-            # can apply different ratelimiting. We do this by simply checking
-            # it's not a self-redaction (to avoid having to look up whether the
-            # user is actually admin or not).
-            is_admin_redaction = False
-            if event.type == EventTypes.Redaction:
-                assert event.redacts is not None
+            if ratelimit:
+                # We check if this is a room admin redacting an event so that we
+                # can apply different ratelimiting. We do this by simply checking
+                # it's not a self-redaction (to avoid having to look up whether the
+                # user is actually admin or not).
+                is_admin_redaction = False
+                if event.type == EventTypes.Redaction:
+                    assert event.redacts is not None
+
+                    original_event = await self.store.get_event(
+                        event.redacts,
+                        redact_behaviour=EventRedactBehaviour.as_is,
+                        get_prev_content=False,
+                        allow_rejected=False,
+                        allow_none=True,
+                    )
 
-                original_event = await self.store.get_event(
-                    event.redacts,
-                    redact_behaviour=EventRedactBehaviour.as_is,
-                    get_prev_content=False,
-                    allow_rejected=False,
-                    allow_none=True,
+                    is_admin_redaction = bool(
+                        original_event and event.sender != original_event.sender
+                    )
+
+                await self.request_ratelimiter.ratelimit(
+                    requester, is_admin_redaction=is_admin_redaction
                 )
 
-                is_admin_redaction = bool(
-                    original_event and event.sender != original_event.sender
+            # run checks/actions on event based on type
+            if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+                (
+                    current_membership,
+                    _,
+                ) = await self.store.get_local_current_membership_for_user_in_room(
+                    event.state_key, event.room_id
                 )
+                if current_membership != Membership.JOIN:
+                    self._notifier.notify_user_joined_room(
+                        event.event_id, event.room_id
+                    )
 
-            await self.request_ratelimiter.ratelimit(
-                requester, is_admin_redaction=is_admin_redaction
-            )
+            await self._maybe_kick_guest_users(event, context)
 
-        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
-            (
-                current_membership,
-                _,
-            ) = await self.store.get_local_current_membership_for_user_in_room(
-                event.state_key, event.room_id
-            )
-            if current_membership != Membership.JOIN:
-                self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+            if event.type == EventTypes.CanonicalAlias:
+                # Validate a newly added alias or newly added alt_aliases.
 
-        await self._maybe_kick_guest_users(event, context)
+                original_alias = None
+                original_alt_aliases: object = []
 
-        if event.type == EventTypes.CanonicalAlias:
-            # Validate a newly added alias or newly added alt_aliases.
+                original_event_id = event.unsigned.get("replaces_state")
+                if original_event_id:
+                    original_alias_event = await self.store.get_event(original_event_id)
 
-            original_alias = None
-            original_alt_aliases: object = []
+                    if original_alias_event:
+                        original_alias = original_alias_event.content.get("alias", None)
+                        original_alt_aliases = original_alias_event.content.get(
+                            "alt_aliases", []
+                        )
 
-            original_event_id = event.unsigned.get("replaces_state")
-            if original_event_id:
-                original_event = await self.store.get_event(original_event_id)
+                # Check the alias is currently valid (if it has changed).
+                room_alias_str = event.content.get("alias", None)
+                directory_handler = self.hs.get_directory_handler()
+                if room_alias_str and room_alias_str != original_alias:
+                    await self._validate_canonical_alias(
+                        directory_handler, room_alias_str, event.room_id
+                    )
 
-                if original_event:
-                    original_alias = original_event.content.get("alias", None)
-                    original_alt_aliases = original_event.content.get("alt_aliases", [])
-
-            # Check the alias is currently valid (if it has changed).
-            room_alias_str = event.content.get("alias", None)
-            directory_handler = self.hs.get_directory_handler()
-            if room_alias_str and room_alias_str != original_alias:
-                await self._validate_canonical_alias(
-                    directory_handler, room_alias_str, event.room_id
-                )
+                # Check that alt_aliases is the proper form.
+                alt_aliases = event.content.get("alt_aliases", [])
+                if not isinstance(alt_aliases, (list, tuple)):
+                    raise SynapseError(
+                        400,
+                        "The alt_aliases property must be a list.",
+                        Codes.INVALID_PARAM,
+                    )
 
-            # Check that alt_aliases is the proper form.
-            alt_aliases = event.content.get("alt_aliases", [])
-            if not isinstance(alt_aliases, (list, tuple)):
-                raise SynapseError(
-                    400, "The alt_aliases property must be a list.", Codes.INVALID_PARAM
-                )
+                # If the old version of alt_aliases is of an unknown form,
+                # completely replace it.
+                if not isinstance(original_alt_aliases, (list, tuple)):
+                    # TODO: check that the original_alt_aliases' entries are all strings
+                    original_alt_aliases = []
+
+                # Check that each alias is currently valid.
+                new_alt_aliases = set(alt_aliases) - set(original_alt_aliases)
+                if new_alt_aliases:
+                    for alias_str in new_alt_aliases:
+                        await self._validate_canonical_alias(
+                            directory_handler, alias_str, event.room_id
+                        )
 
-            # If the old version of alt_aliases is of an unknown form,
-            # completely replace it.
-            if not isinstance(original_alt_aliases, (list, tuple)):
-                # TODO: check that the original_alt_aliases' entries are all strings
-                original_alt_aliases = []
+            federation_handler = self.hs.get_federation_handler()
 
-            # Check that each alias is currently valid.
-            new_alt_aliases = set(alt_aliases) - set(original_alt_aliases)
-            if new_alt_aliases:
-                for alias_str in new_alt_aliases:
-                    await self._validate_canonical_alias(
-                        directory_handler, alias_str, event.room_id
+            if event.type == EventTypes.Member:
+                if event.content["membership"] == Membership.INVITE:
+                    event.unsigned[
+                        "invite_room_state"
+                    ] = await self.store.get_stripped_room_state_from_event_context(
+                        context,
+                        self.room_prejoin_state_types,
+                        membership_user_id=event.sender,
                     )
 
-        federation_handler = self.hs.get_federation_handler()
+                    invitee = UserID.from_string(event.state_key)
+                    if not self.hs.is_mine(invitee):
+                        # TODO: Can we add signature from remote server in a nicer
+                        # way? If we have been invited by a remote server, we need
+                        # to get them to sign the event.
 
-        if event.type == EventTypes.Member:
-            if event.content["membership"] == Membership.INVITE:
-                event.unsigned[
-                    "invite_room_state"
-                ] = await self.store.get_stripped_room_state_from_event_context(
-                    context,
-                    self.room_prejoin_state_types,
-                    membership_user_id=event.sender,
-                )
+                        returned_invite = await federation_handler.send_invite(
+                            invitee.domain, event
+                        )
+                        event.unsigned.pop("room_state", None)
 
-                invitee = UserID.from_string(event.state_key)
-                if not self.hs.is_mine(invitee):
-                    # TODO: Can we add signature from remote server in a nicer
-                    # way? If we have been invited by a remote server, we need
-                    # to get them to sign the event.
+                        # TODO: Make sure the signatures actually are correct.
+                        event.signatures.update(returned_invite.signatures)
 
-                    returned_invite = await federation_handler.send_invite(
-                        invitee.domain, event
+                if event.content["membership"] == Membership.KNOCK:
+                    event.unsigned[
+                        "knock_room_state"
+                    ] = await self.store.get_stripped_room_state_from_event_context(
+                        context,
+                        self.room_prejoin_state_types,
                     )
-                    event.unsigned.pop("room_state", None)
 
-                    # TODO: Make sure the signatures actually are correct.
-                    event.signatures.update(returned_invite.signatures)
+            if event.type == EventTypes.Redaction:
+                assert event.redacts is not None
 
-            if event.content["membership"] == Membership.KNOCK:
-                event.unsigned[
-                    "knock_room_state"
-                ] = await self.store.get_stripped_room_state_from_event_context(
-                    context,
-                    self.room_prejoin_state_types,
+                original_event = await self.store.get_event(
+                    event.redacts,
+                    redact_behaviour=EventRedactBehaviour.as_is,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=True,
                 )
 
-        if event.type == EventTypes.Redaction:
-            assert event.redacts is not None
+                room_version = await self.store.get_room_version_id(event.room_id)
+                room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
-            original_event = await self.store.get_event(
-                event.redacts,
-                redact_behaviour=EventRedactBehaviour.as_is,
-                get_prev_content=False,
-                allow_rejected=False,
-                allow_none=True,
-            )
+                # we can make some additional checks now if we have the original event.
+                if original_event:
+                    if original_event.type == EventTypes.Create:
+                        raise AuthError(403, "Redacting create events is not permitted")
 
-            room_version = await self.store.get_room_version_id(event.room_id)
-            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-            # we can make some additional checks now if we have the original event.
-            if original_event:
-                if original_event.type == EventTypes.Create:
-                    raise AuthError(403, "Redacting create events is not permitted")
-
-                if original_event.room_id != event.room_id:
-                    raise SynapseError(400, "Cannot redact event from a different room")
-
-                if original_event.type == EventTypes.ServerACL:
-                    raise AuthError(403, "Redacting server ACL events is not permitted")
-
-                # Add a little safety stop-gap to prevent people from trying to
-                # redact MSC2716 related events when they're in a room version
-                # which does not support it yet. We allow people to use MSC2716
-                # events in existing room versions but only from the room
-                # creator since it does not require any changes to the auth
-                # rules and in effect, the redaction algorithm . In the
-                # supported room version, we add the `historical` power level to
-                # auth the MSC2716 related events and adjust the redaction
-                # algorthim to keep the `historical` field around (redacting an
-                # event should only strip fields which don't affect the
-                # structural protocol level).
-                is_msc2716_event = (
-                    original_event.type == EventTypes.MSC2716_INSERTION
-                    or original_event.type == EventTypes.MSC2716_BATCH
-                    or original_event.type == EventTypes.MSC2716_MARKER
-                )
-                if not room_version_obj.msc2716_historical and is_msc2716_event:
-                    raise AuthError(
-                        403,
-                        "Redacting MSC2716 events is not supported in this room version",
-                    )
+                    if original_event.room_id != event.room_id:
+                        raise SynapseError(
+                            400, "Cannot redact event from a different room"
+                        )
 
-            event_types = event_auth.auth_types_for_event(event.room_version, event)
-            prev_state_ids = await context.get_prev_state_ids(
-                StateFilter.from_types(event_types)
-            )
+                    if original_event.type == EventTypes.ServerACL:
+                        raise AuthError(
+                            403, "Redacting server ACL events is not permitted"
+                        )
 
-            auth_events_ids = self._event_auth_handler.compute_auth_events(
-                event, prev_state_ids, for_verification=True
-            )
-            auth_events_map = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()}
+                    # Add a little safety stop-gap to prevent people from trying to
+                    # redact MSC2716 related events when they're in a room version
+                    # which does not support it yet. We allow people to use MSC2716
+                    # events in existing room versions but only from the room
+                    # creator since it does not require any changes to the auth
+                    # rules and in effect, the redaction algorithm . In the
+                    # supported room version, we add the `historical` power level to
+                    # auth the MSC2716 related events and adjust the redaction
+                    # algorthim to keep the `historical` field around (redacting an
+                    # event should only strip fields which don't affect the
+                    # structural protocol level).
+                    is_msc2716_event = (
+                        original_event.type == EventTypes.MSC2716_INSERTION
+                        or original_event.type == EventTypes.MSC2716_BATCH
+                        or original_event.type == EventTypes.MSC2716_MARKER
+                    )
+                    if not room_version_obj.msc2716_historical and is_msc2716_event:
+                        raise AuthError(
+                            403,
+                            "Redacting MSC2716 events is not supported in this room version",
+                        )
 
-            if event_auth.check_redaction(
-                room_version_obj, event, auth_events=auth_events
-            ):
-                # this user doesn't have 'redact' rights, so we need to do some more
-                # checks on the original event. Let's start by checking the original
-                # event exists.
-                if not original_event:
-                    raise NotFoundError("Could not find event %s" % (event.redacts,))
-
-                if event.user_id != original_event.user_id:
-                    raise AuthError(403, "You don't have permission to redact events")
-
-                # all the checks are done.
-                event.internal_metadata.recheck_redaction = False
-
-        if event.type == EventTypes.Create:
-            prev_state_ids = await context.get_prev_state_ids()
-            if prev_state_ids:
-                raise AuthError(403, "Changing the room create event is forbidden")
-
-        if event.type == EventTypes.MSC2716_INSERTION:
-            room_version = await self.store.get_room_version_id(event.room_id)
-            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-            create_event = await self.store.get_create_event_for_room(event.room_id)
-            room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
-
-            # Only check an insertion event if the room version
-            # supports it or the event is from the room creator.
-            if room_version_obj.msc2716_historical or (
-                self.config.experimental.msc2716_enabled
-                and event.sender == room_creator
-            ):
-                next_batch_id = event.content.get(
-                    EventContentFields.MSC2716_NEXT_BATCH_ID
+                event_types = event_auth.auth_types_for_event(event.room_version, event)
+                prev_state_ids = await context.get_prev_state_ids(
+                    StateFilter.from_types(event_types)
                 )
-                conflicting_insertion_event_id = None
-                if next_batch_id:
-                    conflicting_insertion_event_id = (
-                        await self.store.get_insertion_event_id_by_batch_id(
-                            event.room_id, next_batch_id
+
+                auth_events_ids = self._event_auth_handler.compute_auth_events(
+                    event, prev_state_ids, for_verification=True
+                )
+                auth_events_map = await self.store.get_events(auth_events_ids)
+                auth_events = {
+                    (e.type, e.state_key): e for e in auth_events_map.values()
+                }
+
+                if event_auth.check_redaction(
+                    room_version_obj, event, auth_events=auth_events
+                ):
+                    # this user doesn't have 'redact' rights, so we need to do some more
+                    # checks on the original event. Let's start by checking the original
+                    # event exists.
+                    if not original_event:
+                        raise NotFoundError(
+                            "Could not find event %s" % (event.redacts,)
                         )
+
+                    if event.user_id != original_event.user_id:
+                        raise AuthError(
+                            403, "You don't have permission to redact events"
+                        )
+
+                    # all the checks are done.
+                    event.internal_metadata.recheck_redaction = False
+
+            if event.type == EventTypes.Create:
+                prev_state_ids = await context.get_prev_state_ids()
+                if prev_state_ids:
+                    raise AuthError(403, "Changing the room create event is forbidden")
+
+            if event.type == EventTypes.MSC2716_INSERTION:
+                room_version = await self.store.get_room_version_id(event.room_id)
+                room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+                create_event = await self.store.get_create_event_for_room(event.room_id)
+                room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
+
+                # Only check an insertion event if the room version
+                # supports it or the event is from the room creator.
+                if room_version_obj.msc2716_historical or (
+                    self.config.experimental.msc2716_enabled
+                    and event.sender == room_creator
+                ):
+                    next_batch_id = event.content.get(
+                        EventContentFields.MSC2716_NEXT_BATCH_ID
                     )
-                if conflicting_insertion_event_id is not None:
-                    # The current insertion event that we're processing is invalid
-                    # because an insertion event already exists in the room with the
-                    # same next_batch_id. We can't allow multiple because the batch
-                    # pointing will get weird, e.g. we can't determine which insertion
-                    # event the batch event is pointing to.
-                    raise SynapseError(
-                        HTTPStatus.BAD_REQUEST,
-                        "Another insertion event already exists with the same next_batch_id",
-                        errcode=Codes.INVALID_PARAM,
-                    )
+                    conflicting_insertion_event_id = None
+                    if next_batch_id:
+                        conflicting_insertion_event_id = (
+                            await self.store.get_insertion_event_id_by_batch_id(
+                                event.room_id, next_batch_id
+                            )
+                        )
+                    if conflicting_insertion_event_id is not None:
+                        # The current insertion event that we're processing is invalid
+                        # because an insertion event already exists in the room with the
+                        # same next_batch_id. We can't allow multiple because the batch
+                        # pointing will get weird, e.g. we can't determine which insertion
+                        # event the batch event is pointing to.
+                        raise SynapseError(
+                            HTTPStatus.BAD_REQUEST,
+                            "Another insertion event already exists with the same next_batch_id",
+                            errcode=Codes.INVALID_PARAM,
+                        )
 
-        # Mark any `m.historical` messages as backfilled so they don't appear
-        # in `/sync` and have the proper decrementing `stream_ordering` as we import
-        backfilled = False
-        if event.internal_metadata.is_historical():
-            backfilled = True
+            # Mark any `m.historical` messages as backfilled so they don't appear
+            # in `/sync` and have the proper decrementing `stream_ordering` as we import
+            backfilled = False
+            if event.internal_metadata.is_historical():
+                backfilled = True
 
-        # Note that this returns the event that was persisted, which may not be
-        # the same as we passed in if it was deduplicated due transaction IDs.
+        assert self._storage_controllers.persistence is not None
         (
-            event,
-            event_pos,
+            persisted_events,
             max_stream_token,
-        ) = await self._storage_controllers.persistence.persist_event(
-            event, context=context, backfilled=backfilled
+        ) = await self._storage_controllers.persistence.persist_events(
+            events_and_context, backfilled=backfilled
         )
 
-        if self._ephemeral_events_enabled:
-            # If there's an expiry timestamp on the event, schedule its expiry.
-            self._message_handler.maybe_schedule_expiry(event)
+        for event in persisted_events:
+            if self._ephemeral_events_enabled:
+                # If there's an expiry timestamp on the event, schedule its expiry.
+                self._message_handler.maybe_schedule_expiry(event)
 
-        async def _notify() -> None:
-            try:
-                await self.notifier.on_new_room_event(
-                    event, event_pos, max_stream_token, extra_users=extra_users
-                )
-            except Exception:
-                logger.exception(
-                    "Error notifying about new room event %s",
-                    event.event_id,
-                )
+            stream_ordering = event.internal_metadata.stream_ordering
+            assert stream_ordering is not None
+            pos = PersistedEventPosition(self._instance_name, stream_ordering)
+
+            async def _notify() -> None:
+                try:
+                    await self.notifier.on_new_room_event(
+                        event, pos, max_stream_token, extra_users=extra_users
+                    )
+                except Exception:
+                    logger.exception(
+                        "Error notifying about new room event %s",
+                        event.event_id,
+                    )
 
-        run_in_background(_notify)
+            run_in_background(_notify)
 
-        if event.type == EventTypes.Message:
-            # We don't want to block sending messages on any presence code. This
-            # matters as sometimes presence code can take a while.
-            run_in_background(self._bump_active_time, requester.user)
+            if event.type == EventTypes.Message:
+                # We don't want to block sending messages on any presence code. This
+                # matters as sometimes presence code can take a while.
+                run_in_background(self._bump_active_time, requester.user)
 
-        return event
+        return persisted_events[-1]
 
     async def _maybe_kick_guest_users(
         self, event: EventBase, context: EventContext
@@ -1952,8 +1988,7 @@ class EventCreationHandler:
                 # shadow-banned user.
                 await self.handle_new_client_event(
                     requester,
-                    event,
-                    context,
+                    events_and_context=[(event, context)],
                     ratelimit=False,
                     ignore_shadow_ban=True,
                 )
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 09a1a82e6c..b220238e55 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -301,8 +301,7 @@ class RoomCreationHandler:
         # now send the tombstone
         await self.event_creation_handler.handle_new_client_event(
             requester=requester,
-            event=tombstone_event,
-            context=tombstone_context,
+            events_and_context=[(tombstone_event, tombstone_context)],
         )
 
         state_filter = StateFilter.from_types(
@@ -1057,8 +1056,10 @@ class RoomCreationHandler:
         creator_id = creator.user.to_string()
         event_keys = {"room_id": room_id, "sender": creator_id, "state_key": ""}
         depth = 1
+
         # the last event sent/persisted to the db
         last_sent_event_id: Optional[str] = None
+
         # the most recently created event
         prev_event: List[str] = []
         # a map of event types, state keys -> event_ids. We collect these mappings this as events are
@@ -1112,8 +1113,7 @@ class RoomCreationHandler:
 
             ev = await self.event_creation_handler.handle_new_client_event(
                 requester=creator,
-                event=event,
-                context=context,
+                events_and_context=[(event, context)],
                 ratelimit=False,
                 ignore_shadow_ban=True,
             )
@@ -1152,7 +1152,6 @@ class RoomCreationHandler:
             prev_event_ids=[last_sent_event_id],
             depth=depth,
         )
-        last_sent_event_id = member_event_id
         prev_event = [member_event_id]
 
         # update the depth and state map here as the membership event has been created
@@ -1168,7 +1167,7 @@ class RoomCreationHandler:
                 EventTypes.PowerLevels, pl_content, False
             )
             current_state_group = power_context._state_group
-            last_sent_stream_id = await send(power_event, power_context, creator)
+            await send(power_event, power_context, creator)
         else:
             power_level_content: JsonDict = {
                 "users": {creator_id: 100},
@@ -1217,7 +1216,7 @@ class RoomCreationHandler:
                 False,
             )
             current_state_group = pl_context._state_group
-            last_sent_stream_id = await send(pl_event, pl_context, creator)
+            await send(pl_event, pl_context, creator)
 
         events_to_send = []
         if room_alias and (EventTypes.CanonicalAlias, "") not in initial_state:
@@ -1271,9 +1270,11 @@ class RoomCreationHandler:
             )
             events_to_send.append((encryption_event, encryption_context))
 
-        for event, context in events_to_send:
-            last_sent_stream_id = await send(event, context, creator)
-        return last_sent_stream_id, last_sent_event_id, depth
+        last_event = await self.event_creation_handler.handle_new_client_event(
+            creator, events_to_send, ignore_shadow_ban=True
+        )
+        assert last_event.internal_metadata.stream_ordering is not None
+        return last_event.internal_metadata.stream_ordering, last_event.event_id, depth
 
     def _generate_room_id(self) -> str:
         """Generates a random room ID.
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 1414e575d6..411a6fb22f 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -379,8 +379,7 @@ class RoomBatchHandler:
                 await self.create_requester_for_user_id_from_app_service(
                     event.sender, app_service_requester.app_service
                 ),
-                event=event,
-                context=context,
+                events_and_context=[(event, context)],
             )
 
         return event_ids
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 8d01f4bf2b..88158822e0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -432,8 +432,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
         with opentracing.start_active_span("handle_new_client_event"):
             result_event = await self.event_creation_handler.handle_new_client_event(
                 requester,
-                event,
-                context,
+                events_and_context=[(event, context)],
                 extra_users=[target],
                 ratelimit=ratelimit,
             )
@@ -1252,7 +1251,10 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 raise SynapseError(403, "This room has been blocked on this server")
 
         event = await self.event_creation_handler.handle_new_client_event(
-            requester, event, context, extra_users=[target_user], ratelimit=ratelimit
+            requester,
+            events_and_context=[(event, context)],
+            extra_users=[target_user],
+            ratelimit=ratelimit,
         )
 
         prev_member_event_id = prev_state_ids.get(
@@ -1860,8 +1862,7 @@ class RoomMemberMasterHandler(RoomMemberHandler):
 
         result_event = await self.event_creation_handler.handle_new_client_event(
             requester,
-            event,
-            context,
+            events_and_context=[(event, context)],
             extra_users=[UserID.from_string(target_user)],
         )
         # we know it was persisted, so must have a stream ordering
diff --git a/synapse/replication/http/__init__.py b/synapse/replication/http/__init__.py
index 53aa7fa4c6..ac9a92240a 100644
--- a/synapse/replication/http/__init__.py
+++ b/synapse/replication/http/__init__.py
@@ -25,6 +25,7 @@ from synapse.replication.http import (
     push,
     register,
     send_event,
+    send_events,
     state,
     streams,
 )
@@ -43,6 +44,7 @@ class ReplicationRestResource(JsonResource):
 
     def register_servlets(self, hs: "HomeServer") -> None:
         send_event.register_servlets(hs, self)
+        send_events.register_servlets(hs, self)
         federation.register_servlets(hs, self)
         presence.register_servlets(hs, self)
         membership.register_servlets(hs, self)
diff --git a/synapse/replication/http/send_event.py b/synapse/replication/http/send_event.py
index 486f04723c..4215a1c1bc 100644
--- a/synapse/replication/http/send_event.py
+++ b/synapse/replication/http/send_event.py
@@ -141,8 +141,8 @@ class ReplicationSendEventRestServlet(ReplicationEndpoint):
             "Got event to send with ID: %s into room: %s", event.event_id, event.room_id
         )
 
-        event = await self.event_creation_handler.persist_and_notify_client_event(
-            requester, event, context, ratelimit=ratelimit, extra_users=extra_users
+        event = await self.event_creation_handler.persist_and_notify_client_events(
+            requester, [(event, context)], ratelimit=ratelimit, extra_users=extra_users
         )
 
         return (
diff --git a/synapse/replication/http/send_events.py b/synapse/replication/http/send_events.py
new file mode 100644
index 0000000000..8889bbb644
--- /dev/null
+++ b/synapse/replication/http/send_events.py
@@ -0,0 +1,171 @@
+# Copyright 2022 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+from typing import TYPE_CHECKING, List, Tuple
+
+from twisted.web.server import Request
+
+from synapse.api.room_versions import KNOWN_ROOM_VERSIONS
+from synapse.events import EventBase, make_event_from_dict
+from synapse.events.snapshot import EventContext
+from synapse.http.server import HttpServer
+from synapse.http.servlet import parse_json_object_from_request
+from synapse.replication.http._base import ReplicationEndpoint
+from synapse.types import JsonDict, Requester, UserID
+from synapse.util.metrics import Measure
+
+if TYPE_CHECKING:
+    from synapse.server import HomeServer
+    from synapse.storage.databases.main import DataStore
+
+logger = logging.getLogger(__name__)
+
+
+class ReplicationSendEventsRestServlet(ReplicationEndpoint):
+    """Handles batches of newly created events on workers, including persisting and
+    notifying.
+
+    The API looks like:
+
+        POST /_synapse/replication/send_events/:txn_id
+
+        {
+            "events": [{
+                "event": { .. serialized event .. },
+                "room_version": .., // "1", "2", "3", etc: the version of the room
+                            // containing the event
+                "event_format_version": .., // 1,2,3 etc: the event format version
+                "internal_metadata": { .. serialized internal_metadata .. },
+                "outlier": true|false,
+                "rejected_reason": ..,   // The event.rejected_reason field
+                "context": { .. serialized event context .. },
+                "requester": { .. serialized requester .. },
+                "ratelimit": true,
+            }]
+        }
+
+        200 OK
+
+        { "stream_id": 12345, "event_id": "$abcdef..." }
+
+    Responds with a 409 when a `PartialStateConflictError` is raised due to an event
+    context that needs to be recomputed due to the un-partial stating of a room.
+
+    """
+
+    NAME = "send_events"
+    PATH_ARGS = ()
+
+    def __init__(self, hs: "HomeServer"):
+        super().__init__(hs)
+
+        self.event_creation_handler = hs.get_event_creation_handler()
+        self.store = hs.get_datastores().main
+        self._storage_controllers = hs.get_storage_controllers()
+        self.clock = hs.get_clock()
+
+    @staticmethod
+    async def _serialize_payload(  # type: ignore[override]
+        events_and_context: List[Tuple[EventBase, EventContext]],
+        store: "DataStore",
+        requester: Requester,
+        ratelimit: bool,
+        extra_users: List[UserID],
+    ) -> JsonDict:
+        """
+        Args:
+            store
+            requester
+            events_and_ctx
+            ratelimit
+        """
+        serialized_events = []
+
+        for event, context in events_and_context:
+            serialized_context = await context.serialize(event, store)
+            serialized_event = {
+                "event": event.get_pdu_json(),
+                "room_version": event.room_version.identifier,
+                "event_format_version": event.format_version,
+                "internal_metadata": event.internal_metadata.get_dict(),
+                "outlier": event.internal_metadata.is_outlier(),
+                "rejected_reason": event.rejected_reason,
+                "context": serialized_context,
+                "requester": requester.serialize(),
+                "ratelimit": ratelimit,
+                "extra_users": [u.to_string() for u in extra_users],
+            }
+            serialized_events.append(serialized_event)
+
+        payload = {"events": serialized_events}
+
+        return payload
+
+    async def _handle_request(  # type: ignore[override]
+        self, request: Request
+    ) -> Tuple[int, JsonDict]:
+        with Measure(self.clock, "repl_send_events_parse"):
+            payload = parse_json_object_from_request(request)
+            events_and_context = []
+            events = payload["events"]
+
+            for event_payload in events:
+                event_dict = event_payload["event"]
+                room_ver = KNOWN_ROOM_VERSIONS[event_payload["room_version"]]
+                internal_metadata = event_payload["internal_metadata"]
+                rejected_reason = event_payload["rejected_reason"]
+
+                event = make_event_from_dict(
+                    event_dict, room_ver, internal_metadata, rejected_reason
+                )
+                event.internal_metadata.outlier = event_payload["outlier"]
+
+                requester = Requester.deserialize(
+                    self.store, event_payload["requester"]
+                )
+                context = EventContext.deserialize(
+                    self._storage_controllers, event_payload["context"]
+                )
+
+                ratelimit = event_payload["ratelimit"]
+                events_and_context.append((event, context))
+
+                extra_users = [
+                    UserID.from_string(u) for u in event_payload["extra_users"]
+                ]
+
+                logger.info(
+                    "Got batch of events to send, last ID of batch is: %s, sending into room: %s",
+                    event.event_id,
+                    event.room_id,
+                )
+
+            last_event = (
+                await self.event_creation_handler.persist_and_notify_client_events(
+                    requester, events_and_context, ratelimit, extra_users
+                )
+            )
+
+        return (
+            200,
+            {
+                "stream_id": last_event.internal_metadata.stream_ordering,
+                "event_id": last_event.event_id,
+            },
+        )
+
+
+def register_servlets(hs: "HomeServer", http_server: HttpServer) -> None:
+    ReplicationSendEventsRestServlet(hs).register(http_server)
diff --git a/tests/handlers/test_message.py b/tests/handlers/test_message.py
index 986b50ce0c..99384837d0 100644
--- a/tests/handlers/test_message.py
+++ b/tests/handlers/test_message.py
@@ -105,7 +105,10 @@ class EventCreationTestCase(unittest.HomeserverTestCase):
         event1, context = self._create_duplicate_event(txn_id)
 
         ret_event1 = self.get_success(
-            self.handler.handle_new_client_event(self.requester, event1, context)
+            self.handler.handle_new_client_event(
+                self.requester,
+                events_and_context=[(event1, context)],
+            )
         )
         stream_id1 = ret_event1.internal_metadata.stream_ordering
 
@@ -118,7 +121,10 @@ class EventCreationTestCase(unittest.HomeserverTestCase):
         self.assertNotEqual(event1.event_id, event2.event_id)
 
         ret_event2 = self.get_success(
-            self.handler.handle_new_client_event(self.requester, event2, context)
+            self.handler.handle_new_client_event(
+                self.requester,
+                events_and_context=[(event2, context)],
+            )
         )
         stream_id2 = ret_event2.internal_metadata.stream_ordering
 
diff --git a/tests/handlers/test_register.py b/tests/handlers/test_register.py
index 86b3d51975..765df75d91 100644
--- a/tests/handlers/test_register.py
+++ b/tests/handlers/test_register.py
@@ -497,7 +497,9 @@ class RegistrationTestCase(unittest.HomeserverTestCase):
             )
         )
         self.get_success(
-            event_creation_handler.handle_new_client_event(requester, event, context)
+            event_creation_handler.handle_new_client_event(
+                requester, events_and_context=[(event, context)]
+            )
         )
 
         # Register a second user, which won't be be in the room (or even have an invite)
diff --git a/tests/storage/test_event_chain.py b/tests/storage/test_event_chain.py
index a0ce077a99..de9f4af2de 100644
--- a/tests/storage/test_event_chain.py
+++ b/tests/storage/test_event_chain.py
@@ -531,7 +531,9 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
             )
         )
         self.get_success(
-            event_handler.handle_new_client_event(self.requester, event, context)
+            event_handler.handle_new_client_event(
+                self.requester, events_and_context=[(event, context)]
+            )
         )
         state1 = set(self.get_success(context.get_current_state_ids()).values())
 
@@ -549,7 +551,9 @@ class EventChainBackgroundUpdateTestCase(HomeserverTestCase):
             )
         )
         self.get_success(
-            event_handler.handle_new_client_event(self.requester, event, context)
+            event_handler.handle_new_client_event(
+                self.requester, events_and_context=[(event, context)]
+            )
         )
         state2 = set(self.get_success(context.get_current_state_ids()).values())
 
diff --git a/tests/unittest.py b/tests/unittest.py
index 00cb023198..5116be338e 100644
--- a/tests/unittest.py
+++ b/tests/unittest.py
@@ -734,7 +734,9 @@ class HomeserverTestCase(TestCase):
             event.internal_metadata.soft_failed = True
 
         self.get_success(
-            event_creator.handle_new_client_event(requester, event, context)
+            event_creator.handle_new_client_event(
+                requester, events_and_context=[(event, context)]
+            )
         )
 
         return event.event_id