diff --git a/synapse/handlers/state_deltas.py b/synapse/handlers/state_deltas.py
new file mode 100644
index 0000000000..ed6332ac94
--- /dev/null
+++ b/synapse/handlers/state_deltas.py
@@ -0,0 +1,73 @@
+# -*- coding: utf-8 -*-
+# Copyright 2017 Vector Creations Ltd
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import logging
+
+from twisted.internet import defer
+
+from .base import BaseHandler
+
+logger = logging.getLogger(__name__)
+
+
+class StateDeltasHandler(BaseHandler):
+
+ def __init__(self, hs):
+ super(StateDeltasHandler, self).__init__(hs)
+ self.store = hs.get_datastore()
+
+ @defer.inlineCallbacks
+ def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
+ """Given two events check if the `key_name` field in content changed
+ from not matching `public_value` to doing so.
+
+ For example, check if `history_visibility` (`key_name`) changed from
+ `shared` to `world_readable` (`public_value`).
+
+ Returns:
+ None if the field in the events either both match `public_value`
+ or if neither do, i.e. there has been no change.
+ True if it didnt match `public_value` but now does
+ False if it did match `public_value` but now doesn't
+ """
+ prev_event = None
+ event = None
+ if prev_event_id:
+ prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
+
+ if event_id:
+ event = yield self.store.get_event(event_id, allow_none=True)
+
+ if not event and not prev_event:
+ logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
+ defer.returnValue(None)
+
+ prev_value = None
+ value = None
+
+ if prev_event:
+ prev_value = prev_event.content.get(key_name)
+
+ if event:
+ value = event.content.get(key_name)
+
+ logger.debug("prev_value: %r -> value: %r", prev_value, value)
+
+ if value == public_value and prev_value != public_value:
+ defer.returnValue(True)
+ elif value != public_value and prev_value == public_value:
+ defer.returnValue(False)
+ else:
+ defer.returnValue(None)
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index d95fb3a774..4fa9f1de95 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -17,14 +17,15 @@ import logging
from twisted.internet import defer
-from synapse.api.constants import EventTypes, Membership
-from synapse.types import get_localpart_from_id
+from synapse.api.constants import EventTypes, Membership, JoinRules
from synapse.util.metrics import Measure
+from .state_deltas import StateDeltasHandler
+
logger = logging.getLogger(__name__)
-class StatsHandler(object):
+class StatsHandler(StateDeltasHandler):
"""Handles keeping the *_stats tables updated with a simple time-series of
information about the users, rooms and media on the server, such that admins
have some idea of who is consuming their resouces.
@@ -33,11 +34,11 @@ class StatsHandler(object):
"""
INITIAL_ROOM_SLEEP_MS = 50
- INITIAL_ROOM_SLEEP_COUNT = 100
- INITIAL_ROOM_BATCH_SIZE = 100
INITIAL_USER_SLEEP_MS = 10
def __init__(self, hs):
+ super(StatsHandler, self).__init__(hs)
+
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
@@ -228,6 +229,14 @@ class StatsHandler(object):
def _handle_deltas(self, deltas):
"""Called with the state deltas to process
"""
+
+ # XXX: shouldn't this be the timestamp where the delta was emitted rather
+ # than received?
+ now = self.clock.time_msec()
+
+ # quantise time to the nearest bucket
+ now = int(now / (self.stats_bucket_size * 1000)) * self.stats_bucket_size * 1000
+
for delta in deltas:
typ = delta["type"]
state_key = delta["state_key"]
@@ -237,16 +246,182 @@ class StatsHandler(object):
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+ if event_id is None:
+ return
+
+ event = yield self.store.get_event(event_id)
+ if event is None:
+ return
+
+ if typ == EventTypes.Member:
+ # we could use _get_key_change here but it's a bit inefficient
+ # given we're not testing for a specific result; might as well
+ # just grab the prev_membership and membership strings and
+ # compare them.
+
+ if prev_event_id is not None:
+ prev_event = yield self.store.get_event(prev_event_id)
+
+ prev_membership = None
+ membership = event.content.get("membership")
+ if prev_event:
+ prev_membership = prev_event.content.get("membership")
+
+ if prev_membership != membership:
+ if prev_membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "joined_members", -1
+ )
+ elif prev_membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "invited_members", -1
+ )
+ elif prev_membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "left_members", -1
+ )
+ elif prev_membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "banned_members", -1
+ )
+
+ if membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "joined_members", +1
+ )
+ elif membership == Membership.INVITE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "invited_members", +1
+ )
+ elif membership == Membership.LEAVE:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "left_members", +1
+ )
+ elif membership == Membership.BAN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "room", room_id, "banned_members", +1
+ )
+
+ user_id = event.state_key
+ if self.is_mine_id(user_id):
+ # update user_stats as it's one of our users
+ public = yield self._is_public_room(room_id)
+
+ if prev_membership != membership:
+ if prev_membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "user", user_id,
+ "public_rooms" if public else "private_rooms",
+ -1
+ )
+ elif membership == Membership.JOIN:
+ yield self.store.update_stats_delta(
+ now, self.stats_bucket_size,
+ "user", user_id,
+ "public_rooms" if public else "private_rooms",
+ +1
+ )
+
+ elif typ == EventTypes.JoinRules:
+ self.store.update_room_state(room_id, {
+ "join_rules": event.content.get("join_rule")
+ })
+
+ is_public = self._get_key_change(
+ room_id, prev_event_id, event_id,
+ "join_rule", JoinRules.PUBLIC
+ )
+ if is_public is not None:
+ self.store.update_public_room_stats(
+ now, self.stats_bucket_size,
+ room_id, is_public
+ )
+
+ elif typ == EventTypes.RoomHistoryVisibility:
+ yield self.store.update_room_state(room_id, {
+ "history_visibility": event.content.get("history_visibility")
+ })
+
+ is_public = self._get_key_change(
+ room_id, prev_event_id, event_id,
+ "history_visibility", "world_readable"
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(
+ now, self.stats_bucket_size,
+ room_id, is_public
+ )
+
+ elif typ == EventTypes.RoomEncryption:
+ self.store.update_room_state(room_id, {
+ "encryption": event.content.get("algorithm")
+ })
+ elif typ == EventTypes.Name:
+ self.store.update_room_state(room_id, {
+ "name": event.content.get("name")
+ })
+ elif typ == EventTypes.Topic:
+ self.store.update_room_state(room_id, {
+ "topic": event.content.get("topic")
+ })
+ elif typ == EventTypes.RoomAvatar:
+ self.store.update_room_state(room_id, {
+ "avatar": event.content.get("url")
+ })
+ elif typ == EventTypes.CanonicalAlias:
+ self.store.update_room_state(room_id, {
+ "canonical_alias": event.content.get("alias")
+ })
+
@defer.inlineCallbacks
- def _handle_local_user(self, user_id):
- """Adds a new local roomless user into the user_directory_search table.
- Used to populate up the user index when we have an
- user_directory_search_all_users specified.
- """
- logger.debug("Adding new local user to dir, %r", user_id)
+ def update_public_room_stats(self, ts, bucket_size, room_id, is_public):
+ # For now, blindly iterate over all local users in the room so that
+ # we can handle the whole problem of copying buckets over as needed
+
+ user_ids = yield self.store.get_users_in_room(room_id)
+
+ for user_id in user_ids:
+ if self.is_mine(user_id):
+ self.store.update_stats_delta(
+ ts, bucket_size,
+ "user", user_id,
+ "public_rooms", +1 if is_public else -1
+ )
+ self.store.update_stats_delta(
+ ts, bucket_size,
+ "user", user_id,
+ "private_rooms", -1 if is_public else +1
+ )
+
+ @defer.inlineCallbacks
+ def _is_public_room(self, room_id):
+ events = yield self.store.get_current_state(
+ room_id, (
+ (EventTypes.JoinRules, ""),
+ (EventTypes.RoomHistoryVisibility, "")
+ )
+ )
+
+ join_rules = events.get((EventTypes.JoinRules, ""))
+ history_visibility = events.get((EventTypes.RoomHistoryVisibility, ""))
- profile = yield self.store.get_profileinfo(get_localpart_from_id(user_id))
+ if (
+ join_rules.content.get("join_rule") == JoinRules.PUBLIC or
+ history_visibility.content.get("history_visibility") == "world_readable"
+ ):
+ defer.returnValue(True)
+ else:
+ defer.returnValue(True)
- row = yield self.store.get_user_in_directory(user_id)
- if not row:
- yield self.store.add_profiles_to_user_dir(None, {user_id: profile})
+ @defer.inlineCallbacks
+ def _handle_local_user(self, user_id):
+ logger.debug("Adding new local user to stats, %r", user_id)
diff --git a/synapse/handlers/user_directory.py b/synapse/handlers/user_directory.py
index d4a51281cb..ed294f5826 100644
--- a/synapse/handlers/user_directory.py
+++ b/synapse/handlers/user_directory.py
@@ -24,10 +24,12 @@ from synapse.storage.roommember import ProfileInfo
from synapse.types import get_localpart_from_id
from synapse.util.metrics import Measure
+from .state_deltas import StateDeltasHandler
+
logger = logging.getLogger(__name__)
-class UserDirectoryHandler(object):
+class UserDirectoryHandler(StateDeltasHandler):
"""Handles querying of and keeping updated the user_directory.
N.B.: ASSUMES IT IS THE ONLY THING THAT MODIFIES THE USER DIRECTORY
@@ -49,6 +51,8 @@ class UserDirectoryHandler(object):
INITIAL_USER_SLEEP_MS = 10
def __init__(self, hs):
+ super(UserDirectoryHandler, self).__init__(hs)
+
self.store = hs.get_datastore()
self.state = hs.get_state_handler()
self.server_name = hs.hostname
@@ -643,47 +647,3 @@ class UserDirectoryHandler(object):
yield self.store.update_profile_in_user_dir(
user_id, new_name, new_avatar, room_id,
)
-
- @defer.inlineCallbacks
- def _get_key_change(self, prev_event_id, event_id, key_name, public_value):
- """Given two events check if the `key_name` field in content changed
- from not matching `public_value` to doing so.
-
- For example, check if `history_visibility` (`key_name`) changed from
- `shared` to `world_readable` (`public_value`).
-
- Returns:
- None if the field in the events either both match `public_value`
- or if neither do, i.e. there has been no change.
- True if it didnt match `public_value` but now does
- False if it did match `public_value` but now doesn't
- """
- prev_event = None
- event = None
- if prev_event_id:
- prev_event = yield self.store.get_event(prev_event_id, allow_none=True)
-
- if event_id:
- event = yield self.store.get_event(event_id, allow_none=True)
-
- if not event and not prev_event:
- logger.debug("Neither event exists: %r %r", prev_event_id, event_id)
- defer.returnValue(None)
-
- prev_value = None
- value = None
-
- if prev_event:
- prev_value = prev_event.content.get(key_name)
-
- if event:
- value = event.content.get(key_name)
-
- logger.debug("prev_value: %r -> value: %r", prev_value, value)
-
- if value == public_value and prev_value != public_value:
- defer.returnValue(True)
- elif value != public_value and prev_value == public_value:
- defer.returnValue(False)
- else:
- defer.returnValue(None)
diff --git a/synapse/storage/schema/delta/51/stats.sql b/synapse/storage/schema/delta/51/stats.sql
index 32f8f21a13..9d0383da9c 100644
--- a/synapse/storage/schema/delta/51/stats.sql
+++ b/synapse/storage/schema/delta/51/stats.sql
@@ -26,7 +26,7 @@ CREATE TABLE user_stats (
ts BIGINT NOT NULL,
bucket_size INT NOT NULL,
sent_events INT NOT NULL,
- stored_events INT NOT NULL, -- delta or absolute?
+ local_events INT NOT NULL,
public_rooms INT NOT NULL,
private_rooms INT NOT NULL,
sent_file_count INT NOT NULL,
@@ -60,6 +60,7 @@ CREATE TABLE room_state (
encrypted BOOLEAN,
name TEXT NOT NULL,
topic TEXT NOT NULL,
+ avatar TEXT NOT NULL,
canonical_alias TEXT NOT NULL,
-- get aliases straight from the right table
);
diff --git a/synapse/storage/state.py b/synapse/storage/state.py
index 89a05c4618..e60d6ed486 100644
--- a/synapse/storage/state.py
+++ b/synapse/storage/state.py
@@ -89,6 +89,59 @@ class StateGroupWorkerStore(SQLBaseStore):
_get_current_state_ids_txn,
)
+ @defer.inlineCallbacks
+ def get_current_state(self, room_id, types):
+ """Get the current state event of a given type for a room based on the
+ current_state_events table. This may not be as up-to-date as the result
+ of doing a fresh state resolution as per state_handler.get_current_state
+ Args:
+ room_id (str)
+ types (list): List of (type, state_key) tuples which are used to
+ filter the state fetched. `state_key` may be None, which matches
+ any `state_key`
+ Returns:
+ deferred: dict of (type, state_key) -> event
+ """
+ def _get_current_state_txn(txn):
+ sql = """SELECT type, state_key, event_id FROM current_state_events
+ WHERE room_id = ? and %s"""
+ # Turns out that postgres doesn't like doing a list of OR's and
+ # is about 1000x slower, so we just issue a query for each specific
+ # type seperately.
+ if types:
+ clause_to_args = [
+ (
+ "AND type = ? AND state_key = ?",
+ (etype, state_key)
+ ) if state_key is not None else (
+ "AND type = ?",
+ (etype,)
+ )
+ for etype, state_key in types
+ ]
+ else:
+ # If types is None we fetch all the state, and so just use an
+ # empty where clause with no extra args.
+ clause_to_args = [("", [])]
+ for where_clause, where_args in clause_to_args:
+ args = [room_id]
+ args.extend(where_args)
+ txn.execute(sql % (where_clause,), args)
+ for row in txn:
+ typ, state_key, event_id = row
+ key = (typ, state_key)
+ results[intern_string(key)] = event_id
+ return results
+
+ results = self.runInteraction(
+ "get_current_state",
+ _get_current_state_txn,
+ )
+ for (key, event_id) in iteritems(results):
+ results[key] = yield self.store.get_event(event_id, allow_none=True)
+
+ defer.returnValue(results)
+
@cached(max_entries=10000, iterable=True)
def get_state_group_delta(self, state_group):
"""Given a state group try to return a previous group and a delta between
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index facbdba5c9..f6fc56cbe5 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -19,6 +19,37 @@ from .StateDeltasStore import StateDeltasStore
logger = logging.getLogger(__name__)
+# these fields track relative numbers (e.g. number of events sent in this timeslice)
+RELATIVE_STATS_FIELDS = {
+ "room": (
+ "sent_events"
+ ),
+ "user": (
+ "sent_events"
+ )
+}
+
+# these fields track rather than absolutes (e.g. total number of rooms on the server)
+ABSOLUTE_STATS_FIELDS = {
+ "room": (
+ "current_state_events",
+ "joined_members",
+ "invited_members",
+ "left_members",
+ "banned_members",
+ "state_events",
+ "local_events",
+ "remote_events",
+ ),
+ "user": (
+ "local_events",
+ "public_rooms",
+ "private_rooms",
+ "sent_file_count",
+ "sent_file_size",
+ ),
+}
+
class StatsStore(StateDeltasStore):
@@ -59,28 +90,7 @@ class StatsStore(StateDeltasStore):
desc="update_stats",
)
- # these fields track relative numbers (e.g. number of events sent in this timeslice)
- RELATIVE_STATS_FIELDS = {
- "room": {
- "sent_events": True
- }
- }
-
- # these fields track rather than absolutes (e.g. total number of rooms on the server)
- ABSOLUTE_STATS_FIELDS = {
- "room": (
- "current_state_events",
- "joined_members",
- "invited_members",
- "left_members",
- "banned_members",
- "state_events",
- "local_events",
- "remote_events",
- )
- }
-
- def update_stats_delta(self, stats_type, stats_id, field, value):
+ def update_stats_delta(self, ts, bucket_size, stats_type, stats_id, field, value):
def _update_stats_delta(txn):
table = "%s_stats" % stats_type
id_col = "%s_id" % stats_type
@@ -96,34 +106,46 @@ class StatsStore(StateDeltasStore):
rows = self.cursor_to_dict(txn)
if len(rows) == 0:
# silently skip as we don't have anything to apply a delta to yet.
+ # this tries to minimise any race between the initial sync and
+ # subsequent deltas arriving.
return
+ values = {
+ key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
+ }
+ values[id_col] = stats_id
+ values["ts"] = ts
+ values["bucket_size"] = bucket_size
+
latest_ts = rows[0]["ts"]
if ts != latest_ts:
# we have to copy our absolute counters over to the new entry.
self._simple_insert_txn(
txn,
table=table,
- values={
+ values=values
+ )
+
+ # actually update the new value
+ if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
+ self._simple_update_txn(
+ txn,
+ table=table,
+ keyvalues={
id_col: stats_id,
"ts": ts,
- key: rows[0][key]
- for key in ABSOLUTE_STATS_FIELDS[stats_type],
+ },
+ updatevalues={
+ field: value
}
)
-
- # actually update the new value
- self._simple_update_txn(
- txn,
- table=table,
- keyvalues={
- id_col: stats_id,
- "ts": ts,
- }
- updatevalues={
- field: value
- }
- )
+ else:
+ sql = (
+ "UPDATE %s "
+ " SET %s=%s+?"
+ " WHERE %s=? AND ts=?"
+ ) % (table, field, field, id_col)
+ txn.execute(sql, (value, stats_id, ts))
return self.runInteraction(
"update_stats_delta", _update_stats_delta
diff --git a/synapse/storage/user_directory.py b/synapse/storage/user_directory.py
index 6b3ed10cca..b210551c5a 100644
--- a/synapse/storage/user_directory.py
+++ b/synapse/storage/user_directory.py
@@ -25,7 +25,7 @@ from synapse.storage.engines import PostgresEngine, Sqlite3Engine
from synapse.types import get_domain_from_id, get_localpart_from_id
from synapse.util.caches.descriptors import cached, cachedInlineCallbacks
-from .StateDeltasStore import StateDeltasStore
+from .state_deltas import StateDeltasStore
logger = logging.getLogger(__name__)
|