diff options
-rw-r--r-- | changelog.d/5879.misc | 1 | ||||
-rw-r--r-- | changelog.d/5930.misc | 1 | ||||
-rw-r--r-- | synapse/config/stats.py | 13 | ||||
-rw-r--r-- | synapse/handlers/stats.py | 227 | ||||
-rw-r--r-- | synapse/rest/client/versions.py | 7 | ||||
-rw-r--r-- | synapse/storage/events.py | 5 | ||||
-rw-r--r-- | synapse/storage/registration.py | 12 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated1.sql | 144 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated2.sql.postgres | 24 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated2.sql.sqlite | 27 | ||||
-rw-r--r-- | synapse/storage/stats.py | 872 | ||||
-rw-r--r-- | tests/handlers/test_stats.py | 304 |
12 files changed, 856 insertions, 781 deletions
diff --git a/changelog.d/5879.misc b/changelog.d/5879.misc new file mode 100644 index 0000000000..e1c829772f --- /dev/null +++ b/changelog.d/5879.misc @@ -0,0 +1 @@ +Rework room and user statistics to separate current & historical rows, as well as track stats correctly. diff --git a/changelog.d/5930.misc b/changelog.d/5930.misc deleted file mode 100644 index 81dcc10e6d..0000000000 --- a/changelog.d/5930.misc +++ /dev/null @@ -1 +0,0 @@ -Add temporary flag to /versions in unstable_features to indicate this Synapse supports receiving id_access_token parameters on calls to identity server-proxying endpoints. \ No newline at end of file diff --git a/synapse/config/stats.py b/synapse/config/stats.py index b518a3ed9c..b18ddbd1fa 100644 --- a/synapse/config/stats.py +++ b/synapse/config/stats.py @@ -27,19 +27,16 @@ class StatsConfig(Config): def read_config(self, config, **kwargs): self.stats_enabled = True - self.stats_bucket_size = 86400 + self.stats_bucket_size = 86400 * 1000 self.stats_retention = sys.maxsize stats_config = config.get("stats", None) if stats_config: self.stats_enabled = stats_config.get("enabled", self.stats_enabled) - self.stats_bucket_size = ( - self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000 + self.stats_bucket_size = self.parse_duration( + stats_config.get("bucket_size", "1d") ) - self.stats_retention = ( - self.parse_duration( - stats_config.get("retention", "%ds" % (sys.maxsize,)) - ) - / 1000 + self.stats_retention = self.parse_duration( + stats_config.get("retention", "%ds" % (sys.maxsize,)) ) def generate_config_section(self, config_dir_path, server_name, **kwargs): diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 4449da6669..f44adfc07b 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler): # The current position in the current_state_delta stream self.pos = None - # Guard to ensure we only process deltas one at a time - self._is_processing = False - if hs.config.stats_enabled: self.notifier.add_replication_callback(self.notify_new_event) @@ -65,43 +62,60 @@ class StatsHandler(StateDeltasHandler): if not self.hs.config.stats_enabled: return - if self._is_processing: - return + lock = self.store.stats_delta_processing_lock @defer.inlineCallbacks def process(): + yield lock.acquire() try: yield self._unsafe_process() finally: - self._is_processing = False + yield lock.release() - self._is_processing = True - run_as_background_process("stats.notify_new_event", process) + if not lock.locked: + # we only want to run this process one-at-a-time, + # and also, if the initial background updater wants us to keep out, + # we should respect that. + run_as_background_process("stats.notify_new_event", process) @defer.inlineCallbacks def _unsafe_process(self): # If self.pos is None then means we haven't fetched it from DB - if self.pos is None: - self.pos = yield self.store.get_stats_stream_pos() + # If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us + # but note that this might be outdated, so we retrieve the positions again. + if self.pos is None or None in self.pos.values(): + self.pos = yield self.store.get_stats_positions() - # If still None then the initial background update hasn't happened yet - if self.pos is None: + # If still contains a None position, then the stats regenerator hasn't started yet + if None in self.pos.values(): return None # Loop round handling deltas until we're up to date + while True: with Measure(self.clock, "stats_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) - if not deltas: - return + deltas = yield self.store.get_current_state_deltas( + self.pos["state_delta_stream_id"] + ) - logger.info("Handling %d state deltas", len(deltas)) + logger.debug("Handling %d state deltas", len(deltas)) yield self._handle_deltas(deltas) - self.pos = deltas[-1]["stream_id"] - yield self.store.update_stats_stream_pos(self.pos) + self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"] + yield self.store.update_stats_positions(self.pos) + + event_processing_positions.labels("stats").set( + self.pos["state_delta_stream_id"] + ) + + # Then count deltas for total_events and total_event_bytes. + with Measure(self.clock, "stats_total_events_and_bytes"): + self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes( + self.pos + ) - event_processing_positions.labels("stats").set(self.pos) + if not deltas and not had_counts: + break @defer.inlineCallbacks def _handle_deltas(self, deltas): @@ -119,7 +133,7 @@ class StatsHandler(StateDeltasHandler): logger.debug("Handling: %r %r, %s", typ, state_key, event_id) - token = yield self.store.get_earliest_token_for_room_stats(room_id) + token = yield self.store.get_earliest_token_for_stats("room", room_id) # If the earliest token to begin from is larger than our current # stream ID, skip processing this delta. @@ -131,7 +145,10 @@ class StatsHandler(StateDeltasHandler): continue if event_id is None and prev_event_id is None: - # Errr... + logger.error( + "event ID is None and so is the previous event ID. stream_id: %s", + stream_id, + ) continue event_content = {} @@ -143,92 +160,87 @@ class StatsHandler(StateDeltasHandler): # We use stream_pos here rather than fetch by event_id as event_id # may be None - now = yield self.store.get_received_ts_by_stream_pos(stream_pos) + stream_timestamp = yield self.store.get_received_ts_by_stream_pos( + stream_pos + ) + stream_timestamp = int(stream_timestamp) + + # All the values in this dict are deltas (RELATIVE changes) + room_stats_delta = {} + is_newly_created = False - # quantise time to the nearest bucket - now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size + if prev_event_id is None: + # this state event doesn't overwrite another, + # so it is a new effective/current state event + room_stats_delta["current_state_events"] = 1 if typ == EventTypes.Member: # we could use _get_key_change here but it's a bit inefficient # given we're not testing for a specific result; might as well # just grab the prev_membership and membership strings and # compare them. - prev_event_content = {} + # We take None rather than leave as a previous membership + # in the absence of a previous event because we do not want to + # reduce the leave count when a new-to-the-room user joins. + prev_membership = None if prev_event_id is not None: prev_event = yield self.store.get_event( prev_event_id, allow_none=True ) if prev_event: prev_event_content = prev_event.content + prev_membership = prev_event_content.get( + "membership", Membership.LEAVE + ) membership = event_content.get("membership", Membership.LEAVE) - prev_membership = prev_event_content.get("membership", Membership.LEAVE) - if prev_membership == membership: - continue - - if prev_membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", -1 - ) + if prev_membership is None: + logger.debug("No previous membership for this user.") + elif membership == prev_membership: + pass # noop + elif prev_membership == Membership.JOIN: + room_stats_delta["joined_members"] = -1 elif prev_membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", -1 - ) + room_stats_delta["invited_members"] = -1 elif prev_membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", -1 - ) + room_stats_delta["left_members"] = -1 elif prev_membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", -1 - ) + room_stats_delta["banned_members"] = -1 else: - err = "%s is not a valid prev_membership" % (repr(prev_membership),) - logger.error(err) - raise ValueError(err) + raise ValueError( + "%r is not a valid prev_membership" % (prev_membership,) + ) + if membership == prev_membership: + pass # noop if membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", +1 - ) + room_stats_delta["joined_members"] = +1 elif membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", +1 - ) + room_stats_delta["invited_members"] = +1 elif membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", +1 - ) + room_stats_delta["left_members"] = +1 elif membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", +1 - ) + room_stats_delta["banned_members"] = +1 else: - err = "%s is not a valid membership" % (repr(membership),) - logger.error(err) - raise ValueError(err) + raise ValueError("%r is not a valid membership" % (membership,)) user_id = state_key if self.is_mine_id(user_id): - # update user_stats as it's one of our users - public = yield self._is_public_room(room_id) + # this accounts for transitions like leave → ban and so on. + has_changed_joinedness = (prev_membership == Membership.JOIN) != ( + membership == Membership.JOIN + ) + + if has_changed_joinedness: + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + field = "public_rooms" if public else "private_rooms" + delta = +1 if membership == Membership.JOIN else -1 - if membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - -1, - ) - elif membership == Membership.JOIN: yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - +1, + stream_timestamp, "user", user_id, {field: delta} ) elif typ == EventTypes.Create: @@ -246,28 +258,50 @@ class StatsHandler(StateDeltasHandler): }, ) + is_newly_created = True + elif typ == EventTypes.JoinRules: + old_room_state = yield self.store.get_room_state(room_id) yield self.store.update_room_state( room_id, {"join_rules": event_content.get("join_rule")} ) - is_public = yield self._get_key_change( - prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + # whether the room would be public anyway, + # because of history_visibility + other_field_gives_publicity = ( + old_room_state["history_visibility"] == "world_readable" ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + yield self.update_public_room_stats( + stream_timestamp, room_id, is_public + ) elif typ == EventTypes.RoomHistoryVisibility: + old_room_state = yield self.store.get_room_state(room_id) yield self.store.update_room_state( room_id, {"history_visibility": event_content.get("history_visibility")}, ) - is_public = yield self._get_key_change( - prev_event_id, event_id, "history_visibility", "world_readable" + # whether the room would be public anyway, + # because of join_rule + other_field_gives_publicity = ( + old_room_state["join_rules"] == JoinRules.PUBLIC ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) + + if not other_field_gives_publicity: + is_public = yield self._get_key_change( + prev_event_id, event_id, "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats( + stream_timestamp, room_id, is_public + ) elif typ == EventTypes.Encryption: yield self.store.update_room_state( @@ -290,6 +324,20 @@ class StatsHandler(StateDeltasHandler): room_id, {"canonical_alias": event_content.get("alias")} ) + if is_newly_created: + yield self.store.update_stats_delta( + stream_timestamp, + "room", + room_id, + room_stats_delta, + complete_with_stream_id=stream_id, + ) + + elif len(room_stats_delta) > 0: + yield self.store.update_stats_delta( + stream_timestamp, "room", room_id, room_stats_delta + ) + @defer.inlineCallbacks def update_public_room_stats(self, ts, room_id, is_public): """ @@ -308,10 +356,13 @@ class StatsHandler(StateDeltasHandler): for user_id in user_ids: if self.hs.is_mine(UserID.from_string(user_id)): yield self.store.update_stats_delta( - ts, "user", user_id, "public_rooms", +1 if is_public else -1 - ) - yield self.store.update_stats_delta( - ts, "user", user_id, "private_rooms", -1 if is_public else +1 + ts, + "user", + user_id, + { + "public_rooms": +1 if is_public else -1, + "private_rooms": -1 if is_public else +1, + }, ) @defer.inlineCallbacks diff --git a/synapse/rest/client/versions.py b/synapse/rest/client/versions.py index c51c9e617d..0e09191632 100644 --- a/synapse/rest/client/versions.py +++ b/synapse/rest/client/versions.py @@ -44,12 +44,7 @@ class VersionsRestServlet(RestServlet): "r0.5.0", ], # as per MSC1497: - "unstable_features": { - "m.lazy_load_members": True, - # as per https://github.com/matrix-org/synapse/issues/5927 - # to be removed in r0.6.0 - "m.id_access_token": True, - }, + "unstable_features": {"m.lazy_load_members": True}, }, ) diff --git a/synapse/storage/events.py b/synapse/storage/events.py index 32050868ff..1958afe1d7 100644 --- a/synapse/storage/events.py +++ b/synapse/storage/events.py @@ -2270,8 +2270,9 @@ class EventsStore( "room_aliases", "room_depth", "room_memberships", - "room_state", - "room_stats", + "room_stats_state", + "room_stats_current", + "room_stats_historical", "room_stats_earliest_token", "rooms", "stream_ordering_to_exterm", diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py index 3f50324253..2d3c7e2dc9 100644 --- a/synapse/storage/registration.py +++ b/synapse/storage/registration.py @@ -869,6 +869,17 @@ class RegistrationStore( (user_id_obj.localpart, create_profile_with_displayname), ) + if self.hs.config.stats_enabled: + # we create a new completed user statistics row + + # we don't strictly need current_token since this user really can't + # have any state deltas before now (as it is a new user), but still, + # we include it for completeness. + current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) + self._update_stats_delta_txn( + txn, now, "user", user_id, {}, complete_with_stream_id=current_token + ) + self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,)) txn.call_after(self.is_guest.invalidate, (user_id,)) @@ -1140,6 +1151,7 @@ class RegistrationStore( deferred str|None: A str representing a link to redirect the user to if there is one. """ + # Insert everything into a transaction in order to run atomically def validate_threepid_session_txn(txn): row = self._simple_select_one_txn( diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql new file mode 100644 index 0000000000..6d4648c0d7 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -0,0 +1,144 @@ +/* Copyright 2018 New Vector Ltd + * Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +----- First clean up from previous versions of room stats. + +-- First remove old stats stuff +DROP TABLE IF EXISTS room_stats; +DROP TABLE IF EXISTS user_stats; +DROP TABLE IF EXISTS room_stats_earliest_tokens; +DROP TABLE IF EXISTS _temp_populate_stats_position; +DROP TABLE IF EXISTS _temp_populate_stats_rooms; +DROP TABLE IF EXISTS stats_stream_pos; + +-- Unschedule old background updates if they're still scheduled +DELETE FROM background_updates WHERE update_name IN ( + 'populate_stats_createtables', + 'populate_stats_process_rooms', + 'populate_stats_cleanup' +); + +----- Create tables for our version of room stats. + +-- single-row table to track position of incremental updates +CREATE TABLE IF NOT EXISTS stats_incremental_position ( + -- the stream_id of the last-processed state delta + state_delta_stream_id BIGINT, + + -- the stream_ordering of the last-processed backfilled event + -- (this is negative) + total_events_min_stream_ordering BIGINT, + + -- the stream_ordering of the last-processed normally-created event + -- (this is positive) + total_events_max_stream_ordering BIGINT, + + -- If true, this represents the contract agreed upon by the stats + -- regenerator. + -- If false, this is suitable for use by the delta/incremental processor. + is_background_contract BOOLEAN NOT NULL PRIMARY KEY +); + +-- insert a null row and make sure it is the only one. +DELETE FROM stats_incremental_position; +INSERT INTO stats_incremental_position ( + state_delta_stream_id, + total_events_min_stream_ordering, + total_events_max_stream_ordering, + is_background_contract +) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1)); + +-- represents PRESENT room statistics for a room +-- only holds absolute fields +CREATE TABLE IF NOT EXISTS room_stats_current ( + room_id TEXT NOT NULL PRIMARY KEY, + + current_state_events INT NOT NULL, + total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT +); + + +-- represents HISTORICAL room statistics for a room +CREATE TABLE IF NOT EXISTS room_stats_historical ( + room_id TEXT NOT NULL, + -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms). + -- Note that end_ts is quantised. + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + current_state_events INT NOT NULL, + total_events INT NOT NULL, + total_event_bytes BIGINT NOT NULL, + joined_members INT NOT NULL, + invited_members INT NOT NULL, + left_members INT NOT NULL, + banned_members INT NOT NULL, + + PRIMARY KEY (room_id, end_ts) +); + +-- We use this index to speed up deletion of ancient room stats. +CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts); + +-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular room.) + + +-- represents PRESENT statistics for a user +-- only holds absolute fields +CREATE TABLE IF NOT EXISTS user_stats_current ( + user_id TEXT NOT NULL PRIMARY KEY, + + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, + + -- If initial stats regen is still to be performed: NULL + -- If initial stats regen has been performed: the maximum delta stream + -- position that this row takes into account. + completed_delta_stream_id BIGINT +); + +-- represents HISTORICAL statistics for a user +CREATE TABLE IF NOT EXISTS user_stats_historical ( + user_id TEXT NOT NULL, + end_ts BIGINT NOT NULL, + bucket_size INT NOT NULL, + + public_rooms INT NOT NULL, + private_rooms INT NOT NULL, + + PRIMARY KEY (user_id, end_ts) +); + +-- We use this index to speed up deletion of ancient user stats. +CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts); + +-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that +-- out for us. (We would want it to review stats for a particular user.) + +-- Also rename room_state to room_stats_state to make its ownership clear. +ALTER TABLE room_state RENAME TO room_stats_state; diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres new file mode 100644 index 0000000000..0519fcff79 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres @@ -0,0 +1,24 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- These partial indices helps us with finding incomplete stats row +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (room_id) + WHERE completed_delta_stream_id IS NULL; + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (user_id) + WHERE completed_delta_stream_id IS NULL; + diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite new file mode 100644 index 0000000000..181f4ec5b9 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite @@ -0,0 +1,27 @@ +/* Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +-- even though SQLite >= 3.8 can support partial indices, we won't enable +-- them, in case the SQLite database may be later used on another system. +-- It's also the case that SQLite is only likely to be used in small +-- deployments or testing, where the optimisations gained by use of a +-- partial index are not a big concern. + +CREATE INDEX IF NOT EXISTS room_stats_not_complete + ON room_stats_current (completed_delta_stream_id, room_id); + +CREATE INDEX IF NOT EXISTS user_stats_not_complete + ON user_stats_current (completed_delta_stream_id, user_id); + diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index f659f96551..8c1eaaa10b 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- # Copyright 2018, 2019 New Vector Ltd +# Copyright 2019 The Matrix.org Foundation C.I.C. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. @@ -14,17 +15,20 @@ # limitations under the License. import logging +from itertools import chain -from twisted.internet import defer +from twisted.internet.defer import DeferredLock -from synapse.api.constants import EventTypes, Membership -from synapse.storage.prepare_database import get_statements +from synapse.storage import PostgresEngine from synapse.storage.state_deltas import StateDeltasStore from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) # these fields track absolutes (e.g. total number of rooms on the server) +# You can think of these as Prometheus Gauges. +# You can draw these stats on a line graph. +# Example: number of users in a room ABSOLUTE_STATS_FIELDS = { "room": ( "current_state_events", @@ -32,14 +36,18 @@ ABSOLUTE_STATS_FIELDS = { "invited_members", "left_members", "banned_members", - "state_events", + "total_events", + "total_event_bytes", ), "user": ("public_rooms", "private_rooms"), } -TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} +# these fields are per-timeslice and so should be reset to 0 upon a new slice +# You can draw these stats on a histogram. +# Example: number of events sent locally during a time slice +PER_SLICE_FIELDS = {"room": (), "user": ()} -TEMP_TABLE = "_temp_populate_stats" +TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} class StatsStore(StateDeltasStore): @@ -51,294 +59,111 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.register_background_update_handler( - "populate_stats_createtables", self._populate_stats_createtables - ) - self.register_background_update_handler( - "populate_stats_process_rooms", self._populate_stats_process_rooms - ) - self.register_background_update_handler( - "populate_stats_cleanup", self._populate_stats_cleanup - ) - - @defer.inlineCallbacks - def _populate_stats_createtables(self, progress, batch_size): - - if not self.stats_enabled: - yield self._end_background_update("populate_stats_createtables") - return 1 - - # TODO dev only - yield self.delete_all_stats() - - # Get all the rooms that we want to process. - def _make_staging_area(txn): - # Create the temporary tables - stmts = get_statements( - """ - -- We just recreate the table, we'll be reinserting the - -- correct entries again later anyway. - DROP TABLE IF EXISTS {temp}_rooms; - - CREATE TABLE IF NOT EXISTS {temp}_rooms( - room_id TEXT NOT NULL, - events BIGINT NOT NULL - ); - - CREATE INDEX {temp}_rooms_events - ON {temp}_rooms(events); - CREATE INDEX {temp}_rooms_id - ON {temp}_rooms(room_id); - """.format( - temp=TEMP_TABLE - ).splitlines() - ) + self.stats_delta_processing_lock = DeferredLock() - for statement in stmts: - txn.execute(statement) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) - - # Get rooms we want to process from the database, only adding - # those that we haven't (i.e. those not in room_stats_earliest_token) - sql = """ - INSERT INTO %s_rooms (room_id, events) - SELECT c.room_id, count(*) FROM current_state_events AS c - LEFT JOIN room_stats_earliest_token AS t USING (room_id) - WHERE t.room_id IS NULL - GROUP BY c.room_id - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) + self.register_noop_background_update("populate_stats_createtables") + self.register_noop_background_update("populate_stats_process_rooms") + self.register_noop_background_update("populate_stats_cleanup") - new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.runInteraction("populate_stats_temp_build", _make_staging_area) - yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) - self.get_earliest_token_for_room_stats.invalidate_all() + def quantise_stats_time(self, ts): + """ + Quantises a timestamp to be a multiple of the bucket size. - yield self._end_background_update("populate_stats_createtables") - return 1 + Args: + ts (int): the timestamp to quantise, in milliseconds since the Unix + Epoch - @defer.inlineCallbacks - def _populate_stats_cleanup(self, progress, batch_size): + Returns: + int: a timestamp which + - is divisible by the bucket size; + - is no later than `ts`; and + - is the largest such timestamp. """ - Update the user directory stream position, then clean up the old tables. + return (ts // self.stats_bucket_size) * self.stats_bucket_size + + def get_stats_positions(self, for_initial_processor=False): """ - if not self.stats_enabled: - yield self._end_background_update("populate_stats_cleanup") - return 1 + Returns the stats processor positions. - position = yield self._simple_select_one_onecol( - TEMP_TABLE + "_position", None, "position" + Args: + for_initial_processor (bool, optional): If true, returns the position + promised by the latest stats regeneration, rather than the current + incremental processor's position. + Otherwise (if false), return the incremental processor's position. + + Returns (dict): + Dict containing :- + state_delta_stream_id: stream_id of last-processed state delta + total_events_min_stream_ordering: stream_ordering of latest-processed + backfilled event, in the context of total_events counting. + total_events_max_stream_ordering: stream_ordering of latest-processed + non-backfilled event, in the context of total_events counting. + """ + return self._simple_select_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), + desc="stats_incremental_position", ) - yield self.update_stats_stream_pos(position) - - def _delete_staging_area(txn): - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") - - yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) - - yield self._end_background_update("populate_stats_cleanup") - return 1 - - @defer.inlineCallbacks - def _populate_stats_process_rooms(self, progress, batch_size): - if not self.stats_enabled: - yield self._end_background_update("populate_stats_process_rooms") - return 1 - - # If we don't have progress filed, delete everything. - if not progress: - yield self.delete_all_stats() - - def _get_next_batch(txn): - # Only fetch 250 rooms, so we don't fetch too many at once, even - # if those 250 rooms have less than batch_size state events. - sql = """ - SELECT room_id, events FROM %s_rooms - ORDER BY events DESC - LIMIT 250 - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) - rooms_to_work_on = txn.fetchall() - - if not rooms_to_work_on: - return None - - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - progress["remaining"] = txn.fetchone()[0] - - return rooms_to_work_on + def _get_stats_positions_txn(self, txn, for_initial_processor=False): + """ + See L{get_stats_positions}. - rooms_to_work_on = yield self.runInteraction( - "populate_stats_temp_read", _get_next_batch + Args: + txn (cursor): Database cursor + """ + return self._simple_select_one_txn( + txn=txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + retcols=( + "state_delta_stream_id", + "total_events_min_stream_ordering", + "total_events_max_stream_ordering", + ), ) - # No more rooms -- complete the transaction. - if not rooms_to_work_on: - yield self._end_background_update("populate_stats_process_rooms") - return 1 + def update_stats_positions(self, positions, for_initial_processor=False): + """ + Updates the stats processor positions. - logger.info( - "Processing the next %d rooms of %d remaining", - len(rooms_to_work_on), - progress["remaining"], + Args: + positions: See L{get_stats_positions} + for_initial_processor: See L{get_stats_positions} + """ + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one( + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, + desc="update_stats_incremental_position", ) - # Number of state events we've processed by going through each room - processed_event_count = 0 - - for room_id, event_count in rooms_to_work_on: - - current_state_ids = yield self.get_current_state_ids(room_id) - - join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) - history_visibility_id = current_state_ids.get( - (EventTypes.RoomHistoryVisibility, "") - ) - encryption_id = current_state_ids.get((EventTypes.RoomEncryption, "")) - name_id = current_state_ids.get((EventTypes.Name, "")) - topic_id = current_state_ids.get((EventTypes.Topic, "")) - avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) - canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) - - event_ids = [ - join_rules_id, - history_visibility_id, - encryption_id, - name_id, - topic_id, - avatar_id, - canonical_alias_id, - ] - - state_events = yield self.get_events( - [ev for ev in event_ids if ev is not None] - ) - - def _get_or_none(event_id, arg): - event = state_events.get(event_id) - if event: - return event.content.get(arg) - return None - - yield self.update_room_state( - room_id, - { - "join_rules": _get_or_none(join_rules_id, "join_rule"), - "history_visibility": _get_or_none( - history_visibility_id, "history_visibility" - ), - "encryption": _get_or_none(encryption_id, "algorithm"), - "name": _get_or_none(name_id, "name"), - "topic": _get_or_none(topic_id, "topic"), - "avatar": _get_or_none(avatar_id, "url"), - "canonical_alias": _get_or_none(canonical_alias_id, "alias"), - }, - ) - - now = self.hs.get_reactor().seconds() - - # quantise time to the nearest bucket - now = (now // self.stats_bucket_size) * self.stats_bucket_size - - def _fetch_data(txn): - - # Get the current token of the room - current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) - - current_state_events = len(current_state_ids) - - membership_counts = self._get_user_counts_in_room_txn(txn, room_id) - - total_state_events = self._get_total_state_event_counts_txn( - txn, room_id - ) - - self._update_stats_txn( - txn, - "room", - room_id, - now, - { - "bucket_size": self.stats_bucket_size, - "current_state_events": current_state_events, - "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get(Membership.INVITE, 0), - "left_members": membership_counts.get(Membership.LEAVE, 0), - "banned_members": membership_counts.get(Membership.BAN, 0), - "state_events": total_state_events, - }, - ) - self._simple_insert_txn( - txn, - "room_stats_earliest_token", - {"room_id": room_id, "token": current_token}, - ) - - # We've finished a room. Delete it from the table. - self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id} - ) - - yield self.runInteraction("update_room_stats", _fetch_data) - - # Update the remaining counter. - progress["remaining"] -= 1 - yield self.runInteraction( - "populate_stats", - self._background_update_progress_txn, - "populate_stats_process_rooms", - progress, - ) - - processed_event_count += event_count - - if processed_event_count > batch_size: - # Don't process any more rooms, we've hit our batch size. - return processed_event_count - - return processed_event_count - - def delete_all_stats(self): + def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False): """ - Delete all statistics records. + See L{update_stats_positions} """ - - def _delete_all_stats_txn(txn): - txn.execute("DELETE FROM room_state") - txn.execute("DELETE FROM room_stats") - txn.execute("DELETE FROM room_stats_earliest_token") - txn.execute("DELETE FROM user_stats") - - return self.runInteraction("delete_all_stats", _delete_all_stats_txn) - - def get_stats_stream_pos(self): - return self._simple_select_one_onecol( - table="stats_stream_pos", - keyvalues={}, - retcol="stream_id", - desc="stats_stream_pos", - ) - - def update_stats_stream_pos(self, stream_id): - return self._simple_update_one( - table="stats_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_stats_stream_pos", + if positions is None: + positions = { + "state_delta_stream_id": None, + "total_events_min_stream_ordering": None, + "total_events_max_stream_ordering": None, + } + return self._simple_update_one_txn( + txn, + table="stats_incremental_position", + keyvalues={"is_background_contract": for_initial_processor}, + updatevalues=positions, ) def update_room_state(self, room_id, fields): @@ -364,42 +189,14 @@ class StatsStore(StateDeltasStore): fields[col] = None return self._simple_upsert( - table="room_state", + table="room_stats_state", keyvalues={"room_id": room_id}, values=fields, desc="update_room_state", ) - def get_deltas_for_room(self, room_id, start, size=100): - """ - Get statistics deltas for a given room. - - Args: - room_id (str) - start (int): Pagination start. Number of entries, not timestamp. - size (int): How many entries to return. - - Returns: - Deferred[list[dict]], where the dict has the keys of - ABSOLUTE_STATS_FIELDS["room"] and "ts". - """ - return self._simple_select_list_paginate( - "room_stats", - {"room_id": room_id}, - "ts", - start, - size, - retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), - order_direction="DESC", - ) - - def get_all_room_state(self): - return self._simple_select_list( - "room_state", None, retcols=("name", "topic", "canonical_alias") - ) - @cached() - def get_earliest_token_for_room_stats(self, room_id): + def get_earliest_token_for_stats(self, stats_type, id): """ Fetch the "earliest token". This is used by the room stats delta processor to ignore deltas that have been processed between the @@ -409,79 +206,410 @@ class StatsStore(StateDeltasStore): Returns: Deferred[int] """ + table, id_col = TYPE_TO_TABLE[stats_type] + return self._simple_select_one_onecol( - "room_stats_earliest_token", - {"room_id": room_id}, - retcol="token", + "%s_current" % (table,), + {id_col: id}, + retcol="completed_delta_stream_id", allow_none=True, ) - def update_stats(self, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert( - table=table, - keyvalues={id_col: stats_id, "ts": ts}, - values=fields, - desc="update_stats", - ) + def update_stats_delta( + self, ts, stats_type, stats_id, fields, complete_with_stream_id=None + ): + """ + Updates the statistics for a subject, with a delta (difference/relative + change). + + Args: + ts (int): timestamp of the change + stats_type (str): "room" or "user" – the kind of subject + stats_id (str): the subject's ID (room ID or user ID) + fields (dict[str, int]): Deltas of stats values. + complete_with_stream_id (int, optional): + If supplied, converts an incomplete row into a complete row, + with the supplied stream_id marked as the stream_id where the + row was completed. + """ - def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert_txn( - txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields + return self.runInteraction( + "update_stats_delta", + self._update_stats_delta_txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=complete_with_stream_id, ) - def update_stats_delta(self, ts, stats_type, stats_id, field, value): - def _update_stats_delta(txn): - table, id_col = TYPE_TO_ROOM[stats_type] - - sql = ( - "SELECT * FROM %s" - " WHERE %s=? and ts=(" - " SELECT MAX(ts) FROM %s" - " WHERE %s=?" - ")" - ) % (table, id_col, table, id_col) - txn.execute(sql, (stats_id, stats_id)) - rows = self.cursor_to_dict(txn) - if len(rows) == 0: - # silently skip as we don't have anything to apply a delta to yet. - # this tries to minimise any race between the initial sync and - # subsequent deltas arriving. - return - - current_ts = ts - latest_ts = rows[0]["ts"] - if current_ts < latest_ts: - # This one is in the past, but we're just encountering it now. - # Mark it as part of the current bucket. - current_ts = latest_ts - elif ts != latest_ts: - # we have to copy our absolute counters over to the new entry. - values = { - key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] - } - values[id_col] = stats_id - values["ts"] = ts - values["bucket_size"] = self.stats_bucket_size - - self._simple_insert_txn(txn, table=table, values=values) - - # actually update the new value - if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: - self._simple_update_txn( - txn, - table=table, - keyvalues={id_col: stats_id, "ts": current_ts}, - updatevalues={field: value}, + def _update_stats_delta_txn( + self, + txn, + ts, + stats_type, + stats_id, + fields, + complete_with_stream_id=None, + absolute_field_overrides=None, + ): + """ + See L{update_stats_delta} + Additional Args: + absolute_field_overrides (dict[str, int]): Current stats values + (i.e. not deltas) of absolute fields. + Does not work with per-slice fields. + """ + + if absolute_field_overrides is None: + absolute_field_overrides = {} + + table, id_col = TYPE_TO_TABLE[stats_type] + + quantised_ts = self.quantise_stats_time(int(ts)) + end_ts = quantised_ts + self.stats_bucket_size + + abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type] + slice_field_names = PER_SLICE_FIELDS[stats_type] + for field in chain(fields.keys(), absolute_field_overrides.keys()): + if field not in abs_field_names and field not in slice_field_names: + # guard against potential SQL injection dodginess + raise ValueError( + "%s is not a recognised field" + " for stats type %s" % (field, stats_type) ) + + # only absolute stats fields are tracked in the `_current` stats tables, + # so those are the only ones that we process deltas for when + # we upsert against the `_current` table. + + # This calculates the deltas (`field = field + ?` values) + # for absolute fields, + # * defaulting to 0 if not specified + # (required for the INSERT part of upserting to work) + # * omitting overrides specified in `absolute_field_overrides` + deltas_of_absolute_fields = { + key: fields.get(key, 0) + for key in abs_field_names + if key not in absolute_field_overrides + } + + if complete_with_stream_id is not None: + absolute_field_overrides = absolute_field_overrides.copy() + absolute_field_overrides[ + "completed_delta_stream_id" + ] = complete_with_stream_id + + # first upsert the `_current` table + self._upsert_with_additive_relatives_txn( + txn=txn, + table=table + "_current", + keyvalues={id_col: stats_id}, + absolutes=absolute_field_overrides, + additive_relatives=deltas_of_absolute_fields, + ) + + if self.has_completed_background_updates(): + # TODO want to check specifically for stats regenerator, not all + # background updates… + # then upsert the `_historical` table. + # we don't support absolute_fields for per-slice fields as it makes + # no sense. + per_slice_additive_relatives = { + key: fields.get(key, 0) for key in slice_field_names + } + self._upsert_copy_from_table_with_additive_relatives_txn( + txn=txn, + into_table=table + "_historical", + keyvalues={id_col: stats_id}, + extra_dst_insvalues={"bucket_size": self.stats_bucket_size}, + extra_dst_keyvalues={"end_ts": end_ts}, + additive_relatives=per_slice_additive_relatives, + src_table=table + "_current", + copy_columns=abs_field_names, + additional_where=" AND completed_delta_stream_id IS NOT NULL", + ) + + def _upsert_with_additive_relatives_txn( + self, txn, table, keyvalues, absolutes, additive_relatives + ): + """Used to update values in the stats tables. + + Args: + txn: Transaction + table (str): Table name + keyvalues (dict[str, any]): Row-identifying key values + absolutes (dict[str, any]): Absolute (set) fields + additive_relatives (dict[str, int]): Fields that will be added onto + if existing row present. + """ + if self.database_engine.can_native_upsert: + absolute_updates = [ + "%(field)s = EXCLUDED.%(field)s" % {"field": field} + for field in absolutes.keys() + ] + + relative_updates = [ + "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s" + % {"table": table, "field": field} + for field in additive_relatives.keys() + ] + + insert_cols = [] + qargs = [] + + for (key, val) in chain( + keyvalues.items(), absolutes.items(), additive_relatives.items() + ): + insert_cols.append(key) + qargs.append(val) + + sql = """ + INSERT INTO %(table)s (%(insert_cols_cs)s) + VALUES (%(insert_vals_qs)s) + ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s + """ % { + "table": table, + "insert_cols_cs": ", ".join(insert_cols), + "insert_vals_qs": ", ".join( + ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives)) + ), + "key_columns": ", ".join(keyvalues), + "updates": ", ".join(chain(absolute_updates, relative_updates)), + } + + txn.execute(sql, qargs) + else: + self.database_engine.lock_table(txn, table) + retcols = list(chain(absolutes.keys(), additive_relatives.keys())) + current_row = self._simple_select_one_txn( + txn, table, keyvalues, retcols, allow_none=True + ) + if current_row is None: + merged_dict = {**keyvalues, **absolutes, **additive_relatives} + self._simple_insert_txn(txn, table, merged_dict) else: - sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( - table, - field, - field, - id_col, + for (key, val) in additive_relatives.items(): + current_row[key] += val + current_row.update(absolutes) + self._simple_update_one_txn(txn, table, keyvalues, current_row) + + def _upsert_copy_from_table_with_additive_relatives_txn( + self, + txn, + into_table, + keyvalues, + extra_dst_keyvalues, + extra_dst_insvalues, + additive_relatives, + src_table, + copy_columns, + additional_where="", + ): + """ + Args: + txn: Transaction + into_table (str): The destination table to UPSERT the row into + keyvalues (dict[str, any]): Row-identifying key values + extra_dst_keyvalues (dict[str, any]): Additional keyvalues + for `into_table`. + extra_dst_insvalues (dict[str, any]): Additional values to insert + on new row creation for `into_table`. + additive_relatives (dict[str, any]): Fields that will be added onto + if existing row present. (Must be disjoint from copy_columns.) + src_table (str): The source table to copy from + copy_columns (iterable[str]): The list of columns to copy + additional_where (str): Additional SQL for where (prefix with AND + if using). + """ + if self.database_engine.can_native_upsert: + ins_columns = chain( + keyvalues, + copy_columns, + additive_relatives, + extra_dst_keyvalues, + extra_dst_insvalues, + ) + sel_exprs = chain( + keyvalues, + copy_columns, + ( + "?" + for _ in chain( + additive_relatives, extra_dst_keyvalues, extra_dst_insvalues + ) + ), + ) + keyvalues_where = ("%s = ?" % f for f in keyvalues) + + sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns) + sets_ar = ( + "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) + for f in additive_relatives + ) + + sql = """ + INSERT INTO %(into_table)s (%(ins_columns)s) + SELECT %(sel_exprs)s + FROM %(src_table)s + WHERE %(keyvalues_where)s %(additional_where)s + ON CONFLICT (%(keyvalues)s) + DO UPDATE SET %(sets)s + """ % { + "into_table": into_table, + "ins_columns": ", ".join(ins_columns), + "sel_exprs": ", ".join(sel_exprs), + "keyvalues_where": " AND ".join(keyvalues_where), + "src_table": src_table, + "keyvalues": ", ".join( + chain(keyvalues.keys(), extra_dst_keyvalues.keys()) + ), + "sets": ", ".join(chain(sets_cc, sets_ar)), + "additional_where": additional_where, + } + + qargs = list( + chain( + additive_relatives.values(), + extra_dst_keyvalues.values(), + extra_dst_insvalues.values(), + keyvalues.values(), ) - txn.execute(sql, (value, stats_id, current_ts)) + ) + txn.execute(sql, qargs) + else: + self.database_engine.lock_table(txn, into_table) + src_row = self._simple_select_one_txn( + txn, src_table, keyvalues, copy_columns + ) + dest_current_row = self._simple_select_one_txn( + txn, + into_table, + keyvalues, + retcols=list(chain(additive_relatives.keys(), copy_columns)), + allow_none=True, + ) + + if dest_current_row is None: + merged_dict = { + **keyvalues, + **extra_dst_keyvalues, + **extra_dst_insvalues, + **src_row, + **additive_relatives, + } + self._simple_insert_txn(txn, into_table, merged_dict) + else: + for (key, val) in additive_relatives.items(): + src_row[key] = dest_current_row[key] + val + self._simple_update_txn(txn, into_table, keyvalues, src_row) + + def incremental_update_room_total_events_and_bytes(self, in_positions): + """ + Counts the number of events and total event bytes per-room and then adds + these to the respective total_events and total_event_bytes room counts. + + Args: + in_positions (dict): Positions, + as retrieved from L{get_stats_positions}. + + Returns (Deferred[tuple[dict, bool]]): + First element (dict): + The new positions. Note that this is for reference only – + the new positions WILL be committed by this function. + Second element (bool): + true iff there was a change to the positions, false otherwise + """ + + def incremental_update_total_events_and_bytes_txn(txn): + positions = in_positions.copy() - return self.runInteraction("update_stats_delta", _update_stats_delta) + max_pos = self.get_room_max_stream_ordering() + min_pos = self.get_room_min_stream_ordering() + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=positions["total_events_max_stream_ordering"], + high_pos=max_pos, + ) + + self.update_total_event_and_bytes_count_between_txn( + txn, + low_pos=min_pos, + high_pos=positions["total_events_min_stream_ordering"], + ) + + if ( + positions["total_events_max_stream_ordering"] != max_pos + or positions["total_events_min_stream_ordering"] != min_pos + ): + positions["total_events_max_stream_ordering"] = max_pos + positions["total_events_min_stream_ordering"] = min_pos + + self._update_stats_positions_txn(txn, positions) + + return positions, True + else: + return positions, False + + return self.runInteraction( + "stats_incremental_total_events_and_bytes", + incremental_update_total_events_and_bytes_txn, + ) + + def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos): + """ + Updates the total_events and total_event_bytes counts for rooms, + in a range of stream_orderings. + + Inclusivity of low_pos and high_pos is dependent upon their signs. + This makes it intuitive to use this function for both backfilled + and non-backfilled events. + + Examples: + (low, high) → (kind) + (3, 7) → 3 <git … <= 7 (normal-filled; low already processed before) + (-4, -2) → -4 <= … < -2 (backfilled; high already processed before) + (-7, 7) → -7 <= … <= 7 (both) + + Args: + txn: Database transaction. + low_pos: Low stream ordering + high_pos: High stream ordering + """ + + if low_pos >= high_pos: + # nothing to do here. + return + + now = self.hs.clock.time_msec() + + # we choose comparators based on the signs + low_comparator = "<=" if low_pos < 0 else "<" + high_comparator = "<" if high_pos < 0 else "<=" + + if isinstance(self.database_engine, PostgresEngine): + new_bytes_expression = "OCTET_LENGTH(json)" + else: + new_bytes_expression = "LENGTH(CAST(json AS BLOB))" + + sql = """ + SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes + FROM events INNER JOIN event_json USING (event_id) + WHERE ? %s stream_ordering AND stream_ordering %s ? + GROUP BY room_id + """ % ( + low_comparator, + high_comparator, + new_bytes_expression, + ) + + txn.execute(sql, (low_pos, high_pos)) + + for room_id, new_events, new_bytes in txn.fetchall(): + self._update_stats_delta_txn( + txn, + now, + "room", + room_id, + {"total_events": new_events, "total_event_bytes": new_bytes}, + ) diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py deleted file mode 100644 index a8b858eb4f..0000000000 --- a/tests/handlers/test_stats.py +++ /dev/null @@ -1,304 +0,0 @@ -# -*- coding: utf-8 -*- -# Copyright 2019 New Vector Ltd -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from mock import Mock - -from twisted.internet import defer - -from synapse.api.constants import EventTypes, Membership -from synapse.rest import admin -from synapse.rest.client.v1 import login, room - -from tests import unittest - - -class StatsRoomTests(unittest.HomeserverTestCase): - - servlets = [ - admin.register_servlets_for_client_rest_resource, - room.register_servlets, - login.register_servlets, - ] - - def prepare(self, reactor, clock, hs): - - self.store = hs.get_datastore() - self.handler = self.hs.get_stats_handler() - - def _add_background_updates(self): - """ - Add the background updates we need to run. - """ - # Ugh, have to reset this flag - self.store._all_done = False - - self.get_success( - self.store._simple_insert( - "background_updates", - {"update_name": "populate_stats_createtables", "progress_json": "{}"}, - ) - ) - self.get_success( - self.store._simple_insert( - "background_updates", - { - "update_name": "populate_stats_process_rooms", - "progress_json": "{}", - "depends_on": "populate_stats_createtables", - }, - ) - ) - self.get_success( - self.store._simple_insert( - "background_updates", - { - "update_name": "populate_stats_cleanup", - "progress_json": "{}", - "depends_on": "populate_stats_process_rooms", - }, - ) - ) - - def test_initial_room(self): - """ - The background updates will build the table from scratch. - """ - r = self.get_success(self.store.get_all_room_state()) - self.assertEqual(len(r), 0) - - # Disable stats - self.hs.config.stats_enabled = False - self.handler.stats_enabled = False - - u1 = self.register_user("u1", "pass") - u1_token = self.login("u1", "pass") - - room_1 = self.helper.create_room_as(u1, tok=u1_token) - self.helper.send_state( - room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token - ) - - # Stats disabled, shouldn't have done anything - r = self.get_success(self.store.get_all_room_state()) - self.assertEqual(len(r), 0) - - # Enable stats - self.hs.config.stats_enabled = True - self.handler.stats_enabled = True - - # Do the initial population of the user directory via the background update - self._add_background_updates() - - while not self.get_success(self.store.has_completed_background_updates()): - self.get_success(self.store.do_next_background_update(100), by=0.1) - - r = self.get_success(self.store.get_all_room_state()) - - self.assertEqual(len(r), 1) - self.assertEqual(r[0]["topic"], "foo") - - def test_initial_earliest_token(self): - """ - Ingestion via notify_new_event will ignore tokens that the background - update have already processed. - """ - self.reactor.advance(86401) - - self.hs.config.stats_enabled = False - self.handler.stats_enabled = False - - u1 = self.register_user("u1", "pass") - u1_token = self.login("u1", "pass") - - u2 = self.register_user("u2", "pass") - u2_token = self.login("u2", "pass") - - u3 = self.register_user("u3", "pass") - u3_token = self.login("u3", "pass") - - room_1 = self.helper.create_room_as(u1, tok=u1_token) - self.helper.send_state( - room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token - ) - - # Begin the ingestion by creating the temp tables. This will also store - # the position that the deltas should begin at, once they take over. - self.hs.config.stats_enabled = True - self.handler.stats_enabled = True - self.store._all_done = False - self.get_success(self.store.update_stats_stream_pos(None)) - - self.get_success( - self.store._simple_insert( - "background_updates", - {"update_name": "populate_stats_createtables", "progress_json": "{}"}, - ) - ) - - while not self.get_success(self.store.has_completed_background_updates()): - self.get_success(self.store.do_next_background_update(100), by=0.1) - - # Now, before the table is actually ingested, add some more events. - self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token) - self.helper.join(room=room_1, user=u2, tok=u2_token) - - # Now do the initial ingestion. - self.get_success( - self.store._simple_insert( - "background_updates", - {"update_name": "populate_stats_process_rooms", "progress_json": "{}"}, - ) - ) - self.get_success( - self.store._simple_insert( - "background_updates", - { - "update_name": "populate_stats_cleanup", - "progress_json": "{}", - "depends_on": "populate_stats_process_rooms", - }, - ) - ) - - self.store._all_done = False - while not self.get_success(self.store.has_completed_background_updates()): - self.get_success(self.store.do_next_background_update(100), by=0.1) - - self.reactor.advance(86401) - - # Now add some more events, triggering ingestion. Because of the stream - # position being set to before the events sent in the middle, a simpler - # implementation would reprocess those events, and say there were four - # users, not three. - self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token) - self.helper.join(room=room_1, user=u3, tok=u3_token) - - # Get the deltas! There should be two -- day 1, and day 2. - r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) - - # The oldest has 2 joined members - self.assertEqual(r[-1]["joined_members"], 2) - - # The newest has 3 - self.assertEqual(r[0]["joined_members"], 3) - - def test_incorrect_state_transition(self): - """ - If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to - (JOIN, INVITE, LEAVE, BAN), an error is raised. - """ - events = { - "a1": {"membership": Membership.LEAVE}, - "a2": {"membership": "not a real thing"}, - } - - def get_event(event_id, allow_none=True): - m = Mock() - m.content = events[event_id] - d = defer.Deferred() - self.reactor.callLater(0.0, d.callback, m) - return d - - def get_received_ts(event_id): - return defer.succeed(1) - - self.store.get_received_ts = get_received_ts - self.store.get_event = get_event - - deltas = [ - { - "type": EventTypes.Member, - "state_key": "some_user", - "room_id": "room", - "event_id": "a1", - "prev_event_id": "a2", - "stream_id": 60, - } - ] - - f = self.get_failure(self.handler._handle_deltas(deltas), ValueError) - self.assertEqual( - f.value.args[0], "'not a real thing' is not a valid prev_membership" - ) - - # And the other way... - deltas = [ - { - "type": EventTypes.Member, - "state_key": "some_user", - "room_id": "room", - "event_id": "a2", - "prev_event_id": "a1", - "stream_id": 100, - } - ] - - f = self.get_failure(self.handler._handle_deltas(deltas), ValueError) - self.assertEqual( - f.value.args[0], "'not a real thing' is not a valid membership" - ) - - def test_redacted_prev_event(self): - """ - If the prev_event does not exist, then it is assumed to be a LEAVE. - """ - u1 = self.register_user("u1", "pass") - u1_token = self.login("u1", "pass") - - room_1 = self.helper.create_room_as(u1, tok=u1_token) - - # Do the initial population of the user directory via the background update - self._add_background_updates() - - while not self.get_success(self.store.has_completed_background_updates()): - self.get_success(self.store.do_next_background_update(100), by=0.1) - - events = {"a1": None, "a2": {"membership": Membership.JOIN}} - - def get_event(event_id, allow_none=True): - if events.get(event_id): - m = Mock() - m.content = events[event_id] - else: - m = None - d = defer.Deferred() - self.reactor.callLater(0.0, d.callback, m) - return d - - def get_received_ts(event_id): - return defer.succeed(1) - - self.store.get_received_ts = get_received_ts - self.store.get_event = get_event - - deltas = [ - { - "type": EventTypes.Member, - "state_key": "some_user:test", - "room_id": room_1, - "event_id": "a2", - "prev_event_id": "a1", - "stream_id": 100, - } - ] - - # Handle our fake deltas, which has a user going from LEAVE -> JOIN. - self.get_success(self.handler._handle_deltas(deltas)) - - # One delta, with two joined members -- the room creator, and our fake - # user. - r = self.get_success(self.store.get_deltas_for_room(room_1, 0)) - self.assertEqual(len(r), 1) - self.assertEqual(r[0]["joined_members"], 2) |