diff options
author | Nick Mills-Barrett <nick@beeper.com> | 2022-07-18 15:17:24 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-07-18 14:17:24 +0100 |
commit | 6785b0f39d8f920a7b91a8a6a043ede08eb277e4 (patch) | |
tree | acaa3119a6b6e72b9bde0eeac85b0caf35433392 /synapse | |
parent | Update expected DB query count when creating a room (#13307) (diff) | |
download | synapse-6785b0f39d8f920a7b91a8a6a043ede08eb277e4.tar.xz |
Use READ COMMITTED isolation level when purging rooms (#12942)
To close: #10294. Signed off by Nick @ Beeper.
Diffstat (limited to '')
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 549ce07c16..6d42276503 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -19,6 +19,8 @@ from synapse.api.errors import SynapseError from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main import CacheInvalidationWorkerStore from synapse.storage.databases.main.state import StateGroupWorkerStore +from synapse.storage.engines import PostgresEngine +from synapse.storage.engines._base import IsolationLevel from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) @@ -317,11 +319,38 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): Returns: The list of state groups to delete. """ - return await self.db_pool.runInteraction( - "purge_room", self._purge_room_txn, room_id + + # This first runs the purge transaction with READ_COMMITTED isolation level, + # meaning any new rows in the tables will not trigger a serialization error. + # We then run the same purge a second time without this isolation level to + # purge any of those rows which were added during the first. + + state_groups_to_delete = await self.db_pool.runInteraction( + "purge_room", + self._purge_room_txn, + room_id=room_id, + isolation_level=IsolationLevel.READ_COMMITTED, + ) + + state_groups_to_delete.extend( + await self.db_pool.runInteraction( + "purge_room", + self._purge_room_txn, + room_id=room_id, + ), ) + return state_groups_to_delete + def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: + # This collides with event persistence so we cannot write new events and metadata into + # a room while deleting it or this transaction will fail. + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "SELECT room_version FROM rooms WHERE room_id = ? FOR UPDATE", + (room_id,), + ) + # First, fetch all the state groups that should be deleted, before # we delete that information. txn.execute( |