summary refs log tree commit diff
path: root/synapse/storage/stats.py
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-28 20:18:45 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-28 20:18:45 +0100
commit3b69bf3e74a5b9247f306c6c1e74be09f79822bd (patch)
tree5ec3d1ea9eddbdef36c5a9ed6b645e5922c1c539 /synapse/storage/stats.py
parentHoist up None check to prevent trying to iterate over NoneType.keys() (diff)
downloadsynapse-3b69bf3e74a5b9247f306c6c1e74be09f79822bd.tar.xz
Upsert fixes
Diffstat (limited to 'synapse/storage/stats.py')
-rw-r--r--synapse/storage/stats.py23
1 files changed, 15 insertions, 8 deletions
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 3433eef288..358ef3fd73 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -320,9 +320,10 @@ class StatsStore(StateDeltasStore):
                 txn=txn,
                 into_table=table + "_historical",
                 keyvalues={id_col: stats_id},
+                extra_dst_insvalues={"bucket_size": self.stats_bucket_size, },
                 extra_dst_keyvalues={
                     "end_ts": end_ts,
-                    "bucket_size": self.stats_bucket_size,
+
                 },
                 additive_relatives=per_slice_additive_relatives,
                 src_table=table + "_current",
@@ -356,7 +357,7 @@ class StatsStore(StateDeltasStore):
             ]
 
             insert_cols = []
-            qargs = [table]
+            qargs = []
 
             for (key, val) in chain(
                 keyvalues.items(), absolutes.items(), additive_relatives.items()
@@ -367,13 +368,14 @@ class StatsStore(StateDeltasStore):
             sql = """
                 INSERT INTO %(table)s (%(insert_cols_cs)s)
                 VALUES (%(insert_vals_qs)s)
-                ON CONFLICT DO UPDATE SET %(updates)s
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
             """ % {
                 "table": table,
                 "insert_cols_cs": ", ".join(insert_cols),
                 "insert_vals_qs": ", ".join(
                     ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
                 ),
+                "key_columns": ", ".join(keyvalues),
                 "updates": ", ".join(chain(absolute_updates, relative_updates)),
             }
 
@@ -399,6 +401,7 @@ class StatsStore(StateDeltasStore):
         into_table,
         keyvalues,
         extra_dst_keyvalues,
+        extra_dst_insvalues,
         additive_relatives,
         src_table,
         copy_columns,
@@ -411,6 +414,8 @@ class StatsStore(StateDeltasStore):
              keyvalues (dict[str, any]): Row-identifying key values
              extra_dst_keyvalues (dict[str, any]): Additional keyvalues
                 for `into_table`.
+             extra_dst_insvalues (dict[str, any]): Additional values to insert
+                on new row creation for `into_table`.
              additive_relatives (dict[str, any]): Fields that will be added onto
                 if existing row present. (Must be disjoint from copy_columns.)
              src_table (str): The source table to copy from
@@ -420,18 +425,18 @@ class StatsStore(StateDeltasStore):
         """
         if self.database_engine.can_native_upsert:
             ins_columns = chain(
-                keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
+                keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
             )
             sel_exprs = chain(
                 keyvalues,
                 copy_columns,
-                ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
+                ("?" for _ in chain(additive_relatives, extra_dst_keyvalues, extra_dst_insvalues)),
             )
             keyvalues_where = ("%s = ?" % f for f in keyvalues)
 
             sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
             sets_ar = (
-                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in additive_relatives
             )
 
             sql = """
@@ -454,7 +459,8 @@ class StatsStore(StateDeltasStore):
                 "additional_where": additional_where,
             }
 
-            qargs = chain(additive_relatives.values(), keyvalues.values())
+            qargs = list(chain(additive_relatives.values(), extra_dst_keyvalues.values(), extra_dst_insvalues.values(),
+                               keyvalues.values()))
             txn.execute(sql, qargs)
         else:
             self.database_engine.lock_table(txn, into_table)
@@ -470,7 +476,8 @@ class StatsStore(StateDeltasStore):
             )
 
             if dest_current_row is None:
-                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                merged_dict = {**keyvalues, **extra_dst_keyvalues, **extra_dst_insvalues, **src_row,
+                               **additive_relatives}
                 self._simple_insert_txn(txn, into_table, merged_dict)
             else:
                 for (key, val) in additive_relatives.items():