diff options
Diffstat (limited to 'synapse')
-rw-r--r-- | synapse/handlers/stats.py | 280 | ||||
-rw-r--r-- | synapse/storage/schema/delta/56/stats_separated1.sql | 33 | ||||
-rw-r--r-- | synapse/storage/stats.py | 412 |
3 files changed, 37 insertions, 688 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py index 4449da6669..7b1d1b4203 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py @@ -15,14 +15,7 @@ import logging -from twisted.internet import defer - -from synapse.api.constants import EventTypes, JoinRules, Membership from synapse.handlers.state_deltas import StateDeltasHandler -from synapse.metrics import event_processing_positions -from synapse.metrics.background_process_metrics import run_as_background_process -from synapse.types import UserID -from synapse.util.metrics import Measure logger = logging.getLogger(__name__) @@ -49,9 +42,6 @@ class StatsHandler(StateDeltasHandler): # The current position in the current_state_delta stream self.pos = None - # Guard to ensure we only process deltas one at a time - self._is_processing = False - if hs.config.stats_enabled: self.notifier.add_replication_callback(self.notify_new_event) @@ -62,272 +52,4 @@ class StatsHandler(StateDeltasHandler): def notify_new_event(self): """Called when there may be more deltas to process """ - if not self.hs.config.stats_enabled: - return - - if self._is_processing: - return - - @defer.inlineCallbacks - def process(): - try: - yield self._unsafe_process() - finally: - self._is_processing = False - - self._is_processing = True - run_as_background_process("stats.notify_new_event", process) - - @defer.inlineCallbacks - def _unsafe_process(self): - # If self.pos is None then means we haven't fetched it from DB - if self.pos is None: - self.pos = yield self.store.get_stats_stream_pos() - - # If still None then the initial background update hasn't happened yet - if self.pos is None: - return None - - # Loop round handling deltas until we're up to date - while True: - with Measure(self.clock, "stats_delta"): - deltas = yield self.store.get_current_state_deltas(self.pos) - if not deltas: - return - - logger.info("Handling %d state deltas", len(deltas)) - yield self._handle_deltas(deltas) - - self.pos = deltas[-1]["stream_id"] - yield self.store.update_stats_stream_pos(self.pos) - - event_processing_positions.labels("stats").set(self.pos) - - @defer.inlineCallbacks - def _handle_deltas(self, deltas): - """ - Called with the state deltas to process - """ - for delta in deltas: - typ = delta["type"] - state_key = delta["state_key"] - room_id = delta["room_id"] - event_id = delta["event_id"] - stream_id = delta["stream_id"] - prev_event_id = delta["prev_event_id"] - stream_pos = delta["stream_id"] - - logger.debug("Handling: %r %r, %s", typ, state_key, event_id) - - token = yield self.store.get_earliest_token_for_room_stats(room_id) - - # If the earliest token to begin from is larger than our current - # stream ID, skip processing this delta. - if token is not None and token >= stream_id: - logger.debug( - "Ignoring: %s as earlier than this room's initial ingestion event", - event_id, - ) - continue - - if event_id is None and prev_event_id is None: - # Errr... - continue - - event_content = {} - - if event_id is not None: - event = yield self.store.get_event(event_id, allow_none=True) - if event: - event_content = event.content or {} - - # We use stream_pos here rather than fetch by event_id as event_id - # may be None - now = yield self.store.get_received_ts_by_stream_pos(stream_pos) - - # quantise time to the nearest bucket - now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size - - if typ == EventTypes.Member: - # we could use _get_key_change here but it's a bit inefficient - # given we're not testing for a specific result; might as well - # just grab the prev_membership and membership strings and - # compare them. - prev_event_content = {} - if prev_event_id is not None: - prev_event = yield self.store.get_event( - prev_event_id, allow_none=True - ) - if prev_event: - prev_event_content = prev_event.content - - membership = event_content.get("membership", Membership.LEAVE) - prev_membership = prev_event_content.get("membership", Membership.LEAVE) - - if prev_membership == membership: - continue - - if prev_membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", -1 - ) - elif prev_membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", -1 - ) - elif prev_membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", -1 - ) - elif prev_membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", -1 - ) - else: - err = "%s is not a valid prev_membership" % (repr(prev_membership),) - logger.error(err) - raise ValueError(err) - - if membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, "room", room_id, "joined_members", +1 - ) - elif membership == Membership.INVITE: - yield self.store.update_stats_delta( - now, "room", room_id, "invited_members", +1 - ) - elif membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, "room", room_id, "left_members", +1 - ) - elif membership == Membership.BAN: - yield self.store.update_stats_delta( - now, "room", room_id, "banned_members", +1 - ) - else: - err = "%s is not a valid membership" % (repr(membership),) - logger.error(err) - raise ValueError(err) - - user_id = state_key - if self.is_mine_id(user_id): - # update user_stats as it's one of our users - public = yield self._is_public_room(room_id) - - if membership == Membership.LEAVE: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - -1, - ) - elif membership == Membership.JOIN: - yield self.store.update_stats_delta( - now, - "user", - user_id, - "public_rooms" if public else "private_rooms", - +1, - ) - - elif typ == EventTypes.Create: - # Newly created room. Add it with all blank portions. - yield self.store.update_room_state( - room_id, - { - "join_rules": None, - "history_visibility": None, - "encryption": None, - "name": None, - "topic": None, - "avatar": None, - "canonical_alias": None, - }, - ) - - elif typ == EventTypes.JoinRules: - yield self.store.update_room_state( - room_id, {"join_rules": event_content.get("join_rule")} - ) - - is_public = yield self._get_key_change( - prev_event_id, event_id, "join_rule", JoinRules.PUBLIC - ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) - - elif typ == EventTypes.RoomHistoryVisibility: - yield self.store.update_room_state( - room_id, - {"history_visibility": event_content.get("history_visibility")}, - ) - - is_public = yield self._get_key_change( - prev_event_id, event_id, "history_visibility", "world_readable" - ) - if is_public is not None: - yield self.update_public_room_stats(now, room_id, is_public) - - elif typ == EventTypes.Encryption: - yield self.store.update_room_state( - room_id, {"encryption": event_content.get("algorithm")} - ) - elif typ == EventTypes.Name: - yield self.store.update_room_state( - room_id, {"name": event_content.get("name")} - ) - elif typ == EventTypes.Topic: - yield self.store.update_room_state( - room_id, {"topic": event_content.get("topic")} - ) - elif typ == EventTypes.RoomAvatar: - yield self.store.update_room_state( - room_id, {"avatar": event_content.get("url")} - ) - elif typ == EventTypes.CanonicalAlias: - yield self.store.update_room_state( - room_id, {"canonical_alias": event_content.get("alias")} - ) - - @defer.inlineCallbacks - def update_public_room_stats(self, ts, room_id, is_public): - """ - Increment/decrement a user's number of public rooms when a room they are - in changes to/from public visibility. - - Args: - ts (int): Timestamp in seconds - room_id (str) - is_public (bool) - """ - # For now, blindly iterate over all local users in the room so that - # we can handle the whole problem of copying buckets over as needed - user_ids = yield self.store.get_users_in_room(room_id) - - for user_id in user_ids: - if self.hs.is_mine(UserID.from_string(user_id)): - yield self.store.update_stats_delta( - ts, "user", user_id, "public_rooms", +1 if is_public else -1 - ) - yield self.store.update_stats_delta( - ts, "user", user_id, "private_rooms", -1 if is_public else +1 - ) - - @defer.inlineCallbacks - def _is_public_room(self, room_id): - join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules) - history_visibility = yield self.state.get_current_state( - room_id, EventTypes.RoomHistoryVisibility - ) - - if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or ( - ( - history_visibility - and history_visibility.content.get("history_visibility") - == "world_readable" - ) - ): - return True - else: - return False + pass diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql new file mode 100644 index 0000000000..5b125d17b0 --- /dev/null +++ b/synapse/storage/schema/delta/56/stats_separated1.sql @@ -0,0 +1,33 @@ +/* Copyright 2018 New Vector Ltd + * Copyright 2019 The Matrix.org Foundation C.I.C. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +----- First clean up from previous versions of room stats. + +-- First remove old stats stuff +DROP TABLE IF EXISTS room_stats; +DROP TABLE IF EXISTS user_stats; +DROP TABLE IF EXISTS room_stats_earliest_tokens; +DROP TABLE IF EXISTS _temp_populate_stats_position; +DROP TABLE IF EXISTS _temp_populate_stats_rooms; +DROP TABLE IF EXISTS stats_stream_pos; + +-- Unschedule old background updates if they're still scheduled +DELETE FROM background_updates WHERE update_name IN ( + 'populate_stats_createtables', + 'populate_stats_process_rooms', + 'populate_stats_cleanup' +); diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py index e13efed417..123c1ae220 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py @@ -15,12 +15,7 @@ import logging -from twisted.internet import defer - -from synapse.api.constants import EventTypes, Membership -from synapse.storage.prepare_database import get_statements from synapse.storage.state_deltas import StateDeltasStore -from synapse.util.caches.descriptors import cached logger = logging.getLogger(__name__) @@ -39,8 +34,6 @@ ABSOLUTE_STATS_FIELDS = { TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")} -TEMP_TABLE = "_temp_populate_stats" - class StatsStore(StateDeltasStore): def __init__(self, db_conn, hs): @@ -51,292 +44,9 @@ class StatsStore(StateDeltasStore): self.stats_enabled = hs.config.stats_enabled self.stats_bucket_size = hs.config.stats_bucket_size - self.register_background_update_handler( - "populate_stats_createtables", self._populate_stats_createtables - ) - self.register_background_update_handler( - "populate_stats_process_rooms", self._populate_stats_process_rooms - ) - self.register_background_update_handler( - "populate_stats_cleanup", self._populate_stats_cleanup - ) - - @defer.inlineCallbacks - def _populate_stats_createtables(self, progress, batch_size): - - if not self.stats_enabled: - yield self._end_background_update("populate_stats_createtables") - return 1 - - # Get all the rooms that we want to process. - def _make_staging_area(txn): - # Create the temporary tables - stmts = get_statements( - """ - -- We just recreate the table, we'll be reinserting the - -- correct entries again later anyway. - DROP TABLE IF EXISTS {temp}_rooms; - - CREATE TABLE IF NOT EXISTS {temp}_rooms( - room_id TEXT NOT NULL, - events BIGINT NOT NULL - ); - - CREATE INDEX {temp}_rooms_events - ON {temp}_rooms(events); - CREATE INDEX {temp}_rooms_id - ON {temp}_rooms(room_id); - """.format( - temp=TEMP_TABLE - ).splitlines() - ) - - for statement in stmts: - txn.execute(statement) - - sql = ( - "CREATE TABLE IF NOT EXISTS " - + TEMP_TABLE - + "_position(position TEXT NOT NULL)" - ) - txn.execute(sql) - - # Get rooms we want to process from the database, only adding - # those that we haven't (i.e. those not in room_stats_earliest_token) - sql = """ - INSERT INTO %s_rooms (room_id, events) - SELECT c.room_id, count(*) FROM current_state_events AS c - LEFT JOIN room_stats_earliest_token AS t USING (room_id) - WHERE t.room_id IS NULL - GROUP BY c.room_id - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) - - new_pos = yield self.get_max_stream_id_in_current_state_deltas() - yield self.runInteraction("populate_stats_temp_build", _make_staging_area) - yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos}) - self.get_earliest_token_for_room_stats.invalidate_all() - - yield self._end_background_update("populate_stats_createtables") - return 1 - - @defer.inlineCallbacks - def _populate_stats_cleanup(self, progress, batch_size): - """ - Update the user directory stream position, then clean up the old tables. - """ - if not self.stats_enabled: - yield self._end_background_update("populate_stats_cleanup") - return 1 - - position = yield self._simple_select_one_onecol( - TEMP_TABLE + "_position", None, "position" - ) - yield self.update_stats_stream_pos(position) - - def _delete_staging_area(txn): - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms") - txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position") - - yield self.runInteraction("populate_stats_cleanup", _delete_staging_area) - - yield self._end_background_update("populate_stats_cleanup") - return 1 - - @defer.inlineCallbacks - def _populate_stats_process_rooms(self, progress, batch_size): - - if not self.stats_enabled: - yield self._end_background_update("populate_stats_process_rooms") - return 1 - - # If we don't have progress filed, delete everything. - if not progress: - yield self.delete_all_stats() - - def _get_next_batch(txn): - # Only fetch 250 rooms, so we don't fetch too many at once, even - # if those 250 rooms have less than batch_size state events. - sql = """ - SELECT room_id, events FROM %s_rooms - ORDER BY events DESC - LIMIT 250 - """ % ( - TEMP_TABLE, - ) - txn.execute(sql) - rooms_to_work_on = txn.fetchall() - - if not rooms_to_work_on: - return None - - # Get how many are left to process, so we can give status on how - # far we are in processing - txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms") - progress["remaining"] = txn.fetchone()[0] - - return rooms_to_work_on - - rooms_to_work_on = yield self.runInteraction( - "populate_stats_temp_read", _get_next_batch - ) - - # No more rooms -- complete the transaction. - if not rooms_to_work_on: - yield self._end_background_update("populate_stats_process_rooms") - return 1 - - logger.info( - "Processing the next %d rooms of %d remaining", - len(rooms_to_work_on), - progress["remaining"], - ) - - # Number of state events we've processed by going through each room - processed_event_count = 0 - - for room_id, event_count in rooms_to_work_on: - - current_state_ids = yield self.get_current_state_ids(room_id) - - join_rules_id = current_state_ids.get((EventTypes.JoinRules, "")) - history_visibility_id = current_state_ids.get( - (EventTypes.RoomHistoryVisibility, "") - ) - encryption_id = current_state_ids.get((EventTypes.RoomEncryption, "")) - name_id = current_state_ids.get((EventTypes.Name, "")) - topic_id = current_state_ids.get((EventTypes.Topic, "")) - avatar_id = current_state_ids.get((EventTypes.RoomAvatar, "")) - canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, "")) - - event_ids = [ - join_rules_id, - history_visibility_id, - encryption_id, - name_id, - topic_id, - avatar_id, - canonical_alias_id, - ] - - state_events = yield self.get_events( - [ev for ev in event_ids if ev is not None] - ) - - def _get_or_none(event_id, arg): - event = state_events.get(event_id) - if event: - return event.content.get(arg) - return None - - yield self.update_room_state( - room_id, - { - "join_rules": _get_or_none(join_rules_id, "join_rule"), - "history_visibility": _get_or_none( - history_visibility_id, "history_visibility" - ), - "encryption": _get_or_none(encryption_id, "algorithm"), - "name": _get_or_none(name_id, "name"), - "topic": _get_or_none(topic_id, "topic"), - "avatar": _get_or_none(avatar_id, "url"), - "canonical_alias": _get_or_none(canonical_alias_id, "alias"), - }, - ) - - now = self.hs.get_reactor().seconds() - - # quantise time to the nearest bucket - now = (now // self.stats_bucket_size) * self.stats_bucket_size - - def _fetch_data(txn): - - # Get the current token of the room - current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn) - - current_state_events = len(current_state_ids) - - membership_counts = self._get_user_counts_in_room_txn(txn, room_id) - - total_state_events = self._get_total_state_event_counts_txn( - txn, room_id - ) - - self._update_stats_txn( - txn, - "room", - room_id, - now, - { - "bucket_size": self.stats_bucket_size, - "current_state_events": current_state_events, - "joined_members": membership_counts.get(Membership.JOIN, 0), - "invited_members": membership_counts.get(Membership.INVITE, 0), - "left_members": membership_counts.get(Membership.LEAVE, 0), - "banned_members": membership_counts.get(Membership.BAN, 0), - "state_events": total_state_events, - }, - ) - self._simple_insert_txn( - txn, - "room_stats_earliest_token", - {"room_id": room_id, "token": current_token}, - ) - - # We've finished a room. Delete it from the table. - self._simple_delete_one_txn( - txn, TEMP_TABLE + "_rooms", {"room_id": room_id} - ) - - yield self.runInteraction("update_room_stats", _fetch_data) - - # Update the remaining counter. - progress["remaining"] -= 1 - yield self.runInteraction( - "populate_stats", - self._background_update_progress_txn, - "populate_stats_process_rooms", - progress, - ) - - processed_event_count += event_count - - if processed_event_count > batch_size: - # Don't process any more rooms, we've hit our batch size. - return processed_event_count - - return processed_event_count - - def delete_all_stats(self): - """ - Delete all statistics records. - """ - - def _delete_all_stats_txn(txn): - txn.execute("DELETE FROM room_state") - txn.execute("DELETE FROM room_stats") - txn.execute("DELETE FROM room_stats_earliest_token") - txn.execute("DELETE FROM user_stats") - - return self.runInteraction("delete_all_stats", _delete_all_stats_txn) - - def get_stats_stream_pos(self): - return self._simple_select_one_onecol( - table="stats_stream_pos", - keyvalues={}, - retcol="stream_id", - desc="stats_stream_pos", - ) - - def update_stats_stream_pos(self, stream_id): - return self._simple_update_one( - table="stats_stream_pos", - keyvalues={}, - updatevalues={"stream_id": stream_id}, - desc="update_stats_stream_pos", - ) + self.register_noop_background_update("populate_stats_createtables") + self.register_noop_background_update("populate_stats_process_rooms") + self.register_noop_background_update("populate_stats_cleanup") def update_room_state(self, room_id, fields): """ @@ -366,119 +76,3 @@ class StatsStore(StateDeltasStore): values=fields, desc="update_room_state", ) - - def get_deltas_for_room(self, room_id, start, size=100): - """ - Get statistics deltas for a given room. - - Args: - room_id (str) - start (int): Pagination start. Number of entries, not timestamp. - size (int): How many entries to return. - - Returns: - Deferred[list[dict]], where the dict has the keys of - ABSOLUTE_STATS_FIELDS["room"] and "ts". - """ - return self._simple_select_list_paginate( - "room_stats", - {"room_id": room_id}, - "ts", - start, - size, - retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]), - order_direction="DESC", - ) - - def get_all_room_state(self): - return self._simple_select_list( - "room_state", None, retcols=("name", "topic", "canonical_alias") - ) - - @cached() - def get_earliest_token_for_room_stats(self, room_id): - """ - Fetch the "earliest token". This is used by the room stats delta - processor to ignore deltas that have been processed between the - start of the background task and any particular room's stats - being calculated. - - Returns: - Deferred[int] - """ - return self._simple_select_one_onecol( - "room_stats_earliest_token", - {"room_id": room_id}, - retcol="token", - allow_none=True, - ) - - def update_stats(self, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert( - table=table, - keyvalues={id_col: stats_id, "ts": ts}, - values=fields, - desc="update_stats", - ) - - def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields): - table, id_col = TYPE_TO_ROOM[stats_type] - return self._simple_upsert_txn( - txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields - ) - - def update_stats_delta(self, ts, stats_type, stats_id, field, value): - def _update_stats_delta(txn): - table, id_col = TYPE_TO_ROOM[stats_type] - - sql = ( - "SELECT * FROM %s" - " WHERE %s=? and ts=(" - " SELECT MAX(ts) FROM %s" - " WHERE %s=?" - ")" - ) % (table, id_col, table, id_col) - txn.execute(sql, (stats_id, stats_id)) - rows = self.cursor_to_dict(txn) - if len(rows) == 0: - # silently skip as we don't have anything to apply a delta to yet. - # this tries to minimise any race between the initial sync and - # subsequent deltas arriving. - return - - current_ts = ts - latest_ts = rows[0]["ts"] - if current_ts < latest_ts: - # This one is in the past, but we're just encountering it now. - # Mark it as part of the current bucket. - current_ts = latest_ts - elif ts != latest_ts: - # we have to copy our absolute counters over to the new entry. - values = { - key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] - } - values[id_col] = stats_id - values["ts"] = ts - values["bucket_size"] = self.stats_bucket_size - - self._simple_insert_txn(txn, table=table, values=values) - - # actually update the new value - if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: - self._simple_update_txn( - txn, - table=table, - keyvalues={id_col: stats_id, "ts": current_ts}, - updatevalues={field: value}, - ) - else: - sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % ( - table, - field, - field, - id_col, - ) - txn.execute(sql, (value, stats_id, current_ts)) - - return self.runInteraction("update_stats_delta", _update_stats_delta) |