From 8de9ebe35ddd9a0c5e73358fc861f375868b6c84 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 13:22:07 +0100 Subject: Tear out current room & user statistics (#5880) * Tear out current room & user statistics. Signed-off-by: Olivier Wilkinson (reivilibre) * Black is back with more linting complaints Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 33 ++ synapse/storage/stats.py | 412 +-------------------- 2 files changed, 36 insertions(+), 409 deletions(-) create mode 100644 synapse/storage/schema/delta/56/stats_separated1.sql (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql new file mode 100644 index 0000000000..5b125d17b0 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -0,0 +1,33 @@ +/* Copyright 2018 New Vector Ltd + * Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +----- First clean up from previous versions of room stats. 
+ +-- First remove old stats stuff +DROP TABLE IF EXISTS room_stats; +DROP TABLE IF EXISTS user_stats; +DROP TABLE IF EXISTS room_stats_earliest_tokens; +DROP TABLE IF EXISTS _temp_populate_stats_position; +DROP TABLE IF EXISTS _temp_populate_stats_rooms; +DROP TABLE IF EXISTS stats_stream_pos; + +-- Unschedule old background updates if they're still scheduled +DELETE FROM background_updates WHERE update_name IN ( + 'populate_stats_createtables', + 'populate_stats_process_rooms', + 'populate_stats_cleanup' +); diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e13efed417..123c1ae220 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,12 +15,7 @@ import logging -from twisted.internet import defer - -from synapse.api.constants import EventTypes, Membership -from synapse.storage.prepare_database import get_statements from synapse.storage.state_deltas import StateDeltasStore -from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -39,8 +34,6 @@ ABSOLUTE_STATS_FIELDS = { TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -TEMP_TABLE = "_temp_populate_stats" - class StatsStore(StateDeltasStore): def __init__(self, db_conn, hs): @@ -51,292 +44,9 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.register_background_update_handler( - "populate_stats_createtables", self._populate_stats_createtables - ) - self.register_background_update_handler( - "populate_stats_process_rooms", self._populate_stats_process_rooms - ) - self.register_background_update_handler( - "populate_stats_cleanup", self._populate_stats_cleanup - ) - - @defer.inlineCallbacks - def _populate_stats_createtables(self, progress, batch_size): - - if not self.stats_enabled: - yield self._end_background_update("populate_stats_createtables") - return 1 - - # Get all the rooms that we want to process. 
- def _make_staging_area(txn): - # Create the temporary tables - stmts = get_statements( - """ - -- We just recreate the table, we'll be reinserting the - -- correct entries again later anyway. - DROP TABLE IF EXISTS {temp}_rooms; - - CREATE TABLE IF NOT EXISTS {temp}_rooms( - room_id TEXT NOT NULL, - events BIGINT NOT NULL - ); - - CREATE INDEX {temp}_rooms_events - ON {temp}_rooms(events); - CREATE INDEX {temp}_rooms_id - ON {temp}_rooms(room_id); - """.format( - temp=TEMP_TABLE - ).splitlines() - ) - - for statement in stmts: - txn.execute(statement) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) - - # Get rooms we want to process from the database, only adding - # those that we haven't (i.e. those not in room_stats_earliest_token) - sql = """ - INSERT INTO %s_rooms (room_id, events) - SELECT c.room_id, count(*) FROM current_state_events AS c - LEFT JOIN room_stats_earliest_token AS t USING (room_id) - WHERE t.room_id IS NULL - GROUP BY c.room_id - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) - - new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.runInteraction("populate_stats_temp_build", _make_staging_area) - yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) - self.get_earliest_token_for_room_stats.invalidate_all() - - yield self._end_background_update("populate_stats_createtables") - return 1 - - @defer.inlineCallbacks - def _populate_stats_cleanup(self, progress, batch_size): - """ - Update the user directory stream position, then clean up the old tables. 
- """ - if not self.stats_enabled: - yield self._end_background_update("populate_stats_cleanup") - return 1 - - position = yield self._simple_select_one_onecol( - TEMP_TABLE + "_position", None, "position" - ) - yield self.update_stats_stream_pos(position) - - def _delete_staging_area(txn): - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") - - yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) - - yield self._end_background_update("populate_stats_cleanup") - return 1 - - @defer.inlineCallbacks - def _populate_stats_process_rooms(self, progress, batch_size): - - if not self.stats_enabled: - yield self._end_background_update("populate_stats_process_rooms") - return 1 - - # If we don't have progress filed, delete everything. - if not progress: - yield self.delete_all_stats() - - def _get_next_batch(txn): - # Only fetch 250 rooms, so we don't fetch too many at once, even - # if those 250 rooms have less than batch_size state events. - sql = """ - SELECT room_id, events FROM %s_rooms - ORDER BY events DESC - LIMIT 250 - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) - rooms_to_work_on = txn.fetchall() - - if not rooms_to_work_on: - return None - - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - progress["remaining"] = txn.fetchone()[0] - - return rooms_to_work_on - - rooms_to_work_on = yield self.runInteraction( - "populate_stats_temp_read", _get_next_batch - ) - - # No more rooms -- complete the transaction. 
- if not rooms_to_work_on: - yield self._end_background_update("populate_stats_process_rooms") - return 1 - - logger.info( - "Processing the next %d rooms of %d remaining", - len(rooms_to_work_on), - progress["remaining"], - ) - - # Number of state events we've processed by going through each room - processed_event_count = 0 - - for room_id, event_count in rooms_to_work_on: - - current_state_ids = yield self.get_current_state_ids(room_id) - - join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) - history_visibility_id = current_state_ids.get( - (EventTypes.RoomHistoryVisibility, "") - ) - encryption_id = current_state_ids.get((EventTypes.RoomEncryption, "")) - name_id = current_state_ids.get((EventTypes.Name, "")) - topic_id = current_state_ids.get((EventTypes.Topic, "")) - avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) - canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) - - event_ids = [ - join_rules_id, - history_visibility_id, - encryption_id, - name_id, - topic_id, - avatar_id, - canonical_alias_id, - ] - - state_events = yield self.get_events( - [ev for ev in event_ids if ev is not None] - ) - - def _get_or_none(event_id, arg): - event = state_events.get(event_id) - if event: - return event.content.get(arg) - return None - - yield self.update_room_state( - room_id, - { - "join_rules": _get_or_none(join_rules_id, "join_rule"), - "history_visibility": _get_or_none( - history_visibility_id, "history_visibility" - ), - "encryption": _get_or_none(encryption_id, "algorithm"), - "name": _get_or_none(name_id, "name"), - "topic": _get_or_none(topic_id, "topic"), - "avatar": _get_or_none(avatar_id, "url"), - "canonical_alias": _get_or_none(canonical_alias_id, "alias"), - }, - ) - - now = self.hs.get_reactor().seconds() - - # quantise time to the nearest bucket - now = (now // self.stats_bucket_size) * self.stats_bucket_size - - def _fetch_data(txn): - - # Get the current token of the room - current_token = 
self._get_max_stream_id_in_current_state_deltas_txn(txn) - - current_state_events = len(current_state_ids) - - membership_counts = self._get_user_counts_in_room_txn(txn, room_id) - - total_state_events = self._get_total_state_event_counts_txn( - txn, room_id - ) - - self._update_stats_txn( - txn, - "room", - room_id, - now, - { - "bucket_size": self.stats_bucket_size, - "current_state_events": current_state_events, - "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get(Membership.INVITE, 0), - "left_members": membership_counts.get(Membership.LEAVE, 0), - "banned_members": membership_counts.get(Membership.BAN, 0), - "state_events": total_state_events, - }, - ) - self._simple_insert_txn( - txn, - "room_stats_earliest_token", - {"room_id": room_id, "token": current_token}, - ) - - # We've finished a room. Delete it from the table. - self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id} - ) - - yield self.runInteraction("update_room_stats", _fetch_data) - - # Update the remaining counter. - progress["remaining"] -= 1 - yield self.runInteraction( - "populate_stats", - self._background_update_progress_txn, - "populate_stats_process_rooms", - progress, - ) - - processed_event_count += event_count - - if processed_event_count > batch_size: - # Don't process any more rooms, we've hit our batch size. - return processed_event_count - - return processed_event_count - - def delete_all_stats(self): - """ - Delete all statistics records. 
- """ - - def _delete_all_stats_txn(txn): - txn.execute("DELETE FROM room_state") - txn.execute("DELETE FROM room_stats") - txn.execute("DELETE FROM room_stats_earliest_token") - txn.execute("DELETE FROM user_stats") - - return self.runInteraction("delete_all_stats", _delete_all_stats_txn) - - def get_stats_stream_pos(self): - return self._simple_select_one_onecol( - table="stats_stream_pos", - keyvalues={}, - retcol="stream_id", - desc="stats_stream_pos", - ) - - def update_stats_stream_pos(self, stream_id): - return self._simple_update_one( - table="stats_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_stats_stream_pos", - ) + self.register_noop_background_update("populate_stats_createtables") + self.register_noop_background_update("populate_stats_process_rooms") + self.register_noop_background_update("populate_stats_cleanup") def update_room_state(self, room_id, fields): """ @@ -366,119 +76,3 @@ class StatsStore(StateDeltasStore): values=fields, desc="update_room_state", ) - - def get_deltas_for_room(self, room_id, start, size=100): - """ - Get statistics deltas for a given room. - - Args: - room_id (str) - start (int): Pagination start. Number of entries, not timestamp. - size (int): How many entries to return. - - Returns: - Deferred[list[dict]], where the dict has the keys of - ABSOLUTE_STATS_FIELDS["room"] and "ts". - """ - return self._simple_select_list_paginate( - "room_stats", - {"room_id": room_id}, - "ts", - start, - size, - retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), - order_direction="DESC", - ) - - def get_all_room_state(self): - return self._simple_select_list( - "room_state", None, retcols=("name", "topic", "canonical_alias") - ) - - @cached() - def get_earliest_token_for_room_stats(self, room_id): - """ - Fetch the "earliest token". 
This is used by the room stats delta - processor to ignore deltas that have been processed between the - start of the background task and any particular room's stats - being calculated. - - Returns: - Deferred[int] - """ - return self._simple_select_one_onecol( - "room_stats_earliest_token", - {"room_id": room_id}, - retcol="token", - allow_none=True, - ) - - def update_stats(self, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert( - table=table, - keyvalues={id_col: stats_id, "ts": ts}, - values=fields, - desc="update_stats", - ) - - def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert_txn( - txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields - ) - - def update_stats_delta(self, ts, stats_type, stats_id, field, value): - def _update_stats_delta(txn): - table, id_col = TYPE_TO_ROOM[stats_type] - - sql = ( - "SELECT * FROM %s" - " WHERE %s=? and ts=(" - " SELECT MAX(ts) FROM %s" - " WHERE %s=?" - ")" - ) % (table, id_col, table, id_col) - txn.execute(sql, (stats_id, stats_id)) - rows = self.cursor_to_dict(txn) - if len(rows) == 0: - # silently skip as we don't have anything to apply a delta to yet. - # this tries to minimise any race between the initial sync and - # subsequent deltas arriving. - return - - current_ts = ts - latest_ts = rows[0]["ts"] - if current_ts < latest_ts: - # This one is in the past, but we're just encountering it now. - # Mark it as part of the current bucket. - current_ts = latest_ts - elif ts != latest_ts: - # we have to copy our absolute counters over to the new entry. 
- values = { - key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] - } - values[id_col] = stats_id - values["ts"] = ts - values["bucket_size"] = self.stats_bucket_size - - self._simple_insert_txn(txn, table=table, values=values) - - # actually update the new value - if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: - self._simple_update_txn( - txn, - table=table, - keyvalues={id_col: stats_id, "ts": current_ts}, - updatevalues={field: value}, - ) - else: - sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( - table, - field, - field, - id_col, - ) - txn.execute(sql, (value, stats_id, current_ts)) - - return self.runInteraction("update_stats_delta", _update_stats_delta) -- cgit 1.5.1 From d7675e79e13c1fd44dd97033d1e69f57811d75d1 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 13:45:05 +0100 Subject: Add schema for Separated Statistics Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 115 +++++++++++++++++++++ .../storage/schema/delta/56/stats_separated2.py | 87 ++++++++++++++++ 2 files changed, 202 insertions(+) create mode 100644 synapse/storage/schema/delta/56/stats_separated2.py (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 5b125d17b0..8d8e8fb97c 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -31,3 +31,118 @@ DELETE FROM background_updates WHERE update_name IN ( 'populate_stats_process_rooms', 'populate_stats_cleanup' ); + +----- Create tables for our version of room stats. 
+ +-- single-row table to track position of incremental updates +CREATE TABLE IF NOT EXISTS stats_incremental_position ( + -- the stream_id of the last-processed state delta + state_delta_stream_id BIGINT, + + -- the stream_ordering of the last-processed backfilled event + -- (this is negative) + total_events_min_stream_ordering BIGINT, + + -- the stream_ordering of the last-processed normally-created event + -- (this is positive) + total_events_max_stream_ordering BIGINT, + + -- If true, this represents the contract agreed upon by the background + -- population processor. + -- If false, this is suitable for use by the delta/incremental processor. + is_background_contract BOOLEAN NOT NULL PRIMARY KEY +); + +-- insert one null row per is_background_contract value and make sure they are the only ones. +DELETE FROM stats_incremental_position; +INSERT INTO stats_incremental_position ( + state_delta_stream_id, + total_events_min_stream_ordering, + total_events_max_stream_ordering, + is_background_contract +) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); + +-- represents PRESENT room statistics for a room +CREATE TABLE IF NOT EXISTS room_stats_current ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- These stats cover the time from start_ts...end_ts (in seconds). + -- Note that end_ts is quantised, and start_ts usually so. + start_ts BIGINT, + end_ts BIGINT, + + current_state_events INT NOT NULL DEFAULT 0, + total_events INT NOT NULL DEFAULT 0, + joined_members INT NOT NULL DEFAULT 0, + invited_members INT NOT NULL DEFAULT 0, + left_members INT NOT NULL DEFAULT 0, + banned_members INT NOT NULL DEFAULT 0, + + -- If initial background count is still to be performed: NULL + -- If initial background count has been performed: the maximum delta stream + -- position that this row takes into account.
+ completed_delta_stream_id BIGINT, + + CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL)) +); + + +-- represents HISTORICAL room statistics for a room +CREATE TABLE IF NOT EXISTS room_stats_historical ( + room_id TEXT NOT NULL, + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). + -- Note that end_ts is quantised, and the start (end_ts - bucket_size) usually so. + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + current_state_events INT NOT NULL, + total_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + + PRIMARY KEY (room_id, end_ts) +); + +-- We use this index to speed up deletion of ancient room stats. +CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts); + +-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular room.) + + +-- represents PRESENT statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_current ( + user_id TEXT NOT NULL PRIMARY KEY, + + -- These stats cover the time from start_ts...end_ts (in seconds). + start_ts BIGINT, + end_ts BIGINT, + + public_rooms INT DEFAULT 0 NOT NULL, + private_rooms INT DEFAULT 0 NOT NULL, + + -- If initial background count is still to be performed: NULL + -- If initial background count has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT +); + +-- represents HISTORICAL statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_historical ( + user_id TEXT NOT NULL, + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, + + PRIMARY KEY (user_id, end_ts) +); + +-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts); + +-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular user.) diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py new file mode 100644 index 0000000000..049867fa3e --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This schema delta will be run after 'stats_separated1.sql' due to lexicographic +# ordering. Note that it MUST be so. +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +def _run_create_generic(stats_type, cursor, database_engine): + """ + Creates the pertinent (partial, if supported) indices for one kind of stats. + Args: + stats_type: "room" or "user" - the type of stats + cursor: Database Cursor + database_engine: Database Engine + """ + if isinstance(database_engine, Sqlite3Engine): + # even though SQLite >= 3.8 can support partial indices, we won't enable + # them, in case the SQLite database may be later used on another system. + # It's also the case that SQLite is only likely to be used in small + # deployments or testing, where the optimisations gained by use of a + # partial index are not a big concern. 
+ cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts); + """ + % (stats_type, stats_type) + ) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (completed_delta_stream_id, %s_id); + """ + % (stats_type, stats_type, stats_type) + ) + elif isinstance(database_engine, PostgresEngine): + # This partial index helps us with finding dirty stats rows + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts) + WHERE end_ts IS NOT NULL; + """ + % (stats_type, stats_type) + ) + # This partial index helps us with old collection + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (%s_id) + WHERE completed_delta_stream_id IS NULL; + """ + % (stats_type, stats_type, stats_type) + ) + else: + raise NotImplementedError("Unknown database engine.") + + +def run_create(cursor, database_engine): + """ + This function is called as part of the schema delta. + It will create indices - partial, if supported - for the new 'separated' + room & user statistics. + """ + _run_create_generic("room", cursor, database_engine) + _run_create_generic("user", cursor, database_engine) + + +def run_upgrade(cur, database_engine, config): + """ + This function is run on a database upgrade (of a non-empty database). + We have no need to do anything specific here. 
+ """ + pass -- cgit 1.5.1 From 80a1c6e9e5e4fbaa58355559e42a9a1bbc91c81f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:08:02 +0100 Subject: Add storage function for storing stats deltas Old collection is not included in this commit Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 171 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 169 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 123c1ae220..e8b1ce240b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -16,6 +17,7 @@ import logging from synapse.storage.state_deltas import StateDeltasStore +from twisted.internet import defer logger = logging.getLogger(__name__) @@ -27,12 +29,21 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "state_events", + "total_events", ), "user": ("public_rooms", "private_rooms"), } -TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these fields are per-timeslice and so should be reset to 0 upon a new slice +PER_SLICE_FIELDS = {"room": (), "user": ()} + +TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} + + +class OldCollectionRequired(Exception): + """ Signal that we need to collect old stats rows and retry. 
""" + + pass class StatsStore(StateDeltasStore): @@ -48,6 +59,21 @@ class StatsStore(StateDeltasStore): self.register_noop_background_update("populate_stats_process_rooms") self.register_noop_background_update("populate_stats_cleanup") + def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. + + Args: + ts: the timestamp to quantise, in seconds since the Unix Epoch + + Returns: + a timestamp which + - is divisible by the bucket size; + - is no later than `ts`; and + - is the largest such timestamp. + """ + return (ts // self.stats_bucket_size) * self.stats_bucket_size + def update_room_state(self, room_id, fields): """ Args: @@ -76,3 +102,144 @@ class StatsStore(StateDeltasStore): values=fields, desc="update_room_state", ) + + @defer.inlineCallbacks + def update_stats_delta( + self, ts, stats_type, stats_id, fields, complete_with_stream_id=None + ): + """ + Updates the statistics for a subject, with a delta (difference/relative + change). + + Args: + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) + fields (dict[str, int]): Deltas of stats values. + complete_with_stream_id (int, optional): + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. 
+ """ + + while True: + try: + res = yield self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + return res + except OldCollectionRequired: + # retry after collecting old rows + # TODO (implement later) + raise NotImplementedError("old collection not in this PR") + + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_fields=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] + field_values = list(fields.values()) + + if absolute_fields is not None: + field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] + field_values += list(absolute_fields.values()) + + if complete_with_stream_id is not None: + field_sqls.append("completed_delta_stream_id = ?") + field_values.append(complete_with_stream_id) + + sql = ( + "UPDATE %s_current SET end_ts = ?, %s" + " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" + " AND %s = ?" + ) % (table, ", ".join(field_sqls), id_col) + + qargs = [end_ts] + list(field_values) + [end_ts, stats_id] + + txn.execute(sql, qargs) + + if txn.rowcount > 0: + # success. + return + + # if we're here, it's because we didn't succeed in updating a stats + # row. Why? Let's find out… + + current_row = self._simple_select_one_txn( + txn, + table + "_current", + {id_col: stats_id}, + ("end_ts", "completed_delta_stream_id"), + allow_none=True, + ) + + if current_row is None: + # we need to insert a row! 
(insert a dirty, incomplete row) + insertee = { + id_col: stats_id, + "end_ts": end_ts, + "start_ts": ts, + "completed_delta_stream_id": complete_with_stream_id, + } + + # we assume that, by default, blank fields should be zero. + for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: + insertee[field_name] = 0 + + for field_name in PER_SLICE_FIELDS[stats_type]: + insertee[field_name] = 0 + + for (field, value) in fields.items(): + insertee[field] = value + + if absolute_fields is not None: + for (field, value) in absolute_fields.items(): + insertee[field] = value + + self._simple_insert_txn(txn, table + "_current", insertee) + + elif current_row["end_ts"] is None: + # update the row, including start_ts + sql = ( + "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" + " WHERE end_ts IS NULL AND %s = ?" + ) % (table, ", ".join(field_sqls), id_col) + + qargs = ( + [end_ts - self.stats_bucket_size, end_ts] + + list(field_values) + + [stats_id] + ) + + txn.execute(sql, qargs) + if txn.rowcount == 0: + raise RuntimeError( + "Should be impossible: No rows updated" + " but all conditions are known to be met." 
+ ) + + elif current_row["end_ts"] < end_ts: + # we need to perform old collection first + raise OldCollectionRequired() -- cgit 1.5.1 From e4cbea6c46afe6c45e0ee0604eaf536da70cb9f3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:24:35 +0100 Subject: Handle state deltas and turn them into stats deltas Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 329 +++++++++++++++++++++++++++++++++++++++++++++- synapse/storage/stats.py | 112 +++++++++++++++- 2 files changed, 439 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 7b1d1b4203..156adfd310 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -15,7 +15,14 @@ import logging +from twisted.internet import defer + +from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.handlers.state_deltas import StateDeltasHandler +from synapse.metrics import event_processing_positions +from synapse.metrics.background_process_metrics import run_as_background_process +from synapse.types import UserID +from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -52,4 +59,324 @@ class StatsHandler(StateDeltasHandler): def notify_new_event(self): """Called when there may be more deltas to process """ - pass + if not self.hs.config.stats_enabled: + return + + lock = self.store.stats_delta_processing_lock + + @defer.inlineCallbacks + def process(): + try: + yield self._unsafe_process() + finally: + lock.release() + + if lock.acquire(blocking=False): + # we only want to run this process one-at-a-time, + # and also, if the initial background updater wants us to keep out, + # we should respect that. 
+ try: + run_as_background_process("stats.notify_new_event", process) + except: # noqa: E722 – re-raised so fine + lock.release() + raise + + @defer.inlineCallbacks + def _unsafe_process(self): + # If self.pos is None then it means we haven't fetched it from DB + if self.pos is None or None in self.pos.values(): + self.pos = yield self.store.get_stats_positions() + + # If still None then the initial background update hasn't started yet + if self.pos is None or None in self.pos.values(): + return None + + # Loop round handling deltas until we're up to date + with Measure(self.clock, "stats_delta"): + while True: + deltas = yield self.store.get_current_state_deltas( + self.pos["state_delta_stream_id"] + ) + if not deltas: + break + + logger.debug("Handling %d state deltas", len(deltas)) + yield self._handle_deltas(deltas) + + self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + + event_processing_positions.labels("stats").set( + self.pos["state_delta_stream_id"] + ) + + if self.pos is not None: + yield self.store.update_stats_positions(self.pos) + + @defer.inlineCallbacks + def _handle_deltas(self, deltas): + """ + Called with the state deltas to process + """ + for delta in deltas: + typ = delta["type"] + state_key = delta["state_key"] + room_id = delta["room_id"] + event_id = delta["event_id"] + stream_id = delta["stream_id"] + prev_event_id = delta["prev_event_id"] + stream_pos = delta["stream_id"] + + logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + + token = yield self.store.get_earliest_token_for_stats("room", room_id) + + # If the earliest token to begin from is larger than our current + # stream ID, skip processing this delta. + if token is not None and token >= stream_id: + logger.debug( + "Ignoring: %s as earlier than this room's initial ingestion event", + event_id, + ) + continue + + if event_id is None and prev_event_id is None: + # Errr...
+ continue + + event_content = {} + + if event_id is not None: + event = yield self.store.get_event(event_id, allow_none=True) + if event: + event_content = event.content or {} + + # We use stream_pos here rather than fetch by event_id as event_id + # may be None + now = yield self.store.get_received_ts_by_stream_pos(stream_pos) + now = int(now) // 1000 + + room_stats_delta = {} + room_stats_complete = False + + if prev_event_id is None: + # this state event doesn't overwrite another, + # so it is a new effective/current state event + room_stats_delta["current_state_events"] = ( + room_stats_delta.get("current_state_events", 0) + 1 + ) + + if typ == EventTypes.Member: + # we could use _get_key_change here but it's a bit inefficient + # given we're not testing for a specific result; might as well + # just grab the prev_membership and membership strings and + # compare them. + # We take None rather than leave as a previous membership + # in the absence of a previous event because we do not want to + # reduce the leave count when a new-to-the-room user joins. 
+ prev_membership = None + if prev_event_id is not None: + prev_event = yield self.store.get_event( + prev_event_id, allow_none=True + ) + if prev_event: + prev_event_content = prev_event.content + prev_membership = prev_event_content.get( + "membership", Membership.LEAVE + ) + + membership = event_content.get("membership", Membership.LEAVE) + + if prev_membership is None: + logger.debug("No previous membership for this user.") + elif prev_membership == Membership.JOIN: + room_stats_delta["joined_members"] = ( + room_stats_delta.get("joined_members", 0) - 1 + ) + elif prev_membership == Membership.INVITE: + room_stats_delta["invited_members"] = ( + room_stats_delta.get("invited_members", 0) - 1 + ) + elif prev_membership == Membership.LEAVE: + room_stats_delta["left_members"] = ( + room_stats_delta.get("left_members", 0) - 1 + ) + elif prev_membership == Membership.BAN: + room_stats_delta["banned_members"] = ( + room_stats_delta.get("banned_members", 0) - 1 + ) + else: + err = "%s is not a valid prev_membership" % (repr(prev_membership),) + logger.error(err) + raise ValueError(err) + + if membership == Membership.JOIN: + room_stats_delta["joined_members"] = ( + room_stats_delta.get("joined_members", 0) + 1 + ) + elif membership == Membership.INVITE: + room_stats_delta["invited_members"] = ( + room_stats_delta.get("invited_members", 0) + 1 + ) + elif membership == Membership.LEAVE: + room_stats_delta["left_members"] = ( + room_stats_delta.get("left_members", 0) + 1 + ) + elif membership == Membership.BAN: + room_stats_delta["banned_members"] = ( + room_stats_delta.get("banned_members", 0) + 1 + ) + else: + err = "%s is not a valid membership" % (repr(membership),) + logger.error(err) + raise ValueError(err) + + user_id = state_key + if self.is_mine_id(user_id) and membership in ( + Membership.JOIN, + Membership.LEAVE, + ): + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + field = "public_rooms" if public else 
"private_rooms" + delta = +1 if membership == Membership.JOIN else -1 + + yield self.store.update_stats_delta( + now, "user", user_id, {field: delta} + ) + + elif typ == EventTypes.Create: + # Newly created room. Add it with all blank portions. + yield self.store.update_room_state( + room_id, + { + "join_rules": None, + "history_visibility": None, + "encryption": None, + "name": None, + "topic": None, + "avatar": None, + "canonical_alias": None, + }, + ) + + room_stats_complete = True + + elif typ == EventTypes.JoinRules: + old_room_state = yield self.store.get_room_state(room_id) + yield self.store.update_room_state( + room_id, {"join_rules": event_content.get("join_rule")} + ) + + # whether the room would be public anyway, + # because of history_visibility + other_field_gives_publicity = ( + old_room_state["history_visibility"] == "world_readable" + ) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.RoomHistoryVisibility: + old_room_state = yield self.store.get_room_state(room_id) + yield self.store.update_room_state( + room_id, + {"history_visibility": event_content.get("history_visibility")}, + ) + + # whether the room would be public anyway, + # because of join_rule + other_field_gives_publicity = ( + old_room_state["join_rules"] == JoinRules.PUBLIC + ) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) + + elif typ == EventTypes.Encryption: + yield self.store.update_room_state( + room_id, {"encryption": event_content.get("algorithm")} + ) + elif typ == EventTypes.Name: + yield self.store.update_room_state( + room_id, {"name": event_content.get("name")} + ) + elif 
typ == EventTypes.Topic: + yield self.store.update_room_state( + room_id, {"topic": event_content.get("topic")} + ) + elif typ == EventTypes.RoomAvatar: + yield self.store.update_room_state( + room_id, {"avatar": event_content.get("url")} + ) + elif typ == EventTypes.CanonicalAlias: + yield self.store.update_room_state( + room_id, {"canonical_alias": event_content.get("alias")} + ) + + if room_stats_complete: + yield self.store.update_stats_delta( + now, + "room", + room_id, + room_stats_delta, + complete_with_stream_id=stream_id, + ) + + elif len(room_stats_delta) > 0: + yield self.store.update_stats_delta( + now, "room", room_id, room_stats_delta + ) + + @defer.inlineCallbacks + def update_public_room_stats(self, ts, room_id, is_public): + """ + Increment/decrement a user's number of public rooms when a room they are + in changes to/from public visibility. + + Args: + ts (int): Timestamp in seconds + room_id (str) + is_public (bool) + """ + # For now, blindly iterate over all local users in the room so that + # we can handle the whole problem of copying buckets over as needed + user_ids = yield self.store.get_users_in_room(room_id) + + for user_id in user_ids: + if self.hs.is_mine(UserID.from_string(user_id)): + yield self.store.update_stats_delta( + ts, + "user", + user_id, + { + "public_rooms": +1 if is_public else -1, + "private_rooms": -1 if is_public else +1, + }, + ) + + @defer.inlineCallbacks + def _is_public_room(self, room_id): + join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules) + history_visibility = yield self.state.get_current_state( + room_id, EventTypes.RoomHistoryVisibility + ) + + if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or ( + ( + history_visibility + and history_visibility.content.get("history_visibility") + == "world_readable" + ) + ): + return True + else: + return False diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e8b1ce240b..4112291c76 100644 --- 
a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,10 +15,13 @@ # limitations under the License. import logging +from threading import Lock -from synapse.storage.state_deltas import StateDeltasStore from twisted.internet import defer +from synapse.storage.state_deltas import StateDeltasStore +from synapse.util.caches.descriptors import cached + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) @@ -55,6 +58,8 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size + self.stats_delta_processing_lock = Lock() + self.register_noop_background_update("populate_stats_createtables") self.register_noop_background_update("populate_stats_process_rooms") self.register_noop_background_update("populate_stats_cleanup") @@ -74,6 +79,91 @@ class StatsStore(StateDeltasStore): """ return (ts // self.stats_bucket_size) * self.stats_bucket_size + def get_stats_positions(self, for_initial_processor=False): + """ + Returns the stats processor positions. + + Args: + for_initial_processor (bool, optional): If true, returns the position + promised by the latest stats regeneration, rather than the current + incremental processor's position. + Otherwise (if false), return the incremental processor's position. + + Returns (dict): + Dict containing :- + state_delta_stream_id: stream_id of last-processed state delta + total_events_min_stream_ordering: stream_ordering of latest-processed + backfilled event, in the context of total_events counting. + total_events_max_stream_ordering: stream_ordering of latest-processed + non-backfilled event, in the context of total_events counting. 
+ """ + return self._simple_select_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + desc="stats_incremental_position", + ) + + def _get_stats_positions_txn(self, txn, for_initial_processor=False): + """ + See L{get_stats_positions}. + + Args: + txn (cursor): Database cursor + """ + return self._simple_select_one_txn( + txn=txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + ) + + def update_stats_positions(self, positions, for_initial_processor=False): + """ + Updates the stats processor positions. + + Args: + positions: See L{get_stats_positions} + for_initial_processor: See L{get_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + desc="update_stats_incremental_position", + ) + + def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): + """ + See L{update_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + ) + def update_room_state(self, room_id, fields): """ Args: @@ -103,6 +193,26 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) + @cached() + def 
get_earliest_token_for_stats(self, stats_type, id): + """ + Fetch the "earliest token". This is used by the room stats delta + processor to ignore deltas that have been processed between the + start of the background task and any particular room's stats + being calculated. + + Returns: + Deferred[int] + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + return self._simple_select_one_onecol( + "%s_current" % (table,), + {id_col: id}, + retcol="completed_delta_stream_id", + allow_none=True, + ) + @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None -- cgit 1.5.1 From 1819563640e1be839c348e18afc1b59c7b3b8c9c Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 14:27:47 +0100 Subject: Ack, isort! Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e8b1ce240b..4e0a3d4f6e 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -16,9 +16,10 @@ import logging -from synapse.storage.state_deltas import StateDeltasStore from twisted.internet import defer +from synapse.storage.state_deltas import StateDeltasStore + logger = logging.getLogger(__name__) # these fields track absolutes (e.g. 
total number of rooms on the server) -- cgit 1.5.1 From b5573c0ffb97059f672d465be7dd38c94854411d Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 15:02:49 +0100 Subject: Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4e0a3d4f6e..095924cae6 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -68,7 +68,7 @@ class StatsStore(StateDeltasStore): ts: the timestamp to quantise, in seconds since the Unix Epoch Returns: - a timestamp which + int: a timestamp which - is divisible by the bucket size; - is no later than `ts`; and - is the largest such timestamp. -- cgit 1.5.1 From 4a97eef0dc49ee8f3c446221f0fcbb0e65ece113 Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 20 Aug 2019 15:12:21 +0100 Subject: Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 095924cae6..0445b97b4a 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -65,7 +65,7 @@ class StatsStore(StateDeltasStore): Quantises a timestamp to be a multiple of the bucket size. 
Args: - ts: the timestamp to quantise, in seconds since the Unix Epoch + ts (int): the timestamp to quantise, in seconds since the Unix Epoch Returns: int: a timestamp which -- cgit 1.5.1 From 981c6cf5442bfb16c177f995deedeb3ec44bf5fb Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 15:41:10 +0100 Subject: Sanitise accepted fields in `_update_stats_delta_txn` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 0445b97b4a..a372f35eae 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,6 +15,7 @@ # limitations under the License. import logging +from itertools import chain from twisted.internet import defer @@ -160,6 +161,17 @@ class StatsStore(StateDeltasStore): quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + for field in chain(fields.keys(), absolute_fields.keys()): + if ( + field not in ABSOLUTE_STATS_FIELDS[stats_type] + and field not in PER_SLICE_FIELDS[stats_type] + ): + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + field_sqls = ["%s = %s + ?" 
% (field, field) for field in fields.keys()] field_values = list(fields.values()) -- cgit 1.5.1 From 977310ee2767e4edaa20e4a2216be359a7eb8002 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 15:49:00 +0100 Subject: Clarify `_update_stats_delta_txn` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 17 +++++++++++++++-- 1 file changed, 15 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a372f35eae..cebe4cbc57 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -183,6 +183,14 @@ class StatsStore(StateDeltasStore): field_sqls.append("completed_delta_stream_id = ?") field_values.append(complete_with_stream_id) + # update current row, but only if it is either: + # - dirty and complete but not old + # If it is old, we should old-collect it first and retry. + # - dirty and incomplete + # Incomplete rows can't be old-collected (as this would commit + # false statistics into the _historical table). + # Instead, their `end_ts` is extended, whilst we wait for them to + # become complete at the hand of the stats regenerator. sql = ( "UPDATE %s_current SET end_ts = ?, %s" " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" @@ -209,6 +217,8 @@ class StatsStore(StateDeltasStore): ) if current_row is None: + # Failure reason: There is no row. + # Solution: # we need to insert a row! (insert a dirty, incomplete row) insertee = { id_col: stats_id, @@ -234,7 +244,9 @@ class StatsStore(StateDeltasStore): self._simple_insert_txn(txn, table + "_current", insertee) elif current_row["end_ts"] is None: - # update the row, including start_ts + # Failure reason: The row is not dirty. + # Solution: + # update the row, including `start_ts`, to make it dirty. sql = ( "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" " WHERE end_ts IS NULL AND %s = ?" 
@@ -254,5 +266,6 @@ class StatsStore(StateDeltasStore): ) elif current_row["end_ts"] < end_ts: - # we need to perform old collection first + # Failure reason: The row is complete and old. + # Solution: We need to perform old collection first raise OldCollectionRequired() -- cgit 1.5.1 From eafa8d3c54e03ca2c7929125a5ce5ea015491bf5 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 16:03:37 +0100 Subject: Unify name of 'stats regenerator' in schema comments. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 8d8e8fb97c..19a91416c2 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -47,8 +47,8 @@ CREATE TABLE IF NOT EXISTS stats_incremental_position ( -- (this is positive) total_events_max_stream_ordering BIGINT, - -- If true, this represents the contract agreed upon by the background - -- population processor. + -- If true, this represents the contract agreed upon by the stats + -- regenerator. -- If false, this is suitable for use by the delta/incremental processor. is_background_contract BOOLEAN NOT NULL PRIMARY KEY ); @@ -78,8 +78,8 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( left_members INT NOT NULL DEFAULT 0, banned_members INT NOT NULL DEFAULT 0, - -- If initial background count is still to be performed: NULL - -- If initial background count has been performed: the maximum delta stream + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. 
completed_delta_stream_id BIGINT, @@ -123,8 +123,8 @@ CREATE TABLE IF NOT EXISTS user_stats_current ( public_rooms INT DEFAULT 0 NOT NULL, private_rooms INT DEFAULT 0 NOT NULL, - -- If initial background count is still to be performed: NULL - -- If initial background count has been performed: the maximum delta stream + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. completed_delta_stream_id BIGINT ); -- cgit 1.5.1 From 18a4c03c50236a2da6b5aa5321ca084f18dbc36d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 20 Aug 2019 16:04:04 +0100 Subject: Remove needless defaults. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 19a91416c2..1e17eae226 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -71,12 +71,12 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( start_ts BIGINT, end_ts BIGINT, - current_state_events INT NOT NULL DEFAULT 0, - total_events INT NOT NULL DEFAULT 0, - joined_members INT NOT NULL DEFAULT 0, - invited_members INT NOT NULL DEFAULT 0, - left_members INT NOT NULL DEFAULT 0, - banned_members INT NOT NULL DEFAULT 0, + current_state_events INT NOT NULL, + total_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream @@ -120,8 +120,8 @@ CREATE TABLE IF NOT EXISTS user_stats_current ( start_ts BIGINT, end_ts BIGINT, - public_rooms INT 
DEFAULT 0 NOT NULL, - private_rooms INT DEFAULT 0 NOT NULL, + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream -- cgit 1.5.1 From 7b657f1148fa10234d52d333ff176969f296aa0f Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 15:40:58 +0100 Subject: Simplify table structure This obviates the need for old collection, but comes at the minor cost of not being able to track historical stats or per-slice fields until after the statistics regenerator is finished. Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated1.sql | 15 +- .../storage/schema/delta/56/stats_separated2.py | 18 +- synapse/storage/stats.py | 280 +++++++++++++-------- 3 files changed, 175 insertions(+), 138 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 1e17eae226..d7418fdf1e 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -63,14 +63,10 @@ INSERT INTO stats_incremental_position ( ) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); -- represents PRESENT room statistics for a room +-- only holds absolute fields CREATE TABLE IF NOT EXISTS room_stats_current ( room_id TEXT NOT NULL PRIMARY KEY, - -- These starts cover the time from start_ts...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. - start_ts BIGINT, - end_ts BIGINT, - current_state_events INT NOT NULL, total_events INT NOT NULL, joined_members INT NOT NULL, @@ -82,8 +78,6 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. 
completed_delta_stream_id BIGINT, - - CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL)) ); @@ -91,7 +85,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). - -- Note that end_ts is quantised, and start_ts usually so. + -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, @@ -113,13 +107,10 @@ CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical -- represents PRESENT statistics for a user +-- only holds absolute fields CREATE TABLE IF NOT EXISTS user_stats_current ( user_id TEXT NOT NULL PRIMARY KEY, - -- The timestamp that represents the start of the - start_ts BIGINT, - end_ts BIGINT, - public_rooms INT NOT NULL, private_rooms INT NOT NULL, diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py index 049867fa3e..942d240010 100644 --- a/synapse/storage/schema/delta/56/stats_separated2.py +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -32,13 +32,6 @@ def _run_create_generic(stats_type, cursor, database_engine): # It's also the case that SQLite is only likely to be used in small # deployments or testing, where the optimisations gained by use of a # partial index are not a big concern. 
- cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts); - """ - % (stats_type, stats_type) - ) cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete @@ -47,16 +40,7 @@ def _run_create_generic(stats_type, cursor, database_engine): % (stats_type, stats_type, stats_type) ) elif isinstance(database_engine, PostgresEngine): - # This partial index helps us with finding dirty stats rows - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_current_dirty - ON %s_stats_current (end_ts) - WHERE end_ts IS NOT NULL; - """ - % (stats_type, stats_type) - ) - # This partial index helps us with old collection + # This partial index helps us with finding incomplete stats rows cursor.execute( """ CREATE INDEX IF NOT EXISTS %s_stats_not_complete diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index cebe4cbc57..4cb10dc9fb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -105,7 +105,6 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - @defer.inlineCallbacks def update_stats_delta( self, ts, stats_type, stats_id, fields, complete_with_stream_id=None ): @@ -124,22 +123,142 @@ class StatsStore(StateDeltasStore): row was completed. 
""" - while True: - try: - res = yield self.runInteraction( - "update_stats_delta", - self._update_stats_delta_txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=complete_with_stream_id, - ) - return res - except OldCollectionRequired: - # retry after collecting old rows - # TODO (implement later) - raise NotImplementedError("old collection not in this PR") + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """ + + Args: + txn: Transaction + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. + """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [table] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + retcols = chain(absolutes.keys(), additive_relatives.keys()) + current_row = self._simple_select_one_txn( + txn, table, 
keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + current_row[key] += val + for (key, val) in absolutes.items(): + current_row[key] = val + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + ): + """ + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + """ + if self.database_engine.can_native_upsert: + ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + sel_exprs = chain( + keyvalues, copy_columns, ("?" for _ in additive_relatives) + ) + keyvalues_where = ("%s = ?" 
% f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": ", ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join(keyvalues.keys()), + "sets": ", ".join(chain(sets_cc, sets_ar)), + } + + qargs = chain(additive_relatives.values(), keyvalues.values()) + txn.execute(sql, qargs) + else: + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues, + chain(additive_relatives.keys(), copy_columns), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = {**keyvalues, **src_row, **additive_relatives} + self._simple_insert_txn(txn, into_table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, keyvalues, src_row) def _update_stats_delta_txn( self, @@ -154,118 +273,61 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + absolute_fields (dict[str, int]): Absolute current stats values + (i.e. not deltas). Does not work with per-slice fields. 
""" table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) end_ts = quantised_ts + self.stats_bucket_size + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] for field in chain(fields.keys(), absolute_fields.keys()): - if ( - field not in ABSOLUTE_STATS_FIELDS[stats_type] - and field not in PER_SLICE_FIELDS[stats_type] - ): + if field not in abs_field_names and field not in slice_field_names: # guard against potential SQL injection dodginess raise ValueError( "%s is not a recognised field" " for stats type %s" % (field, stats_type) ) - field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] - field_values = list(fields.values()) - - if absolute_fields is not None: - field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] - field_values += list(absolute_fields.values()) - - if complete_with_stream_id is not None: - field_sqls.append("completed_delta_stream_id = ?") - field_values.append(complete_with_stream_id) - - # update current row, but only if it is either: - # - dirty and complete but not old - # If it is old, we should old-collect it first and retry. - # - dirty and incomplete - # Incomplete rows can't be old-collected (as this would commit - # false statistics into the _historical table). - # Instead, their `end_ts` is extended, whilst we wait for them to - # become complete at the hand of the stats regenerator. - sql = ( - "UPDATE %s_current SET end_ts = ?, %s" - " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" - " AND %s = ?" - ) % (table, ", ".join(field_sqls), id_col) - - qargs = [end_ts] + list(field_values) + [end_ts, stats_id] - - txn.execute(sql, qargs) + additive_relatives = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_fields + } - if txn.rowcount > 0: - # success. 
- return + if absolute_fields is None: + absolute_fields = {} + elif complete_with_stream_id is not None: + absolute_fields = absolute_fields.copy() - # if we're here, it's because we didn't succeed in updating a stats - # row. Why? Let's find out… + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - current_row = self._simple_select_one_txn( + # first upsert the current table + self._upsert_with_additive_relatives_txn( txn, table + "_current", {id_col: stats_id}, - ("end_ts", "completed_delta_stream_id"), - allow_none=True, + absolute_fields, + additive_relatives, ) - if current_row is None: - # Failure reason: There is no row. - # Solution: - # we need to insert a row! (insert a dirty, incomplete row) - insertee = { - id_col: stats_id, - "end_ts": end_ts, - "start_ts": ts, - "completed_delta_stream_id": complete_with_stream_id, + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the historical table. + # we don't support absolute_fields for slice_field_names as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) + for key in slice_field_names } - - # we assume that, by default, blank fields should be zero. - for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: - insertee[field_name] = 0 - - for field_name in PER_SLICE_FIELDS[stats_type]: - insertee[field_name] = 0 - - for (field, value) in fields.items(): - insertee[field] = value - - if absolute_fields is not None: - for (field, value) in absolute_fields.items(): - insertee[field] = value - - self._simple_insert_txn(txn, table + "_current", insertee) - - elif current_row["end_ts"] is None: - # Failure reason: The row is not dirty. - # Solution: - # update the row, including `start_ts`, to make it dirty. - sql = ( - "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" - " WHERE end_ts IS NULL AND %s = ?" 
- ) % (table, ", ".join(field_sqls), id_col) - - qargs = ( - [end_ts - self.stats_bucket_size, end_ts] - + list(field_values) - + [stats_id] + self._upsert_copy_from_table_with_additive_relatives_txn( + txn, + table + "_historical", + {id_col: stats_id}, + per_slice_additive_relatives, + table + "_current", + abs_field_names ) - - txn.execute(sql, qargs) - if txn.rowcount == 0: - raise RuntimeError( - "Should be impossible: No rows updated" - " but all conditions are known to be met." - ) - - elif current_row["end_ts"] < end_ts: - # Failure reason: The row is complete and old. - # Solution: We need to perform old collection first - raise OldCollectionRequired() -- cgit 1.5.1 From e8fc180d4dbcf8237769397652356ffa23a2e952 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 16:10:05 +0100 Subject: Fix up SQL schema delta Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index d7418fdf1e..95daf8f53b 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -77,7 +77,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- If initial stats regen is still to be performed: NULL -- If initial stats regen has been performed: the maximum delta stream -- position that this row takes into account. - completed_delta_stream_id BIGINT, + completed_delta_stream_id BIGINT ); -- cgit 1.5.1 From 79252d1c83f9d230f0e2320cc0a40493e22ad653 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 22 Aug 2019 16:10:32 +0100 Subject: Fix up historical stats support. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 43 ++++++++++++++++++++++++++++++++----------- 1 file changed, 32 insertions(+), 11 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4cb10dc9fb..5a7dfde926 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -17,8 +17,6 @@ import logging from itertools import chain -from twisted.internet import defer - from synapse.storage.state_deltas import StateDeltasStore logger = logging.getLogger(__name__) @@ -197,22 +195,41 @@ class StatsStore(StateDeltasStore): self._simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( - self, txn, into_table, keyvalues, additive_relatives, src_table, copy_columns + self, + txn, + into_table, + keyvalues, + extra_dst_keyvalues, + additive_relatives, + src_table, + copy_columns, + additional_where="", ): """ Args: txn: Transaction into_table (str): The destination table to UPSERT the row into keyvalues (dict[str, any]): Row-identifying key values + extra_dst_keyvalues (dict[str, any]): Additional keyvalues + for `into_table`. additive_relatives (dict[str, any]): Fields that will be added onto if existing row present. (Must be disjoint from copy_columns.) src_table (str): The source table to copy from copy_columns (iterable[str]): The list of columns to copy + additional_where (str): Additional SQL for where (prefix with AND + if using). """ if self.database_engine.can_native_upsert: - ins_columns = chain(keyvalues, copy_columns, additive_relatives.keys()) + ins_columns = chain( + keyvalues, + copy_columns, + additive_relatives.keys(), + extra_dst_keyvalues.keys(), + ) sel_exprs = chain( - keyvalues, copy_columns, ("?" for _ in additive_relatives) + keyvalues, + copy_columns, + ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)), ) keyvalues_where = ("%s = ?" 
% f for f in keyvalues) @@ -225,17 +242,20 @@ class StatsStore(StateDeltasStore): INSERT INTO %(into_table)s (%(ins_columns)s) SELECT %(sel_exprs)s FROM %(src_table)s - WHERE %(keyvalues_where)s + WHERE %(keyvalues_where)s %(additional_where)s ON CONFLICT (%(keyvalues)s) DO UPDATE SET %(sets)s """ % { "into_table": into_table, "ins_columns": ", ".join(ins_columns), "sel_exprs": ", ".join(sel_exprs), - "keyvalues_where": ", ".join(keyvalues_where), + "keyvalues_where": " AND ".join(keyvalues_where), "src_table": src_table, - "keyvalues": ", ".join(keyvalues.keys()), + "keyvalues": ", ".join( + chain(keyvalues.keys(), extra_dst_keyvalues.keys()) + ), "sets": ", ".join(chain(sets_cc, sets_ar)), + "additional_where": additional_where, } qargs = chain(additive_relatives.values(), keyvalues.values()) @@ -320,14 +340,15 @@ class StatsStore(StateDeltasStore): # we don't support absolute_fields for slice_field_names as it makes # no sense. per_slice_additive_relatives = { - key: fields.get(key, 0) - for key in slice_field_names + key: fields.get(key, 0) for key in slice_field_names } self._upsert_copy_from_table_with_additive_relatives_txn( txn, table + "_historical", {id_col: stats_id}, + {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, per_slice_additive_relatives, table + "_current", - abs_field_names + abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", ) -- cgit 1.5.1 From c3d2bf280790986017c43e3dde17c04a6797f557 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 08:52:20 +0100 Subject: Allow schema deltas to be engine-specific Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/prepare_database.py | 26 +++++++++++++++++++++----- 1 file changed, 21 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/prepare_database.py b/synapse/storage/prepare_database.py index d20eacda59..0270cd6f6c 100644 --- a/synapse/storage/prepare_database.py +++ 
b/synapse/storage/prepare_database.py @@ -238,6 +238,15 @@ def _upgrade_existing_database( logger.debug("applied_delta_files: %s", applied_delta_files) + if isinstance(database_engine, PostgresEngine): + specific_engine_extension = ".postgres" + else: + specific_engine_extension = ".sqlite" + + specific_engine_extensions = ( + ".sqlite", ".postgres" + ) + for v in range(start_ver, SCHEMA_VERSION + 1): logger.info("Upgrading schema to v%d", v) @@ -274,15 +283,22 @@ def _upgrade_existing_database( # Sometimes .pyc files turn up anyway even though we've # disabled their generation; e.g. from distribution package # installers. Silently skip it - pass + continue elif ext == ".sql": # A plain old .sql file, just read and execute it logger.info("Applying schema %s", relative_path) executescript(cur, absolute_path) + elif ext == specific_engine_extension and root_name.endswith(".sql"): + # A .sql file specific to our engine; just read and execute it + logger.info("Applying engine-specific schema %s", relative_path) + executescript(cur, absolute_path) + elif ext in specific_engine_extensions and root_name.endswith(".sql"): + # A .sql file for a different engine; skip it. + continue else: # Not a valid delta file. - logger.warn( - "Found directory entry that did not end in .py or" " .sql: %s", + logger.warning( + "Found directory entry that did not end in .py or .sql: %s", relative_path, ) continue @@ -290,7 +306,7 @@ def _upgrade_existing_database( # Mark as done. 
cur.execute( database_engine.convert_param_style( - "INSERT INTO applied_schema_deltas (version, file)" " VALUES (?,?)" + "INSERT INTO applied_schema_deltas (version, file) VALUES (?,?)" ), (v, relative_path), ) @@ -298,7 +314,7 @@ def _upgrade_existing_database( cur.execute("DELETE FROM schema_version") cur.execute( database_engine.convert_param_style( - "INSERT INTO schema_version (version, upgraded)" " VALUES (?,?)" + "INSERT INTO schema_version (version, upgraded) VALUES (?,?)" ), (v, True), ) -- cgit 1.5.1 From 1ecd1a6a5fe95df7a726362c143320fab09373c2 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 09:46:13 +0100 Subject: Use engine-specific delta SQL files rather than delta written in Python. Signed-off-by: Olivier Wilkinson (reivilibre) --- .../storage/schema/delta/56/stats_separated2.py | 71 ---------------------- .../schema/delta/56/stats_separated2.sql.postgres | 24 ++++++++ .../schema/delta/56/stats_separated2.sql.sqlite | 27 ++++++++ 3 files changed, 51 insertions(+), 71 deletions(-) delete mode 100644 synapse/storage/schema/delta/56/stats_separated2.py create mode 100644 synapse/storage/schema/delta/56/stats_separated2.sql.postgres create mode 100644 synapse/storage/schema/delta/56/stats_separated2.sql.sqlite (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py deleted file mode 100644 index 942d240010..0000000000 --- a/synapse/storage/schema/delta/56/stats_separated2.py +++ /dev/null @@ -1,71 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2019 The Matrix.org Foundation C.I.C. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. 
-# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -# This schema delta will be run after 'stats_separated1.sql' due to lexicographic -# ordering. Note that it MUST be so. -from synapse.storage.engines import PostgresEngine, Sqlite3Engine - - -def _run_create_generic(stats_type, cursor, database_engine): - """ - Creates the pertinent (partial, if supported) indices for one kind of stats. - Args: - stats_type: "room" or "user" - the type of stats - cursor: Database Cursor - database_engine: Database Engine - """ - if isinstance(database_engine, Sqlite3Engine): - # even though SQLite >= 3.8 can support partial indices, we won't enable - # them, in case the SQLite database may be later used on another system. - # It's also the case that SQLite is only likely to be used in small - # deployments or testing, where the optimisations gained by use of a - # partial index are not a big concern. - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_not_complete - ON %s_stats_current (completed_delta_stream_id, %s_id); - """ - % (stats_type, stats_type, stats_type) - ) - elif isinstance(database_engine, PostgresEngine): - # This partial index helps us with finding incomplete stats rows - cursor.execute( - """ - CREATE INDEX IF NOT EXISTS %s_stats_not_complete - ON %s_stats_current (%s_id) - WHERE completed_delta_stream_id IS NULL; - """ - % (stats_type, stats_type, stats_type) - ) - else: - raise NotImplementedError("Unknown database engine.") - - -def run_create(cursor, database_engine): - """ - This function is called as part of the schema delta. 
- It will create indices - partial, if supported - for the new 'separated' - room & user statistics. - """ - _run_create_generic("room", cursor, database_engine) - _run_create_generic("user", cursor, database_engine) - - -def run_upgrade(cur, database_engine, config): - """ - This function is run on a database upgrade (of a non-empty database). - We have no need to do anything specific here. - """ - pass diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres new file mode 100644 index 0000000000..0519fcff79 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres @@ -0,0 +1,24 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- These partial indices helps us with finding incomplete stats row +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (room_id) + WHERE completed_delta_stream_id IS NULL; + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (user_id) + WHERE completed_delta_stream_id IS NULL; + diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite new file mode 100644 index 0000000000..181f4ec5b9 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite @@ -0,0 +1,27 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- even though SQLite >= 3.8 can support partial indices, we won't enable +-- them, in case the SQLite database may be later used on another system. +-- It's also the case that SQLite is only likely to be used in small +-- deployments or testing, where the optimisations gained by use of a +-- partial index are not a big concern. + +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (completed_delta_stream_id, room_id); + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (completed_delta_stream_id, user_id); + -- cgit 1.5.1 From 4b7bf2e413a6012c87e3d12f7bf4183b9638836b Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Tue, 27 Aug 2019 13:26:08 +0100 Subject: Apply suggestions from code review Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 5a7dfde926..62047839cc 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -134,7 +134,7 @@ class StatsStore(StateDeltasStore): def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): - """ + """Used to update values in the stats tables. 
Args: txn: Transaction @@ -322,7 +322,7 @@ class StatsStore(StateDeltasStore): elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id + absolute_fields["completed_delta_stream_id"] = complete_with_stream_id # first upsert the current table self._upsert_with_additive_relatives_txn( -- cgit 1.5.1 From 81c5289c839a6d6888cd849996572aa5c9e19fbd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:35:37 +0100 Subject: Clarify `_update_stats_delta_txn` by adding code comments and kwargs. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 34 ++++++++++++++++++---------------- 1 file changed, 18 insertions(+), 16 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 62047839cc..7959f5785b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -311,6 +311,9 @@ class StatsStore(StateDeltasStore): " for stats type %s" % (field, stats_type) ) + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. 
additive_relatives = { key: fields.get(key, 0) for key in abs_field_names @@ -321,34 +324,33 @@ class StatsStore(StateDeltasStore): absolute_fields = {} elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - # first upsert the current table + # first upsert the `_current` table self._upsert_with_additive_relatives_txn( - txn, - table + "_current", - {id_col: stats_id}, - absolute_fields, - additive_relatives, + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_fields, + additive_relatives=additive_relatives, ) if self.has_completed_background_updates(): # TODO want to check specifically for stats regenerator, not all # background updates… - # then upsert the historical table. - # we don't support absolute_fields for slice_field_names as it makes + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes # no sense. 
per_slice_additive_relatives = { key: fields.get(key, 0) for key in slice_field_names } self._upsert_copy_from_table_with_additive_relatives_txn( - txn, - table + "_historical", - {id_col: stats_id}, - {"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, - per_slice_additive_relatives, - table + "_current", - abs_field_names, + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_keyvalues={"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, additional_where=" AND completed_delta_stream_id IS NOT NULL", ) -- cgit 1.5.1 From 544ba2c2e9ad8d3d9aa9041e3f724e5c96a15390 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:40:00 +0100 Subject: Apply minor suggestions from review Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 7959f5785b..c950ab9953 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -190,8 +190,7 @@ class StatsStore(StateDeltasStore): else: for (key, val) in additive_relatives.items(): current_row[key] += val - for (key, val) in absolutes.items(): - current_row[key] = val + current_row.update(absolutes) self._simple_update_one_txn(txn, table, keyvalues, current_row) def _upsert_copy_from_table_with_additive_relatives_txn( @@ -223,8 +222,8 @@ class StatsStore(StateDeltasStore): ins_columns = chain( keyvalues, copy_columns, - additive_relatives.keys(), - extra_dst_keyvalues.keys(), + additive_relatives, + extra_dst_keyvalues, ) sel_exprs = chain( keyvalues, -- cgit 1.5.1 From a6c102009e219d93b512f682e5f799037536e3ee Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:41:48 +0100 Subject: Lock tables in upsert 
fall-backs. Should not be too much of a performance concern as this code won't be hit on Postgres, which large deployments should be using. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 2 ++ 1 file changed, 2 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index c950ab9953..0f3aa6a801 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -180,6 +180,7 @@ class StatsStore(StateDeltasStore): txn.execute(sql, qargs) else: + self.database_engine.lock_table(txn, table) retcols = chain(absolutes.keys(), additive_relatives.keys()) current_row = self._simple_select_one_txn( txn, table, keyvalues, retcols, allow_none=True @@ -260,6 +261,7 @@ class StatsStore(StateDeltasStore): qargs = chain(additive_relatives.values(), keyvalues.values()) txn.execute(sql, qargs) else: + self.database_engine.lock_table(txn, into_table) src_row = self._simple_select_one_txn( txn, src_table, keyvalues, copy_columns ) -- cgit 1.5.1 From 736ac58e1191fc28abaef5c2cab86901b85ce192 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:42:33 +0100 Subject: Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 0f3aa6a801..650c0050cb 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -221,10 +221,7 @@ class StatsStore(StateDeltasStore): """ if self.database_engine.can_native_upsert: ins_columns = chain( - keyvalues, - copy_columns, - additive_relatives, - extra_dst_keyvalues, + keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues ) sel_exprs = chain( keyvalues, @@ -349,7 +346,10 @@ class StatsStore(StateDeltasStore): txn=txn, into_table=table + "_historical", keyvalues={id_col: stats_id}, - 
extra_dst_keyvalues={"end_ts": end_ts, "bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={ + "end_ts": end_ts, + "bucket_size": self.stats_bucket_size, + }, additive_relatives=per_slice_additive_relatives, src_table=table + "_current", copy_columns=abs_field_names, -- cgit 1.5.1 From 09cbc3a8e9c0f494fb272cb3761024a851b3e3f8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 13:50:58 +0100 Subject: Switch to milliseconds in room/user stats for consistency. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/config/stats.py | 13 +++++-------- synapse/storage/schema/delta/56/stats_separated1.sql | 2 +- synapse/storage/stats.py | 3 ++- 3 files changed, 8 insertions(+), 10 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b518a3ed9c..b18ddbd1fa 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -27,19 +27,16 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 + self.stats_bucket_size = 86400 * 1000 self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = ( - self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + self.stats_bucket_size = self.parse_duration( + stats_config.get("bucket_size", "1d") ) - self.stats_retention = ( - self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) - / 1000 + self.stats_retention = self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) ) def generate_config_section(self, config_dir_path, server_name, **kwargs): diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 95daf8f53b..045b5ca013 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ 
b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -84,7 +84,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( -- represents HISTORICAL room statistics for a room CREATE TABLE IF NOT EXISTS room_stats_historical ( room_id TEXT NOT NULL, - -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds). + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms). -- Note that end_ts is quantised. end_ts BIGINT NOT NULL, bucket_size INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 650c0050cb..35fca1dc7b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -64,7 +64,8 @@ class StatsStore(StateDeltasStore): Quantises a timestamp to be a multiple of the bucket size. Args: - ts (int): the timestamp to quantise, in seconds since the Unix Epoch + ts (int): the timestamp to quantise, in milliseconds since the Unix + Epoch Returns: int: a timestamp which -- cgit 1.5.1 From 491eaf0808fb4d3bb6d9b5f5f26c77e82e9333ec Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:19:38 +0100 Subject: Remove obsolete `OldCollectionRequired` as old collection is obsolete. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 6 ------ 1 file changed, 6 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 35fca1dc7b..824e57bad7 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -40,12 +40,6 @@ PER_SLICE_FIELDS = {"room": (), "user": ()} TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -class OldCollectionRequired(Exception): - """ Signal that we need to collect old stats rows and retry. 
""" - - pass - - class StatsStore(StateDeltasStore): def __init__(self, db_conn, hs): super(StatsStore, self).__init__(db_conn, hs) -- cgit 1.5.1 From 11c4e506bd6254fcb9551729e70b7ade80b127e4 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:24:25 +0100 Subject: Rename `room_state` table to `room_stats_state` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/events.py | 2 +- synapse/storage/schema/delta/56/stats_separated1.sql | 3 +++ synapse/storage/stats.py | 2 +- 3 files changed, 5 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 5a95c36a8b..28fdba5fb2 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2270,7 +2270,7 @@ class EventsStore( "room_aliases", "room_depth", "room_memberships", - "room_state", + "room_stats_state", "room_stats", "room_stats_earliest_token", "rooms", diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 045b5ca013..52fb09c0e6 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -137,3 +137,6 @@ CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical -- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that -- out for us. (We would want it to review stats for a particular user.) + +-- Also rename room_state to room_stats_state to make its ownership clear. 
+ALTER TABLE room_state RENAME TO room_stats_state; diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 824e57bad7..fce0fb5a56 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -92,7 +92,7 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", -- cgit 1.5.1 From 62b1250629f0aef9ef74021c86ae53a2d27aad09 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:24:56 +0100 Subject: Update `_purge_room_txn` to take account of separated stats tables Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/events.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 28fdba5fb2..5527dd208e 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2271,7 +2271,8 @@ class EventsStore( "room_depth", "room_memberships", "room_stats_state", - "room_stats", + "room_stats_current", + "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", -- cgit 1.5.1 From 07c267c51676fe1df80993da6700f35e69fe6761 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:34:05 +0100 Subject: For user stats, handle other membership transitions properly. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 5 ++--- synapse/storage/stats.py | 5 +---- 2 files changed, 3 insertions(+), 7 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index f065d88a7d..2f7c108181 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -219,9 +219,8 @@ class StatsHandler(StateDeltasHandler): user_id = state_key if self.is_mine_id(user_id): # this accounts for transitions like leave → ban and so on. - has_changed_joinedness = ( - (prev_membership == Membership.JOIN) != - (membership == Membership.JOIN) + has_changed_joinedness = (prev_membership == Membership.JOIN) != ( + membership == Membership.JOIN ) if has_changed_joinedness: diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 6832ec6b7f..f86e9bd269 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,12 +15,9 @@ # limitations under the License. import logging -from threading import Lock - -from twisted.internet import defer from itertools import chain +from threading import Lock -from synapse.storage.state_deltas import StateDeltasStore from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached -- cgit 1.5.1 From 44d3c2e80b03bf7168ef23f0a3f080013d8800b0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:34:20 +0100 Subject: Invalidate `get_earliest_token_for_stats` cache as required. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 1 + 1 file changed, 1 insertion(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index f86e9bd269..d345b2cb32 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -433,6 +433,7 @@ class StatsStore(StateDeltasStore): elif complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id + self.get_earliest_token_for_stats.invalidate(stats_type, stats_id) # first upsert the `_current` table self._upsert_with_additive_relatives_txn( -- cgit 1.5.1 From 10c1a233f91bd3fcc505e5521cadfc6ed8daa301 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:49:12 +0100 Subject: Fix logic error. `absolute_fields` being None shouldn't preclude completion of a current stats row. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index d345b2cb32..ede5002fca 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -430,7 +430,8 @@ class StatsStore(StateDeltasStore): if absolute_fields is None: absolute_fields = {} - elif complete_with_stream_id is not None: + + if complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id self.get_earliest_token_for_stats.invalidate(stats_type, stats_id) -- cgit 1.5.1 From 324f21b216e84e8b2b9287bda4f2ad5565ac60c6 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:53:45 +0100 Subject: Fix logic error. `absolute_fields` being None shouldn't preclude completion of a current stats row. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index fce0fb5a56..35d8bdb7b7 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -315,7 +315,8 @@ class StatsStore(StateDeltasStore): if absolute_fields is None: absolute_fields = {} - elif complete_with_stream_id is not None: + + if complete_with_stream_id is not None: absolute_fields = absolute_fields.copy() absolute_fields["completed_delta_stream_id"] = complete_with_stream_id -- cgit 1.5.1 From 064143c1308cf6354554702d5041ec4bd3ac8ff8 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 14:59:39 +0100 Subject: Use `DeferredLock` instead of `threading.Lock` Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 5 +++-- synapse/storage/stats.py | 5 +++-- 2 files changed, 6 insertions(+), 4 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 2f7c108181..8e1bf8b5d5 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -66,12 +66,13 @@ class StatsHandler(StateDeltasHandler): @defer.inlineCallbacks def process(): + yield lock.acquire() try: yield self._unsafe_process() finally: - lock.release() + yield lock.release() - if lock.acquire(blocking=False): + if not lock.locked: # we only want to run this process one-at-a-time, # and also, if the initial background updater wants us to keep out, # we should respect that. 
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index ede5002fca..c9687c29d2 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -16,7 +16,8 @@ import logging from itertools import chain -from threading import Lock + +from twisted.internet.defer import DeferredLock from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -57,7 +58,7 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.stats_delta_processing_lock = Lock() + self.stats_delta_processing_lock = DeferredLock() self.register_noop_background_update("populate_stats_createtables") self.register_noop_background_update("populate_stats_process_rooms") -- cgit 1.5.1 From 1af7866562c3a09815378635b1d305b5824bc7c0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Tue, 27 Aug 2019 16:16:53 +0100 Subject: Clean up code with improved naming and hoist around functions. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 170 +++++++++++++++++++++++++---------------------- 1 file changed, 91 insertions(+), 79 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 35d8bdb7b7..4b2364746c 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -22,6 +22,9 @@ from synapse.storage.state_deltas import StateDeltasStore logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) +# You can think of these as Prometheus Gauges. +# You can draw these stats on a line graph. +# Example: number of users in a room ABSOLUTE_STATS_FIELDS = { "room": ( "current_state_events", @@ -35,6 +38,8 @@ ABSOLUTE_STATS_FIELDS = { } # these fields are per-timeslice and so should be reset to 0 upon a new slice +# You can draw these stats on a histogram. 
+# Example: number of events sent locally during a time slice PER_SLICE_FIELDS = {"room": (), "user": ()} TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} @@ -126,6 +131,92 @@ class StatsStore(StateDeltasStore): complete_with_stream_id=complete_with_stream_id, ) + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_field_overrides=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Current stats values + (i.e. not deltas) of absolute fields. + Does not work with per-slice fields. + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_field_overrides.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) + ) + + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. 
+ + # This calculates the deltas (`field = field + ?` values) + # for absolute fields, + # * defaulting to 0 if not specified + # (required for the INSERT part of upserting to work) + # * omitting overrides specified in `absolute_field_overrides` + deltas_of_absolute_fields = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_field_overrides + } + + if absolute_field_overrides is None: + absolute_field_overrides = {} + + if complete_with_stream_id is not None: + absolute_field_overrides = absolute_field_overrides.copy() + absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + + # first upsert the `_current` table + self._upsert_with_additive_relatives_txn( + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_field_overrides, + additive_relatives=deltas_of_absolute_fields, + ) + + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes + # no sense. 
+ per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_keyvalues={ + "end_ts": end_ts, + "bucket_size": self.stats_bucket_size, + }, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", + ) + def _upsert_with_additive_relatives_txn( self, txn, table, keyvalues, absolutes, additive_relatives ): @@ -272,82 +363,3 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) - - def _update_stats_delta_txn( - self, - txn, - ts, - stats_type, - stats_id, - fields, - complete_with_stream_id=None, - absolute_fields=None, - ): - """ - See L{update_stats_delta} - Additional Args: - absolute_fields (dict[str, int]): Absolute current stats values - (i.e. not deltas). Does not work with per-slice fields. - """ - table, id_col = TYPE_TO_TABLE[stats_type] - - quantised_ts = self.quantise_stats_time(int(ts)) - end_ts = quantised_ts + self.stats_bucket_size - - abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] - slice_field_names = PER_SLICE_FIELDS[stats_type] - for field in chain(fields.keys(), absolute_fields.keys()): - if field not in abs_field_names and field not in slice_field_names: - # guard against potential SQL injection dodginess - raise ValueError( - "%s is not a recognised field" - " for stats type %s" % (field, stats_type) - ) - - # only absolute stats fields are tracked in the `_current` stats tables, - # so those are the only ones that we process deltas for when - # we upsert against the `_current` table. 
- additive_relatives = { - key: fields.get(key, 0) - for key in abs_field_names - if key not in absolute_fields - } - - if absolute_fields is None: - absolute_fields = {} - - if complete_with_stream_id is not None: - absolute_fields = absolute_fields.copy() - absolute_fields["completed_delta_stream_id"] = complete_with_stream_id - - # first upsert the `_current` table - self._upsert_with_additive_relatives_txn( - txn=txn, - table=table + "_current", - keyvalues={id_col: stats_id}, - absolutes=absolute_fields, - additive_relatives=additive_relatives, - ) - - if self.has_completed_background_updates(): - # TODO want to check specifically for stats regenerator, not all - # background updates… - # then upsert the `_historical` table. - # we don't support absolute_fields for per-slice fields as it makes - # no sense. - per_slice_additive_relatives = { - key: fields.get(key, 0) for key in slice_field_names - } - self._upsert_copy_from_table_with_additive_relatives_txn( - txn=txn, - into_table=table + "_historical", - keyvalues={id_col: stats_id}, - extra_dst_keyvalues={ - "end_ts": end_ts, - "bucket_size": self.stats_bucket_size, - }, - additive_relatives=per_slice_additive_relatives, - src_table=table + "_current", - copy_columns=abs_field_names, - additional_where=" AND completed_delta_stream_id IS NOT NULL", - ) -- cgit 1.5.1 From b9f1adc370fa06e5803907ee563fd743821e271d Mon Sep 17 00:00:00 2001 From: reivilibre <38398653+reivilibre@users.noreply.github.com> Date: Wed, 28 Aug 2019 09:01:25 +0100 Subject: Update synapse/storage/stats.py Co-Authored-By: Erik Johnston --- synapse/storage/stats.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 4b2364746c..a4493556f3 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -144,7 +144,7 @@ class StatsStore(StateDeltasStore): """ See L{update_stats_delta} Additional Args: - absolute_fields (dict[str, 
int]): Current stats values + absolute_field_overrides (dict[str, int]): Current stats values (i.e. not deltas) of absolute fields. Does not work with per-slice fields. """ -- cgit 1.5.1 From a344ad3d3fdb6575338a056e4f36afae1ee4f9a0 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 09:33:03 +0100 Subject: Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index a4493556f3..4106f161f0 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -183,7 +183,9 @@ class StatsStore(StateDeltasStore): if complete_with_stream_id is not None: absolute_field_overrides = absolute_field_overrides.copy() - absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id + absolute_field_overrides[ + "completed_delta_stream_id" + ] = complete_with_stream_id # first upsert the `_current` table self._upsert_with_additive_relatives_txn( -- cgit 1.5.1 From bc2c284dbe02ef283ab525f14febd7d0998ba552 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 14:28:44 +0100 Subject: Add `total_event_bytes` to room statistics schema. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/schema/delta/56/stats_separated1.sql | 2 ++ synapse/storage/stats.py | 1 + 2 files changed, 3 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql index 52fb09c0e6..6d4648c0d7 100644 --- a/synapse/storage/schema/delta/56/stats_separated1.sql +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -69,6 +69,7 @@ CREATE TABLE IF NOT EXISTS room_stats_current ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, @@ -91,6 +92,7 @@ CREATE TABLE IF NOT EXISTS room_stats_historical ( current_state_events INT NOT NULL, total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, joined_members INT NOT NULL, invited_members INT NOT NULL, left_members INT NOT NULL, diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3df57b52ea..8c99a125a9 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -36,6 +36,7 @@ ABSOLUTE_STATS_FIELDS = { "left_members", "banned_members", "total_events", + "total_event_bytes", ), "user": ("public_rooms", "private_rooms"), } -- cgit 1.5.1 From a13ad21abf029901f5d74dd683e536d81f3150c3 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 14:49:58 +0100 Subject: Add incremental counting for rooms' total events and total event bytes. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 106 +++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 106 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 8c99a125a9..8cfa694d31 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -19,6 +19,7 @@ from itertools import chain from twisted.internet.defer import DeferredLock +from synapse.storage import PostgresEngine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -476,3 +477,108 @@ class StatsStore(StateDeltasStore): for (key, val) in additive_relatives.items(): src_row[key] = dest_current_row[key] + val self._simple_update_txn(txn, into_table, keyvalues, src_row) + + def incremental_update_room_total_events_and_bytes(self, in_positions): + """ + Counts the number of events and total event bytes per-room and then adds + these to the respective total_events and total_event_bytes room counts. + + Args: + in_positions (dict): Positions, + as retrieved from L{get_stats_positions}. + + Returns (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. 
+ """ + + def incremental_update_total_events_and_bytes_txn(txn): + positions = in_positions.copy() + + max_pos = self.get_room_max_stream_ordering() + min_pos = self.get_room_min_stream_ordering() + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=positions["total_events_max_stream_ordering"], + high_pos=max_pos, + ) + + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=min_pos, + high_pos=positions["total_events_min_stream_ordering"], + ) + + if ( + positions["total_events_max_stream_ordering"] != max_pos + or positions["total_events_min_stream_ordering"] != min_pos + ): + positions["total_events_max_stream_ordering"] = max_pos + positions["total_events_min_stream_ordering"] = min_pos + + self._update_stats_positions_txn(txn, positions) + + return positions + + return self.runInteraction( + "stats_incremental_total_events_and_bytes", + incremental_update_total_events_and_bytes_txn, + ) + + def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos): + """ + Updates the total_events and total_event_bytes counts for rooms, + in a range of stream_orderings. + + Inclusivity of low_pos and high_pos is dependent upon their signs. + This makes it intuitive to use this function for both backfilled + and non-backfilled events. + + Examples: + (low, high) → (kind) + (3, 7) → 3 < … <= 7 (normal-filled; low already processed before) + (-4, -2) → -4 <= … < -2 (backfilled; high already processed before) + (-7, 7) → -7 <= … <= 7 (both) + + Args: + txn: Database transaction. + low_pos: Low stream ordering + high_pos: High stream ordering + """ + + if low_pos >= high_pos: + # nothing to do here. 
+ return + + now = self.hs.clock.time_msec() + + # we choose comparators based on the signs + low_comparator = "<=" if low_pos < 0 else "<" + high_comparator = "<" if high_pos < 0 else "<=" + + if isinstance(self.database_engine, PostgresEngine): + new_bytes_expression = "OCTET_LENGTH(json)" + else: + new_bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE ? %s stream_ordering AND stream_ordering %s ? + GROUP BY room_id + """ % ( + low_comparator, + high_comparator, + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos)) + + for room_id, new_events, new_bytes in txn.fetchall(): + self._update_stats_delta_txn( + txn, + now, + "room", + room_id, + {"total_events": new_events, "total_event_bytes": new_bytes}, + ) -- cgit 1.5.1 From b06f2947e4a79220564da94f2ca3feb988f033ed Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 15:46:33 +0100 Subject: Track new users in user statistics. This makes the rows 'completed' so that the stats regenerator need not touch them. --- synapse/storage/registration.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) (limited to 'synapse/storage') diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 9027b917c1..672eebbe56 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -868,6 +868,17 @@ class RegistrationStore( (user_id_obj.localpart, create_profile_with_displayname), ) + if self.hs.config.stats_enabled: + # we create a new completed user statistics row + + # we don't strictly need current_token since this user really can't + # have any state deltas before now (as it is a new user), but still, + # we include it for completeness. 
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + self._update_stats_delta_txn( + txn, now, "user", user_id, {}, complete_with_stream_id=current_token + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) @@ -1139,6 +1150,7 @@ class RegistrationStore( deferred str|None: A str representing a link to redirect the user to if there is one. """ + # Insert everything into a transaction in order to run atomically def validate_threepid_session_txn(txn): row = self._simple_select_one_txn( -- cgit 1.5.1 From 73d552a05d68b2895b796ca95def75778cd00cc2 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 16:14:00 +0100 Subject: Hoist up None check to prevent trying to iterate over NoneType.keys() --- synapse/storage/stats.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3df57b52ea..3433eef288 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -258,6 +258,10 @@ class StatsStore(StateDeltasStore): (i.e. not deltas) of absolute fields. Does not work with per-slice fields. 
""" + + if absolute_field_overrides is None: + absolute_field_overrides = {} + table, id_col = TYPE_TO_TABLE[stats_type] quantised_ts = self.quantise_stats_time(int(ts)) @@ -288,9 +292,6 @@ class StatsStore(StateDeltasStore): if key not in absolute_field_overrides } - if absolute_field_overrides is None: - absolute_field_overrides = {} - if complete_with_stream_id is not None: absolute_field_overrides = absolute_field_overrides.copy() absolute_field_overrides[ -- cgit 1.5.1 From 3b69bf3e74a5b9247f306c6c1e74be09f79822bd Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Wed, 28 Aug 2019 20:18:45 +0100 Subject: Upsert fixes --- synapse/storage/stats.py | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 3433eef288..358ef3fd73 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -320,9 +320,10 @@ class StatsStore(StateDeltasStore): txn=txn, into_table=table + "_historical", keyvalues={id_col: stats_id}, + extra_dst_insvalues={"bucket_size": self.stats_bucket_size, }, extra_dst_keyvalues={ "end_ts": end_ts, - "bucket_size": self.stats_bucket_size, + }, additive_relatives=per_slice_additive_relatives, src_table=table + "_current", @@ -356,7 +357,7 @@ class StatsStore(StateDeltasStore): ] insert_cols = [] - qargs = [table] + qargs = [] for (key, val) in chain( keyvalues.items(), absolutes.items(), additive_relatives.items() @@ -367,13 +368,14 @@ class StatsStore(StateDeltasStore): sql = """ INSERT INTO %(table)s (%(insert_cols_cs)s) VALUES (%(insert_vals_qs)s) - ON CONFLICT DO UPDATE SET %(updates)s + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s """ % { "table": table, "insert_cols_cs": ", ".join(insert_cols), "insert_vals_qs": ", ".join( ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) ), + "key_columns": ", ".join(keyvalues), "updates": ", ".join(chain(absolute_updates, 
relative_updates)), } @@ -399,6 +401,7 @@ class StatsStore(StateDeltasStore): into_table, keyvalues, extra_dst_keyvalues, + extra_dst_insvalues, additive_relatives, src_table, copy_columns, @@ -411,6 +414,8 @@ class StatsStore(StateDeltasStore): keyvalues (dict[str, any]): Row-identifying key values extra_dst_keyvalues (dict[str, any]): Additional keyvalues for `into_table`. + extra_dst_insvalues (dict[str, any]): Additional values to insert + on new row creation for `into_table`. additive_relatives (dict[str, any]): Fields that will be added onto if existing row present. (Must be disjoint from copy_columns.) src_table (str): The source table to copy from @@ -420,18 +425,18 @@ class StatsStore(StateDeltasStore): """ if self.database_engine.can_native_upsert: ins_columns = chain( - keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues + keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues, extra_dst_insvalues ) sel_exprs = chain( keyvalues, copy_columns, - ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)), + ("?" for _ in chain(additive_relatives, extra_dst_keyvalues, extra_dst_insvalues)), ) keyvalues_where = ("%s = ?" 
% f for f in keyvalues) sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) sets_ar = ( - "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in additive_relatives ) sql = """ @@ -454,7 +459,8 @@ class StatsStore(StateDeltasStore): "additional_where": additional_where, } - qargs = chain(additive_relatives.values(), keyvalues.values()) + qargs = list(chain(additive_relatives.values(), extra_dst_keyvalues.values(), extra_dst_insvalues.values(), + keyvalues.values())) txn.execute(sql, qargs) else: self.database_engine.lock_table(txn, into_table) @@ -470,7 +476,8 @@ class StatsStore(StateDeltasStore): ) if dest_current_row is None: - merged_dict = {**keyvalues, **src_row, **additive_relatives} + merged_dict = {**keyvalues, **extra_dst_keyvalues, **extra_dst_insvalues, **src_row, + **additive_relatives} self._simple_insert_txn(txn, into_table, merged_dict) else: for (key, val) in additive_relatives.items(): -- cgit 1.5.1 From 4444b9a1b3444fec6a40ec42eadad5ec51ea1eee Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 29 Aug 2019 08:08:41 +0100 Subject: Code formatting (Black) Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 42 ++++++++++++++++++++++++++++++------------ 1 file changed, 30 insertions(+), 12 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 358ef3fd73..8bbf1b00d1 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -320,11 +320,8 @@ class StatsStore(StateDeltasStore): txn=txn, into_table=table + "_historical", keyvalues={id_col: stats_id}, - extra_dst_insvalues={"bucket_size": self.stats_bucket_size, }, - extra_dst_keyvalues={ - "end_ts": end_ts, - - }, + extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={"end_ts": end_ts}, additive_relatives=per_slice_additive_relatives, src_table=table + "_current", 
copy_columns=abs_field_names, @@ -425,18 +422,28 @@ class StatsStore(StateDeltasStore): """ if self.database_engine.can_native_upsert: ins_columns = chain( - keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues, extra_dst_insvalues + keyvalues, + copy_columns, + additive_relatives, + extra_dst_keyvalues, + extra_dst_insvalues, ) sel_exprs = chain( keyvalues, copy_columns, - ("?" for _ in chain(additive_relatives, extra_dst_keyvalues, extra_dst_insvalues)), + ( + "?" + for _ in chain( + additive_relatives, extra_dst_keyvalues, extra_dst_insvalues + ) + ), ) keyvalues_where = ("%s = ?" % f for f in keyvalues) sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) sets_ar = ( - "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in additive_relatives + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) + for f in additive_relatives ) sql = """ @@ -459,8 +466,14 @@ class StatsStore(StateDeltasStore): "additional_where": additional_where, } - qargs = list(chain(additive_relatives.values(), extra_dst_keyvalues.values(), extra_dst_insvalues.values(), - keyvalues.values())) + qargs = list( + chain( + additive_relatives.values(), + extra_dst_keyvalues.values(), + extra_dst_insvalues.values(), + keyvalues.values(), + ) + ) txn.execute(sql, qargs) else: self.database_engine.lock_table(txn, into_table) @@ -476,8 +489,13 @@ class StatsStore(StateDeltasStore): ) if dest_current_row is None: - merged_dict = {**keyvalues, **extra_dst_keyvalues, **extra_dst_insvalues, **src_row, - **additive_relatives} + merged_dict = { + **keyvalues, + **extra_dst_keyvalues, + **extra_dst_insvalues, + **src_row, + **additive_relatives, + } self._simple_insert_txn(txn, into_table, merged_dict) else: for (key, val) in additive_relatives.items(): -- cgit 1.5.1 From 39dbee2a3e129415e2c20aa9a4f5f866e723fe41 Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Thu, 29 Aug 2019 14:10:21 +0100 Subject: Count total_events and total_event_bytes within the loop. 
In this case, we still update these counts if we get stuck in the loop because the server is busy. Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/handlers/stats.py | 22 +++++++++++----------- synapse/storage/stats.py | 13 +++++++++---- 2 files changed, 20 insertions(+), 15 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index e9b1646e0e..f44adfc07b 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -91,31 +91,31 @@ class StatsHandler(StateDeltasHandler): return None # Loop round handling deltas until we're up to date - with Measure(self.clock, "stats_delta"): - while True: + + while True: + with Measure(self.clock, "stats_delta"): deltas = yield self.store.get_current_state_deltas( self.pos["state_delta_stream_id"] ) - if not deltas: - break logger.debug("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + yield self.store.update_stats_positions(self.pos) event_processing_positions.labels("stats").set( self.pos["state_delta_stream_id"] ) - if self.pos is not None: - yield self.store.update_stats_positions(self.pos) + # Then count deltas for total_events and total_event_bytes. + with Measure(self.clock, "stats_total_events_and_bytes"): + self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes( + self.pos + ) - # Then count deltas for total_events and total_event_bytes. 
- with Measure(self.clock, "stats_total_events_and_bytes"): - self.pos = yield self.store.incremental_update_total_events_and_bytes( - self.pos - ) + if not deltas and not had_counts: + break @defer.inlineCallbacks def _handle_deltas(self, deltas): diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 8cfa694d31..9d6c3027d5 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -487,9 +487,12 @@ class StatsStore(StateDeltasStore): in_positions (dict): Positions, as retrieved from L{get_stats_positions}. - Returns (dict): - The new positions. Note that this is for reference only – - the new positions WILL be committed by this function. + Returns (Deferred[tuple[dict, bool]]): + First element (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + Second element (bool): + true iff there was a change to the positions, false otherwise """ def incremental_update_total_events_and_bytes_txn(txn): @@ -518,7 +521,9 @@ class StatsStore(StateDeltasStore): self._update_stats_positions_txn(txn, positions) - return positions + return positions, True + else: + return positions, False return self.runInteraction( "stats_incremental_total_events_and_bytes", -- cgit 1.5.1 From 757205d7186132ae6d1ae189249e6218dd11e32d Mon Sep 17 00:00:00 2001 From: "Olivier Wilkinson (reivilibre)" Date: Fri, 30 Aug 2019 14:26:02 +0100 Subject: Convert `chain` to `list` as `chain` is only once iterable. 
Signed-off-by: Olivier Wilkinson (reivilibre) --- synapse/storage/stats.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'synapse/storage') diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index 63c8c2840a..20ce3664a0 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -381,7 +381,7 @@ class StatsStore(StateDeltasStore): txn.execute(sql, qargs) else: self.database_engine.lock_table(txn, table) - retcols = chain(absolutes.keys(), additive_relatives.keys()) + retcols = list(chain(absolutes.keys(), additive_relatives.keys())) current_row = self._simple_select_one_txn( txn, table, keyvalues, retcols, allow_none=True ) @@ -486,7 +486,7 @@ class StatsStore(StateDeltasStore): txn, into_table, keyvalues, - chain(additive_relatives.keys(), copy_columns), + retcols=list(chain(additive_relatives.keys(), copy_columns)), allow_none=True, ) -- cgit 1.5.1