summary refs log tree commit diff
path: root/synapse/storage
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage')
-rw-r--r--synapse/storage/databases/main/purge_events.py33
1 files changed, 31 insertions, 2 deletions
diff --git a/synapse/storage/databases/main/purge_events.py b/synapse/storage/databases/main/purge_events.py
index 549ce07c16..6d42276503 100644
--- a/synapse/storage/databases/main/purge_events.py
+++ b/synapse/storage/databases/main/purge_events.py
@@ -19,6 +19,8 @@ from synapse.api.errors import SynapseError
 from synapse.storage.database import LoggingTransaction
 from synapse.storage.databases.main import CacheInvalidationWorkerStore
 from synapse.storage.databases.main.state import StateGroupWorkerStore
+from synapse.storage.engines import PostgresEngine
+from synapse.storage.engines._base import IsolationLevel
 from synapse.types import RoomStreamToken
 
 logger = logging.getLogger(__name__)
@@ -317,11 +319,38 @@ class PurgeEventsStore(StateGroupWorkerStore, CacheInvalidationWorkerStore):
         Returns:
             The list of state groups to delete.
         """
-        return await self.db_pool.runInteraction(
-            "purge_room", self._purge_room_txn, room_id
+
+        # This first runs the purge transaction with READ_COMMITTED isolation level,
+        # meaning any new rows in the tables will not trigger a serialization error.
+        # We then run the same purge a second time without this isolation level to
+        # purge any of those rows which were added during the first.
+
+        state_groups_to_delete = await self.db_pool.runInteraction(
+            "purge_room",
+            self._purge_room_txn,
+            room_id=room_id,
+            isolation_level=IsolationLevel.READ_COMMITTED,
+        )
+
+        state_groups_to_delete.extend(
+            await self.db_pool.runInteraction(
+                "purge_room",
+                self._purge_room_txn,
+                room_id=room_id,
+            ),
         )
 
+        return state_groups_to_delete
+
     def _purge_room_txn(self, txn: LoggingTransaction, room_id: str) -> List[int]:
+        # This collides with event persistence so we cannot write new events and metadata into
+        # a room while deleting it or this transaction will fail.
+        if isinstance(self.database_engine, PostgresEngine):
+            txn.execute(
+                "SELECT room_version FROM rooms WHERE room_id = ? FOR UPDATE",
+                (room_id,),
+            )
+
         # First, fetch all the state groups that should be deleted, before
         # we delete that information.
         txn.execute(