diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/stats.py | 198 | ||||
-rw-r--r-- | synapse/storage/registration.py | 11 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated1.sql | 168 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated2.py | 87 | ||||
-rw-r--r-- | synapse/storage/state_deltas.py | 2 | ||||
-rw-r--r-- | synapse/storage/stats.py | 1057 |
6 files changed, 1241 insertions, 282 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 4449da6669..dee19cdcec 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler): # The current position in the current_state_delta stream self.pos = None - # Guard to ensure we only process deltas one at a time - self._is_processing = False - if hs.config.stats_enabled: self.notifier.add_replication_callback(self.notify_new_event) @@ -65,43 +62,58 @@ class StatsHandler(StateDeltasHandler): if not self.hs.config.stats_enabled: return - if self._is_processing: - return + lock = self.store.stats_delta_processing_lock @defer.inlineCallbacks def process(): try: yield self._unsafe_process() finally: - self._is_processing = False + lock.release() - self._is_processing = True - run_as_background_process("stats.notify_new_event", process) + if lock.acquire(blocking=False): + # we only want to run this process one-at-a-time, + # and also, if the initial background updater wants us to keep out, + # we should respect that. + try: + run_as_background_process("stats.notify_new_event", process) + except: # noqa: E722 – re-raised so fine + lock.release() + raise @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB - if self.pos is None: - self.pos = yield self.store.get_stats_stream_pos() + if self.pos is None or None in self.pos.values(): + self.pos = yield self.store.get_stats_positions() - # If still None then the initial background update hasn't happened yet - if self.pos is None: + # If still None then the initial background update hasn't started yet + if self.pos is None or None in self.pos.values(): return None # Loop round handling deltas until we're up to date - while True: - with Measure(self.clock, "stats_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) + with Measure(self.clock, "stats_delta"): + while True: + deltas = yield self.store.get_current_state_deltas( + self.pos["state_delta_stream_id"] + ) if not deltas: - return + break logger.info("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) - self.pos = deltas[-1]["stream_id"] - yield self.store.update_stats_stream_pos(self.pos) + self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + + event_processing_positions.labels("stats").set( + self.pos["state_delta_stream_id"] + ) + + if self.pos is not None: + yield self.store.update_stats_positions(self.pos) - event_processing_positions.labels("stats").set(self.pos) + with Measure(self.clock, "stats_total_events"): + self.pos = yield self.store.incremental_update_total_events(self.pos) @defer.inlineCallbacks def _handle_deltas(self, deltas): @@ -119,7 +131,7 @@ class StatsHandler(StateDeltasHandler): logger.debug("Handling: %r %r, %s", typ, state_key, event_id) - token = yield self.store.get_earliest_token_for_room_stats(room_id) + token = yield self.store.get_earliest_token_for_stats("room", room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. @@ -144,44 +156,56 @@ class StatsHandler(StateDeltasHandler): # We use stream_pos here rather than fetch by event_id as event_id # may be None now = yield self.store.get_received_ts_by_stream_pos(stream_pos) + now = int(now) // 1000 - # quantise time to the nearest bucket - now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size + room_stats_delta = {} + room_stats_complete = False + + if prev_event_id is None: + # this state event doesn't overwrite another, + # so it is a new effective/current state event + room_stats_delta["current_state_events"] = ( + room_stats_delta.get("current_state_events", 0) + 1 + ) if typ == EventTypes.Member: # we could use _get_key_change here but it's a bit inefficient # given we're not testing for a specific result; might as well # just grab the prev_membership and membership strings and # compare them. - prev_event_content = {} + # We take None rather than leave as a previous membership + # in the absence of a previous event because we do not want to + # reduce the leave count when a new-to-the-room user joins. + prev_membership = None if prev_event_id is not None: prev_event = yield self.store.get_event( prev_event_id, allow_none=True ) if prev_event: prev_event_content = prev_event.content + prev_membership = prev_event_content.get( + "membership", Membership.LEAVE + ) membership = event_content.get("membership", Membership.LEAVE) - prev_membership = prev_event_content.get("membership", Membership.LEAVE) - if prev_membership == membership: - continue - - if prev_membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", -1 + if prev_membership is None: + logger.debug("No previous membership for this user.") + elif prev_membership == Membership.JOIN: + room_stats_delta["joined_members"] = ( + room_stats_delta.get("joined_members", 0) - 1 ) elif prev_membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", -1 + room_stats_delta["invited_members"] = ( + room_stats_delta.get("invited_members", 0) - 1 ) elif prev_membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", -1 + room_stats_delta["left_members"] = ( + room_stats_delta.get("left_members", 0) - 1 ) elif prev_membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", -1 + room_stats_delta["banned_members"] = ( + room_stats_delta.get("banned_members", 0) - 1 ) else: err = "%s is not a valid prev_membership" % (repr(prev_membership),) @@ -189,20 +213,20 @@ class StatsHandler(StateDeltasHandler): raise ValueError(err) if membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", +1 + room_stats_delta["joined_members"] = ( + room_stats_delta.get("joined_members", 0) + 1 ) elif membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", +1 + room_stats_delta["invited_members"] = ( + room_stats_delta.get("invited_members", 0) + 1 ) elif membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", +1 + room_stats_delta["left_members"] = ( + room_stats_delta.get("left_members", 0) + 1 ) elif membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", +1 + room_stats_delta["banned_members"] = ( + room_stats_delta.get("banned_members", 0) + 1 ) else: err = "%s is not a valid membership" % (repr(membership),) @@ -210,26 +234,19 @@ class StatsHandler(StateDeltasHandler): raise ValueError(err) user_id = state_key - if self.is_mine_id(user_id): + if self.is_mine_id(user_id) and membership in ( + Membership.JOIN, + Membership.LEAVE, + ): # update user_stats as it's one of our users public = yield self._is_public_room(room_id) - if membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - -1, - ) - elif membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - +1, - ) + field = "public_rooms" if public else "private_rooms" + delta = +1 if membership == Membership.JOIN else -1 + + yield self.store.update_stats_delta( + now, "user", user_id, {field: delta} + ) elif typ == EventTypes.Create: # Newly created room. Add it with all blank portions. @@ -246,28 +263,46 @@ class StatsHandler(StateDeltasHandler): }, ) + room_stats_complete = True + elif typ == EventTypes.JoinRules: + old_room_state = yield self.store.get_room_state(room_id) yield self.store.update_room_state( room_id, {"join_rules": event_content.get("join_rule")} ) - is_public = yield self._get_key_change( - prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + # whether the room would be public anyway, + # because of history_visibility + other_field_gives_publicity = ( + old_room_state["history_visibility"] == "world_readable" ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) elif typ == EventTypes.RoomHistoryVisibility: + old_room_state = yield self.store.get_room_state(room_id) yield self.store.update_room_state( room_id, {"history_visibility": event_content.get("history_visibility")}, ) - is_public = yield self._get_key_change( - prev_event_id, event_id, "history_visibility", "world_readable" + # whether the room would be public anyway, + # because of join_rule + other_field_gives_publicity = ( + old_room_state["join_rules"] == JoinRules.PUBLIC ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats(now, room_id, is_public) elif typ == EventTypes.Encryption: yield self.store.update_room_state( @@ -290,6 +325,20 @@ class StatsHandler(StateDeltasHandler): room_id, {"canonical_alias": event_content.get("alias")} ) + if room_stats_complete: + yield self.store.update_stats_delta( + now, + "room", + room_id, + room_stats_delta, + complete_with_stream_id=stream_id, + ) + + elif len(room_stats_delta) > 0: + yield self.store.update_stats_delta( + now, "room", room_id, room_stats_delta + ) + @defer.inlineCallbacks def update_public_room_stats(self, ts, room_id, is_public): """ @@ -308,10 +357,13 @@ class StatsHandler(StateDeltasHandler): for user_id in user_ids: if self.hs.is_mine(UserID.from_string(user_id)): yield self.store.update_stats_delta( - ts, "user", user_id, "public_rooms", +1 if is_public else -1 - ) - yield self.store.update_stats_delta( - ts, "user", user_id, "private_rooms", -1 if is_public else +1 + ts, + "user", + user_id, + { + "public_rooms": +1 if is_public else -1, + "private_rooms": -1 if is_public else +1, + }, ) @defer.inlineCallbacks diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 55e4e84d71..d8e1864a08 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -845,6 +845,17 @@ class RegistrationStore( (user_id_obj.localpart, create_profile_with_displayname), ) + if self.hs.config.stats_enabled: + # we create a new completed user statistics row + + # we don't strictly need current_token since this user really can't + # have any state deltas before now (as it is a new user), but still, + # we include it for completeness. + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + self._update_stats_delta_txn( + txn, now, "user", user_id, {}, complete_with_stream_id=current_token + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql new file mode 100644 index 0000000000..d987479363 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -0,0 +1,168 @@ +/* Copyright 2018 New Vector Ltd + * Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +----- First clean up from previous versions of room stats. + +-- First remove old stats stuff +DROP TABLE IF EXISTS room_stats; +DROP TABLE IF EXISTS user_stats; +DROP TABLE IF EXISTS room_stats_earliest_tokens; +DROP TABLE IF EXISTS _temp_populate_stats_position; +DROP TABLE IF EXISTS _temp_populate_stats_rooms; +DROP TABLE IF EXISTS stats_stream_pos; + +-- Unschedule old background updates if they're still scheduled +DELETE FROM background_updates WHERE update_name IN ( + 'populate_stats_createtables', + 'populate_stats_process_rooms', + 'populate_stats_cleanup', + 'regen_stats' +); + +----- Create tables for our version of room stats. + +-- single-row table to track position of incremental updates +CREATE TABLE IF NOT EXISTS stats_incremental_position ( + -- the stream_id of the last-processed state delta + state_delta_stream_id BIGINT, + + -- the stream_ordering of the last-processed backfilled event + -- (this is negative) + total_events_min_stream_ordering BIGINT, + + -- the stream_ordering of the last-processed normally-created event + -- (this is positive) + total_events_max_stream_ordering BIGINT, + + -- If true, this represents the contract agreed upon by the background + -- population processor. + -- If false, this is suitable for use by the delta/incremental processor. + is_background_contract BOOLEAN NOT NULL PRIMARY KEY +); + +-- insert a null row and make sure it is the only one. +DELETE FROM stats_incremental_position; +INSERT INTO stats_incremental_position ( + state_delta_stream_id, + total_events_min_stream_ordering, + total_events_max_stream_ordering, + is_background_contract +) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); + +-- represents PRESENT room statistics for a room +CREATE TABLE IF NOT EXISTS room_stats_current ( + room_id TEXT NOT NULL PRIMARY KEY, + + -- These starts cover the time from start_ts…end_ts (in seconds). + -- Note that end_ts is quantised, and start_ts usually so. + start_ts BIGINT, + end_ts BIGINT, + + current_state_events INT NOT NULL DEFAULT 0, + total_events INT NOT NULL DEFAULT 0, + joined_members INT NOT NULL DEFAULT 0, + invited_members INT NOT NULL DEFAULT 0, + left_members INT NOT NULL DEFAULT 0, + banned_members INT NOT NULL DEFAULT 0, + + -- If initial background count is still to be performed: NULL + -- If initial background count has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT, + + CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL)) +); + + +-- represents HISTORICAL room statistics for a room +CREATE TABLE IF NOT EXISTS room_stats_historical ( + room_id TEXT NOT NULL, + -- These starts cover the time from (end_ts - bucket_size)…end_ts (in seconds). + -- Note that end_ts is quantised, and start_ts usually so. + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + current_state_events INT NOT NULL, + total_events INT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + + PRIMARY KEY (room_id, end_ts) +); + +-- We use this index to speed up deletion of ancient room stats. +CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts); + +-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular room.) + + +-- represents PRESENT statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_current ( + user_id TEXT NOT NULL PRIMARY KEY, + + -- The timestamp that represents the start of the + start_ts BIGINT, + end_ts BIGINT, + + public_rooms INT DEFAULT 0 NOT NULL, + private_rooms INT DEFAULT 0 NOT NULL, + + -- If initial background count is still to be performed: NULL + -- If initial background count has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT +); + +-- represents HISTORICAL statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_historical ( + user_id TEXT NOT NULL, + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, + + PRIMARY KEY (user_id, end_ts) +); + +-- We use this index to speed up deletion of ancient user stats. +CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts); + +-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular user.) + + +-- Set up staging tables +-- we depend on current_state_events_membership because this is used +-- in our counting. +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_prepare', '{}', 'current_state_events_membership'); + +-- Run through each room and update stats +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_rooms', '{}', 'populate_stats_prepare'); + +-- Run through each user and update stats. +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_process_users', '{}', 'populate_stats_process_rooms'); + +-- Clean up staging tables +INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES + ('populate_stats_cleanup', '{}', 'populate_stats_process_users'); diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py new file mode 100644 index 0000000000..c5e3208adb --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.py @@ -0,0 +1,87 @@ +# -*- coding: utf-8 -*- +# Copyright 2019 The Matrix.org Foundation C.I.C. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +# This schema delta will be run after 'stats_separated1.sql' due to lexicographic +# ordering. Note that it MUST be so. +from synapse.storage.engines import PostgresEngine, Sqlite3Engine + + +def _run_create_generic(stats_type, cursor, database_engine): + """ + Creates the pertinent (partial, if supported) indices for one kind of stats. + Args: + stats_type: "room" or "user" – the type of stats + cursor: Database Cursor + database_engine: Database Engine + """ + if isinstance(database_engine, Sqlite3Engine): + # even though SQLite >= 3.8 can support partial indices, we won't enable + # them, in case the SQLite database may be later used on another system. + # It's also the case that SQLite is only likely to be used in small + # deployments or testing, where the optimisations gained by use of a + # partial index are not a big concern. + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts); + """ + % (stats_type, stats_type) + ) + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (completed_delta_stream_id, %s_id); + """ + % (stats_type, stats_type, stats_type) + ) + elif isinstance(database_engine, PostgresEngine): + # This partial index helps us with finding dirty stats rows + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_current_dirty + ON %s_stats_current (end_ts) + WHERE end_ts IS NOT NULL; + """ + % (stats_type, stats_type) + ) + # This partial index helps us with old collection + cursor.execute( + """ + CREATE INDEX IF NOT EXISTS %s_stats_not_complete + ON %s_stats_current (%s_id) + WHERE completed_delta_stream_id IS NULL; + """ + % (stats_type, stats_type, stats_type) + ) + else: + raise NotImplementedError("Unknown database engine.") + + +def run_create(cursor, database_engine): + """ + This function is called as part of the schema delta. + It will create indices – partial, if supported – for the new 'separated' + room & user statistics. + """ + _run_create_generic("room", cursor, database_engine) + _run_create_generic("user", cursor, database_engine) + + +def run_upgrade(cur, database_engine, config): + """ + This function is run on a database upgrade (of a non-empty database). + We have no need to do anything specific here. + """ + pass diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py index 5fdb442104..75b2217c97 100644 --- a/synapse/storage/state_deltas.py +++ b/synapse/storage/state_deltas.py @@ -31,7 +31,7 @@ class StateDeltasStore(SQLBaseStore): - state_key (str): - event_id (str|None): new event_id for this state key. None if the state has been deleted. - - prev_event_id (str|None): previous event_id for this state key. None + - prev_event_id (str): previous event_id for this state key. None if it's new state. Args: diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e13efed417..b6959f7967 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,10 +15,13 @@ # limitations under the License. import logging +from itertools import chain +from threading import Lock from twisted.internet import defer from synapse.api.constants import EventTypes, Membership +from synapse.storage.engines import Sqlite3Engine from synapse.storage.prepare_database import get_statements from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached @@ -32,14 +36,21 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "state_events", + "total_events", ), "user": ("public_rooms", "private_rooms"), } -TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these fields are per-timeslice and so should be reset to 0 upon a new slice +PER_SLICE_FIELDS = {"room": (), "user": ()} -TEMP_TABLE = "_temp_populate_stats" +TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} + + +class OldCollectionRequired(Exception): + """ Signal that we need to collect old stats rows and retry. """ + + pass class StatsStore(StateDeltasStore): @@ -51,121 +62,320 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size + self.stats_delta_processing_lock = Lock() + self.register_background_update_handler( - "populate_stats_createtables", self._populate_stats_createtables + "populate_stats_prepare", self._populate_stats_prepare ) self.register_background_update_handler( "populate_stats_process_rooms", self._populate_stats_process_rooms ) self.register_background_update_handler( - "populate_stats_cleanup", self._populate_stats_cleanup + "populate_stats_process_users", self._populate_stats_process_users + ) + # we no longer need to perform clean-up, but we will give ourselves + # the potential to reintroduce it in the future – so documentation + # will still encourage the use of this no-op handler. + self.register_noop_background_update("populate_stats_cleanup") + + def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. + + Args: + ts: the timestamp to quantise, in seconds since the Unix Epoch + + Returns: + a timestamp which + - is divisible by the bucket size; + - is no later than `ts`; and + - is the largest such timestamp. + """ + return (ts // self.stats_bucket_size) * self.stats_bucket_size + + @defer.inlineCallbacks + def _unwedge_incremental_processor(self, forced_promise): + """ + Make a promise about what this initial background count will handle, + so that we can allow the incremental processor to start doing things + right away – 'unwedging' it. + """ + + if forced_promise is None: + promised_stats_delta_pos = ( + yield self.get_max_stream_id_in_current_state_deltas() + ) + + promised_max = self.get_room_max_stream_ordering() + promised_min = self.get_room_min_stream_ordering() + + promised_positions = { + "state_delta_stream_id": promised_stats_delta_pos, + "total_events_min_stream_ordering": promised_min, + "total_events_max_stream_ordering": promised_max, + } + else: + promised_positions = forced_promise + + # this stores it for our reference later + yield self.update_stats_positions( + promised_positions, for_initial_processor=True + ) + + # this unwedges the incremental processor + yield self.update_stats_positions( + promised_positions, for_initial_processor=False ) + # with the delta processor unwedged, now let it catch up in case + # anything was missed during the wedge period + self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event) + @defer.inlineCallbacks - def _populate_stats_createtables(self, progress, batch_size): + def _populate_stats_prepare(self, progress, batch_size): + """ + This is a background update, which prepares the database for + statistics regeneration. + """ if not self.stats_enabled: - yield self._end_background_update("populate_stats_createtables") + yield self._end_background_update("populate_stats_prepare") return 1 - # Get all the rooms that we want to process. - def _make_staging_area(txn): - # Create the temporary tables - stmts = get_statements( + def _wedge_incremental_processor(txn): + """ + Wedge the incremental processor (by setting its positions to NULL), + and return its previous positions – atomically. + """ + + with self.stats_delta_processing_lock: + old = self._get_stats_positions_txn(txn, for_initial_processor=False) + self._update_stats_positions_txn(txn, None, for_initial_processor=False) + + return old + + def _make_skeletons(txn): + """ + Get all the rooms and users that we want to process, and create + 'skeletons' (incomplete _stats_current rows) for them, if they do + not already have a row. + """ + + if isinstance(self.database_engine, Sqlite3Engine): + sqls = """ + INSERT OR IGNORE INTO room_stats_current (room_id) + SELECT room_id FROM rooms; + + INSERT OR IGNORE INTO user_stats_current (user_id) + SELECT name AS user_id FROM users; + """ + else: + sqls = """ + INSERT INTO room_stats_current (room_id) + SELECT room_id FROM rooms + ON CONFLICT DO NOTHING; + + INSERT INTO user_stats_current (user_id) + SELECT name AS user_id FROM users + ON CONFLICT DO NOTHING; """ - -- We just recreate the table, we'll be reinserting the - -- correct entries again later anyway. - DROP TABLE IF EXISTS {temp}_rooms; - - CREATE TABLE IF NOT EXISTS {temp}_rooms( - room_id TEXT NOT NULL, - events BIGINT NOT NULL - ); - - CREATE INDEX {temp}_rooms_events - ON {temp}_rooms(events); - CREATE INDEX {temp}_rooms_id - ON {temp}_rooms(room_id); - """.format( - temp=TEMP_TABLE - ).splitlines() - ) - for statement in stmts: + for statement in get_statements(sqls.splitlines()): txn.execute(statement) - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) + def _delete_dirty_skeletons(txn): + """ + Delete pre-existing rows which are incomplete. + """ + sqls = """ + DELETE FROM room_stats_current + WHERE completed_delta_stream_id IS NULL; - # Get rooms we want to process from the database, only adding - # those that we haven't (i.e. those not in room_stats_earliest_token) - sql = """ - INSERT INTO %s_rooms (room_id, events) - SELECT c.room_id, count(*) FROM current_state_events AS c - LEFT JOIN room_stats_earliest_token AS t USING (room_id) - WHERE t.room_id IS NULL - GROUP BY c.room_id - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) + DELETE FROM user_stats_current + WHERE completed_delta_stream_id IS NULL; + """ - new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.runInteraction("populate_stats_temp_build", _make_staging_area) - yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) - self.get_earliest_token_for_room_stats.invalidate_all() + for statement in get_statements(sqls.splitlines()): + txn.execute(statement) - yield self._end_background_update("populate_stats_createtables") + # first wedge the incremental processor and reset our promise + old_positions = yield self.runInteraction( + "populate_stats_wedge", _wedge_incremental_processor + ) + + if None in old_positions.values(): + old_positions = None + + # with the incremental processor wedged, we delete dirty skeleton rows + # since we don't want to double-count them. + yield self.runInteraction( + "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons + ) + + yield self._unwedge_incremental_processor(old_positions) + + yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons) + self.get_earliest_token_for_stats.invalidate_all() + + yield self._end_background_update("populate_stats_prepare") return 1 @defer.inlineCallbacks - def _populate_stats_cleanup(self, progress, batch_size): + def _populate_stats_process_users(self, progress, batch_size): """ - Update the user directory stream position, then clean up the old tables. + This is a background update which regenerates statistics for users. """ if not self.stats_enabled: - yield self._end_background_update("populate_stats_cleanup") + yield self._end_background_update("populate_stats_process_users") + return 1 + + def _get_next_batch(txn): + # Only fetch 250 users, so we don't fetch too many at once, even + # if those 250 users have less than batch_size state events. + sql = """ + SELECT user_id FROM user_stats_current + WHERE completed_delta_stream_id IS NULL + LIMIT 250 + """ + txn.execute(sql) + users_to_work_on = txn.fetchall() + + if not users_to_work_on: + return None + + # Get how many are left to process, so we can give status on how + # far we are in processing + txn.execute( + "SELECT COUNT(*) FROM room_stats_current" + " WHERE completed_delta_stream_id IS NULL" + ) + progress["remaining"] = txn.fetchone()[0] + + return users_to_work_on + + users_to_work_on = yield self.runInteraction( + "populate_stats_users_get_batch", _get_next_batch + ) + + # No more users -- complete the transaction. + if not users_to_work_on: + yield self._end_background_update("populate_stats_process_users") return 1 - position = yield self._simple_select_one_onecol( - TEMP_TABLE + "_position", None, "position" + logger.info( + "Processing the next %d users of %d remaining", + len(users_to_work_on), + progress["remaining"], ) - yield self.update_stats_stream_pos(position) - def _delete_staging_area(txn): - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") + processed_membership_count = 0 - yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) + promised_positions = yield self.get_stats_positions(for_initial_processor=True) - yield self._end_background_update("populate_stats_cleanup") - return 1 + if None in promised_positions: + logger.error( + "There is a None in promised_positions;" + " dependency task must not have been run." + " promised_positions: %s", + promised_positions, + ) + yield self._end_background_update("populate_stats_process_users") + return 1 + + for (user_id,) in users_to_work_on: + now = self.hs.get_reactor().seconds() + + def _process_user(txn): + # Get the current token + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + + sql = """ + SELECT + ( + join_rules = 'public' + OR history_visibility = 'world_readable' + ) AS is_public, + COUNT(*) AS count + FROM room_memberships + JOIN room_state USING (room_id) + WHERE + user_id = ? AND membership = 'join' + GROUP BY is_public + """ + txn.execute(sql, (user_id,)) + room_counts_by_publicness = dict(txn.fetchall()) + + try: + self._update_stats_delta_txn( + txn, + now, + "user", + user_id, + {}, + complete_with_stream_id=current_token, + absolute_fields={ + # these are counted absolutely because it is + # more difficult to count them from the promised time, + # because counting them now can use the quick lookup + # tables. + "public_rooms": room_counts_by_publicness.get(True, 0), + "private_rooms": room_counts_by_publicness.get(False, 0), + }, + ) + except OldCollectionRequired: + # this can't (shouldn't) actually happen + # since we only run the background update for incomplete rows + # and incomplete rows can never be old. + # However, if it does, the most graceful handling is just to + # ignore it – and carry on processing other users. + logger.error( + "Supposedly Impossible: OldCollectionRequired in initial" + " background update, for user ID %s", + user_id, + exc_info=True, + ) + pass + + # we use this count for rate-limiting + return sum(room_counts_by_publicness.values()) + + processed_membership_count += yield self.runInteraction( + "update_user_stats", _process_user + ) + + # Update the remaining counter. + progress["remaining"] -= 1 + + if processed_membership_count > batch_size: + # Don't process any more users, we've hit our batch size. + return processed_membership_count + + yield self.runInteraction( + "populate_stats", + self._background_update_progress_txn, + "populate_stats_process_users", + progress, + ) + + return processed_membership_count @defer.inlineCallbacks def _populate_stats_process_rooms(self, progress, batch_size): - + """ + This is a background update which regenerates statistics for rooms. + """ if not self.stats_enabled: yield self._end_background_update("populate_stats_process_rooms") return 1 - # If we don't have progress filed, delete everything. - if not progress: - yield self.delete_all_stats() - def _get_next_batch(txn): # Only fetch 250 rooms, so we don't fetch too many at once, even # if those 250 rooms have less than batch_size state events. sql = """ - SELECT room_id, events FROM %s_rooms - ORDER BY events DESC + SELECT room_id FROM room_stats_current + WHERE completed_delta_stream_id IS NULL LIMIT 250 - """ % ( - TEMP_TABLE, - ) + """ txn.execute(sql) rooms_to_work_on = txn.fetchall() @@ -174,13 +384,16 @@ class StatsStore(StateDeltasStore): # Get how many are left to process, so we can give status on how # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") + txn.execute( + "SELECT COUNT(*) FROM room_stats_current" + " WHERE completed_delta_stream_id IS NULL" + ) progress["remaining"] = txn.fetchone()[0] return rooms_to_work_on rooms_to_work_on = yield self.runInteraction( - "populate_stats_temp_read", _get_next_batch + "populate_stats_rooms_get_batch", _get_next_batch ) # No more rooms -- complete the transaction. @@ -197,8 +410,19 @@ class StatsStore(StateDeltasStore): # Number of state events we've processed by going through each room processed_event_count = 0 - for room_id, event_count in rooms_to_work_on: + promised_positions = yield self.get_stats_positions(for_initial_processor=True) + + if None in promised_positions: + logger.error( + "There is a None in promised_positions;" + " dependency task must not have been run." + " promised_positions: %s", + promised_positions, + ) + yield self._end_background_update("populate_stats_process_rooms") + return 1 + for (room_id,) in rooms_to_work_on: current_state_ids = yield self.get_current_state_ids(room_id) join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) @@ -248,11 +472,7 @@ class StatsStore(StateDeltasStore): now = self.hs.get_reactor().seconds() - # quantise time to the nearest bucket - now = (now // self.stats_bucket_size) * self.stats_bucket_size - def _fetch_data(txn): - # Get the current token of the room current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) @@ -260,82 +480,234 @@ class StatsStore(StateDeltasStore): membership_counts = self._get_user_counts_in_room_txn(txn, room_id) - total_state_events = self._get_total_state_event_counts_txn( - txn, room_id - ) - - self._update_stats_txn( + room_total_event_count = self._count_events_in_room_txn( txn, - "room", room_id, - now, - { - "bucket_size": self.stats_bucket_size, - "current_state_events": current_state_events, - "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get(Membership.INVITE, 0), - "left_members": membership_counts.get(Membership.LEAVE, 0), - "banned_members": membership_counts.get(Membership.BAN, 0), - "state_events": total_state_events, - }, - ) - self._simple_insert_txn( - txn, - "room_stats_earliest_token", - {"room_id": room_id, "token": current_token}, + promised_positions["total_events_min_stream_ordering"], + promised_positions["total_events_max_stream_ordering"], ) - # We've finished a room. Delete it from the table. - self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id} - ) - - yield self.runInteraction("update_room_stats", _fetch_data) + try: + self._update_stats_delta_txn( + txn, + now, + "room", + room_id, + {"total_events": room_total_event_count}, + complete_with_stream_id=current_token, + absolute_fields={ + # these are counted absolutely because it is + # more difficult to count them from the promised time, + # because counting them now can use the quick lookup + # tables. + "current_state_events": current_state_events, + "joined_members": membership_counts.get(Membership.JOIN, 0), + "invited_members": membership_counts.get( + Membership.INVITE, 0 + ), + "left_members": membership_counts.get(Membership.LEAVE, 0), + "banned_members": membership_counts.get(Membership.BAN, 0), + }, + ) + except OldCollectionRequired: + # this can't (shouldn't) actually happen + # since we only run the background update for incomplete rows + # and incomplete rows can never be old. + # However, if it does, the most graceful handling is just to + # ignore it – and carry on processing other rooms. + logger.error( + "Supposedly Impossible: OldCollectionRequired in initial" + " background update, for room ID %s", + room_id, + exc_info=True, + ) + pass + + # we use this count for rate-limiting + return room_total_event_count + + room_event_count = yield self.runInteraction( + "update_room_stats", _fetch_data + ) # Update the remaining counter. progress["remaining"] -= 1 - yield self.runInteraction( - "populate_stats", - self._background_update_progress_txn, - "populate_stats_process_rooms", - progress, - ) - processed_event_count += event_count + processed_event_count += room_event_count if processed_event_count > batch_size: # Don't process any more rooms, we've hit our batch size. return processed_event_count + yield self.runInteraction( + "populate_stats", + self._background_update_progress_txn, + "populate_stats_process_rooms", + progress, + ) + return processed_event_count - def delete_all_stats(self): + def update_total_event_count_between_txn(self, txn, low_pos, high_pos): """ - Delete all statistics records. + Updates the total_events counts for rooms, in a range of stream_orderings. + + Inclusivity of low_pos and high_pos is dependent upon their signs. + This makes it intuitive to use this function for both backfilled + and non-backfilled events. + + Examples: + (low, high) → (kind) + (3, 7) → 3 < … <= 7 (normal-filled; low already processed before) + (-4, -2) → -4 <= … < -2 (backfilled; high already processed before) + (-7, 7) → -7 <= … <= 7 (both) + + Args: + txn: Database transaction. It is assumed that you will have one, + since you probably want to update pointers at the same time. + low_pos: Low stream ordering + high_pos: High stream ordering """ - def _delete_all_stats_txn(txn): - txn.execute("DELETE FROM room_state") - txn.execute("DELETE FROM room_stats") - txn.execute("DELETE FROM room_stats_earliest_token") - txn.execute("DELETE FROM user_stats") + if low_pos >= high_pos: + # nothing to do here. + return - return self.runInteraction("delete_all_stats", _delete_all_stats_txn) + now = self.hs.get_reactor().seconds() - def get_stats_stream_pos(self): - return self._simple_select_one_onecol( - table="stats_stream_pos", - keyvalues={}, - retcol="stream_id", - desc="stats_stream_pos", + # we choose comparators based on the signs + low_comparator = "<=" if low_pos < 0 else "<" + high_comparator = "<" if high_pos < 0 else "<=" + + sql = """ + SELECT room_id, COUNT(*) AS new_events + FROM events + WHERE ? %s stream_ordering AND stream_ordering %s ? + GROUP BY room_id + """ % ( + low_comparator, + high_comparator, + ) + + txn.execute(sql, (low_pos, high_pos)) + + for room_id, new_events in txn.fetchall(): + while True: + try: + self._update_stats_delta_txn( + txn, now, "room", room_id, {"total_events": new_events} + ) + break + except OldCollectionRequired: + self._collect_old_txn(txn, "room") + continue + + def _count_events_in_room_txn(self, txn, room_id, low_token, high_token): + """ + Count the number of events in a room between two tokens, inclusive. + Args: + txn (cursor): The database + room_id (str): The ID of the room to count events for + low_token (int): the minimum stream ordering to count + high_token (int): the maximum stream ordering to count + + Returns (int): + the number of events + """ + + sql = """ + SELECT COUNT(*) AS num_events + FROM events + WHERE room_id = ? + AND ? <= stream_ordering + AND stream_ordering <= ? + """ + txn.execute(sql, (room_id, low_token, high_token)) + return txn.fetchone()[0] + + def get_stats_positions(self, for_initial_processor=False): + """ + Returns the stats processor positions. + + Args: + for_initial_processor (bool, optional): If true, returns the position + promised by the latest stats regeneration, rather than the current + incremental processor's position. + Otherwise (if false), return the incremental processor's position. + + Returns (dict): + Dict containing :- + state_delta_stream_id: stream_id of last-processed state delta + total_events_min_stream_ordering: stream_ordering of latest-processed + backfilled event, in the context of total_events counting. + total_events_max_stream_ordering: stream_ordering of latest-processed + non-backfilled event, in the context of total_events counting. + """ + return self._simple_select_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + desc="stats_incremental_position", + ) + + def _get_stats_positions_txn(self, txn, for_initial_processor=False): + """ + See L{get_stats_positions}. + + Args: + txn (cursor): Database cursor + """ + return self._simple_select_one_txn( + txn=txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), ) - def update_stats_stream_pos(self, stream_id): + def update_stats_positions(self, positions, for_initial_processor=False): + """ + Updates the stats processor positions. + + Args: + positions: See L{get_stats_positions} + for_initial_processor: See L{get_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } return self._simple_update_one( - table="stats_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_stats_stream_pos", + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + desc="update_stats_incremental_position", + ) + + def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): + """ + See L{update_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, ) def update_room_state(self, room_id, fields): @@ -367,36 +739,109 @@ class StatsStore(StateDeltasStore): desc="update_room_state", ) - def get_deltas_for_room(self, room_id, start, size=100): + def get_statistics_for_subject(self, stats_type, stats_id, start, size=100): """ - Get statistics deltas for a given room. + Get statistics for a given subject. Args: - room_id (str) + stats_type (str): The type of subject + stats_id (str): The ID of the subject (e.g. room_id or user_id) start (int): Pagination start. Number of entries, not timestamp. size (int): How many entries to return. Returns: Deferred[list[dict]], where the dict has the keys of - ABSOLUTE_STATS_FIELDS["room"] and "ts". + ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts". """ - return self._simple_select_list_paginate( - "room_stats", - {"room_id": room_id}, - "ts", + return self.runInteraction( + "get_statistics_for_subject", + self._get_statistics_for_subject_txn, + stats_type, + stats_id, start, size, - retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), + ) + + def _get_statistics_for_subject_txn( + self, txn, stats_type, stats_id, start, size=100 + ): + """ + Transaction-bound version of L{get_statistics_for_subject}. + """ + + table, id_col = TYPE_TO_TABLE[stats_type] + selected_columns = list( + ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type] + ) + + slice_list = self._simple_select_list_paginate_txn( + txn, + table + "_historical", + {id_col: stats_id}, + "end_ts", + start, + size, + retcols=selected_columns + ["bucket_size", "end_ts"], order_direction="DESC", ) + if len(slice_list) < size: + # also fetch the current row + current = self._simple_select_one_txn( + txn, + table + "_current", + {id_col: stats_id}, + retcols=selected_columns + + ["start_ts", "end_ts", "completed_delta_stream_id"], + allow_none=True, + ) + + if current is not None: + completed = current["completed_delta_stream_id"] is not None + dirty = current["end_ts"] is not None + + if completed and dirty: + # it is dirty, so contains new information, so should be included + # we don't accept incomplete rows as that would almost certainly + # be giving misinformation, since it is awaiting an + # initial background count + current["bucket_size"] = current["end_ts"] - current["start_ts"] + del current["start_ts"] + return [current] + slice_list + return slice_list + def get_all_room_state(self): return self._simple_select_list( "room_state", None, retcols=("name", "topic", "canonical_alias") ) + def get_room_state(self, room_id): + """ + Returns the current room_state for a room. + + Args: + room_id (str): The ID of the room to return state for. + + Returns (dict): + Dictionary containing these keys: + "name", "topic", "canonical_alias", "avatar", "join_rules", + "history_visibility" + """ + return self._simple_select_one( + "room_state", + {"room_id": room_id}, + retcols=( + "name", + "topic", + "canonical_alias", + "avatar", + "join_rules", + "history_visibility", + ), + ) + @cached() - def get_earliest_token_for_room_stats(self, room_id): + def get_earliest_token_for_stats(self, stats_type, id): """ Fetch the "earliest token". This is used by the room stats delta processor to ignore deltas that have been processed between the @@ -406,79 +851,275 @@ class StatsStore(StateDeltasStore): Returns: Deferred[int] """ + table, id_col = TYPE_TO_TABLE[stats_type] + return self._simple_select_one_onecol( - "room_stats_earliest_token", - {"room_id": room_id}, - retcol="token", + "%s_current" % (table,), + {id_col: id}, + retcol="completed_delta_stream_id", allow_none=True, ) - def update_stats(self, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert( - table=table, - keyvalues={id_col: stats_id, "ts": ts}, - values=fields, - desc="update_stats", + def _collect_old_txn(self, txn, stats_type, limit=500): + """ + See {collect_old}. Runs only a small batch, specified by limit. + + Returns (bool): + True iff there is possibly more to do (i.e. this needs re-running), + False otherwise. + + """ + # we do them in batches to prevent concurrent updates from + # messing us over with lots of retries + + now = self.hs.get_reactor().seconds() + quantised_ts = self.quantise_stats_time(now) + table, id_col = TYPE_TO_TABLE[stats_type] + + fields = ", ".join( + field + for field in chain( + ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type] + ) + ) + + # `end_ts IS NOT NULL` is for partial index optimisation + if isinstance(self.database_engine, Sqlite3Engine): + # SQLite doesn't support SELECT FOR UPDATE + sql = ( + "SELECT %s FROM %s_current" + " WHERE end_ts <= ? AND end_ts IS NOT NULL" + " LIMIT %d" + ) % (id_col, table, limit) + else: + sql = ( + "SELECT %s FROM %s_current" + " WHERE end_ts <= ? AND end_ts IS NOT NULL" + " LIMIT %d FOR UPDATE" + ) % (id_col, table, limit) + txn.execute(sql, (quantised_ts,)) + maybe_more = txn.rowcount == limit + updates = txn.fetchall() + + sql = ( + "INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)" + " SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts" + " FROM %s_current WHERE %s = ?" + ) % (table, id_col, fields, id_col, fields, table, id_col) + txn.executemany(sql, updates) + + sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % ( + table, + id_col, ) + txn.executemany(sql, updates) + + return maybe_more + + @defer.inlineCallbacks + def collect_old(self, stats_type): + """ + Run 'old collection' on current stats rows. + + Old collection is the process of copying dirty (updated) stats rows + from the current table to the historical table, when those rows have + finished their stats time slice. + Collected rows are then cleared of their dirty status. + + Args: + stats_type: "room" or "user" – the type of stats to run old collection + on. + + """ + while True: + maybe_more = yield self.runInteraction( + "stats_collect_old", self._collect_old_txn, stats_type + ) + if not maybe_more: + return None + + @defer.inlineCallbacks + def update_stats_delta( + self, ts, stats_type, stats_id, fields, complete_with_stream_id=None + ): + """ + Updates the statistics for a subject, with a delta (difference/relative + change). + + Args: + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) + fields (dict[str, int]): Deltas of stats values. + complete_with_stream_id (int, optional): + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. + """ + + while True: + try: + res = yield self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, + ) + return res + except OldCollectionRequired: + # retry after collecting old rows + yield self.collect_old(stats_type) + + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_fields=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas). + """ + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()] + field_values = list(fields.values()) + + if absolute_fields is not None: + field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()] + field_values += list(absolute_fields.values()) + + if complete_with_stream_id is not None: + field_sqls.append("completed_delta_stream_id = ?") + field_values.append(complete_with_stream_id) + + sql = ( + "UPDATE %s_current SET end_ts = ?, %s" + " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))" + " AND %s = ?" + ) % (table, ", ".join(field_sqls), id_col) + + qargs = [end_ts] + list(field_values) + [end_ts, stats_id] + + txn.execute(sql, qargs) - def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert_txn( - txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields + if txn.rowcount > 0: + # success. + return + + # if we're here, it's because we didn't succeed in updating a stats + # row. Why? Let's find out… + + current_row = self._simple_select_one_txn( + txn, + table + "_current", + {id_col: stats_id}, + ("end_ts", "completed_delta_stream_id"), + allow_none=True, ) - def update_stats_delta(self, ts, stats_type, stats_id, field, value): - def _update_stats_delta(txn): - table, id_col = TYPE_TO_ROOM[stats_type] + if current_row is None: + # we need to insert a row! (insert a dirty, incomplete row) + insertee = { + id_col: stats_id, + "end_ts": end_ts, + "start_ts": ts, + "completed_delta_stream_id": complete_with_stream_id, + } + + # we assume that, by default, blank fields should be zero. + for field_name in ABSOLUTE_STATS_FIELDS[stats_type]: + insertee[field_name] = 0 + + for field_name in PER_SLICE_FIELDS[stats_type]: + insertee[field_name] = 0 + + for (field, value) in fields.items(): + insertee[field] = value + + if absolute_fields is not None: + for (field, value) in absolute_fields.items(): + insertee[field] = value + + self._simple_insert_txn(txn, table + "_current", insertee) + elif current_row["end_ts"] is None: + # update the row, including start_ts sql = ( - "SELECT * FROM %s" - " WHERE %s=? and ts=(" - " SELECT MAX(ts) FROM %s" - " WHERE %s=?" - ")" - ) % (table, id_col, table, id_col) - txn.execute(sql, (stats_id, stats_id)) - rows = self.cursor_to_dict(txn) - if len(rows) == 0: - # silently skip as we don't have anything to apply a delta to yet. - # this tries to minimise any race between the initial sync and - # subsequent deltas arriving. - return - - current_ts = ts - latest_ts = rows[0]["ts"] - if current_ts < latest_ts: - # This one is in the past, but we're just encountering it now. - # Mark it as part of the current bucket. - current_ts = latest_ts - elif ts != latest_ts: - # we have to copy our absolute counters over to the new entry. - values = { - key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] - } - values[id_col] = stats_id - values["ts"] = ts - values["bucket_size"] = self.stats_bucket_size - - self._simple_insert_txn(txn, table=table, values=values) - - # actually update the new value - if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: - self._simple_update_txn( - txn, - table=table, - keyvalues={id_col: stats_id, "ts": current_ts}, - updatevalues={field: value}, - ) - else: - sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( - table, - field, - field, - id_col, + "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s" + " WHERE end_ts IS NULL AND %s = ?" + ) % (table, ", ".join(field_sqls), id_col) + + qargs = ( + [end_ts - self.stats_bucket_size, end_ts] + + list(field_values) + + [stats_id] + ) + + txn.execute(sql, qargs) + if txn.rowcount == 0: + raise RuntimeError( + "Should be impossible: No rows updated" + " but all conditions are known to be met." ) - txn.execute(sql, (value, stats_id, current_ts)) - return self.runInteraction("update_stats_delta", _update_stats_delta) + elif current_row["end_ts"] < end_ts: + # we need to perform old collection first + raise OldCollectionRequired() + + def incremental_update_total_events(self, in_positions): + """ + Counts the number of events per-room and then adds these to the respective + total_events room counts. + + Args: + in_positions (dict): Positions, + as retrieved from L{get_stats_positions}. + + Returns (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + """ + + def incremental_update_total_events_txn(txn): + positions = in_positions.copy() + + max_pos = self.get_room_max_stream_ordering() + min_pos = self.get_room_min_stream_ordering() + self.update_total_event_count_between_txn( + txn, + low_pos=positions["total_events_max_stream_ordering"], + high_pos=max_pos, + ) + + self.update_total_event_count_between_txn( + txn, + low_pos=min_pos, + high_pos=positions["total_events_min_stream_ordering"], + ) + + if ( + positions["total_events_max_stream_ordering"] != max_pos + or positions["total_events_min_stream_ordering"] != min_pos + ): + positions["total_events_max_stream_ordering"] = max_pos + positions["total_events_min_stream_ordering"] = min_pos + + self._update_stats_positions_txn(txn, positions) + + return positions + + return self.runInteraction( + "stats_incremental_total_events", incremental_update_total_events_txn + ) |