summary refs log tree commit diff
path: root/synapse/handlers/stats.py
diff options
context:
space:
mode:
authorMatthew Hodgson <matthew@matrix.org>2018-07-18 23:56:57 +0100
committerMatthew Hodgson <matthew@matrix.org>2018-07-18 23:56:57 +0100
commit18752982dbb1492f80942fba06b8c980150347f1 (patch)
tree72ae8107896ba89ca313061ff00c16a703248c81 /synapse/handlers/stats.py
parentWIP for updating the stats store (diff)
downloadsynapse-18752982dbb1492f80942fba06b8c980150347f1.tar.xz
hook up state deltas to stats
Diffstat (limited to 'synapse/handlers/stats.py')
-rw-r--r--synapse/handlers/stats.py205
1 files changed, 190 insertions, 15 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py

index d95fb3a774..4fa9f1de95 100644 --- a/synapse/handlers/stats.py +++ b/synapse/handlers/stats.py
@@ -17,14 +17,15 @@ import logging from twisted.internet import defer -from synapse.api.constants import EventTypes, Membership -from synapse.types import get_localpart_from_id +from synapse.api.constants import EventTypes, Membership, JoinRules from synapse.util.metrics import Measure +from .state_deltas import StateDeltasHandler + logger = logging.getLogger(__name__) -class StatsHandler(object): +class StatsHandler(StateDeltasHandler): """Handles keeping the *_stats tables updated with a simple time-series of information about the users, rooms and media on the server, such that admins have some idea of who is consuming their resouces. @@ -33,11 +34,11 @@ class StatsHandler(object): """ INITIAL_ROOM_SLEEP_MS = 50 - INITIAL_ROOM_SLEEP_COUNT = 100 - INITIAL_ROOM_BATCH_SIZE = 100 INITIAL_USER_SLEEP_MS = 10 def __init__(self, hs): + super(StatsHandler, self).__init__(hs) + self.store = hs.get_datastore() self.state = hs.get_state_handler() self.server_name = hs.hostname @@ -228,6 +229,14 @@ class StatsHandler(object): def _handle_deltas(self, deltas): """Called with the state deltas to process """ + + # XXX: shouldn't this be the timestamp where the delta was emitted rather + # than received? + now = self.clock.time_msec() + + # quantise time to the nearest bucket + now = int(now / (self.stats_bucket_size * 1000)) * self.stats_bucket_size * 1000 + for delta in deltas: typ = delta["type"] state_key = delta["state_key"] @@ -237,16 +246,182 @@ class StatsHandler(object): logger.debug("Handling: %r %r, %s", typ, state_key, event_id) + if event_id is None: + return + + event = yield self.store.get_event(event_id) + if event is None: + return + + if typ == EventTypes.Member: + # we could use _get_key_change here but it's a bit inefficient + # given we're not testing for a specific result; might as well + # just grab the prev_membership and membership strings and + # compare them. + + if prev_event_id is not None: + prev_event = yield self.store.get_event(prev_event_id) + + prev_membership = None + membership = event.content.get("membership") + if prev_event: + prev_membership = prev_event.content.get("membership") + + if prev_membership != membership: + if prev_membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "joined_members", -1 + ) + elif prev_membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "invited_members", -1 + ) + elif prev_membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "left_members", -1 + ) + elif prev_membership == Membership.BAN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "banned_members", -1 + ) + + if membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "joined_members", +1 + ) + elif membership == Membership.INVITE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "invited_members", +1 + ) + elif membership == Membership.LEAVE: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "left_members", +1 + ) + elif membership == Membership.BAN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "room", room_id, "banned_members", +1 + ) + + user_id = event.state_key + if self.is_mine_id(user_id): + # update user_stats as it's one of our users + public = yield self._is_public_room(room_id) + + if prev_membership != membership: + if prev_membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "user", user_id, + "public_rooms" if public else "private_rooms", + -1 + ) + elif membership == Membership.JOIN: + yield self.store.update_stats_delta( + now, self.stats_bucket_size, + "user", user_id, + "public_rooms" if public else "private_rooms", + +1 + ) + + elif typ == EventTypes.JoinRules: + self.store.update_room_state(room_id, { + "join_rules": event.content.get("join_rule") + }) + + is_public = self._get_key_change( + room_id, prev_event_id, event_id, + "join_rule", JoinRules.PUBLIC + ) + if is_public is not None: + self.store.update_public_room_stats( + now, self.stats_bucket_size, + room_id, is_public + ) + + elif typ == EventTypes.RoomHistoryVisibility: + yield self.store.update_room_state(room_id, { + "history_visibility": event.content.get("history_visibility") + }) + + is_public = self._get_key_change( + room_id, prev_event_id, event_id, + "history_visibility", "world_readable" + ) + if is_public is not None: + yield self.update_public_room_stats( + now, self.stats_bucket_size, + room_id, is_public + ) + + elif typ == EventTypes.RoomEncryption: + self.store.update_room_state(room_id, { + "encryption": event.content.get("algorithm") + }) + elif typ == EventTypes.Name: + self.store.update_room_state(room_id, { + "name": event.content.get("name") + }) + elif typ == EventTypes.Topic: + self.store.update_room_state(room_id, { + "topic": event.content.get("topic") + }) + elif typ == EventTypes.RoomAvatar: + self.store.update_room_state(room_id, { + "avatar": event.content.get("url") + }) + elif typ == EventTypes.CanonicalAlias: + self.store.update_room_state(room_id, { + "canonical_alias": event.content.get("alias") + }) + @defer.inlineCallbacks - def _handle_local_user(self, user_id): - """Adds a new local roomless user into the user_directory_search table. - Used to populate up the user index when we have an - user_directory_search_all_users specified. - """ - logger.debug("Adding new local user to dir, %r", user_id) + def update_public_room_stats(self, ts, bucket_size, room_id, is_public): + # For now, blindly iterate over all local users in the room so that + # we can handle the whole problem of copying buckets over as needed + + user_ids = yield self.store.get_users_in_room(room_id) + + for user_id in user_ids: + if self.is_mine(user_id): + self.store.update_stats_delta( + ts, bucket_size, + "user", user_id, + "public_rooms", +1 if is_public else -1 + ) + self.store.update_stats_delta( + ts, bucket_size, + "user", user_id, + "private_rooms", -1 if is_public else +1 + ) + + @defer.inlineCallbacks + def _is_public_room(self, room_id): + events = yield self.store.get_current_state( + room_id, ( + (EventTypes.JoinRules, ""), + (EventTypes.RoomHistoryVisibility, "") + ) + ) + + join_rules = events.get((EventTypes.JoinRules, "")) + history_visibility = events.get((EventTypes.RoomHistoryVisibility, "")) - profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id)) + if ( + join_rules.content.get("join_rule") == JoinRules.PUBLIC or + history_visibility.content.get("history_visibility") == "world_readable" + ): + defer.returnValue(True) + else: + defer.returnValue(True) - row = yield self.store.get_user_in_directory(user_id) - if not row: - yield self.store.add_profiles_to_user_dir(None, {user_id: profile}) + @defer.inlineCallbacks + def _handle_local_user(self, user_id): + logger.debug("Adding new local user to stats, %r", user_id)