diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 3433eef288..358ef3fd73 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -320,9 +320,10 @@ class StatsStore(StateDeltasStore):
txn=txn,
into_table=table + "_historical",
keyvalues={id_col: stats_id},
+ extra_dst_insvalues={"bucket_size": self.stats_bucket_size, },
extra_dst_keyvalues={
"end_ts": end_ts,
- "bucket_size": self.stats_bucket_size,
+
},
additive_relatives=per_slice_additive_relatives,
src_table=table + "_current",
@@ -356,7 +357,7 @@ class StatsStore(StateDeltasStore):
]
insert_cols = []
- qargs = [table]
+ qargs = []
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
@@ -367,13 +368,14 @@ class StatsStore(StateDeltasStore):
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
- ON CONFLICT DO UPDATE SET %(updates)s
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
+ "key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
@@ -399,6 +401,7 @@ class StatsStore(StateDeltasStore):
into_table,
keyvalues,
extra_dst_keyvalues,
+ extra_dst_insvalues,
additive_relatives,
src_table,
copy_columns,
@@ -411,6 +414,8 @@ class StatsStore(StateDeltasStore):
keyvalues (dict[str, any]): Row-identifying key values
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
for `into_table`.
+ extra_dst_insvalues (dict[str, any]): Additional values to insert
+ on new row creation for `into_table`.
additive_relatives (dict[str, any]): Fields that will be added onto
if existing row present. (Must be disjoint from copy_columns.)
src_table (str): The source table to copy from
@@ -420,18 +425,18 @@ class StatsStore(StateDeltasStore):
"""
if self.database_engine.can_native_upsert:
ins_columns = chain(
- keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
+ keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
)
sel_exprs = chain(
keyvalues,
copy_columns,
- ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
+ ("?" for _ in chain(additive_relatives, extra_dst_keyvalues, extra_dst_insvalues)),
)
keyvalues_where = ("%s = ?" % f for f in keyvalues)
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
sets_ar = (
- "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+ "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in additive_relatives
)
sql = """
@@ -454,7 +459,8 @@ class StatsStore(StateDeltasStore):
"additional_where": additional_where,
}
- qargs = chain(additive_relatives.values(), keyvalues.values())
+ qargs = list(chain(additive_relatives.values(), extra_dst_keyvalues.values(), extra_dst_insvalues.values(),
+ keyvalues.values()))
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, into_table)
@@ -470,7 +476,8 @@ class StatsStore(StateDeltasStore):
)
if dest_current_row is None:
- merged_dict = {**keyvalues, **src_row, **additive_relatives}
+ merged_dict = {**keyvalues, **extra_dst_keyvalues, **extra_dst_insvalues, **src_row,
+ **additive_relatives}
self._simple_insert_txn(txn, into_table, merged_dict)
else:
for (key, val) in additive_relatives.items():
|