diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0c39e852a1..d236cc09b5 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -34,6 +34,7 @@ from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.logging import opentracing
from synapse.module_api import NOT_SPAM
+from synapse.storage.databases.main.events import PartialStateConflictError
from synapse.types import (
JsonDict,
Requester,
@@ -392,60 +393,81 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
- event, context = await self.event_creation_handler.create_event(
- requester,
- {
- "type": EventTypes.Member,
- "content": content,
- "room_id": room_id,
- "sender": requester.user.to_string(),
- "state_key": user_id,
- # For backwards compatibility:
- "membership": membership,
- "origin_server_ts": origin_server_ts,
- },
- txn_id=txn_id,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- depth=depth,
- require_consent=require_consent,
- outlier=outlier,
- historical=historical,
- )
-
- prev_state_ids = await context.get_prev_state_ids(
- StateFilter.from_types([(EventTypes.Member, None)])
- )
+ # Try several times, it could fail with PartialStateConflictError,
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ {
+ "type": EventTypes.Member,
+ "content": content,
+ "room_id": room_id,
+ "sender": requester.user.to_string(),
+ "state_key": user_id,
+ # For backwards compatibility:
+ "membership": membership,
+ "origin_server_ts": origin_server_ts,
+ },
+ txn_id=txn_id,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ depth=depth,
+ require_consent=require_consent,
+ outlier=outlier,
+ historical=historical,
+ )
- prev_member_event_id = prev_state_ids.get((EventTypes.Member, user_id), None)
+ prev_state_ids = await context.get_prev_state_ids(
+ StateFilter.from_types([(EventTypes.Member, None)])
+ )
- if event.membership == Membership.JOIN:
- newly_joined = True
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- newly_joined = prev_member_event.membership != Membership.JOIN
-
- # Only rate-limit if the user actually joined the room, otherwise we'll end
- # up blocking profile updates.
- if newly_joined and ratelimit:
- await self._join_rate_limiter_local.ratelimit(requester)
- await self._join_rate_per_room_limiter.ratelimit(
- requester, key=room_id, update=False
+ prev_member_event_id = prev_state_ids.get(
+ (EventTypes.Member, user_id), None
)
- with opentracing.start_active_span("handle_new_client_event"):
- result_event = await self.event_creation_handler.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- extra_users=[target],
- ratelimit=ratelimit,
- )
- if event.membership == Membership.LEAVE:
- if prev_member_event_id:
- prev_member_event = await self.store.get_event(prev_member_event_id)
- if prev_member_event.membership == Membership.JOIN:
- await self._user_left_room(target, room_id)
+ if event.membership == Membership.JOIN:
+ newly_joined = True
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(
+ prev_member_event_id
+ )
+ newly_joined = prev_member_event.membership != Membership.JOIN
+
+ # Only rate-limit if the user actually joined the room, otherwise we'll end
+ # up blocking profile updates.
+ if newly_joined and ratelimit:
+ await self._join_rate_limiter_local.ratelimit(requester)
+ await self._join_rate_per_room_limiter.ratelimit(
+ requester, key=room_id, update=False
+ )
+ with opentracing.start_active_span("handle_new_client_event"):
+ result_event = (
+ await self.event_creation_handler.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ extra_users=[target],
+ ratelimit=ratelimit,
+ )
+ )
+
+ if event.membership == Membership.LEAVE:
+ if prev_member_event_id:
+ prev_member_event = await self.store.get_event(
+ prev_member_event_id
+ )
+ if prev_member_event.membership == Membership.JOIN:
+ await self._user_left_room(target, room_id)
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
# we know it was persisted, so should have a stream ordering
assert result_event.internal_metadata.stream_ordering
@@ -1234,6 +1256,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
ratelimit: Whether to rate limit this request.
Raises:
SynapseError if there was a problem changing the membership.
+ PartialStateConflictError: if attempting to persist a partial state event in
+ a room that has been un-partial stated.
"""
target_user = UserID.from_string(event.state_key)
room_id = event.room_id
@@ -1863,21 +1887,37 @@ class RoomMemberMasterHandler(RoomMemberHandler):
list(previous_membership_event.auth_event_ids()) + prev_event_ids
)
- event, context = await self.event_creation_handler.create_event(
- requester,
- event_dict,
- txn_id=txn_id,
- prev_event_ids=prev_event_ids,
- auth_event_ids=auth_event_ids,
- outlier=True,
- )
- event.internal_metadata.out_of_band_membership = True
+ # Try several times, it could fail with PartialStateConflictError
+ # in handle_new_client_event, cf comment in except block.
+ max_retries = 5
+ for i in range(max_retries):
+ try:
+ event, context = await self.event_creation_handler.create_event(
+ requester,
+ event_dict,
+ txn_id=txn_id,
+ prev_event_ids=prev_event_ids,
+ auth_event_ids=auth_event_ids,
+ outlier=True,
+ )
+ event.internal_metadata.out_of_band_membership = True
+
+ result_event = (
+ await self.event_creation_handler.handle_new_client_event(
+ requester,
+ events_and_context=[(event, context)],
+ extra_users=[UserID.from_string(target_user)],
+ )
+ )
+
+ break
+ except PartialStateConflictError as e:
+ # Persisting couldn't happen because the room got un-partial stated
+ # in the meantime and context needs to be recomputed, so let's do so.
+ if i == max_retries - 1:
+ raise e
+ pass
- result_event = await self.event_creation_handler.handle_new_client_event(
- requester,
- events_and_context=[(event, context)],
- extra_users=[UserID.from_string(target_user)],
- )
# we know it was persisted, so must have a stream ordering
assert result_event.internal_metadata.stream_ordering
|