diff options
author | Richard van der Hoff <github@rvanderhoff.org.uk> | 2017-11-16 16:27:29 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2017-11-16 16:27:29 +0000 |
commit | ba05f28ae72512d4196fca01e311dae6b08d51f3 (patch) | |
tree | a8d550b51e468e2d316185cb069b76880713317a /synapse/storage/_base.py | |
parent | Merge pull request #2661 from matrix-org/rav/statereadstore (diff) | |
parent | Fix broken ref to IntegrityError (diff) | |
download | synapse-ba05f28ae72512d4196fca01e311dae6b08d51f3.tar.xz |
Merge pull request #2684 from matrix-org/rav/unlock_upsert
Start work on avoiding table locks for upserts
Diffstat (limited to 'synapse/storage/_base.py')
-rw-r--r-- | synapse/storage/_base.py | 68 |
1 files changed, 46 insertions, 22 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py index 7ebd4f189d..e6eefdd6fe 100644 --- a/synapse/storage/_base.py +++ b/synapse/storage/_base.py @@ -469,23 +469,46 @@ class SQLBaseStore(object): txn.executemany(sql, vals) + @defer.inlineCallbacks def _simple_upsert(self, table, keyvalues, values, insertion_values={}, desc="_simple_upsert", lock=True): """ + + `lock` should generally be set to True (the default), but can be set + to False if either of the following are true: + + * there is a UNIQUE INDEX on the key columns. In this case a conflict + will cause an IntegrityError in which case this function will retry + the update. + + * we somehow know that we are the only thread which will be updating + this table. + Args: table (str): The table to upsert into keyvalues (dict): The unique key tables and their new values values (dict): The nonunique columns and their new values - insertion_values (dict): key/values to use when inserting + insertion_values (dict): additional key/values to use only when + inserting + lock (bool): True to lock the table when doing the upsert. Returns: Deferred(bool): True if a new entry was created, False if an existing one was updated. """ - return self.runInteraction( - desc, - self._simple_upsert_txn, table, keyvalues, values, insertion_values, - lock - ) + while True: + try: + result = yield self.runInteraction( + desc, + self._simple_upsert_txn, table, keyvalues, values, insertion_values, + lock=lock + ) + defer.returnValue(result) + except self.database_engine.module.IntegrityError as e: + # presumably we raced with another transaction: let's retry. + logger.warn( + "IntegrityError when upserting into %s; retrying: %s", + table, e + ) def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={}, lock=True): @@ -493,7 +516,7 @@ class SQLBaseStore(object): if lock: self.database_engine.lock_table(txn, table) - # Try to update + # First try to update. sql = "UPDATE %s SET %s WHERE %s" % ( table, ", ".join("%s = ?" % (k,) for k in values), @@ -502,23 +525,24 @@ class SQLBaseStore(object): sqlargs = values.values() + keyvalues.values() txn.execute(sql, sqlargs) - if txn.rowcount == 0: - # We didn't update and rows so insert a new one - allvalues = {} - allvalues.update(keyvalues) - allvalues.update(values) - allvalues.update(insertion_values) + if txn.rowcount > 0: + # successfully updated at least one row. + return False - sql = "INSERT INTO %s (%s) VALUES (%s)" % ( - table, - ", ".join(k for k in allvalues), - ", ".join("?" for _ in allvalues) - ) - txn.execute(sql, allvalues.values()) + # We didn't update any rows so insert a new one + allvalues = {} + allvalues.update(keyvalues) + allvalues.update(values) + allvalues.update(insertion_values) - return True - else: - return False + sql = "INSERT INTO %s (%s) VALUES (%s)" % ( + table, + ", ".join(k for k in allvalues), + ", ".join("?" for _ in allvalues) + ) + txn.execute(sql, allvalues.values()) + # successfully inserted + return True def _simple_select_one(self, table, keyvalues, retcols, allow_none=False, desc="_simple_select_one"): |