-rw-r--r-- | docs/room_and_user_statistics.md | 146
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated1.sql | 106
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated2.sql.postgres | 24
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated2.sql.sqlite | 27
-rw-r--r-- | synapse/storage/stats.py | 280
5 files changed, 581 insertions, 2 deletions
diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md
new file mode 100644
index 0000000000..eb003f771a
--- /dev/null
+++ b/docs/room_and_user_statistics.md
@@ -0,0 +1,146 @@
+Room and User Statistics
+========================
+
+Synapse maintains room and user statistics in various tables (as well as a
+cache of room state).
+
+These can be used for administrative purposes, but they are also used when
+generating the public room directory. If these tables get stale or out of sync
+(possibly after database corruption), you may wish to regenerate them.
+
+
+# Synapse Administrator Documentation
+
+## Various SQL scripts that you may find useful
+
+### Delete stats, including historical stats
+
+```sql
+DELETE FROM room_stats_current;
+DELETE FROM room_stats_historical;
+DELETE FROM user_stats_current;
+DELETE FROM user_stats_historical;
+```
+
+### Regenerate stats (all subjects)
+
+```sql
+BEGIN;
+    DELETE FROM stats_incremental_position;
+    INSERT INTO stats_incremental_position (
+        state_delta_stream_id,
+        total_events_min_stream_ordering,
+        total_events_max_stream_ordering,
+        is_background_contract
+    ) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
+COMMIT;
+
+DELETE FROM room_stats_current;
+DELETE FROM user_stats_current;
+```
+
+then follow the steps in **'Regenerate stats (missing subjects only)'** below.
+
+### Regenerate stats (missing subjects only)
+
+```sql
+-- Set up staging tables.
+-- We depend on current_state_events_membership because it is used
+-- in our counting.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_prepare', '{}', 'current_state_events_membership');
+
+-- Run through each room and update stats.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
+
+-- Run through each user and update stats.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+-- Clean up staging tables.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_cleanup', '{}', 'populate_stats_process_users');
+```
+
+then **restart Synapse**.
+
+
+# Synapse Developer Documentation
+
+## High-Level Concepts
+
+### Definitions
+
+* **subject**: Something we are tracking stats about – currently a room or user.
+* **current row**: An entry for a subject in the appropriate current statistics
+  table. Each subject can have only one.
+* **historical row**: An entry for a subject in the appropriate historical
+  statistics table. Each subject can have any number of these.
+
+### Overview
+
+Stats are maintained as time series. There are two kinds of column:
+
+* absolute columns – where the value is correct for the time given by `end_ts`
+  in the stats row. (Imagine a line graph for these values.)
+* per-slice columns – where the value corresponds to how many of the occurrences
+  occurred within the time slice given by `(end_ts − bucket_size)…end_ts`
+  or `start_ts…end_ts`. (Imagine a histogram for these values.)
+
+Currently, only absolute columns are in use.
+
+Stats are maintained in two tables (for each type): current and historical.
+
+Current stats correspond to the present values. Each subject can have only one
+entry.
+
+Historical stats correspond to values in the past. Subjects may have multiple
+entries.
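+
+As a sketch of how these two kinds of table are read back (assuming the schema
+introduced by this change, and a hypothetical room ID):
+
+```sql
+-- Present values: at most one row per subject.
+SELECT joined_members, total_events
+FROM room_stats_current
+WHERE room_id = '!example:example.org';
+
+-- Past values: one row per quantised time slice in which something changed.
+SELECT end_ts, bucket_size, joined_members, total_events
+FROM room_stats_historical
+WHERE room_id = '!example:example.org'
+ORDER BY end_ts DESC;
+```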
+
+## Concepts around the management of stats
+
+### current rows
+
+#### dirty current rows
+
+Current rows can be **dirty**, which means that they have changed since the
+latest historical row for the same subject.
+**Dirty** current rows possess an end timestamp, `end_ts`.
+
+#### old current rows and old collection
+
+When a (necessarily dirty) current row has an `end_ts` in the past, it is said
+to be **old**.
+Old current rows must be copied into a historical row, and cleared of their
+dirty status, before further statistics can be tracked for that subject.
+The process which does this is referred to as **old collection**.
+
+#### incomplete current rows
+
+There are also **incomplete** current rows, which are current rows that do not
+contain a full count yet – this is because they are waiting for the stats
+regenerator to give them an initial count. Incomplete current rows DO NOT
+contain correct and up-to-date values. As such, *incomplete rows are not
+old-collected*. Instead, old incomplete rows are extended so that they are no
+longer old.
+
+### historical rows
+
+Historical rows can always be considered to be valid for the time slice and
+end time specified. (This, of course, assumes a lack of defects in the code
+to track the statistics, and assumes integrity of the database.)
+
+Even so, there are two considerations that we may need to bear in mind:
+
+* historical rows will not exist for every time slice – they will be omitted
+  if there were no changes. In this case, the following assumptions can be
+  made to interpolate/recreate missing rows:
+    - absolute fields have the same values as in the preceding row
+    - per-slice fields are zero (`0`)
+* historical rows will not be retained forever – rows older than a configurable
+  time will be purged.
+
+#### purge
+
+The purging of historical rows is not yet implemented.
+
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 5b125d17b0..95daf8f53b 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -31,3 +31,109 @@ DELETE FROM background_updates WHERE update_name IN (
     'populate_stats_process_rooms',
     'populate_stats_cleanup'
 );
+
+----- Create tables for our version of room stats.
+
+-- Two-row table (one row per contract) that tracks the position of
+-- incremental updates.
+CREATE TABLE IF NOT EXISTS stats_incremental_position (
+    -- the stream_id of the last-processed state delta
+    state_delta_stream_id BIGINT,
+
+    -- the stream_ordering of the last-processed backfilled event
+    -- (this is negative)
+    total_events_min_stream_ordering BIGINT,
+
+    -- the stream_ordering of the last-processed normally-created event
+    -- (this is positive)
+    total_events_max_stream_ordering BIGINT,
+
+    -- If true, this row represents the contract agreed upon by the stats
+    -- regenerator.
+    -- If false, this row is suitable for use by the delta/incremental
+    -- processor.
+    is_background_contract BOOLEAN NOT NULL PRIMARY KEY
+);
+
+-- Insert a null row for each contract and make sure they are the only ones.
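+-- (The booleans below are written as (0 = 1) and (1 = 1), rather than FALSE
+-- and TRUE, so that the same delta runs on both SQLite and PostgreSQL, which
+-- spell boolean literals differently.)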
+DELETE FROM stats_incremental_position;
+INSERT INTO stats_incremental_position (
+    state_delta_stream_id,
+    total_events_min_stream_ordering,
+    total_events_max_stream_ordering,
+    is_background_contract
+) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
+
+-- represents PRESENT room statistics for a room
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS room_stats_current (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    -- If initial stats regen is still to be performed: NULL
+    -- If initial stats regen has been performed: the maximum delta stream
+    -- position that this row takes into account.
+    completed_delta_stream_id BIGINT
+);
+
+
+-- represents HISTORICAL room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_historical (
+    room_id TEXT NOT NULL,
+    -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
+    -- Note that end_ts is quantised.
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- We don't need an index on (room_id, end_ts) because the PRIMARY KEY already
+-- provides one. (We would want such an index to review stats for a
+-- particular room.)
+
+
+-- represents PRESENT statistics for a user
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS user_stats_current (
+    user_id TEXT NOT NULL PRIMARY KEY,
+
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL,
+
+    -- If initial stats regen is still to be performed: NULL
+    -- If initial stats regen has been performed: the maximum delta stream
+    -- position that this row takes into account.
+    completed_delta_stream_id BIGINT
+);
+
+-- represents HISTORICAL statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_historical (
+    user_id TEXT NOT NULL,
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL,
+
+    PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+-- We don't need an index on (user_id, end_ts) because the PRIMARY KEY already
+-- provides one. (We would want such an index to review stats for a
+-- particular user.)
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
new file mode 100644
index 0000000000..0519fcff79
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
@@ -0,0 +1,24 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- These partial indices help us find incomplete stats rows.
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+    ON room_stats_current (room_id)
+    WHERE completed_delta_stream_id IS NULL;
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+    ON user_stats_current (user_id)
+    WHERE completed_delta_stream_id IS NULL;
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
new file mode 100644
index 0000000000..181f4ec5b9
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
@@ -0,0 +1,27 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- Even though SQLite >= 3.8 supports partial indices, we don't use them here:
+-- the database file might later be used on a system with an older SQLite.
+-- SQLite is also only likely to be used in small deployments or for testing,
+-- where the optimisation gained from a partial index is not a big concern.
+
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+    ON room_stats_current (completed_delta_stream_id, room_id);
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+    ON user_stats_current (completed_delta_stream_id, user_id);
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 123c1ae220..5a7dfde926 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from itertools import chain
 
 from synapse.storage.state_deltas import StateDeltasStore
 
@@ -27,12 +29,21 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "state_events",
+        "total_events",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
 
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+
+class OldCollectionRequired(Exception):
+    """Signal that we need to collect old stats rows and retry."""
+
+    pass
 
 
 class StatsStore(StateDeltasStore):
@@ -48,6 +59,21 @@ class StatsStore(StateDeltasStore):
         self.register_noop_background_update("populate_stats_process_rooms")
         self.register_noop_background_update("populate_stats_cleanup")
 
+    def quantise_stats_time(self, ts):
+        """
+        Quantises a timestamp to be a multiple of the bucket size.
+
+        Args:
+            ts (int): the timestamp to quantise, in seconds since the Unix Epoch
+
+        Returns:
+            int: a timestamp which
+                - is divisible by the bucket size;
+                - is no later than `ts`; and
+                - is the largest such timestamp.
+                  (For example, with a bucket size of 100, a `ts` of 123 is
+                  quantised to 100.)
+        """
+        return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
     def update_room_state(self, room_id, fields):
         """
         Args:
@@ -76,3 +102,253 @@ class StatsStore(StateDeltasStore):
             values=fields,
             desc="update_room_state",
         )
+
+    def update_stats_delta(
+        self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+    ):
+        """
+        Updates the statistics for a subject, with a delta (difference/relative
+        change).
+
+        Args:
+            ts (int): timestamp of the change
+            stats_type (str): "room" or "user" – the kind of subject
+            stats_id (str): the subject's ID (room ID or user ID)
+            fields (dict[str, int]): Deltas of stats values.
+            complete_with_stream_id (int, optional):
+                If supplied, converts an incomplete row into a complete row,
+                with the supplied stream_id marked as the stream_id where the
+                row was completed.
+        """
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+        )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """
+        Upserts a row, overwriting the `absolutes` fields and adding the
+        `additive_relatives` fields onto any existing row (or inserting them
+        as-is if there is no existing row).
+
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if an existing row is present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = []
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "key_columns": ", ".join(keyvalues),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            # no native upsert: take out a lock to avoid a race between the
+            # select and the subsequent insert/update
+            self.database_engine.lock_table(txn, table)
+            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                for (key, val) in absolutes.items():
+                    current_row[key] = val
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self,
+        txn,
+        into_table,
+        keyvalues,
+        extra_dst_keyvalues,
+        extra_dst_insvalues,
+        additive_relatives,
+        src_table,
+        copy_columns,
+        additional_where="",
+    ):
+        """
+        Upserts a row into `into_table`, copying the values of certain columns
+        from a row of `src_table` and adding `additive_relatives` onto any
+        existing destination row.
+
+        Args:
+            txn: Transaction
+            into_table (str): The destination table to UPSERT the row into
+            keyvalues (dict[str, any]): Row-identifying key values
+            extra_dst_keyvalues (dict[str, any]): Additional keyvalues for
+                `into_table`; together with `keyvalues`, these must make up
+                the destination table's unique key.
+            extra_dst_insvalues (dict[str, any]): Additional values to insert
+                into `into_table`; these are not part of the key and are left
+                untouched if the row already exists.
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if an existing row is present. (Must be disjoint from
+                copy_columns.)
+            src_table (str): The source table to copy from
+            copy_columns (iterable[str]): The list of columns to copy
+            additional_where (str): Additional SQL for the WHERE clause of the
+                copying SELECT (prefix with AND if using); only honoured by the
+                native-upsert code path.
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(
+                keyvalues,
+                copy_columns,
+                additive_relatives,
+                extra_dst_keyvalues,
+                extra_dst_insvalues,
+            )
+            sel_exprs = chain(
+                keyvalues,
+                copy_columns,
+                (
+                    "?"
+                    for _ in chain(
+                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
+                    )
+                ),
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+                for f in additive_relatives
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s %(additional_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": " AND ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(
+                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+                ),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+                "additional_where": additional_where,
+            }
+
+            # the query arguments: first the "?"s in the SELECT expressions,
+            # then those in the WHERE clause
+            qargs = list(
+                chain(
+                    additive_relatives.values(),
+                    extra_dst_keyvalues.values(),
+                    extra_dst_insvalues.values(),
+                    keyvalues.values(),
+                )
+            )
+            txn.execute(sql, qargs)
+        else:
+            # no native upsert: take out a lock to avoid a race between the
+            # selects and the subsequent insert/update
+            self.database_engine.lock_table(txn, into_table)
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                all_dest_keyvalues,
+                retcols=list(chain(additive_relatives.keys(), copy_columns)),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {
+                    **keyvalues,
+                    **extra_dst_keyvalues,
+                    **extra_dst_insvalues,
+                    **src_row,
+                    **additive_relatives,
+                }
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
+
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id=None,
+        absolute_fields=None,
+    ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_fields (dict[str, int]): Absolute current stats values
+                (i.e. not deltas). Does not work with per-slice fields.
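+
+        Example:
+            # a sketch: record that a hypothetical user is now in one more
+            # public room than before
+            self._update_stats_delta_txn(
+                txn, ts, "user", "@alice:example.org", {"public_rooms": 1}
+            )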
+ """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_fields.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + + additive_relatives = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_fields + } + + if absolute_fields is None: + absolute_fields = {} + elif complete_with_stream_id is not None: + absolute_fields = absolute_fields.copy() + + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id + + # first upsert the current table + self._upsert_with_additive_relatives_txn( + txn, + table + "_current", + {id_col: stats_id}, + absolute_fields, + additive_relatives, + ) + + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the historical table. + # we don't support absolute_fields for slice_field_names as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn, + table + "_historical", + {id_col: stats_id}, + {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, + per_slice_additive_relatives, + table + "_current", + abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", + ) |