diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5a95c36a8b..5527dd208e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2270,8 +2270,9 @@ class EventsStore(
"room_aliases",
"room_depth",
"room_memberships",
- "room_state",
- "room_stats",
+ "room_stats_state",
+ "room_stats_current",
+ "room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 045b5ca013..52fb09c0e6 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -137,3 +137,6 @@ CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical
-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
-- out for us. (We would want it to review stats for a particular user.)
+
+-- Also rename room_state to room_stats_state to make its ownership clear.
+ALTER TABLE room_state RENAME TO room_stats_state;
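This delta runs on both SQLite and Postgres; `ALTER TABLE ... RENAME TO` is supported by both and preserves rows, columns, and the primary key. A standalone illustration with the stdlib `sqlite3` module (not Synapse's migration runner):

```python
import sqlite3

# Demonstrates that the rename above loses no data: rows inserted under
# the old name are readable under the new one.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE room_state (room_id TEXT PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO room_state VALUES ('!abc:example.org', 'Test Room')")
conn.execute("ALTER TABLE room_state RENAME TO room_stats_state")
rows = conn.execute("SELECT room_id, name FROM room_stats_state").fetchall()
assert rows == [("!abc:example.org", "Test Room")]
```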
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index c9687c29d2..3df57b52ea 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -25,6 +25,9 @@ from synapse.util.caches.descriptors import cached
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
@@ -38,17 +41,13 @@ ABSOLUTE_STATS_FIELDS = {
}
# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
PER_SLICE_FIELDS = {"room": (), "user": ()}
TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
-class OldCollectionRequired(Exception):
- """ Signal that we need to collect old stats rows and retry. """
-
- pass
-
-
class StatsStore(StateDeltasStore):
def __init__(self, db_conn, hs):
super(StatsStore, self).__init__(db_conn, hs)
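The per-slice fields above are bucketed by time: each `_historical` row covers one fixed-size slice ending at `end_ts`. The new `_update_stats_delta_txn` below derives the bucket via `quantise_stats_time`; a self-contained sketch of the assumed arithmetic (round the timestamp down to a multiple of the bucket size):

```python
def quantise_stats_time(ts, stats_bucket_size):
    # round ts down to the start of the slice that contains it
    return (ts // stats_bucket_size) * stats_bucket_size

stats_bucket_size = 24 * 3600 * 1000  # assumed: one-day buckets, in ms
ts = 1567890123456
quantised_ts = quantise_stats_time(ts, stats_bucket_size)
end_ts = quantised_ts + stats_bucket_size
assert quantised_ts <= ts < end_ts  # ts lies inside [quantised_ts, end_ts)
```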
@@ -188,7 +187,7 @@ class StatsStore(StateDeltasStore):
fields[col] = None
return self._simple_upsert(
- table="room_state",
+ table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
@@ -242,6 +241,94 @@ class StatsStore(StateDeltasStore):
complete_with_stream_id=complete_with_stream_id,
)
+ def _update_stats_delta_txn(
+ self,
+ txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=None,
+ absolute_field_overrides=None,
+ ):
+ """
+ See L{update_stats_delta}
+ Additional Args:
+ absolute_field_overrides (dict[str, int]): Current stats values
+ (i.e. not deltas) of absolute fields.
+ Does not work with per-slice fields.
+ """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+        if absolute_field_overrides is None:
+            absolute_field_overrides = {}
+
+        for field in chain(fields.keys(), absolute_field_overrides.keys()):
+            if field not in abs_field_names and field not in slice_field_names:
+                # guard against potential SQL injection dodginess
+                raise ValueError(
+                    "%s is not a recognised field"
+                    " for stats type %s" % (field, stats_type)
+                )
+
+ # only absolute stats fields are tracked in the `_current` stats tables,
+ # so those are the only ones that we process deltas for when
+ # we upsert against the `_current` table.
+
+        # This calculates the deltas (`field = field + ?` values) for
+        # absolute fields:
+        # * defaulting to 0 if not specified
+        #   (required for the INSERT part of upserting to work)
+        # * omitting overrides specified in `absolute_field_overrides`
+ deltas_of_absolute_fields = {
+ key: fields.get(key, 0)
+ for key in abs_field_names
+ if key not in absolute_field_overrides
+ }
+
+ if complete_with_stream_id is not None:
+ absolute_field_overrides = absolute_field_overrides.copy()
+ absolute_field_overrides[
+ "completed_delta_stream_id"
+ ] = complete_with_stream_id
+
+ # first upsert the `_current` table
+ self._upsert_with_additive_relatives_txn(
+ txn=txn,
+ table=table + "_current",
+ keyvalues={id_col: stats_id},
+ absolutes=absolute_field_overrides,
+ additive_relatives=deltas_of_absolute_fields,
+ )
+
+ if self.has_completed_background_updates():
+ # TODO want to check specifically for stats regenerator, not all
+ # background updates…
+ # then upsert the `_historical` table.
+            # we don't support absolute_field_overrides for per-slice fields
+            # as it makes no sense.
+ per_slice_additive_relatives = {
+ key: fields.get(key, 0) for key in slice_field_names
+ }
+ self._upsert_copy_from_table_with_additive_relatives_txn(
+ txn=txn,
+ into_table=table + "_historical",
+ keyvalues={id_col: stats_id},
+ extra_dst_keyvalues={
+ "end_ts": end_ts,
+ "bucket_size": self.stats_bucket_size,
+ },
+ additive_relatives=per_slice_additive_relatives,
+ src_table=table + "_current",
+ copy_columns=abs_field_names,
+ additional_where=" AND completed_delta_stream_id IS NOT NULL",
+ )
+
def _upsert_with_additive_relatives_txn(
self, txn, table, keyvalues, absolutes, additive_relatives
):
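`_upsert_with_additive_relatives_txn` is the primitive behind the `_current` upsert above: `absolutes` overwrite stored values, while `additive_relatives` are added to whatever is already there, seeding fresh rows from the delta itself (hence the "defaulting to 0" note earlier). A standalone sketch of the assumed SQL semantics, using a native upsert (SQLite ≥ 3.24; the same `ON CONFLICT` clause also works on Postgres):

```python
import sqlite3

# Toy version of the additive-relatives upsert for a single counter column.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE room_stats_current ("
    " room_id TEXT PRIMARY KEY,"
    " joined_members INTEGER NOT NULL DEFAULT 0)"
)

def upsert_additive(conn, room_id, joined_members_delta):
    # On first sight of a room the delta seeds the row; afterwards it is
    # added to the stored value (unqualified column = existing row,
    # excluded.column = the value we tried to insert).
    conn.execute(
        """
        INSERT INTO room_stats_current (room_id, joined_members)
        VALUES (?, ?)
        ON CONFLICT (room_id) DO UPDATE SET
            joined_members = joined_members + excluded.joined_members
        """,
        (room_id, joined_members_delta),
    )

upsert_additive(conn, "!abc:example.org", 3)
upsert_additive(conn, "!abc:example.org", -1)
assert conn.execute(
    "SELECT joined_members FROM room_stats_current WHERE room_id = ?",
    ("!abc:example.org",),
).fetchone() == (2,)
```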
@@ -388,83 +475,3 @@ class StatsStore(StateDeltasStore):
for (key, val) in additive_relatives.items():
src_row[key] = dest_current_row[key] + val
self._simple_update_txn(txn, into_table, keyvalues, src_row)
-
- def _update_stats_delta_txn(
- self,
- txn,
- ts,
- stats_type,
- stats_id,
- fields,
- complete_with_stream_id=None,
- absolute_fields=None,
- ):
- """
- See L{update_stats_delta}
- Additional Args:
- absolute_fields (dict[str, int]): Absolute current stats values
- (i.e. not deltas). Does not work with per-slice fields.
- """
- table, id_col = TYPE_TO_TABLE[stats_type]
-
- quantised_ts = self.quantise_stats_time(int(ts))
- end_ts = quantised_ts + self.stats_bucket_size
-
- abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
- slice_field_names = PER_SLICE_FIELDS[stats_type]
- for field in chain(fields.keys(), absolute_fields.keys()):
- if field not in abs_field_names and field not in slice_field_names:
- # guard against potential SQL injection dodginess
- raise ValueError(
- "%s is not a recognised field"
- " for stats type %s" % (field, stats_type)
- )
-
- # only absolute stats fields are tracked in the `_current` stats tables,
- # so those are the only ones that we process deltas for when
- # we upsert against the `_current` table.
- additive_relatives = {
- key: fields.get(key, 0)
- for key in abs_field_names
- if key not in absolute_fields
- }
-
- if absolute_fields is None:
- absolute_fields = {}
-
- if complete_with_stream_id is not None:
- absolute_fields = absolute_fields.copy()
- absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
- self.get_earliest_token_for_stats.invalidate(stats_type, stats_id)
-
- # first upsert the `_current` table
- self._upsert_with_additive_relatives_txn(
- txn=txn,
- table=table + "_current",
- keyvalues={id_col: stats_id},
- absolutes=absolute_fields,
- additive_relatives=additive_relatives,
- )
-
- if self.has_completed_background_updates():
- # TODO want to check specifically for stats regenerator, not all
- # background updates…
- # then upsert the `_historical` table.
- # we don't support absolute_fields for per-slice fields as it makes
- # no sense.
- per_slice_additive_relatives = {
- key: fields.get(key, 0) for key in slice_field_names
- }
- self._upsert_copy_from_table_with_additive_relatives_txn(
- txn=txn,
- into_table=table + "_historical",
- keyvalues={id_col: stats_id},
- extra_dst_keyvalues={
- "end_ts": end_ts,
- "bucket_size": self.stats_bucket_size,
- },
- additive_relatives=per_slice_additive_relatives,
- src_table=table + "_current",
- copy_columns=abs_field_names,
- additional_where=" AND completed_delta_stream_id IS NOT NULL",
- )
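Taken together, the move and rename leave the write path's semantics unchanged: `_current` keeps one live row per room that absolute deltas accumulate into, while `_historical` gains one finished row per (room, bucket), with per-slice counters restarting at zero in each new bucket. A toy in-memory model of that split (`local_events` is a hypothetical per-slice field here; `PER_SLICE_FIELDS` is still empty in this diff):

```python
# current: one live row per room         (mirrors room_stats_current)
# historical: one row per (room, end_ts) (mirrors room_stats_historical)
current = {}
historical = {}

def record_delta(room_id, end_ts, joined_members_delta, local_events_delta):
    row = current.setdefault(room_id, {"joined_members": 0})
    # absolute field: deltas accumulate indefinitely in the current row
    row["joined_members"] += joined_members_delta

    hist = historical.setdefault(
        (room_id, end_ts), {"joined_members": 0, "local_events": 0}
    )
    # absolute fields are copied into the bucket; per-slice fields only
    # accumulate within their own bucket and restart at 0 in the next
    hist["joined_members"] = row["joined_members"]
    hist["local_events"] += local_events_delta

record_delta("!abc:example.org", end_ts=1000, joined_members_delta=3, local_events_delta=5)
record_delta("!abc:example.org", end_ts=2000, joined_members_delta=1, local_events_delta=2)
assert current["!abc:example.org"]["joined_members"] == 4
assert historical[("!abc:example.org", 2000)] == {"joined_members": 4, "local_events": 2}
```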