diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 1e17eae226..d7418fdf1e 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position (
) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
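+-- (0 = 1) and (1 = 1) are used as engine-portable spellings of FALSE and
+-- TRUE, since boolean literals differ between SQLite and PostgreSQL.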
-- represents PRESENT room statistics for a room
+-- only holds absolute fields; per-slice fields are stored in
+-- room_stats_historical
CREATE TABLE IF NOT EXISTS room_stats_current (
room_id TEXT NOT NULL PRIMARY KEY,
- -- These starts cover the time from start_ts...end_ts (in seconds).
- -- Note that end_ts is quantised, and start_ts usually so.
- start_ts BIGINT,
- end_ts BIGINT,
-
current_state_events INT NOT NULL,
total_events INT NOT NULL,
joined_members INT NOT NULL,
@@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
-- If initial stats regen has been performed: the maximum delta stream
-- position that this row takes into account.
-    completed_delta_stream_id BIGINT,
-
-    CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
+    completed_delta_stream_id BIGINT
);
@@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current (
CREATE TABLE IF NOT EXISTS room_stats_historical (
room_id TEXT NOT NULL,
-- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
- -- Note that end_ts is quantised, and start_ts usually so.
+ -- Note that end_ts is quantised.
end_ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
@@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical
-- represents PRESENT statistics for a user
+-- only holds absolute fields; per-slice fields are stored in
+-- user_stats_historical
CREATE TABLE IF NOT EXISTS user_stats_current (
user_id TEXT NOT NULL PRIMARY KEY,
- -- The timestamp that represents the start of the
- start_ts BIGINT,
- end_ts BIGINT,
-
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py
index 049867fa3e..942d240010 100644
--- a/synapse/storage/schema/delta/56/stats_separated2.py
+++ b/synapse/storage/schema/delta/56/stats_separated2.py
@@ -34,29 +34,13 @@ def _run_create_generic(stats_type, cursor, database_engine):
# partial index are not a big concern.
cursor.execute(
"""
- CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
- ON %s_stats_current (end_ts);
- """
- % (stats_type, stats_type)
- )
- cursor.execute(
- """
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
ON %s_stats_current (completed_delta_stream_id, %s_id);
"""
% (stats_type, stats_type, stats_type)
)
elif isinstance(database_engine, PostgresEngine):
- # This partial index helps us with finding dirty stats rows
- cursor.execute(
- """
- CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
- ON %s_stats_current (end_ts)
- WHERE end_ts IS NOT NULL;
- """
- % (stats_type, stats_type)
- )
- # This partial index helps us with old collection
+ # This partial index helps us with finding incomplete stats rows
cursor.execute(
"""
CREATE INDEX IF NOT EXISTS %s_stats_not_complete
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index cebe4cbc57..4cb10dc9fb 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore):
desc="update_room_state",
)
- @defer.inlineCallbacks
def update_stats_delta(
self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
):
@@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore):
row was completed.
"""
- while True:
- try:
- res = yield self.runInteraction(
- "update_stats_delta",
- self._update_stats_delta_txn,
- ts,
- stats_type,
- stats_id,
- fields,
- complete_with_stream_id=complete_with_stream_id,
- )
- return res
- except OldCollectionRequired:
- # retry after collecting old rows
- # TODO (implement later)
- raise NotImplementedError("old collection not in this PR")
+ return self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
+ )
+
+ def _upsert_with_additive_relatives_txn(
+ self, txn, table, keyvalues, absolutes, additive_relatives
+ ):
+        """Used to update values in tables: `absolutes` overwrite any
+        existing values, whereas `additive_relatives` are added onto any
+        existing values (or inserted as-is if the row does not exist yet).
+
+ Args:
+ txn: Transaction
+ table (str): Table name
+ keyvalues (dict[str, any]): Row-identifying key values
+ absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added
+                onto if an existing row is present.
+ """
+ if self.database_engine.can_native_upsert:
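+            # native upsert: `absolutes` take their value from the proposed
+            # (EXCLUDED) row, whereas `additive_relatives` are added onto the
+            # value already present in the existing row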
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+            qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+                "table": table,
+                "key_columns": ", ".join(keyvalues),
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
+
+ txn.execute(sql, qargs)
+ else:
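+            # emulated upsert: take a lock on the table first (as
+            # _simple_upsert_txn_emulated does) so that the select-then-write
+            # below cannot race with a concurrent insert
+            self.database_engine.lock_table(txn, table)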
+            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+ current_row = self._simple_select_one_txn(
+ txn, table, keyvalues, retcols, allow_none=True
+ )
+ if current_row is None:
+ merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+ self._simple_insert_txn(txn, table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ current_row[key] += val
+ for (key, val) in absolutes.items():
+ current_row[key] = val
+ self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+ def _upsert_copy_from_table_with_additive_relatives_txn(
+ self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns
+ ):
+        """Upserts a row into `into_table`, copying the values of
+        `copy_columns` from a matching row in `src_table` and adding
+        `additive_relatives` onto any existing values.
+
+ Args:
+ txn: Transaction
+ into_table (str): The destination table to UPSERT the row into
+ keyvalues (dict[str, any]): Row-identifying key values
+            additive_relatives (dict[str, any]): Fields that will be added
+                onto if an existing row is present. (Must be disjoint from
+                copy_columns.)
+ src_table (str): The source table to copy from
+ copy_columns (iterable[str]): The list of columns to copy
+ """
+ if self.database_engine.can_native_upsert:
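+            # build an INSERT ... SELECT ... ON CONFLICT statement that copies
+            # `copy_columns` straight from the source row and, on conflict,
+            # overwrites the copied columns while adding the relative deltas
+            # onto the existing values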
+ ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys())
+ sel_exprs = chain(
+ keyvalues, copy_columns, ("?" for _ in additive_relatives)
+ )
+ keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+ sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+                for f in additive_relatives
+            )
+
+ sql = """
+ INSERT INTO %(into_table)s (%(ins_columns)s)
+ SELECT %(sel_exprs)s
+ FROM %(src_table)s
+ WHERE %(keyvalues_where)s
+ ON CONFLICT (%(keyvalues)s)
+ DO UPDATE SET %(sets)s
+ """ % {
+ "into_table": into_table,
+ "ins_columns": ", ".join(ins_columns),
+ "sel_exprs": ", ".join(sel_exprs),
+ "keyvalues_where": ", ".join(keyvalues_where),
+ "src_table": src_table,
+ "keyvalues": ", ".join(keyvalues.keys()),
+ "sets": ", ".join(chain(sets_cc, sets_ar)),
+ }
+
+            qargs = list(chain(additive_relatives.values(), keyvalues.values()))
+ txn.execute(sql, qargs)
+ else:
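+            # as above, lock the destination table when emulating the upsert
+            self.database_engine.lock_table(txn, into_table)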
+ src_row = self._simple_select_one_txn(
+ txn, src_table, keyvalues, copy_columns
+ )
+ dest_current_row = self._simple_select_one_txn(
+ txn,
+ into_table,
+ keyvalues,
+                list(chain(additive_relatives.keys(), copy_columns)),
+ allow_none=True,
+ )
+
+ if dest_current_row is None:
+ merged_dict = {**keyvalues, **src_row, **additive_relatives}
+ self._simple_insert_txn(txn, into_table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ src_row[key] = dest_current_row[key] + val
+ self._simple_update_txn(txn, into_table, keyvalues, src_row)
def _update_stats_delta_txn(
self,
@@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore):
"""
See L{update_stats_delta}
Additional Args:
- absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+ absolute_fields (dict[str, int]): Absolute current stats values
+ (i.e. not deltas). Does not work with per-slice fields.
"""
table, id_col = TYPE_TO_TABLE[stats_type]
quantised_ts = self.quantise_stats_time(int(ts))
end_ts = quantised_ts + self.stats_bucket_size
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+
+        # normalise absolute_fields so it can be used in the checks below
+        if absolute_fields is None:
+            absolute_fields = {}
for field in chain(fields.keys(), absolute_fields.keys()):
- if (
- field not in ABSOLUTE_STATS_FIELDS[stats_type]
- and field not in PER_SLICE_FIELDS[stats_type]
- ):
+ if field not in abs_field_names and field not in slice_field_names:
# guard against potential SQL injection dodginess
raise ValueError(
"%s is not a recognised field"
" for stats type %s" % (field, stats_type)
)
- field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
- field_values = list(fields.values())
-
- if absolute_fields is not None:
- field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
- field_values += list(absolute_fields.values())
-
- if complete_with_stream_id is not None:
- field_sqls.append("completed_delta_stream_id = ?")
- field_values.append(complete_with_stream_id)
-
- # update current row, but only if it is either:
- # - dirty and complete but not old
- # If it is old, we should old-collect it first and retry.
- # - dirty and incomplete
- # Incomplete rows can't be old-collected (as this would commit
- # false statistics into the _historical table).
- # Instead, their `end_ts` is extended, whilst we wait for them to
- # become complete at the hand of the stats regenerator.
- sql = (
- "UPDATE %s_current SET end_ts = ?, %s"
- " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
- " AND %s = ?"
- ) % (table, ", ".join(field_sqls), id_col)
-
- qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
-
- txn.execute(sql, qargs)
-        if txn.rowcount > 0:
-            # success.
-            return
-        # if we're here, it's because we didn't succeed in updating a stats
-        # row. Why? Let's find out…
+        if complete_with_stream_id is not None:
+            # take a copy so the caller's dict is not mutated
+            absolute_fields = absolute_fields.copy()
+            absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
+
+        additive_relatives = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_fields
+        }
- current_row = self._simple_select_one_txn(
+ # first upsert the current table
+ self._upsert_with_additive_relatives_txn(
txn,
table + "_current",
{id_col: stats_id},
- ("end_ts", "completed_delta_stream_id"),
- allow_none=True,
+ absolute_fields,
+ additive_relatives,
)
- if current_row is None:
- # Failure reason: There is no row.
- # Solution:
- # we need to insert a row! (insert a dirty, incomplete row)
- insertee = {
- id_col: stats_id,
- "end_ts": end_ts,
- "start_ts": ts,
- "completed_delta_stream_id": complete_with_stream_id,
+ if self.has_completed_background_updates():
+            # TODO want to check specifically for the stats regenerator, not
+            # all background updates; note also that this call returns a
+            # Deferred, so as written the condition is always truthy
+ # then upsert the historical table.
+ # we don't support absolute_fields for slice_field_names as it makes
+ # no sense.
+ per_slice_additive_relatives = {
+ key: fields.get(key, 0)
+ for key in slice_field_names
}
-
- # we assume that, by default, blank fields should be zero.
- for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
- insertee[field_name] = 0
-
- for field_name in PER_SLICE_FIELDS[stats_type]:
- insertee[field_name] = 0
-
- for (field, value) in fields.items():
- insertee[field] = value
-
- if absolute_fields is not None:
- for (field, value) in absolute_fields.items():
- insertee[field] = value
-
- self._simple_insert_txn(txn, table + "_current", insertee)
-
- elif current_row["end_ts"] is None:
- # Failure reason: The row is not dirty.
- # Solution:
- # update the row, including `start_ts`, to make it dirty.
- sql = (
- "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
- " WHERE end_ts IS NULL AND %s = ?"
- ) % (table, ", ".join(field_sqls), id_col)
-
- qargs = (
- [end_ts - self.stats_bucket_size, end_ts]
- + list(field_values)
- + [stats_id]
+ self._upsert_copy_from_table_with_additive_relatives_txn(
+ txn,
+ table + "_historical",
+ {id_col: stats_id},
+ per_slice_additive_relatives,
+ table + "_current",
+                abs_field_names,
)
-
- txn.execute(sql, qargs)
- if txn.rowcount == 0:
- raise RuntimeError(
- "Should be impossible: No rows updated"
- " but all conditions are known to be met."
- )
-
- elif current_row["end_ts"] < end_ts:
- # Failure reason: The row is complete and old.
- # Solution: We need to perform old collection first
- raise OldCollectionRequired()
|