diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index d95fb3a774..4fa9f1de95 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -17,14 +17,15 @@ import logging
from twisted.internet import defer
-from synapse.api.constants import EventTypes, Membership
-from synapse.types import get_localpart_from_id
+from synapse.api.constants import EventTypes, Membership, JoinRules
from synapse.util.metrics import Measure
+from .state_deltas import StateDeltasHandler
+
logger = logging.getLogger(__name__)
-class StatsHandler(object):
+class StatsHandler(StateDeltasHandler):
"""Handles keeping the *_stats tables updated with a simple time-series of
information about the users, rooms and media on the server, such that admins
have some idea of who is consuming their resouces.
@@ -33,11 +34,11 @@ class StatsHandler(object):
"""
INITIAL_ROOM_SLEEP_MS = 50
- INITIAL_ROOM_SLEEP_COUNT = 100
- INITIAL_ROOM_BATCH_SIZE = 100
INITIAL_USER_SLEEP_MS = 10
def __init__(self, hs):
+ super(StatsHandler, self).__init__(hs)
+
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
@@ -228,6 +229,14 @@ class StatsHandler(object):
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
"""
+
+ # XXX: shouldn't this be the timestamp where the delta was emitted rather
+ # than received?
+ now = self.clock.time_msec()
+
+ # quantise time to the nearest bucket
+ now = int(now / (self.stats_bucket_size * 1000)) * self.stats_bucket_size * 1000
+
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
@@ -237,16 +246,182 @@ class StatsHandler(object):
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+ if event_id is None:
+ return
+
+ event = yield self.store.get_event(event_id)
+ if event is None:
+ return
+
+ if typ == EventTypes.Member:
+ # we could use _get_key_change here but it's a bit inefficient
+ # given we're not testing for a specific result; might as well
+ # just grab the prev_membership and membership strings and
+ # compare them.
+
+ if prev_event_id is not None:
+ prev_event = yield self.store.get_event(prev_event_id)
+
+ prev_membership = None
+ membership = event.content.get("membership")
+ if prev_event:
+ prev_membership = prev_event.content.get("membership")
+
+ if prev_membership != membership:
+ if prev_membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "joined_members", -1
+ )
+ elif prev_membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "invited_members", -1
+ )
+ elif prev_membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "left_members", -1
+ )
+ elif prev_membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "banned_members", -1
+ )
+
+ if membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "joined_members", +1
+ )
+ elif membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "invited_members", +1
+ )
+ elif membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "left_members", +1
+ )
+ elif membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "banned_members", +1
+ )
+
+ user_id = event.state_key
+ if self.is_mine_id(user_id):
+ # update user_stats as it's one of our users
+ public = yield self._is_public_room(room_id)
+
+ if prev_membership != membership:
+ if prev_membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "user", user_id,
+ "public_rooms" if public else "private_rooms",
+ -1
+ )
+ elif membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "user", user_id,
+ "public_rooms" if public else "private_rooms",
+ +1
+ )
+
+ elif typ == EventTypes.JoinRules:
+ self.store.update_room_state(room_id, {
+ "join_rules": event.content.get("join_rule")
+ })
+
+ is_public = self._get_key_change(
+ room_id, prev_event_id, event_id,
+ "join_rule", JoinRules.PUBLIC
+ )
+ if is_public is not None:
+ self.store.update_public_room_stats(
+ now, self.stats_bucket_size,
+ room_id, is_public
+ )
+
+ elif typ == EventTypes.RoomHistoryVisibility:
+ yield self.store.update_room_state(room_id, {
+ "history_visibility": event.content.get("history_visibility")
+ })
+
+ is_public = self._get_key_change(
+ room_id, prev_event_id, event_id,
+ "history_visibility", "world_readable"
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(
+ now, self.stats_bucket_size,
+ room_id, is_public
+ )
+
+ elif typ == EventTypes.RoomEncryption:
+ self.store.update_room_state(room_id, {
+ "encryption": event.content.get("algorithm")
+ })
+ elif typ == EventTypes.Name:
+ self.store.update_room_state(room_id, {
+ "name": event.content.get("name")
+ })
+ elif typ == EventTypes.Topic:
+ self.store.update_room_state(room_id, {
+ "topic": event.content.get("topic")
+ })
+ elif typ == EventTypes.RoomAvatar:
+ self.store.update_room_state(room_id, {
+ "avatar": event.content.get("url")
+ })
+ elif typ == EventTypes.CanonicalAlias:
+ self.store.update_room_state(room_id, {
+ "canonical_alias": event.content.get("alias")
+ })
+
@defer.inlineCallbacks
- def _handle_local_user(self, user_id):
- """Adds a new local roomless user into the user_directory_search table.
- Used to populate up the user index when we have an
- user_directory_search_all_users specified.
- """
- logger.debug("Adding new local user to dir, %r", user_id)
+ def update_public_room_stats(self, ts, bucket_size, room_id, is_public):
+ # For now, blindly iterate over all local users in the room so that
+ # we can handle the whole problem of copying buckets over as needed
+
+ user_ids = yield self.store.get_users_in_room(room_id)
+
+ for user_id in user_ids:
+ if self.is_mine(user_id):
+ self.store.update_stats_delta(
+ ts, bucket_size,
+ "user", user_id,
+ "public_rooms", +1 if is_public else -1
+ )
+ self.store.update_stats_delta(
+ ts, bucket_size,
+ "user", user_id,
+ "private_rooms", -1 if is_public else +1
+ )
+
+ @defer.inlineCallbacks
+ def _is_public_room(self, room_id):
+ events = yield self.store.get_current_state(
+ room_id, (
+ (EventTypes.JoinRules, ""),
+ (EventTypes.RoomHistoryVisibility, "")
+ )
+ )
+
+ join_rules = events.get((EventTypes.JoinRules, ""))
+ history_visibility = events.get((EventTypes.RoomHistoryVisibility, ""))
- profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id))
+ if (
+ join_rules.content.get("join_rule") == JoinRules.PUBLIC or
+ history_visibility.content.get("history_visibility") == "world_readable"
+ ):
+ defer.returnValue(True)
+ else:
+ defer.returnValue(True)
- row = yield self.store.get_user_in_directory(user_id)
- if not row:
- yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+ @defer.inlineCallbacks
+ def _handle_local_user(self, user_id):
+ logger.debug("Adding new local user to stats, %r", user_id)
|