diff --git a/synapse/handlers/pagination.py b/synapse/handlers/pagination.py
index 19b8728db9..da34658470 100644
--- a/synapse/handlers/pagination.py
+++ b/synapse/handlers/pagination.py
@@ -46,6 +46,11 @@ logger = logging.getLogger(__name__)
BACKFILL_BECAUSE_TOO_MANY_GAPS_THRESHOLD = 3
+PURGE_HISTORY_LOCK_NAME = "purge_history_lock"
+
+DELETE_ROOM_LOCK_NAME = "delete_room_lock"
+
+
@attr.s(slots=True, auto_attribs=True)
class PurgeStatus:
"""Object tracking the status of a purge request
@@ -142,6 +147,7 @@ class PaginationHandler:
self._server_name = hs.hostname
self._room_shutdown_handler = hs.get_room_shutdown_handler()
self._relations_handler = hs.get_relations_handler()
+ self._worker_locks = hs.get_worker_locks_handler()
self.pagination_lock = ReadWriteLock()
# IDs of rooms in which there currently an active purge *or delete* operation.
@@ -356,7 +362,9 @@ class PaginationHandler:
"""
self._purges_in_progress_by_room.add(room_id)
try:
- async with self.pagination_lock.write(room_id):
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_HISTORY_LOCK_NAME, room_id, write=True
+ ):
await self._storage_controllers.purge_events.purge_history(
room_id, token, delete_local_events
)
@@ -412,7 +420,10 @@ class PaginationHandler:
room_id: room to be purged
force: set true to skip checking for joined users.
"""
- async with self.pagination_lock.write(room_id):
+ async with self._worker_locks.acquire_multi_read_write_lock(
+ [(PURGE_HISTORY_LOCK_NAME, room_id), (DELETE_ROOM_LOCK_NAME, room_id)],
+ write=True,
+ ):
# first check that we have no users in this room
if not force:
joined = await self.store.is_host_joined(room_id, self._server_name)
@@ -471,7 +482,9 @@ class PaginationHandler:
room_token = from_token.room_key
- async with self.pagination_lock.read(room_id):
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_HISTORY_LOCK_NAME, room_id, write=False
+ ):
(membership, member_event_id) = (None, None)
if not use_admin_priviledge:
(
@@ -747,7 +760,9 @@ class PaginationHandler:
self._purges_in_progress_by_room.add(room_id)
try:
- async with self.pagination_lock.write(room_id):
+ async with self._worker_locks.acquire_read_write_lock(
+ PURGE_HISTORY_LOCK_NAME, room_id, write=True
+ ):
self._delete_by_id[delete_id].status = DeleteStatus.STATUS_SHUTTING_DOWN
self._delete_by_id[
delete_id
|