summary refs log tree commit diff
diff options
context:
space:
mode:
Diffstat (limited to '')
-rw-r--r--changelog.d/14665.misc1
-rw-r--r--synapse/handlers/federation.py117
-rw-r--r--synapse/handlers/message.py202
-rw-r--r--synapse/handlers/room.py95
-rw-r--r--synapse/handlers/room_batch.py2
-rw-r--r--synapse/handlers/room_member.py168
-rw-r--r--synapse/util/caches/response_cache.py14
7 files changed, 360 insertions, 239 deletions
diff --git a/changelog.d/14665.misc b/changelog.d/14665.misc
new file mode 100644
index 0000000000..2b7c96143d
--- /dev/null
+++ b/changelog.d/14665.misc
@@ -0,0 +1 @@
+Change `handle_new_client_event` signature so that a 429 does not reach clients on `PartialStateConflictError`, and internally retry when needed instead.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b2784d7333..eca75f1108 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1343,32 +1343,53 @@ class FederationHandler:
             )
 
             EventValidator().validate_builder(builder)
-            event, context = await self.event_creation_handler.create_new_client_event(
-                builder=builder
-            )
 
-            event, context = await self.add_display_name_to_third_party_invite(
-                room_version_obj, event_dict, event, context
-            )
+            # Try several times, it could fail with PartialStateConflictError
+            # in send_membership_event, cf comment in except block.
+            max_retries = 5
+            for i in range(max_retries):
+                try:
+                    (
+                        event,
+                        context,
+                    ) = await self.event_creation_handler.create_new_client_event(
+                        builder=builder
+                    )
 
-            EventValidator().validate_new(event, self.config)
+                    event, context = await self.add_display_name_to_third_party_invite(
+                        room_version_obj, event_dict, event, context
+                    )
 
-            # We need to tell the transaction queue to send this out, even
-            # though the sender isn't a local user.
-            event.internal_metadata.send_on_behalf_of = self.hs.hostname
+                    EventValidator().validate_new(event, self.config)
 
-            try:
-                validate_event_for_room_version(event)
-                await self._event_auth_handler.check_auth_rules_from_context(event)
-            except AuthError as e:
-                logger.warning("Denying new third party invite %r because %s", event, e)
-                raise e
+                    # We need to tell the transaction queue to send this out, even
+                    # though the sender isn't a local user.
+                    event.internal_metadata.send_on_behalf_of = self.hs.hostname
 
-            await self._check_signature(event, context)
+                    try:
+                        validate_event_for_room_version(event)
+                        await self._event_auth_handler.check_auth_rules_from_context(
+                            event
+                        )
+                    except AuthError as e:
+                        logger.warning(
+                            "Denying new third party invite %r because %s", event, e
+                        )
+                        raise e
 
-            # We retrieve the room member handler here as to not cause a cyclic dependency
-            member_handler = self.hs.get_room_member_handler()
-            await member_handler.send_membership_event(None, event, context)
+                    await self._check_signature(event, context)
+
+                    # We retrieve the room member handler here as to not cause a cyclic dependency
+                    member_handler = self.hs.get_room_member_handler()
+                    await member_handler.send_membership_event(None, event, context)
+
+                    break
+                except PartialStateConflictError as e:
+                    # Persisting couldn't happen because the room got un-partial stated
+                    # in the meantime and context needs to be recomputed, so let's do so.
+                    if i == max_retries - 1:
+                        raise e
+                    pass
         else:
             destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}
 
@@ -1400,28 +1421,46 @@ class FederationHandler:
             room_version_obj, event_dict
         )
 
-        event, context = await self.event_creation_handler.create_new_client_event(
-            builder=builder
-        )
-        event, context = await self.add_display_name_to_third_party_invite(
-            room_version_obj, event_dict, event, context
-        )
+        # Try several times, it could fail with PartialStateConflictError
+        # in send_membership_event, cf comment in except block.
+        max_retries = 5
+        for i in range(max_retries):
+            try:
+                (
+                    event,
+                    context,
+                ) = await self.event_creation_handler.create_new_client_event(
+                    builder=builder
+                )
+                event, context = await self.add_display_name_to_third_party_invite(
+                    room_version_obj, event_dict, event, context
+                )
 
