diff --git a/synapse/config/stats.py b/synapse/config/stats.py
index b518a3ed9c..b18ddbd1fa 100644
--- a/synapse/config/stats.py
+++ b/synapse/config/stats.py
@@ -27,19 +27,16 @@ class StatsConfig(Config):
def read_config(self, config, **kwargs):
self.stats_enabled = True
- self.stats_bucket_size = 86400
+ self.stats_bucket_size = 86400 * 1000
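+        # these values are now expressed in milliseconds: parse_duration
+        # (used below) returns milliseconds, so we no longer divide by 1000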
self.stats_retention = sys.maxsize
stats_config = config.get("stats", None)
if stats_config:
self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
- self.stats_bucket_size = (
- self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
+ self.stats_bucket_size = self.parse_duration(
+ stats_config.get("bucket_size", "1d")
)
- self.stats_retention = (
- self.parse_duration(
- stats_config.get("retention", "%ds" % (sys.maxsize,))
- )
- / 1000
+ self.stats_retention = self.parse_duration(
+ stats_config.get("retention", "%ds" % (sys.maxsize,))
)
def generate_config_section(self, config_dir_path, server_name, **kwargs):
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5a95c36a8b..5527dd208e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2270,8 +2270,9 @@ class EventsStore(
"room_aliases",
"room_depth",
"room_memberships",
- "room_state",
- "room_stats",
+ "room_stats_state",
+ "room_stats_current",
+ "room_stats_historical",
"room_stats_earliest_token",
"rooms",
"stream_ordering_to_exterm",
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 5b125d17b0..52fb09c0e6 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -31,3 +31,112 @@ DELETE FROM background_updates WHERE update_name IN (
'populate_stats_process_rooms',
'populate_stats_cleanup'
);
+
+----- Create tables for our version of room stats.
+
+-- table to track positions of incremental updates
+-- (one row per processor; see is_background_contract)
+CREATE TABLE IF NOT EXISTS stats_incremental_position (
+ -- the stream_id of the last-processed state delta
+ state_delta_stream_id BIGINT,
+
+ -- the stream_ordering of the last-processed backfilled event
+ -- (this is negative)
+ total_events_min_stream_ordering BIGINT,
+
+ -- the stream_ordering of the last-processed normally-created event
+ -- (this is positive)
+ total_events_max_stream_ordering BIGINT,
+
+ -- If true, this represents the contract agreed upon by the stats
+ -- regenerator.
+ -- If false, this is suitable for use by the delta/incremental processor.
+ is_background_contract BOOLEAN NOT NULL PRIMARY KEY
+);
+
+-- insert null rows (one for each processor) and make sure they are the only ones.
+DELETE FROM stats_incremental_position;
+INSERT INTO stats_incremental_position (
+ state_delta_stream_id,
+ total_events_min_stream_ordering,
+ total_events_max_stream_ordering,
+ is_background_contract
+) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
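+-- (`0 = 1` and `1 = 1` are used as portable FALSE/TRUE literals, since
+-- older SQLite versions do not understand the TRUE and FALSE keywords.)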
+
+-- represents PRESENT room statistics for a room
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS room_stats_current (
+ room_id TEXT NOT NULL PRIMARY KEY,
+
+ current_state_events INT NOT NULL,
+ total_events INT NOT NULL,
+ joined_members INT NOT NULL,
+ invited_members INT NOT NULL,
+ left_members INT NOT NULL,
+ banned_members INT NOT NULL,
+
+ -- If initial stats regen is still to be performed: NULL
+ -- If initial stats regen has been performed: the maximum delta stream
+ -- position that this row takes into account.
+ completed_delta_stream_id BIGINT
+);
+
+
+-- represents HISTORICAL room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_historical (
+ room_id TEXT NOT NULL,
+ -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
+ -- Note that end_ts is quantised.
+ end_ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+
+ current_state_events INT NOT NULL,
+ total_events INT NOT NULL,
+ joined_members INT NOT NULL,
+ invited_members INT NOT NULL,
+ left_members INT NOT NULL,
+ banned_members INT NOT NULL,
+
+ PRIMARY KEY (room_id, end_ts)
+);
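+
+-- Worked example (illustrative): with the default one-day bucket size
+-- (86400000 ms), an event at ts = 1500000123456 is counted in the bucket
+-- with end_ts = 1500076800000, which covers (1499990400000, 1500076800000].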
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want such an index for reviewing the stats of a
+-- particular room.)
+
+
+-- represents PRESENT statistics for a user
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS user_stats_current (
+ user_id TEXT NOT NULL PRIMARY KEY,
+
+ public_rooms INT NOT NULL,
+ private_rooms INT NOT NULL,
+
+ -- If initial stats regen is still to be performed: NULL
+ -- If initial stats regen has been performed: the maximum delta stream
+ -- position that this row takes into account.
+ completed_delta_stream_id BIGINT
+);
+
+-- represents HISTORICAL statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_historical (
+ user_id TEXT NOT NULL,
+ end_ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+
+ public_rooms INT NOT NULL,
+ private_rooms INT NOT NULL,
+
+ PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want such an index for reviewing the stats of a
+-- particular user.)
+
+-- Also rename room_state to room_stats_state to make its ownership clear.
+ALTER TABLE room_state RENAME TO room_stats_state;
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
new file mode 100644
index 0000000000..0519fcff79
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
@@ -0,0 +1,24 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- These partial indices help us find incomplete stats rows.
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+ ON room_stats_current (room_id)
+ WHERE completed_delta_stream_id IS NULL;
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+ ON user_stats_current (user_id)
+ WHERE completed_delta_stream_id IS NULL;
+
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
new file mode 100644
index 0000000000..181f4ec5b9
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
@@ -0,0 +1,27 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- even though SQLite >= 3.8 supports partial indices, we don't use them
+-- here, in case the SQLite database file is later used on a system with an
+-- older SQLite version.
+-- It's also the case that SQLite is only likely to be used in small
+-- deployments or testing, where the optimisations gained by use of a
+-- partial index are not a big concern.
+
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+ ON room_stats_current (completed_delta_stream_id, room_id);
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+ ON user_stats_current (completed_delta_stream_id, user_id);
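+
+-- A query such as
+--   SELECT room_id FROM room_stats_current
+--   WHERE completed_delta_stream_id IS NULL;
+-- can still use these indices, since SQLite indexes NULL values and
+-- supports IS NULL constraints on an indexed column.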
+
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 123c1ae220..4106f161f0 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,12 +15,16 @@
# limitations under the License.
import logging
+from itertools import chain
from synapse.storage.state_deltas import StateDeltasStore
logger = logging.getLogger(__name__)
# these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
ABSOLUTE_STATS_FIELDS = {
"room": (
"current_state_events",
@@ -27,12 +32,17 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
- "state_events",
+ "total_events",
),
"user": ("public_rooms", "private_rooms"),
}
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
class StatsStore(StateDeltasStore):
@@ -48,6 +58,22 @@ class StatsStore(StateDeltasStore):
self.register_noop_background_update("populate_stats_process_rooms")
self.register_noop_background_update("populate_stats_cleanup")
+ def quantise_stats_time(self, ts):
+ """
+ Quantises a timestamp to be a multiple of the bucket size.
+
+ Args:
+ ts (int): the timestamp to quantise, in milliseconds since the Unix
+ Epoch
+
+ Returns:
+ int: a timestamp which
+ - is divisible by the bucket size;
+ - is no later than `ts`; and
+ - is the largest such timestamp.
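+
+        Example (illustrative; assumes the default one-day bucket size,
+        i.e. 86400000 ms):
+
+            quantise_stats_time(1500000123456) == 1499990400000
+            # the largest multiple of 86400000 not exceeding the input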
+ """
+ return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
def update_room_state(self, room_id, fields):
"""
Args:
@@ -71,8 +97,271 @@ class StatsStore(StateDeltasStore):
fields[col] = None
return self._simple_upsert(
- table="room_state",
+ table="room_stats_state",
keyvalues={"room_id": room_id},
values=fields,
desc="update_room_state",
)
+
+ def update_stats_delta(
+ self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+ ):
+ """
+ Updates the statistics for a subject, with a delta (difference/relative
+ change).
+
+ Args:
+ ts (int): timestamp of the change
+ stats_type (str): "room" or "user" – the kind of subject
+ stats_id (str): the subject's ID (room ID or user ID)
+ fields (dict[str, int]): Deltas of stats values.
+ complete_with_stream_id (int, optional):
+ If supplied, converts an incomplete row into a complete row,
+ with the supplied stream_id marked as the stream_id where the
+ row was completed.
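+
+        Example (illustrative; assumes `now_ms` and `room_id` are in scope):
+
+            # a user joined the room: bump its joined member count by one
+            store.update_stats_delta(
+                now_ms, "room", room_id, {"joined_members": +1}
+            )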
+ """
+
+ return self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
+ )
+
+ def _update_stats_delta_txn(
+ self,
+ txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=None,
+ absolute_field_overrides=None,
+ ):
+ """
+ See L{update_stats_delta}
+ Additional Args:
+ absolute_field_overrides (dict[str, int]): Current stats values
+ (i.e. not deltas) of absolute fields.
+ Does not work with per-slice fields.
+ """
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
+ quantised_ts = self.quantise_stats_time(int(ts))
+ end_ts = quantised_ts + self.stats_bucket_size
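+        # the delta at `ts` is accounted to the historical bucket which ends
+        # at `end_ts` and covers the preceding `bucket_size` milliseconds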
+
+ abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+
+        # default this before we use it below, since it defaults to None
+        if absolute_field_overrides is None:
+            absolute_field_overrides = {}
+
+        for field in chain(fields.keys(), absolute_field_overrides.keys()):
+ if field not in abs_field_names and field not in slice_field_names:
+ # guard against potential SQL injection dodginess
+ raise ValueError(
+ "%s is not a recognised field"
+ " for stats type %s" % (field, stats_type)
+ )
+
+ # only absolute stats fields are tracked in the `_current` stats tables,
+ # so those are the only ones that we process deltas for when
+ # we upsert against the `_current` table.
+
+ # This calculates the deltas (`field = field + ?` values)
+ # for absolute fields,
+ # * defaulting to 0 if not specified
+ # (required for the INSERT part of upserting to work)
+ # * omitting overrides specified in `absolute_field_overrides`
+ deltas_of_absolute_fields = {
+ key: fields.get(key, 0)
+ for key in abs_field_names
+ if key not in absolute_field_overrides
+ }
+
+ if complete_with_stream_id is not None:
+ absolute_field_overrides = absolute_field_overrides.copy()
+ absolute_field_overrides[
+ "completed_delta_stream_id"
+ ] = complete_with_stream_id
+
+ # first upsert the `_current` table
+ self._upsert_with_additive_relatives_txn(
+ txn=txn,
+ table=table + "_current",
+ keyvalues={id_col: stats_id},
+ absolutes=absolute_field_overrides,
+ additive_relatives=deltas_of_absolute_fields,
+ )
+
+ if self.has_completed_background_updates():
+ # TODO want to check specifically for stats regenerator, not all
+ # background updates…
+ # then upsert the `_historical` table.
+            # we don't support absolute overrides for per-slice fields, as
+            # they make no sense.
+ per_slice_additive_relatives = {
+ key: fields.get(key, 0) for key in slice_field_names
+ }
+ self._upsert_copy_from_table_with_additive_relatives_txn(
+ txn=txn,
+ into_table=table + "_historical",
+ keyvalues={id_col: stats_id},
+                extra_dst_keyvalues={"end_ts": end_ts},
+                # bucket_size is not part of the _historical primary key, so
+                # it must not end up in the upsert's conflict target
+                extra_dst_insertion_values={"bucket_size": self.stats_bucket_size},
+ additive_relatives=per_slice_additive_relatives,
+ src_table=table + "_current",
+ copy_columns=abs_field_names,
+ additional_where=" AND completed_delta_stream_id IS NOT NULL",
+ )
+
+ def _upsert_with_additive_relatives_txn(
+ self, txn, table, keyvalues, absolutes, additive_relatives
+ ):
+ """Used to update values in the stats tables.
+
+ Args:
+ txn: Transaction
+ table (str): Table name
+ keyvalues (dict[str, any]): Row-identifying key values
+ absolutes (dict[str, any]): Absolute (set) fields
+ additive_relatives (dict[str, int]): Fields that will be added onto
+ if existing row present.
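+
+        Example of the SQL generated on the native-upsert (PostgreSQL) path;
+        illustrative, for keyvalues={"room_id": ...} and
+        additive_relatives={"total_events": 1}:
+
+            INSERT INTO room_stats_current (room_id, total_events)
+            VALUES (?, ?)
+            ON CONFLICT (room_id) DO UPDATE SET
+                total_events = EXCLUDED.total_events
+                    + room_stats_current.total_events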
+ """
+ if self.database_engine.can_native_upsert:
+ absolute_updates = [
+ "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+ for field in absolutes.keys()
+ ]
+
+ relative_updates = [
+ "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+ % {"table": table, "field": field}
+ for field in additive_relatives.keys()
+ ]
+
+ insert_cols = []
+            qargs = []
+
+ for (key, val) in chain(
+ keyvalues.items(), absolutes.items(), additive_relatives.items()
+ ):
+ insert_cols.append(key)
+ qargs.append(val)
+
+ sql = """
+ INSERT INTO %(table)s (%(insert_cols_cs)s)
+ VALUES (%(insert_vals_qs)s)
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+ """ % {
+ "table": table,
+ "insert_cols_cs": ", ".join(insert_cols),
+ "insert_vals_qs": ", ".join(
+ ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+ ),
+ "updates": ", ".join(chain(absolute_updates, relative_updates)),
+ }
+
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, table)
+            # list() because the iterable is consumed more than once
+            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+ current_row = self._simple_select_one_txn(
+ txn, table, keyvalues, retcols, allow_none=True
+ )
+ if current_row is None:
+ merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+ self._simple_insert_txn(txn, table, merged_dict)
+ else:
+ for (key, val) in additive_relatives.items():
+ current_row[key] += val
+ current_row.update(absolutes)
+ self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+ def _upsert_copy_from_table_with_additive_relatives_txn(
+ self,
+ txn,
+ into_table,
+ keyvalues,
+        extra_dst_keyvalues,
+        extra_dst_insertion_values,
+ additive_relatives,
+ src_table,
+ copy_columns,
+ additional_where="",
+ ):
+ """
+ Args:
+ txn: Transaction
+ into_table (str): The destination table to UPSERT the row into
+ keyvalues (dict[str, any]): Row-identifying key values
+            extra_dst_keyvalues (dict[str, any]): Additional row-identifying
+                key values for `into_table` only. Together with `keyvalues`,
+                these must form the unique/primary key of `into_table`
+                (they are used as the ON CONFLICT target).
+            extra_dst_insertion_values (dict[str, any]): Additional values
+                to insert into `into_table` only (not part of the key, and
+                not copied from `src_table`).
+ additive_relatives (dict[str, any]): Fields that will be added onto
+ if existing row present. (Must be disjoint from copy_columns.)
+ src_table (str): The source table to copy from
+ copy_columns (iterable[str]): The list of columns to copy
+ additional_where (str): Additional SQL for where (prefix with AND
+ if using).
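+
+        Example of the SQL generated on the native-upsert (PostgreSQL) path;
+        illustrative, for a room's historical row with no per-slice fields:
+
+            INSERT INTO room_stats_historical
+                (room_id, joined_members, ..., end_ts, bucket_size)
+            SELECT room_id, joined_members, ..., ?, ?
+            FROM room_stats_current
+            WHERE room_id = ? AND completed_delta_stream_id IS NOT NULL
+            ON CONFLICT (room_id, end_ts) DO UPDATE SET
+                joined_members = EXCLUDED.joined_members, ...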
+ """
+ if self.database_engine.can_native_upsert:
+            ins_columns = chain(
+                keyvalues,
+                copy_columns,
+                additive_relatives,
+                extra_dst_keyvalues,
+                extra_dst_insertion_values,
+            )
+            sel_exprs = chain(
+                keyvalues,
+                copy_columns,
+                (
+                    "?"
+                    for _ in chain(
+                        additive_relatives,
+                        extra_dst_keyvalues,
+                        extra_dst_insertion_values,
+                    )
+                ),
+            )
+ keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+ sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+                for f in additive_relatives
+            )
+
+ sql = """
+ INSERT INTO %(into_table)s (%(ins_columns)s)
+ SELECT %(sel_exprs)s
+ FROM %(src_table)s
+ WHERE %(keyvalues_where)s %(additional_where)s
+ ON CONFLICT (%(keyvalues)s)
+ DO UPDATE SET %(sets)s
+ """ % {
+ "into_table": into_table,
+ "ins_columns": ", ".join(ins_columns),
+ "sel_exprs": ", ".join(sel_exprs),
+ "keyvalues_where": " AND ".join(keyvalues_where),
+ "src_table": src_table,
+ "keyvalues": ", ".join(
+ chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+ ),
+ "sets": ", ".join(chain(sets_cc, sets_ar)),
+ "additional_where": additional_where,
+ }
+
+            # the `?` placeholders are, in order: the SELECT expressions for
+            # additive_relatives, extra_dst_keyvalues and
+            # extra_dst_insertion_values, then the WHERE clause's keyvalues.
+            # list() because DB-API drivers expect a sequence of parameters.
+            qargs = list(
+                chain(
+                    additive_relatives.values(),
+                    extra_dst_keyvalues.values(),
+                    extra_dst_insertion_values.values(),
+                    keyvalues.values(),
+                )
+            )
+ txn.execute(sql, qargs)
+ else:
+ self.database_engine.lock_table(txn, into_table)
+ src_row = self._simple_select_one_txn(
+ txn, src_table, keyvalues, copy_columns
+ )
+            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                all_dest_keyvalues,
+                # list() because the iterable is consumed more than once
+                list(chain(additive_relatives.keys(), copy_columns)),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {
+                    **keyvalues,
+                    **extra_dst_keyvalues,
+                    **extra_dst_insertion_values,
+                    **src_row,
+                    **additive_relatives,
+                }
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
|