summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py663
1 files changed, 349 insertions, 314 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 062f93bc67..00e7645ba5 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -56,11 +56,13 @@ from synapse.logging import opentracing
 from synapse.logging.context import make_deferred_yieldable, run_in_background
 from synapse.metrics.background_process_metrics import run_as_background_process
 from synapse.replication.http.send_event import ReplicationSendEventRestServlet
+from synapse.replication.http.send_events import ReplicationSendEventsRestServlet
 from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.storage.databases.main.events_worker import EventRedactBehaviour
 from synapse.storage.state import StateFilter
 from synapse.types import (
     MutableStateMap,
+    PersistedEventPosition,
     Requester,
     RoomAlias,
     StateMap,
@@ -493,6 +495,7 @@ class EventCreationHandler:
             self.membership_types_to_include_profile_data_in.add(Membership.INVITE)
 
         self.send_event = ReplicationSendEventRestServlet.make_client(hs)
+        self.send_events = ReplicationSendEventsRestServlet.make_client(hs)
 
         self.request_ratelimiter = hs.get_request_ratelimiter()
 
@@ -1016,8 +1019,7 @@ class EventCreationHandler:
 
             ev = await self.handle_new_client_event(
                 requester=requester,
-                event=event,
-                context=context,
+                events_and_context=[(event, context)],
                 ratelimit=ratelimit,
                 ignore_shadow_ban=ignore_shadow_ban,
             )
@@ -1293,13 +1295,13 @@ class EventCreationHandler:
     async def handle_new_client_event(
         self,
         requester: Requester,
-        event: EventBase,
-        context: EventContext,
+        events_and_context: List[Tuple[EventBase, EventContext]],
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
         ignore_shadow_ban: bool = False,
     ) -> EventBase:
-        """Processes a new event.
+        """Processes new events. Please note that if batch persisting events, an error in
+        handling any one of these events will result in all of the events being dropped.
 
         This includes deduplicating, checking auth, persisting,
         notifying users, sending to remote servers, etc.
@@ -1309,8 +1311,7 @@ class EventCreationHandler:
 
         Args:
             requester
-            event
-            context
+            events_and_context: A list of one or more tuples of event, context to be persisted
             ratelimit
             extra_users: Any extra users to notify about event
 
@@ -1328,62 +1329,63 @@ class EventCreationHandler:
         """
         extra_users = extra_users or []
 
-        # we don't apply shadow-banning to membership events here. Invites are blocked
-        # higher up the stack, and we allow shadow-banned users to send join and leave
-        # events as normal.
-        if (
-            event.type != EventTypes.Member
-            and not ignore_shadow_ban
-            and requester.shadow_banned
-        ):
-            # We randomly sleep a bit just to annoy the requester.
-            await self.clock.sleep(random.randint(1, 10))
-            raise ShadowBanError()
+        for event, context in events_and_context:
+            # we don't apply shadow-banning to membership events here. Invites are blocked
+            # higher up the stack, and we allow shadow-banned users to send join and leave
+            # events as normal.
+            if (
+                event.type != EventTypes.Member
+                and not ignore_shadow_ban
+                and requester.shadow_banned
+            ):
+                # We randomly sleep a bit just to annoy the requester.
+                await self.clock.sleep(random.randint(1, 10))
+                raise ShadowBanError()
 
-        if event.is_state():
-            prev_event = await self.deduplicate_state_event(event, context)
-            if prev_event is not None:
-                logger.info(
-                    "Not bothering to persist state event %s duplicated by %s",
-                    event.event_id,
-                    prev_event.event_id,
-                )
-                return prev_event
+            if event.is_state():
+                prev_event = await self.deduplicate_state_event(event, context)
+                if prev_event is not None:
+                    logger.info(
+                        "Not bothering to persist state event %s duplicated by %s",
+                        event.event_id,
+                        prev_event.event_id,
+                    )
+                    return prev_event
 
-        if event.internal_metadata.is_out_of_band_membership():
-            # the only sort of out-of-band-membership events we expect to see here are
-            # invite rejections and rescinded knocks that we have generated ourselves.
-            assert event.type == EventTypes.Member
-            assert event.content["membership"] == Membership.LEAVE
-        else:
-            try:
-                validate_event_for_room_version(event)
-                await self._event_auth_handler.check_auth_rules_from_context(
-                    event, context
-                )
-            except AuthError as err:
-                logger.warning("Denying new event %r because %s", event, err)
-                raise err
+            if event.internal_metadata.is_out_of_band_membership():
+                # the only sort of out-of-band-membership events we expect to see here are
+                # invite rejections and rescinded knocks that we have generated ourselves.
+                assert event.type == EventTypes.Member
+                assert event.content["membership"] == Membership.LEAVE
+            else:
+                try:
+                    validate_event_for_room_version(event)
+                    await self._event_auth_handler.check_auth_rules_from_context(
+                        event, context
+                    )
+                except AuthError as err:
+                    logger.warning("Denying new event %r because %s", event, err)
+                    raise err
 
-        # Ensure that we can round trip before trying to persist in db
-        try:
-            dump = json_encoder.encode(event.content)
-            json_decoder.decode(dump)
-        except Exception:
-            logger.exception("Failed to encode content: %r", event.content)
-            raise
+            # Ensure that we can round trip before trying to persist in db
+            try:
+                dump = json_encoder.encode(event.content)
+                json_decoder.decode(dump)
+            except Exception:
+                logger.exception("Failed to encode content: %r", event.content)
+                raise
 
         # We now persist the event (and update the cache in parallel, since we
         # don't want to block on it).
+        event, context = events_and_context[0]
         try:
             result, _ = await make_deferred_yieldable(
                 gather_results(
                     (
                         run_in_background(
-                            self._persist_event,
+                            self._persist_events,
                             requester=requester,
-                            event=event,
-                            context=context,
+                            events_and_context=events_and_context,
                             ratelimit=ratelimit,
                             extra_users=extra_users,
                         ),
@@ -1407,45 +1409,47 @@ class EventCreationHandler:
 
         return result
 
-    async def _persist_event(
+    async def _persist_events(
         self,
         requester: Requester,
-        event: EventBase,
-        context: EventContext,
+        events_and_context: List[Tuple[EventBase, EventContext]],
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
     ) -> EventBase:
-        """Actually persists the event. Should only be called by
+        """Actually persists new events. Should only be called by
         `handle_new_client_event`, and see its docstring for documentation of
-        the arguments.
+        the arguments. Please note that if batch persisting events, an error in
+        handling any one of these events will result in all of the events being dropped.
 
         PartialStateConflictError: if attempting to persist a partial state event in
             a room that has been un-partial stated.
         """
 
-        # Skip push notification actions for historical messages
-        # because we don't want to notify people about old history back in time.
-        # The historical messages also do not have the proper `context.current_state_ids`
-        # and `state_groups` because they have `prev_events` that aren't persisted yet
-        # (historical messages persisted in reverse-chronological order).
-        if not event.internal_metadata.is_historical():
-            with opentracing.start_active_span("calculate_push_actions"):
-                await self._bulk_push_rule_evaluator.action_for_event_by_user(
-                    event, context
-                )
+        for event, context in events_and_context:
+            # Skip push notification actions for historical messages
+            # because we don't want to notify people about old history back in time.
+            # The historical messages also do not have the proper `context.current_state_ids`
+            # and `state_groups` because they have `prev_events` that aren't persisted yet
+            # (historical messages persisted in reverse-chronological order).
+            if not event.internal_metadata.is_historical():
+                with opentracing.start_active_span("calculate_push_actions"):
+                    await self._bulk_push_rule_evaluator.action_for_event_by_user(
+                        event, context
+                    )
 
         try:
             # If we're a worker we need to hit out to the master.
-            writer_instance = self._events_shard_config.get_instance(event.room_id)
+            first_event, _ = events_and_context[0]
+            writer_instance = self._events_shard_config.get_instance(
+                first_event.room_id
+            )
             if writer_instance != self._instance_name:
                 try:
-                    result = await self.send_event(
+                    result = await self.send_events(
                         instance_name=writer_instance,
-                        event_id=event.event_id,
+                        events_and_context=events_and_context,
                         store=self.store,
                         requester=requester,
-                        event=event,
-                        context=context,
                         ratelimit=ratelimit,
                         extra_users=extra_users,
                     )
@@ -1455,6 +1459,11 @@ class EventCreationHandler:
                     raise
                 stream_id = result["stream_id"]
                 event_id = result["event_id"]
+
+                # If we batch persisted events we return the last persisted event, otherwise
+                # we return the one event that was persisted
+                event, _ = events_and_context[-1]
+
                 if event_id != event.event_id:
                     # If we get a different event back then it means that its
                     # been de-duplicated, so we replace the given event with the
@@ -1467,15 +1476,19 @@ class EventCreationHandler:
                     event.internal_metadata.stream_ordering = stream_id
                 return event
 
-            event = await self.persist_and_notify_client_event(
-                requester, event, context, ratelimit=ratelimit, extra_users=extra_users
+            event = await self.persist_and_notify_client_events(
+                requester,
+                events_and_context,
+                ratelimit=ratelimit,
+                extra_users=extra_users,
             )
 
             return event
         except Exception:
-            # Ensure that we actually remove the entries in the push actions
-            # staging area, if we calculated them.
-            await self.store.remove_push_actions_from_staging(event.event_id)
+            for event, _ in events_and_context:
+                # Ensure that we actually remove the entries in the push actions
+                # staging area, if we calculated them.
+                await self.store.remove_push_actions_from_staging(event.event_id)
             raise
 
     async def cache_joined_hosts_for_event(
@@ -1569,23 +1582,26 @@ class EventCreationHandler:
                 Codes.BAD_ALIAS,
             )
 
-    async def persist_and_notify_client_event(
+    async def persist_and_notify_client_events(
         self,
         requester: Requester,
-        event: EventBase,
-        context: EventContext,
+        events_and_context: List[Tuple[EventBase, EventContext]],
         ratelimit: bool = True,
         extra_users: Optional[List[UserID]] = None,
     ) -> EventBase:
-        """Called when we have fully built the event, have already
-        calculated the push actions for the event, and checked auth.
+        """Called when we have fully built the events, have already
+        calculated the push actions for the events, and checked auth.
 
         This should only be run on the instance in charge of persisting events.
 
+        Please note that if batch persisting events, an error in
+        handling any one of these events will result in all of the events being dropped.
+
         Returns:
-            The persisted event. This may be different than the given event if
-            it was de-duplicated (e.g. because we had already persisted an
-            event with the same transaction ID.)
+            The persisted event, if one event is passed in, or the last event in the
+            list in the case of batch persisting. If only one event was persisted, the
+            returned event may be different than the given event if it was de-duplicated
+            (e.g. because we had already persisted an event with the same transaction ID.)
 
         Raises:
             PartialStateConflictError: if attempting to persist a partial state event in
@@ -1593,277 +1609,297 @@ class EventCreationHandler:
         """
         extra_users = extra_users or []
 
-        assert self._storage_controllers.persistence is not None
-        assert self._events_shard_config.should_handle(
-            self._instance_name, event.room_id
-        )
+        for event, context in events_and_context:
+            assert self._events_shard_config.should_handle(
+                self._instance_name, event.room_id
+            )
 
-        if ratelimit:
-            # We check if this is a room admin redacting an event so that we
-            # can apply different ratelimiting. We do this by simply checking
-            # it's not a self-redaction (to avoid having to look up whether the
-            # user is actually admin or not).
-            is_admin_redaction = False
-            if event.type == EventTypes.Redaction:
-                assert event.redacts is not None
+            if ratelimit:
+                # We check if this is a room admin redacting an event so that we
+                # can apply different ratelimiting. We do this by simply checking
+                # it's not a self-redaction (to avoid having to look up whether the
+                # user is actually admin or not).
+                is_admin_redaction = False
+                if event.type == EventTypes.Redaction:
+                    assert event.redacts is not None
+
+                    original_event = await self.store.get_event(
+                        event.redacts,
+                        redact_behaviour=EventRedactBehaviour.as_is,
+                        get_prev_content=False,
+                        allow_rejected=False,
+                        allow_none=True,
+                    )
 
-                original_event = await self.store.get_event(
-                    event.redacts,
-                    redact_behaviour=EventRedactBehaviour.as_is,
-                    get_prev_content=False,
-                    allow_rejected=False,
-                    allow_none=True,
+                    is_admin_redaction = bool(
+                        original_event and event.sender != original_event.sender
+                    )
+
+                await self.request_ratelimiter.ratelimit(
+                    requester, is_admin_redaction=is_admin_redaction
                 )
 
-                is_admin_redaction = bool(
-                    original_event and event.sender != original_event.sender
+            # run checks/actions on event based on type
+            if event.type == EventTypes.Member and event.membership == Membership.JOIN:
+                (
+                    current_membership,
+                    _,
+                ) = await self.store.get_local_current_membership_for_user_in_room(
+                    event.state_key, event.room_id
                 )
+                if current_membership != Membership.JOIN:
+                    self._notifier.notify_user_joined_room(
+                        event.event_id, event.room_id
+                    )
 
-            await self.request_ratelimiter.ratelimit(
-                requester, is_admin_redaction=is_admin_redaction
-            )
+            await self._maybe_kick_guest_users(event, context)
 
-        if event.type == EventTypes.Member and event.membership == Membership.JOIN:
-            (
-                current_membership,
-                _,
-            ) = await self.store.get_local_current_membership_for_user_in_room(
-                event.state_key, event.room_id
-            )
-            if current_membership != Membership.JOIN:
-                self._notifier.notify_user_joined_room(event.event_id, event.room_id)
+            if event.type == EventTypes.CanonicalAlias:
+                # Validate a newly added alias or newly added alt_aliases.
 
-        await self._maybe_kick_guest_users(event, context)
+                original_alias = None
+                original_alt_aliases: object = []
 
-        if event.type == EventTypes.CanonicalAlias:
-            # Validate a newly added alias or newly added alt_aliases.
+                original_event_id = event.unsigned.get("replaces_state")
+                if original_event_id:
+                    original_alias_event = await self.store.get_event(original_event_id)
 
-            original_alias = None
-            original_alt_aliases: object = []
+                    if original_alias_event:
+                        original_alias = original_alias_event.content.get("alias", None)
+                        original_alt_aliases = original_alias_event.content.get(
+                            "alt_aliases", []
+                        )
 
-            original_event_id = event.unsigned.get("replaces_state")
-            if original_event_id:
-                original_event = await self.store.get_event(original_event_id)
+                # Check the alias is currently valid (if it has changed).
+                room_alias_str = event.content.get("alias", None)
+                directory_handler = self.hs.get_directory_handler()
+                if room_alias_str and room_alias_str != original_alias:
+                    await self._validate_canonical_alias(
+                        directory_handler, room_alias_str, event.room_id
+                    )
 
-                if original_event:
-                    original_alias = original_event.content.get("alias", None)
-                    original_alt_aliases = original_event.content.get("alt_aliases", [])
-
-            # Check the alias is currently valid (if it has changed).
-            room_alias_str = event.content.get("alias", None)
-            directory_handler = self.hs.get_directory_handler()
-            if room_alias_str and room_alias_str != original_alias:
-                await self._validate_canonical_alias(
-                    directory_handler, room_alias_str, event.room_id
-                )
+                # Check that alt_aliases is the proper form.
+                alt_aliases = event.content.get("alt_aliases", [])
+                if not isinstance(alt_aliases, (list, tuple)):
+                    raise SynapseError(
+                        400,
+                        "The alt_aliases property must be a list.",
+                        Codes.INVALID_PARAM,
+                    )
 
-            # Check that alt_aliases is the proper form.
-            alt_aliases = event.content.get("alt_aliases", [])
-            if not isinstance(alt_aliases, (list, tuple)):
-                raise SynapseError(
-                    400, "The alt_aliases property must be a list.", Codes.INVALID_PARAM
-                )
+                # If the old version of alt_aliases is of an unknown form,
+                # completely replace it.
+                if not isinstance(original_alt_aliases, (list, tuple)):
+                    # TODO: check that the original_alt_aliases' entries are all strings
+                    original_alt_aliases = []
+
+                # Check that each alias is currently valid.
+                new_alt_aliases = set(alt_aliases) - set(original_alt_aliases)
+                if new_alt_aliases:
+                    for alias_str in new_alt_aliases:
+                        await self._validate_canonical_alias(
+                            directory_handler, alias_str, event.room_id
+                        )
 
-            # If the old version of alt_aliases is of an unknown form,
-            # completely replace it.
-            if not isinstance(original_alt_aliases, (list, tuple)):
-                # TODO: check that the original_alt_aliases' entries are all strings
-                original_alt_aliases = []
+            federation_handler = self.hs.get_federation_handler()
 
-            # Check that each alias is currently valid.
-            new_alt_aliases = set(alt_aliases) - set(original_alt_aliases)
-            if new_alt_aliases:
-                for alias_str in new_alt_aliases:
-                    await self._validate_canonical_alias(
-                        directory_handler, alias_str, event.room_id
+            if event.type == EventTypes.Member:
+                if event.content["membership"] == Membership.INVITE:
+                    event.unsigned[
+                        "invite_room_state"
+                    ] = await self.store.get_stripped_room_state_from_event_context(
+                        context,
+                        self.room_prejoin_state_types,
+                        membership_user_id=event.sender,
                     )
 
-        federation_handler = self.hs.get_federation_handler()
+                    invitee = UserID.from_string(event.state_key)
+                    if not self.hs.is_mine(invitee):
+                        # TODO: Can we add signature from remote server in a nicer
+                        # way? If we have been invited by a remote server, we need
+                        # to get them to sign the event.
 
-        if event.type == EventTypes.Member:
-            if event.content["membership"] == Membership.INVITE:
-                event.unsigned[
-                    "invite_room_state"
-                ] = await self.store.get_stripped_room_state_from_event_context(
-                    context,
-                    self.room_prejoin_state_types,
-                    membership_user_id=event.sender,
-                )
+                        returned_invite = await federation_handler.send_invite(
+                            invitee.domain, event
+                        )
+                        event.unsigned.pop("room_state", None)
 
-                invitee = UserID.from_string(event.state_key)
-                if not self.hs.is_mine(invitee):
-                    # TODO: Can we add signature from remote server in a nicer
-                    # way? If we have been invited by a remote server, we need
-                    # to get them to sign the event.
+                        # TODO: Make sure the signatures actually are correct.
+                        event.signatures.update(returned_invite.signatures)
 
-                    returned_invite = await federation_handler.send_invite(
-                        invitee.domain, event
+                if event.content["membership"] == Membership.KNOCK:
+                    event.unsigned[
+                        "knock_room_state"
+                    ] = await self.store.get_stripped_room_state_from_event_context(
+                        context,
+                        self.room_prejoin_state_types,
                     )
-                    event.unsigned.pop("room_state", None)
 
-                    # TODO: Make sure the signatures actually are correct.
-                    event.signatures.update(returned_invite.signatures)
+            if event.type == EventTypes.Redaction:
+                assert event.redacts is not None
 
-            if event.content["membership"] == Membership.KNOCK:
-                event.unsigned[
-                    "knock_room_state"
-                ] = await self.store.get_stripped_room_state_from_event_context(
-                    context,
-                    self.room_prejoin_state_types,
+                original_event = await self.store.get_event(
+                    event.redacts,
+                    redact_behaviour=EventRedactBehaviour.as_is,
+                    get_prev_content=False,
+                    allow_rejected=False,
+                    allow_none=True,
                 )
 
-        if event.type == EventTypes.Redaction:
-            assert event.redacts is not None
+                room_version = await self.store.get_room_version_id(event.room_id)
+                room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
 
-            original_event = await self.store.get_event(
-                event.redacts,
-                redact_behaviour=EventRedactBehaviour.as_is,
-                get_prev_content=False,
-                allow_rejected=False,
-                allow_none=True,
-            )
+                # we can make some additional checks now if we have the original event.
+                if original_event:
+                    if original_event.type == EventTypes.Create:
+                        raise AuthError(403, "Redacting create events is not permitted")
 
-            room_version = await self.store.get_room_version_id(event.room_id)
-            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-            # we can make some additional checks now if we have the original event.
-            if original_event:
-                if original_event.type == EventTypes.Create:
-                    raise AuthError(403, "Redacting create events is not permitted")
-
-                if original_event.room_id != event.room_id:
-                    raise SynapseError(400, "Cannot redact event from a different room")
-
-                if original_event.type == EventTypes.ServerACL:
-                    raise AuthError(403, "Redacting server ACL events is not permitted")
-
-                # Add a little safety stop-gap to prevent people from trying to
-                # redact MSC2716 related events when they're in a room version
-                # which does not support it yet. We allow people to use MSC2716
-                # events in existing room versions but only from the room
-                # creator since it does not require any changes to the auth
-                # rules and in effect, the redaction algorithm . In the
-                # supported room version, we add the `historical` power level to
-                # auth the MSC2716 related events and adjust the redaction
-                # algorthim to keep the `historical` field around (redacting an
-                # event should only strip fields which don't affect the
-                # structural protocol level).
-                is_msc2716_event = (
-                    original_event.type == EventTypes.MSC2716_INSERTION
-                    or original_event.type == EventTypes.MSC2716_BATCH
-                    or original_event.type == EventTypes.MSC2716_MARKER
-                )
-                if not room_version_obj.msc2716_historical and is_msc2716_event:
-                    raise AuthError(
-                        403,
-                        "Redacting MSC2716 events is not supported in this room version",
-                    )
+                    if original_event.room_id != event.room_id:
+                        raise SynapseError(
+                            400, "Cannot redact event from a different room"
+                        )
 
-            event_types = event_auth.auth_types_for_event(event.room_version, event)
-            prev_state_ids = await context.get_prev_state_ids(
-                StateFilter.from_types(event_types)
-            )
+                    if original_event.type == EventTypes.ServerACL:
+                        raise AuthError(
+                            403, "Redacting server ACL events is not permitted"
+                        )
 
-            auth_events_ids = self._event_auth_handler.compute_auth_events(
-                event, prev_state_ids, for_verification=True
-            )
-            auth_events_map = await self.store.get_events(auth_events_ids)
-            auth_events = {(e.type, e.state_key): e for e in auth_events_map.values()}
+                    # Add a little safety stop-gap to prevent people from trying to
+                    # redact MSC2716 related events when they're in a room version
+                    # which does not support it yet. We allow people to use MSC2716
+                    # events in existing room versions but only from the room
+                    # creator since it does not require any changes to the auth
+                    # rules and in effect, the redaction algorithm . In the
+                    # supported room version, we add the `historical` power level to
+                    # auth the MSC2716 related events and adjust the redaction
+                    # algorthim to keep the `historical` field around (redacting an
+                    # event should only strip fields which don't affect the
+                    # structural protocol level).
+                    is_msc2716_event = (
+                        original_event.type == EventTypes.MSC2716_INSERTION
+                        or original_event.type == EventTypes.MSC2716_BATCH
+                        or original_event.type == EventTypes.MSC2716_MARKER
+                    )
+                    if not room_version_obj.msc2716_historical and is_msc2716_event:
+                        raise AuthError(
+                            403,
+                            "Redacting MSC2716 events is not supported in this room version",
+                        )
 
-            if event_auth.check_redaction(
-                room_version_obj, event, auth_events=auth_events
-            ):
-                # this user doesn't have 'redact' rights, so we need to do some more
-                # checks on the original event. Let's start by checking the original
-                # event exists.
-                if not original_event:
-                    raise NotFoundError("Could not find event %s" % (event.redacts,))
-
-                if event.user_id != original_event.user_id:
-                    raise AuthError(403, "You don't have permission to redact events")
-
-                # all the checks are done.
-                event.internal_metadata.recheck_redaction = False
-
-        if event.type == EventTypes.Create:
-            prev_state_ids = await context.get_prev_state_ids()
-            if prev_state_ids:
-                raise AuthError(403, "Changing the room create event is forbidden")
-
-        if event.type == EventTypes.MSC2716_INSERTION:
-            room_version = await self.store.get_room_version_id(event.room_id)
-            room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
-
-            create_event = await self.store.get_create_event_for_room(event.room_id)
-            room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
-
-            # Only check an insertion event if the room version
-            # supports it or the event is from the room creator.
-            if room_version_obj.msc2716_historical or (
-                self.config.experimental.msc2716_enabled
-                and event.sender == room_creator
-            ):
-                next_batch_id = event.content.get(
-                    EventContentFields.MSC2716_NEXT_BATCH_ID
+                event_types = event_auth.auth_types_for_event(event.room_version, event)
+                prev_state_ids = await context.get_prev_state_ids(
+                    StateFilter.from_types(event_types)
                 )
-                conflicting_insertion_event_id = None
-                if next_batch_id:
-                    conflicting_insertion_event_id = (
-                        await self.store.get_insertion_event_id_by_batch_id(
-                            event.room_id, next_batch_id
+
+                auth_events_ids = self._event_auth_handler.compute_auth_events(
+                    event, prev_state_ids, for_verification=True
+                )
+                auth_events_map = await self.store.get_events(auth_events_ids)
+                auth_events = {
+                    (e.type, e.state_key): e for e in auth_events_map.values()
+                }
+
+                if event_auth.check_redaction(
+                    room_version_obj, event, auth_events=auth_events
+                ):
+                    # this user doesn't have 'redact' rights, so we need to do some more
+                    # checks on the original event. Let's start by checking the original
+                    # event exists.
+                    if not original_event:
+                        raise NotFoundError(
+                            "Could not find event %s" % (event.redacts,)
                         )
+
+                    if event.user_id != original_event.user_id:
+                        raise AuthError(
+                            403, "You don't have permission to redact events"
+                        )
+
+                    # all the checks are done.
+                    event.internal_metadata.recheck_redaction = False
+
+            if event.type == EventTypes.Create:
+                prev_state_ids = await context.get_prev_state_ids()
+                if prev_state_ids:
+                    raise AuthError(403, "Changing the room create event is forbidden")
+
+            if event.type == EventTypes.MSC2716_INSERTION:
+                room_version = await self.store.get_room_version_id(event.room_id)
+                room_version_obj = KNOWN_ROOM_VERSIONS[room_version]
+
+                create_event = await self.store.get_create_event_for_room(event.room_id)
+                room_creator = create_event.content.get(EventContentFields.ROOM_CREATOR)
+
+                # Only check an insertion event if the room version
+                # supports it or the event is from the room creator.
+                if room_version_obj.msc2716_historical or (
+                    self.config.experimental.msc2716_enabled
+                    and event.sender == room_creator
+                ):
+                    next_batch_id = event.content.get(
+                        EventContentFields.MSC2716_NEXT_BATCH_ID
                     )
-                if conflicting_insertion_event_id is not None:
-                    # The current insertion event that we're processing is invalid
-                    # because an insertion event already exists in the room with the
-                    # same next_batch_id. We can't allow multiple because the batch
-                    # pointing will get weird, e.g. we can't determine which insertion
-                    # event the batch event is pointing to.
-                    raise SynapseError(
-                        HTTPStatus.BAD_REQUEST,
-                        "Another insertion event already exists with the same next_batch_id",
-                        errcode=Codes.INVALID_PARAM,
-                    )
+                    conflicting_insertion_event_id = None
+                    if next_batch_id:
+                        conflicting_insertion_event_id = (
+                            await self.store.get_insertion_event_id_by_batch_id(
+                                event.room_id, next_batch_id
+                            )
+                        )
+                    if conflicting_insertion_event_id is not None:
+                        # The current insertion event that we're processing is invalid
+                        # because an insertion event already exists in the room with the
+                        # same next_batch_id. We can't allow multiple because the batch
+                        # pointing will get weird, e.g. we can't determine which insertion
+                        # event the batch event is pointing to.
+                        raise SynapseError(
+                            HTTPStatus.BAD_REQUEST,
+                            "Another insertion event already exists with the same next_batch_id",
+                            errcode=Codes.INVALID_PARAM,
+                        )
 
-        # Mark any `m.historical` messages as backfilled so they don't appear
-        # in `/sync` and have the proper decrementing `stream_ordering` as we import
-        backfilled = False
-        if event.internal_metadata.is_historical():
-            backfilled = True
+            # Mark any `m.historical` messages as backfilled so they don't appear
+            # in `/sync` and have the proper decrementing `stream_ordering` as we import
+            backfilled = False
+            if event.internal_metadata.is_historical():
+                backfilled = True
 
-        # Note that this returns the event that was persisted, which may not be
-        # the same as we passed in if it was deduplicated due transaction IDs.
+        assert self._storage_controllers.persistence is not None
         (
-            event,
-            event_pos,
+            persisted_events,
             max_stream_token,
-        ) = await self._storage_controllers.persistence.persist_event(
-            event, context=context, backfilled=backfilled
+        ) = await self._storage_controllers.persistence.persist_events(
+            events_and_context, backfilled=backfilled
         )
 
-        if self._ephemeral_events_enabled:
-            # If there's an expiry timestamp on the event, schedule its expiry.
-            self._message_handler.maybe_schedule_expiry(event)
+        for event in persisted_events:
+            if self._ephemeral_events_enabled:
+                # If there's an expiry timestamp on the event, schedule its expiry.
+                self._message_handler.maybe_schedule_expiry(event)
 
-        async def _notify() -> None:
-            try:
-                await self.notifier.on_new_room_event(
-                    event, event_pos, max_stream_token, extra_users=extra_users
-                )
-            except Exception:
-                logger.exception(
-                    "Error notifying about new room event %s",
-                    event.event_id,
-                )
+            stream_ordering = event.internal_metadata.stream_ordering
+            assert stream_ordering is not None
+            pos = PersistedEventPosition(self._instance_name, stream_ordering)
+
+            async def _notify() -> None:
+                try:
+                    await self.notifier.on_new_room_event(
+                        event, pos, max_stream_token, extra_users=extra_users
+                    )
+                except Exception:
+                    logger.exception(
+                        "Error notifying about new room event %s",
+                        event.event_id,
+                    )
 
-        run_in_background(_notify)
+            run_in_background(_notify)
 
-        if event.type == EventTypes.Message:
-            # We don't want to block sending messages on any presence code. This
-            # matters as sometimes presence code can take a while.
-            run_in_background(self._bump_active_time, requester.user)
+            if event.type == EventTypes.Message:
+                # We don't want to block sending messages on any presence code. This
+                # matters as sometimes presence code can take a while.
+                run_in_background(self._bump_active_time, requester.user)
 
-        return event
+        return persisted_events[-1]
 
     async def _maybe_kick_guest_users(
         self, event: EventBase, context: EventContext
@@ -1952,8 +1988,7 @@ class EventCreationHandler:
                 # shadow-banned user.
                 await self.handle_new_client_event(
                     requester,
-                    event,
-                    context,
+                    events_and_context=[(event, context)],
                     ratelimit=False,
                     ignore_shadow_ban=True,
                 )