diff --git a/changelog.d/5847.misc b/changelog.d/5847.misc
new file mode 100644
index 0000000000..f5e4203a25
--- /dev/null
+++ b/changelog.d/5847.misc
@@ -0,0 +1,2 @@
+Rework room and user statistics to separate current & historical rows, and to
+track stats correctly.
\ No newline at end of file
diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md
new file mode 100644
index 0000000000..5b8944ba1c
--- /dev/null
+++ b/docs/room_and_user_statistics.md
@@ -0,0 +1,146 @@
+Room and User Statistics
+========================
+
+Synapse maintains room and user statistics (as well as a cache of room state)
+in various tables.
+
+These can be used for administrative purposes but are also used when generating
+the public room directory. If these tables get stale or out of sync (possibly
+after database corruption), you may wish to regenerate them.
+
+
+# Synapse Administrator Documentation
+
+## Various SQL scripts that you may find useful
+
+### Delete stats, including historical stats
+
+```sql
+DELETE FROM room_stats_current;
+DELETE FROM room_stats_historical;
+DELETE FROM user_stats_current;
+DELETE FROM user_stats_historical;
+```
+
+### Regenerate stats (all subjects)
+
+```sql
+BEGIN;
+ DELETE FROM stats_incremental_position;
+ INSERT INTO stats_incremental_position (
+ state_delta_stream_id,
+ total_events_min_stream_ordering,
+ total_events_max_stream_ordering,
+ is_background_contract
+ ) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
+COMMIT;
+
+DELETE FROM room_stats_current;
+DELETE FROM user_stats_current;
+```
+
+Then follow the steps below for **'Regenerate stats (missing subjects only)'**.
+
+### Regenerate stats (missing subjects only)
+
+```sql
+-- Set up staging tables
+-- we depend on current_state_events_membership because this is used
+-- in our counting.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_prepare', '{}', 'current_state_events_membership');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
+
+-- Run through each user and update stats.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_cleanup', '{}', 'populate_stats_process_users');
+```
+
+Then **restart Synapse**.
+
+
+# Synapse Developer Documentation
+
+## High-Level Concepts
+
+### Definitions
+
+* **subject**: Something we are tracking stats about – currently a room or user.
+* **current row**: An entry for a subject in the appropriate current statistics
+ table. Each subject can have only one.
+* **historical row**: An entry for a subject in the appropriate historical
+ statistics table. Each subject can have any number of these.
+
+### Overview
+
+Stats are maintained as time series. There are two kinds of column:
+
+* absolute columns – where the value is correct for the time given by `end_ts`
+ in the stats row. (Imagine a line graph for these values)
+* per-slice columns – where the value corresponds to how many of the occurrences
+ occurred within the time slice given by `(end_ts − bucket_size)…end_ts`
+ or `start_ts…end_ts`. (Imagine a histogram for these values)
+
+Currently, only absolute columns are in use.
+
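+The `end_ts` of each row is quantised to a multiple of the configured bucket
+size (`stats_bucket_size`). A minimal sketch of the quantisation, mirroring
+`quantise_stats_time` in `synapse/storage/stats.py`:
+
+```python
+def quantise_stats_time(ts, bucket_size):
+    """Return the largest multiple of bucket_size that is no later than ts.
+
+    Both values are in seconds since the Unix epoch.
+    """
+    return (ts // bucket_size) * bucket_size
+
+
+# e.g. with hourly buckets (bucket_size = 3600):
+assert quantise_stats_time(7320, 3600) == 7200  # 02:02 quantises to 02:00
+```
+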
+Stats are maintained in two tables (for each type): current and historical.
+
+Current stats correspond to the present values. Each subject can only have one
+entry.
+
+Historical stats correspond to values in the past. Subjects may have multiple
+entries.
+
+## Concepts around the management of stats
+
+### current rows
+
+#### dirty current rows
+
+Current rows can be **dirty**, which means that they have changed since the
+latest historical row for the same subject.
+**Dirty** current rows possess an end timestamp, `end_ts`.
+
+#### old current rows and old collection
+
+When a (necessarily dirty) current row has an `end_ts` in the past, it is said
+to be **old**.
+Old current rows must be copied into a historical row, and cleared of their dirty
+status, before further statistics can be tracked for that subject.
+The process which does this is referred to as **old collection**.
+
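+To make this concrete, below is a minimal, self-contained sketch of old
+collection against an illustrative cut-down schema. (The real implementation,
+`_collect_old_txn` in `synapse/storage/stats.py`, works in limited batches,
+covers all the stats fields, and leaves incomplete rows alone, extending them
+instead.)
+
+```python
+import sqlite3
+
+conn = sqlite3.connect(":memory:")
+conn.executescript(
+    """
+    CREATE TABLE room_stats_current (
+        room_id TEXT PRIMARY KEY,
+        joined_members INT NOT NULL DEFAULT 0,
+        start_ts BIGINT, end_ts BIGINT);
+    CREATE TABLE room_stats_historical (
+        room_id TEXT NOT NULL,
+        joined_members INT NOT NULL,
+        bucket_size INT NOT NULL, end_ts BIGINT NOT NULL,
+        PRIMARY KEY (room_id, end_ts));
+    -- a dirty current row whose time slice ended at ts = 7200
+    INSERT INTO room_stats_current VALUES ('!a:example.org', 5, 3600, 7200);
+    """
+)
+
+
+def collect_old(now):
+    """Copy old (dirty, slice-finished) current rows into the historical
+    table, then clear their dirty status."""
+    cur = conn.cursor()
+    # an old row is a dirty row whose end_ts is in the past
+    cur.execute(
+        "INSERT INTO room_stats_historical"
+        " (room_id, joined_members, bucket_size, end_ts)"
+        " SELECT room_id, joined_members, end_ts - start_ts, end_ts"
+        " FROM room_stats_current"
+        " WHERE end_ts IS NOT NULL AND end_ts <= ?",
+        (now,),
+    )
+    # clearing the timestamps marks the rows as no longer dirty
+    cur.execute(
+        "UPDATE room_stats_current SET start_ts = NULL, end_ts = NULL"
+        " WHERE end_ts IS NOT NULL AND end_ts <= ?",
+        (now,),
+    )
+    conn.commit()
+
+
+collect_old(now=7201)
+```
+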
+#### incomplete current rows
+
+There are also **incomplete** current rows, which are current rows that do not
+contain a full count yet – this is because they are waiting for the regeneration
+process to give them an initial count. Incomplete current rows DO NOT contain
+correct and up-to-date values. As such, *incomplete rows are not old-collected*.
+Instead, old incomplete rows will be extended so they are no longer old.
+
+### historical rows
+
+Historical rows can always be considered to be valid for the time slice and
+end time specified. (This, of course, assumes a lack of defects in the code
+to track the statistics, and assumes integrity of the database.)
+
+Even so, there are two considerations that we may need to bear in mind:
+
+* historical rows will not exist for every time slice – they will be omitted
+ if there were no changes. In this case, the following assumptions can be
+  made to interpolate/recreate missing rows (see the sketch after this list):
+ - absolute fields have the same values as in the preceding row
+ - per-slice fields are zero (`0`)
+* historical rows will not be retained forever – rows older than a configurable
+ time will be purged.
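+
+Under those two assumptions, recreating omitted rows is mechanical. A minimal
+illustrative sketch (this helper is not part of Synapse; it assumes `rows` is
+a list of historical-row dicts sorted by `end_ts`, oldest first, with slices
+aligned to a uniform `bucket_size`):
+
+```python
+def fill_missing_rows(rows, bucket_size, absolute_fields, per_slice_fields):
+    """Recreate omitted time slices between consecutive historical rows."""
+    filled = []
+    for row in rows:
+        if filled:
+            prev = filled[-1]
+            end_ts = prev["end_ts"] + bucket_size
+            while end_ts < row["end_ts"]:
+                gap = {"end_ts": end_ts, "bucket_size": bucket_size}
+                for field in absolute_fields:
+                    gap[field] = prev[field]  # carried forward unchanged
+                for field in per_slice_fields:
+                    gap[field] = 0  # nothing happened during this slice
+                filled.append(gap)
+                end_ts += bucket_size
+        filled.append(row)
+    return filled
+```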
+
+#### purge
+
+The purging of historical rows is not yet implemented.
+
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..dee19cdcec 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
# The current position in the current_state_delta stream
self.pos = None
- # Guard to ensure we only process deltas one at a time
- self._is_processing = False
-
if hs.config.stats_enabled:
self.notifier.add_replication_callback(self.notify_new_event)
@@ -65,43 +62,58 @@ class StatsHandler(StateDeltasHandler):
if not self.hs.config.stats_enabled:
return
- if self._is_processing:
- return
+ lock = self.store.stats_delta_processing_lock
@defer.inlineCallbacks
def process():
try:
yield self._unsafe_process()
finally:
- self._is_processing = False
+ lock.release()
- self._is_processing = True
- run_as_background_process("stats.notify_new_event", process)
+ if lock.acquire(blocking=False):
+ # we only want to run this process one-at-a-time,
+ # and also, if the initial background updater wants us to keep out,
+ # we should respect that.
+ try:
+ run_as_background_process("stats.notify_new_event", process)
+ except: # noqa: E722 – re-raised so fine
+ lock.release()
+ raise
@defer.inlineCallbacks
def _unsafe_process(self):
        # If self.pos is None then it means we haven't fetched it from the DB
- if self.pos is None:
- self.pos = yield self.store.get_stats_stream_pos()
+ if self.pos is None or None in self.pos.values():
+ self.pos = yield self.store.get_stats_positions()
- # If still None then the initial background update hasn't happened yet
- if self.pos is None:
+ # If still None then the initial background update hasn't started yet
+ if self.pos is None or None in self.pos.values():
return None
# Loop round handling deltas until we're up to date
- while True:
- with Measure(self.clock, "stats_delta"):
- deltas = yield self.store.get_current_state_deltas(self.pos)
+ with Measure(self.clock, "stats_delta"):
+ while True:
+ deltas = yield self.store.get_current_state_deltas(
+ self.pos["state_delta_stream_id"]
+ )
if not deltas:
- return
+ break
logger.info("Handling %d state deltas", len(deltas))
yield self._handle_deltas(deltas)
- self.pos = deltas[-1]["stream_id"]
- yield self.store.update_stats_stream_pos(self.pos)
+ self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+
+ event_processing_positions.labels("stats").set(
+ self.pos["state_delta_stream_id"]
+ )
+
+ if self.pos is not None:
+ yield self.store.update_stats_positions(self.pos)
- event_processing_positions.labels("stats").set(self.pos)
+ with Measure(self.clock, "stats_total_events"):
+ self.pos = yield self.store.incremental_update_total_events(self.pos)
@defer.inlineCallbacks
def _handle_deltas(self, deltas):
@@ -119,7 +131,7 @@ class StatsHandler(StateDeltasHandler):
logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
- token = yield self.store.get_earliest_token_for_room_stats(room_id)
+ token = yield self.store.get_earliest_token_for_stats("room", room_id)
# If the earliest token to begin from is larger than our current
# stream ID, skip processing this delta.
@@ -144,44 +156,56 @@ class StatsHandler(StateDeltasHandler):
# We use stream_pos here rather than fetch by event_id as event_id
# may be None
now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
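+            # get_received_ts_by_stream_pos returns a timestamp in
+            # milliseconds; convert it to seconds.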
+ now = int(now) // 1000
- # quantise time to the nearest bucket
- now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+ room_stats_delta = {}
+ room_stats_complete = False
+
+ if prev_event_id is None:
+ # this state event doesn't overwrite another,
+ # so it is a new effective/current state event
+ room_stats_delta["current_state_events"] = (
+ room_stats_delta.get("current_state_events", 0) + 1
+ )
if typ == EventTypes.Member:
# we could use _get_key_change here but it's a bit inefficient
# given we're not testing for a specific result; might as well
# just grab the prev_membership and membership strings and
# compare them.
- prev_event_content = {}
+ # We take None rather than leave as a previous membership
+ # in the absence of a previous event because we do not want to
+ # reduce the leave count when a new-to-the-room user joins.
+ prev_membership = None
if prev_event_id is not None:
prev_event = yield self.store.get_event(
prev_event_id, allow_none=True
)
if prev_event:
prev_event_content = prev_event.content
+ prev_membership = prev_event_content.get(
+ "membership", Membership.LEAVE
+ )
membership = event_content.get("membership", Membership.LEAVE)
- prev_membership = prev_event_content.get("membership", Membership.LEAVE)
- if prev_membership == membership:
- continue
-
- if prev_membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "joined_members", -1
+ if prev_membership is None:
+ logger.debug("No previous membership for this user.")
+ elif prev_membership == Membership.JOIN:
+ room_stats_delta["joined_members"] = (
+ room_stats_delta.get("joined_members", 0) - 1
)
elif prev_membership == Membership.INVITE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "invited_members", -1
+ room_stats_delta["invited_members"] = (
+ room_stats_delta.get("invited_members", 0) - 1
)
elif prev_membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "left_members", -1
+ room_stats_delta["left_members"] = (
+ room_stats_delta.get("left_members", 0) - 1
)
elif prev_membership == Membership.BAN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "banned_members", -1
+ room_stats_delta["banned_members"] = (
+ room_stats_delta.get("banned_members", 0) - 1
)
else:
err = "%s is not a valid prev_membership" % (repr(prev_membership),)
@@ -189,20 +213,20 @@ class StatsHandler(StateDeltasHandler):
raise ValueError(err)
if membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "joined_members", +1
+ room_stats_delta["joined_members"] = (
+ room_stats_delta.get("joined_members", 0) + 1
)
elif membership == Membership.INVITE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "invited_members", +1
+ room_stats_delta["invited_members"] = (
+ room_stats_delta.get("invited_members", 0) + 1
)
elif membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now, "room", room_id, "left_members", +1
+ room_stats_delta["left_members"] = (
+ room_stats_delta.get("left_members", 0) + 1
)
elif membership == Membership.BAN:
- yield self.store.update_stats_delta(
- now, "room", room_id, "banned_members", +1
+ room_stats_delta["banned_members"] = (
+ room_stats_delta.get("banned_members", 0) + 1
)
else:
err = "%s is not a valid membership" % (repr(membership),)
@@ -210,26 +234,19 @@ class StatsHandler(StateDeltasHandler):
raise ValueError(err)
user_id = state_key
- if self.is_mine_id(user_id):
+ if self.is_mine_id(user_id) and membership in (
+ Membership.JOIN,
+ Membership.LEAVE,
+ ):
# update user_stats as it's one of our users
public = yield self._is_public_room(room_id)
- if membership == Membership.LEAVE:
- yield self.store.update_stats_delta(
- now,
- "user",
- user_id,
- "public_rooms" if public else "private_rooms",
- -1,
- )
- elif membership == Membership.JOIN:
- yield self.store.update_stats_delta(
- now,
- "user",
- user_id,
- "public_rooms" if public else "private_rooms",
- +1,
- )
+ field = "public_rooms" if public else "private_rooms"
+ delta = +1 if membership == Membership.JOIN else -1
+
+ yield self.store.update_stats_delta(
+ now, "user", user_id, {field: delta}
+ )
elif typ == EventTypes.Create:
# Newly created room. Add it with all blank portions.
@@ -246,28 +263,46 @@ class StatsHandler(StateDeltasHandler):
},
)
+ room_stats_complete = True
+
elif typ == EventTypes.JoinRules:
+ old_room_state = yield self.store.get_room_state(room_id)
yield self.store.update_room_state(
room_id, {"join_rules": event_content.get("join_rule")}
)
- is_public = yield self._get_key_change(
- prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+ # whether the room would be public anyway,
+ # because of history_visibility
+ other_field_gives_publicity = (
+ old_room_state["history_visibility"] == "world_readable"
)
- if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
+
+ if not other_field_gives_publicity:
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.RoomHistoryVisibility:
+ old_room_state = yield self.store.get_room_state(room_id)
yield self.store.update_room_state(
room_id,
{"history_visibility": event_content.get("history_visibility")},
)
- is_public = yield self._get_key_change(
- prev_event_id, event_id, "history_visibility", "world_readable"
+ # whether the room would be public anyway,
+ # because of join_rule
+ other_field_gives_publicity = (
+ old_room_state["join_rules"] == JoinRules.PUBLIC
)
- if is_public is not None:
- yield self.update_public_room_stats(now, room_id, is_public)
+
+ if not other_field_gives_publicity:
+ is_public = yield self._get_key_change(
+ prev_event_id, event_id, "history_visibility", "world_readable"
+ )
+ if is_public is not None:
+ yield self.update_public_room_stats(now, room_id, is_public)
elif typ == EventTypes.Encryption:
yield self.store.update_room_state(
@@ -290,6 +325,20 @@ class StatsHandler(StateDeltasHandler):
room_id, {"canonical_alias": event_content.get("alias")}
)
+ if room_stats_complete:
+ yield self.store.update_stats_delta(
+ now,
+ "room",
+ room_id,
+ room_stats_delta,
+ complete_with_stream_id=stream_id,
+ )
+
+ elif len(room_stats_delta) > 0:
+ yield self.store.update_stats_delta(
+ now, "room", room_id, room_stats_delta
+ )
+
@defer.inlineCallbacks
def update_public_room_stats(self, ts, room_id, is_public):
"""
@@ -308,10 +357,13 @@ class StatsHandler(StateDeltasHandler):
for user_id in user_ids:
if self.hs.is_mine(UserID.from_string(user_id)):
yield self.store.update_stats_delta(
- ts, "user", user_id, "public_rooms", +1 if is_public else -1
- )
- yield self.store.update_stats_delta(
- ts, "user", user_id, "private_rooms", -1 if is_public else +1
+ ts,
+ "user",
+ user_id,
+ {
+ "public_rooms": +1 if is_public else -1,
+ "private_rooms": -1 if is_public else +1,
+ },
)
@defer.inlineCallbacks
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 55e4e84d71..d8e1864a08 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -845,6 +845,17 @@ class RegistrationStore(
(user_id_obj.localpart, create_profile_with_displayname),
)
+ if self.hs.config.stats_enabled:
+ # we create a new completed user statistics row
+
+ # we don't strictly need current_token since this user really can't
+ # have any state deltas before now (as it is a new user), but still,
+ # we include it for completeness.
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+ self._update_stats_delta_txn(
+ txn, now, "user", user_id, {}, complete_with_stream_id=current_token
+ )
+
self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
txn.call_after(self.is_guest.invalidate, (user_id,))
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
new file mode 100644
index 0000000000..d987479363
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -0,0 +1,168 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- First remove old stats stuff
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_tokens;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+ 'populate_stats_createtables',
+ 'populate_stats_process_rooms',
+ 'populate_stats_cleanup',
+ 'regen_stats'
+);
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+CREATE TABLE IF NOT EXISTS stats_incremental_position (
+ -- the stream_id of the last-processed state delta
+ state_delta_stream_id BIGINT,
+
+ -- the stream_ordering of the last-processed backfilled event
+ -- (this is negative)
+ total_events_min_stream_ordering BIGINT,
+
+ -- the stream_ordering of the last-processed normally-created event
+ -- (this is positive)
+ total_events_max_stream_ordering BIGINT,
+
+ -- If true, this represents the contract agreed upon by the background
+ -- population processor.
+ -- If false, this is suitable for use by the delta/incremental processor.
+ is_background_contract BOOLEAN NOT NULL PRIMARY KEY
+);
+
+-- insert a null row and make sure it is the only one.
+DELETE FROM stats_incremental_position;
+INSERT INTO stats_incremental_position (
+ state_delta_stream_id,
+ total_events_min_stream_ordering,
+ total_events_max_stream_ordering,
+ is_background_contract
+) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
+
+-- represents PRESENT room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_current (
+ room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- These stats cover the time from start_ts…end_ts (in seconds).
+ -- Note that end_ts is quantised, and start_ts usually so.
+ start_ts BIGINT,
+ end_ts BIGINT,
+
+ current_state_events INT NOT NULL DEFAULT 0,
+ total_events INT NOT NULL DEFAULT 0,
+ joined_members INT NOT NULL DEFAULT 0,
+ invited_members INT NOT NULL DEFAULT 0,
+ left_members INT NOT NULL DEFAULT 0,
+ banned_members INT NOT NULL DEFAULT 0,
+
+ -- If initial background count is still to be performed: NULL
+ -- If initial background count has been performed: the maximum delta stream
+ -- position that this row takes into account.
+ completed_delta_stream_id BIGINT,
+
+ CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
+);
+
+
+-- represents HISTORICAL room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_historical (
+ room_id TEXT NOT NULL,
+    -- These stats cover the time from (end_ts - bucket_size)…end_ts (in seconds).
+ -- Note that end_ts is quantised, and start_ts usually so.
+ end_ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+
+ current_state_events INT NOT NULL,
+ total_events INT NOT NULL,
+ joined_members INT NOT NULL,
+ invited_members INT NOT NULL,
+ left_members INT NOT NULL,
+ banned_members INT NOT NULL,
+
+ PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want such an index for reviewing the stats of a
+-- particular room.)
+
+
+-- represents PRESENT statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_current (
+ user_id TEXT NOT NULL PRIMARY KEY,
+
+    -- These stats cover the time from start_ts…end_ts (in seconds).
+    -- Note that end_ts is quantised, and start_ts usually so.
+ start_ts BIGINT,
+ end_ts BIGINT,
+
+ public_rooms INT DEFAULT 0 NOT NULL,
+ private_rooms INT DEFAULT 0 NOT NULL,
+
+ -- If initial background count is still to be performed: NULL
+ -- If initial background count has been performed: the maximum delta stream
+ -- position that this row takes into account.
+ completed_delta_stream_id BIGINT
+);
+
+-- represents HISTORICAL statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_historical (
+ user_id TEXT NOT NULL,
+ end_ts BIGINT NOT NULL,
+ bucket_size INT NOT NULL,
+
+ public_rooms INT NOT NULL,
+ private_rooms INT NOT NULL,
+
+ PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want such an index for reviewing the stats of a
+-- particular user.)
+
+
+-- Set up staging tables
+-- we depend on current_state_events_membership because this is used
+-- in our counting.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_prepare', '{}', 'current_state_events_membership');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
+
+-- Run through each user and update stats.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+ ('populate_stats_cleanup', '{}', 'populate_stats_process_users');
diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py
new file mode 100644
index 0000000000..c5e3208adb
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This schema delta will be run after 'stats_separated1.sql' due to lexicographic
+# ordering. Note that it MUST be so.
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+
+def _run_create_generic(stats_type, cursor, database_engine):
+ """
+ Creates the pertinent (partial, if supported) indices for one kind of stats.
+ Args:
+ stats_type: "room" or "user" – the type of stats
+ cursor: Database Cursor
+ database_engine: Database Engine
+ """
+ if isinstance(database_engine, Sqlite3Engine):
+ # even though SQLite >= 3.8 can support partial indices, we won't enable
+ # them, in case the SQLite database may be later used on another system.
+ # It's also the case that SQLite is only likely to be used in small
+ # deployments or testing, where the optimisations gained by use of a
+ # partial index are not a big concern.
+ cursor.execute(
+ """
+ CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
+ ON %s_stats_current (end_ts);
+ """
+ % (stats_type, stats_type)
+ )
+ cursor.execute(
+ """
+ CREATE INDEX IF NOT EXISTS %s_stats_not_complete
+ ON %s_stats_current (completed_delta_stream_id, %s_id);
+ """
+ % (stats_type, stats_type, stats_type)
+ )
+ elif isinstance(database_engine, PostgresEngine):
+ # This partial index helps us with finding dirty stats rows
+ cursor.execute(
+ """
+ CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
+ ON %s_stats_current (end_ts)
+ WHERE end_ts IS NOT NULL;
+ """
+ % (stats_type, stats_type)
+ )
+ # This partial index helps us with old collection
+ cursor.execute(
+ """
+ CREATE INDEX IF NOT EXISTS %s_stats_not_complete
+ ON %s_stats_current (%s_id)
+ WHERE completed_delta_stream_id IS NULL;
+ """
+ % (stats_type, stats_type, stats_type)
+ )
+ else:
+ raise NotImplementedError("Unknown database engine.")
+
+
+def run_create(cursor, database_engine):
+ """
+ This function is called as part of the schema delta.
+ It will create indices – partial, if supported – for the new 'separated'
+ room & user statistics.
+ """
+ _run_create_generic("room", cursor, database_engine)
+ _run_create_generic("user", cursor, database_engine)
+
+
+def run_upgrade(cur, database_engine, config):
+ """
+ This function is run on a database upgrade (of a non-empty database).
+ We have no need to do anything specific here.
+ """
+ pass
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
index 5fdb442104..75b2217c97 100644
--- a/synapse/storage/state_deltas.py
+++ b/synapse/storage/state_deltas.py
@@ -31,7 +31,7 @@ class StateDeltasStore(SQLBaseStore):
- state_key (str):
- event_id (str|None): new event_id for this state key. None if the
state has been deleted.
- - prev_event_id (str|None): previous event_id for this state key. None
+ - prev_event_id (str): previous event_id for this state key. None
if it's new state.
Args:
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..b6959f7967 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
# -*- coding: utf-8 -*-
# Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
@@ -14,10 +15,13 @@
# limitations under the License.
import logging
+from itertools import chain
+from threading import Lock
from twisted.internet import defer
from synapse.api.constants import EventTypes, Membership
+from synapse.storage.engines import Sqlite3Engine
from synapse.storage.prepare_database import get_statements
from synapse.storage.state_deltas import StateDeltasStore
from synapse.util.caches.descriptors import cached
@@ -32,14 +36,21 @@ ABSOLUTE_STATS_FIELDS = {
"invited_members",
"left_members",
"banned_members",
- "state_events",
+ "total_events",
),
"user": ("public_rooms", "private_rooms"),
}
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
-TEMP_TABLE = "_temp_populate_stats"
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+
+class OldCollectionRequired(Exception):
+ """ Signal that we need to collect old stats rows and retry. """
+
+ pass
class StatsStore(StateDeltasStore):
@@ -51,121 +62,320 @@ class StatsStore(StateDeltasStore):
self.stats_enabled = hs.config.stats_enabled
self.stats_bucket_size = hs.config.stats_bucket_size
+ self.stats_delta_processing_lock = Lock()
+
self.register_background_update_handler(
- "populate_stats_createtables", self._populate_stats_createtables
+ "populate_stats_prepare", self._populate_stats_prepare
)
self.register_background_update_handler(
"populate_stats_process_rooms", self._populate_stats_process_rooms
)
self.register_background_update_handler(
- "populate_stats_cleanup", self._populate_stats_cleanup
+ "populate_stats_process_users", self._populate_stats_process_users
+ )
+ # we no longer need to perform clean-up, but we will give ourselves
+ # the potential to reintroduce it in the future – so documentation
+ # will still encourage the use of this no-op handler.
+ self.register_noop_background_update("populate_stats_cleanup")
+
+ def quantise_stats_time(self, ts):
+ """
+ Quantises a timestamp to be a multiple of the bucket size.
+
+ Args:
+ ts: the timestamp to quantise, in seconds since the Unix Epoch
+
+ Returns:
+ a timestamp which
+ - is divisible by the bucket size;
+ - is no later than `ts`; and
+ - is the largest such timestamp.
+ """
+ return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
+ @defer.inlineCallbacks
+ def _unwedge_incremental_processor(self, forced_promise):
+ """
+ Make a promise about what this initial background count will handle,
+ so that we can allow the incremental processor to start doing things
+ right away – 'unwedging' it.
+ """
+
+ if forced_promise is None:
+ promised_stats_delta_pos = (
+ yield self.get_max_stream_id_in_current_state_deltas()
+ )
+
+ promised_max = self.get_room_max_stream_ordering()
+ promised_min = self.get_room_min_stream_ordering()
+
+ promised_positions = {
+ "state_delta_stream_id": promised_stats_delta_pos,
+ "total_events_min_stream_ordering": promised_min,
+ "total_events_max_stream_ordering": promised_max,
+ }
+ else:
+ promised_positions = forced_promise
+
+ # this stores it for our reference later
+ yield self.update_stats_positions(
+ promised_positions, for_initial_processor=True
+ )
+
+ # this unwedges the incremental processor
+ yield self.update_stats_positions(
+ promised_positions, for_initial_processor=False
)
+ # with the delta processor unwedged, now let it catch up in case
+ # anything was missed during the wedge period
+ self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
+
@defer.inlineCallbacks
- def _populate_stats_createtables(self, progress, batch_size):
+ def _populate_stats_prepare(self, progress, batch_size):
+ """
+ This is a background update, which prepares the database for
+ statistics regeneration.
+ """
if not self.stats_enabled:
- yield self._end_background_update("populate_stats_createtables")
+ yield self._end_background_update("populate_stats_prepare")
return 1
- # Get all the rooms that we want to process.
- def _make_staging_area(txn):
- # Create the temporary tables
- stmts = get_statements(
+ def _wedge_incremental_processor(txn):
+ """
+ Wedge the incremental processor (by setting its positions to NULL),
+ and return its previous positions – atomically.
+ """
+
+ with self.stats_delta_processing_lock:
+ old = self._get_stats_positions_txn(txn, for_initial_processor=False)
+ self._update_stats_positions_txn(txn, None, for_initial_processor=False)
+
+ return old
+
+ def _make_skeletons(txn):
+ """
+ Get all the rooms and users that we want to process, and create
+ 'skeletons' (incomplete _stats_current rows) for them, if they do
+ not already have a row.
+ """
+
+ if isinstance(self.database_engine, Sqlite3Engine):
+ sqls = """
+ INSERT OR IGNORE INTO room_stats_current (room_id)
+ SELECT room_id FROM rooms;
+
+ INSERT OR IGNORE INTO user_stats_current (user_id)
+ SELECT name AS user_id FROM users;
+ """
+ else:
+ sqls = """
+ INSERT INTO room_stats_current (room_id)
+ SELECT room_id FROM rooms
+ ON CONFLICT DO NOTHING;
+
+ INSERT INTO user_stats_current (user_id)
+ SELECT name AS user_id FROM users
+ ON CONFLICT DO NOTHING;
"""
- -- We just recreate the table, we'll be reinserting the
- -- correct entries again later anyway.
- DROP TABLE IF EXISTS {temp}_rooms;
-
- CREATE TABLE IF NOT EXISTS {temp}_rooms(
- room_id TEXT NOT NULL,
- events BIGINT NOT NULL
- );
-
- CREATE INDEX {temp}_rooms_events
- ON {temp}_rooms(events);
- CREATE INDEX {temp}_rooms_id
- ON {temp}_rooms(room_id);
- """.format(
- temp=TEMP_TABLE
- ).splitlines()
- )
- for statement in stmts:
+ for statement in get_statements(sqls.splitlines()):
txn.execute(statement)
- sql = (
- "CREATE TABLE IF NOT EXISTS "
- + TEMP_TABLE
- + "_position(position TEXT NOT NULL)"
- )
- txn.execute(sql)
+ def _delete_dirty_skeletons(txn):
+ """
+ Delete pre-existing rows which are incomplete.
+ """
+ sqls = """
+ DELETE FROM room_stats_current
+ WHERE completed_delta_stream_id IS NULL;
- # Get rooms we want to process from the database, only adding
- # those that we haven't (i.e. those not in room_stats_earliest_token)
- sql = """
- INSERT INTO %s_rooms (room_id, events)
- SELECT c.room_id, count(*) FROM current_state_events AS c
- LEFT JOIN room_stats_earliest_token AS t USING (room_id)
- WHERE t.room_id IS NULL
- GROUP BY c.room_id
- """ % (
- TEMP_TABLE,
- )
- txn.execute(sql)
+ DELETE FROM user_stats_current
+ WHERE completed_delta_stream_id IS NULL;
+ """
- new_pos = yield self.get_max_stream_id_in_current_state_deltas()
- yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
- yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
- self.get_earliest_token_for_room_stats.invalidate_all()
+ for statement in get_statements(sqls.splitlines()):
+ txn.execute(statement)
- yield self._end_background_update("populate_stats_createtables")
+ # first wedge the incremental processor and reset our promise
+ old_positions = yield self.runInteraction(
+ "populate_stats_wedge", _wedge_incremental_processor
+ )
+
+ if None in old_positions.values():
+ old_positions = None
+
+ # with the incremental processor wedged, we delete dirty skeleton rows
+ # since we don't want to double-count them.
+ yield self.runInteraction(
+ "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
+ )
+
+ yield self._unwedge_incremental_processor(old_positions)
+
+ yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
+ self.get_earliest_token_for_stats.invalidate_all()
+
+ yield self._end_background_update("populate_stats_prepare")
return 1
@defer.inlineCallbacks
- def _populate_stats_cleanup(self, progress, batch_size):
+ def _populate_stats_process_users(self, progress, batch_size):
"""
- Update the user directory stream position, then clean up the old tables.
+ This is a background update which regenerates statistics for users.
"""
if not self.stats_enabled:
- yield self._end_background_update("populate_stats_cleanup")
+ yield self._end_background_update("populate_stats_process_users")
+ return 1
+
+ def _get_next_batch(txn):
+            # Only fetch 250 users, so we don't fetch too many at once, even
+            # if those 250 users have fewer than batch_size memberships
+            # between them.
+ sql = """
+ SELECT user_id FROM user_stats_current
+ WHERE completed_delta_stream_id IS NULL
+ LIMIT 250
+ """
+ txn.execute(sql)
+ users_to_work_on = txn.fetchall()
+
+ if not users_to_work_on:
+ return None
+
+ # Get how many are left to process, so we can give status on how
+ # far we are in processing
+ txn.execute(
+ "SELECT COUNT(*) FROM room_stats_current"
+ " WHERE completed_delta_stream_id IS NULL"
+ )
+ progress["remaining"] = txn.fetchone()[0]
+
+ return users_to_work_on
+
+ users_to_work_on = yield self.runInteraction(
+ "populate_stats_users_get_batch", _get_next_batch
+ )
+
+ # No more users -- complete the transaction.
+ if not users_to_work_on:
+ yield self._end_background_update("populate_stats_process_users")
return 1
- position = yield self._simple_select_one_onecol(
- TEMP_TABLE + "_position", None, "position"
+ logger.info(
+ "Processing the next %d users of %d remaining",
+ len(users_to_work_on),
+ progress["remaining"],
)
- yield self.update_stats_stream_pos(position)
- def _delete_staging_area(txn):
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
- txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+ processed_membership_count = 0
- yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+ promised_positions = yield self.get_stats_positions(for_initial_processor=True)
- yield self._end_background_update("populate_stats_cleanup")
- return 1
+        if None in promised_positions.values():
+ logger.error(
+ "There is a None in promised_positions;"
+ " dependency task must not have been run."
+ " promised_positions: %s",
+ promised_positions,
+ )
+ yield self._end_background_update("populate_stats_process_users")
+ return 1
+
+ for (user_id,) in users_to_work_on:
+ now = self.hs.get_reactor().seconds()
+
+ def _process_user(txn):
+ # Get the current token
+ current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+ sql = """
+ SELECT
+ (
+ join_rules = 'public'
+ OR history_visibility = 'world_readable'
+ ) AS is_public,
+ COUNT(*) AS count
+ FROM room_memberships
+ JOIN room_state USING (room_id)
+ WHERE
+ user_id = ? AND membership = 'join'
+ GROUP BY is_public
+ """
+ txn.execute(sql, (user_id,))
+ room_counts_by_publicness = dict(txn.fetchall())
+
+ try:
+ self._update_stats_delta_txn(
+ txn,
+ now,
+ "user",
+ user_id,
+ {},
+ complete_with_stream_id=current_token,
+ absolute_fields={
+ # these are counted absolutely because it is
+ # more difficult to count them from the promised time,
+ # because counting them now can use the quick lookup
+ # tables.
+ "public_rooms": room_counts_by_publicness.get(True, 0),
+ "private_rooms": room_counts_by_publicness.get(False, 0),
+ },
+ )
+ except OldCollectionRequired:
+ # this can't (shouldn't) actually happen
+ # since we only run the background update for incomplete rows
+ # and incomplete rows can never be old.
+ # However, if it does, the most graceful handling is just to
+ # ignore it – and carry on processing other users.
+ logger.error(
+ "Supposedly Impossible: OldCollectionRequired in initial"
+ " background update, for user ID %s",
+ user_id,
+ exc_info=True,
+ )
+
+ # we use this count for rate-limiting
+ return sum(room_counts_by_publicness.values())
+
+ processed_membership_count += yield self.runInteraction(
+ "update_user_stats", _process_user
+ )
+
+ # Update the remaining counter.
+ progress["remaining"] -= 1
+
+ if processed_membership_count > batch_size:
+ # Don't process any more users, we've hit our batch size.
+ return processed_membership_count
+
+ yield self.runInteraction(
+ "populate_stats",
+ self._background_update_progress_txn,
+ "populate_stats_process_users",
+ progress,
+ )
+
+ return processed_membership_count
@defer.inlineCallbacks
def _populate_stats_process_rooms(self, progress, batch_size):
-
+ """
+ This is a background update which regenerates statistics for rooms.
+ """
if not self.stats_enabled:
yield self._end_background_update("populate_stats_process_rooms")
return 1
- # If we don't have progress filed, delete everything.
- if not progress:
- yield self.delete_all_stats()
-
def _get_next_batch(txn):
            # Only fetch 250 rooms, so we don't fetch too many at once, even
            # if those 250 rooms have fewer than batch_size state events.
sql = """
- SELECT room_id, events FROM %s_rooms
- ORDER BY events DESC
+ SELECT room_id FROM room_stats_current
+ WHERE completed_delta_stream_id IS NULL
LIMIT 250
- """ % (
- TEMP_TABLE,
- )
+ """
txn.execute(sql)
rooms_to_work_on = txn.fetchall()
@@ -174,13 +384,16 @@ class StatsStore(StateDeltasStore):
# Get how many are left to process, so we can give status on how
# far we are in processing
- txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+ txn.execute(
+ "SELECT COUNT(*) FROM room_stats_current"
+ " WHERE completed_delta_stream_id IS NULL"
+ )
progress["remaining"] = txn.fetchone()[0]
return rooms_to_work_on
rooms_to_work_on = yield self.runInteraction(
- "populate_stats_temp_read", _get_next_batch
+ "populate_stats_rooms_get_batch", _get_next_batch
)
# No more rooms -- complete the transaction.
@@ -197,8 +410,19 @@ class StatsStore(StateDeltasStore):
# Number of state events we've processed by going through each room
processed_event_count = 0
- for room_id, event_count in rooms_to_work_on:
+ promised_positions = yield self.get_stats_positions(for_initial_processor=True)
+
+        if None in promised_positions.values():
+ logger.error(
+ "There is a None in promised_positions;"
+ " dependency task must not have been run."
+ " promised_positions: %s",
+ promised_positions,
+ )
+ yield self._end_background_update("populate_stats_process_rooms")
+ return 1
+ for (room_id,) in rooms_to_work_on:
current_state_ids = yield self.get_current_state_ids(room_id)
join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
@@ -248,11 +472,7 @@ class StatsStore(StateDeltasStore):
now = self.hs.get_reactor().seconds()
- # quantise time to the nearest bucket
- now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
def _fetch_data(txn):
-
# Get the current token of the room
current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
@@ -260,82 +480,234 @@ class StatsStore(StateDeltasStore):
membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
- total_state_events = self._get_total_state_event_counts_txn(
- txn, room_id
- )
-
- self._update_stats_txn(
+ room_total_event_count = self._count_events_in_room_txn(
txn,
- "room",
room_id,
- now,
- {
- "bucket_size": self.stats_bucket_size,
- "current_state_events": current_state_events,
- "joined_members": membership_counts.get(Membership.JOIN, 0),
- "invited_members": membership_counts.get(Membership.INVITE, 0),
- "left_members": membership_counts.get(Membership.LEAVE, 0),
- "banned_members": membership_counts.get(Membership.BAN, 0),
- "state_events": total_state_events,
- },
- )
- self._simple_insert_txn(
- txn,
- "room_stats_earliest_token",
- {"room_id": room_id, "token": current_token},
+ promised_positions["total_events_min_stream_ordering"],
+ promised_positions["total_events_max_stream_ordering"],
)
- # We've finished a room. Delete it from the table.
- self._simple_delete_one_txn(
- txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
- )
-
- yield self.runInteraction("update_room_stats", _fetch_data)
+ try:
+ self._update_stats_delta_txn(
+ txn,
+ now,
+ "room",
+ room_id,
+ {"total_events": room_total_event_count},
+ complete_with_stream_id=current_token,
+ absolute_fields={
+ # these are counted absolutely because it is
+ # more difficult to count them from the promised time,
+ # because counting them now can use the quick lookup
+ # tables.
+ "current_state_events": current_state_events,
+ "joined_members": membership_counts.get(Membership.JOIN, 0),
+ "invited_members": membership_counts.get(
+ Membership.INVITE, 0
+ ),
+ "left_members": membership_counts.get(Membership.LEAVE, 0),
+ "banned_members": membership_counts.get(Membership.BAN, 0),
+ },
+ )
+ except OldCollectionRequired:
+ # this can't (shouldn't) actually happen
+ # since we only run the background update for incomplete rows
+ # and incomplete rows can never be old.
+ # However, if it does, the most graceful handling is just to
+ # ignore it – and carry on processing other rooms.
+ logger.error(
+ "Supposedly Impossible: OldCollectionRequired in initial"
+ " background update, for room ID %s",
+ room_id,
+ exc_info=True,
+ )
+
+ # we use this count for rate-limiting
+ return room_total_event_count
+
+ room_event_count = yield self.runInteraction(
+ "update_room_stats", _fetch_data
+ )
# Update the remaining counter.
progress["remaining"] -= 1
- yield self.runInteraction(
- "populate_stats",
- self._background_update_progress_txn,
- "populate_stats_process_rooms",
- progress,
- )
- processed_event_count += event_count
+ processed_event_count += room_event_count
if processed_event_count > batch_size:
# Don't process any more rooms, we've hit our batch size.
return processed_event_count
+ yield self.runInteraction(
+ "populate_stats",
+ self._background_update_progress_txn,
+ "populate_stats_process_rooms",
+ progress,
+ )
+
return processed_event_count
- def delete_all_stats(self):
+ def update_total_event_count_between_txn(self, txn, low_pos, high_pos):
"""
- Delete all statistics records.
+ Updates the total_events counts for rooms, in a range of stream_orderings.
+
+ Inclusivity of low_pos and high_pos is dependent upon their signs.
+ This makes it intuitive to use this function for both backfilled
+ and non-backfilled events.
+
+ Examples:
+ (low, high) → (kind)
+          (3, 7) → 3 < … <= 7 (non-backfilled; low already processed before)
+ (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+ (-7, 7) → -7 <= … <= 7 (both)
+
+ Args:
+ txn: Database transaction. It is assumed that you will have one,
+ since you probably want to update pointers at the same time.
+ low_pos: Low stream ordering
+ high_pos: High stream ordering
"""
- def _delete_all_stats_txn(txn):
- txn.execute("DELETE FROM room_state")
- txn.execute("DELETE FROM room_stats")
- txn.execute("DELETE FROM room_stats_earliest_token")
- txn.execute("DELETE FROM user_stats")
+ if low_pos >= high_pos:
+ # nothing to do here.
+ return
- return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
+ now = self.hs.get_reactor().seconds()
- def get_stats_stream_pos(self):
- return self._simple_select_one_onecol(
- table="stats_stream_pos",
- keyvalues={},
- retcol="stream_id",
- desc="stats_stream_pos",
+ # we choose comparators based on the signs
+ low_comparator = "<=" if low_pos < 0 else "<"
+ high_comparator = "<" if high_pos < 0 else "<="
+
+ sql = """
+ SELECT room_id, COUNT(*) AS new_events
+ FROM events
+ WHERE ? %s stream_ordering AND stream_ordering %s ?
+ GROUP BY room_id
+ """ % (
+ low_comparator,
+ high_comparator,
+ )
+
+ txn.execute(sql, (low_pos, high_pos))
+
+ for room_id, new_events in txn.fetchall():
+ while True:
+ try:
+ self._update_stats_delta_txn(
+ txn, now, "room", room_id, {"total_events": new_events}
+ )
+ break
+ except OldCollectionRequired:
+ self._collect_old_txn(txn, "room")
+ continue
+
+ def _count_events_in_room_txn(self, txn, room_id, low_token, high_token):
+ """
+ Count the number of events in a room between two tokens, inclusive.
+ Args:
+            txn (cursor): The database cursor
+ room_id (str): The ID of the room to count events for
+ low_token (int): the minimum stream ordering to count
+ high_token (int): the maximum stream ordering to count
+
+ Returns (int):
+ the number of events
+ """
+
+ sql = """
+ SELECT COUNT(*) AS num_events
+ FROM events
+ WHERE room_id = ?
+ AND ? <= stream_ordering
+ AND stream_ordering <= ?
+ """
+ txn.execute(sql, (room_id, low_token, high_token))
+ return txn.fetchone()[0]
+
+ def get_stats_positions(self, for_initial_processor=False):
+ """
+ Returns the stats processor positions.
+
+ Args:
+ for_initial_processor (bool, optional): If true, returns the position
+ promised by the latest stats regeneration, rather than the current
+ incremental processor's position.
+ Otherwise (if false), return the incremental processor's position.
+
+ Returns (dict):
+ Dict containing :-
+ state_delta_stream_id: stream_id of last-processed state delta
+ total_events_min_stream_ordering: stream_ordering of latest-processed
+ backfilled event, in the context of total_events counting.
+ total_events_max_stream_ordering: stream_ordering of latest-processed
+ non-backfilled event, in the context of total_events counting.
+ """
+ return self._simple_select_one(
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ retcols=(
+ "state_delta_stream_id",
+ "total_events_min_stream_ordering",
+ "total_events_max_stream_ordering",
+ ),
+ desc="stats_incremental_position",
+ )
+
+ def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+ """
+ See L{get_stats_positions}.
+
+ Args:
+ txn (cursor): Database cursor
+ """
+ return self._simple_select_one_txn(
+ txn=txn,
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ retcols=(
+ "state_delta_stream_id",
+ "total_events_min_stream_ordering",
+ "total_events_max_stream_ordering",
+ ),
)
- def update_stats_stream_pos(self, stream_id):
+ def update_stats_positions(self, positions, for_initial_processor=False):
+ """
+ Updates the stats processor positions.
+
+ Args:
+ positions: See L{get_stats_positions}
+ for_initial_processor: See L{get_stats_positions}
+ """
+ if positions is None:
+ positions = {
+ "state_delta_stream_id": None,
+ "total_events_min_stream_ordering": None,
+ "total_events_max_stream_ordering": None,
+ }
return self._simple_update_one(
- table="stats_stream_pos",
- keyvalues={},
- updatevalues={"stream_id": stream_id},
- desc="update_stats_stream_pos",
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ updatevalues=positions,
+ desc="update_stats_incremental_position",
+ )
+
+ def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
+ """
+ See L{update_stats_positions}
+ """
+ if positions is None:
+ positions = {
+ "state_delta_stream_id": None,
+ "total_events_min_stream_ordering": None,
+ "total_events_max_stream_ordering": None,
+ }
+ return self._simple_update_one_txn(
+ txn,
+ table="stats_incremental_position",
+ keyvalues={"is_background_contract": for_initial_processor},
+ updatevalues=positions,
)
def update_room_state(self, room_id, fields):
@@ -367,36 +739,109 @@ class StatsStore(StateDeltasStore):
desc="update_room_state",
)
- def get_deltas_for_room(self, room_id, start, size=100):
+ def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
"""
- Get statistics deltas for a given room.
+ Get statistics for a given subject.
Args:
- room_id (str)
+ stats_type (str): The type of subject
+ stats_id (str): The ID of the subject (e.g. room_id or user_id)
start (int): Pagination start. Number of entries, not timestamp.
size (int): How many entries to return.
Returns:
Deferred[list[dict]], where the dict has the keys of
- ABSOLUTE_STATS_FIELDS["room"] and "ts".
+ ABSOLUTE_STATS_FIELDS[stats_type], and "bucket_size" and "end_ts".
"""
- return self._simple_select_list_paginate(
- "room_stats",
- {"room_id": room_id},
- "ts",
+ return self.runInteraction(
+ "get_statistics_for_subject",
+ self._get_statistics_for_subject_txn,
+ stats_type,
+ stats_id,
start,
size,
- retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+ )
+
+ def _get_statistics_for_subject_txn(
+ self, txn, stats_type, stats_id, start, size=100
+ ):
+ """
+ Transaction-bound version of L{get_statistics_for_subject}.
+ """
+
+ table, id_col = TYPE_TO_TABLE[stats_type]
+ selected_columns = list(
+ ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+ )
+
+ slice_list = self._simple_select_list_paginate_txn(
+ txn,
+ table + "_historical",
+ {id_col: stats_id},
+ "end_ts",
+ start,
+ size,
+ retcols=selected_columns + ["bucket_size", "end_ts"],
order_direction="DESC",
)
+ if len(slice_list) < size:
+ # also fetch the current row
+ current = self._simple_select_one_txn(
+ txn,
+ table + "_current",
+ {id_col: stats_id},
+ retcols=selected_columns
+ + ["start_ts", "end_ts", "completed_delta_stream_id"],
+ allow_none=True,
+ )
+
+ if current is not None:
+ completed = current["completed_delta_stream_id"] is not None
+ dirty = current["end_ts"] is not None
+
+ if completed and dirty:
+ # it is dirty, so contains new information, so should be included
+ # we don't accept incomplete rows as that would almost certainly
+ # be giving misinformation, since it is awaiting an
+ # initial background count
+ current["bucket_size"] = current["end_ts"] - current["start_ts"]
+ del current["start_ts"]
+ return [current] + slice_list
+ return slice_list
+
def get_all_room_state(self):
return self._simple_select_list(
"room_state", None, retcols=("name", "topic", "canonical_alias")
)
+ def get_room_state(self, room_id):
+ """
+ Returns the current room_state for a room.
+
+ Args:
+ room_id (str): The ID of the room to return state for.
+
+ Returns (dict):
+ Dictionary containing these keys:
+ "name", "topic", "canonical_alias", "avatar", "join_rules",
+ "history_visibility"
+ """
+ return self._simple_select_one(
+ "room_state",
+ {"room_id": room_id},
+ retcols=(
+ "name",
+ "topic",
+ "canonical_alias",
+ "avatar",
+ "join_rules",
+ "history_visibility",
+ ),
+ )
+
@cached()
- def get_earliest_token_for_room_stats(self, room_id):
+ def get_earliest_token_for_stats(self, stats_type, id):
"""
Fetch the "earliest token". This is used by the room stats delta
processor to ignore deltas that have been processed between the
@@ -406,79 +851,275 @@ class StatsStore(StateDeltasStore):
Returns:
Deferred[int]
"""
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
return self._simple_select_one_onecol(
- "room_stats_earliest_token",
- {"room_id": room_id},
- retcol="token",
+ "%s_current" % (table,),
+ {id_col: id},
+ retcol="completed_delta_stream_id",
allow_none=True,
)
- def update_stats(self, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert(
- table=table,
- keyvalues={id_col: stats_id, "ts": ts},
- values=fields,
- desc="update_stats",
+ def _collect_old_txn(self, txn, stats_type, limit=500):
+ """
+ See {collect_old}. Runs only a small batch, specified by limit.
+
+ Returns (bool):
+ True iff there is possibly more to do (i.e. this needs re-running),
+ False otherwise.
+
+ """
+ # we do them in batches to prevent concurrent updates from
+ # messing us over with lots of retries
+
+ now = self.hs.get_reactor().seconds()
+ quantised_ts = self.quantise_stats_time(now)
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
+ fields = ", ".join(
+ field
+ for field in chain(
+ ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
+ )
+ )
+
+ # `end_ts IS NOT NULL` is for partial index optimisation
+ if isinstance(self.database_engine, Sqlite3Engine):
+ # SQLite doesn't support SELECT FOR UPDATE
+ sql = (
+ "SELECT %s FROM %s_current"
+ " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+ " LIMIT %d"
+ ) % (id_col, table, limit)
+ else:
+ sql = (
+ "SELECT %s FROM %s_current"
+ " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+ " LIMIT %d FOR UPDATE"
+ ) % (id_col, table, limit)
+ txn.execute(sql, (quantised_ts,))
+ maybe_more = txn.rowcount == limit
+ updates = txn.fetchall()
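+        # a list of single-element (subject_id,) tuples, as executemany needs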
+
+ sql = (
+ "INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
+ " SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
+ " FROM %s_current WHERE %s = ?"
+ ) % (table, id_col, fields, id_col, fields, table, id_col)
+ txn.executemany(sql, updates)
+
+ sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
+ table,
+ id_col,
)
+ txn.executemany(sql, updates)
+
+ return maybe_more
+
+ @defer.inlineCallbacks
+ def collect_old(self, stats_type):
+ """
+ Run 'old collection' on current stats rows.
+
+ Old collection is the process of copying dirty (updated) stats rows
+ from the current table to the historical table, when those rows have
+ finished their stats time slice.
+ Collected rows are then cleared of their dirty status.
+
+ Args:
+ stats_type: "room" or "user" – the type of stats to run old collection
+ on.
+
+ """
+ while True:
+ maybe_more = yield self.runInteraction(
+ "stats_collect_old", self._collect_old_txn, stats_type
+ )
+ if not maybe_more:
+ return None
+
+ @defer.inlineCallbacks
+ def update_stats_delta(
+ self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+ ):
+ """
+ Updates the statistics for a subject, with a delta (difference/relative
+ change).
+
+ Args:
+ ts (int): timestamp of the change
+ stats_type (str): "room" or "user" – the kind of subject
+ stats_id (str): the subject's ID (room ID or user ID)
+ fields (dict[str, int]): Deltas of stats values.
+ complete_with_stream_id (int, optional):
+ If supplied, converts an incomplete row into a complete row,
+ with the supplied stream_id marked as the stream_id where the
+ row was completed.
+ """
+
+ while True:
+ try:
+ res = yield self.runInteraction(
+ "update_stats_delta",
+ self._update_stats_delta_txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=complete_with_stream_id,
+ )
+ return res
+ except OldCollectionRequired:
+ # retry after collecting old rows
+ yield self.collect_old(stats_type)
+
+ def _update_stats_delta_txn(
+ self,
+ txn,
+ ts,
+ stats_type,
+ stats_id,
+ fields,
+ complete_with_stream_id=None,
+ absolute_fields=None,
+ ):
+ """
+ See L{update_stats_delta}
+ Additional Args:
+ absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+ """
+ table, id_col = TYPE_TO_TABLE[stats_type]
+
+ quantised_ts = self.quantise_stats_time(int(ts))
+ end_ts = quantised_ts + self.stats_bucket_size
+
+ field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
+ field_values = list(fields.values())
+
+ if absolute_fields is not None:
+ field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
+ field_values += list(absolute_fields.values())
+
+ if complete_with_stream_id is not None:
+ field_sqls.append("completed_delta_stream_id = ?")
+ field_values.append(complete_with_stream_id)
+
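+        # the WHERE clause below only matches if the existing row is still
+        # within the current time bucket (its end_ts >= our new end_ts) or is
+        # an incomplete row awaiting the background regen; otherwise no rows
+        # are updated and we handle the missing or stale row further down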
+ sql = (
+ "UPDATE %s_current SET end_ts = ?, %s"
+ " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
+ " AND %s = ?"
+ ) % (table, ", ".join(field_sqls), id_col)
+
+ qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
+
+ txn.execute(sql, qargs)
- def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
- table, id_col = TYPE_TO_ROOM[stats_type]
- return self._simple_upsert_txn(
- txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+ if txn.rowcount > 0:
+ # success.
+ return
+
+        # if we're here, we didn't manage to update any stats row: the current
+        # row is either missing, incomplete, or its time slice has finished.
+        # Determine which case applies and handle it below.
+
+ current_row = self._simple_select_one_txn(
+ txn,
+ table + "_current",
+ {id_col: stats_id},
+ ("end_ts", "completed_delta_stream_id"),
+ allow_none=True,
)
- def update_stats_delta(self, ts, stats_type, stats_id, field, value):
- def _update_stats_delta(txn):
- table, id_col = TYPE_TO_ROOM[stats_type]
+ if current_row is None:
+ # we need to insert a row! (insert a dirty, incomplete row)
+ insertee = {
+ id_col: stats_id,
+ "end_ts": end_ts,
+ "start_ts": ts,
+ "completed_delta_stream_id": complete_with_stream_id,
+ }
+
+            # we assume that, by default, blank fields should be zero.
+            for field_name in chain(
+                ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
+            ):
+                insertee[field_name] = 0
+
+ for (field, value) in fields.items():
+ insertee[field] = value
+
+ if absolute_fields is not None:
+ for (field, value) in absolute_fields.items():
+ insertee[field] = value
+
+ self._simple_insert_txn(txn, table + "_current", insertee)
+ elif current_row["end_ts"] is None:
+ # update the row, including start_ts
sql = (
- "SELECT * FROM %s"
- " WHERE %s=? and ts=("
- " SELECT MAX(ts) FROM %s"
- " WHERE %s=?"
- ")"
- ) % (table, id_col, table, id_col)
- txn.execute(sql, (stats_id, stats_id))
- rows = self.cursor_to_dict(txn)
- if len(rows) == 0:
- # silently skip as we don't have anything to apply a delta to yet.
- # this tries to minimise any race between the initial sync and
- # subsequent deltas arriving.
- return
-
- current_ts = ts
- latest_ts = rows[0]["ts"]
- if current_ts < latest_ts:
- # This one is in the past, but we're just encountering it now.
- # Mark it as part of the current bucket.
- current_ts = latest_ts
- elif ts != latest_ts:
- # we have to copy our absolute counters over to the new entry.
- values = {
- key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
- }
- values[id_col] = stats_id
- values["ts"] = ts
- values["bucket_size"] = self.stats_bucket_size
-
- self._simple_insert_txn(txn, table=table, values=values)
-
- # actually update the new value
- if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
- self._simple_update_txn(
- txn,
- table=table,
- keyvalues={id_col: stats_id, "ts": current_ts},
- updatevalues={field: value},
- )
- else:
- sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
- table,
- field,
- field,
- id_col,
+ "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
+ " WHERE end_ts IS NULL AND %s = ?"
+ ) % (table, ", ".join(field_sqls), id_col)
+
+ qargs = (
+ [end_ts - self.stats_bucket_size, end_ts]
+ + list(field_values)
+ + [stats_id]
+ )
+
+ txn.execute(sql, qargs)
+ if txn.rowcount == 0:
+ raise RuntimeError(
+ "Should be impossible: No rows updated"
+ " but all conditions are known to be met."
)
- txn.execute(sql, (value, stats_id, current_ts))
- return self.runInteraction("update_stats_delta", _update_stats_delta)
+ elif current_row["end_ts"] < end_ts:
+ # we need to perform old collection first
+ raise OldCollectionRequired()
+
+ def incremental_update_total_events(self, in_positions):
+ """
+        Counts the number of new events per room and adds these counts to each
+        room's total_events statistic.
+
+ Args:
+ in_positions (dict): Positions,
+ as retrieved from L{get_stats_positions}.
+
+        Returns (dict):
+            The new positions. These are returned for reference only;
+            this function persists the new positions itself.
+ """
+
+ def incremental_update_total_events_txn(txn):
+ positions = in_positions.copy()
+
+ max_pos = self.get_room_max_stream_ordering()
+ min_pos = self.get_room_min_stream_ordering()
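+
+            # count events appended at the head of the stream since last run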
+ self.update_total_event_count_between_txn(
+ txn,
+ low_pos=positions["total_events_max_stream_ordering"],
+ high_pos=max_pos,
+ )
+
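+            # count events backfilled below the previous minimum position
+            # (backfilled events are assigned decreasing stream orderings)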
+ self.update_total_event_count_between_txn(
+ txn,
+ low_pos=min_pos,
+ high_pos=positions["total_events_min_stream_ordering"],
+ )
+
+ if (
+ positions["total_events_max_stream_ordering"] != max_pos
+ or positions["total_events_min_stream_ordering"] != min_pos
+ ):
+ positions["total_events_max_stream_ordering"] = max_pos
+ positions["total_events_min_stream_ordering"] = min_pos
+
+ self._update_stats_positions_txn(txn, positions)
+
+ return positions
+
+ return self.runInteraction(
+ "stats_incremental_total_events", incremental_update_total_events_txn
+ )
diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py
index a8b858eb4f..c4b7dfab73 100644
--- a/tests/handlers/test_stats.py
+++ b/tests/handlers/test_stats.py
@@ -17,12 +17,18 @@ from mock import Mock
from twisted.internet import defer
+from synapse import storage
from synapse.api.constants import EventTypes, Membership
from synapse.rest import admin
from synapse.rest.client.v1 import login, room
from tests import unittest
+# The expected number of state events in a fresh public room.
+EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM = 5
+# The expected number of state events in a fresh private room.
+EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM = 6
+
class StatsRoomTests(unittest.HomeserverTestCase):
@@ -33,7 +39,6 @@ class StatsRoomTests(unittest.HomeserverTestCase):
]
def prepare(self, reactor, clock, hs):
-
self.store = hs.get_datastore()
self.handler = self.hs.get_stats_handler()
@@ -47,7 +52,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.get_success(
self.store._simple_insert(
"background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+ {"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
self.get_success(
@@ -56,7 +61,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
{
"update_name": "populate_stats_process_rooms",
"progress_json": "{}",
- "depends_on": "populate_stats_createtables",
+ "depends_on": "populate_stats_prepare",
},
)
)
@@ -64,12 +69,44 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.store._simple_insert(
"background_updates",
{
- "update_name": "populate_stats_cleanup",
+ "update_name": "populate_stats_process_users",
"progress_json": "{}",
"depends_on": "populate_stats_process_rooms",
},
)
)
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_cleanup",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_process_users",
+ },
+ )
+ )
+
+ def _get_current_stats(self, stats_type, stat_id):
+ table, id_col = storage.stats.TYPE_TO_TABLE[stats_type]
+
+ cols = (
+ ["start_ts", "end_ts", "completed_delta_stream_id"]
+ + list(storage.stats.ABSOLUTE_STATS_FIELDS[stats_type])
+ + list(storage.stats.PER_SLICE_FIELDS[stats_type])
+ )
+
+ return self.get_success(
+ self.store._simple_select_one(
+ table + "_current", {id_col: stat_id}, cols, allow_none=True
+ )
+ )
+
+ def _perform_background_initial_update(self):
+ # Do the initial population of the stats via the background update
+ self._add_background_updates()
+
+ while not self.get_success(self.store.has_completed_background_updates()):
+ self.get_success(self.store.do_next_background_update(100), by=0.1)
def test_initial_room(self):
"""
@@ -114,6 +151,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
Ingestion via notify_new_event will ignore tokens that the background
update have already processed.
"""
+
self.reactor.advance(86401)
self.hs.config.stats_enabled = False
@@ -138,12 +176,12 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.hs.config.stats_enabled = True
self.handler.stats_enabled = True
self.store._all_done = False
- self.get_success(self.store.update_stats_stream_pos(None))
+ self.get_success(self.store.update_stats_positions(None))
self.get_success(
self.store._simple_insert(
"background_updates",
- {"update_name": "populate_stats_createtables", "progress_json": "{}"},
+ {"update_name": "populate_stats_prepare", "progress_json": "{}"},
)
)
@@ -154,6 +192,8 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
self.helper.join(room=room_1, user=u2, tok=u2_token)
# Now do the initial ingestion.
self.get_success(
self.store._simple_insert(
@@ -185,8 +225,13 @@ class StatsRoomTests(unittest.HomeserverTestCase):
self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
self.helper.join(room=room_1, user=u3, tok=u3_token)
- # Get the deltas! There should be two -- day 1, and day 2.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
+
+ # We need to let the delta processor advance…
+ self.pump(10 * 60)
+
+ # Get the slices! There should be two -- day 1, and day 2.
+ r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
# The oldest has 2 joined members
self.assertEqual(r[-1]["joined_members"], 2)
@@ -259,7 +304,7 @@ class StatsRoomTests(unittest.HomeserverTestCase):
room_1 = self.helper.create_room_as(u1, tok=u1_token)
- # Do the initial population of the user directory via the background update
+ # Do the initial population of the stats via the background update
self._add_background_updates()
while not self.get_success(self.store.has_completed_background_updates()):
@@ -299,6 +344,528 @@ class StatsRoomTests(unittest.HomeserverTestCase):
# One delta, with two joined members -- the room creator, and our fake
# user.
- r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
+ r = self.get_success(self.store.get_statistics_for_subject("room", room_1, 0))
self.assertEqual(len(r), 1)
self.assertEqual(r[0]["joined_members"], 2)
+
+ def test_create_user(self):
+ """
+        When we create a user, their statistics row should be ready immediately.
+ """
+
+ u1 = self.register_user("u1", "pass")
+
+ u1stats = self._get_current_stats("user", u1)
+
+ self.assertIsNotNone(u1stats)
+
+ # row is complete
+ self.assertIsNotNone(u1stats["completed_delta_stream_id"])
+
+ # not in any rooms by default
+ self.assertEqual(u1stats["public_rooms"], 0)
+ self.assertEqual(u1stats["private_rooms"], 0)
+
+ def test_create_room(self):
+ """
+        When we create a room, its statistics row should be ready immediately.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+ r1stats = self._get_current_stats("room", r1)
+ r2 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
+ r2stats = self._get_current_stats("room", r2)
+
+ self.assertIsNotNone(r1stats)
+ self.assertIsNotNone(r2stats)
+
+ # row is complete
+ self.assertIsNotNone(r1stats["completed_delta_stream_id"])
+ self.assertIsNotNone(r2stats["completed_delta_stream_id"])
+
+ # contains the default things you'd expect in a fresh room
+ self.assertEqual(
+ r1stats["total_events"],
+ EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM,
+ "Wrong number of total_events in new room's stats!"
+ " You may need to update this if more state events are added to"
+ " the room creation process.",
+ )
+ self.assertEqual(
+ r2stats["total_events"],
+ EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
+ "Wrong number of total_events in new room's stats!"
+ " You may need to update this if more state events are added to"
+ " the room creation process.",
+ )
+
+ self.assertEqual(
+ r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
+ )
+ self.assertEqual(
+ r2stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM
+ )
+
+ self.assertEqual(r1stats["joined_members"], 1)
+ self.assertEqual(r1stats["invited_members"], 0)
+ self.assertEqual(r1stats["banned_members"], 0)
+
+ self.assertEqual(r2stats["joined_members"], 1)
+ self.assertEqual(r2stats["invited_members"], 0)
+ self.assertEqual(r2stats["banned_members"], 0)
+
+ def test_send_message_increments_total_events(self):
+ """
+ When we send a message, it increments total_events.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.send(r1, "hiss", tok=u1token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+
+ def test_send_state_event_nonoverwriting(self):
+ """
+ When we send a non-overwriting state event, it increments total_events AND current_state_events
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
+ )
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": False}, tok=u1token, state_key="moggy"
+ )
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 1,
+ )
+
+ def test_send_state_event_overwriting(self):
+ """
+ When we send an overwriting state event, it increments total_events ONLY
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": True}, tok=u1token, state_key="tabby"
+ )
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.send_state(
+ r1, "cat.hissing", {"value": False}, tok=u1token, state_key="tabby"
+ )
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+
+ def test_join_first_time(self):
+ """
+ When a user joins a room for the first time, total_events, current_state_events and
+ joined_members should increase by exactly 1.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 1,
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], 1
+ )
+
+ def test_join_after_leave(self):
+ """
+        When a user joins a room after having previously left it, total_events
+        and joined_members should increase by exactly 1.
+ joined_members should increase by exactly 1.
+ current_state_events should not increase.
+ left_members should decrease by exactly 1.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.join(r1, u2, tok=u2token)
+ self.helper.leave(r1, u2, tok=u2token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["left_members"] - r1stats_ante["left_members"], -1
+ )
+
+ def test_invited(self):
+ """
+ When a user invites another user, current_state_events, total_events and
+ invited_members should increase by exactly 1.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.invite(r1, u1, u2, tok=u1token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 1,
+ )
+ self.assertEqual(
+ r1stats_post["invited_members"] - r1stats_ante["invited_members"], +1
+ )
+
+ def test_join_after_invite(self):
+ """
+ When a user joins a room after being invited, total_events and
+ joined_members should increase by exactly 1.
+ current_state_events should not increase.
+ invited_members should decrease by exactly 1.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.invite(r1, u1, u2, tok=u1token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["invited_members"] - r1stats_ante["invited_members"], -1
+ )
+
+ def test_left(self):
+ """
+ When a user leaves a room after joining, total_events and
+ left_members should increase by exactly 1.
+ current_state_events should not increase.
+ joined_members should decrease by exactly 1.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.leave(r1, u2, tok=u2token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["left_members"] - r1stats_ante["left_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
+ )
+
+ def test_banned(self):
+ """
+ When a user is banned from a room after joining, total_events and
+        banned_members should increase by exactly 1.
+        current_state_events should not increase.
+        joined_members should decrease by exactly 1.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+
+ self.helper.join(r1, u2, tok=u2token)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+
+ self.helper.change_membership(r1, u1, u2, "ban", tok=u1token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+
+ self.assertEqual(r1stats_post["total_events"] - r1stats_ante["total_events"], 1)
+ self.assertEqual(
+ r1stats_post["current_state_events"] - r1stats_ante["current_state_events"],
+ 0,
+ )
+ self.assertEqual(
+ r1stats_post["banned_members"] - r1stats_ante["banned_members"], +1
+ )
+ self.assertEqual(
+ r1stats_post["joined_members"] - r1stats_ante["joined_members"], -1
+ )
+
+ def test_initial_background_update(self):
+ """
+ Test that statistics can be generated by the initial background update
+ handler.
+
+        This test also tests that stats rows are not created for new subjects
+        when stats are disabled. However, it may eventually be desirable to
+        change this behaviour so that current rows are still maintained.
+ """
+
+ self.hs.config.stats_enabled = False
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token)
+
+ # test that these subjects, which were created during a time of disabled
+ # stats, do not have stats.
+ self.assertIsNone(self._get_current_stats("room", r1))
+ self.assertIsNone(self._get_current_stats("user", u1))
+
+ self.hs.config.stats_enabled = True
+
+ self._perform_background_initial_update()
+
+ r1stats = self._get_current_stats("room", r1)
+ u1stats = self._get_current_stats("user", u1)
+
+ self.assertIsNotNone(r1stats["completed_delta_stream_id"])
+ self.assertIsNotNone(u1stats["completed_delta_stream_id"])
+
+ self.assertEqual(r1stats["joined_members"], 1)
+ self.assertEqual(
+ r1stats["total_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
+ )
+ self.assertEqual(
+ r1stats["current_state_events"], EXPT_NUM_STATE_EVTS_IN_FRESH_PUBLIC_ROOM
+ )
+
+ self.assertEqual(u1stats["public_rooms"], 1)
+
+ def test_incomplete_stats(self):
+ """
+ This tests that we track incomplete statistics.
+
+        We first test that incomplete stats rows are generated incrementally
+        once the preparation step of a background regeneration has run.
+
+        We then test that the background regeneration completes these
+        incomplete rows.
+ """
+
+ u1 = self.register_user("u1", "pass")
+ u1token = self.login("u1", "pass")
+ u2 = self.register_user("u2", "pass")
+ u2token = self.login("u2", "pass")
+ u3 = self.register_user("u3", "pass")
+ r1 = self.helper.create_room_as(u1, tok=u1token, is_public=False)
+
+ # preparation stage of the initial background update
+ # Ugh, have to reset this flag
+ self.store._all_done = False
+
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {"update_name": "populate_stats_prepare", "progress_json": "{}"},
+ )
+ )
+
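+        # the `{"1": 1}` keyvalues render as (effectively) `WHERE 1 = 1`,
+        # matching every row: we empty the current stats tables so that the
+        # regen has to rebuild them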
+ self.get_success(
+ self.store._simple_delete(
+ "room_stats_current", {"1": 1}, "test_delete_stats"
+ )
+ )
+ self.get_success(
+ self.store._simple_delete(
+ "user_stats_current", {"1": 1}, "test_delete_stats"
+ )
+ )
+
+ while not self.get_success(self.store.has_completed_background_updates()):
+ self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+ r1stats_ante = self._get_current_stats("room", r1)
+ u1stats_ante = self._get_current_stats("user", u1)
+ u2stats_ante = self._get_current_stats("user", u2)
+
+ self.helper.invite(r1, u1, u2, tok=u1token)
+ self.helper.join(r1, u2, tok=u2token)
+ self.helper.invite(r1, u1, u3, tok=u1token)
+ self.helper.send(r1, "thou shalt yield", tok=u1token)
+
+ r1stats_post = self._get_current_stats("room", r1)
+ u1stats_post = self._get_current_stats("user", u1)
+ u2stats_post = self._get_current_stats("user", u2)
+
+ # now let the background update continue & finish
+
+ self.store._all_done = False
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_process_rooms",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_prepare",
+ },
+ )
+ )
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_process_users",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_process_rooms",
+ },
+ )
+ )
+ self.get_success(
+ self.store._simple_insert(
+ "background_updates",
+ {
+ "update_name": "populate_stats_cleanup",
+ "progress_json": "{}",
+ "depends_on": "populate_stats_process_users",
+ },
+ )
+ )
+
+ while not self.get_success(self.store.has_completed_background_updates()):
+ self.get_success(self.store.do_next_background_update(100), by=0.1)
+
+ r1stats_complete = self._get_current_stats("room", r1)
+ u1stats_complete = self._get_current_stats("user", u1)
+ u2stats_complete = self._get_current_stats("user", u2)
+
+ # now we make our assertions
+
+ # first check that none of the stats rows were complete before
+ # the background update occurred.
+ self.assertIsNone(r1stats_ante["completed_delta_stream_id"])
+ self.assertIsNone(r1stats_post["completed_delta_stream_id"])
+ self.assertIsNone(u1stats_ante["completed_delta_stream_id"])
+ self.assertIsNone(u1stats_post["completed_delta_stream_id"])
+ self.assertIsNone(u2stats_ante["completed_delta_stream_id"])
+ self.assertIsNone(u2stats_post["completed_delta_stream_id"])
+
+ # check that _ante rows are all skeletons without any deltas applied
+ self.assertEqual(r1stats_ante["joined_members"], 0)
+ self.assertEqual(r1stats_ante["invited_members"], 0)
+ self.assertEqual(r1stats_ante["total_events"], 0)
+ self.assertEqual(r1stats_ante["current_state_events"], 0)
+
+ self.assertEqual(u1stats_ante["public_rooms"], 0)
+ self.assertEqual(u1stats_ante["private_rooms"], 0)
+ self.assertEqual(u2stats_ante["public_rooms"], 0)
+ self.assertEqual(u2stats_ante["private_rooms"], 0)
+
+ # check that _post rows have the expected deltas applied
+ self.assertEqual(r1stats_post["joined_members"], 1)
+ self.assertEqual(r1stats_post["invited_members"], 1)
+ self.assertEqual(r1stats_post["total_events"], 4)
+ self.assertEqual(r1stats_post["current_state_events"], 2)
+
+ self.assertEqual(u1stats_post["public_rooms"], 0)
+ self.assertEqual(u1stats_post["private_rooms"], 0)
+ self.assertEqual(u2stats_post["public_rooms"], 0)
+ self.assertEqual(u2stats_post["private_rooms"], 1)
+
+ # check that _complete rows are complete and correct
+ self.assertEqual(r1stats_complete["joined_members"], 2)
+ self.assertEqual(r1stats_complete["invited_members"], 1)
+ self.assertEqual(
+ r1stats_complete["total_events"],
+ 4 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
+ )
+ self.assertEqual(
+ r1stats_complete["current_state_events"],
+ 2 + EXPT_NUM_STATE_EVTS_IN_FRESH_PRIVATE_ROOM,
+ )
+
+ self.assertEqual(u1stats_complete["public_rooms"], 0)
+ self.assertEqual(u1stats_complete["private_rooms"], 1)
+ self.assertEqual(u2stats_complete["public_rooms"], 0)
+ self.assertEqual(u2stats_complete["private_rooms"], 1)
diff --git a/tests/rest/client/v1/utils.py b/tests/rest/client/v1/utils.py
index 9915367144..cdded88b7f 100644
--- a/tests/rest/client/v1/utils.py
+++ b/tests/rest/client/v1/utils.py
@@ -128,8 +128,12 @@ class RestHelper(object):
return channel.json_body
- def send_state(self, room_id, event_type, body, tok, expect_code=200):
- path = "/_matrix/client/r0/rooms/%s/state/%s" % (room_id, event_type)
+ def send_state(self, room_id, event_type, body, tok, expect_code=200, state_key=""):
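+        # state_key defaults to "", the conventional key for singleton state
+        # events such as m.room.topic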
+ path = "/_matrix/client/r0/rooms/%s/state/%s/%s" % (
+ room_id,
+ event_type,
+ state_key,
+ )
if tok:
path = path + "?access_token=%s" % tok
|