diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 4112291c76..6832ec6b7f 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -18,8 +18,10 @@ import logging
from threading import Lock
from twisted.internet import defer
+from itertools import chain
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
@@ -69,10 +71,11 @@ class StatsStore(StateDeltasStore):
Quantises a timestamp to be a multiple of the bucket size.
Args:
- ts: the timestamp to quantise, in seconds since the Unix Epoch
+ ts (int): the timestamp to quantise, in milliseconds since the Unix
+ Epoch
Returns:
- a timestamp which
+ int: a timestamp which
- is divisible by the bucket size;
- is no later than `ts`; and
- is the largest such timestamp.
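+ For example (illustrative values): with a bucket size of 86400000 ms
+ (one day), a ts of 1563411000000 quantises to 1563408000000.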
@@ -213,7 +216,6 @@ class StatsStore(StateDeltasStore):
allow_none=True,
)
- @defer.inlineCallbacks
def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
@@ -232,22 +234,162 @@ class StatsStore(StateDeltasStore):
row was completed.
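+ Returns:
+ Deferred: resolves once the update has been committed.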
"""
- while True:
- try:
- res = yield self.runInteraction(
- "update_stats_delta",
- self._update_stats_delta_txn,
- ts,
- stats_type,
- stats_id,
- fields,
- complete_with_stream_id=complete_with_stream_id,
- )
- return res
- except OldCollectionRequired:
- # retry after collecting old rows
- # TODO (implement later)
- raise NotImplementedError("old collection not in this PR")
+ return self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
+ )
+
+ def _upsert_with_additive_relatives_txn(
+ self, txn, table, keyvalues, absolutes, additive_relatives
+ ):
+ """Used to update values in the stats tables.
+
+ Args:
+ txn: Transaction
+ table (str): Table name
+ keyvalues (dict[str, any]): Row-identifying key values
+ absolutes (dict[str, any]): Absolute (set) fields
+ additive_relatives (dict[str, int]): Fields whose values will be added
+ onto the existing values if a row is already present.
+ """
+ if self.database_engine.can_native_upsert:
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+ qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+ ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "key_columns": ", ".join(keyvalues),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
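+ # For example (hypothetical table/field names), with
+ # table="room_stats_current", keyvalues={"room_id": ...},
+ # absolutes={"current_state_events": ...} and
+ # additive_relatives={"joined_members": ...} this builds roughly:
+ #   INSERT INTO room_stats_current (room_id, current_state_events, joined_members)
+ #   VALUES (?, ?, ?)
+ #   ON CONFLICT (room_id) DO UPDATE SET
+ #       current_state_events = EXCLUDED.current_state_events,
+ #       joined_members = EXCLUDED.joined_members + room_stats_current.joined_members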
+
+ txn.execute(sql, qargs)
+ else:
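+ # no native upsert support, so emulate it: take the table lock, then
+ # read-modify-write within this transaction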
+ self.database_engine.lock_table(txn, table)
+ retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+ current_row = self._simple_select_one_txn(
+ txn, table, keyvalues, retcols, allow_none=True
+ )
+ if current_row is None:
+ merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+ self._simple_insert_txn(txn, table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ current_row[key] += val
+ current_row.update(absolutes)
+ self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+ def _upsert_copy_from_table_with_additive_relatives_txn(
+ self,
+ txn,
+ into_table,
+ keyvalues,
+ extra_dst_keyvalues,
+ additive_relatives,
+ src_table,
+ copy_columns,
+ additional_where="",
+ ):
+ """
+ Args:
+ txn: Transaction
+ into_table (str): The destination table to UPSERT the row into
+ keyvalues (dict[str, any]): Row-identifying key values
+ extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+ for `into_table`.
+ additive_relatives (dict[str, int]): Fields whose values will be added
+ onto the existing values if a row is already present. (Must be
+ disjoint from copy_columns.)
+ src_table (str): The source table to copy from
+ copy_columns (iterable[str]): The list of columns to copy
+ additional_where (str): Additional SQL for where (prefix with AND
+ if using).
+ """
+ if self.database_engine.can_native_upsert:
+ ins_columns = chain(
+ keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
+ )
+ sel_exprs = chain(
+ keyvalues,
+ copy_columns,
+ ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
+ )
+ keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+ sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+ sets_ar = (
+ "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in additive_relatives
+ )
+
+ sql = """
+ INSERT INTO %(into_table)s (%(ins_columns)s)
+ SELECT %(sel_exprs)s
+ FROM %(src_table)s
+ WHERE %(keyvalues_where)s %(additional_where)s
+ ON CONFLICT (%(keyvalues)s)
+ DO UPDATE SET %(sets)s
+ """ % {
+ "into_table": into_table,
+ "ins_columns": ", ".join(ins_columns),
+ "sel_exprs": ", ".join(sel_exprs),
+ "keyvalues_where": " AND ".join(keyvalues_where),
+ "src_table": src_table,
+ "keyvalues": ", ".join(
+ chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+ ),
+ "sets": ", ".join(chain(sets_cc, sets_ar)),
+ "additional_where": additional_where,
+ }
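+ # For example (hypothetical table/field names), copying into
+ # "room_stats_historical" from "room_stats_current" with
+ # keyvalues={"room_id": ...},
+ # extra_dst_keyvalues={"end_ts": ..., "bucket_size": ...},
+ # additive_relatives={"total_events": ...},
+ # copy_columns=("current_state_events",) and the additional_where used
+ # by the caller below, this builds roughly:
+ #   INSERT INTO room_stats_historical
+ #       (room_id, current_state_events, total_events, end_ts, bucket_size)
+ #   SELECT room_id, current_state_events, ?, ?, ?
+ #   FROM room_stats_current
+ #   WHERE room_id = ? AND completed_delta_stream_id IS NOT NULL
+ #   ON CONFLICT (room_id, end_ts, bucket_size)
+ #   DO UPDATE SET current_state_events = EXCLUDED.current_state_events,
+ #       total_events = EXCLUDED.total_events + room_stats_historical.total_events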
+
+ qargs = list(chain(
+ additive_relatives.values(), extra_dst_keyvalues.values(), keyvalues.values()
+ ))
+ txn.execute(sql, qargs)
+ else:
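+ # emulated upsert: lock the destination table, then read-modify-write
+ # within this transaction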
+ self.database_engine.lock_table(txn, into_table)
+ src_row = self._simple_select_one_txn(
+ txn, src_table, keyvalues, copy_columns
+ )
+ all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
+ dest_current_row = self._simple_select_one_txn(
+ txn,
+ into_table,
+ all_dest_keyvalues,
+ list(chain(additive_relatives.keys(), copy_columns)),
+ allow_none=True,
+ )
+
+ if dest_current_row is None:
+ merged_dict = {
+ **keyvalues,
+ **extra_dst_keyvalues,
+ **src_row,
+ **additive_relatives,
+ }
+ self._simple_insert_txn(txn, into_table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ src_row[key] = dest_current_row[key] + val
+ self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
def _update_stats_delta_txn(
self,
@@ -262,94 +404,67 @@ class StatsStore(StateDeltasStore):
"""
See L{update_stats_delta}
Additional Args:
- absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+ absolute_fields (dict[str, int]): Absolute current stats values
+ (i.e. not deltas). Does not work with per-slice fields.
"""
table, id_col = TYPE_TO_TABLE[stats_type]
quantised_ts = self.quantise_stats_time(int(ts))
end_ts = quantised_ts + self.stats_bucket_size
- field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
- field_values = list(fields.values())
-
- if absolute_fields is not None:
- field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
- field_values += list(absolute_fields.values())
-
- if complete_with_stream_id is not None:
- field_sqls.append("completed_delta_stream_id = ?")
- field_values.append(complete_with_stream_id)
-
- sql = (
- "UPDATE %s_current SET end_ts = ?, %s"
- " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
- " AND %s = ?"
- ) % (table, ", ".join(field_sqls), id_col)
-
- qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
-
- txn.execute(sql, qargs)
-
- if txn.rowcount > 0:
- # success.
- return
-
- # if we're here, it's because we didn't succeed in updating a stats
- # row. Why? Let's find out…
+ abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+ slice_field_names = PER_SLICE_FIELDS[stats_type]
+
+ if absolute_fields is None:
+ absolute_fields = {}
+
+ for field in chain(fields.keys(), absolute_fields.keys()):
+ if field not in abs_field_names and field not in slice_field_names:
+ # guard against potential SQL injection dodginess
+ raise ValueError(
+ "%s is not a recognised field"
+ " for stats type %s" % (field, stats_type)
+ )
- current_row = self._simple_select_one_txn(
- txn,
- table + "_current",
- {id_col: stats_id},
- ("end_ts", "completed_delta_stream_id"),
- allow_none=True,
+ # only absolute stats fields are tracked in the `_current` stats tables,
+ # so those are the only ones that we process deltas for when
+ # we upsert against the `_current` table.
+ additive_relatives = {
+ key: fields.get(key, 0)
+ for key in abs_field_names
+ if key not in absolute_fields
+ }
+
+ if complete_with_stream_id is not None:
+ absolute_fields = absolute_fields.copy()
+ absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
+
+ # first upsert the `_current` table
+ self._upsert_with_additive_relatives_txn(
+ txn=txn,
+ table=table + "_current",
+ keyvalues={id_col: stats_id},
+ absolutes=absolute_fields,
+ additive_relatives=additive_relatives,
)
- if current_row is None:
- # we need to insert a row! (insert a dirty, incomplete row)
- insertee = {
- id_col: stats_id,
- "end_ts": end_ts,
- "start_ts": ts,
- "completed_delta_stream_id": complete_with_stream_id,
+ if self.has_completed_background_updates():
+ # TODO want to check specifically for stats regenerator, not all
+ # background updates…
+ # then upsert the `_historical` table.
+ # we don't support absolute_fields for per-slice fields as it makes
+ # no sense.
+ per_slice_additive_relatives = {
+ key: fields.get(key, 0) for key in slice_field_names
}
-
- # we assume that, by default, blank fields should be zero.
- for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
- insertee[field_name] = 0
-
- for field_name in PER_SLICE_FIELDS[stats_type]:
- insertee[field_name] = 0
-
- for (field, value) in fields.items():
- insertee[field] = value
-
- if absolute_fields is not None:
- for (field, value) in absolute_fields.items():
- insertee[field] = value
-
- self._simple_insert_txn(txn, table + "_current", insertee)
-
- elif current_row["end_ts"] is None:
- # update the row, including start_ts
- sql = (
- "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
- " WHERE end_ts IS NULL AND %s = ?"
- ) % (table, ", ".join(field_sqls), id_col)
-
- qargs = (
- [end_ts - self.stats_bucket_size, end_ts]
- + list(field_values)
- + [stats_id]
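+ # Note: the additional_where below means we only copy a row into the
+ # `_historical` table once its `_current` stats have been marked
+ # complete (completed_delta_stream_id IS NOT NULL).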
+ self._upsert_copy_from_table_with_additive_relatives_txn(
+ txn=txn,
+ into_table=table + "_historical",
+ keyvalues={id_col: stats_id},
+ extra_dst_keyvalues={
+ "end_ts": end_ts,
+ "bucket_size": self.stats_bucket_size,
+ },
+ additive_relatives=per_slice_additive_relatives,
+ src_table=table + "_current",
+ copy_columns=abs_field_names,
+ additional_where=" AND completed_delta_stream_id IS NOT NULL",
)
-
- txn.execute(sql, qargs)
- if txn.rowcount == 0:
- raise RuntimeError(
- "Should be impossible: No rows updated"
- " but all conditions are known to be met."
- )
-
- elif current_row["end_ts"] < end_ts:
- # we need to perform old collection first
- raise OldCollectionRequired()
|