-        try:
-            validate_event_for_room_version(event)
-            await self._event_auth_handler.check_auth_rules_from_context(event)
-        except AuthError as e:
-            logger.warning("Denying third party invite %r because %s", event, e)
-            raise e
-        await self._check_signature(event, context)
+                try:
+                    validate_event_for_room_version(event)
+                    await self._event_auth_handler.check_auth_rules_from_context(event)
+                except AuthError as e:
+                    logger.warning("Denying third party invite %r because %s", event, e)
+                    raise e
+                await self._check_signature(event, context)
+
+                # We need to tell the transaction queue to send this out, even
+                # though the sender isn't a local user.
+                event.internal_metadata.send_on_behalf_of = get_domain_from_id(
+                    event.sender
+                )
 
-        # We need to tell the transaction queue to send this out, even
-        # though the sender isn't a local user.
-        event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
+                # We retrieve the room member handler here as to not cause a cyclic dependency
+                member_handler = self.hs.get_room_member_handler()
+                await member_handler.send_membership_event(None, event, context)
 
-        # We retrieve the room member handler here as to not cause a cyclic dependency
-        member_handler = self.hs.get_room_member_handler()
-        await member_handler.send_membership_event(None, event, context)
+                break
+            except PartialStateConflictError as e:
+                # Persisting couldn't happen because the room got un-partial stated
+                # in the meantime and context needs to be recomputed, so let's do so.
+                if i == max_retries - 1:
+                    raise e
+                pass
 
     async def add_display_name_to_third_party_invite(
         self,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 845f683358..88fc51a4c9 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -37,7 +37,6 @@ from synapse.api.errors import (
     AuthError,
     Codes,
     ConsentNotGivenError,
-    LimitExceededError,
     NotFoundError,
     ShadowBanError,
     SynapseError,
@@ -999,60 +998,73 @@ class EventCreationHandler:
                         event.internal_metadata.stream_ordering,
                     )
 
-            event, context = await self.create_event(
-                requester,
-                event_dict,
-                txn_id=txn_id,
-                allow_no_prev_events=allow_no_prev_events,
-                prev_event_ids=prev_event_ids,
-                state_event_ids=state_event_ids,
-                outlier=outlier,
-                historical=historical,
-                depth=depth,
-            )
+        # Try several times, it could fail with PartialStateConflictError
+        # in handle_new_client_event, cf comment in except block.
+        max_retries = 5
+        for i in range(max_retries):
+            try:
+                event, context = await self.create_event(
+                    requester,
+                    event_dict,
+                    txn_id=txn_id,
+                    allow_no_prev_events=allow_no_prev_events,
+                    prev_event_ids=prev_event_ids,
+                    state_event_ids=state_event_ids,
+                    outlier=outlier,
+                    historical=historical,
+                    depth=depth,
+                )
 
-            assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
-                event.sender,
-            )
+                assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+                    event.sender,
+                )
 
-            spam_check_result = await self.spam_checker.check_event_for_spam(event)
-            if spam_check_result != self.spam_checker.NOT_SPAM:
-                if isinstance(spam_check_result, tuple):
-                    try:
-                        [code, dict] = spam_check_result
-                        raise SynapseError(
-                            403,
-                            "This message had been rejected as probable spam",
-                            code,
-                            dict,
-                        )
-                    except ValueError:
-                        logger.error(
-                            "Spam-check module returned invalid error value. Expecting [code, dict], got %s",
-                            spam_check_result,
-                        )
+                spam_check_result = await self.spam_checker.check_event_for_spam(event)
+                if spam_check_result != self.spam_checker.NOT_SPAM:
+                    if isinstance(spam_check_result, tuple):
+                        try:
+                            [code, dict] = spam_check_result
+                            raise SynapseError(
+                                403,
+                                "This message had been rejected as probable spam",
+                                code,
+                                dict,
+                            )
+                        except ValueError:
+                            logger.error(
+                                "Spam-check module returned invalid error value. Expecting [code, dict], got %s",
+                                spam_check_result,
+                            )
 
-                        raise SynapseError(
-                            403,
-                            "This message has been rejected as probable spam",
-                            Codes.FORBIDDEN,
-                        )
+                            raise SynapseError(
+                                403,
+                                "This message has been rejected as probable spam",
+                                Codes.FORBIDDEN,
+                            )
 
