diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/storage/database.py | 56 | ||||
-rw-r--r-- | synapse/storage/databases/main/account_data.py | 8 | ||||
-rw-r--r-- | synapse/storage/databases/main/appservice.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/devices.py | 9 | ||||
-rw-r--r-- | synapse/storage/databases/main/event_federation.py | 1 | ||||
-rw-r--r-- | synapse/storage/databases/main/pusher.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/room.py | 6 | ||||
-rw-r--r-- | synapse/storage/databases/main/room_batch.py | 2 | ||||
-rw-r--r-- | synapse/storage/databases/main/user_directory.py | 2 |
9 files changed, 18 insertions, 74 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py index a14b13aec8..55bcb90001 100644 --- a/synapse/storage/database.py +++ b/synapse/storage/database.py @@ -1129,7 +1129,6 @@ class DatabasePool: values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, desc: str = "simple_upsert", - lock: bool = True, ) -> bool: """Insert a row with values + insertion_values; on conflict, update with values. @@ -1154,21 +1153,12 @@ class DatabasePool: requiring that a unique index exist on the column names used to detect a conflict (i.e. `keyvalues.keys()`). - If there is no such index, we can "emulate" an upsert with a SELECT followed - by either an INSERT or an UPDATE. This is unsafe: we cannot make the same - atomicity guarantees that a native upsert can and are very vulnerable to races - and crashes. Therefore if we wish to upsert without an appropriate unique index, - we must either: - - 1. Acquire a table-level lock before the emulated upsert (`lock=True`), or - 2. VERY CAREFULLY ensure that we are the only thread and worker which will be - writing to this table, in which case we can proceed without a lock - (`lock=False`). - - Generally speaking, you should use `lock=True`. If the table in question has a - unique index[*], this class will use a native upsert (which is atomic and so can - ignore the `lock` argument). Otherwise this class will use an emulated upsert, - in which case we want the safer option unless we been VERY CAREFUL. + If there is no such index yet[*], we can "emulate" an upsert with a SELECT + followed by either an INSERT or an UPDATE. This is unsafe unless *all* upserters + run at the SERIALIZABLE isolation level: we cannot make the same atomicity + guarantees that a native upsert can and are very vulnerable to races and + crashes. Therefore to upsert without an appropriate unique index, we acquire a + table-level lock before the emulated upsert. [*]: Some tables have unique indices added to them in the background. Those tables `T` are keys in the dictionary UNIQUE_INDEX_BACKGROUND_UPDATES, @@ -1189,7 +1179,6 @@ class DatabasePool: values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting desc: description of the transaction, for logging and metrics - lock: True to lock the table when doing the upsert. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) @@ -1209,7 +1198,6 @@ class DatabasePool: keyvalues, values, insertion_values, - lock=lock, db_autocommit=autocommit, ) except self.engine.module.IntegrityError as e: @@ -1232,7 +1220,6 @@ class DatabasePool: values: Dict[str, Any], insertion_values: Optional[Dict[str, Any]] = None, where_clause: Optional[str] = None, - lock: bool = True, ) -> bool: """ Pick the UPSERT method which works best on the platform. Either the @@ -1245,8 +1232,6 @@ class DatabasePool: values: The nonunique columns and their new values insertion_values: additional key/values to use only when inserting where_clause: An index predicate to apply to the upsert. - lock: True to lock the table when doing the upsert. Unused when performing - a native upsert. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) @@ -1270,7 +1255,6 @@ class DatabasePool: values, insertion_values=insertion_values, where_clause=where_clause, - lock=lock, ) def simple_upsert_txn_emulated( @@ -1291,14 +1275,15 @@ class DatabasePool: insertion_values: additional key/values to use only when inserting where_clause: An index predicate to apply to the upsert. lock: True to lock the table when doing the upsert. + Must not be False unless the table has already been locked. Returns: Returns True if a row was inserted or updated (i.e. if `values` is not empty then this always returns True) """ insertion_values = insertion_values or {} - # We need to lock the table :(, unless we're *really* careful if lock: + # We need to lock the table :( self.engine.lock_table(txn, table) def _getwhere(key: str) -> str: @@ -1406,7 +1391,6 @@ class DatabasePool: value_names: Collection[str], value_values: Collection[Collection[Any]], desc: str, - lock: bool = True, ) -> None: """ Upsert, many times. @@ -1418,8 +1402,6 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused when performing - a native upsert. """ # We can autocommit if it safe to upsert @@ -1433,7 +1415,6 @@ class DatabasePool: key_values, value_names, value_values, - lock=lock, db_autocommit=autocommit, ) @@ -1445,7 +1426,6 @@ class DatabasePool: key_values: Collection[Iterable[Any]], value_names: Collection[str], value_values: Iterable[Iterable[Any]], - lock: bool = True, ) -> None: """ Upsert, many times. @@ -1457,8 +1437,6 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. Unused when performing - a native upsert. """ if table not in self._unsafe_to_upsert_tables: return self.simple_upsert_many_txn_native_upsert( @@ -1466,7 +1444,12 @@ class DatabasePool: ) else: return self.simple_upsert_many_txn_emulated( - txn, table, key_names, key_values, value_names, value_values, lock=lock + txn, + table, + key_names, + key_values, + value_names, + value_values, ) def simple_upsert_many_txn_emulated( @@ -1477,7 +1460,6 @@ class DatabasePool: key_values: Collection[Iterable[Any]], value_names: Collection[str], value_values: Iterable[Iterable[Any]], - lock: bool = True, ) -> None: """ Upsert, many times, but without native UPSERT support or batching. @@ -1489,18 +1471,16 @@ class DatabasePool: value_names: The value column names value_values: A list of each row's value column values. Ignored if value_names is empty. - lock: True to lock the table when doing the upsert. """ # No value columns, therefore make a blank list so that the following # zip() works correctly. if not value_names: value_values = [() for x in range(len(key_values))] - if lock: - # Lock the table just once, to prevent it being done once per row. - # Note that, according to Postgres' documentation, once obtained, - # the lock is held for the remainder of the current transaction. - self.engine.lock_table(txn, "user_ips") + # Lock the table just once, to prevent it being done once per row. + # Note that, according to Postgres' documentation, once obtained, + # the lock is held for the remainder of the current transaction. + self.engine.lock_table(txn, "user_ips") for keyv, valv in zip(key_values, value_values): _keys = {x: y for x, y in zip(key_names, keyv)} diff --git a/synapse/storage/databases/main/account_data.py b/synapse/storage/databases/main/account_data.py index 282687ebce..07908c41d9 100644 --- a/synapse/storage/databases/main/account_data.py +++ b/synapse/storage/databases/main/account_data.py @@ -449,9 +449,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) content_json = json_encoder.encode(content) async with self._account_data_id_gen.get_next() as next_id: - # no need to lock here as room_account_data has a unique constraint - # on (user_id, room_id, account_data_type) so simple_upsert will - # retry if there is a conflict. await self.db_pool.simple_upsert( desc="add_room_account_data", table="room_account_data", @@ -461,7 +458,6 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) "account_data_type": account_data_type, }, values={"stream_id": next_id, "content": content_json}, - lock=False, ) self._account_data_stream_cache.entity_has_changed(user_id, next_id) @@ -517,15 +513,11 @@ class AccountDataWorkerStore(PushRulesWorkerStore, CacheInvalidationWorkerStore) ) -> None: content_json = json_encoder.encode(content) - # no need to lock here as account_data has a unique constraint on - # (user_id, account_data_type) so simple_upsert will retry if - # there is a conflict. self.db_pool.simple_upsert_txn( txn, table="account_data", keyvalues={"user_id": user_id, "account_data_type": account_data_type}, values={"stream_id": next_id, "content": content_json}, - lock=False, ) # Ignored users get denormalized into a separate table as an optimisation. diff --git a/synapse/storage/databases/main/appservice.py b/synapse/storage/databases/main/appservice.py index 63046c0527..25da0c56c5 100644 --- a/synapse/storage/databases/main/appservice.py +++ b/synapse/storage/databases/main/appservice.py @@ -451,8 +451,6 @@ class ApplicationServiceTransactionWorkerStore( table="application_services_state", keyvalues={"as_id": service.id}, values={f"{stream_type}_stream_id": pos}, - # no need to lock when emulating upsert: as_id is a unique key - lock=False, desc="set_appservice_stream_type_pos", ) diff --git a/synapse/storage/databases/main/devices.py b/synapse/storage/databases/main/devices.py index 05a193f889..534f7fc04a 100644 --- a/synapse/storage/databases/main/devices.py +++ b/synapse/storage/databases/main/devices.py @@ -1744,9 +1744,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="device_lists_remote_cache", keyvalues={"user_id": user_id, "device_id": device_id}, values={"content": json_encoder.encode(content)}, - # we don't need to lock, because we assume we are the only thread - # updating this user's devices. - lock=False, ) txn.call_after(self._get_cached_user_device.invalidate, (user_id, device_id)) @@ -1760,9 +1757,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, values={"stream_id": stream_id}, - # again, we can assume we are the only thread updating this user's - # extremity. - lock=False, ) async def update_remote_device_list_cache( @@ -1815,9 +1809,6 @@ class DeviceStore(DeviceWorkerStore, DeviceBackgroundUpdateStore): table="device_lists_remote_extremeties", keyvalues={"user_id": user_id}, values={"stream_id": stream_id}, - # we don't need to lock, because we can assume we are the only thread - # updating this user's extremity. - lock=False, ) async def add_device_change_to_streams( diff --git a/synapse/storage/databases/main/event_federation.py b/synapse/storage/databases/main/event_federation.py index 309a4ba664..bbee02ab18 100644 --- a/synapse/storage/databases/main/event_federation.py +++ b/synapse/storage/databases/main/event_federation.py @@ -1686,7 +1686,6 @@ class EventFederationWorkerStore(SignatureWorkerStore, EventsWorkerStore, SQLBas }, insertion_values={}, desc="insert_insertion_extremity", - lock=False, ) async def insert_received_event_to_staging( diff --git a/synapse/storage/databases/main/pusher.py b/synapse/storage/databases/main/pusher.py index fee37b9ce4..40fd781a6a 100644 --- a/synapse/storage/databases/main/pusher.py +++ b/synapse/storage/databases/main/pusher.py @@ -325,14 +325,11 @@ class PusherWorkerStore(SQLBaseStore): async def set_throttle_params( self, pusher_id: str, room_id: str, params: ThrottleParams ) -> None: - # no need to lock because `pusher_throttle` has a primary key on - # (pusher, room_id) so simple_upsert will retry await self.db_pool.simple_upsert( "pusher_throttle", {"pusher": pusher_id, "room_id": room_id}, {"last_sent_ts": params.last_sent_ts, "throttle_ms": params.throttle_ms}, desc="set_throttle_params", - lock=False, ) async def _remove_deactivated_pushers(self, progress: dict, batch_size: int) -> int: @@ -589,8 +586,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): device_id: Optional[str] = None, ) -> None: async with self._pushers_id_gen.get_next() as stream_id: - # no need to lock because `pushers` has a unique key on - # (app_id, pushkey, user_name) so simple_upsert will retry await self.db_pool.simple_upsert( table="pushers", keyvalues={"app_id": app_id, "pushkey": pushkey, "user_name": user_id}, @@ -609,7 +604,6 @@ class PusherStore(PusherWorkerStore, PusherBackgroundUpdatesStore): "device_id": device_id, }, desc="add_pusher", - lock=False, ) user_has_pusher = self.get_if_user_has_pusher.cache.get_immediate( diff --git a/synapse/storage/databases/main/room.py b/synapse/storage/databases/main/room.py index 52ad947c6c..1309bfd374 100644 --- a/synapse/storage/databases/main/room.py +++ b/synapse/storage/databases/main/room.py @@ -1847,9 +1847,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): "creator": room_creator, "has_auth_chain_index": has_auth_chain_index, }, - # rooms has a unique constraint on room_id, so no need to lock when doing an - # emulated upsert. - lock=False, ) async def store_partial_state_room( @@ -1970,9 +1967,6 @@ class RoomStore(RoomBackgroundUpdateStore, RoomWorkerStore): "creator": "", "has_auth_chain_index": has_auth_chain_index, }, - # rooms has a unique constraint on room_id, so no need to lock when doing an - # emulated upsert. - lock=False, ) async def set_room_is_public(self, room_id: str, is_public: bool) -> None: diff --git a/synapse/storage/databases/main/room_batch.py b/synapse/storage/databases/main/room_batch.py index 39e80f6f5b..131f357d04 100644 --- a/synapse/storage/databases/main/room_batch.py +++ b/synapse/storage/databases/main/room_batch.py @@ -44,6 +44,4 @@ class RoomBatchStore(SQLBaseStore): table="event_to_state_groups", keyvalues={"event_id": event_id}, values={"state_group": state_group_id, "event_id": event_id}, - # Unique constraint on event_id so we don't have to lock - lock=False, ) diff --git a/synapse/storage/databases/main/user_directory.py b/synapse/storage/databases/main/user_directory.py index 698d6f7515..044435deab 100644 --- a/synapse/storage/databases/main/user_directory.py +++ b/synapse/storage/databases/main/user_directory.py @@ -481,7 +481,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): table="user_directory", keyvalues={"user_id": user_id}, values={"display_name": display_name, "avatar_url": avatar_url}, - lock=False, # We're only inserter ) if isinstance(self.database_engine, PostgresEngine): @@ -511,7 +510,6 @@ class UserDirectoryBackgroundUpdateStore(StateDeltasStore): table="user_directory_search", keyvalues={"user_id": user_id}, values={"value": value}, - lock=False, # We're only inserter ) else: # This should be unreachable. |