summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--docs/room_and_user_statistics.md146
-rw-r--r--synapse/storage/schema/delta/56/stats_separated1.sql106
-rw-r--r--synapse/storage/schema/delta/56/stats_separated2.sql.postgres24
-rw-r--r--synapse/storage/schema/delta/56/stats_separated2.sql.sqlite27
-rw-r--r--synapse/storage/stats.py280
5 files changed, 581 insertions, 2 deletions
diff --git a/docs/room_and_user_statistics.md b/docs/room_and_user_statistics.md
new file mode 100644
index 0000000000..eb003f771a
--- /dev/null
+++ b/docs/room_and_user_statistics.md
@@ -0,0 +1,146 @@
+Room and User Statistics
+========================
+
+Synapse maintains room and user statistics (as well as a cache of room state),
+in various tables.
+
+These can be used for administrative purposes but are also used when generating
+the public room directory. If these tables get stale or out of sync (possibly
+after database corruption), you may wish to regenerate them.
+
+
+# Synapse Administrator Documentation
+
+## Various SQL scripts that you may find useful
+
+### Delete stats, including historical stats
+
+```sql
+DELETE FROM room_stats_current;
+DELETE FROM room_stats_historical;
+DELETE FROM user_stats_current;
+DELETE FROM user_stats_historical;
+```
+
+### Regenerate stats (all subjects)
+
+```sql
+BEGIN;
+    DELETE FROM stats_incremental_position;
+    INSERT INTO stats_incremental_position (
+        state_delta_stream_id,
+        total_events_min_stream_ordering,
+        total_events_max_stream_ordering,
+        is_background_contract
+    ) VALUES (NULL, NULL, NULL, FALSE), (NULL, NULL, NULL, TRUE);
+COMMIT;
+
+DELETE FROM room_stats_current;
+DELETE FROM user_stats_current;
+```
+
+then follow the steps below for **'Regenerate stats (missing subjects only)'**
+
+### Regenerate stats (missing subjects only)
+
+```sql
+-- Set up staging tables
+-- we depend on current_state_events_membership because this is used
+-- in our counting.
+INSERT INTO background_updates (update_name, progress_json) VALUES
+    ('populate_stats_prepare', '{}', 'current_state_events_membership');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
+
+-- Run through each user and update stats.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_cleanup', '{}', 'populate_stats_process_users');
+```
+
+then **restart Synapse**.
+
+
+# Synapse Developer Documentation
+
+## High-Level Concepts
+
+### Definitions
+
+* **subject**: Something we are tracking stats about – currently a room or user.
+* **current row**: An entry for a subject in the appropriate current statistics
+    table. Each subject can have only one.
+* **historical row**: An entry for a subject in the appropriate historical
+    statistics table. Each subject can have any number of these.
+
+### Overview
+
+Stats are maintained as time series. There are two kinds of column:
+
+* absolute columns – where the value is correct for the time given by `end_ts`
+    in the stats row. (Imagine a line graph for these values)
+* per-slice columns – where the value corresponds to how many of the occurrences
+    occurred within the time slice given by `(end_ts − bucket_size)…end_ts`
+    or `start_ts…end_ts`. (Imagine a histogram for these values)
+
+Currently, only absolute columns are in use.
+
+Stats are maintained in two tables (for each type): current and historical.
+
+Current stats correspond to the present values. Each subject can only have one
+entry.
+
+Historical stats correspond to values in the past. Subjects may have multiple
+entries.
+
+## Concepts around the management of stats
+
+### current rows
+
+#### dirty current rows
+
+Current rows can be **dirty**, which means that they have changed since the
+latest historical row for the same subject.
+**Dirty** current rows possess an end timestamp, `end_ts`.
+
+#### old current rows and old collection
+
+When a (necessarily dirty) current row has an `end_ts` in the past, it is said
+to be **old**.
+Old current rows must be copied into a historical row, and cleared of their dirty
+status, before further statistics can be tracked for that subject.
+The process which does this is referred to as **old collection**.
+
+#### incomplete current rows
+
+There are also **incomplete** current rows, which are current rows that do not
+contain a full count yet – this is because they are waiting for the stats
+regenerator to give them an initial count. Incomplete current rows DO NOT contain
+correct and up-to-date values. As such, *incomplete rows are not old-collected*.
+Instead, old incomplete rows will be extended so they are no longer old.
+
+### historical rows
+
+Historical rows can always be considered to be valid for the time slice and
+end time specified. (This, of course, assumes a lack of defects in the code
+to track the statistics, and assumes integrity of the database).
+
+Even still, there are two considerations that we may need to bear in mind:
+
+* historical rows will not exist for every time slice – they will be omitted
+    if there were no changes. In this case, the following assumptions can be
+    made to interpolate/recreate missing rows:
+    - absolute fields have the same values as in the preceding row
+    - per-slice fields are zero (`0`)
+* historical rows will not be retained forever – rows older than a configurable
+    time will be purged.
+
+#### purge
+
+The purging of historical rows is not yet implemented.
+
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
index 5b125d17b0..95daf8f53b 100644
--- a/synapse/storage/schema/delta/56/stats_separated1.sql
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -31,3 +31,109 @@ DELETE FROM background_updates WHERE update_name IN (
     'populate_stats_process_rooms',
     'populate_stats_cleanup'
 );
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+CREATE TABLE IF NOT EXISTS stats_incremental_position (
+    -- the stream_id of the last-processed state delta
+    state_delta_stream_id BIGINT,
+
+    -- the stream_ordering of the last-processed backfilled event
+    -- (this is negative)
+    total_events_min_stream_ordering BIGINT,
+
+    -- the stream_ordering of the last-processed normally-created event
+    -- (this is positive)
+    total_events_max_stream_ordering BIGINT,
+
+    -- If true, this represents the contract agreed upon by the stats
+    -- regenerator.
+    -- If false, this is suitable for use by the delta/incremental processor.
+    is_background_contract BOOLEAN NOT NULL PRIMARY KEY
+);
+
+-- insert a null row and make sure it is the only one.
+DELETE FROM stats_incremental_position;
+INSERT INTO stats_incremental_position (
+    state_delta_stream_id,
+    total_events_min_stream_ordering,
+    total_events_max_stream_ordering,
+    is_background_contract
+) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
+
+-- represents PRESENT room statistics for a room
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS room_stats_current (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    -- If initial stats regen is still to be performed: NULL
+    -- If initial stats regen has been performed: the maximum delta stream
+    --  position that this row takes into account.
+    completed_delta_stream_id BIGINT
+);
+
+
+-- represents HISTORICAL room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_historical (
+    room_id TEXT NOT NULL,
+    -- These stats cover the time from (end_ts - bucket_size)...end_ts (in seconds).
+    -- Note that end_ts is quantised.
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want it to review stats for a particular room.)
+
+
+-- represents PRESENT statistics for a user
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS user_stats_current (
+    user_id TEXT NOT NULL PRIMARY KEY,
+
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL,
+
+    -- If initial stats regen is still to be performed: NULL
+    -- If initial stats regen has been performed: the maximum delta stream
+    --  position that this row takes into account.
+    completed_delta_stream_id BIGINT
+);
+
+-- represents HISTORICAL statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_historical (
+    user_id TEXT NOT NULL,
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL,
+
+    PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want it to review stats for a particular user.)
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
new file mode 100644
index 0000000000..0519fcff79
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
@@ -0,0 +1,24 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- These partial indices helps us with finding incomplete stats row
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+    ON room_stats_current (room_id)
+    WHERE completed_delta_stream_id IS NULL;
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+    ON user_stats_current (user_id)
+    WHERE completed_delta_stream_id IS NULL;
+
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
new file mode 100644
index 0000000000..181f4ec5b9
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
@@ -0,0 +1,27 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- even though SQLite >= 3.8 can support partial indices, we won't enable
+-- them, in case the SQLite database may be later used on another system.
+-- It's also the case that SQLite is only likely to be used in small
+-- deployments or testing, where the optimisations gained by use of a
+-- partial index are not a big concern.
+
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+    ON room_stats_current (completed_delta_stream_id, room_id);
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+    ON user_stats_current (completed_delta_stream_id, user_id);
+
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index 123c1ae220..5a7dfde926 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,6 +15,7 @@
 # limitations under the License.
 
 import logging
