diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index ba385f9fc4..f6822707e4 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -19,6 +19,8 @@ from synapse.api.errors import SynapseError
from synapse.storage.database import LoggingTransaction
from synapse.storage.databases.main import CacheInvalidationWorkerStore
from synapse.storage.databases.main.state import StateGroupWorkerStore
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines._base import IsolationLevel
from synapse.types import RoomStreamToken
logger = logging.getLogger(__name__)
@@ -214,10 +216,10 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
# Delete all remote non-state events
for table in (
+ "event_edges",
"events",
"event_json",
"event_auth",
- "event_edges",
"event_forward_extremities",
"event_relations",
"event_search",
@@ -302,7 +304,7 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
self._invalidate_cache_and_stream(
txn, self.have_seen_event, (room_id, event_id)
)
- self._invalidate_get_event_cache(event_id)
+ self.invalidate_get_event_cache_after_txn(txn, event_id)
logger.info("[purge] done")
@@ -317,11 +319,38 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
Returns:
The list of state groups to delete.
"""
- return await self.db_pool.runInteraction(
- "purge_room", self._purge_room_txn, room_id
+
+ # This first runs the purge transaction with READ_COMMITTED isolation level,
+ # meaning any new rows in the tables will not trigger a serialization error.
+ # We then run the same purge a second time without this isolation level to
+ # purge any of those rows which were added during the first.
+
+ state_groups_to_delete = await self.db_pool.runInteraction(
+ "purge_room",
+ self._purge_room_txn,
+ room_id=room_id,
+ isolation_level=IsolationLevel.READ_COMMITTED,
+ )
+
+ state_groups_to_delete.extend(
+ await self.db_pool.runInteraction(
+ "purge_room",
+ self._purge_room_txn,
+ room_id=room_id,
+ ),
)
+ return state_groups_to_delete
+
def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
+ # This collides with event persistence so we cannot write new events and metadata into
+ # a room while deleting it or this transaction will fail.
+ if isinstance(self.database_engine, PostgresEngine):
+ txn.execute(
+ "SELECT room_version FROM rooms WHERE room_id = ? FOR UPDATE",
+ (room_id,),
+ )
+
# First, fetch all the state groups that should be deleted, before
# we delete that information.
txn.execute(
|