-                # Backwards compatibility: if the return value is not an error code, it
-                # means the module returned an error message to be included in the
-                # SynapseError (which is now deprecated).
-                raise SynapseError(
-                    403,
-                    spam_check_result,
-                    Codes.FORBIDDEN,
+                    # Backwards compatibility: if the return value is not an error code, it
+                    # means the module returned an error message to be included in the
+                    # SynapseError (which is now deprecated).
+                    raise SynapseError(
+                        403,
+                        spam_check_result,
+                        Codes.FORBIDDEN,
+                    )
+
+                ev = await self.handle_new_client_event(
+                    requester=requester,
+                    events_and_context=[(event, context)],
+                    ratelimit=ratelimit,
+                    ignore_shadow_ban=ignore_shadow_ban,
                 )
 
-            ev = await self.handle_new_client_event(
-                requester=requester,
-                events_and_context=[(event, context)],
-                ratelimit=ratelimit,
-                ignore_shadow_ban=ignore_shadow_ban,
-            )
+                break
+            except PartialStateConflictError as e:
+                # Persisting couldn't happen because the room got un-partial stated
+                # in the meantime and context needs to be recomputed, so let's do so.
+                if i == max_retries - 1:
+                    raise e
+                pass
 
         # we know it was persisted, so must have a stream ordering
         assert ev.internal_metadata.stream_ordering
@@ -1356,7 +1368,7 @@ class EventCreationHandler:
 
         Raises:
             ShadowBanError if the requester has been shadow-banned.
-            SynapseError(503) if attempting to persist a partial state event in
+            PartialStateConflictError if attempting to persist a partial state event in
                 a room that has been un-partial stated.
         """
         extra_users = extra_users or []
@@ -1418,34 +1430,23 @@ class EventCreationHandler:
         # We now persist the event (and update the cache in parallel, since we
         # don't want to block on it).
         event, context = events_and_context[0]
-        try:
-            result, _ = await make_deferred_yieldable(
-                gather_results(
-                    (
-                        run_in_background(
-                            self._persist_events,
-                            requester=requester,
-                            events_and_context=events_and_context,
-                            ratelimit=ratelimit,
-                            extra_users=extra_users,
-                        ),
-                        run_in_background(
-                            self.cache_joined_hosts_for_events, events_and_context
-                        ).addErrback(
-                            log_failure, "cache_joined_hosts_for_event failed"
-                        ),
+        result, _ = await make_deferred_yieldable(
+            gather_results(
+                (
+                    run_in_background(
+                        self._persist_events,
+                        requester=requester,
+                        events_and_context=events_and_context,
+                        ratelimit=ratelimit,
+                        extra_users=extra_users,
                     ),
-                    consumeErrors=True,
-                )
-            ).addErrback(unwrapFirstError)
-        except PartialStateConflictError as e:
-            # The event context needs to be recomputed.
-            # Turn the error into a 429, as a hint to the client to try again.
-            logger.info(
-                "Room %s was un-partial stated while persisting client event.",
-                event.room_id,
+                    run_in_background(
+                        self.cache_joined_hosts_for_events, events_and_context
+                    ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+                ),
+                consumeErrors=True,
             )
-            raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
+        ).addErrback(unwrapFirstError)
 
         return result
 
@@ -2012,26 +2013,39 @@ class EventCreationHandler:
         for user_id in members:
             requester = create_requester(user_id, authenticated_entity=self.server_name)
             try:
-                event, context = await self.create_event(
-                    requester,
-                    {
-                        "type": EventTypes.Dummy,
-                        "content": {},
-                        "room_id": room_id,
-                        "sender": user_id,
-                    },
-                )
+                # Try several times, it could fail with PartialStateConflictError
+                # in handle_new_client_event, cf comment in except block.
+                max_retries = 5
+                for i in range(max_retries):
+                    try:
+                        event, context = await self.create_event(
+                            requester,
+                            {
+                                "type": EventTypes.Dummy,
+                                "content": {},
+                                "room_id": room_id,
+                                "sender": user_id,
+                            },
+                        )
 
-                event.internal_metadata.proactively_send = False
+                        event.internal_metadata.proactively_send = False
 
-                # Since this is a dummy-event it is OK if it is sent by a
-                # shadow-banned user.
-                await self.handle_new_client_event(
-                    requester,
-                    events_and_context=[(event, context)],
-                    ratelimit=False,
-                    ignore_shadow_ban=True,
-                )
+                        # Since this is a dummy-event it is OK if it is sent by a
+                        # shadow-banned user.
+                        await self.handle_new_client_event(
+                            requester,
+                            events_and_context=[(event, context)],
+                            ratelimit=False,
+                            ignore_shadow_ban=True,
+                        )
+
+                        break
+                    except PartialStateConflictError as e:
+                        # Persisting couldn't happen because the room got un-partial stated
+                        # in the meantime and context needs to be recomputed, so let's do so.
+                        if i == max_retries - 1:
+                            raise e
+                        pass
                 return True
             except AuthError:
                 logger.info(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f81241c2b3..572c7b4db3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -62,6 +62,7 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
 from synapse.handlers.relations import BundledAggregations
 from synapse.module_api import NOT_SPAM
 from synapse.rest.admin._base import assert_user_is_admin
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.streams import EventSource
 from synapse.types import (
     JsonDict,
@@ -207,46 +208,64 @@ class RoomCreationHandler:
 
         new_room_id = self._generate_room_id()
 
-        # Check whether the user has the power level to carry out the upgrade.
-        # `check_auth_rules_from_context` will check that they are in the room and have
-        # the required power level to send the tombstone event.
-        (
-            tombstone_event,
-            tombstone_context,
-        ) = await self.event_creation_handler.create_event(
-            requester,
-            {
-                "type": EventTypes.Tombstone,
-                "state_key": "",
-                "room_id": old_room_id,
-                "sender": user_id,
-                "content": {
-                    "body": "This room has been replaced",
-                    "replacement_room": new_room_id,
-                },
-            },
-        )
-        validate_event_for_room_version(tombstone_event)
-        await self._event_auth_handler.check_auth_rules_from_context(tombstone_event)
+        # Try several times, it could fail with PartialStateConflictError
+        # in _upgrade_room, cf comment in except block.
+        max_retries = 5
+        for i in range(max_retries):
+            try:
+                # Check whether the user has the power level to carry out the upgrade.
+                # `check_auth_rules_from_context` will check that they are in the room and have
+                # the required power level to send the tombstone event.
+                (
+                    tombstone_event,
+                    tombstone_context,
+                ) = await self.event_creation_handler.create_event(
+                    requester,
+                    {
+                        "type": EventTypes.Tombstone,
+                        "state_key": "",
+                        "room_id": old_room_id,
+                        "sender": user_id,
+                        "content": {
+                            "body": "This room has been replaced",
+                            "replacement_room": new_room_id,
+                        },
+                    },
+                )
+                validate_event_for_room_version(tombstone_event)
+                await self._event_auth_handler.check_auth_rules_from_context(
+                    tombstone_event
+                )
 
-        # Upgrade the room
-        #
-        # If this user has sent multiple upgrade requests for the same room
-        # and one of them is not complete yet, cache the response and
-        # return it to all subsequent requests
-        ret = await self._upgrade_response_cache.wrap(
-            (old_room_id, user_id),
-            self._upgrade_room,
-            requester,
-            old_room_id,
-            old_room,  # args for _upgrade_room
-            new_room_id,
-            new_version,
-            tombstone_event,
-            tombstone_context,
-        )
+                # Upgrade the room
+                #
+                # If this user has sent multiple upgrade requests for the same room
+                # and one of them is not complete yet, cache the response and
+                # return it to all subsequent requests
+                ret = await self._upgrade_response_cache.wrap(
+                    (old_room_id, user_id),
+                    self._upgrade_room,
+                    requester,
+                    old_room_id,
+                    old_room,  # args for _upgrade_room
+                    new_room_id,
+                    new_version,
+                    tombstone_event,
+                    tombstone_context,
+                )
 
-        return ret
+                return ret
+            except PartialStateConflictError as e:
+                # Clean up the cache so we can retry properly
+                self._upgrade_response_cache.unset((old_room_id, user_id))
+                # Persisting couldn't happen because the room got un-partial stated
+                # in the meantime and context needs to be recomputed, so let's do so.
+                if i == max_retries - 1:
+                    raise e
+                pass
+
+        # This is to satisfy mypy and should never happen
+        raise PartialStateConflictError()
 
     async def _upgrade_room(
         self,
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 411a6fb22f..c73d2adaad 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -375,6 +375,8 @@ class RoomBatchHandler:
         # Events are sorted by (topological_ordering, stream_ordering)
         # where topological_ordering is just depth.
         for (event, context) in reversed(events_to_persist):
+            # This call can't raise `PartialStateConflictError` since we forbid
+            # use of the historical batch API during partial state
             await self.event_creation_handler.handle_new_client_event(
                 await self.create_requester_for_user_id_from_app_service(
                     event.sender, app_service_requester.app_service
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0c39e852a1..d236cc09b5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -34,6 +34,7 @@ from synapse.events.snapshot import EventContext
 from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
 from synapse.logging import opentracing
 from synapse.module_api import NOT_SPAM
+from synapse.storage.databases.main.events import PartialStateConflictError
 from synapse.types import (
     JsonDict,
     Requester,
@@ -392,60 +393,81 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
                 event_pos = await self.store.get_position_for_event(existing_event_id)
                 return existing_event_id, event_pos.stream
 
-        event, context = await self.event_creation_handler.create_event(
-            requester,
-            {
-                "type": EventTypes.Member,
-                "content": content,
-                "room_id": room_id,
-                "sender": requester.user.to_string(),
-                "state_key": user_id,
-                # For backwards compatibility:
-                "membership": membership,
-                "origin_server_ts": origin_server_ts,
-            },
-            txn_id=txn_id,
-            allow_no_prev_events=allow_no_prev_events,
-            prev_event_ids=prev_event_ids,
-            state_event_ids=state_event_ids,
-            depth=depth,
-            require_consent=require_consent,
-            outlier=outlier,
-            historical=historical,
-        )
-
-        prev_state_ids = await context.get_prev_state_ids(
-            StateFilter.from_types([(EventTypes.Member, None)])
-        )
+        # Try several times, it could fail with PartialStateConflictError,
+        # in handle_new_client_event, cf comment in except block.
+        max_retries = 5
+        for i in range(max_retries):
+            try:
+                event, context = await self.event_creation_handler.create_event(
+                    requester,
+                    {
+                        "type": EventTypes.Member,
+                        "content": content,
+                        "room_id": room_id,
+                        "sender": requester.user.to_string(),
+                        "state_key": user_id,
+                        # For backwards compatibility:
+                        "membership": membership,
+                        "origin_server_ts": origin_server_ts,
+                    },
+                    txn_id=txn_id,
+                    allow_no_prev_events=allow_no_prev_events,
+                    prev_event_ids=prev_event_ids,
+                    state_event_ids=state_event_ids,
+                    depth=depth,
+                    require_consent=require_consent,
+                    outlier=outlier,
+                    historical=historical,
+                )
 
-        prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+                prev_state_ids = await context.get_prev_state_ids(
+                    StateFilter.from_types([(EventTypes.Member, None)])
+                )
 
-        if event.membership == Membership.JOIN:
-            newly_joined = True
-            if prev_member_event_id:
-                prev_member_event = await self.store.get_event(prev_member_event_id)
-                newly_joined = prev_member_event.membership != Membership.JOIN
-
-            # Only rate-limit if the user actually joined the room, otherwise we'll end
-            # up blocking profile updates.
-            if newly_joined and ratelimit:
-                await self._join_rate_limiter_local.ratelimit(requester)
-                await self._join_rate_per_room_limiter.ratelimit(
-                    requester, key=room_id, update=False
+                prev_member_event_id = prev_state_ids.get(
+                    (EventTypes.Member, user_id), None
                 )
-        with opentracing.start_active_span("handle_new_client_event"):
-            result_event = await self.event_creation_handler.handle_new_client_event(
-                requester,
-                events_and_context=[(event, context)],
-                extra_users=[target],
-                ratelimit=ratelimit,
-            )
 
-        if event.membership == Membership.LEAVE:
-            if prev_member_event_id:
-                prev_member_event = await self.store.get_event(prev_member_event_id)
-                if prev_member_event.membership == Membership.JOIN:
-                    await self._user_left_room(target, room_id)
+                if event.membership == Membership.JOIN:
+                    newly_joined = True
+                    if prev_member_event_id:
+                        prev_member_event = await self.store.get_event(
+                            prev_member_event_id
+                        )
+                        newly_joined = prev_member_event.membership != Membership.JOIN
+
+                    # Only rate-limit if the user actually joined the room, otherwise we'll end
+                    # up blocking profile updates.
+                    if newly_joined and ratelimit:
+                        await self._join_rate_limiter_local.ratelimit(requester)
+                        await self._join_rate_per_room_limiter.ratelimit(
+                            requester, key=room_id, update=False
+                        )
+                with opentracing.start_active_span("handle_new_client_event"):
+                    result_event = (
+                        await self.event_creation_handler.handle_new_client_event(
+                            requester,
+                            events_and_context=[(event, context)],
+                            extra_users=[target],
+                            ratelimit=ratelimit,
+                        )
+                    )
+
+                if event.membership == Membership.LEAVE:
+                    if prev_member_event_id:
+                        prev_member_event = await self.store.get_event(
+                            prev_member_event_id
+                        )
+                        if prev_member_event.membership == Membership.JOIN:
+                            await self._user_left_room(target, room_id)
+
+                break
+            except PartialStateConflictError as e:
+                # Persisting couldn't happen because the room got un-partial stated
+                # in the meantime and context needs to be recomputed, so let's do so.
+                if i == max_retries - 1:
+                    raise e
+                pass
 
         # we know it was persisted, so should have a stream ordering
         assert result_event.internal_metadata.stream_ordering
@@ -1234,6 +1256,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
             ratelimit: Whether to rate limit this request.
         Raises:
             SynapseError if there was a problem changing the membership.
+            PartialStateConflictError: if attempting to persist a partial state event in
+                a room that has been un-partial stated.
         """
         target_user = UserID.from_string(event.state_key)
         room_id = event.room_id
@@ -1863,21 +1887,37 @@ class RoomMemberMasterHandler(RoomMemberHandler):
             list(previous_membership_event.auth_event_ids()) + prev_event_ids
         )
 
