diff options
Diffstat (limited to 'synapse/storage')
-rw-r--r-- | synapse/storage/databases/main/purge_events.py | 33 |
1 files changed, 31 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py index 549ce07c16..6d42276503 100644 --- a/synapse/storage/databases/main/purge_events.py +++ b/synapse/storage/databases/main/purge_events.py @@ -19,6 +19,8 @@ from synapse.api.errors import SynapseError from synapse.storage.database import LoggingTransaction from synapse.storage.databases.main import CacheInvalidationWorkerStore from synapse.storage.databases.main.state import StateGroupWorkerStore +from synapse.storage.engines import PostgresEngine +from synapse.storage.engines._base import IsolationLevel from synapse.types import RoomStreamToken logger = logging.getLogger(__name__) @@ -317,11 +319,38 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore): Returns: The list of state groups to delete. """ - return await self.db_pool.runInteraction( - "purge_room", self._purge_room_txn, room_id + + # This first runs the purge transaction with READ_COMMITTED isolation level, + # meaning any new rows in the tables will not trigger a serialization error. + # We then run the same purge a second time without this isolation level to + # purge any of those rows which were added during the first. + + state_groups_to_delete = await self.db_pool.runInteraction( + "purge_room", + self._purge_room_txn, + room_id=room_id, + isolation_level=IsolationLevel.READ_COMMITTED, + ) + + state_groups_to_delete.extend( + await self.db_pool.runInteraction( + "purge_room", + self._purge_room_txn, + room_id=room_id, + ), ) + return state_groups_to_delete + def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]: + # This collides with event persistence so we cannot write new events and metadata into + # a room while deleting it or this transaction will fail. + if isinstance(self.database_engine, PostgresEngine): + txn.execute( + "SELECT room_version FROM rooms WHERE room_id = ? FOR UPDATE", + (room_id,), + ) + # First, fetch all the state groups that should be deleted, before # we delete that information. txn.execute( |