diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 51ae3f50f8..bd8277e736 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -39,6 +39,7 @@ from synapse.events import EventBase
from synapse.events.snapshot import EventContext
from synapse.handlers.profile import MAX_AVATAR_URL_LEN, MAX_DISPLAYNAME_LEN
from synapse.handlers.state_deltas import MatchChange, StateDeltasHandler
+from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME
from synapse.logging import opentracing
from synapse.metrics import event_processing_positions
from synapse.metrics.background_process_metrics import run_as_background_process
@@ -94,6 +95,7 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.event_creation_handler = hs.get_event_creation_handler()
self.account_data_handler = hs.get_account_data_handler()
self.event_auth_handler = hs.get_event_auth_handler()
+ self._worker_lock_handler = hs.get_worker_locks_handler()
self.member_linearizer: Linearizer = Linearizer(name="member")
self.member_as_limiter = Linearizer(max_count=10, name="member_as_limiter")
@@ -174,8 +176,6 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
self.request_ratelimiter = hs.get_request_ratelimiter()
hs.get_notifier().add_new_join_in_room_callback(self._on_user_joined_room)
- self._msc3970_enabled = hs.config.experimental.msc3970_enabled
-
def _on_user_joined_room(self, event_id: str, room_id: str) -> None:
"""Notify the rate limiter that a room join has occurred.
@@ -416,29 +416,11 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# do this check just before we persist an event as well, but may as well
# do it up front for efficiency.)
if txn_id:
- existing_event_id = None
- if self._msc3970_enabled and requester.device_id:
- # When MSC3970 is enabled, we lookup for events sent by the same device
- # first, and fallback to the old behaviour if none were found.
- existing_event_id = (
- await self.store.get_event_id_from_transaction_id_and_device_id(
- room_id,
- requester.user.to_string(),
- requester.device_id,
- txn_id,
- )
+ existing_event_id = (
+ await self.event_creation_handler.get_event_id_from_transaction(
+ requester, txn_id, room_id
)
-
- if requester.access_token_id and not existing_event_id:
- existing_event_id = (
- await self.store.get_event_id_from_transaction_id_and_token_id(
- room_id,
- requester.user.to_string(),
- requester.access_token_id,
- txn_id,
- )
- )
-
+ )
if existing_event_id:
event_pos = await self.store.get_position_for_event(existing_event_id)
return existing_event_id, event_pos.stream
@@ -646,32 +628,35 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
raise SynapseError(504, "took to long to process")
async with self.member_linearizer.queue(key):
- diff = self.clock.time_msec() - then
+ async with self._worker_lock_handler.acquire_read_write_lock(
+ DELETE_ROOM_LOCK_NAME, room_id, write=False
+ ):
+ diff = self.clock.time_msec() - then
- if diff > 80 * 1000:
- # haproxy would have timed the request out anyway...
- raise SynapseError(504, "took to long to process")
+ if diff > 80 * 1000:
+ # haproxy would have timed the request out anyway...
+ raise SynapseError(504, "took to long to process")
- with opentracing.start_active_span("update_membership_locked"):
- result = await self.update_membership_locked(
- requester,
- target,
- room_id,
- action,
- txn_id=txn_id,
- remote_room_hosts=remote_room_hosts,
- third_party_signed=third_party_signed,
- ratelimit=ratelimit,
- content=content,
- new_room=new_room,
- require_consent=require_consent,
- outlier=outlier,
- allow_no_prev_events=allow_no_prev_events,
- prev_event_ids=prev_event_ids,
- state_event_ids=state_event_ids,
- depth=depth,
- origin_server_ts=origin_server_ts,
- )
+ with opentracing.start_active_span("update_membership_locked"):
+ result = await self.update_membership_locked(
+ requester,
+ target,
+ room_id,
+ action,
+ txn_id=txn_id,
+ remote_room_hosts=remote_room_hosts,
+ third_party_signed=third_party_signed,
+ ratelimit=ratelimit,
+ content=content,
+ new_room=new_room,
+ require_consent=require_consent,
+ outlier=outlier,
+ allow_no_prev_events=allow_no_prev_events,
+ prev_event_ids=prev_event_ids,
+ state_event_ids=state_event_ids,
+ depth=depth,
+ origin_server_ts=origin_server_ts,
+ )
return result
|