-        event, context = await self.event_creation_handler.create_event(
-            requester,
-            event_dict,
-            txn_id=txn_id,
-            prev_event_ids=prev_event_ids,
-            auth_event_ids=auth_event_ids,
-            outlier=True,
-        )
-        event.internal_metadata.out_of_band_membership = True
+        # Try several times, it could fail with PartialStateConflictError
+        # in handle_new_client_event, cf comment in except block.
+        max_retries = 5
+        for i in range(max_retries):
+            try:
+                event, context = await self.event_creation_handler.create_event(
+                    requester,
+                    event_dict,
+                    txn_id=txn_id,
+                    prev_event_ids=prev_event_ids,
+                    auth_event_ids=auth_event_ids,
+                    outlier=True,
+                )
+                event.internal_metadata.out_of_band_membership = True
+
+                result_event = (
+                    await self.event_creation_handler.handle_new_client_event(
+                        requester,
+                        events_and_context=[(event, context)],
+                        extra_users=[UserID.from_string(target_user)],
+                    )
+                )
+
+                break
+            except PartialStateConflictError as e:
+                # Persisting couldn't happen because the room got un-partial stated
+                # in the meantime and context needs to be recomputed, so let's do so.
+                if i == max_retries - 1:
+                    raise e
+                pass
 
