diff options
author | Sean Quah <8349537+squahtx@users.noreply.github.com> | 2022-04-05 15:43:52 +0100 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-04-05 15:43:52 +0100 |
commit | 800ba87cc881856adae19ec40485578356398639 (patch) | |
tree | cb19370283b7def590239b507c5dc5c72e39af8a /synapse/handlers | |
parent | Merge branch 'master' into develop (diff) | |
download | synapse-800ba87cc881856adae19ec40485578356398639.tar.xz |
Refactor and convert `Linearizer` to async (#12357)
Refactor and convert `Linearizer` to async. This makes a `Linearizer` cancellation bug easier to fix. Also refactor to use an async context manager, which eliminates an unlikely footgun where code that doesn't immediately use the context manager could forget to release the lock. Signed-off-by: Sean Quah <seanq@element.io>
Diffstat (limited to 'synapse/handlers')
-rw-r--r-- | synapse/handlers/appservice.py | 6 | ||||
-rw-r--r-- | synapse/handlers/device.py | 2 | ||||
-rw-r--r-- | synapse/handlers/e2e_keys.py | 4 | ||||
-rw-r--r-- | synapse/handlers/e2e_room_keys.py | 14 | ||||
-rw-r--r-- | synapse/handlers/federation.py | 2 | ||||
-rw-r--r-- | synapse/handlers/federation_event.py | 2 | ||||
-rw-r--r-- | synapse/handlers/message.py | 2 | ||||
-rw-r--r-- | synapse/handlers/presence.py | 4 | ||||
-rw-r--r-- | synapse/handlers/read_marker.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room.py | 2 | ||||
-rw-r--r-- | synapse/handlers/room_member.py | 4 | ||||
-rw-r--r-- | synapse/handlers/sso.py | 2 |
12 files changed, 22 insertions, 24 deletions
diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py index 316c4b677c..1b57840506 100644 --- a/synapse/handlers/appservice.py +++ b/synapse/handlers/appservice.py @@ -330,10 +330,8 @@ class ApplicationServicesHandler: continue # Since we read/update the stream position for this AS/stream - with ( - await self._ephemeral_events_linearizer.queue( - (service.id, stream_key) - ) + async with self._ephemeral_events_linearizer.queue( + (service.id, stream_key) ): if stream_key == "receipt_key": events = await self._handle_receipts(service, new_token) diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py index c710c02cf9..ffa28b2a30 100644 --- a/synapse/handlers/device.py +++ b/synapse/handlers/device.py @@ -833,7 +833,7 @@ class DeviceListUpdater: async def _handle_device_updates(self, user_id: str) -> None: "Actually handle pending updates." - with (await self._remote_edu_linearizer.queue(user_id)): + async with self._remote_edu_linearizer.queue(user_id): pending_updates = self._pending_updates.pop(user_id, []) if not pending_updates: # This can happen since we batch updates diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py index d96456cd40..d6714228ef 100644 --- a/synapse/handlers/e2e_keys.py +++ b/synapse/handlers/e2e_keys.py @@ -118,7 +118,7 @@ class E2eKeysHandler: from_device_id: the device making the query. This is used to limit the number of in-flight queries at a time. """ - with await self._query_devices_linearizer.queue((from_user_id, from_device_id)): + async with self._query_devices_linearizer.queue((from_user_id, from_device_id)): device_keys_query: Dict[str, Iterable[str]] = query_body.get( "device_keys", {} ) @@ -1386,7 +1386,7 @@ class SigningKeyEduUpdater: device_handler = self.e2e_keys_handler.device_handler device_list_updater = device_handler.device_list_updater - with (await self._remote_edu_linearizer.queue(user_id)): + async with self._remote_edu_linearizer.queue(user_id): pending_updates = self._pending_updates.pop(user_id, []) if not pending_updates: # This can happen since we batch updates diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py index 52e44a2d42..446f509bdc 100644 --- a/synapse/handlers/e2e_room_keys.py +++ b/synapse/handlers/e2e_room_keys.py @@ -83,7 +83,7 @@ class E2eRoomKeysHandler: # we deliberately take the lock to get keys so that changing the version # works atomically - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): # make sure the backup version exists try: await self.store.get_e2e_room_keys_version_info(user_id, version) @@ -126,7 +126,7 @@ class E2eRoomKeysHandler: """ # lock for consistency with uploading - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): # make sure the backup version exists try: version_info = await self.store.get_e2e_room_keys_version_info( @@ -187,7 +187,7 @@ class E2eRoomKeysHandler: # TODO: Validate the JSON to make sure it has the right keys. # XXX: perhaps we should use a finer grained lock here? - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): # Check that the version we're trying to upload is the current version try: @@ -332,7 +332,7 @@ class E2eRoomKeysHandler: # TODO: Validate the JSON to make sure it has the right keys. # lock everyone out until we've switched version - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): new_version = await self.store.create_e2e_room_keys_version( user_id, version_info ) @@ -359,7 +359,7 @@ class E2eRoomKeysHandler: } """ - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): try: res = await self.store.get_e2e_room_keys_version_info(user_id, version) except StoreError as e: @@ -383,7 +383,7 @@ class E2eRoomKeysHandler: NotFoundError: if this backup version doesn't exist """ - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): try: await self.store.delete_e2e_room_keys_version(user_id, version) except StoreError as e: @@ -413,7 +413,7 @@ class E2eRoomKeysHandler: raise SynapseError( 400, "Version in body does not match", Codes.INVALID_PARAM ) - with (await self._upload_linearizer.queue(user_id)): + async with self._upload_linearizer.queue(user_id): try: old_info = await self.store.get_e2e_room_keys_version_info( user_id, version diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py index 350ec9c03a..78d149905f 100644 --- a/synapse/handlers/federation.py +++ b/synapse/handlers/federation.py @@ -151,7 +151,7 @@ class FederationHandler: return. This is used as part of the heuristic to decide if we should back paginate. """ - with (await self._room_backfill.queue(room_id)): + async with self._room_backfill.queue(room_id): return await self._maybe_backfill_inner(room_id, current_depth, limit) async def _maybe_backfill_inner( diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py index e7b9f15e13..03c1197c99 100644 --- a/synapse/handlers/federation_event.py +++ b/synapse/handlers/federation_event.py @@ -224,7 +224,7 @@ class FederationEventHandler: len(missing_prevs), shortstr(missing_prevs), ) - with (await self._room_pdu_linearizer.queue(pdu.room_id)): + async with self._room_pdu_linearizer.queue(pdu.room_id): logger.info( "Acquired room lock to fetch %d missing prev_events", len(missing_prevs), diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py index 766f597a55..7db6905c61 100644 --- a/synapse/handlers/message.py +++ b/synapse/handlers/message.py @@ -851,7 +851,7 @@ class EventCreationHandler: # a situation where event persistence can't keep up, causing # extremities to pile up, which in turn leads to state resolution # taking longer. - with (await self.limiter.queue(event_dict["room_id"])): + async with self.limiter.queue(event_dict["room_id"]): if txn_id and requester.access_token_id: existing_event_id = await self.store.get_event_id_from_transaction_id( event_dict["room_id"], diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py index dace31d87e..209a4b0e52 100644 --- a/synapse/handlers/presence.py +++ b/synapse/handlers/presence.py @@ -1030,7 +1030,7 @@ class PresenceHandler(BasePresenceHandler): is_syncing: Whether or not the user is now syncing sync_time_msec: Time in ms when the user was last syncing """ - with (await self.external_sync_linearizer.queue(process_id)): + async with self.external_sync_linearizer.queue(process_id): prev_state = await self.current_state_for_user(user_id) process_presence = self.external_process_to_current_syncs.setdefault( @@ -1071,7 +1071,7 @@ class PresenceHandler(BasePresenceHandler): Used when the process has stopped/disappeared. """ - with (await self.external_sync_linearizer.queue(process_id)): + async with self.external_sync_linearizer.queue(process_id): process_presence = self.external_process_to_current_syncs.pop( process_id, set() ) diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py index bad1acc634..05122fd5a6 100644 --- a/synapse/handlers/read_marker.py +++ b/synapse/handlers/read_marker.py @@ -40,7 +40,7 @@ class ReadMarkerHandler: the read marker has changed. """ - with await self.read_marker_linearizer.queue((room_id, user_id)): + async with self.read_marker_linearizer.queue((room_id, user_id)): existing_read_marker = await self.store.get_account_data_for_room_and_type( user_id, room_id, "m.fully_read" ) diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py index 51a08fd2c0..65d4aea9af 100644 --- a/synapse/handlers/room.py +++ b/synapse/handlers/room.py @@ -883,7 +883,7 @@ class RoomCreationHandler: # # we also don't need to check the requester's shadow-ban here, as we # have already done so above (and potentially emptied invite_list). - with (await self.room_member_handler.member_linearizer.queue((room_id,))): + async with self.room_member_handler.member_linearizer.queue((room_id,)): content = {} is_direct = config.get("is_direct", None) if is_direct: diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py index 0785e31114..802e57c4d0 100644 --- a/synapse/handlers/room_member.py +++ b/synapse/handlers/room_member.py @@ -515,8 +515,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta): # We first linearise by the application service (to try to limit concurrent joins # by application services), and then by room ID. - with (await self.member_as_limiter.queue(as_id)): - with (await self.member_linearizer.queue(key)): + async with self.member_as_limiter.queue(as_id): + async with self.member_linearizer.queue(key): result = await self.update_membership_locked( requester, target, diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py index 4f02a060d9..e4fe94e557 100644 --- a/synapse/handlers/sso.py +++ b/synapse/handlers/sso.py @@ -430,7 +430,7 @@ class SsoHandler: # grab a lock while we try to find a mapping for this user. This seems... # optimistic, especially for implementations that end up redirecting to # interstitial pages. - with await self._mapping_lock.queue(auth_provider_id): + async with self._mapping_lock.queue(auth_provider_id): # first of all, check if we already have a mapping for this user user_id = await self.get_sso_user_by_remote_user_id( auth_provider_id, |