diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index d4b1c20a45..f77c46f6ae 100644
--- a/synapse/storage/databases/state/deletion.py
+++ b/synapse/storage/databases/state/deletion.py
@@ -321,19 +321,43 @@ class StateDeletionDataStore:
async def mark_state_groups_as_pending_deletion(
self, state_groups: Collection[int]
) -> None:
- """Mark the given state groups as pending deletion"""
+ """Mark the given state groups as pending deletion.
- now = self._clock.time_msec()
+ If any of the state groups are already pending deletion, then those records are
+ left as is.
+ """
- await self.db_pool.simple_upsert_many(
- table="state_groups_pending_deletion",
- key_names=("state_group",),
- key_values=[(state_group,) for state_group in state_groups],
- value_names=("insertion_ts",),
- value_values=[(now,) for _ in state_groups],
- desc="mark_state_groups_as_pending_deletion",
+ await self.db_pool.runInteraction(
+ "mark_state_groups_as_pending_deletion",
+ self._mark_state_groups_as_pending_deletion_txn,
+ state_groups,
)
+ def _mark_state_groups_as_pending_deletion_txn(
+ self,
+ txn: LoggingTransaction,
+ state_groups: Collection[int],
+ ) -> None:
+ sql = """
+ INSERT INTO state_groups_pending_deletion (state_group, insertion_ts)
+ VALUES %s
+ ON CONFLICT (state_group)
+ DO NOTHING
+ """
+
+ now = self._clock.time_msec()
+ rows = [
+ (
+ state_group,
+ now,
+ )
+ for state_group in state_groups
+ ]
+ if isinstance(txn.database_engine, PostgresEngine):
+ txn.execute_values(sql % ("?",), rows, fetch=False)
+ else:
+ txn.execute_batch(sql % ("(?, ?)",), rows)
+
async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None:
"""Mark the given state groups as now being referenced"""
|