-        result_event = await self.event_creation_handler.handle_new_client_event(
-            requester,
-            events_and_context=[(event, context)],
-            extra_users=[UserID.from_string(target_user)],
-        )
         # we know it was persisted, so must have a stream ordering
         assert result_event.internal_metadata.stream_ordering
 
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index a3eb5f741b..340e5e9145 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -167,12 +167,10 @@ class ResponseCache(Generic[KV]):
             # the should_cache bit, we leave it in the cache for now and schedule
             # its removal later.
             if self.timeout_sec and context.should_cache:
-                self.clock.call_later(
-                    self.timeout_sec, self._result_cache.pop, key, None
-                )
+                self.clock.call_later(self.timeout_sec, self.unset, key)
             else:
                 # otherwise, remove the result immediately.
-                self._result_cache.pop(key, None)
+                self.unset(key)
             return r
 
         # make sure we do this *after* adding the entry to result_cache,
@@ -181,6 +179,14 @@ class ResponseCache(Generic[KV]):
         result.addBoth(on_complete)
         return entry
 
+    def unset(self, key: KV) -> None:
+        """Remove the cached value for this key from the cache, if any.
+
+        Args:
+            key: key used to remove the cached value
+        """
+        self._result_cache.pop(key, None)
+
     async def wrap(
         self,
         key: KV,