diff --git a/changelog.d/14469.misc b/changelog.d/14469.misc
new file mode 100644
index 0000000000..a12a21e9ae
--- /dev/null
+++ b/changelog.d/14469.misc
@@ -0,0 +1 @@
+Remove option to skip locking of tables when performing emulated upserts, to avoid a class of bugs in future.
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index a14b13aec8..55bcb90001 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1129,7 +1129,6 @@ class DatabasePool:
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
desc: str = "simple_upsert",
- lock: bool = True,
) -> bool:
"""Insert a row with values + insertion_values; on conflict, update with values.
@@ -1154,21 +1153,12 @@ class DatabasePool:
requiring that a unique index exist on the column names used to detect a
conflict (i.e. `keyvalues.keys()`).
- If there is no such index, we can "emulate" an upsert with a SELECT followed
- by either an INSERT or an UPDATE. This is unsafe: we cannot make the same
- atomicity guarantees that a native upsert can and are very vulnerable to races
- and crashes. Therefore if we wish to upsert without an appropriate unique index,
- we must either:
-
- 1. Acquire a table-level lock before the emulated upsert (`lock=True`), or
- 2. VERY CAREFULLY ensure that we are the only thread and worker which will be
- writing to this table, in which case we can proceed without a lock
- (`lock=False`).
-
- Generally speaking, you should use `lock=True`. If the table in question has a
- unique index[*], this class will use a native upsert (which is atomic and so can
- ignore the `lock` argument). Otherwise this class will use an emulated upsert,
- in which case we want the safer option unless we been VERY CAREFUL.
+ If there is no such index yet[*], we can "emulate" an upsert with a SELECT
+ followed by either an INSERT or an UPDATE. This is unsafe unless *all* upserters
+ run at the SERIALIZABLE isolation level: we cannot make the same atomicity
+ guarantees that a native upsert can and are very vulnerable to races and
+ crashes. Therefore to upsert without an appropriate unique index, we acquire a
+ table-level lock before the emulated upsert.
[*]: Some tables have unique indices added to them in the background. Those
tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES,
@@ -1189,7 +1179,6 @@ class DatabasePool:
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
desc: description of the transaction, for logging and metrics
- lock: True to lock the table when doing the upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
@@ -1209,7 +1198,6 @@ class DatabasePool:
keyvalues,
values,
insertion_values,
- lock=lock,
db_autocommit=autocommit,
)
except self.engine.module.IntegrityError as e:
@@ -1232,7 +1220,6 @@ class DatabasePool:
values: Dict[str, Any],
insertion_values: Optional[Dict[str, Any]] = None,
where_clause: Optional[str] = None,
- lock: bool = True,
) -> bool:
"""
Pick the UPSERT method which works best on the platform. Either the
@@ -1245,8 +1232,6 @@ class DatabasePool:
values: The nonunique columns and their new values
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
- lock: True to lock the table when doing the upsert. Unused when performing
- a native upsert.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
@@ -1270,7 +1255,6 @@ class DatabasePool:
values,
insertion_values=insertion_values,
where_clause=where_clause,
- lock=lock,
)
def simple_upsert_txn_emulated(
@@ -1291,14 +1275,15 @@ class DatabasePool:
insertion_values: additional key/values to use only when inserting
where_clause: An index predicate to apply to the upsert.
lock: True to lock the table when doing the upsert.
+ Must not be False unless the table has already been locked.
Returns:
Returns True if a row was inserted or updated (i.e. if `values` is
not empty then this always returns True)
"""
insertion_values = insertion_values or {}
- # We need to lock the table :(, unless we're *really* careful
if lock:
+ # We need to lock the table :(
self.engine.lock_table(txn, table)
def _getwhere(key: str) -> str:
@@ -1406,7 +1391,6 @@ class DatabasePool:
value_names: Collection[str],
value_values: Collection[Collection[Any]],
desc: str,
- lock: bool = True,
) -> None:
"""
Upsert, many times.
@@ -1418,8 +1402,6 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused when performing
- a native upsert.
"""
# We can autocommit if it safe to upsert
@@ -1433,7 +1415,6 @@ class DatabasePool:
key_values,
value_names,
value_values,
- lock=lock,
db_autocommit=autocommit,
)
@@ -1445,7 +1426,6 @@ class DatabasePool:
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
- lock: bool = True,
) -> None:
"""
Upsert, many times.
@@ -1457,8 +1437,6 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert. Unused when performing
- a native upsert.
"""
if table not in self._unsafe_to_upsert_tables:
return self.simple_upsert_many_txn_native_upsert(
@@ -1466,7 +1444,12 @@ class DatabasePool:
)
else:
return self.simple_upsert_many_txn_emulated(
- txn, table, key_names, key_values, value_names, value_values, lock=lock
+ txn,
+ table,
+ key_names,
+ key_values,
+ value_names,
+ value_values,
)
def simple_upsert_many_txn_emulated(
@@ -1477,7 +1460,6 @@ class DatabasePool:
key_values: Collection[Iterable[Any]],
value_names: Collection[str],
value_values: Iterable[Iterable[Any]],
- lock: bool = True,
) -> None:
"""
Upsert, many times, but without native UPSERT support or batching.
@@ -1489,18 +1471,16 @@ class DatabasePool:
value_names: The value column names
value_values: A list of each row's value column values.
Ignored if value_names is empty.
- lock: True to lock the table when doing the upsert.
"""
# No value columns, therefore make a blank list so that the following
# zip() works correctly.
if not value_names:
value_values = [() for x in range(len(key_values))]
- if lock:
- # Lock the table just once, to prevent it being done once per row.
- # Note that, according to Postgres' documentation, once obtained,
- # the lock is held for the remainder of the current transaction.
- self.engine.lock_table(txn, "user_ips")
+ # Lock the table just once, to prevent it being done once per row.
+ # Note that, according to Postgres' documentation, once obtained,
+ # the lock is held for the remainder of the current transaction.
+ self.engine.lock_table(txn, "user_ips")
for keyv, valv in zip(key_values, value_values):
_keys = {x: y for x, y in zip(key_names, keyv)}
diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py
index 282687ebce..07908c41d9 100644
--- a/synapse/storage/databases/main/account_data.py
+++ b/synapse/storage/databases/main/account_data.py
@@ -449,9 +449,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
content_json = json_encoder.encode(content)
async with self._account_data_id_gen.get_next() as next_id:
- # no need to lock here as room_account_data has a unique constraint
- # on (user_id, room_id, account_data_type) so simple_upsert will
- # retry if there is a conflict.
await self.db_pool.simple_upsert(
desc="add_room_account_data",
table="room_account_data",
@@ -461,7 +458,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
"account_data_type": account_data_type,
},
values={"stream_id": next_id, "content": content_json},
- lock=False,
)
self._account_data_stream_cache.entity_has_changed(user_id, next_id)
@@ -517,15 +513,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore)
) -> None:
content_json = json_encoder.encode(content)
- # no need to lock here as account_data has a unique constraint on
- # (user_id, account_data_type) so simple_upsert will retry if
- # there is a conflict.
self.db_pool.simple_upsert_txn(
txn,
table="account_data",
keyvalues={"user_id": user_id, "account_data_type": account_data_type},
values={"stream_id": next_id, "content": content_json},
- lock=False,
)
# Ignored users get denormalized into a separate table as an optimisation.
diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py
index 63046c0527..25da0c56c5 100644
--- a/synapse/storage/databases/main/appservice.py
+++ b/synapse/storage/databases/main/appservice.py
@@ -451,8 +451,6 @@ class ApplicationServiceTransactionWorkerStore(
table="application_services_state",
keyvalues={"as_id": service.id},
values={f"{stream_type}_stream_id": pos},
- # no need to lock when emulating upsert: as_id is a unique key
- lock=False,
desc="set_appservice_stream_type_pos",
)
diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py
index 05a193f889..534f7fc04a 100644
--- a/synapse/storage/databases/main/devices.py
+++ b/synapse/storage/databases/main/devices.py
@@ -1744,9 +1744,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_cache",
keyvalues={"user_id": user_id, "device_id": device_id},
values={"content": json_encoder.encode(content)},
- # we don't need to lock, because we assume we are the only thread
- # updating this user's devices.
- lock=False,
)
txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id))
@@ -1760,9 +1757,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
- # again, we can assume we are the only thread updating this user's
- # extremity.
- lock=False,
)
async def update_remote_device_list_cache(
@@ -1815,9 +1809,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore):
table="device_lists_remote_extremeties",
keyvalues={"user_id": user_id},
values={"stream_id": stream_id},
- # we don't need to lock, because we can assume we are the only thread
- # updating this user's extremity.
- lock=False,
)
async def add_device_change_to_streams(
diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py
index 309a4ba664..bbee02ab18 100644
--- a/synapse/storage/databases/main/event_federation.py
+++ b/synapse/storage/databases/main/event_federation.py
@@ -1686,7 +1686,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas
},
insertion_values={},
desc="insert_insertion_extremity",
- lock=False,
)
async def insert_received_event_to_staging(
diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py
index fee37b9ce4..40fd781a6a 100644
--- a/synapse/storage/databases/main/pusher.py
+++ b/synapse/storage/databases/main/pusher.py
@@ -325,14 +325,11 @@ class PusherWorkerStore(SQLBaseStore):
async def set_throttle_params(
self, pusher_id: str, room_id: str, params: ThrottleParams
) -> None:
- # no need to lock because `pusher_throttle` has a primary key on
- # (pusher, room_id) so simple_upsert will retry
await self.db_pool.simple_upsert(
"pusher_throttle",
{"pusher": pusher_id, "room_id": room_id},
{"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms},
desc="set_throttle_params",
- lock=False,
)
async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int:
@@ -589,8 +586,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
device_id: Optional[str] = None,
) -> None:
async with self._pushers_id_gen.get_next() as stream_id:
- # no need to lock because `pushers` has a unique key on
- # (app_id, pushkey, user_name) so simple_upsert will retry
await self.db_pool.simple_upsert(
table="pushers",
keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id},
@@ -609,7 +604,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore):
"device_id": device_id,
},
desc="add_pusher",
- lock=False,
)
user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate(
diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py
index 52ad947c6c..1309bfd374 100644
--- a/synapse/storage/databases/main/room.py
+++ b/synapse/storage/databases/main/room.py
@@ -1847,9 +1847,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"creator": room_creator,
"has_auth_chain_index": has_auth_chain_index,
},
- # rooms has a unique constraint on room_id, so no need to lock when doing an
- # emulated upsert.
- lock=False,
)
async def store_partial_state_room(
@@ -1970,9 +1967,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore):
"creator": "",
"has_auth_chain_index": has_auth_chain_index,
},
- # rooms has a unique constraint on room_id, so no need to lock when doing an
- # emulated upsert.
- lock=False,
)
async def set_room_is_public(self, room_id: str, is_public: bool) -> None:
diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py
index 39e80f6f5b..131f357d04 100644
--- a/synapse/storage/databases/main/room_batch.py
+++ b/synapse/storage/databases/main/room_batch.py
@@ -44,6 +44,4 @@ class RoomBatchStore(SQLBaseStore):
table="event_to_state_groups",
keyvalues={"event_id": event_id},
values={"state_group": state_group_id, "event_id": event_id},
- # Unique constraint on event_id so we don't have to lock
- lock=False,
)
diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py
index 698d6f7515..044435deab 100644
--- a/synapse/storage/databases/main/user_directory.py
+++ b/synapse/storage/databases/main/user_directory.py
@@ -481,7 +481,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
table="user_directory",
keyvalues={"user_id": user_id},
values={"display_name": display_name, "avatar_url": avatar_url},
- lock=False, # We're only inserter
)
if isinstance(self.database_engine, PostgresEngine):
@@ -511,7 +510,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore):
table="user_directory_search",
keyvalues={"user_id": user_id},
values={"value": value},
- lock=False, # We're only inserter
)
else:
# This should be unreachable.
|