diff --git a/changelog.d/14665.misc b/changelog.d/14665.misc
new file mode 100644
index 0000000000..2b7c96143d
--- /dev/null
+++ b/changelog.d/14665.misc
@@ -0,0 +1 @@
+Change `handle_new_client_event` signature so that a 429 does not reach clients on `PartialStateConflictError`, and internally retry when needed instead.
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index b2784d7333..eca75f1108 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -1343,32 +1343,53 @@ class FederationHandler:
)
EventValidator().validate_builder(builder)
- event, context = await self.event_creation_handler.create_new_client_event(
- builder=builder
- )
- event, context = await self.add_display_name_to_third_party_invite(
- room_version_obj, event_dict, event, context
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in send_membership_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ (
+ event,
+ context,
+ ) = await self.event_creation_handler.create_new_client_event(
+ builder=builder
+ )
- EventValidator().validate_new(event, self.config)
+ event, context = await self.add_display_name_to_third_party_invite(
+ room_version_obj, event_dict, event, context
+ )
- # We need to tell the transaction queue to send this out, even
- # though the sender isn't a local user.
- event.internal_metadata.send_on_behalf_of = self.hs.hostname
+ EventValidator().validate_new(event, self.config)
- try:
- validate_event_for_room_version(event)
- await self._event_auth_handler.check_auth_rules_from_context(event)
- except AuthError as e:
- logger.warning("Denying new third party invite %r because %s", event, e)
- raise e
+ # We need to tell the transaction queue to send this out, even
+ # though the sender isn't a local user.
+ event.internal_metadata.send_on_behalf_of = self.hs.hostname
- await self._check_signature(event, context)
+ try:
+ validate_event_for_room_version(event)
+ await self._event_auth_handler.check_auth_rules_from_context(
+ event
+ )
+ except AuthError as e:
+ logger.warning(
+ "Denying new third party invite %r because %s", event, e
+ )
+ raise e
- # We retrieve the room member handler here as to not cause a cyclic dependency
- member_handler = self.hs.get_room_member_handler()
- await member_handler.send_membership_event(None, event, context)
+ await self._check_signature(event, context)
+
+ # We retrieve the room member handler here as to not cause a cyclic dependency
+ member_handler = self.hs.get_room_member_handler()
+ await member_handler.send_membership_event(None, event, context)
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
else:
destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)}
@@ -1400,28 +1421,46 @@ class FederationHandler:
room_version_obj, event_dict
)
- event, context = await self.event_creation_handler.create_new_client_event(
- builder=builder
- )
- event, context = await self.add_display_name_to_third_party_invite(
- room_version_obj, event_dict, event, context
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in send_membership_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ (
+ event,
+ context,
+ ) = await self.event_creation_handler.create_new_client_event(
+ builder=builder
+ )
+ event, context = await self.add_display_name_to_third_party_invite(
+ room_version_obj, event_dict, event, context
+ )
- try:
- validate_event_for_room_version(event)
- await self._event_auth_handler.check_auth_rules_from_context(event)
- except AuthError as e:
- logger.warning("Denying third party invite %r because %s", event, e)
- raise e
- await self._check_signature(event, context)
+ try:
+ validate_event_for_room_version(event)
+ await self._event_auth_handler.check_auth_rules_from_context(event)
+ except AuthError as e:
+ logger.warning("Denying third party invite %r because %s", event, e)
+ raise e
+ await self._check_signature(event, context)
+
+ # We need to tell the transaction queue to send this out, even
+ # though the sender isn't a local user.
+ event.internal_metadata.send_on_behalf_of = get_domain_from_id(
+ event.sender
+ )
- # We need to tell the transaction queue to send this out, even
- # though the sender isn't a local user.
- event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender)
+ # We retrieve the room member handler here as to not cause a cyclic dependency
+ member_handler = self.hs.get_room_member_handler()
+ await member_handler.send_membership_event(None, event, context)
- # We retrieve the room member handler here as to not cause a cyclic dependency
- member_handler = self.hs.get_room_member_handler()
- await member_handler.send_membership_event(None, event, context)
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
async def add_display_name_to_third_party_invite(
self,
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 845f683358..88fc51a4c9 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -37,7 +37,6 @@ from synapse.api.errors import (
AuthError,
Codes,
ConsentNotGivenError,
- LimitExceededError,
NotFoundError,
ShadowBanError,
SynapseError,
@@ -999,60 +998,73 @@ class EventCreationHandler:
event.internal_metadata.stream_ordering,
)
- event, context = await self.create_event(
- requester,
- event_dict,
- txn_id=txn_id,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- outlier=outlier,
- historical=historical,
- depth=depth,
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ outlier=outlier,
+ historical=historical,
+ depth=depth,
+ )
- assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
- event.sender,
- )
+ assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % (
+ event.sender,
+ )
- spam_check_result = await self.spam_checker.check_event_for_spam(event)
- if spam_check_result != self.spam_checker.NOT_SPAM:
- if isinstance(spam_check_result, tuple):
- try:
- [code, dict] = spam_check_result
- raise SynapseError(
- 403,
- "This message had been rejected as probable spam",
- code,
- dict,
- )
- except ValueError:
- logger.error(
- "Spam-check module returned invalid error value. Expecting [code, dict], got %s",
- spam_check_result,
- )
+ spam_check_result = await self.spam_checker.check_event_for_spam(event)
+ if spam_check_result != self.spam_checker.NOT_SPAM:
+ if isinstance(spam_check_result, tuple):
+ try:
+ [code, dict] = spam_check_result
+ raise SynapseError(
+ 403,
+ "This message had been rejected as probable spam",
+ code,
+ dict,
+ )
+ except ValueError:
+ logger.error(
+ "Spam-check module returned invalid error value. Expecting [code, dict], got %s",
+ spam_check_result,
+ )
- raise SynapseError(
- 403,
- "This message has been rejected as probable spam",
- Codes.FORBIDDEN,
- )
+ raise SynapseError(
+ 403,
+ "This message has been rejected as probable spam",
+ Codes.FORBIDDEN,
+ )
- # Backwards compatibility: if the return value is not an error code, it
- # means the module returned an error message to be included in the
- # SynapseError (which is now deprecated).
- raise SynapseError(
- 403,
- spam_check_result,
- Codes.FORBIDDEN,
+ # Backwards compatibility: if the return value is not an error code, it
+ # means the module returned an error message to be included in the
+ # SynapseError (which is now deprecated).
+ raise SynapseError(
+ 403,
+ spam_check_result,
+ Codes.FORBIDDEN,
+ )
+
+ ev = await self.handle_new_client_event(
+ requester=requester,
+ events_and_context=[(event, context)],
+ ratelimit=ratelimit,
+ ignore_shadow_ban=ignore_shadow_ban,
)
- ev = await self.handle_new_client_event(
- requester=requester,
- events_and_context=[(event, context)],
- ratelimit=ratelimit,
- ignore_shadow_ban=ignore_shadow_ban,
- )
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
# we know it was persisted, so must have a stream ordering
assert ev.internal_metadata.stream_ordering
@@ -1356,7 +1368,7 @@ class EventCreationHandler:
Raises:
ShadowBanError if the requester has been shadow-banned.
- SynapseError(503) if attempting to persist a partial state event in
+ PartialStateConflictError if attempting to persist a partial state event in
a room that has been un-partial stated.
"""
extra_users = extra_users or []
@@ -1418,34 +1430,23 @@ class EventCreationHandler:
# We now persist the event (and update the cache in parallel, since we
# don't want to block on it).
event, context = events_and_context[0]
- try:
- result, _ = await make_deferred_yieldable(
- gather_results(
- (
- run_in_background(
- self._persist_events,
- requester=requester,
- events_and_context=events_and_context,
- ratelimit=ratelimit,
- extra_users=extra_users,
- ),
- run_in_background(
- self.cache_joined_hosts_for_events, events_and_context
- ).addErrback(
- log_failure, "cache_joined_hosts_for_event failed"
- ),
+ result, _ = await make_deferred_yieldable(
+ gather_results(
+ (
+ run_in_background(
+ self._persist_events,
+ requester=requester,
+ events_and_context=events_and_context,
+ ratelimit=ratelimit,
+ extra_users=extra_users,
),
- consumeErrors=True,
- )
- ).addErrback(unwrapFirstError)
- except PartialStateConflictError as e:
- # The event context needs to be recomputed.
- # Turn the error into a 429, as a hint to the client to try again.
- logger.info(
- "Room %s was un-partial stated while persisting client event.",
- event.room_id,
+ run_in_background(
+ self.cache_joined_hosts_for_events, events_and_context
+ ).addErrback(log_failure, "cache_joined_hosts_for_event failed"),
+ ),
+ consumeErrors=True,
)
- raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0)
+ ).addErrback(unwrapFirstError)
return result
@@ -2012,26 +2013,39 @@ class EventCreationHandler:
for user_id in members:
requester = create_requester(user_id, authenticated_entity=self.server_name)
try:
- event, context = await self.create_event(
- requester,
- {
- "type": EventTypes.Dummy,
- "content": {},
- "room_id": room_id,
- "sender": user_id,
- },
- )
+ # Try several times, it could fail with PartialStateConflictError
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.create_event(
+ requester,
+ {
+ "type": EventTypes.Dummy,
+ "content": {},
+ "room_id": room_id,
+ "sender": user_id,
+ },
+ )
- event.internal_metadata.proactively_send = False
+ event.internal_metadata.proactively_send = False
- # Since this is a dummy-event it is OK if it is sent by a
- # shadow-banned user.
- await self.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- ratelimit=False,
- ignore_shadow_ban=True,
- )
+ # Since this is a dummy-event it is OK if it is sent by a
+ # shadow-banned user.
+ await self.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ ratelimit=False,
+ ignore_shadow_ban=True,
+ )
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
return True
except AuthError:
logger.info(
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index f81241c2b3..572c7b4db3 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -62,6 +62,7 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents
from synapse.handlers.relations import BundledAggregations
from synapse.module_api import NOT_SPAM
from synapse.rest.admin._base import assert_user_is_admin
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.streams import EventSource
from synapse.types import (
JsonDict,
@@ -207,46 +208,64 @@ class RoomCreationHandler:
new_room_id = self._generate_room_id()
- # Check whether the user has the power level to carry out the upgrade.
- # `check_auth_rules_from_context` will check that they are in the room and have
- # the required power level to send the tombstone event.
- (
- tombstone_event,
- tombstone_context,
- ) = await self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Tombstone,
- "state_key": "",
- "room_id": old_room_id,
- "sender": user_id,
- "content": {
- "body": "This room has been replaced",
- "replacement_room": new_room_id,
- },
- },
- )
- validate_event_for_room_version(tombstone_event)
- await self._event_auth_handler.check_auth_rules_from_context(tombstone_event)
+ # Try several times, it could fail with PartialStateConflictError
+ # in _upgrade_room, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ # Check whether the user has the power level to carry out the upgrade.
+ # `check_auth_rules_from_context` will check that they are in the room and have
+ # the required power level to send the tombstone event.
+ (
+ tombstone_event,
+ tombstone_context,
+ ) = await self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Tombstone,
+ "state_key": "",
+ "room_id": old_room_id,
+ "sender": user_id,
+ "content": {
+ "body": "This room has been replaced",
+ "replacement_room": new_room_id,
+ },
+ },
+ )
+ validate_event_for_room_version(tombstone_event)
+ await self._event_auth_handler.check_auth_rules_from_context(
+ tombstone_event
+ )
- # Upgrade the room
- #
- # If this user has sent multiple upgrade requests for the same room
- # and one of them is not complete yet, cache the response and
- # return it to all subsequent requests
- ret = await self._upgrade_response_cache.wrap(
- (old_room_id, user_id),
- self._upgrade_room,
- requester,
- old_room_id,
- old_room, # args for _upgrade_room
- new_room_id,
- new_version,
- tombstone_event,
- tombstone_context,
- )
+ # Upgrade the room
+ #
+ # If this user has sent multiple upgrade requests for the same room
+ # and one of them is not complete yet, cache the response and
+ # return it to all subsequent requests
+ ret = await self._upgrade_response_cache.wrap(
+ (old_room_id, user_id),
+ self._upgrade_room,
+ requester,
+ old_room_id,
+ old_room, # args for _upgrade_room
+ new_room_id,
+ new_version,
+ tombstone_event,
+ tombstone_context,
+ )
- return ret
+ return ret
+ except PartialStateConflictError as e:
+ # Clean up the cache so we can retry properly
+ self._upgrade_response_cache.unset((old_room_id, user_id))
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
+
+ # This is to satisfy mypy and should never happen
+ raise PartialStateConflictError()
async def _upgrade_room(
self,
diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py
index 411a6fb22f..c73d2adaad 100644
--- a/synapse/handlers/room_batch.py
+++ b/synapse/handlers/room_batch.py
@@ -375,6 +375,8 @@ class RoomBatchHandler:
# Events are sorted by (topological_ordering, stream_ordering)
# where topological_ordering is just depth.
for (event, context) in reversed(events_to_persist):
+ # This call can't raise `PartialStateConflictError` since we forbid
+ # use of the historical batch API during partial state
await self.event_creation_handler.handle_new_client_event(
await self.create_requester_for_user_id_from_app_service(
event.sender, app_service_requester.app_service
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0c39e852a1..d236cc09b5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -34,6 +34,7 @@ from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.types import (
JsonDict,
Requester,
@@ -392,60 +393,81 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
- event, context = await self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Member,
- "content": content,
- "room_id": room_id,
- "sender": requester.user.to_string(),
- "state_key": user_id,
- # For backwards compatibility:
- "membership": membership,
- "origin_server_ts": origin_server_ts,
- },
- txn_id=txn_id,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- depth=depth,
- require_consent=require_consent,
- outlier=outlier,
- historical=historical,
- )
-
- prev_state_ids = await context.get_prev_state_ids(
- StateFilter.from_types([(EventTypes.Member, None)])
- )
+ # Try several times, it could fail with PartialStateConflictError,
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Member,
+ "content": content,
+ "room_id": room_id,
+ "sender": requester.user.to_string(),
+ "state_key": user_id,
+ # For backwards compatibility:
+ "membership": membership,
+ "origin_server_ts": origin_server_ts,
+ },
+ txn_id=txn_id,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ depth=depth,
+ require_consent=require_consent,
+ outlier=outlier,
+ historical=historical,
+ )
- prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+ prev_state_ids = await context.get_prev_state_ids(
+ StateFilter.from_types([(EventTypes.Member, None)])
+ )
- if event.membership == Membership.JOIN:
- newly_joined = True
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- newly_joined = prev_member_event.membership != Membership.JOIN
-
- # Only rate-limit if the user actually joined the room, otherwise we'll end
- # up blocking profile updates.
- if newly_joined and ratelimit:
- await self._join_rate_limiter_local.ratelimit(requester)
- await self._join_rate_per_room_limiter.ratelimit(
- requester, key=room_id, update=False
+ prev_member_event_id = prev_state_ids.get(
+ (EventTypes.Member, user_id), None
)
- with opentracing.start_active_span("handle_new_client_event"):
- result_event = await self.event_creation_handler.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- extra_users=[target],
- ratelimit=ratelimit,
- )
- if event.membership == Membership.LEAVE:
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- if prev_member_event.membership == Membership.JOIN:
- await self._user_left_room(target, room_id)
+ if event.membership == Membership.JOIN:
+ newly_joined = True
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(
+ prev_member_event_id
+ )
+ newly_joined = prev_member_event.membership != Membership.JOIN
+
+ # Only rate-limit if the user actually joined the room, otherwise we'll end
+ # up blocking profile updates.
+ if newly_joined and ratelimit:
+ await self._join_rate_limiter_local.ratelimit(requester)
+ await self._join_rate_per_room_limiter.ratelimit(
+ requester, key=room_id, update=False
+ )
+ with opentracing.start_active_span("handle_new_client_event"):
+ result_event = (
+ await self.event_creation_handler.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ extra_users=[target],
+ ratelimit=ratelimit,
+ )
+ )
+
+ if event.membership == Membership.LEAVE:
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(
+ prev_member_event_id
+ )
+ if prev_member_event.membership == Membership.JOIN:
+ await self._user_left_room(target, room_id)
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
# we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering
@@ -1234,6 +1256,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: Whether to rate limit this request.
Raises:
SynapseError if there was a problem changing the membership.
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
target_user = UserID.from_string(event.state_key)
room_id = event.room_id
@@ -1863,21 +1887,37 @@ class RoomMemberMasterHandler(RoomMemberHandler):
list(previous_membership_event.auth_event_ids()) + prev_event_ids
)
- event, context = await self.event_creation_handler.create_event(
- requester,
- event_dict,
- txn_id=txn_id,
- prev_event_ids=prev_event_ids,
- auth_event_ids=auth_event_ids,
- outlier=True,
- )
- event.internal_metadata.out_of_band_membership = True
+ # Try several times, it could fail with PartialStateConflictError
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ outlier=True,
+ )
+ event.internal_metadata.out_of_band_membership = True
+
+ result_event = (
+ await self.event_creation_handler.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ extra_users=[UserID.from_string(target_user)],
+ )
+ )
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
- result_event = await self.event_creation_handler.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- extra_users=[UserID.from_string(target_user)],
- )
# we know it was persisted, so must have a stream ordering
assert result_event.internal_metadata.stream_ordering
diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py
index a3eb5f741b..340e5e9145 100644
--- a/synapse/util/caches/response_cache.py
+++ b/synapse/util/caches/response_cache.py
@@ -167,12 +167,10 @@ class ResponseCache(Generic[KV]):
# the should_cache bit, we leave it in the cache for now and schedule
# its removal later.
if self.timeout_sec and context.should_cache:
- self.clock.call_later(
- self.timeout_sec, self._result_cache.pop, key, None
- )
+ self.clock.call_later(self.timeout_sec, self.unset, key)
else:
# otherwise, remove the result immediately.
- self._result_cache.pop(key, None)
+ self.unset(key)
return r
# make sure we do this *after* adding the entry to result_cache,
@@ -181,6 +179,14 @@ class ResponseCache(Generic[KV]):
result.addBoth(on_complete)
return entry
+ def unset(self, key: KV) -> None:
+ """Remove the cached value for this key from the cache, if any.
+
+ Args:
+ key: key used to remove the cached value
+ """
+ self._result_cache.pop(key, None)
+
async def wrap(
self,
key: KV,
|