summary refs log tree commit diff
path: root/synapse/storage/database.py
diff options
context:
space:
mode:
authorPatrick Cloke <clokep@users.noreply.github.com>2022-09-15 14:28:48 -0400
committerGitHub <noreply@github.com>2022-09-15 18:28:48 +0000
commitb2b0c8527957d89b36c0eafea70347c200c1d294 (patch)
tree27fc59a15ef8dc106d2aa5842907d9d741a0c108 /synapse/storage/database.py
parentA third batch of Pydantic validation for rest/client/account.py (#13736) (diff)
downloadsynapse-b2b0c8527957d89b36c0eafea70347c200c1d294.tar.xz
Support providing an index predicate for upserts. (#13822)
This is useful to upsert against a table which has a unique
partial index while avoiding conflicts.
Diffstat (limited to 'synapse/storage/database.py')
-rw-r--r--synapse/storage/database.py30
1 files changed, 23 insertions, 7 deletions
diff --git a/synapse/storage/database.py b/synapse/storage/database.py
index e881bff7fb..921cd4dc5e 100644
--- a/synapse/storage/database.py
+++ b/synapse/storage/database.py
@@ -1191,6 +1191,7 @@ class DatabasePool:
         keyvalues: Dict[str, Any],
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
+        where_clause: Optional[str] = None,
         lock: bool = True,
     ) -> bool:
         """
@@ -1203,6 +1204,7 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+            where_clause: An index predicate to apply to the upsert.
             lock: True to lock the table when doing the upsert. Unused when performing
                 a native upsert.
         Returns:
@@ -1213,7 +1215,12 @@ class DatabasePool:
 
         if table not in self._unsafe_to_upsert_tables:
             return self.simple_upsert_txn_native_upsert(
-                txn, table, keyvalues, values, insertion_values=insertion_values
+                txn,
+                table,
+                keyvalues,
+                values,
+                insertion_values=insertion_values,
+                where_clause=where_clause,
             )
         else:
             return self.simple_upsert_txn_emulated(
@@ -1222,6 +1229,7 @@ class DatabasePool:
                 keyvalues,
                 values,
                 insertion_values=insertion_values,
+                where_clause=where_clause,
                 lock=lock,
             )
 
@@ -1232,6 +1240,7 @@ class DatabasePool:
         keyvalues: Dict[str, Any],
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
+        where_clause: Optional[str] = None,
         lock: bool = True,
     ) -> bool:
         """
@@ -1240,6 +1249,7 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+            where_clause: An index predicate to apply to the upsert.
             lock: True to lock the table when doing the upsert.
         Returns:
             Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1259,14 +1269,17 @@ class DatabasePool:
             else:
                 return "%s = ?" % (key,)
 
+        # Generate a where clause of each keyvalue and optionally the provided
+        # index predicate.
+        where = [_getwhere(k) for k in keyvalues]
+        if where_clause:
+            where.append(where_clause)
+
         if not values:
             # If `values` is empty, then all of the values we care about are in
             # the unique key, so there is nothing to UPDATE. We can just do a
             # SELECT instead to see if it exists.
-            sql = "SELECT 1 FROM %s WHERE %s" % (
-                table,
-                " AND ".join(_getwhere(k) for k in keyvalues),
-            )
+            sql = "SELECT 1 FROM %s WHERE %s" % (table, " AND ".join(where))
             sqlargs = list(keyvalues.values())
             txn.execute(sql, sqlargs)
             if txn.fetchall():
@@ -1277,7 +1290,7 @@ class DatabasePool:
             sql = "UPDATE %s SET %s WHERE %s" % (
                 table,
                 ", ".join("%s = ?" % (k,) for k in values),
-                " AND ".join(_getwhere(k) for k in keyvalues),
+                " AND ".join(where),
             )
             sqlargs = list(values.values()) + list(keyvalues.values())
 
@@ -1307,6 +1320,7 @@ class DatabasePool:
         keyvalues: Dict[str, Any],
         values: Dict[str, Any],
         insertion_values: Optional[Dict[str, Any]] = None,
+        where_clause: Optional[str] = None,
     ) -> bool:
         """
         Use the native UPSERT functionality in PostgreSQL.
@@ -1316,6 +1330,7 @@ class DatabasePool:
             keyvalues: The unique key tables and their new values
             values: The nonunique columns and their new values
             insertion_values: additional key/values to use only when inserting
+            where_clause: An index predicate to apply to the upsert.
 
         Returns:
             Returns True if a row was inserted or updated (i.e. if `values` is
@@ -1331,11 +1346,12 @@ class DatabasePool:
             allvalues.update(values)
             latter = "UPDATE SET " + ", ".join(k + "=EXCLUDED." + k for k in values)
 
-        sql = ("INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) DO %s") % (
+        sql = "INSERT INTO %s (%s) VALUES (%s) ON CONFLICT (%s) %s DO %s" % (
             table,
             ", ".join(k for k in allvalues),
             ", ".join("?" for _ in allvalues),
             ", ".join(k for k in keyvalues),
+            f"WHERE {where_clause}" if where_clause else "",
             latter,
         )
         txn.execute(sql, list(allvalues.values()))