diff --git a/synapse/handlers/appservice.py b/synapse/handlers/appservice.py
index 316c4b677c..1b57840506 100644
--- a/synapse/handlers/appservice.py
+++ b/synapse/handlers/appservice.py
@@ -330,10 +330,8 @@ class ApplicationServicesHandler:
continue
# Since we read/update the stream position for this AS/stream
- with (
- await self._ephemeral_events_linearizer.queue(
- (service.id, stream_key)
- )
+ async with self._ephemeral_events_linearizer.queue(
+ (service.id, stream_key)
):
if stream_key == "receipt_key":
events = await self._handle_receipts(service, new_token)
diff --git a/synapse/handlers/device.py b/synapse/handlers/device.py
index c710c02cf9..ffa28b2a30 100644
--- a/synapse/handlers/device.py
+++ b/synapse/handlers/device.py
@@ -833,7 +833,7 @@ class DeviceListUpdater:
async def _handle_device_updates(self, user_id: str) -> None:
"Actually handle pending updates."
- with (await self._remote_edu_linearizer.queue(user_id)):
+ async with self._remote_edu_linearizer.queue(user_id):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
diff --git a/synapse/handlers/e2e_keys.py b/synapse/handlers/e2e_keys.py
index d96456cd40..d6714228ef 100644
--- a/synapse/handlers/e2e_keys.py
+++ b/synapse/handlers/e2e_keys.py
@@ -118,7 +118,7 @@ class E2eKeysHandler:
from_device_id: the device making the query. This is used to limit
the number of in-flight queries at a time.
"""
- with await self._query_devices_linearizer.queue((from_user_id, from_device_id)):
+ async with self._query_devices_linearizer.queue((from_user_id, from_device_id)):
device_keys_query: Dict[str, Iterable[str]] = query_body.get(
"device_keys", {}
)
@@ -1386,7 +1386,7 @@ class SigningKeyEduUpdater:
device_handler = self.e2e_keys_handler.device_handler
device_list_updater = device_handler.device_list_updater
- with (await self._remote_edu_linearizer.queue(user_id)):
+ async with self._remote_edu_linearizer.queue(user_id):
pending_updates = self._pending_updates.pop(user_id, [])
if not pending_updates:
# This can happen since we batch updates
diff --git a/synapse/handlers/e2e_room_keys.py b/synapse/handlers/e2e_room_keys.py
index 52e44a2d42..446f509bdc 100644
--- a/synapse/handlers/e2e_room_keys.py
+++ b/synapse/handlers/e2e_room_keys.py
@@ -83,7 +83,7 @@ class E2eRoomKeysHandler:
# we deliberately take the lock to get keys so that changing the version
# works atomically
- with (await self._upload_linearizer.queue(user_id)):
+ async with self._upload_linearizer.queue(user_id):
# make sure the backup version exists
try:
await self.store.get_e2e_room_keys_version_info(user_id, version)
@@ -126,7 +126,7 @@ class E2eRoomKeysHandler:
"""
# lock for consistency with uploading
- with (await self._upload_linearizer.queue(user_id)):
+ async with self._upload_linearizer.queue(user_id):
# make sure the backup version exists
try:
version_info = await self.store.get_e2e_room_keys_version_info(
@@ -187,7 +187,7 @@ class E2eRoomKeysHandler:
# TODO: Validate the JSON to make sure it has the right keys.
# XXX: perhaps we should use a finer grained lock here?
- with (await self._upload_linearizer.queue(user_id)):
+ async with self._upload_linearizer.queue(user_id):
# Check that the version we're trying to upload is the current version
try:
@@ -332,7 +332,7 @@ class E2eRoomKeysHandler:
# TODO: Validate the JSON to make sure it has the right keys.
# lock everyone out until we've switched version
- with (await self._upload_linearizer.queue(user_id)):
+ async with self._upload_linearizer.queue(user_id):
new_version = await self.store.create_e2e_room_keys_version(
user_id, version_info
)
@@ -359,7 +359,7 @@ class E2eRoomKeysHandler:
}
"""
- with (await self._upload_linearizer.queue(user_id)):
+ async with self._upload_linearizer.queue(user_id):
try:
res = await self.store.get_e2e_room_keys_version_info(user_id, version)
except StoreError as e:
@@ -383,7 +383,7 @@ class E2eRoomKeysHandler:
NotFoundError: if this backup version doesn't exist
"""
- with (await self._upload_linearizer.queue(user_id)):
+ async with self._upload_linearizer.queue(user_id):
try:
await self.store.delete_e2e_room_keys_version(user_id, version)
except StoreError as e:
@@ -413,7 +413,7 @@ class E2eRoomKeysHandler:
raise SynapseError(
400, "Version in body does not match", Codes.INVALID_PARAM
)
- with (await self._upload_linearizer.queue(user_id)):
+ async with self._upload_linearizer.queue(user_id):
try:
old_info = await self.store.get_e2e_room_keys_version_info(
user_id, version
diff --git a/synapse/handlers/federation.py b/synapse/handlers/federation.py
index 350ec9c03a..78d149905f 100644
--- a/synapse/handlers/federation.py
+++ b/synapse/handlers/federation.py
@@ -151,7 +151,7 @@ class FederationHandler:
return. This is used as part of the heuristic to decide if we
should back paginate.
"""
- with (await self._room_backfill.queue(room_id)):
+ async with self._room_backfill.queue(room_id):
return await self._maybe_backfill_inner(room_id, current_depth, limit)
async def _maybe_backfill_inner(
diff --git a/synapse/handlers/federation_event.py b/synapse/handlers/federation_event.py
index e7b9f15e13..03c1197c99 100644
--- a/synapse/handlers/federation_event.py
+++ b/synapse/handlers/federation_event.py
@@ -224,7 +224,7 @@ class FederationEventHandler:
len(missing_prevs),
shortstr(missing_prevs),
)
- with (await self._room_pdu_linearizer.queue(pdu.room_id)):
+ async with self._room_pdu_linearizer.queue(pdu.room_id):
logger.info(
"Acquired room lock to fetch %d missing prev_events",
len(missing_prevs),
diff --git a/synapse/handlers/message.py b/synapse/handlers/message.py
index 766f597a55..7db6905c61 100644
--- a/synapse/handlers/message.py
+++ b/synapse/handlers/message.py
@@ -851,7 +851,7 @@ class EventCreationHandler:
# a situation where event persistence can't keep up, causing
# extremities to pile up, which in turn leads to state resolution
# taking longer.
- with (await self.limiter.queue(event_dict["room_id"])):
+ async with self.limiter.queue(event_dict["room_id"]):
if txn_id and requester.access_token_id:
existing_event_id = await self.store.get_event_id_from_transaction_id(
event_dict["room_id"],
diff --git a/synapse/handlers/presence.py b/synapse/handlers/presence.py
index dace31d87e..209a4b0e52 100644
--- a/synapse/handlers/presence.py
+++ b/synapse/handlers/presence.py
@@ -1030,7 +1030,7 @@ class PresenceHandler(BasePresenceHandler):
is_syncing: Whether or not the user is now syncing
sync_time_msec: Time in ms when the user was last syncing
"""
- with (await self.external_sync_linearizer.queue(process_id)):
+ async with self.external_sync_linearizer.queue(process_id):
prev_state = await self.current_state_for_user(user_id)
process_presence = self.external_process_to_current_syncs.setdefault(
@@ -1071,7 +1071,7 @@ class PresenceHandler(BasePresenceHandler):
Used when the process has stopped/disappeared.
"""
- with (await self.external_sync_linearizer.queue(process_id)):
+ async with self.external_sync_linearizer.queue(process_id):
process_presence = self.external_process_to_current_syncs.pop(
process_id, set()
)
diff --git a/synapse/handlers/read_marker.py b/synapse/handlers/read_marker.py
index bad1acc634..05122fd5a6 100644
--- a/synapse/handlers/read_marker.py
+++ b/synapse/handlers/read_marker.py
@@ -40,7 +40,7 @@ class ReadMarkerHandler:
the read marker has changed.
"""
- with await self.read_marker_linearizer.queue((room_id, user_id)):
+ async with self.read_marker_linearizer.queue((room_id, user_id)):
existing_read_marker = await self.store.get_account_data_for_room_and_type(
user_id, room_id, "m.fully_read"
)
diff --git a/synapse/handlers/room.py b/synapse/handlers/room.py
index 51a08fd2c0..65d4aea9af 100644
--- a/synapse/handlers/room.py
+++ b/synapse/handlers/room.py
@@ -883,7 +883,7 @@ class RoomCreationHandler:
#
# we also don't need to check the requester's shadow-ban here, as we
# have already done so above (and potentially emptied invite_list).
- with (await self.room_member_handler.member_linearizer.queue((room_id,))):
+ async with self.room_member_handler.member_linearizer.queue((room_id,)):
content = {}
is_direct = config.get("is_direct", None)
if is_direct:
diff --git a/synapse/handlers/room_member.py b/synapse/handlers/room_member.py
index 0785e31114..802e57c4d0 100644
--- a/synapse/handlers/room_member.py
+++ b/synapse/handlers/room_member.py
@@ -515,8 +515,8 @@ class RoomMemberHandler(metaclass=abc.ABCMeta):
# We first linearise by the application service (to try to limit concurrent joins
# by application services), and then by room ID.
- with (await self.member_as_limiter.queue(as_id)):
- with (await self.member_linearizer.queue(key)):
+ async with self.member_as_limiter.queue(as_id):
+ async with self.member_linearizer.queue(key):
result = await self.update_membership_locked(
requester,
target,
diff --git a/synapse/handlers/sso.py b/synapse/handlers/sso.py
index 4f02a060d9..e4fe94e557 100644
--- a/synapse/handlers/sso.py
+++ b/synapse/handlers/sso.py
@@ -430,7 +430,7 @@ class SsoHandler:
# grab a lock while we try to find a mapping for this user. This seems...
# optimistic, especially for implementations that end up redirecting to
# interstitial pages.
- with await self._mapping_lock.queue(auth_provider_id):
+ async with self._mapping_lock.queue(auth_provider_id):
# first of all, check if we already have a mapping for this user
user_id = await self.get_sso_user_by_remote_user_id(
auth_provider_id,
|