summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorRichard van der Hoff <richard@matrix.org>2017-11-16 15:30:15 +0000
committerRichard van der Hoff <richard@matrix.org>2017-11-16 15:30:15 +0000
commit10aaa1bc15775a228ce22ab45efbb55b5099289b (patch)
tree5874f1f23416fc5c046f25d73dc098ecfd00ddb1 /synapse
parentCleanup in _simple_upsert_txn (diff)
downloadsynapse-10aaa1bc15775a228ce22ab45efbb55b5099289b.tar.xz
_simple_upsert: retry on IntegrityError
wrap the call to _simple_upsert_txn in a loop so that we retry on an
integrityerror: this means we can avoid locking the table provided there is an
unique index.
Diffstat (limited to 'synapse')
-rw-r--r--synapse/storage/_base.py35
1 files changed, 29 insertions, 6 deletions
diff --git a/synapse/storage/_base.py b/synapse/storage/_base.py
index 740400d58b..1582a58966 100644
--- a/synapse/storage/_base.py
+++ b/synapse/storage/_base.py
@@ -469,23 +469,46 @@ class SQLBaseStore(object):
 
         txn.executemany(sql, vals)
 
+    @defer.inlineCallbacks
     def _simple_upsert(self, table, keyvalues, values,
                        insertion_values={}, desc="_simple_upsert", lock=True):
         """
+
+        `lock` should generally be set to True (the default), but can be set
+        to False if either of the following are true:
+
+        * there is a UNIQUE INDEX on the key columns. In this case a conflict
+          will cause an IntegrityError in which case this function will retry
+          the update.
+
+        * we somehow know that we are the only thread which will be updating
+          this table.
+
         Args:
             table (str): The table to upsert into
             keyvalues (dict): The unique key tables and their new values
             values (dict): The nonunique columns and their new values
-            insertion_values (dict): key/values to use when inserting
+            insertion_values (dict): additional key/values to use only when
+                inserting
+            lock (bool): True to lock the table when doing the upsert.
         Returns:
             Deferred(bool): True if a new entry was created, False if an
                 existing one was updated.
         """
-        return self.runInteraction(
-            desc,
-            self._simple_upsert_txn, table, keyvalues, values, insertion_values,
-            lock
-        )
+        while True:
+            try:
+                result = yield self.runInteraction(
+                    desc,
+                    self._simple_upsert_txn, table, keyvalues, values, insertion_values,
+                    lock=lock
+                )
+                defer.returnValue(result)
+            except self.database_engine.IntegrityError as e:
+                # presumably we raced with another transaction: let's retry.
+                logger.warn(
+                    "IntegrityError when upserting into %s; retrying: %s",
+                    table, e
+                )
 
     def _simple_upsert_txn(self, txn, table, keyvalues, values, insertion_values={},
                            lock=True):