summary refs log tree commit diff
path: root/synapse/storage/databases
diff options
context:
space:
mode:
Diffstat (limited to 'synapse/storage/databases')
-rw-r--r--synapse/storage/databases/state/bg_updates.py10
-rw-r--r--synapse/storage/databases/state/deletion.py42
2 files changed, 42 insertions, 10 deletions
diff --git a/synapse/storage/databases/state/bg_updates.py b/synapse/storage/databases/state/bg_updates.py

index f7824cba0f..95fd0ae73a 100644 --- a/synapse/storage/databases/state/bg_updates.py +++ b/synapse/storage/databases/state/bg_updates.py
@@ -20,7 +20,15 @@ # import logging -from typing import TYPE_CHECKING, Dict, List, Mapping, Optional, Tuple, Union +from typing import ( + TYPE_CHECKING, + Dict, + List, + Mapping, + Optional, + Tuple, + Union, +) from synapse.logging.opentracing import tag_args, trace from synapse.storage._base import SQLBaseStore diff --git a/synapse/storage/databases/state/deletion.py b/synapse/storage/databases/state/deletion.py
index d4b1c20a45..f77c46f6ae 100644 --- a/synapse/storage/databases/state/deletion.py +++ b/synapse/storage/databases/state/deletion.py
@@ -321,19 +321,43 @@ class StateDeletionDataStore: async def mark_state_groups_as_pending_deletion( self, state_groups: Collection[int] ) -> None: - """Mark the given state groups as pending deletion""" + """Mark the given state groups as pending deletion. - now = self._clock.time_msec() + If any of the state groups are already pending deletion, then those records are + left as is. + """ - await self.db_pool.simple_upsert_many( - table="state_groups_pending_deletion", - key_names=("state_group",), - key_values=[(state_group,) for state_group in state_groups], - value_names=("insertion_ts",), - value_values=[(now,) for _ in state_groups], - desc="mark_state_groups_as_pending_deletion", + await self.db_pool.runInteraction( + "mark_state_groups_as_pending_deletion", + self._mark_state_groups_as_pending_deletion_txn, + state_groups, ) + def _mark_state_groups_as_pending_deletion_txn( + self, + txn: LoggingTransaction, + state_groups: Collection[int], + ) -> None: + sql = """ + INSERT INTO state_groups_pending_deletion (state_group, insertion_ts) + VALUES %s + ON CONFLICT (state_group) + DO NOTHING + """ + + now = self._clock.time_msec() + rows = [ + ( + state_group, + now, + ) + for state_group in state_groups + ] + if isinstance(txn.database_engine, PostgresEngine): + txn.execute_values(sql % ("?",), rows, fetch=False) + else: + txn.execute_batch(sql % ("(?, ?)",), rows) + async def mark_state_groups_as_used(self, state_groups: Collection[int]) -> None: """Mark the given state groups as now being referenced"""