diff options
-rw-r--r-- | changelog.d/14665.misc | 1 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 117 | ||||
-rw-r--r-- | synapse/handlers/message.py | 202 | ||||
-rw-r--r-- | synapse/handlers/room.py | 95 | ||||
-rw-r--r-- | synapse/handlers/room_batch.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 168 | ||||
-rw-r--r-- | synapse/util/caches/response_cache.py | 14 |
7 files changed, 360 insertions, 239 deletions
diff --git a/changelog.d/14665.misc b/changelog.d/14665.misc new file mode 100644 index 0000000000..2b7c96143d --- /dev/null +++ b/changelog.d/14665.misc @@ -0,0 +1 @@ +Change `handle_new_client_event` signature so that a 429 does not reach clients on `PartialStateConflictError`, and internally retry when needed instead. diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index b2784d7333..eca75f1108 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -1343,32 +1343,53 @@ class FederationHandler: ) EventValidator().validate_builder(builder) - event, context = await self.event_creation_handler.create_new_client_event( - builder=builder - ) - event, context = await self.add_display_name_to_third_party_invite( - room_version_obj, event_dict, event, context - ) + # Try several times, it could fail with PartialStateConflictError + # in send_membership_event, cf comment in except block. + max_retries = 5 + for i in range(max_retries): + try: + ( + event, + context, + ) = await self.event_creation_handler.create_new_client_event( + builder=builder + ) - EventValidator().validate_new(event, self.config) + event, context = await self.add_display_name_to_third_party_invite( + room_version_obj, event_dict, event, context + ) - # We need to tell the transaction queue to send this out, even - # though the sender isn't a local user. - event.internal_metadata.send_on_behalf_of = self.hs.hostname + EventValidator().validate_new(event, self.config) - try: - validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context(event) - except AuthError as e: - logger.warning("Denying new third party invite %r because %s", event, e) - raise e + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = self.hs.hostname - await self._check_signature(event, context) + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context( + event + ) + except AuthError as e: + logger.warning( + "Denying new third party invite %r because %s", event, e + ) + raise e - # We retrieve the room member handler here as to not cause a cyclic dependency - member_handler = self.hs.get_room_member_handler() - await member_handler.send_membership_event(None, event, context) + await self._check_signature(event, context) + + # We retrieve the room member handler here as to not cause a cyclic dependency + member_handler = self.hs.get_room_member_handler() + await member_handler.send_membership_event(None, event, context) + + break + except PartialStateConflictError as e: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + if i == max_retries - 1: + raise e + pass else: destinations = {x.split(":", 1)[-1] for x in (sender_user_id, room_id)} @@ -1400,28 +1421,46 @@ class FederationHandler: room_version_obj, event_dict ) - event, context = await self.event_creation_handler.create_new_client_event( - builder=builder - ) - event, context = await self.add_display_name_to_third_party_invite( - room_version_obj, event_dict, event, context - ) + # Try several times, it could fail with PartialStateConflictError + # in send_membership_event, cf comment in except block. + max_retries = 5 + for i in range(max_retries): + try: + ( + event, + context, + ) = await self.event_creation_handler.create_new_client_event( + builder=builder + ) + event, context = await self.add_display_name_to_third_party_invite( + room_version_obj, event_dict, event, context + ) - try: - validate_event_for_room_version(event) - await self._event_auth_handler.check_auth_rules_from_context(event) - except AuthError as e: - logger.warning("Denying third party invite %r because %s", event, e) - raise e - await self._check_signature(event, context) + try: + validate_event_for_room_version(event) + await self._event_auth_handler.check_auth_rules_from_context(event) + except AuthError as e: + logger.warning("Denying third party invite %r because %s", event, e) + raise e + await self._check_signature(event, context) + + # We need to tell the transaction queue to send this out, even + # though the sender isn't a local user. + event.internal_metadata.send_on_behalf_of = get_domain_from_id( + event.sender + ) - # We need to tell the transaction queue to send this out, even - # though the sender isn't a local user. - event.internal_metadata.send_on_behalf_of = get_domain_from_id(event.sender) + # We retrieve the room member handler here as to not cause a cyclic dependency + member_handler = self.hs.get_room_member_handler() + await member_handler.send_membership_event(None, event, context) - # We retrieve the room member handler here as to not cause a cyclic dependency - member_handler = self.hs.get_room_member_handler() - await member_handler.send_membership_event(None, event, context) + break + except PartialStateConflictError as e: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + if i == max_retries - 1: + raise e + pass async def add_display_name_to_third_party_invite( self, diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 845f683358..88fc51a4c9 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -37,7 +37,6 @@ from synapse.api.errors import ( AuthError, Codes, ConsentNotGivenError, - LimitExceededError, NotFoundError, ShadowBanError, SynapseError, @@ -999,60 +998,73 @@ class EventCreationHandler: event.internal_metadata.stream_ordering, ) - event, context = await self.create_event( - requester, - event_dict, - txn_id=txn_id, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - outlier=outlier, - historical=historical, - depth=depth, - ) + # Try several times, it could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + max_retries = 5 + for i in range(max_retries): + try: + event, context = await self.create_event( + requester, + event_dict, + txn_id=txn_id, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + outlier=outlier, + historical=historical, + depth=depth, + ) - assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( - event.sender, - ) + assert self.hs.is_mine_id(event.sender), "User must be our own: %s" % ( + event.sender, + ) - spam_check_result = await self.spam_checker.check_event_for_spam(event) - if spam_check_result != self.spam_checker.NOT_SPAM: - if isinstance(spam_check_result, tuple): - try: - [code, dict] = spam_check_result - raise SynapseError( - 403, - "This message had been rejected as probable spam", - code, - dict, - ) - except ValueError: - logger.error( - "Spam-check module returned invalid error value. Expecting [code, dict], got %s", - spam_check_result, - ) + spam_check_result = await self.spam_checker.check_event_for_spam(event) + if spam_check_result != self.spam_checker.NOT_SPAM: + if isinstance(spam_check_result, tuple): + try: + [code, dict] = spam_check_result + raise SynapseError( + 403, + "This message had been rejected as probable spam", + code, + dict, + ) + except ValueError: + logger.error( + "Spam-check module returned invalid error value. Expecting [code, dict], got %s", + spam_check_result, + ) - raise SynapseError( - 403, - "This message has been rejected as probable spam", - Codes.FORBIDDEN, - ) + raise SynapseError( + 403, + "This message has been rejected as probable spam", + Codes.FORBIDDEN, + ) - # Backwards compatibility: if the return value is not an error code, it - # means the module returned an error message to be included in the - # SynapseError (which is now deprecated). - raise SynapseError( - 403, - spam_check_result, - Codes.FORBIDDEN, + # Backwards compatibility: if the return value is not an error code, it + # means the module returned an error message to be included in the + # SynapseError (which is now deprecated). + raise SynapseError( + 403, + spam_check_result, + Codes.FORBIDDEN, + ) + + ev = await self.handle_new_client_event( + requester=requester, + events_and_context=[(event, context)], + ratelimit=ratelimit, + ignore_shadow_ban=ignore_shadow_ban, ) - ev = await self.handle_new_client_event( - requester=requester, - events_and_context=[(event, context)], - ratelimit=ratelimit, - ignore_shadow_ban=ignore_shadow_ban, - ) + break + except PartialStateConflictError as e: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + if i == max_retries - 1: + raise e + pass # we know it was persisted, so must have a stream ordering assert ev.internal_metadata.stream_ordering @@ -1356,7 +1368,7 @@ class EventCreationHandler: Raises: ShadowBanError if the requester has been shadow-banned. - SynapseError(503) if attempting to persist a partial state event in + PartialStateConflictError if attempting to persist a partial state event in a room that has been un-partial stated. """ extra_users = extra_users or [] @@ -1418,34 +1430,23 @@ class EventCreationHandler: # We now persist the event (and update the cache in parallel, since we # don't want to block on it). event, context = events_and_context[0] - try: - result, _ = await make_deferred_yieldable( - gather_results( - ( - run_in_background( - self._persist_events, - requester=requester, - events_and_context=events_and_context, - ratelimit=ratelimit, - extra_users=extra_users, - ), - run_in_background( - self.cache_joined_hosts_for_events, events_and_context - ).addErrback( - log_failure, "cache_joined_hosts_for_event failed" - ), + result, _ = await make_deferred_yieldable( + gather_results( + ( + run_in_background( + self._persist_events, + requester=requester, + events_and_context=events_and_context, + ratelimit=ratelimit, + extra_users=extra_users, ), - consumeErrors=True, - ) - ).addErrback(unwrapFirstError) - except PartialStateConflictError as e: - # The event context needs to be recomputed. - # Turn the error into a 429, as a hint to the client to try again. - logger.info( - "Room %s was un-partial stated while persisting client event.", - event.room_id, + run_in_background( + self.cache_joined_hosts_for_events, events_and_context + ).addErrback(log_failure, "cache_joined_hosts_for_event failed"), + ), + consumeErrors=True, ) - raise LimitExceededError(msg=e.msg, errcode=e.errcode, retry_after_ms=0) + ).addErrback(unwrapFirstError) return result @@ -2012,26 +2013,39 @@ class EventCreationHandler: for user_id in members: requester = create_requester(user_id, authenticated_entity=self.server_name) try: - event, context = await self.create_event( - requester, - { - "type": EventTypes.Dummy, - "content": {}, - "room_id": room_id, - "sender": user_id, - }, - ) + # Try several times, it could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + max_retries = 5 + for i in range(max_retries): + try: + event, context = await self.create_event( + requester, + { + "type": EventTypes.Dummy, + "content": {}, + "room_id": room_id, + "sender": user_id, + }, + ) - event.internal_metadata.proactively_send = False + event.internal_metadata.proactively_send = False - # Since this is a dummy-event it is OK if it is sent by a - # shadow-banned user. - await self.handle_new_client_event( - requester, - events_and_context=[(event, context)], - ratelimit=False, - ignore_shadow_ban=True, - ) + # Since this is a dummy-event it is OK if it is sent by a + # shadow-banned user. + await self.handle_new_client_event( + requester, + events_and_context=[(event, context)], + ratelimit=False, + ignore_shadow_ban=True, + ) + + break + except PartialStateConflictError as e: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + if i == max_retries - 1: + raise e + pass return True except AuthError: logger.info( diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index f81241c2b3..572c7b4db3 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -62,6 +62,7 @@ from synapse.events.utils import copy_and_fixup_power_levels_contents from synapse.handlers.relations import BundledAggregations from synapse.module_api import NOT_SPAM from synapse.rest.admin._base import assert_user_is_admin +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.streams import EventSource from synapse.types import ( JsonDict, @@ -207,46 +208,64 @@ class RoomCreationHandler: new_room_id = self._generate_room_id() - # Check whether the user has the power level to carry out the upgrade. - # `check_auth_rules_from_context` will check that they are in the room and have - # the required power level to send the tombstone event. - ( - tombstone_event, - tombstone_context, - ) = await self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Tombstone, - "state_key": "", - "room_id": old_room_id, - "sender": user_id, - "content": { - "body": "This room has been replaced", - "replacement_room": new_room_id, - }, - }, - ) - validate_event_for_room_version(tombstone_event) - await self._event_auth_handler.check_auth_rules_from_context(tombstone_event) + # Try several times, it could fail with PartialStateConflictError + # in _upgrade_room, cf comment in except block. + max_retries = 5 + for i in range(max_retries): + try: + # Check whether the user has the power level to carry out the upgrade. + # `check_auth_rules_from_context` will check that they are in the room and have + # the required power level to send the tombstone event. + ( + tombstone_event, + tombstone_context, + ) = await self.event_creation_handler.create_event( + requester, + { + "type": EventTypes.Tombstone, + "state_key": "", + "room_id": old_room_id, + "sender": user_id, + "content": { + "body": "This room has been replaced", + "replacement_room": new_room_id, + }, + }, + ) + validate_event_for_room_version(tombstone_event) + await self._event_auth_handler.check_auth_rules_from_context( + tombstone_event + ) - # Upgrade the room - # - # If this user has sent multiple upgrade requests for the same room - # and one of them is not complete yet, cache the response and - # return it to all subsequent requests - ret = await self._upgrade_response_cache.wrap( - (old_room_id, user_id), - self._upgrade_room, - requester, - old_room_id, - old_room, # args for _upgrade_room - new_room_id, - new_version, - tombstone_event, - tombstone_context, - ) + # Upgrade the room + # + # If this user has sent multiple upgrade requests for the same room + # and one of them is not complete yet, cache the response and + # return it to all subsequent requests + ret = await self._upgrade_response_cache.wrap( + (old_room_id, user_id), + self._upgrade_room, + requester, + old_room_id, + old_room, # args for _upgrade_room + new_room_id, + new_version, + tombstone_event, + tombstone_context, + ) - return ret + return ret + except PartialStateConflictError as e: + # Clean up the cache so we can retry properly + self._upgrade_response_cache.unset((old_room_id, user_id)) + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + if i == max_retries - 1: + raise e + pass + + # This is to satisfy mypy and should never happen + raise PartialStateConflictError() async def _upgrade_room( self, diff --git a/synapse/handlers/room_batch.py b/synapse/handlers/room_batch.py index 411a6fb22f..c73d2adaad 100644 --- a/synapse/handlers/room_batch.py +++ b/synapse/handlers/room_batch.py @@ -375,6 +375,8 @@ class RoomBatchHandler: # Events are sorted by (topological_ordering, stream_ordering) # where topological_ordering is just depth. for (event, context) in reversed(events_to_persist): + # This call can't raise `PartialStateConflictError` since we forbid + # use of the historical batch API during partial state await self.event_creation_handler.handle_new_client_event( await self.create_requester_for_user_id_from_app_service( event.sender, app_service_requester.app_service diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0c39e852a1..d236cc09b5 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -34,6 +34,7 @@ from synapse.events.snapshot import EventContext from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN from synapse.logging import opentracing from synapse.module_api import NOT_SPAM +from synapse.storage.databases.main.events import PartialStateConflictError from synapse.types import ( JsonDict, Requester, @@ -392,60 +393,81 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): event_pos = await self.store.get_position_for_event(existing_event_id) return existing_event_id, event_pos.stream - event, context = await self.event_creation_handler.create_event( - requester, - { - "type": EventTypes.Member, - "content": content, - "room_id": room_id, - "sender": requester.user.to_string(), - "state_key": user_id, - # For backwards compatibility: - "membership": membership, - "origin_server_ts": origin_server_ts, - }, - txn_id=txn_id, - allow_no_prev_events=allow_no_prev_events, - prev_event_ids=prev_event_ids, - state_event_ids=state_event_ids, - depth=depth, - require_consent=require_consent, - outlier=outlier, - historical=historical, - ) - - prev_state_ids = await context.get_prev_state_ids( - StateFilter.from_types([(EventTypes.Member, None)]) - ) + # Try several times, it could fail with PartialStateConflictError, + # in handle_new_client_event, cf comment in except block. + max_retries = 5 + for i in range(max_retries): + try: + event, context = await self.event_creation_handler.create_event( + requester, + { + "type": EventTypes.Member, + "content": content, + "room_id": room_id, + "sender": requester.user.to_string(), + "state_key": user_id, + # For backwards compatibility: + "membership": membership, + "origin_server_ts": origin_server_ts, + }, + txn_id=txn_id, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + depth=depth, + require_consent=require_consent, + outlier=outlier, + historical=historical, + ) - prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None) + prev_state_ids = await context.get_prev_state_ids( + StateFilter.from_types([(EventTypes.Member, None)]) + ) - if event.membership == Membership.JOIN: - newly_joined = True - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - newly_joined = prev_member_event.membership != Membership.JOIN - - # Only rate-limit if the user actually joined the room, otherwise we'll end - # up blocking profile updates. - if newly_joined and ratelimit: - await self._join_rate_limiter_local.ratelimit(requester) - await self._join_rate_per_room_limiter.ratelimit( - requester, key=room_id, update=False + prev_member_event_id = prev_state_ids.get( + (EventTypes.Member, user_id), None ) - with opentracing.start_active_span("handle_new_client_event"): - result_event = await self.event_creation_handler.handle_new_client_event( - requester, - events_and_context=[(event, context)], - extra_users=[target], - ratelimit=ratelimit, - ) - if event.membership == Membership.LEAVE: - if prev_member_event_id: - prev_member_event = await self.store.get_event(prev_member_event_id) - if prev_member_event.membership == Membership.JOIN: - await self._user_left_room(target, room_id) + if event.membership == Membership.JOIN: + newly_joined = True + if prev_member_event_id: + prev_member_event = await self.store.get_event( + prev_member_event_id + ) + newly_joined = prev_member_event.membership != Membership.JOIN + + # Only rate-limit if the user actually joined the room, otherwise we'll end + # up blocking profile updates. + if newly_joined and ratelimit: + await self._join_rate_limiter_local.ratelimit(requester) + await self._join_rate_per_room_limiter.ratelimit( + requester, key=room_id, update=False + ) + with opentracing.start_active_span("handle_new_client_event"): + result_event = ( + await self.event_creation_handler.handle_new_client_event( + requester, + events_and_context=[(event, context)], + extra_users=[target], + ratelimit=ratelimit, + ) + ) + + if event.membership == Membership.LEAVE: + if prev_member_event_id: + prev_member_event = await self.store.get_event( + prev_member_event_id + ) + if prev_member_event.membership == Membership.JOIN: + await self._user_left_room(target, room_id) + + break + except PartialStateConflictError as e: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + if i == max_retries - 1: + raise e + pass # we know it was persisted, so should have a stream ordering assert result_event.internal_metadata.stream_ordering @@ -1234,6 +1256,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): ratelimit: Whether to rate limit this request. Raises: SynapseError if there was a problem changing the membership. + PartialStateConflictError: if attempting to persist a partial state event in + a room that has been un-partial stated. """ target_user = UserID.from_string(event.state_key) room_id = event.room_id @@ -1863,21 +1887,37 @@ class RoomMemberMasterHandler(RoomMemberHandler): list(previous_membership_event.auth_event_ids()) + prev_event_ids ) - event, context = await self.event_creation_handler.create_event( - requester, - event_dict, - txn_id=txn_id, - prev_event_ids=prev_event_ids, - auth_event_ids=auth_event_ids, - outlier=True, - ) - event.internal_metadata.out_of_band_membership = True + # Try several times, it could fail with PartialStateConflictError + # in handle_new_client_event, cf comment in except block. + max_retries = 5 + for i in range(max_retries): + try: + event, context = await self.event_creation_handler.create_event( + requester, + event_dict, + txn_id=txn_id, + prev_event_ids=prev_event_ids, + auth_event_ids=auth_event_ids, + outlier=True, + ) + event.internal_metadata.out_of_band_membership = True + + result_event = ( + await self.event_creation_handler.handle_new_client_event( + requester, + events_and_context=[(event, context)], + extra_users=[UserID.from_string(target_user)], + ) + ) + + break + except PartialStateConflictError as e: + # Persisting couldn't happen because the room got un-partial stated + # in the meantime and context needs to be recomputed, so let's do so. + if i == max_retries - 1: + raise e + pass - result_event = await self.event_creation_handler.handle_new_client_event( - requester, - events_and_context=[(event, context)], - extra_users=[UserID.from_string(target_user)], - ) # we know it was persisted, so must have a stream ordering assert result_event.internal_metadata.stream_ordering diff --git a/synapse/util/caches/response_cache.py b/synapse/util/caches/response_cache.py index a3eb5f741b..340e5e9145 100644 --- a/synapse/util/caches/response_cache.py +++ b/synapse/util/caches/response_cache.py @@ -167,12 +167,10 @@ class ResponseCache(Generic[KV]): # the should_cache bit, we leave it in the cache for now and schedule # its removal later. if self.timeout_sec and context.should_cache: - self.clock.call_later( - self.timeout_sec, self._result_cache.pop, key, None - ) + self.clock.call_later(self.timeout_sec, self.unset, key) else: # otherwise, remove the result immediately. - self._result_cache.pop(key, None) + self.unset(key) return r # make sure we do this *after* adding the entry to result_cache, @@ -181,6 +179,14 @@ class ResponseCache(Generic[KV]): result.addBoth(on_complete) return entry + def unset(self, key: KV) -> None: + """Remove the cached value for this key from the cache, if any. + + Args: + key: key used to remove the cached value + """ + self._result_cache.pop(key, None) + async def wrap( self, key: KV, |