+from itertools import chain
 
 from synapse.storage.state_deltas import StateDeltasStore
 
@@ -27,12 +29,21 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "state_events",
+        "total_events",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
 
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+
+class OldCollectionRequired(Exception):
+    """ Signal that we need to collect old stats rows and retry. """
+
+    pass
 
 
 class StatsStore(StateDeltasStore):
@@ -48,6 +59,21 @@ class StatsStore(StateDeltasStore):
         self.register_noop_background_update("populate_stats_process_rooms")
         self.register_noop_background_update("populate_stats_cleanup")
 
+    def quantise_stats_time(self, ts):
+        """
+        Quantises a timestamp to be a multiple of the bucket size.
+
+        Args:
+            ts (int): the timestamp to quantise, in seconds since the Unix Epoch
+
+        Returns:
+            int: a timestamp which
+              - is divisible by the bucket size;
+              - is no later than `ts`; and
+              - is the largest such timestamp.
+        """
+        return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
     def update_room_state(self, room_id, fields):
         """
         Args:
@@ -76,3 +102,253 @@ class StatsStore(StateDeltasStore):
             values=fields,
             desc="update_room_state",
         )
+
+    def update_stats_delta(
+        self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+    ):
+        """
+        Updates the statistics for a subject, with a delta (difference/relative
+        change).
+
+        Args:
+            ts (int): timestamp of the change
+            stats_type (str): "room" or "user" – the kind of subject
+            stats_id (str): the subject's ID (room ID or user ID)
+            fields (dict[str, int]): Deltas of stats values.
+            complete_with_stream_id (int, optional):
+                If supplied, converts an incomplete row into a complete row,
+                with the supplied stream_id marked as the stream_id where the
+                row was completed.
+        """
+
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+        )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """
+
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = [table]
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            retcols = chain(absolutes.keys(), additive_relatives.keys())
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                for (key, val) in absolutes.items():
+                    current_row[key] = val
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self,
+        txn,
+        into_table,
+        keyvalues,
+        extra_dst_keyvalues,
+        additive_relatives,
+        src_table,
+        copy_columns,
+        additional_where="",
+    ):
+        """
+        Args:
+             txn: Transaction
+             into_table (str): The destination table to UPSERT the row into
+             keyvalues (dict[str, any]): Row-identifying key values
+             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+                for `into_table`.
+             additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+             src_table (str): The source table to copy from
+             copy_columns (iterable[str]): The list of columns to copy
+             additional_where (str): Additional SQL for where (prefix with AND
+                if using).
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(
+                keyvalues,
+                copy_columns,
+                additive_relatives.keys(),
+                extra_dst_keyvalues.keys(),
+            )
+            sel_exprs = chain(
+                keyvalues,
+                copy_columns,
+                ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s %(additional_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": " AND ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(
+                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+                ),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+                "additional_where": additional_where,
+            }
+
+            qargs = chain(additive_relatives.values(), keyvalues.values())
+            txn.execute(sql, qargs)
+        else:
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                keyvalues,
+                chain(additive_relatives.keys(), copy_columns),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, keyvalues, src_row)
+
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id=None,
+        absolute_fields=None,
+    ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_fields (dict[str, int]): Absolute current stats values
+                (i.e. not deltas). Does not work with per-slice fields.
+        """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+        for field in chain(fields.keys(), absolute_fields.keys()):
+            if field not in abs_field_names and field not in slice_field_names:
+                # guard against potential SQL injection dodginess
+                raise ValueError(
+                    "%s is not a recognised field"
+                    " for stats type %s" % (field, stats_type)
+                )
+
+        additive_relatives = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_fields
+        }
+
+        if absolute_fields is None:
+            absolute_fields = {}
+        elif complete_with_stream_id is not None:
+            absolute_fields = absolute_fields.copy()
+
+        absolute_fields["completed_delta_stream_id"] = complete_with_stream_id
+
+        # first upsert the current table
+        self._upsert_with_additive_relatives_txn(
+            txn,
+            table + "_current",
+            {id_col: stats_id},
+            absolute_fields,
+            additive_relatives,
+        )
+
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the historical table.
+            # we don't support absolute_fields for slice_field_names as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0) for key in slice_field_names
+            }
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn,
+                table + "_historical",
+                {id_col: stats_id},
+                {"end_ts": end_ts, "bucket_size": self.stats_bucket_size},
+                per_slice_additive_relatives,
+                table + "_current",
+                abs_field_names,
+                additional_where=" AND completed_delta_stream_id IS NOT NULL",
+            )