diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 7ebd4f189d..e6eefdd6fe 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -469,23 +469,46 @@ class SQLBaseStore(object):
txn.executemany(sql, vals)
+ @defer.inlineCallbacks
def _simple_upsert(self, table, keyvalues, values,
insertion_values={}, desc="_simple_upsert", lock=True):
"""
+
+ `lock` should generally be set to True (the default), but can be set
+ to False if either of the following are true:
+
+ * there is a UNIQUE INDEX on the key columns. In this case a conflict
+ will cause an IntegrityError in which case this function will retry
+ the update.
+
+ * we somehow know that we are the only thread which will be updating
+ this table.
+
Args:
table (str): The table to upsert into
keyvalues (dict): The unique key tables and their new values
values (dict): The nonunique columns and their new values
- insertion_values (dict): key/values to use when inserting
+ insertion_values (dict): additional key/values to use only when
+ inserting
+ lock (bool): True to lock the table when doing the upsert.
Returns:
Deferred(bool): True if a new entry was created, False if an
existing one was updated.
"""
- return self.runInteraction(
- desc,
- self._simple_upsert_txn, table, keyvalues, values, insertion_values,
- lock
- )
+ while True:
+ try:
+ result = yield self.runInteraction(
+ desc,
+ self._simple_upsert_txn, table, keyvalues, values, insertion_values,
+ lock=lock
+ )
+ defer.returnValue(result)
+ except self.database_engine.module.IntegrityError as e:
+ # presumably we raced with another transaction: let's retry.
+ logger.warn(
+ "IntegrityError when upserting into %s; retrying: %s",
+ table, e
+ )
def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
lock=True):
@@ -493,7 +516,7 @@ class SQLBaseStore(object):
if lock:
self.database_engine.lock_table(txn, table)
- # Try to update
+ # First try to update.
sql = "UPDATE %s SET %s WHERE %s" % (
table,
", ".join("%s = ?" % (k,) for k in values),
@@ -502,23 +525,24 @@ class SQLBaseStore(object):
sqlargs = values.values() + keyvalues.values()
txn.execute(sql, sqlargs)
- if txn.rowcount == 0:
- # We didn't update and rows so insert a new one
- allvalues = {}
- allvalues.update(keyvalues)
- allvalues.update(values)
- allvalues.update(insertion_values)
+ if txn.rowcount > 0:
+ # successfully updated at least one row.
+ return False
- sql = "INSERT INTO %s (%s) VALUES (%s)" % (
- table,
- ", ".join(k for k in allvalues),
- ", ".join("?" for _ in allvalues)
- )
- txn.execute(sql, allvalues.values())
+ # We didn't update any rows so insert a new one
+ allvalues = {}
+ allvalues.update(keyvalues)
+ allvalues.update(values)
+ allvalues.update(insertion_values)
- return True
- else:
- return False
+ sql = "INSERT INTO %s (%s) VALUES (%s)" % (
+ table,
+ ", ".join(k for k in allvalues),
+ ", ".join("?" for _ in allvalues)
+ )
+ txn.execute(sql, allvalues.values())
+ # successfully inserted
+ return True
def _simple_select_one(self, table, keyvalues, retcols,
allow_none=False, desc="_simple_select_one"):
|