From 8de9ebe35ddd9a0c5e73358fc861f375868b6c84 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 13:22:07 +0100 Subject: Tear out current room & user statistics (#5880) * Tear out current room & user statistics. Signed-off-by: Olivier Wilkinson (reivilibre) * Black is back with more linting complaints Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 33 ++++++++++++++++++++++ 1 file changed, 33 insertions(+) create mode 100644 synapse/storage/schema/delta/56/stats_separated1.sql (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql new file mode 100644 index 0000000000..5b125d17b0 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -0,0 +1,33 @@ +/* Copyright 2018 New Vector Ltd + * Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +----- First clean up from previous versions of room stats. + +-- First remove old stats stuff +DROP TABLE IF EXISTS room_stats; +DROP TABLE IF EXISTS user_stats; +DROP TABLE IF EXISTS room_stats_earliest_tokens; +DROP TABLE IF EXISTS _temp_populate_stats_position; +DROP TABLE IF EXISTS _temp_populate_stats_rooms; +DROP TABLE IF EXISTS stats_stream_pos; + +-- Unschedule old background updates if they're still scheduled +DELETE FROM background_updates WHERE update_name IN ( + 'populate_stats_createtables', + 'populate_stats_process_rooms', + 'populate_stats_cleanup' +); -- cgit 1.5.1 From d7675e79e13c1fd44dd97033d1e69f57811d75d1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 13:45:05 +0100 Subject: Add schema for Separated Statistics Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 115 +++++++++++++++++++++ .../storage/schema/delta/56/stats_separated2.py | 87 ++++++++++++++++ 2 files changed, 202 insertions(+) create mode 100644 synapse/storage/schema/delta/56/stats_separated2.py (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 5b125d17b0..8d8e8fb97c 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -31,3 +31,118 @@ DELETE FROM background_updates WHERE update_name IN ( 'populate_stats_process_rooms', 'populate_stats_cleanup' ); + +----- Create tables for our version of room stats. + +-- single-row table to track position of incremental updates +CREATE TABLE IF NOT EXISTS stats_incremental_position ( + -- the stream_id of the last-processed state delta + state_delta_stream_id BIGINT, + + -- the stream_ordering of the last-processed backfilled event + -- (this is negative) + total_events_min_stream_ordering BIGINT, + + -- the stream_ordering of the last-processed normally-created event + -- (this is positive) + total_events_max_stream_ordering BIGINT, + + -- If true, this represents the contract agreed upon by the background + -- population processor. + -- If false, this is suitable for use by the delta/incremental processor. + is_background_contract BOOLEAN NOT NULL PRIMARY KEY +); + +-- insert a null row and make sure it is the only one. +DELETE FROM stats_incremental_position; +INSERT INTO stats_incremental_position ( + state_delta_stream_id, + total_events_min_stream_ordering, + total_events_max_stream_ordering, + is_background_contract +) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); + +-- represents PRESENT room statistics for a room +CREATE TABLE IF NOT EXISTS room_stats_current ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- These starts cover the time from start_ts...end_ts (in seconds). + -- Note that end_ts is quantised, and start_ts usually so. + start_ts BIGINT, + end_ts BIGINT, + + current_state_events INT NOT NULL DEFAULT 0, + total_events INT NOT NULL DEFAULT 0, + joined_members INT NOT NULL DEFAULT 0, + invited_members INT NOT NULL DEFAULT 0, + left_members INT NOT NULL DEFAULT 0, + banned_members INT NOT NULL DEFAULT 0, + + -- If initial background count is still to be performed: NULL + -- If initial background count has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT, + + CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL)) +); + + +-- represents HISTORICAL room statistics for a room +CREATE TABLE IF NOT EXISTS room_stats_historical ( + room_id TEXT NOT NULL, + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). + -- Note that end_ts is quantised, and start_ts usually so. + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + current_state_events INT NOT NULL, + total_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + + PRIMARY KEY (room_id, end_ts) +); + +-- We use this index to speed up deletion of ancient room stats. +CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts); + +-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular room.) + + +-- represents PRESENT statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_current ( + user_id TEXT NOT NULL PRIMARY KEY, + + -- The timestamp that represents the start of the + start_ts BIGINT, + end_ts BIGINT, + + public_rooms INT DEFAULT 0 NOT NULL, + private_rooms INT DEFAULT 0 NOT NULL, + + -- If initial background count is still to be performed: NULL + -- If initial background count has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT +); + +-- represents HISTORICAL statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_historical ( + user_id TEXT NOT NULL, + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, + + PRIMARY KEY (user_id, end_ts) +); + +-- We use this index to speed up deletion of ancient user stats. +CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts); + +-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular user.) diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py new file mode 100644 index 0000000000..049867fa3e --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This schema delta will be run after 'stats_separated1.sql' due to lexicographic +# ordering. Note that it MUST be so. +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +def _run_create_generic(stats_type, cursor, database_engine): + """ + Creates the pertinent (partial, if supported) indices for one kind of stats. + Args: + stats_type: "room" or "user" - the type of stats + cursor: Database Cursor + database_engine: Database Engine + """ + if isinstance(database_engine, Sqlite3Engine): + # even though SQLite >= 3.8 can support partial indices, we won't enable + # them, in case the SQLite database may be later used on another system. + # It's also the case that SQLite is only likely to be used in small + # deployments or testing, where the optimisations gained by use of a + # partial index are not a big concern. + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts); + """ + % (stats_type, stats_type) + ) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (completed_delta_stream_id, %s_id); + """ + % (stats_type, stats_type, stats_type) + ) + elif isinstance(database_engine, PostgresEngine): + # This partial index helps us with finding dirty stats rows + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts) + WHERE end_ts IS NOT NULL; + """ + % (stats_type, stats_type) + ) + # This partial index helps us with old collection + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (%s_id) + WHERE completed_delta_stream_id IS NULL; + """ + % (stats_type, stats_type, stats_type) + ) + else: + raise NotImplementedError("Unknown database engine.") + + +def run_create(cursor, database_engine): + """ + This function is called as part of the schema delta. + It will create indices - partial, if supported - for the new 'separated' + room & user statistics. + """ + _run_create_generic("room", cursor, database_engine) + _run_create_generic("user", cursor, database_engine) + + +def run_upgrade(cur, database_engine, config): + """ + This function is run on a database upgrade (of a non-empty database). + We have no need to do anything specific here. + """ + pass -- cgit 1.5.1 From eafa8d3c54e03ca2c7929125a5ce5ea015491bf5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 16:03:37 +0100 Subject: Unify name of 'stats regenerator' in schema comments. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 8d8e8fb97c..19a91416c2 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -47,8 +47,8 @@ CREATE TABLE IF NOT EXISTS stats_incremental_position ( -- (this is positive) total_events_max_stream_ordering BIGINT, - -- If true, this represents the contract agreed upon by the background - -- population processor. + -- If true, this represents the contract agreed upon by the stats + -- regenerator. -- If false, this is suitable for use by the delta/incremental processor. is_background_contract BOOLEAN NOT NULL PRIMARY KEY ); @@ -78,8 +78,8 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( left_members INT NOT NULL DEFAULT 0, banned_members INT NOT NULL DEFAULT 0, - -- If initial background count is still to be performed: NULL - -- If initial background count has been performed: the maximum delta stream + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT, @@ -123,8 +123,8 @@ CREATE TABLE IF NOT EXISTS user_stats_current ( public_rooms INT DEFAULT 0 NOT NULL, private_rooms INT DEFAULT 0 NOT NULL, - -- If initial background count is still to be performed: NULL - -- If initial background count has been performed: the maximum delta stream + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT ); -- cgit 1.5.1 From 18a4c03c50236a2da6b5aa5321ca084f18dbc36d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 16:04:04 +0100 Subject: Remove needless defaults. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 19a91416c2..1e17eae226 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -71,12 +71,12 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( start_ts BIGINT, end_ts BIGINT, - current_state_events INT NOT NULL DEFAULT 0, - total_events INT NOT NULL DEFAULT 0, - joined_members INT NOT NULL DEFAULT 0, - invited_members INT NOT NULL DEFAULT 0, - left_members INT NOT NULL DEFAULT 0, - banned_members INT NOT NULL DEFAULT 0, + current_state_events INT NOT NULL, + total_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream @@ -120,8 +120,8 @@ CREATE TABLE IF NOT EXISTS user_stats_current ( start_ts BIGINT, end_ts BIGINT, - public_rooms INT DEFAULT 0 NOT NULL, - private_rooms INT DEFAULT 0 NOT NULL, + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream -- cgit 1.5.1 From 7b657f1148fa10234d52d333ff176969f296aa0f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 15:40:58 +0100 Subject: Simplify table structure This obviates the need for old collection, but comes at the minor cost of not being able to track historical stats or per-slice fields until after the statistics regenerator is finished. Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 15 +- .../storage/schema/delta/56/stats_separated2.py | 18 +- synapse/storage/stats.py | 280 +++++++++++++-------- 3 files changed, 175 insertions(+), 138 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 1e17eae226..d7418fdf1e 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position ( ) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); -- represents PRESENT room statistics for a room +-- only holds absolute fields CREATE TABLE IF NOT EXISTS room_stats_current ( room_id TEXT NOT NULL PRIMARY KEY, - -- These starts cover the time from start_ts...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. - start_ts BIGINT, - end_ts BIGINT, - current_state_events INT NOT NULL, total_events INT NOT NULL, joined_members INT NOT NULL, @@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT, - - CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL)) ); @@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. + -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, @@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical -- represents PRESENT statistics for a user +-- only holds absolute fields CREATE TABLE IF NOT EXISTS user_stats_current ( user_id TEXT NOT NULL PRIMARY KEY, - -- The timestamp that represents the start of the - start_ts BIGINT, - end_ts BIGINT, - public_rooms INT NOT NULL, private_rooms INT NOT NULL, diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py index 049867fa3e..942d240010 100644 --- a/synapse/storage/schema/delta/56/stats_separated2.py +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -32,13 +32,6 @@ def _run_create_generic(stats_type, cursor, database_engine): # It's also the case that SQLite is only likely to be used in small # deployments or testing, where the optimisations gained by use of a # partial index are not a big concern. - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts); - """ - % (stats_type, stats_type) - ) cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete @@ -47,16 +40,7 @@ def _run_create_generic(stats_type, cursor, database_engine): % (stats_type, stats_type, stats_type) ) elif isinstance(database_engine, PostgresEngine): - # This partial index helps us with finding dirty stats rows - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts) - WHERE end_ts IS NOT NULL; - """ - % (stats_type, stats_type) - ) - # This partial index helps us with old collection + # This partial index helps us with finding incomplete stats rows cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index cebe4cbc57..4cb10dc9fb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None ): @@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore): row was completed. """ - while True: - try: - res = yield self.runInteraction( - "update_stats_delta", - self._update_stats_delta_txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=complete_with_stream_id, - ) - return res - except OldCollectionRequired: - # retry after collecting old rows - # TODO (implement later) - raise NotImplementedError("old collection not in this PR") + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """ + + Args: + txn: Transaction + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. + """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [table] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + retcols = chain(absolutes.keys(), additive_relatives.keys()) + current_row = self._simple_select_one_txn( + txn, table, keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + current_row[key] += val + for (key, val) in absolutes.items(): + current_row[key] = val + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + ): + """ + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + """ + if self.database_engine.can_native_upsert: + ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + sel_exprs = chain( + keyvalues, copy_columns, ("?" for _ in additive_relatives) + ) + keyvalues_where = ("%s = ?" % f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": ", ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join(keyvalues.keys()), + "sets": ", ".join(chain(sets_cc, sets_ar)), + } + + qargs = chain(additive_relatives.values(), keyvalues.values()) + txn.execute(sql, qargs) + else: + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues, + chain(additive_relatives.keys(), copy_columns), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = {**keyvalues, **src_row, **additive_relatives} + self._simple_insert_txn(txn, into_table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, keyvalues, src_row) def _update_stats_delta_txn( self, @@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + absolute_fields (dict[str, int]): Absolute current stats values + (i.e. not deltas). Does not work with per-slice fields. """ table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] for field in chain(fields.keys(), absolute_fields.keys()): - if ( - field not in ABSOLUTE_STATS_FIELDS[stats_type] - and field not in PER_SLICE_FIELDS[stats_type] - ): + if field not in abs_field_names and field not in slice_field_names: # guard against potential SQL injection dodginess raise ValueError( "%s is not a recognised field" " for stats type %s" % (field, stats_type) ) - field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] - field_values = list(fields.values()) - - if absolute_fields is not None: - field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] - field_values += list(absolute_fields.values()) - - if complete_with_stream_id is not None: - field_sqls.append("completed_delta_stream_id = ?") - field_values.append(complete_with_stream_id) - - # update current row, but only if it is either: - # - dirty and complete but not old - # If it is old, we should old-collect it first and retry. - # - dirty and incomplete - # Incomplete rows can't be old-collected (as this would commit - # false statistics into the _historical table). - # Instead, their `end_ts` is extended, whilst we wait for them to - # become complete at the hand of the stats regenerator. - sql = ( - "UPDATE %s_current SET end_ts = ?, %s" - " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" - " AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = [end_ts] + list(field_values) + [end_ts, stats_id] - - txn.execute(sql, qargs) + additive_relatives = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_fields + } - if txn.rowcount > 0: - # success. - return + if absolute_fields is None: + absolute_fields = {} + elif complete_with_stream_id is not None: + absolute_fields = absolute_fields.copy() - # if we're here, it's because we didn't succeed in updating a stats - # row. Why? Let's find out… + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - current_row = self._simple_select_one_txn( + # first upsert the current table + self._upsert_with_additive_relatives_txn( txn, table + "_current", {id_col: stats_id}, - ("end_ts", "completed_delta_stream_id"), - allow_none=True, + absolute_fields, + additive_relatives, ) - if current_row is None: - # Failure reason: There is no row. - # Solution: - # we need to insert a row! (insert a dirty, incomplete row) - insertee = { - id_col: stats_id, - "end_ts": end_ts, - "start_ts": ts, - "completed_delta_stream_id": complete_with_stream_id, + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the historical table. + # we don't support absolute_fields for slice_field_names as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) + for key in slice_field_names } - - # we assume that, by default, blank fields should be zero. - for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: - insertee[field_name] = 0 - - for field_name in PER_SLICE_FIELDS[stats_type]: - insertee[field_name] = 0 - - for (field, value) in fields.items(): - insertee[field] = value - - if absolute_fields is not None: - for (field, value) in absolute_fields.items(): - insertee[field] = value - - self._simple_insert_txn(txn, table + "_current", insertee) - - elif current_row["end_ts"] is None: - # Failure reason: The row is not dirty. - # Solution: - # update the row, including `start_ts`, to make it dirty. - sql = ( - "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" - " WHERE end_ts IS NULL AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = ( - [end_ts - self.stats_bucket_size, end_ts] - + list(field_values) - + [stats_id] + self._upsert_copy_from_table_with_additive_relatives_txn( + txn, + table + "_historical", + {id_col: stats_id}, + per_slice_additive_relatives, + table + "_current", + abs_field_names ) - - txn.execute(sql, qargs) - if txn.rowcount == 0: - raise RuntimeError( - "Should be impossible: No rows updated" - " but all conditions are known to be met." - ) - - elif current_row["end_ts"] < end_ts: - # Failure reason: The row is complete and old. - # Solution: We need to perform old collection first - raise OldCollectionRequired() -- cgit 1.5.1 From e8fc180d4dbcf8237769397652356ffa23a2e952 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 16:10:05 +0100 Subject: Fix up SQL schema delta Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index d7418fdf1e..95daf8f53b 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. - completed_delta_stream_id BIGINT, + completed_delta_stream_id BIGINT ); -- cgit 1.5.1 From 1ecd1a6a5fe95df7a726362c143320fab09373c2 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 09:46:13 +0100 Subject: Use engine-specific delta SQL files rather than delta written in Python. Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated2.py | 71 ---------------------- .../schema/delta/56/stats_separated2.sql.postgres | 24 ++++++++ .../schema/delta/56/stats_separated2.sql.sqlite | 27 ++++++++ 3 files changed, 51 insertions(+), 71 deletions(-) delete mode 100644 synapse/storage/schema/delta/56/stats_separated2.py create mode 100644 synapse/storage/schema/delta/56/stats_separated2.sql.postgres create mode 100644 synapse/storage/schema/delta/56/stats_separated2.sql.sqlite (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py deleted file mode 100644 index 942d240010..0000000000 --- a/synapse/storage/schema/delta/56/stats_separated2.py +++ /dev/null @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This schema delta will be run after 'stats_separated1.sql' due to lexicographic -# ordering. Note that it MUST be so. -from synapse.storage.engines import PostgresEngine, Sqlite3Engine - - -def _run_create_generic(stats_type, cursor, database_engine): - """ - Creates the pertinent (partial, if supported) indices for one kind of stats. - Args: - stats_type: "room" or "user" - the type of stats - cursor: Database Cursor - database_engine: Database Engine - """ - if isinstance(database_engine, Sqlite3Engine): - # even though SQLite >= 3.8 can support partial indices, we won't enable - # them, in case the SQLite database may be later used on another system. - # It's also the case that SQLite is only likely to be used in small - # deployments or testing, where the optimisations gained by use of a - # partial index are not a big concern. - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_not_complete - ON %s_stats_current (completed_delta_stream_id, %s_id); - """ - % (stats_type, stats_type, stats_type) - ) - elif isinstance(database_engine, PostgresEngine): - # This partial index helps us with finding incomplete stats rows - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_not_complete - ON %s_stats_current (%s_id) - WHERE completed_delta_stream_id IS NULL; - """ - % (stats_type, stats_type, stats_type) - ) - else: - raise NotImplementedError("Unknown database engine.") - - -def run_create(cursor, database_engine): - """ - This function is called as part of the schema delta. - It will create indices - partial, if supported - for the new 'separated' - room & user statistics. - """ - _run_create_generic("room", cursor, database_engine) - _run_create_generic("user", cursor, database_engine) - - -def run_upgrade(cur, database_engine, config): - """ - This function is run on a database upgrade (of a non-empty database). - We have no need to do anything specific here. - """ - pass diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres new file mode 100644 index 0000000000..0519fcff79 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres @@ -0,0 +1,24 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- These partial indices helps us with finding incomplete stats row +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (room_id) + WHERE completed_delta_stream_id IS NULL; + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (user_id) + WHERE completed_delta_stream_id IS NULL; + diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite new file mode 100644 index 0000000000..181f4ec5b9 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite @@ -0,0 +1,27 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- even though SQLite >= 3.8 can support partial indices, we won't enable +-- them, in case the SQLite database may be later used on another system. +-- It's also the case that SQLite is only likely to be used in small +-- deployments or testing, where the optimisations gained by use of a +-- partial index are not a big concern. + +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (completed_delta_stream_id, room_id); + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (completed_delta_stream_id, user_id); + -- cgit 1.5.1 From 09cbc3a8e9c0f494fb272cb3761024a851b3e3f8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:50:58 +0100 Subject: Switch to milliseconds in room/user stats for consistency. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/config/stats.py | 13 +++++-------- synapse/storage/schema/delta/56/stats_separated1.sql | 2 +- synapse/storage/stats.py | 3 ++- 3 files changed, 8 insertions(+), 10 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b518a3ed9c..b18ddbd1fa 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -27,19 +27,16 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 + self.stats_bucket_size = 86400 * 1000 self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = ( - self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + self.stats_bucket_size = self.parse_duration( + stats_config.get("bucket_size", "1d") ) - self.stats_retention = ( - self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) - / 1000 + self.stats_retention = self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) ) def generate_config_section(self, config_dir_path, server_name, **kwargs): diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 95daf8f53b..045b5ca013 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -84,7 +84,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- represents HISTORICAL room statistics for a room CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, - -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms). -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 650c0050cb..35fca1dc7b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -64,7 +64,8 @@ class StatsStore(StateDeltasStore): Quantises a timestamp to be a multiple of the bucket size. Args: - ts (int): the timestamp to quantise, in seconds since the Unix Epoch + ts (int): the timestamp to quantise, in milliseconds since the Unix + Epoch Returns: int: a timestamp which -- cgit 1.5.1 From 11c4e506bd6254fcb9551729e70b7ade80b127e4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:24:25 +0100 Subject: Rename `room_state` table to `room_stats_state` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/events.py | 2 +- synapse/storage/schema/delta/56/stats_separated1.sql | 3 +++ synapse/storage/stats.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a95c36a8b..28fdba5fb2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2270,7 +2270,7 @@ class EventsStore( "room_aliases", "room_depth", "room_memberships", - "room_state", + "room_stats_state", "room_stats", "room_stats_earliest_token", "rooms", diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 045b5ca013..52fb09c0e6 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -137,3 +137,6 @@ CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical -- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that -- out for us. (We would want it to review stats for a particular user.) + +-- Also rename room_state to room_stats_state to make its ownership clear. +ALTER TABLE room_state RENAME TO room_stats_state; diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 824e57bad7..fce0fb5a56 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -92,7 +92,7 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", -- cgit 1.5.1 From bc2c284dbe02ef283ab525f14febd7d0998ba552 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 14:28:44 +0100 Subject: Add `total_event_bytes` to room statistics schema. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 2 ++ synapse/storage/stats.py | 1 + 2 files changed, 3 insertions(+) (limited to 'synapse/storage/schema') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 52fb09c0e6..6d4648c0d7 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, @@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS room_stats_historical ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3df57b52ea..8c99a125a9 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -36,6 +36,7 @@ ABSOLUTE_STATS_FIELDS = { "left_members", "banned_members", "total_events", + "total_event_bytes", ), "user": ("public_rooms", "private_rooms"), } -- cgit 1.5.1