diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 9d6c3027d5..20ce3664a0 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -260,6 +260,10 @@ class StatsStore(StateDeltasStore):
(i.e. not deltas) of absolute fields.
Does not work with per-slice fields.
"""
+
+ if absolute_field_overrides is None:
+ absolute_field_overrides = {}
+
table, id_col = TYPE_TO_TABLE[stats_type]
quantised_ts = self.quantise_stats_time(int(ts))
@@ -290,9 +294,6 @@ class StatsStore(StateDeltasStore):
if key not in absolute_field_overrides
}
- if absolute_field_overrides is None:
- absolute_field_overrides = {}
-
if complete_with_stream_id is not None:
absolute_field_overrides = absolute_field_overrides.copy()
absolute_field_overrides[
@@ -321,10 +322,8 @@ class StatsStore(StateDeltasStore):
txn=txn,
into_table=table + "_historical",
keyvalues={id_col: stats_id},
- extra_dst_keyvalues={
- "end_ts": end_ts,
- "bucket_size": self.stats_bucket_size,
- },
+ extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
+ extra_dst_keyvalues={"end_ts": end_ts},
additive_relatives=per_slice_additive_relatives,
src_table=table + "_current",
copy_columns=abs_field_names,
@@ -357,7 +356,7 @@ class StatsStore(StateDeltasStore):
]
insert_cols = []
- qargs = [table]
+ qargs = []
for (key, val) in chain(
keyvalues.items(), absolutes.items(), additive_relatives.items()
@@ -368,20 +367,21 @@ class StatsStore(StateDeltasStore):
sql = """
INSERT INTO %(table)s (%(insert_cols_cs)s)
VALUES (%(insert_vals_qs)s)
- ON CONFLICT DO UPDATE SET %(updates)s
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
""" % {
"table": table,
"insert_cols_cs": ", ".join(insert_cols),
"insert_vals_qs": ", ".join(
["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
),
+ "key_columns": ", ".join(keyvalues),
"updates": ", ".join(chain(absolute_updates, relative_updates)),
}
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, table)
- retcols = chain(absolutes.keys(), additive_relatives.keys())
+ retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
current_row = self._simple_select_one_txn(
txn, table, keyvalues, retcols, allow_none=True
)
@@ -400,6 +400,7 @@ class StatsStore(StateDeltasStore):
into_table,
keyvalues,
extra_dst_keyvalues,
+ extra_dst_insvalues,
additive_relatives,
src_table,
copy_columns,
@@ -412,6 +413,8 @@ class StatsStore(StateDeltasStore):
keyvalues (dict[str, any]): Row-identifying key values
extra_dst_keyvalues (dict[str, any]): Additional keyvalues
for `into_table`.
+ extra_dst_insvalues (dict[str, any]): Additional values to insert
+ on new row creation for `into_table`.
additive_relatives (dict[str, any]): Fields that will be added onto
if existing row present. (Must be disjoint from copy_columns.)
src_table (str): The source table to copy from
@@ -421,18 +424,28 @@ class StatsStore(StateDeltasStore):
"""
if self.database_engine.can_native_upsert:
ins_columns = chain(
- keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
+ keyvalues,
+ copy_columns,
+ additive_relatives,
+ extra_dst_keyvalues,
+ extra_dst_insvalues,
)
sel_exprs = chain(
keyvalues,
copy_columns,
- ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
+ (
+ "?"
+ for _ in chain(
+ additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
+ )
+ ),
)
keyvalues_where = ("%s = ?" % f for f in keyvalues)
sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
sets_ar = (
- "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+ "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+ for f in additive_relatives
)
sql = """
@@ -455,7 +468,14 @@ class StatsStore(StateDeltasStore):
"additional_where": additional_where,
}
- qargs = chain(additive_relatives.values(), keyvalues.values())
+ qargs = list(
+ chain(
+ additive_relatives.values(),
+ extra_dst_keyvalues.values(),
+ extra_dst_insvalues.values(),
+ keyvalues.values(),
+ )
+ )
txn.execute(sql, qargs)
else:
self.database_engine.lock_table(txn, into_table)
@@ -466,12 +486,18 @@ class StatsStore(StateDeltasStore):
txn,
into_table,
keyvalues,
- chain(additive_relatives.keys(), copy_columns),
+ retcols=list(chain(additive_relatives.keys(), copy_columns)),
allow_none=True,
)
if dest_current_row is None:
- merged_dict = {**keyvalues, **src_row, **additive_relatives}
+ merged_dict = {
+ **keyvalues,
+ **extra_dst_keyvalues,
+ **extra_dst_insvalues,
+ **src_row,
+ **additive_relatives,
+ }
self._simple_insert_txn(txn, into_table, merged_dict)
else:
for (key, val) in additive_relatives.items():
|