summary refs log tree commit diff
path: root/synapse/handlers/message.py
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/handlers/message.py')
-rw-r--r--synapse/handlers/message.py89
1 files changed, 73 insertions, 16 deletions
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py

index fff0b5fa12..d485f21e49 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py
@@ -53,6 +53,7 @@ from synapse.events.snapshot import EventContext, UnpersistedEventContextBase from synapse.events.utils import SerializeEventConfig, maybe_upsert_event_field from synapse.events.validator import EventValidator from synapse.handlers.directory import DirectoryHandler +from synapse.handlers.worker_lock import DELETE_ROOM_LOCK_NAME from synapse.logging import opentracing from synapse.logging.context import make_deferred_yieldable, run_in_background from synapse.metrics.background_process_metrics import run_as_background_process @@ -485,6 +486,7 @@ class EventCreationHandler: self._events_shard_config = self.config.worker.events_shard_config self._instance_name = hs.get_instance_name() self._notifier = hs.get_notifier() + self._worker_lock_handler = hs.get_worker_locks_handler() self.room_prejoin_state_types = self.hs.config.api.room_prejoin_state @@ -559,8 +561,6 @@ class EventCreationHandler: expiry_ms=30 * 60 * 1000, ) - self._msc3970_enabled = hs.config.experimental.msc3970_enabled - async def create_event( self, requester: Requester, @@ -876,14 +876,13 @@ class EventCreationHandler: return prev_event return None - async def get_event_from_transaction( + async def get_event_id_from_transaction( self, requester: Requester, txn_id: str, room_id: str, - ) -> Optional[EventBase]: - """For the given transaction ID and room ID, check if there is a matching event. - If so, fetch it and return it. + ) -> Optional[str]: + """For the given transaction ID and room ID, check if there is a matching event ID. Args: requester: The requester making the request in the context of which we want @@ -892,12 +891,12 @@ class EventCreationHandler: room_id: The room ID. Returns: - An event if one could be found, None otherwise. + An event ID if one could be found, None otherwise. """ + existing_event_id = None - if self._msc3970_enabled and requester.device_id: - # When MSC3970 is enabled, we lookup for events sent by the same device first, - # and fallback to the old behaviour if none were found. + # According to the spec, transactions are scoped to a user's device ID. + if requester.device_id: existing_event_id = ( await self.store.get_event_id_from_transaction_id_and_device_id( room_id, @@ -907,10 +906,11 @@ class EventCreationHandler: ) ) if existing_event_id: - return await self.store.get_event(existing_event_id) + return existing_event_id - # Pre-MSC3970, we looked up for events that were sent by the same session by - # using the access token ID. + # Some requsters don't have device IDs (appservice, guests, and access + # tokens minted with the admin API), fallback to checking the access token + # ID, which should be close enough. if requester.access_token_id: existing_event_id = ( await self.store.get_event_id_from_transaction_id_and_token_id( @@ -920,9 +920,32 @@ class EventCreationHandler: txn_id, ) ) - if existing_event_id: - return await self.store.get_event(existing_event_id) + return existing_event_id + + async def get_event_from_transaction( + self, + requester: Requester, + txn_id: str, + room_id: str, + ) -> Optional[EventBase]: + """For the given transaction ID and room ID, check if there is a matching event. + If so, fetch it and return it. + + Args: + requester: The requester making the request in the context of which we want + to fetch the event. + txn_id: The transaction ID. + room_id: The room ID. + + Returns: + An event if one could be found, None otherwise. + """ + existing_event_id = await self.get_event_id_from_transaction( + requester, txn_id, room_id + ) + if existing_event_id: + return await self.store.get_event(existing_event_id) return None async def create_and_send_nonmember_event( @@ -1010,6 +1033,37 @@ class EventCreationHandler: event.internal_metadata.stream_ordering, ) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + return await self._create_and_send_nonmember_event_locked( + requester=requester, + event_dict=event_dict, + allow_no_prev_events=allow_no_prev_events, + prev_event_ids=prev_event_ids, + state_event_ids=state_event_ids, + ratelimit=ratelimit, + txn_id=txn_id, + ignore_shadow_ban=ignore_shadow_ban, + outlier=outlier, + depth=depth, + ) + + async def _create_and_send_nonmember_event_locked( + self, + requester: Requester, + event_dict: dict, + allow_no_prev_events: bool = False, + prev_event_ids: Optional[List[str]] = None, + state_event_ids: Optional[List[str]] = None, + ratelimit: bool = True, + txn_id: Optional[str] = None, + ignore_shadow_ban: bool = False, + outlier: bool = False, + depth: Optional[int] = None, + ) -> Tuple[EventBase, int]: + room_id = event_dict["room_id"] + # If we don't have any prev event IDs specified then we need to # check that the host is in the room (as otherwise populating the # prev events will fail), at which point we may as well check the @@ -1923,7 +1977,10 @@ class EventCreationHandler: ) for room_id in room_ids: - dummy_event_sent = await self._send_dummy_event_for_room(room_id) + async with self._worker_lock_handler.acquire_read_write_lock( + DELETE_ROOM_LOCK_NAME, room_id, write=False + ): + dummy_event_sent = await self._send_dummy_event_for_room(room_id) if not dummy_event_sent: # Did not find a valid user in the room, so remove from future attempts