Diffstat (limited to 'synapse')
-rw-r--r--  synapse/handlers/stats.py                             198
-rw-r--r--  synapse/storage/registration.py                        11
-rw-r--r--  synapse/storage/schema/delta/56/stats_separated1.sql  168
-rw-r--r--  synapse/storage/schema/delta/56/stats_separated2.py    87
-rw-r--r--  synapse/storage/state_deltas.py                         2
-rw-r--r--  synapse/storage/stats.py                             1057
6 files changed, 1241 insertions, 282 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..dee19cdcec 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
         # The current position in the current_state_delta stream
         self.pos = None
 
-        # Guard to ensure we only process deltas one at a time
-        self._is_processing = False
-
         if hs.config.stats_enabled:
             self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -65,43 +62,58 @@ class StatsHandler(StateDeltasHandler):
         if not self.hs.config.stats_enabled:
             return
 
-        if self._is_processing:
-            return
+        lock = self.store.stats_delta_processing_lock
 
         @defer.inlineCallbacks
         def process():
             try:
                 yield self._unsafe_process()
             finally:
-                self._is_processing = False
+                lock.release()
 
-        self._is_processing = True
-        run_as_background_process("stats.notify_new_event", process)
+        if lock.acquire(blocking=False):
+            # we only want to run this process one-at-a-time,
+            # and also, if the initial background updater wants us to keep out,
+            # we should respect that.
+            try:
+                run_as_background_process("stats.notify_new_event", process)
+            except:  # noqa: E722 – re-raised so fine
+                lock.release()
+                raise
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None, it means we haven't fetched it from the DB yet
-        if self.pos is None:
-            self.pos = yield self.store.get_stats_stream_pos()
+        if self.pos is None or None in self.pos.values():
+            self.pos = yield self.store.get_stats_positions()
 
-        # If still None then the initial background update hasn't happened yet
-        if self.pos is None:
+        # If still None then the initial background update hasn't started yet
+        if self.pos is None or None in self.pos.values():
             return None
 
         # Loop round handling deltas until we're up to date
-        while True:
-            with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
+        with Measure(self.clock, "stats_delta"):
+            while True:
+                deltas = yield self.store.get_current_state_deltas(
+                    self.pos["state_delta_stream_id"]
+                )
                 if not deltas:
-                    return
+                    break
 
                 logger.info("Handling %d state deltas", len(deltas))
                 yield self._handle_deltas(deltas)
 
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
+                self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+
+                event_processing_positions.labels("stats").set(
+                    self.pos["state_delta_stream_id"]
+                )
+
+                if self.pos is not None:
+                    yield self.store.update_stats_positions(self.pos)
 
-                event_processing_positions.labels("stats").set(self.pos)
+        with Measure(self.clock, "stats_total_events"):
+            self.pos = yield self.store.incremental_update_total_events(self.pos)
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
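The hunks above replace the old `_is_processing` boolean with a `threading.Lock` owned by the store, acquired non-blockingly so that the initial background updater can also hold it to keep the incremental processor out. A minimal sketch of that guard pattern; `maybe_start` and `schedule` are illustrative names, not Synapse's API:

    from threading import Lock

    lock = Lock()

    def maybe_start(schedule, process):
        # acquire(blocking=False) returns immediately: True if we now hold
        # the lock, False if another run (or the background updater) holds
        # it and we should stay out.
        if lock.acquire(blocking=False):
            try:
                # process() is responsible for releasing the lock in a finally
                schedule(process)
            except Exception:
                # scheduling itself failed, so process() will never run;
                # release here to avoid deadlocking future runs
                lock.release()
                raise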
@@ -119,7 +131,7 @@ class StatsHandler(StateDeltasHandler):
 
             logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+            token = yield self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -144,44 +156,56 @@ class StatsHandler(StateDeltasHandler):
             # We use stream_pos here rather than fetch by event_id as event_id
             # may be None
             now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+            now = int(now) // 1000
 
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+            room_stats_delta = {}
+            room_stats_complete = False
+
+            if prev_event_id is None:
+                # this state event doesn't overwrite another,
+                # so it is a new effective/current state event
+                room_stats_delta["current_state_events"] = (
+                    room_stats_delta.get("current_state_events", 0) + 1
+                )
 
             if typ == EventTypes.Member:
                 # we could use _get_key_change here but it's a bit inefficient
                 # given we're not testing for a specific result; might as well
                 # just grab the prev_membership and membership strings and
                 # compare them.
-                prev_event_content = {}
+                # We use None, rather than LEAVE, as the previous membership
+                # when there is no previous event: we do not want to decrement
+                # the leave count when a new-to-the-room user joins.
+                prev_membership = None
                 if prev_event_id is not None:
                     prev_event = yield self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
+                        prev_membership = prev_event_content.get(
+                            "membership", Membership.LEAVE
+                        )
 
                 membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
 
-                if prev_membership == membership:
-                    continue
-
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
+                if prev_membership is None:
+                    logger.debug("No previous membership for this user.")
+                elif prev_membership == Membership.JOIN:
+                    room_stats_delta["joined_members"] = (
+                        room_stats_delta.get("joined_members", 0) - 1
                     )
                 elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
+                    room_stats_delta["invited_members"] = (
+                        room_stats_delta.get("invited_members", 0) - 1
                     )
                 elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
+                    room_stats_delta["left_members"] = (
+                        room_stats_delta.get("left_members", 0) - 1
                     )
                 elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
+                    room_stats_delta["banned_members"] = (
+                        room_stats_delta.get("banned_members", 0) - 1
                     )
                 else:
                     err = "%s is not a valid prev_membership" % (repr(prev_membership),)
@@ -189,20 +213,20 @@ class StatsHandler(StateDeltasHandler):
                     raise ValueError(err)
 
                 if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
+                    room_stats_delta["joined_members"] = (
+                        room_stats_delta.get("joined_members", 0) + 1
                     )
                 elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
+                    room_stats_delta["invited_members"] = (
+                        room_stats_delta.get("invited_members", 0) + 1
                     )
                 elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
+                    room_stats_delta["left_members"] = (
+                        room_stats_delta.get("left_members", 0) + 1
                     )
                 elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
+                    room_stats_delta["banned_members"] = (
+                        room_stats_delta.get("banned_members", 0) + 1
                     )
                 else:
                     err = "%s is not a valid membership" % (repr(membership),)
@@ -210,26 +234,19 @@ class StatsHandler(StateDeltasHandler):
                     raise ValueError(err)
 
                 user_id = state_key
-                if self.is_mine_id(user_id):
+                if self.is_mine_id(user_id) and membership in (
+                    Membership.JOIN,
+                    Membership.LEAVE,
+                ):
                     # update user_stats as it's one of our users
                     public = yield self._is_public_room(room_id)
 
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
-                        )
+                    field = "public_rooms" if public else "private_rooms"
+                    delta = +1 if membership == Membership.JOIN else -1
+
+                    yield self.store.update_stats_delta(
+                        now, "user", user_id, {field: delta}
+                    )
 
             elif typ == EventTypes.Create:
                 # Newly created room. Add it with all blank portions.
@@ -246,28 +263,46 @@ class StatsHandler(StateDeltasHandler):
                     },
                 )
 
+                room_stats_complete = True
+
             elif typ == EventTypes.JoinRules:
+                old_room_state = yield self.store.get_room_state(room_id)
                 yield self.store.update_room_state(
                     room_id, {"join_rules": event_content.get("join_rule")}
                 )
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                # whether the room would be public anyway,
+                # because of history_visibility
+                other_field_gives_publicity = (
+                    old_room_state["history_visibility"] == "world_readable"
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+
+                if not other_field_gives_publicity:
+                    is_public = yield self._get_key_change(
+                        prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                    )
+                    if is_public is not None:
+                        yield self.update_public_room_stats(now, room_id, is_public)
 
             elif typ == EventTypes.RoomHistoryVisibility:
+                old_room_state = yield self.store.get_room_state(room_id)
                 yield self.store.update_room_state(
                     room_id,
                     {"history_visibility": event_content.get("history_visibility")},
                 )
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
+                # whether the room would be public anyway,
+                # because of join_rule
+                other_field_gives_publicity = (
+                    old_room_state["join_rules"] == JoinRules.PUBLIC
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+
+                if not other_field_gives_publicity:
+                    is_public = yield self._get_key_change(
+                        prev_event_id, event_id, "history_visibility", "world_readable"
+                    )
+                    if is_public is not None:
+                        yield self.update_public_room_stats(now, room_id, is_public)
 
             elif typ == EventTypes.Encryption:
                 yield self.store.update_room_state(
@@ -290,6 +325,20 @@ class StatsHandler(StateDeltasHandler):
                     room_id, {"canonical_alias": event_content.get("alias")}
                 )
 
+            if room_stats_complete:
+                yield self.store.update_stats_delta(
+                    now,
+                    "room",
+                    room_id,
+                    room_stats_delta,
+                    complete_with_stream_id=stream_id,
+                )
+
+            elif len(room_stats_delta) > 0:
+                yield self.store.update_stats_delta(
+                    now, "room", room_id, room_stats_delta
+                )
+
     @defer.inlineCallbacks
     def update_public_room_stats(self, ts, room_id, is_public):
         """
@@ -308,10 +357,13 @@ class StatsHandler(StateDeltasHandler):
         for user_id in user_ids:
             if self.hs.is_mine(UserID.from_string(user_id)):
                 yield self.store.update_stats_delta(
-                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
-                )
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
+                    ts,
+                    "user",
+                    user_id,
+                    {
+                        "public_rooms": +1 if is_public else -1,
+                        "private_rooms": -1 if is_public else +1,
+                    },
                 )
 
     @defer.inlineCallbacks
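The membership hunks above fold what used to be eight separate `update_stats_delta` calls into one accumulated `room_stats_delta` dict, applied in a single call. A hedged sketch of that accumulation; `MEMBERSHIP_FIELD` and `membership_delta` are hypothetical helpers, though the field names match ABSOLUTE_STATS_FIELDS["room"]:

    MEMBERSHIP_FIELD = {
        "join": "joined_members",
        "invite": "invited_members",
        "leave": "left_members",
        "ban": "banned_members",
    }

    def membership_delta(prev_membership, membership):
        delta = {}
        if prev_membership is not None:
            # undo the count the previous membership contributed
            field = MEMBERSHIP_FIELD[prev_membership]
            delta[field] = delta.get(field, 0) - 1
        field = MEMBERSHIP_FIELD[membership]
        delta[field] = delta.get(field, 0) + 1
        return delta

    # e.g. a user moving from invite to join:
    assert membership_delta("invite", "join") == {
        "invited_members": -1,
        "joined_members": 1,
    }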
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 55e4e84d71..d8e1864a08 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -845,6 +845,17 @@ class RegistrationStore(
                 (user_id_obj.localpart, create_profile_with_displayname),
             )
 
+        if self.hs.config.stats_enabled:
+            # we create a new completed user statistics row
+
+            # we don't strictly need current_token since this user really can't
+            # have any state deltas before now (as it is a new user), but still,
+            # we include it for completeness.
+            current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+            self._update_stats_delta_txn(
+                txn, now, "user", user_id, {}, complete_with_stream_id=current_token
+            )
+
         self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
         txn.call_after(self.is_guest.invalidate, (user_id,))
 
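A hedged reading of the registration hook above: with an empty delta dict, the only effect is to create the user's current-stats row (if absent) and stamp it complete up to the given stream position, so the initial background count skips this user. A sketch under that assumption; `_mark_user_complete` is illustrative, not Synapse's API:

    def _mark_user_complete(txn, user_id, stream_id):
        # an upsert, SQLite spelling shown: create the row if absent...
        txn.execute(
            "INSERT OR IGNORE INTO user_stats_current (user_id) VALUES (?)",
            (user_id,),
        )
        # ...then stamp it as complete up to stream_id
        txn.execute(
            "UPDATE user_stats_current SET completed_delta_stream_id = ?"
            " WHERE user_id = ?",
            (stream_id, user_id),
        )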
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
new file mode 100644
index 0000000000..d987479363
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -0,0 +1,168 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- First remove old stats stuff
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_tokens;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+    'populate_stats_createtables',
+    'populate_stats_process_rooms',
+    'populate_stats_cleanup',
+    'regen_stats'
+);
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+CREATE TABLE IF NOT EXISTS stats_incremental_position (
+    -- the stream_id of the last-processed state delta
+    state_delta_stream_id BIGINT,
+
+    -- the stream_ordering of the last-processed backfilled event
+    -- (this is negative)
+    total_events_min_stream_ordering BIGINT,
+
+    -- the stream_ordering of the last-processed normally-created event
+    -- (this is positive)
+    total_events_max_stream_ordering BIGINT,
+
+    -- If true, this represents the contract agreed upon by the background
+    -- population processor.
+    -- If false, this is suitable for use by the delta/incremental processor.
+    is_background_contract BOOLEAN NOT NULL PRIMARY KEY
+);
+
+-- insert a null row for each processor and make sure they are the only ones.
+DELETE FROM stats_incremental_position;
+INSERT INTO stats_incremental_position (
+    state_delta_stream_id,
+    total_events_min_stream_ordering,
+    total_events_max_stream_ordering,
+    is_background_contract
+) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
+
+-- represents PRESENT room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_current (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- These stats cover the time from start_ts…end_ts (in seconds).
+    -- Note that end_ts is quantised, and start_ts usually is too.
+    start_ts BIGINT,
+    end_ts BIGINT,
+
+    current_state_events INT NOT NULL DEFAULT 0,
+    total_events INT NOT NULL DEFAULT 0,
+    joined_members INT NOT NULL DEFAULT 0,
+    invited_members INT NOT NULL DEFAULT 0,
+    left_members INT NOT NULL DEFAULT 0,
+    banned_members INT NOT NULL DEFAULT 0,
+
+    -- If initial background count is still to be performed: NULL
+    -- If initial background count has been performed: the maximum delta stream
+    --  position that this row takes into account.
+    completed_delta_stream_id BIGINT,
+
+    CONSTRAINT timestamp_nullity_equality CHECK ((start_ts IS NULL) = (end_ts IS NULL))
+);
+
+
+-- represents HISTORICAL room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_historical (
+    room_id TEXT NOT NULL,
+    -- These stats cover the time from (end_ts - bucket_size)…end_ts (in seconds).
+    -- Note that end_ts is quantised.
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- We don't need an index on (room_id, end_ts): the PRIMARY KEY already covers
+-- that. (Such an index would be wanted for reviewing a particular room's stats.)
+
+
+-- represents PRESENT statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_current (
+    user_id TEXT NOT NULL PRIMARY KEY,
+
+    -- These stats cover the time from start_ts…end_ts (in seconds).
+    start_ts BIGINT,
+    end_ts BIGINT,
+
+    public_rooms INT DEFAULT 0 NOT NULL,
+    private_rooms INT DEFAULT 0 NOT NULL,
+
+    -- If initial background count is still to be performed: NULL
+    -- If initial background count has been performed: the maximum delta stream
+    --  position that this row takes into account.
+    completed_delta_stream_id BIGINT
+);
+
+-- represents HISTORICAL statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_historical (
+    user_id TEXT NOT NULL,
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL,
+
+    PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+-- We don't need an index on (user_id, end_ts): the PRIMARY KEY already covers
+-- that. (Such an index would be wanted for reviewing a particular user's stats.)
+
+
+-- Schedule the preparation step for stats regeneration.
+-- We depend on current_state_events_membership because it is used
+-- in our counting.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_prepare', '{}', 'current_state_events_membership');
+
+-- Run through each room and update stats
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_rooms', '{}', 'populate_stats_prepare');
+
+-- Run through each user and update stats.
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+-- Clean up staging tables
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_cleanup', '{}', 'populate_stats_process_users');
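To make the two-row `stats_incremental_position` table above concrete: there is one row per processor, keyed by the boolean primary key, and the `(0 = 1)`/`(1 = 1)` spellings are just portable FALSE/TRUE. A runnable sketch against in-memory SQLite; `get_positions` here is illustrative, mirroring what the store's `get_stats_positions` does:

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.executescript("""
        CREATE TABLE stats_incremental_position (
            state_delta_stream_id BIGINT,
            total_events_min_stream_ordering BIGINT,
            total_events_max_stream_ordering BIGINT,
            is_background_contract BOOLEAN NOT NULL PRIMARY KEY
        );
        INSERT INTO stats_incremental_position VALUES
            (NULL, NULL, NULL, 0),  -- the incremental processor's positions
            (NULL, NULL, NULL, 1);  -- the background processor's promise
    """)

    def get_positions(for_initial_processor):
        row = conn.execute(
            "SELECT state_delta_stream_id, total_events_min_stream_ordering,"
            " total_events_max_stream_ordering FROM stats_incremental_position"
            " WHERE is_background_contract = ?",
            (1 if for_initial_processor else 0,),
        ).fetchone()
        return {
            "state_delta_stream_id": row[0],
            "total_events_min_stream_ordering": row[1],
            "total_events_max_stream_ordering": row[2],
        }

    # all-NULL positions mean "wedged": the incremental processor backs off
    assert None in get_positions(False).values()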
diff --git a/synapse/storage/schema/delta/56/stats_separated2.py b/synapse/storage/schema/delta/56/stats_separated2.py
new file mode 100644
index 0000000000..c5e3208adb
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.py
@@ -0,0 +1,87 @@
+# -*- coding: utf-8 -*-
+# Copyright 2019 The Matrix.org Foundation C.I.C.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+# This schema delta will be run after 'stats_separated1.sql' due to lexicographic
+# ordering. Note that it MUST be so.
+from synapse.storage.engines import PostgresEngine, Sqlite3Engine
+
+
+def _run_create_generic(stats_type, cursor, database_engine):
+    """
+    Creates the pertinent (partial, if supported) indices for one kind of stats.
+    Args:
+        stats_type: "room" or "user" – the type of stats
+        cursor: Database Cursor
+        database_engine: Database Engine
+    """
+    if isinstance(database_engine, Sqlite3Engine):
+        # even though SQLite >= 3.8 can support partial indices, we won't enable
+        # them, in case the SQLite database may be later used on another system.
+        # It's also the case that SQLite is only likely to be used in small
+        # deployments or testing, where the optimisations gained by use of a
+        # partial index are not a big concern.
+        cursor.execute(
+            """
+                CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
+                    ON %s_stats_current (end_ts);
+            """
+            % (stats_type, stats_type)
+        )
+        cursor.execute(
+            """
+                CREATE INDEX IF NOT EXISTS %s_stats_not_complete
+                    ON %s_stats_current (completed_delta_stream_id, %s_id);
+            """
+            % (stats_type, stats_type, stats_type)
+        )
+    elif isinstance(database_engine, PostgresEngine):
+        # This partial index helps us with finding dirty stats rows
+        cursor.execute(
+            """
+                CREATE INDEX IF NOT EXISTS %s_stats_current_dirty
+                    ON %s_stats_current (end_ts)
+                    WHERE end_ts IS NOT NULL;
+            """
+            % (stats_type, stats_type)
+        )
+        # This partial index helps us find not-yet-completed rows
+        cursor.execute(
+            """
+                CREATE INDEX IF NOT EXISTS %s_stats_not_complete
+                    ON %s_stats_current (%s_id)
+                    WHERE completed_delta_stream_id IS NULL;
+            """
+            % (stats_type, stats_type, stats_type)
+        )
+    else:
+        raise NotImplementedError("Unknown database engine.")
+
+
+def run_create(cursor, database_engine):
+    """
+    This function is called as part of the schema delta.
+    It will create indices – partial, if supported – for the new 'separated'
+    room & user statistics.
+    """
+    _run_create_generic("room", cursor, database_engine)
+    _run_create_generic("user", cursor, database_engine)
+
+
+def run_upgrade(cur, database_engine, config):
+    """
+    This function is run on a database upgrade (of a non-empty database).
+    We have no need to do anything specific here.
+    """
+    pass
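For context on the PostgreSQL branch above: the `WHERE completed_delta_stream_id IS NULL` predicate keeps the index small, covering exactly the rows the background updater repeatedly scans for. A small demonstration; it assumes the SQLite linked into Python is >= 3.8 (which does accept partial indexes, even though the delta deliberately avoids them there):

    import sqlite3

    conn = sqlite3.connect(":memory:")
    conn.execute(
        "CREATE TABLE room_stats_current ("
        " room_id TEXT PRIMARY KEY, completed_delta_stream_id BIGINT)"
    )
    conn.execute(
        "CREATE INDEX room_stats_not_complete ON room_stats_current (room_id)"
        " WHERE completed_delta_stream_id IS NULL"
    )
    # the background update's batch query can now be answered from the
    # (small) partial index instead of scanning the whole table:
    for row in conn.execute(
        "EXPLAIN QUERY PLAN SELECT room_id FROM room_stats_current"
        " WHERE completed_delta_stream_id IS NULL LIMIT 250"
    ):
        print(row)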
diff --git a/synapse/storage/state_deltas.py b/synapse/storage/state_deltas.py
index 5fdb442104..75b2217c97 100644
--- a/synapse/storage/state_deltas.py
+++ b/synapse/storage/state_deltas.py
@@ -31,7 +31,7 @@ class StateDeltasStore(SQLBaseStore):
             - state_key (str):
             - event_id (str|None): new event_id for this state key. None if the
                 state has been deleted.
-            - prev_event_id (str|None): previous event_id for this state key. None
+            - prev_event_id (str): previous event_id for this state key. None
                 if it's new state.
 
         Args:
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..b6959f7967 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,10 +15,13 @@
 # limitations under the License.
 
 import logging
+from itertools import chain
+from threading import Lock
 
 from twisted.internet import defer
 
 from synapse.api.constants import EventTypes, Membership
+from synapse.storage.engines import Sqlite3Engine
 from synapse.storage.prepare_database import get_statements
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
@@ -32,14 +36,21 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "state_events",
+        "total_events",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
 
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
 
-TEMP_TABLE = "_temp_populate_stats"
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+
+
+class OldCollectionRequired(Exception):
+    """ Signal that we need to collect old stats rows and retry. """
+
+    pass
 
 
 class StatsStore(StateDeltasStore):
@@ -51,121 +62,320 @@ class StatsStore(StateDeltasStore):
         self.stats_enabled = hs.config.stats_enabled
         self.stats_bucket_size = hs.config.stats_bucket_size
 
+        self.stats_delta_processing_lock = Lock()
+
         self.register_background_update_handler(
-            "populate_stats_createtables", self._populate_stats_createtables
+            "populate_stats_prepare", self._populate_stats_prepare
         )
         self.register_background_update_handler(
             "populate_stats_process_rooms", self._populate_stats_process_rooms
         )
         self.register_background_update_handler(
-            "populate_stats_cleanup", self._populate_stats_cleanup
+            "populate_stats_process_users", self._populate_stats_process_users
+        )
+        # we no longer need to perform clean-up, but we register a no-op
+        # handler so the already-scheduled 'populate_stats_cleanup' update
+        # still completes, and so clean-up can be reintroduced in the future.
+        self.register_noop_background_update("populate_stats_cleanup")
+
+    def quantise_stats_time(self, ts):
+        """
+        Quantises a timestamp to be a multiple of the bucket size.
+
+        Args:
+            ts: the timestamp to quantise, in seconds since the Unix Epoch
+
+        Returns:
+            a timestamp which
+              - is divisible by the bucket size;
+              - is no later than `ts`; and
+              - is the largest such timestamp.
+        """
+        return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
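A worked example of the quantisation above (not part of the diff), assuming a bucket size of one day, i.e. 86400 seconds:

    bucket_size = 86400
    ts = 1561000000                      # some timestamp, in seconds
    quantised = (ts // bucket_size) * bucket_size
    assert quantised == 1560988800       # midnight UTC at the start of that day
    assert quantised % bucket_size == 0 and quantised <= ts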
+    @defer.inlineCallbacks
+    def _unwedge_incremental_processor(self, forced_promise):
+        """
+        Make a promise about what this initial background count will handle,
+        so that we can allow the incremental processor to start doing things
+        right away – 'unwedging' it.
+        """
+
+        if forced_promise is None:
+            promised_stats_delta_pos = (
+                yield self.get_max_stream_id_in_current_state_deltas()
+            )
+
+            promised_max = self.get_room_max_stream_ordering()
+            promised_min = self.get_room_min_stream_ordering()
+
+            promised_positions = {
+                "state_delta_stream_id": promised_stats_delta_pos,
+                "total_events_min_stream_ordering": promised_min,
+                "total_events_max_stream_ordering": promised_max,
+            }
+        else:
+            promised_positions = forced_promise
+
+        # this stores it for our reference later
+        yield self.update_stats_positions(
+            promised_positions, for_initial_processor=True
+        )
+
+        # this unwedges the incremental processor
+        yield self.update_stats_positions(
+            promised_positions, for_initial_processor=False
         )
 
+        # with the delta processor unwedged, now let it catch up in case
+        # anything was missed during the wedge period
+        self.clock.call_later(0, self.hs.get_stats_handler().notify_new_event)
+
     @defer.inlineCallbacks
-    def _populate_stats_createtables(self, progress, batch_size):
+    def _populate_stats_prepare(self, progress, batch_size):
+        """
+        This is a background update, which prepares the database for
+        statistics regeneration.
+        """
 
         if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_createtables")
+            yield self._end_background_update("populate_stats_prepare")
             return 1
 
-        # Get all the rooms that we want to process.
-        def _make_staging_area(txn):
-            # Create the temporary tables
-            stmts = get_statements(
+        def _wedge_incremental_processor(txn):
+            """
+            Wedge the incremental processor (by setting its positions to NULL),
+            and return its previous positions – atomically.
+            """
+
+            with self.stats_delta_processing_lock:
+                old = self._get_stats_positions_txn(txn, for_initial_processor=False)
+                self._update_stats_positions_txn(txn, None, for_initial_processor=False)
+
+            return old
+
+        def _make_skeletons(txn):
+            """
+            Get all the rooms and users that we want to process, and create
+            'skeletons' (incomplete _stats_current rows) for them, if they do
+            not already have a row.
+            """
+
+            if isinstance(self.database_engine, Sqlite3Engine):
+                sqls = """
+                    INSERT OR IGNORE INTO room_stats_current (room_id)
+                    SELECT room_id FROM rooms;
+
+                    INSERT OR IGNORE INTO user_stats_current (user_id)
+                    SELECT name AS user_id FROM users;
+                """
+            else:
+                sqls = """
+                    INSERT INTO room_stats_current (room_id)
+                    SELECT room_id FROM rooms
+                    ON CONFLICT DO NOTHING;
+
+                    INSERT INTO user_stats_current (user_id)
+                    SELECT name AS user_id FROM users
+                    ON CONFLICT DO NOTHING;
                 """
-                -- We just recreate the table, we'll be reinserting the
-                -- correct entries again later anyway.
-                DROP TABLE IF EXISTS {temp}_rooms;
-
-                CREATE TABLE IF NOT EXISTS {temp}_rooms(
-                    room_id TEXT NOT NULL,
-                    events BIGINT NOT NULL
-                );
-
-                CREATE INDEX {temp}_rooms_events
-                    ON {temp}_rooms(events);
-                CREATE INDEX {temp}_rooms_id
-                    ON {temp}_rooms(room_id);
-            """.format(
-                    temp=TEMP_TABLE
-                ).splitlines()
-            )
 
-            for statement in stmts:
+            for statement in get_statements(sqls.splitlines()):
                 txn.execute(statement)
 
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_position(position TEXT NOT NULL)"
-            )
-            txn.execute(sql)
+        def _delete_dirty_skeletons(txn):
+            """
+            Delete pre-existing rows which are incomplete.
+            """
+            sqls = """
+                DELETE FROM room_stats_current
+                WHERE completed_delta_stream_id IS NULL;
 
-            # Get rooms we want to process from the database, only adding
-            # those that we haven't (i.e. those not in room_stats_earliest_token)
-            sql = """
-                INSERT INTO %s_rooms (room_id, events)
-                SELECT c.room_id, count(*) FROM current_state_events AS c
-                LEFT JOIN room_stats_earliest_token AS t USING (room_id)
-                WHERE t.room_id IS NULL
-                GROUP BY c.room_id
-            """ % (
-                TEMP_TABLE,
-            )
-            txn.execute(sql)
+                DELETE FROM user_stats_current
+                WHERE completed_delta_stream_id IS NULL;
+            """
 
-        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
-        yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
-        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
-        self.get_earliest_token_for_room_stats.invalidate_all()
+            for statement in get_statements(sqls.splitlines()):
+                txn.execute(statement)
 
-        yield self._end_background_update("populate_stats_createtables")
+        # first wedge the incremental processor and reset our promise
+        old_positions = yield self.runInteraction(
+            "populate_stats_wedge", _wedge_incremental_processor
+        )
+
+        if None in old_positions.values():
+            old_positions = None
+
+        # with the incremental processor wedged, we delete dirty skeleton rows
+        # since we don't want to double-count them.
+        yield self.runInteraction(
+            "populate_stats_delete_dirty_skeletons", _delete_dirty_skeletons
+        )
+
+        yield self._unwedge_incremental_processor(old_positions)
+
+        yield self.runInteraction("populate_stats_make_skeletons", _make_skeletons)
+        self.get_earliest_token_for_stats.invalidate_all()
+
+        yield self._end_background_update("populate_stats_prepare")
         return 1
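A hedged summary of the ordering that `_populate_stats_prepare` establishes above; every method name below is illustrative shorthand for the transactions in the hunk, not a real API:

    def prepare(store):
        # 1. wedge: atomically NULL out the incremental processor's positions,
        #    remembering the old ones (the processor backs off on NULLs)
        old_positions = store.wedge_incremental_processor()
        # 2. while wedged, delete incomplete ("dirty") rows so nothing gets
        #    double-counted when skeletons are re-created
        store.delete_dirty_skeletons()
        # 3. unwedge: promise which stream positions the background count will
        #    cover, letting the incremental processor resume beyond them
        store.unwedge_incremental_processor(old_positions)
        # 4. create skeleton rows for every room and user still to be counted
        store.make_skeletons()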
 
     @defer.inlineCallbacks
-    def _populate_stats_cleanup(self, progress, batch_size):
+    def _populate_stats_process_users(self, progress, batch_size):
         """
-        Update the user directory stream position, then clean up the old tables.
+        This is a background update which regenerates statistics for users.
         """
         if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_cleanup")
+            yield self._end_background_update("populate_stats_process_users")
+            return 1
+
+        def _get_next_batch(txn):
+            # Only fetch 250 users, so we don't fetch too many at once, even
+            # if those 250 users have fewer than batch_size memberships.
+            sql = """
+                SELECT user_id FROM user_stats_current
+                WHERE completed_delta_stream_id IS NULL
+                LIMIT 250
+            """
+            txn.execute(sql)
+            users_to_work_on = txn.fetchall()
+
+            if not users_to_work_on:
+                return None
+
+            # Get how many are left to process, so we can give status on how
+            # far we are in processing
+            txn.execute(
+                "SELECT COUNT(*) FROM room_stats_current"
+                " WHERE completed_delta_stream_id IS NULL"
+            )
+            progress["remaining"] = txn.fetchone()[0]
+
+            return users_to_work_on
+
+        users_to_work_on = yield self.runInteraction(
+            "populate_stats_users_get_batch", _get_next_batch
+        )
+
+        # No more users -- complete the transaction.
+        if not users_to_work_on:
+            yield self._end_background_update("populate_stats_process_users")
             return 1
 
-        position = yield self._simple_select_one_onecol(
-            TEMP_TABLE + "_position", None, "position"
+        logger.info(
+            "Processing the next %d users of %d remaining",
+            len(users_to_work_on),
+            progress["remaining"],
         )
-        yield self.update_stats_stream_pos(position)
 
-        def _delete_staging_area(txn):
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+        processed_membership_count = 0
 
-        yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+        promised_positions = yield self.get_stats_positions(for_initial_processor=True)
 
-        yield self._end_background_update("populate_stats_cleanup")
-        return 1
+        if None in promised_positions.values():
+            logger.error(
+                "There is a None in promised_positions;"
+                " dependency task must not have been run."
+                " promised_positions: %s",
+                promised_positions,
+            )
+            yield self._end_background_update("populate_stats_process_users")
+            return 1
+
+        for (user_id,) in users_to_work_on:
+            now = self.hs.get_reactor().seconds()
+
+            def _process_user(txn):
+                # Get the current token
+                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+
+                sql = """
+                    SELECT
+                        (
+                            join_rules = 'public'
+                            OR history_visibility = 'world_readable'
+                        ) AS is_public,
+                        COUNT(*) AS count
+                    FROM room_memberships
+                    JOIN room_state USING (room_id)
+                    WHERE
+                        user_id = ? AND membership = 'join'
+                    GROUP BY is_public
+                """
+                txn.execute(sql, (user_id,))
+                room_counts_by_publicness = dict(txn.fetchall())
+
+                try:
+                    self._update_stats_delta_txn(
+                        txn,
+                        now,
+                        "user",
+                        user_id,
+                        {},
+                        complete_with_stream_id=current_token,
+                        absolute_fields={
+                            # these are counted absolutely, rather than as
+                            # deltas from the promised time, because counting
+                            # them now lets us use the quick lookup
+                            # tables.
+                            "public_rooms": room_counts_by_publicness.get(True, 0),
+                            "private_rooms": room_counts_by_publicness.get(False, 0),
+                        },
+                    )
+                except OldCollectionRequired:
+                    # this can't (shouldn't) actually happen
+                    # since we only run the background update for incomplete rows
+                    # and incomplete rows can never be old.
+                    # However, if it does, the most graceful handling is just to
+                    # ignore it – and carry on processing other users.
+                    logger.error(
+                        "Supposedly Impossible: OldCollectionRequired in initial"
+                        " background update, for user ID %s",
+                        user_id,
+                        exc_info=True,
+                    )
+                    pass
+
+                # we use this count for rate-limiting
+                return sum(room_counts_by_publicness.values())
+
+            processed_membership_count += yield self.runInteraction(
+                "update_user_stats", _process_user
+            )
+
+            # Update the remaining counter.
+            progress["remaining"] -= 1
+
+            if processed_membership_count > batch_size:
+                # Don't process any more users, we've hit our batch size.
+                return processed_membership_count
+
+        yield self.runInteraction(
+            "populate_stats",
+            self._background_update_progress_txn,
+            "populate_stats_process_users",
+            progress,
+        )
+
+        return processed_membership_count
 
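A note on the publicness query in `_process_user` above: `dict(txn.fetchall())` turns the grouped result into a mapping from `is_public` to a count, and a missing key simply means zero rooms of that kind. Postgres returns real booleans while SQLite returns 0/1, but both work, since `True == 1` and `False == 0` in Python:

    room_counts_by_publicness = dict([(1, 3), (0, 2)])   # as SQLite would return
    assert room_counts_by_publicness.get(True, 0) == 3   # -> public_rooms
    assert room_counts_by_publicness.get(False, 0) == 2  # -> private_rooms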
     @defer.inlineCallbacks
     def _populate_stats_process_rooms(self, progress, batch_size):
-
+        """
+        This is a background update which regenerates statistics for rooms.
+        """
         if not self.stats_enabled:
             yield self._end_background_update("populate_stats_process_rooms")
             return 1
 
-        # If we don't have progress filed, delete everything.
-        if not progress:
-            yield self.delete_all_stats()
-
         def _get_next_batch(txn):
             # Only fetch 250 rooms, so we don't fetch too many at once, even
             # if those 250 rooms have less than batch_size state events.
             sql = """
-                SELECT room_id, events FROM %s_rooms
-                ORDER BY events DESC
+                SELECT room_id FROM room_stats_current
+                WHERE completed_delta_stream_id IS NULL
                 LIMIT 250
-            """ % (
-                TEMP_TABLE,
-            )
+            """
             txn.execute(sql)
             rooms_to_work_on = txn.fetchall()
 
@@ -174,13 +384,16 @@ class StatsStore(StateDeltasStore):
 
             # Get how many are left to process, so we can give status on how
             # far we are in processing
-            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
+            txn.execute(
+                "SELECT COUNT(*) FROM room_stats_current"
+                " WHERE completed_delta_stream_id IS NULL"
+            )
             progress["remaining"] = txn.fetchone()[0]
 
             return rooms_to_work_on
 
         rooms_to_work_on = yield self.runInteraction(
-            "populate_stats_temp_read", _get_next_batch
+            "populate_stats_rooms_get_batch", _get_next_batch
         )
 
         # No more rooms -- complete the transaction.
@@ -197,8 +410,19 @@ class StatsStore(StateDeltasStore):
         # Number of state events we've processed by going through each room
         processed_event_count = 0
 
-        for room_id, event_count in rooms_to_work_on:
+        promised_positions = yield self.get_stats_positions(for_initial_processor=True)
+
+        if None in promised_positions.values():
+            logger.error(
+                "There is a None in promised_positions;"
+                " dependency task must not have been run."
+                " promised_positions: %s",
+                promised_positions,
+            )
+            yield self._end_background_update("populate_stats_process_rooms")
+            return 1
 
+        for (room_id,) in rooms_to_work_on:
             current_state_ids = yield self.get_current_state_ids(room_id)
 
             join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
@@ -248,11 +472,7 @@ class StatsStore(StateDeltasStore):
 
             now = self.hs.get_reactor().seconds()
 
-            # quantise time to the nearest bucket
-            now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
             def _fetch_data(txn):
-
                 # Get the current token of the room
                 current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
 
@@ -260,82 +480,234 @@ class StatsStore(StateDeltasStore):
 
                 membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
 
-                total_state_events = self._get_total_state_event_counts_txn(
-                    txn, room_id
-                )
-
-                self._update_stats_txn(
+                room_total_event_count = self._count_events_in_room_txn(
                     txn,
-                    "room",
                     room_id,
-                    now,
-                    {
-                        "bucket_size": self.stats_bucket_size,
-                        "current_state_events": current_state_events,
-                        "joined_members": membership_counts.get(Membership.JOIN, 0),
-                        "invited_members": membership_counts.get(Membership.INVITE, 0),
-                        "left_members": membership_counts.get(Membership.LEAVE, 0),
-                        "banned_members": membership_counts.get(Membership.BAN, 0),
-                        "state_events": total_state_events,
-                    },
-                )
-                self._simple_insert_txn(
-                    txn,
-                    "room_stats_earliest_token",
-                    {"room_id": room_id, "token": current_token},
+                    promised_positions["total_events_min_stream_ordering"],
+                    promised_positions["total_events_max_stream_ordering"],
                 )
 
-                # We've finished a room. Delete it from the table.
-                self._simple_delete_one_txn(
-                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
-                )
-
-            yield self.runInteraction("update_room_stats", _fetch_data)
+                try:
+                    self._update_stats_delta_txn(
+                        txn,
+                        now,
+                        "room",
+                        room_id,
+                        {"total_events": room_total_event_count},
+                        complete_with_stream_id=current_token,
+                        absolute_fields={
+                            # these are counted absolutely, rather than as
+                            # deltas from the promised time, because counting
+                            # them now lets us use the quick lookup
+                            # tables.
+                            "current_state_events": current_state_events,
+                            "joined_members": membership_counts.get(Membership.JOIN, 0),
+                            "invited_members": membership_counts.get(
+                                Membership.INVITE, 0
+                            ),
+                            "left_members": membership_counts.get(Membership.LEAVE, 0),
+                            "banned_members": membership_counts.get(Membership.BAN, 0),
+                        },
+                    )
+                except OldCollectionRequired:
+                    # this can't (shouldn't) actually happen
+                    # since we only run the background update for incomplete rows
+                    # and incomplete rows can never be old.
+                    # However, if it does, the most graceful handling is just to
+                    # ignore it – and carry on processing other rooms.
+                    logger.error(
+                        "Supposedly Impossible: OldCollectionRequired in initial"
+                        " background update, for room ID %s",
+                        room_id,
+                        exc_info=True,
+                    )
+                    pass
+
+                # we use this count for rate-limiting
+                return room_total_event_count
+
+            room_event_count = yield self.runInteraction(
+                "update_room_stats", _fetch_data
+            )
 
             # Update the remaining counter.
             progress["remaining"] -= 1
-            yield self.runInteraction(
-                "populate_stats",
-                self._background_update_progress_txn,
-                "populate_stats_process_rooms",
-                progress,
-            )
 
-            processed_event_count += event_count
+            processed_event_count += room_event_count
 
             if processed_event_count > batch_size:
                 # Don't process any more rooms, we've hit our batch size.
                 return processed_event_count
 
+        yield self.runInteraction(
+            "populate_stats",
+            self._background_update_progress_txn,
+            "populate_stats_process_rooms",
+            progress,
+        )
+
         return processed_event_count
 
-    def delete_all_stats(self):
+    def update_total_event_count_between_txn(self, txn, low_pos, high_pos):
         """
-        Delete all statistics records.
+        Updates the total_events counts for rooms, in a range of stream_orderings.
+
+        Inclusivity of low_pos and high_pos is dependent upon their signs.
+        This makes it intuitive to use this function for both backfilled
+        and non-backfilled events.
+
+        Examples:
+        (low, high) → (kind)
+        (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+        (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+        (-7, 7) → -7 <= … <= 7 (both)
+
+        Args:
+            txn: Database transaction. It is assumed that you will have one,
+                since you probably want to update pointers at the same time.
+            low_pos: Low stream ordering
+            high_pos: High stream ordering
         """
 
-        def _delete_all_stats_txn(txn):
-            txn.execute("DELETE FROM room_state")
-            txn.execute("DELETE FROM room_stats")
-            txn.execute("DELETE FROM room_stats_earliest_token")
-            txn.execute("DELETE FROM user_stats")
+        if low_pos >= high_pos:
+            # nothing to do here.
+            return
 
-        return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
+        now = self.hs.get_reactor().seconds()
 
-    def get_stats_stream_pos(self):
-        return self._simple_select_one_onecol(
-            table="stats_stream_pos",
-            keyvalues={},
-            retcol="stream_id",
-            desc="stats_stream_pos",
+        # we choose comparators based on the signs
+        low_comparator = "<=" if low_pos < 0 else "<"
+        high_comparator = "<" if high_pos < 0 else "<="
+
+        sql = """
+            SELECT room_id, COUNT(*) AS new_events
+            FROM events
+            WHERE ? %s stream_ordering AND stream_ordering %s ?
+            GROUP BY room_id
+        """ % (
+            low_comparator,
+            high_comparator,
+        )
+
+        txn.execute(sql, (low_pos, high_pos))
+
+        for room_id, new_events in txn.fetchall():
+            while True:
+                try:
+                    self._update_stats_delta_txn(
+                        txn, now, "room", room_id, {"total_events": new_events}
+                    )
+                    break
+                except OldCollectionRequired:
+                    self._collect_old_txn(txn, "room")
+                    continue
+
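Spelling out the comparator table from the docstring above as a self-contained check (not part of the diff): backfilled events have negative stream orderings, so a negative bound was not processed in an earlier pass and must be included.

    def bounds_sql(low_pos, high_pos):
        low_comparator = "<=" if low_pos < 0 else "<"
        high_comparator = "<" if high_pos < 0 else "<="
        return "? %s stream_ordering AND stream_ordering %s ?" % (
            low_comparator,
            high_comparator,
        )

    assert bounds_sql(3, 7) == "? < stream_ordering AND stream_ordering <= ?"
    assert bounds_sql(-4, -2) == "? <= stream_ordering AND stream_ordering < ?"
    assert bounds_sql(-7, 7) == "? <= stream_ordering AND stream_ordering <= ?"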
+    def _count_events_in_room_txn(self, txn, room_id, low_token, high_token):
+        """
+        Count the number of events in a room between two tokens, inclusive.
+        Args:
+            txn (cursor): The database cursor
+            room_id (str): The ID of the room to count events for
+            low_token (int): the minimum stream ordering to count
+            high_token (int): the maximum stream ordering to count
+
+        Returns (int):
+            the number of events
+        """
+
+        sql = """
+            SELECT COUNT(*) AS num_events
+            FROM events
+            WHERE room_id = ?
+                AND ? <= stream_ordering
+                AND stream_ordering <= ?
+        """
+        txn.execute(sql, (room_id, low_token, high_token))
+        return txn.fetchone()[0]
+
+    def get_stats_positions(self, for_initial_processor=False):
+        """
+        Returns the stats processor positions.
+
+        Args:
+            for_initial_processor (bool, optional): If true, returns the position
+                promised by the latest stats regeneration, rather than the current
+                incremental processor's position.
+                Otherwise (if false), return the incremental processor's position.
+
+        Returns (dict):
+            Dict containing :-
+                state_delta_stream_id: stream_id of last-processed state delta
+                total_events_min_stream_ordering: stream_ordering of latest-processed
+                    backfilled event, in the context of total_events counting.
+                total_events_max_stream_ordering: stream_ordering of latest-processed
+                    non-backfilled event, in the context of total_events counting.
+        """
+        return self._simple_select_one(
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            retcols=(
+                "state_delta_stream_id",
+                "total_events_min_stream_ordering",
+                "total_events_max_stream_ordering",
+            ),
+            desc="stats_incremental_position",
+        )
+
+    def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+        """
+        See L{get_stats_positions}.
+
+        Args:
+             txn (cursor): Database cursor
+        """
+        return self._simple_select_one_txn(
+            txn=txn,
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            retcols=(
+                "state_delta_stream_id",
+                "total_events_min_stream_ordering",
+                "total_events_max_stream_ordering",
+            ),
         )
 
-    def update_stats_stream_pos(self, stream_id):
+    def update_stats_positions(self, positions, for_initial_processor=False):
+        """
+        Updates the stats processor positions.
+
+        Args:
+            positions: See L{get_stats_positions}
+            for_initial_processor: See L{get_stats_positions}
+        """
+        if positions is None:
+            positions = {
+                "state_delta_stream_id": None,
+                "total_events_min_stream_ordering": None,
+                "total_events_max_stream_ordering": None,
+            }
         return self._simple_update_one(
-            table="stats_stream_pos",
-            keyvalues={},
-            updatevalues={"stream_id": stream_id},
-            desc="update_stats_stream_pos",
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            updatevalues=positions,
+            desc="update_stats_incremental_position",
+        )
+
+    def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
+        """
+        See L{update_stats_positions}
+        """
+        if positions is None:
+            positions = {
+                "state_delta_stream_id": None,
+                "total_events_min_stream_ordering": None,
+                "total_events_max_stream_ordering": None,
+            }
+        return self._simple_update_one_txn(
+            txn,
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            updatevalues=positions,
         )
 
     def update_room_state(self, room_id, fields):
@@ -367,36 +739,109 @@ class StatsStore(StateDeltasStore):
             desc="update_room_state",
         )
 
-    def get_deltas_for_room(self, room_id, start, size=100):
+    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
         """
-        Get statistics deltas for a given room.
+        Get statistics for a given subject.
 
         Args:
-            room_id (str)
+            stats_type (str): The type of subject
+            stats_id (str): The ID of the subject (e.g. room_id or user_id)
             start (int): Pagination start. Number of entries, not timestamp.
             size (int): How many entries to return.
 
         Returns:
             Deferred[list[dict]], where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS["room"] and "ts".
+            ABSOLUTE_STATS_FIELDS[stats_type] and PER_SLICE_FIELDS[stats_type],
+            plus "bucket_size" and "end_ts".
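+
+            A returned entry might look like (illustrative values; the exact
+            keys depend on the two field maps):
+                {"joined_members": 24, "total_events": 1337,
+                 "bucket_size": 86400000, "end_ts": 1561939200000}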
         """
-        return self._simple_select_list_paginate(
-            "room_stats",
-            {"room_id": room_id},
-            "ts",
+        return self.runInteraction(
+            "get_statistics_for_subject",
+            self._get_statistics_for_subject_txn,
+            stats_type,
+            stats_id,
             start,
             size,
-            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+        )
+
+    def _get_statistics_for_subject_txn(
+        self, txn, stats_type, stats_id, start, size=100
+    ):
+        """
+        Transaction-bound version of L{get_statistics_for_subject}.
+        """
+
+        table, id_col = TYPE_TO_TABLE[stats_type]
+        selected_columns = list(
+            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+        )
+
+        slice_list = self._simple_select_list_paginate_txn(
+            txn,
+            table + "_historical",
+            {id_col: stats_id},
+            "end_ts",
+            start,
+            size,
+            retcols=selected_columns + ["bucket_size", "end_ts"],
             order_direction="DESC",
         )
 
+        if len(slice_list) < size:
+            # also fetch the current row
+            current = self._simple_select_one_txn(
+                txn,
+                table + "_current",
+                {id_col: stats_id},
+                retcols=selected_columns
+                + ["start_ts", "end_ts", "completed_delta_stream_id"],
+                allow_none=True,
+            )
+
+            if current is not None:
+                completed = current["completed_delta_stream_id"] is not None
+                dirty = current["end_ts"] is not None
+
+                if completed and dirty:
+                    # the row is dirty, so it contains new information and
+                    # should be included. We don't return incomplete rows:
+                    # they are still awaiting their initial background count,
+                    # so returning them would almost certainly mislead.
+                    current["bucket_size"] = current["end_ts"] - current["start_ts"]
+                    del current["start_ts"]
+                    return [current] + slice_list
+        return slice_list
+
     def get_all_room_state(self):
         return self._simple_select_list(
             "room_state", None, retcols=("name", "topic", "canonical_alias")
         )
 
+    def get_room_state(self, room_id):
+        """
+        Returns the current room_state for a room.
+
+        Args:
+            room_id (str): The ID of the room to return state for.
+
+        Returns (dict):
+            Dictionary containing these keys:
+                "name", "topic", "canonical_alias", "avatar", "join_rules",
+                "history_visibility"
+        """
+        return self._simple_select_one(
+            "room_state",
+            {"room_id": room_id},
+            retcols=(
+                "name",
+                "topic",
+                "canonical_alias",
+                "avatar",
+                "join_rules",
+                "history_visibility",
+            ),
+        )
+
     @cached()
-    def get_earliest_token_for_room_stats(self, room_id):
+    def get_earliest_token_for_stats(self, stats_type, id):
         """
         Fetch the "earliest token". This is used by the room stats delta
         processor to ignore deltas that have been processed between the
@@ -406,79 +851,275 @@ class StatsStore(StateDeltasStore):
         Returns:
             Deferred[int]
         """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
         return self._simple_select_one_onecol(
-            "room_stats_earliest_token",
-            {"room_id": room_id},
-            retcol="token",
+            "%s_current" % (table,),
+            {id_col: id},
+            retcol="completed_delta_stream_id",
             allow_none=True,
         )
 
-    def update_stats(self, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert(
-            table=table,
-            keyvalues={id_col: stats_id, "ts": ts},
-            values=fields,
-            desc="update_stats",
+    def _collect_old_txn(self, txn, stats_type, limit=500):
+        """
+        See L{collect_old}. Runs only a small batch, specified by limit.
+
+        Returns (bool):
+            True iff there is possibly more to do (i.e. this needs re-running),
+            False otherwise.
+
+        """
+        # we do them in batches to prevent concurrent updates from
+        # messing us over with lots of retries
+
+        now = self.hs.get_reactor().seconds()
+        quantised_ts = self.quantise_stats_time(now)
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        fields = ", ".join(
+            field
+            for field in chain(
+                ABSOLUTE_STATS_FIELDS[stats_type], PER_SLICE_FIELDS[stats_type]
+            )
+        )
+
+        # `end_ts IS NOT NULL` is for partial index optimisation
+        if isinstance(self.database_engine, Sqlite3Engine):
+            # SQLite doesn't support SELECT FOR UPDATE
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d"
+            ) % (id_col, table, limit)
+        else:
+            sql = (
+                "SELECT %s FROM %s_current"
+                " WHERE end_ts <= ? AND end_ts IS NOT NULL"
+                " LIMIT %d FOR UPDATE"
+            ) % (id_col, table, limit)
+        txn.execute(sql, (quantised_ts,))
+        updates = txn.fetchall()
+        # cursor.rowcount is unreliable for SELECTs (sqlite3 reports -1), so
+        # infer whether there may be more rows from the size of this batch.
+        maybe_more = len(updates) == limit
+
+        sql = (
+            "INSERT INTO %s_historical (%s, %s, bucket_size, end_ts)"
+            " SELECT %s, %s, end_ts - start_ts AS bucket_size, end_ts"
+            " FROM %s_current WHERE %s = ?"
+        ) % (table, id_col, fields, id_col, fields, table, id_col)
+        txn.executemany(sql, updates)
+
+        sql = ("UPDATE %s_current SET start_ts = NULL, end_ts = NULL WHERE %s = ?") % (
+            table,
+            id_col,
         )
+        txn.executemany(sql, updates)
+
+        return maybe_more
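+
+    # For stats_type="room" the batch above expands, roughly (a sketch,
+    # assuming TYPE_TO_TABLE maps "room" to ("room_stats", "room_id")), to:
+    #
+    #   SELECT room_id FROM room_stats_current
+    #       WHERE end_ts <= ? AND end_ts IS NOT NULL
+    #       LIMIT 500 FOR UPDATE
+    #
+    # followed by an INSERT ... SELECT into room_stats_historical and a reset
+    # of start_ts/end_ts for each collected room_id.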
+
+    @defer.inlineCallbacks
+    def collect_old(self, stats_type):
+        """
+        Run 'old collection' on current stats rows.
+
+        Old collection is the process of copying dirty (updated) stats rows
+        from the current table to the historical table, when those rows have
+        finished their stats time slice.
+        Collected rows are then cleared of their dirty status.
+
+        Args:
+            stats_type: "room" or "user" – the type of stats to run old collection
+                on.
+
+        """
+        while True:
+            maybe_more = yield self.runInteraction(
+                "stats_collect_old", self._collect_old_txn, stats_type
+            )
+            if not maybe_more:
+                return None
+
+    @defer.inlineCallbacks
+    def update_stats_delta(
+        self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+    ):
+        """
+        Updates the statistics for a subject, with a delta (difference/relative
+        change).
+
+        Args:
+            ts (int): timestamp of the change
+            stats_type (str): "room" or "user" – the kind of subject
+            stats_id (str): the subject's ID (room ID or user ID)
+            fields (dict[str, int]): Deltas of stats values.
+            complete_with_stream_id (int, optional):
+                If supplied, converts an incomplete row into a complete row,
+                with the supplied stream_id marked as the stream_id where the
+                row was completed.
+        """
+
+        while True:
+            try:
+                res = yield self.runInteraction(
+                    "update_stats_delta",
+                    self._update_stats_delta_txn,
+                    ts,
+                    stats_type,
+                    stats_id,
+                    fields,
+                    complete_with_stream_id=complete_with_stream_id,
+                )
+                return res
+            except OldCollectionRequired:
+                # retry after collecting old rows
+                yield self.collect_old(stats_type)
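+
+    # A minimal usage sketch (hypothetical caller; assumes "joined_members"
+    # is one of the fields in ABSOLUTE_STATS_FIELDS["room"]):
+    #
+    #     yield self.store.update_stats_delta(
+    #         ts=self.clock.time_msec(),
+    #         stats_type="room",
+    #         stats_id=room_id,
+    #         fields={"joined_members": +1},
+    #     )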
+
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id=None,
+        absolute_fields=None,
+    ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_fields (dict[str, int]): Absolute stats values (i.e. not deltas).
+        """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        field_sqls = ["%s = %s + ?" % (field, field) for field in fields.keys()]
+        field_values = list(fields.values())
+
+        if absolute_fields is not None:
+            field_sqls += ["%s = ?" % (field,) for field in absolute_fields.keys()]
+            field_values += list(absolute_fields.values())
+
+        if complete_with_stream_id is not None:
+            field_sqls.append("completed_delta_stream_id = ?")
+            field_values.append(complete_with_stream_id)
+
+        sql = (
+            "UPDATE %s_current SET end_ts = ?, %s"
+            " WHERE (end_ts IS NOT NULL AND (end_ts >= ? OR completed_delta_stream_id IS NULL))"
+            " AND %s = ?"
+        ) % (table, ", ".join(field_sqls), id_col)
+
+        qargs = [end_ts] + list(field_values) + [end_ts, stats_id]
+
+        txn.execute(sql, qargs)
 
-    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert_txn(
-            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+        if txn.rowcount > 0:
+            # success.
+            return
+
+        # if we're here, it's because we didn't succeed in updating a stats
+        # row. Why? Let's find out…
+
+        current_row = self._simple_select_one_txn(
+            txn,
+            table + "_current",
+            {id_col: stats_id},
+            ("end_ts", "completed_delta_stream_id"),
+            allow_none=True,
         )
 
-    def update_stats_delta(self, ts, stats_type, stats_id, field, value):
-        def _update_stats_delta(txn):
-            table, id_col = TYPE_TO_ROOM[stats_type]
+        if current_row is None:
+            # we need to insert a row! (insert a dirty, incomplete row)
+            insertee = {
+                id_col: stats_id,
+                "end_ts": end_ts,
+                "start_ts": ts,
+                "completed_delta_stream_id": complete_with_stream_id,
+            }
+
+            # we assume that, by default, blank fields should be zero.
+            for field_name in ABSOLUTE_STATS_FIELDS[stats_type]:
+                insertee[field_name] = 0
+
+            for field_name in PER_SLICE_FIELDS[stats_type]:
+                insertee[field_name] = 0
+
+            for (field, value) in fields.items():
+                insertee[field] = value
+
+            if absolute_fields is not None:
+                for (field, value) in absolute_fields.items():
+                    insertee[field] = value
+
+            self._simple_insert_txn(txn, table + "_current", insertee)
 
+        elif current_row["end_ts"] is None:
+            # update the row, including start_ts
             sql = (
-                "SELECT * FROM %s"
-                " WHERE %s=? and ts=("
-                "  SELECT MAX(ts) FROM %s"
-                "  WHERE %s=?"
-                ")"
-            ) % (table, id_col, table, id_col)
-            txn.execute(sql, (stats_id, stats_id))
-            rows = self.cursor_to_dict(txn)
-            if len(rows) == 0:
-                # silently skip as we don't have anything to apply a delta to yet.
-                # this tries to minimise any race between the initial sync and
-                # subsequent deltas arriving.
-                return
-
-            current_ts = ts
-            latest_ts = rows[0]["ts"]
-            if current_ts < latest_ts:
-                # This one is in the past, but we're just encountering it now.
-                # Mark it as part of the current bucket.
-                current_ts = latest_ts
-            elif ts != latest_ts:
-                # we have to copy our absolute counters over to the new entry.
-                values = {
-                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
-                }
-                values[id_col] = stats_id
-                values["ts"] = ts
-                values["bucket_size"] = self.stats_bucket_size
-
-                self._simple_insert_txn(txn, table=table, values=values)
-
-            # actually update the new value
-            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
-                self._simple_update_txn(
-                    txn,
-                    table=table,
-                    keyvalues={id_col: stats_id, "ts": current_ts},
-                    updatevalues={field: value},
-                )
-            else:
-                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
-                    table,
-                    field,
-                    field,
-                    id_col,
+                "UPDATE %s_current SET start_ts = ?, end_ts = ?, %s"
+                " WHERE end_ts IS NULL AND %s = ?"
+            ) % (table, ", ".join(field_sqls), id_col)
+
+            qargs = (
+                [end_ts - self.stats_bucket_size, end_ts]
+                + list(field_values)
+                + [stats_id]
+            )
+
+            txn.execute(sql, qargs)
+            if txn.rowcount == 0:
+                raise RuntimeError(
+                    "Should be impossible: No rows updated"
+                    " but all conditions are known to be met."
                 )
-                txn.execute(sql, (value, stats_id, current_ts))
 
-        return self.runInteraction("update_stats_delta", _update_stats_delta)
+        elif current_row["end_ts"] < end_ts:
+            # we need to perform old collection first
+            raise OldCollectionRequired()
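+
+    # To summarise the paths through _update_stats_delta_txn (descriptive
+    # note, not new behaviour): a dirty row that is still in-window, or dirty
+    # but not yet complete, is updated in place; a missing row is inserted as
+    # a dirty row; a clean (previously collected) row has its slice re-opened
+    # with a fresh start_ts; and a dirty, complete row from an earlier slice
+    # raises OldCollectionRequired so the caller archives it and retries.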
+
+    def incremental_update_total_events(self, in_positions):
+        """
+        Counts the number of events per-room and then adds these to the respective
+        total_events room counts.
+
+        Args:
+            in_positions (dict): Positions,
+                as retrieved from L{get_stats_positions}.
+
+        Returns (dict):
+            The new positions, returned for reference only: this function
+            commits the new positions to the database itself.
+        """
+
+        def incremental_update_total_events_txn(txn):
+            positions = in_positions.copy()
+
+            max_pos = self.get_room_max_stream_ordering()
+            min_pos = self.get_room_min_stream_ordering()
+            self.update_total_event_count_between_txn(
+                txn,
+                low_pos=positions["total_events_max_stream_ordering"],
+                high_pos=max_pos,
+            )
+
+            self.update_total_event_count_between_txn(
+                txn,
+                low_pos=min_pos,
+                high_pos=positions["total_events_min_stream_ordering"],
+            )
+
+            if (
+                positions["total_events_max_stream_ordering"] != max_pos
+                or positions["total_events_min_stream_ordering"] != min_pos
+            ):
+                positions["total_events_max_stream_ordering"] = max_pos
+                positions["total_events_min_stream_ordering"] = min_pos
+
+                self._update_stats_positions_txn(txn, positions)
+
+            return positions
+
+        return self.runInteraction(
+            "stats_incremental_total_events", incremental_update_total_events_txn
+        )
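+
+    # A hedged usage sketch for the incremental processor (the caller's
+    # attribute names here are assumptions):
+    #
+    #     pos = yield self.store.get_stats_positions()
+    #     pos = yield self.store.incremental_update_total_events(pos)
+    #     # `pos` now reflects the committed stream orderings.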