summary refs log tree commit diff
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-07-18 23:56:57 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-07-18 23:56:57 +0100
commit18752982dbb1492f80942fba06b8c980150347f1 (patch)
tree72ae8107896ba89ca313061ff00c16a703248c81
parentWIP for updating the stats store (diff)
downloadsynapse-18752982dbb1492f80942fba06b8c980150347f1.tar.xz
hook up state deltas to stats
-rw-r--r--synapse/handlers/state_deltas.py73
-rw-r--r--synapse/handlers/stats.py205
-rw-r--r--synapse/handlers/user_directory.py50
-rw-r--r--synapse/storage/schema/delta/51/stats.sql3
-rw-r--r--synapse/storage/state.py53
-rw-r--r--synapse/storage/stats.py98
-rw-r--r--synapse/storage/user_directory.py2
7 files changed, 384 insertions, 100 deletions
diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py
new file mode 100644

index 0000000000..ed6332ac94 --- /dev/null +++ b/synapse/handlers/state_deltas.py
@@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +# Copyright 2017 Vector Creations Ltd +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import logging + +from twisted.internet import defer + +from .base import BaseHandler + +logger = logging.getLogger(__name__) + + +class StateDeltasHandler(BaseHandler): + + def __init__(self, hs): + super(StateDeltasHandler, self).__init__(hs) + self.store = hs.get_datastore() + + @defer.inlineCallbacks + def _get_key_change(self, prev_event_id, event_id, key_name, public_value): + """Given two events check if the `key_name` field in content changed + from not matching `public_value` to doing so. + + For example, check if `history_visibility` (`key_name`) changed from + `shared` to `world_readable` (`public_value`). + + Returns: + None if the field in the events either both match `public_value` + or if neither do, i.e. there has been no change. + True if it didnt match `public_value` but now does + False if it did match `public_value` but now doesn't + """ + prev_event = None + event = None + if prev_event_id: + prev_event = yield self.store.get_event(prev_event_id, allow_none=True) + + if event_id: + event = yield self.store.get_event(event_id, allow_none=True) + + if not event and not prev_event: + logger.debug("Neither event exists: %r %r", prev_event_id, event_id) + defer.returnValue(None) + + prev_value = None + value = None + + if prev_event: + prev_value = prev_event.content.get(key_name) + + if event: + value = event.content.get(key_name) + + logger.debug("prev_value: %r -> value: %r", prev_value, value) + + if value == public_value and prev_value != public_value: + defer.returnValue(True) + elif value != public_value and prev_value == public_value: + defer.returnValue(False) + else: + defer.returnValue(None) diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index d95fb3a774..4fa9f1de95 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py
@@ -17,14 +17,15 @@ import logging from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership -from synapse.types import get_localpart_from_id +from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.util.metrics import Measure +from .state_deltas import StateDeltasHandler + logger = logging.getLogger(__name__) -class StatsHandler(object): +class StatsHandler(StateDeltasHandler): """Handles keeping the *_stats tables updated with a simple time-series of information about the users, rooms and media on the server, such that admins have some idea of who is consuming their resouces. @@ -33,11 +34,11 @@ class StatsHandler(object): """ INITIAL_ROOM_SLEEP_MS = 50 - INITIAL_ROOM_SLEEP_COUNT = 100 - INITIAL_ROOM_BATCH_SIZE = 100 INITIAL_USER_SLEEP_MS = 10 def __init__(self, hs): + super(StatsHandler, self).__init__(hs) + self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname @@ -228,6 +229,14 @@ class StatsHandler(object): def _handle_deltas(self, deltas): """Called with the state deltas to process """ + + # XXX: shouldn't this be the timestamp where the delta was emitted rather + # than received? + now = self.clock.time_msec() + + # quantise time to the nearest bucket + now = int(now / (self.stats_bucket_size * 1000)) * self.stats_bucket_size * 1000 + for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -237,16 +246,182 @@ class StatsHandler(object): logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + if event_id is None: + return + + event = yield self.store.get_event(event_id) + if event is None: + return + + if typ == EventTypes.Member: + # we could use _get_key_change here but it's a bit inefficient + # given we're not testing for a specific result; might as well + # just grab the prev_membership and membership strings and + # compare them. + + if prev_event_id is not None: + prev_event = yield self.store.get_event(prev_event_id) + + prev_membership = None + membership = event.content.get("membership") + if prev_event: + prev_membership = prev_event.content.get("membership") + + if prev_membership != membership: + if prev_membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "joined_members", -1 + ) + elif prev_membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "invited_members", -1 + ) + elif prev_membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "left_members", -1 + ) + elif prev_membership == Membership.BAN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "banned_members", -1 + ) + + if membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "joined_members", +1 + ) + elif membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "invited_members", +1 + ) + elif membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "left_members", +1 + ) + elif membership == Membership.BAN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "banned_members", +1 + ) + + user_id = event.state_key + if self.is_mine_id(user_id): + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + if prev_membership != membership: + if prev_membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "user", user_id, + "public_rooms" if public else "private_rooms", + -1 + ) + elif membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "user", user_id, + "public_rooms" if public else "private_rooms", + +1 + ) + + elif typ == EventTypes.JoinRules: + self.store.update_room_state(room_id, { + "join_rules": event.content.get("join_rule") + }) + + is_public = self._get_key_change( + room_id, prev_event_id, event_id, + "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + self.store.update_public_room_stats( + now, self.stats_bucket_size, + room_id, is_public + ) + + elif typ == EventTypes.RoomHistoryVisibility: + yield self.store.update_room_state(room_id, { + "history_visibility": event.content.get("history_visibility") + }) + + is_public = self._get_key_change( + room_id, prev_event_id, event_id, + "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats( + now, self.stats_bucket_size, + room_id, is_public + ) + + elif typ == EventTypes.RoomEncryption: + self.store.update_room_state(room_id, { + "encryption": event.content.get("algorithm") + }) + elif typ == EventTypes.Name: + self.store.update_room_state(room_id, { + "name": event.content.get("name") + }) + elif typ == EventTypes.Topic: + self.store.update_room_state(room_id, { + "topic": event.content.get("topic") + }) + elif typ == EventTypes.RoomAvatar: + self.store.update_room_state(room_id, { + "avatar": event.content.get("url") + }) + elif typ == EventTypes.CanonicalAlias: + self.store.update_room_state(room_id, { + "canonical_alias": event.content.get("alias") + }) + @defer.inlineCallbacks - def _handle_local_user(self, user_id): - """Adds a new local roomless user into the user_directory_search table. - Used to populate up the user index when we have an - user_directory_search_all_users specified. - """ - logger.debug("Adding new local user to dir, %r", user_id) + def update_public_room_stats(self, ts, bucket_size, room_id, is_public): + # For now, blindly iterate over all local users in the room so that + # we can handle the whole problem of copying buckets over as needed + + user_ids = yield self.store.get_users_in_room(room_id) + + for user_id in user_ids: + if self.is_mine(user_id): + self.store.update_stats_delta( + ts, bucket_size, + "user", user_id, + "public_rooms", +1 if is_public else -1 + ) + self.store.update_stats_delta( + ts, bucket_size, + "user", user_id, + "private_rooms", -1 if is_public else +1 + ) + + @defer.inlineCallbacks + def _is_public_room(self, room_id): + events = yield self.store.get_current_state( + room_id, ( + (EventTypes.JoinRules, ""), + (EventTypes.RoomHistoryVisibility, "") + ) + ) + + join_rules = events.get((EventTypes.JoinRules, "")) + history_visibility = events.get((EventTypes.RoomHistoryVisibility, "")) - profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id)) + if ( + join_rules.content.get("join_rule") == JoinRules.PUBLIC or + history_visibility.content.get("history_visibility") == "world_readable" + ): + defer.returnValue(True) + else: + defer.returnValue(True) - row = yield self.store.get_user_in_directory(user_id) - if not row: - yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) + @defer.inlineCallbacks + def _handle_local_user(self, user_id): + logger.debug("Adding new local user to stats, %r", user_id) diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d4a51281cb..ed294f5826 100644 --- a/synapse/handlers/user_directory.py +++ b/synapse/handlers/user_directory.py
@@ -24,10 +24,12 @@ from synapse.storage.roommember import ProfileInfo from synapse.types import get_localpart_from_id from synapse.util.metrics import Measure +from .state_deltas import StateDeltasHandler + logger = logging.getLogger(__name__) -class UserDirectoryHandler(object): +class UserDirectoryHandler(StateDeltasHandler): """Handles querying of and keeping updated the user_directory. N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY @@ -49,6 +51,8 @@ class UserDirectoryHandler(object): INITIAL_USER_SLEEP_MS = 10 def __init__(self, hs): + super(UserDirectoryHandler, self).__init__(hs) + self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname @@ -643,47 +647,3 @@ class UserDirectoryHandler(object): yield self.store.update_profile_in_user_dir( user_id, new_name, new_avatar, room_id, ) - - @defer.inlineCallbacks - def _get_key_change(self, prev_event_id, event_id, key_name, public_value): - """Given two events check if the `key_name` field in content changed - from not matching `public_value` to doing so. - - For example, check if `history_visibility` (`key_name`) changed from - `shared` to `world_readable` (`public_value`). - - Returns: - None if the field in the events either both match `public_value` - or if neither do, i.e. there has been no change. - True if it didnt match `public_value` but now does - False if it did match `public_value` but now doesn't - """ - prev_event = None - event = None - if prev_event_id: - prev_event = yield self.store.get_event(prev_event_id, allow_none=True) - - if event_id: - event = yield self.store.get_event(event_id, allow_none=True) - - if not event and not prev_event: - logger.debug("Neither event exists: %r %r", prev_event_id, event_id) - defer.returnValue(None) - - prev_value = None - value = None - - if prev_event: - prev_value = prev_event.content.get(key_name) - - if event: - value = event.content.get(key_name) - - logger.debug("prev_value: %r -> value: %r", prev_value, value) - - if value == public_value and prev_value != public_value: - defer.returnValue(True) - elif value != public_value and prev_value == public_value: - defer.returnValue(False) - else: - defer.returnValue(None) diff --git a/synapse/storage/schema/delta/51/stats.sql b/synapse/storage/schema/delta/51/stats.sql
index 32f8f21a13..9d0383da9c 100644 --- a/synapse/storage/schema/delta/51/stats.sql +++ b/synapse/storage/schema/delta/51/stats.sql
@@ -26,7 +26,7 @@ CREATE TABLE user_stats ( ts BIGINT NOT NULL, bucket_size INT NOT NULL, sent_events INT NOT NULL, - stored_events INT NOT NULL, -- delta or absolute? + local_events INT NOT NULL, public_rooms INT NOT NULL, private_rooms INT NOT NULL, sent_file_count INT NOT NULL, @@ -60,6 +60,7 @@ CREATE TABLE room_state ( encrypted BOOLEAN, name TEXT NOT NULL, topic TEXT NOT NULL, + avatar TEXT NOT NULL, canonical_alias TEXT NOT NULL, -- get aliases straight from the right table ); diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 89a05c4618..e60d6ed486 100644 --- a/synapse/storage/state.py +++ b/synapse/storage/state.py
@@ -89,6 +89,59 @@ class StateGroupWorkerStore(SQLBaseStore): _get_current_state_ids_txn, ) + @defer.inlineCallbacks + def get_current_state(self, room_id, types): + """Get the current state event of a given type for a room based on the + current_state_events table. This may not be as up-to-date as the result + of doing a fresh state resolution as per state_handler.get_current_state + Args: + room_id (str) + types (list): List of (type, state_key) tuples which are used to + filter the state fetched. `state_key` may be None, which matches + any `state_key` + Returns: + deferred: dict of (type, state_key) -> event + """ + def _get_current_state_txn(txn): + sql = """SELECT type, state_key, event_id FROM current_state_events + WHERE room_id = ? and %s""" + # Turns out that postgres doesn't like doing a list of OR's and + # is about 1000x slower, so we just issue a query for each specific + # type seperately. + if types: + clause_to_args = [ + ( + "AND type = ? AND state_key = ?", + (etype, state_key) + ) if state_key is not None else ( + "AND type = ?", + (etype,) + ) + for etype, state_key in types + ] + else: + # If types is None we fetch all the state, and so just use an + # empty where clause with no extra args. + clause_to_args = [("", [])] + for where_clause, where_args in clause_to_args: + args = [room_id] + args.extend(where_args) + txn.execute(sql % (where_clause,), args) + for row in txn: + typ, state_key, event_id = row + key = (typ, state_key) + results[intern_string(key)] = event_id + return results + + results = self.runInteraction( + "get_current_state", + _get_current_state_txn, + ) + for (key, event_id) in iteritems(results): + results[key] = yield self.store.get_event(event_id, allow_none=True) + + defer.returnValue(results) + @cached(max_entries=10000, iterable=True) def get_state_group_delta(self, state_group): """Given a state group try to return a previous group and a delta between diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index facbdba5c9..f6fc56cbe5 100644 --- a/synapse/storage/stats.py +++ b/synapse/storage/stats.py
@@ -19,6 +19,37 @@ from .StateDeltasStore import StateDeltasStore logger = logging.getLogger(__name__) +# these fields track relative numbers (e.g. number of events sent in this timeslice) +RELATIVE_STATS_FIELDS = { + "room": ( + "sent_events" + ), + "user": ( + "sent_events" + ) +} + +# these fields track rather than absolutes (e.g. total number of rooms on the server) +ABSOLUTE_STATS_FIELDS = { + "room": ( + "current_state_events", + "joined_members", + "invited_members", + "left_members", + "banned_members", + "state_events", + "local_events", + "remote_events", + ), + "user": ( + "local_events", + "public_rooms", + "private_rooms", + "sent_file_count", + "sent_file_size", + ), +} + class StatsStore(StateDeltasStore): @@ -59,28 +90,7 @@ class StatsStore(StateDeltasStore): desc="update_stats", ) - # these fields track relative numbers (e.g. number of events sent in this timeslice) - RELATIVE_STATS_FIELDS = { - "room": { - "sent_events": True - } - } - - # these fields track rather than absolutes (e.g. total number of rooms on the server) - ABSOLUTE_STATS_FIELDS = { - "room": ( - "current_state_events", - "joined_members", - "invited_members", - "left_members", - "banned_members", - "state_events", - "local_events", - "remote_events", - ) - } - - def update_stats_delta(self, stats_type, stats_id, field, value): + def update_stats_delta(self, ts, bucket_size, stats_type, stats_id, field, value): def _update_stats_delta(txn): table = "%s_stats" % stats_type id_col = "%s_id" % stats_type @@ -96,34 +106,46 @@ class StatsStore(StateDeltasStore): rows = self.cursor_to_dict(txn) if len(rows) == 0: # silently skip as we don't have anything to apply a delta to yet. + # this tries to minimise any race between the initial sync and + # subsequent deltas arriving. return + values = { + key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type] + } + values[id_col] = stats_id + values["ts"] = ts + values["bucket_size"] = bucket_size + latest_ts = rows[0]["ts"] if ts != latest_ts: # we have to copy our absolute counters over to the new entry. self._simple_insert_txn( txn, table=table, - values={ + values=values + ) + + # actually update the new value + if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]: + self._simple_update_txn( + txn, + table=table, + keyvalues={ id_col: stats_id, "ts": ts, - key: rows[0][key] - for key in ABSOLUTE_STATS_FIELDS[stats_type], + }, + updatevalues={ + field: value } ) - - # actually update the new value - self._simple_update_txn( - txn, - table=table, - keyvalues={ - id_col: stats_id, - "ts": ts, - } - updatevalues={ - field: value - } - ) + else: + sql = ( + "UPDATE %s " + " SET %s=%s+?" + " WHERE %s=? AND ts=?" + ) % (table, field, field, id_col) + txn.execute(sql, (value, stats_id, ts)) return self.runInteraction( "update_stats_delta", _update_stats_delta diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 6b3ed10cca..b210551c5a 100644 --- a/synapse/storage/user_directory.py +++ b/synapse/storage/user_directory.py
@@ -25,7 +25,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine from synapse.types import get_domain_from_id, get_localpart_from_id from synapse.util.caches.descriptors import cached, cachedInlineCallbacks -from .StateDeltasStore import StateDeltasStore +from .state_deltas import StateDeltasStore logger = logging.getLogger(__name__)