summary refs log tree commit diff
diff options
context:
space:
mode:
-rw-r--r--changelog.d/5879.misc1
-rw-r--r--synapse/config/stats.py13
-rw-r--r--synapse/handlers/stats.py227
-rw-r--r--synapse/storage/events.py5
-rw-r--r--synapse/storage/schema/delta/56/stats_separated1.sql144
-rw-r--r--synapse/storage/schema/delta/56/stats_separated2.sql.postgres24
-rw-r--r--synapse/storage/schema/delta/56/stats_separated2.sql.sqlite27
-rw-r--r--synapse/storage/stats.py845
-rw-r--r--tests/handlers/test_stats.py304
9 files changed, 818 insertions, 772 deletions
diff --git a/changelog.d/5879.misc b/changelog.d/5879.misc
new file mode 100644
index 0000000000..e1c829772f
--- /dev/null
+++ b/changelog.d/5879.misc
@@ -0,0 +1 @@
+Rework room and user statistics to separate current & historical rows, as well as track stats correctly.
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
index b518a3ed9c..b18ddbd1fa 100644
--- a/synapse/config/stats.py
+++ b/synapse/config/stats.py
@@ -27,19 +27,16 @@ class StatsConfig(Config):
 
     def read_config(self, config, **kwargs):
         self.stats_enabled = True
-        self.stats_bucket_size = 86400
+        self.stats_bucket_size = 86400 * 1000
         self.stats_retention = sys.maxsize
         stats_config = config.get("stats", None)
         if stats_config:
             self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
-            self.stats_bucket_size = (
-                self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
+            self.stats_bucket_size = self.parse_duration(
+                stats_config.get("bucket_size", "1d")
             )
-            self.stats_retention = (
-                self.parse_duration(
-                    stats_config.get("retention", "%ds" % (sys.maxsize,))
-                )
-                / 1000
+            self.stats_retention = self.parse_duration(
+                stats_config.get("retention", "%ds" % (sys.maxsize,))
             )
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..f44adfc07b 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -49,9 +49,6 @@ class StatsHandler(StateDeltasHandler):
         # The current position in the current_state_delta stream
         self.pos = None
 
-        # Guard to ensure we only process deltas one at a time
-        self._is_processing = False
-
         if hs.config.stats_enabled:
             self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -65,43 +62,60 @@ class StatsHandler(StateDeltasHandler):
         if not self.hs.config.stats_enabled:
             return
 
-        if self._is_processing:
-            return
+        lock = self.store.stats_delta_processing_lock
 
         @defer.inlineCallbacks
         def process():
+            yield lock.acquire()
             try:
                 yield self._unsafe_process()
             finally:
-                self._is_processing = False
+                yield lock.release()
 
-        self._is_processing = True
-        run_as_background_process("stats.notify_new_event", process)
+        if not lock.locked:
+            # we only want to run this process one-at-a-time,
+            # and also, if the initial background updater wants us to keep out,
+            # we should respect that.
+            run_as_background_process("stats.notify_new_event", process)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
-        if self.pos is None:
-            self.pos = yield self.store.get_stats_stream_pos()
+        # If None is one of the values, then means that the stats regenerator has not (or had not) yet unwedged us
+        #   but note that this might be outdated, so we retrieve the positions again.
+        if self.pos is None or None in self.pos.values():
+            self.pos = yield self.store.get_stats_positions()
 
-        # If still None then the initial background update hasn't happened yet
-        if self.pos is None:
+        # If still contains a None position, then the stats regenerator hasn't started yet
+        if None in self.pos.values():
             return None
 
         # Loop round handling deltas until we're up to date
+
         while True:
             with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
-                    return
+                deltas = yield self.store.get_current_state_deltas(
+                    self.pos["state_delta_stream_id"]
+                )
 
-                logger.info("Handling %d state deltas", len(deltas))
+                logger.debug("Handling %d state deltas", len(deltas))
                 yield self._handle_deltas(deltas)
 
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
+                self.pos["state_delta_stream_id"] = deltas[-1]["stream_id"]
+                yield self.store.update_stats_positions(self.pos)
+
+                event_processing_positions.labels("stats").set(
+                    self.pos["state_delta_stream_id"]
+                )
+
+            # Then count deltas for total_events and total_event_bytes.
+            with Measure(self.clock, "stats_total_events_and_bytes"):
+                self.pos, had_counts = yield self.store.incremental_update_room_total_events_and_bytes(
+                    self.pos
+                )
 
-                event_processing_positions.labels("stats").set(self.pos)
+            if not deltas and not had_counts:
+                break
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
@@ -119,7 +133,7 @@ class StatsHandler(StateDeltasHandler):
 
             logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+            token = yield self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -131,7 +145,10 @@ class StatsHandler(StateDeltasHandler):
                 continue
 
             if event_id is None and prev_event_id is None:
-                # Errr...
+                logger.error(
+                    "event ID is None and so is the previous event ID. stream_id: %s",
+                    stream_id,
+                )
                 continue
 
             event_content = {}
@@ -143,92 +160,87 @@ class StatsHandler(StateDeltasHandler):
 
             # We use stream_pos here rather than fetch by event_id as event_id
             # may be None
-            now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+            stream_timestamp = yield self.store.get_received_ts_by_stream_pos(
+                stream_pos
+            )
+            stream_timestamp = int(stream_timestamp)
+
+            # All the values in this dict are deltas (RELATIVE changes)
+            room_stats_delta = {}
+            is_newly_created = False
 
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+            if prev_event_id is None:
+                # this state event doesn't overwrite another,
+                # so it is a new effective/current state event
+                room_stats_delta["current_state_events"] = 1
 
             if typ == EventTypes.Member:
                 # we could use _get_key_change here but it's a bit inefficient
                 # given we're not testing for a specific result; might as well
                 # just grab the prev_membership and membership strings and
                 # compare them.
-                prev_event_content = {}
+                # We take None rather than leave as a previous membership
+                # in the absence of a previous event because we do not want to
+                # reduce the leave count when a new-to-the-room user joins.
+                prev_membership = None
                 if prev_event_id is not None:
                     prev_event = yield self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
+                        prev_membership = prev_event_content.get(
+                            "membership", Membership.LEAVE
+                        )
 
                 membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
 
-                if prev_membership == membership:
-                    continue
-
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
-                    )
+                if prev_membership is None:
+                    logger.debug("No previous membership for this user.")
+                elif membership == prev_membership:
+                    pass  # noop
+                elif prev_membership == Membership.JOIN:
+                    room_stats_delta["joined_members"] = -1
                 elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
-                    )
+                    room_stats_delta["invited_members"] = -1
                 elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
-                    )
+                    room_stats_delta["left_members"] = -1
                 elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
-                    )
+                    room_stats_delta["banned_members"] = -1
                 else:
-                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError(
+                        "%r is not a valid prev_membership" % (prev_membership,)
+                    )
 
+                if membership == prev_membership:
+                    pass  # noop
                 if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
-                    )
+                    room_stats_delta["joined_members"] = +1
                 elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
-                    )
+                    room_stats_delta["invited_members"] = +1
                 elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
-                    )
+                    room_stats_delta["left_members"] = +1
                 elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
-                    )
+                    room_stats_delta["banned_members"] = +1
                 else:
-                    err = "%s is not a valid membership" % (repr(membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError("%r is not a valid membership" % (membership,))
 
                 user_id = state_key
                 if self.is_mine_id(user_id):
-                    # update user_stats as it's one of our users
-                    public = yield self._is_public_room(room_id)
+                    # this accounts for transitions like leave → ban and so on.
+                    has_changed_joinedness = (prev_membership == Membership.JOIN) != (
+                        membership == Membership.JOIN
+                    )
+
+                    if has_changed_joinedness:
+                        # update user_stats as it's one of our users
+                        public = yield self._is_public_room(room_id)
+
+                        field = "public_rooms" if public else "private_rooms"
+                        delta = +1 if membership == Membership.JOIN else -1
 
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
                         yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
+                            stream_timestamp, "user", user_id, {field: delta}
                         )
 
             elif typ == EventTypes.Create:
@@ -246,28 +258,50 @@ class StatsHandler(StateDeltasHandler):
                     },
                 )
 
+                is_newly_created = True
+
             elif typ == EventTypes.JoinRules:
+                old_room_state = yield self.store.get_room_state(room_id)
                 yield self.store.update_room_state(
                     room_id, {"join_rules": event_content.get("join_rule")}
                 )
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                # whether the room would be public anyway,
+                # because of history_visibility
+                other_field_gives_publicity = (
+                    old_room_state["history_visibility"] == "world_readable"
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+
+                if not other_field_gives_publicity:
+                    is_public = yield self._get_key_change(
+                        prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
+                    )
+                    if is_public is not None:
+                        yield self.update_public_room_stats(
+                            stream_timestamp, room_id, is_public
+                        )
 
             elif typ == EventTypes.RoomHistoryVisibility:
+                old_room_state = yield self.store.get_room_state(room_id)
                 yield self.store.update_room_state(
                     room_id,
                     {"history_visibility": event_content.get("history_visibility")},
                 )
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
+                # whether the room would be public anyway,
+                # because of join_rule
+                other_field_gives_publicity = (
+                    old_room_state["join_rules"] == JoinRules.PUBLIC
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+
+                if not other_field_gives_publicity:
+                    is_public = yield self._get_key_change(
+                        prev_event_id, event_id, "history_visibility", "world_readable"
+                    )
+                    if is_public is not None:
+                        yield self.update_public_room_stats(
+                            stream_timestamp, room_id, is_public
+                        )
 
             elif typ == EventTypes.Encryption:
                 yield self.store.update_room_state(
@@ -290,6 +324,20 @@ class StatsHandler(StateDeltasHandler):
                     room_id, {"canonical_alias": event_content.get("alias")}
                 )
 
+            if is_newly_created:
+                yield self.store.update_stats_delta(
+                    stream_timestamp,
+                    "room",
+                    room_id,
+                    room_stats_delta,
+                    complete_with_stream_id=stream_id,
+                )
+
+            elif len(room_stats_delta) > 0:
+                yield self.store.update_stats_delta(
+                    stream_timestamp, "room", room_id, room_stats_delta
+                )
+
     @defer.inlineCallbacks
     def update_public_room_stats(self, ts, room_id, is_public):
         """
@@ -308,10 +356,13 @@ class StatsHandler(StateDeltasHandler):
         for user_id in user_ids:
             if self.hs.is_mine(UserID.from_string(user_id)):
                 yield self.store.update_stats_delta(
-                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
-                )
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
+                    ts,
+                    "user",
+                    user_id,
+                    {
+                        "public_rooms": +1 if is_public else -1,
+                        "private_rooms": -1 if is_public else +1,
+                    },
                 )
 
     @defer.inlineCallbacks
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 5a95c36a8b..5527dd208e 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2270,8 +2270,9 @@ class EventsStore(
             "room_aliases",
             "room_depth",
             "room_memberships",
-            "room_state",
-            "room_stats",
+            "room_stats_state",
+            "room_stats_current",
+            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
new file mode 100644
index 0000000000..6d4648c0d7
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -0,0 +1,144 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- First remove old stats stuff
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_tokens;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+    'populate_stats_createtables',
+    'populate_stats_process_rooms',
+    'populate_stats_cleanup'
+);
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+CREATE TABLE IF NOT EXISTS stats_incremental_position (
+    -- the stream_id of the last-processed state delta
+    state_delta_stream_id BIGINT,
+
+    -- the stream_ordering of the last-processed backfilled event
+    -- (this is negative)
+    total_events_min_stream_ordering BIGINT,
+
+    -- the stream_ordering of the last-processed normally-created event
+    -- (this is positive)
+    total_events_max_stream_ordering BIGINT,
+
+    -- If true, this represents the contract agreed upon by the stats
+    -- regenerator.
+    -- If false, this is suitable for use by the delta/incremental processor.
+    is_background_contract BOOLEAN NOT NULL PRIMARY KEY
+);
+
+-- insert a null row and make sure it is the only one.
+DELETE FROM stats_incremental_position;
+INSERT INTO stats_incremental_position (
+    state_delta_stream_id,
+    total_events_min_stream_ordering,
+    total_events_max_stream_ordering,
+    is_background_contract
+) VALUES (NULL, NULL, NULL, (0 = 1)), (NULL, NULL, NULL, (1 = 1));
+
+-- represents PRESENT room statistics for a room
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS room_stats_current (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    total_event_bytes BIGINT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    -- If initial stats regen is still to be performed: NULL
+    -- If initial stats regen has been performed: the maximum delta stream
+    --  position that this row takes into account.
+    completed_delta_stream_id BIGINT
+);
+
+
+-- represents HISTORICAL room statistics for a room
+CREATE TABLE IF NOT EXISTS room_stats_historical (
+    room_id TEXT NOT NULL,
+    -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
+    -- Note that end_ts is quantised.
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    current_state_events INT NOT NULL,
+    total_events INT NOT NULL,
+    total_event_bytes BIGINT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX IF NOT EXISTS room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- We don't need an index on (room_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want it to review stats for a particular room.)
+
+
+-- represents PRESENT statistics for a user
+-- only holds absolute fields
+CREATE TABLE IF NOT EXISTS user_stats_current (
+    user_id TEXT NOT NULL PRIMARY KEY,
+
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL,
+
+    -- If initial stats regen is still to be performed: NULL
+    -- If initial stats regen has been performed: the maximum delta stream
+    --  position that this row takes into account.
+    completed_delta_stream_id BIGINT
+);
+
+-- represents HISTORICAL statistics for a user
+CREATE TABLE IF NOT EXISTS user_stats_historical (
+    user_id TEXT NOT NULL,
+    end_ts BIGINT NOT NULL,
+    bucket_size INT NOT NULL,
+
+    public_rooms INT NOT NULL,
+    private_rooms INT NOT NULL,
+
+    PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX IF NOT EXISTS user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+-- We don't need an index on (user_id, end_ts) because PRIMARY KEY sorts that
+-- out for us. (We would want it to review stats for a particular user.)
+
+-- Also rename room_state to room_stats_state to make its ownership clear.
+ALTER TABLE room_state RENAME TO room_stats_state;
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.postgres b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
new file mode 100644
index 0000000000..0519fcff79
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.postgres
@@ -0,0 +1,24 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- These partial indices helps us with finding incomplete stats row
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+    ON room_stats_current (room_id)
+    WHERE completed_delta_stream_id IS NULL;
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+    ON user_stats_current (user_id)
+    WHERE completed_delta_stream_id IS NULL;
+
diff --git a/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
new file mode 100644
index 0000000000..181f4ec5b9
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated2.sql.sqlite
@@ -0,0 +1,27 @@
+/* Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+-- even though SQLite >= 3.8 can support partial indices, we won't enable
+-- them, in case the SQLite database may be later used on another system.
+-- It's also the case that SQLite is only likely to be used in small
+-- deployments or testing, where the optimisations gained by use of a
+-- partial index are not a big concern.
+
+CREATE INDEX IF NOT EXISTS room_stats_not_complete
+    ON room_stats_current (completed_delta_stream_id, room_id);
+
+CREATE INDEX IF NOT EXISTS user_stats_not_complete
+    ON user_stats_current (completed_delta_stream_id, user_id);
+
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..9d6c3027d5 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,17 +15,20 @@
 # limitations under the License.
 
 import logging
+from itertools import chain
 
-from twisted.internet import defer
+from twisted.internet.defer import DeferredLock
 
-from synapse.api.constants import EventTypes, Membership
-from synapse.storage.prepare_database import get_statements
+from synapse.storage import PostgresEngine
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
 logger = logging.getLogger(__name__)
 
 # these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
 ABSOLUTE_STATS_FIELDS = {
     "room": (
         "current_state_events",
@@ -32,14 +36,18 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "state_events",
+        "total_events",
+        "total_event_bytes",
     ),
     "user": ("public_rooms", "private_rooms"),
 }
 
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
+PER_SLICE_FIELDS = {"room": (), "user": ()}
 
-TEMP_TABLE = "_temp_populate_stats"
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
 
 class StatsStore(StateDeltasStore):
@@ -51,291 +59,111 @@ class StatsStore(StateDeltasStore):
         self.stats_enabled = hs.config.stats_enabled
         self.stats_bucket_size = hs.config.stats_bucket_size
 
-        self.register_background_update_handler(
-            "populate_stats_createtables", self._populate_stats_createtables
-        )
-        self.register_background_update_handler(
-            "populate_stats_process_rooms", self._populate_stats_process_rooms
-        )
-        self.register_background_update_handler(
-            "populate_stats_cleanup", self._populate_stats_cleanup
-        )
-
-    @defer.inlineCallbacks
-    def _populate_stats_createtables(self, progress, batch_size):
-
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_createtables")
-            return 1
-
-        # Get all the rooms that we want to process.
-        def _make_staging_area(txn):
-            # Create the temporary tables
-            stmts = get_statements(
-                """
-                -- We just recreate the table, we'll be reinserting the
-                -- correct entries again later anyway.
-                DROP TABLE IF EXISTS {temp}_rooms;
-
-                CREATE TABLE IF NOT EXISTS {temp}_rooms(
-                    room_id TEXT NOT NULL,
-                    events BIGINT NOT NULL
-                );
-
-                CREATE INDEX {temp}_rooms_events
-                    ON {temp}_rooms(events);
-                CREATE INDEX {temp}_rooms_id
-                    ON {temp}_rooms(room_id);
-            """.format(
-                    temp=TEMP_TABLE
-                ).splitlines()
-            )
+        self.stats_delta_processing_lock = DeferredLock()
 
-            for statement in stmts:
-                txn.execute(statement)
-
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_position(position TEXT NOT NULL)"
-            )
-            txn.execute(sql)
-
-            # Get rooms we want to process from the database, only adding
-            # those that we haven't (i.e. those not in room_stats_earliest_token)
-            sql = """
-                INSERT INTO %s_rooms (room_id, events)
-                SELECT c.room_id, count(*) FROM current_state_events AS c
-                LEFT JOIN room_stats_earliest_token AS t USING (room_id)
-                WHERE t.room_id IS NULL
-                GROUP BY c.room_id
-            """ % (
-                TEMP_TABLE,
-            )
-            txn.execute(sql)
+        self.register_noop_background_update("populate_stats_createtables")
+        self.register_noop_background_update("populate_stats_process_rooms")
+        self.register_noop_background_update("populate_stats_cleanup")
 
-        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
-        yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
-        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
-        self.get_earliest_token_for_room_stats.invalidate_all()
+    def quantise_stats_time(self, ts):
+        """
+        Quantises a timestamp to be a multiple of the bucket size.
 
-        yield self._end_background_update("populate_stats_createtables")
-        return 1
+        Args:
+            ts (int): the timestamp to quantise, in milliseconds since the Unix
+                Epoch
 
-    @defer.inlineCallbacks
-    def _populate_stats_cleanup(self, progress, batch_size):
+        Returns:
+            int: a timestamp which
+              - is divisible by the bucket size;
+              - is no later than `ts`; and
+              - is the largest such timestamp.
         """
-        Update the user directory stream position, then clean up the old tables.
+        return (ts // self.stats_bucket_size) * self.stats_bucket_size
+
+    def get_stats_positions(self, for_initial_processor=False):
         """
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_cleanup")
-            return 1
+        Returns the stats processor positions.
 
-        position = yield self._simple_select_one_onecol(
-            TEMP_TABLE + "_position", None, "position"
+        Args:
+            for_initial_processor (bool, optional): If true, returns the position
+                promised by the latest stats regeneration, rather than the current
+                incremental processor's position.
+                Otherwise (if false), return the incremental processor's position.
+
+        Returns (dict):
+            Dict containing :-
+                state_delta_stream_id: stream_id of last-processed state delta
+                total_events_min_stream_ordering: stream_ordering of latest-processed
+                    backfilled event, in the context of total_events counting.
+                total_events_max_stream_ordering: stream_ordering of latest-processed
+                    non-backfilled event, in the context of total_events counting.
+        """
+        return self._simple_select_one(
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            retcols=(
+                "state_delta_stream_id",
+                "total_events_min_stream_ordering",
+                "total_events_max_stream_ordering",
+            ),
+            desc="stats_incremental_position",
         )
-        yield self.update_stats_stream_pos(position)
-
-        def _delete_staging_area(txn):
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
-
-        yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
-
-        yield self._end_background_update("populate_stats_cleanup")
-        return 1
-
-    @defer.inlineCallbacks
-    def _populate_stats_process_rooms(self, progress, batch_size):
 
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_process_rooms")
-            return 1
-
-        # If we don't have progress filed, delete everything.
-        if not progress:
-            yield self.delete_all_stats()
-
-        def _get_next_batch(txn):
-            # Only fetch 250 rooms, so we don't fetch too many at once, even
-            # if those 250 rooms have less than batch_size state events.
-            sql = """
-                SELECT room_id, events FROM %s_rooms
-                ORDER BY events DESC
-                LIMIT 250
-            """ % (
-                TEMP_TABLE,
-            )
-            txn.execute(sql)
-            rooms_to_work_on = txn.fetchall()
-
-            if not rooms_to_work_on:
-                return None
-
-            # Get how many are left to process, so we can give status on how
-            # far we are in processing
-            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
-            progress["remaining"] = txn.fetchone()[0]
-
-            return rooms_to_work_on
+    def _get_stats_positions_txn(self, txn, for_initial_processor=False):
+        """
+        See L{get_stats_positions}.
 
-        rooms_to_work_on = yield self.runInteraction(
-            "populate_stats_temp_read", _get_next_batch
+        Args:
+             txn (cursor): Database cursor
+        """
+        return self._simple_select_one_txn(
+            txn=txn,
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            retcols=(
+                "state_delta_stream_id",
+                "total_events_min_stream_ordering",
+                "total_events_max_stream_ordering",
+            ),
         )
 
-        # No more rooms -- complete the transaction.
-        if not rooms_to_work_on:
-            yield self._end_background_update("populate_stats_process_rooms")
-            return 1
+    def update_stats_positions(self, positions, for_initial_processor=False):
+        """
+        Updates the stats processor positions.
 
-        logger.info(
-            "Processing the next %d rooms of %d remaining",
-            len(rooms_to_work_on),
-            progress["remaining"],
+        Args:
+            positions: See L{get_stats_positions}
+            for_initial_processor: See L{get_stats_positions}
+        """
+        if positions is None:
+            positions = {
+                "state_delta_stream_id": None,
+                "total_events_min_stream_ordering": None,
+                "total_events_max_stream_ordering": None,
+            }
+        return self._simple_update_one(
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            updatevalues=positions,
+            desc="update_stats_incremental_position",
         )
 
-        # Number of state events we've processed by going through each room
-        processed_event_count = 0
-
-        for room_id, event_count in rooms_to_work_on:
-
-            current_state_ids = yield self.get_current_state_ids(room_id)
-
-            join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
-            history_visibility_id = current_state_ids.get(
-                (EventTypes.RoomHistoryVisibility, "")
-            )
-            encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
-            name_id = current_state_ids.get((EventTypes.Name, ""))
-            topic_id = current_state_ids.get((EventTypes.Topic, ""))
-            avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
-            canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
-
-            event_ids = [
-                join_rules_id,
-                history_visibility_id,
-                encryption_id,
-                name_id,
-                topic_id,
-                avatar_id,
-                canonical_alias_id,
-            ]
-
-            state_events = yield self.get_events(
-                [ev for ev in event_ids if ev is not None]
-            )
-
-            def _get_or_none(event_id, arg):
-                event = state_events.get(event_id)
-                if event:
-                    return event.content.get(arg)
-                return None
-
-            yield self.update_room_state(
-                room_id,
-                {
-                    "join_rules": _get_or_none(join_rules_id, "join_rule"),
-                    "history_visibility": _get_or_none(
-                        history_visibility_id, "history_visibility"
-                    ),
-                    "encryption": _get_or_none(encryption_id, "algorithm"),
-                    "name": _get_or_none(name_id, "name"),
-                    "topic": _get_or_none(topic_id, "topic"),
-                    "avatar": _get_or_none(avatar_id, "url"),
-                    "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
-                },
-            )
-
-            now = self.hs.get_reactor().seconds()
-
-            # quantise time to the nearest bucket
-            now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
-            def _fetch_data(txn):
-
-                # Get the current token of the room
-                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
-                current_state_events = len(current_state_ids)
-
-                membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
-
-                total_state_events = self._get_total_state_event_counts_txn(
-                    txn, room_id
-                )
-
-                self._update_stats_txn(
-                    txn,
-                    "room",
-                    room_id,
-                    now,
-                    {
-                        "bucket_size": self.stats_bucket_size,
-                        "current_state_events": current_state_events,
-                        "joined_members": membership_counts.get(Membership.JOIN, 0),
-                        "invited_members": membership_counts.get(Membership.INVITE, 0),
-                        "left_members": membership_counts.get(Membership.LEAVE, 0),
-                        "banned_members": membership_counts.get(Membership.BAN, 0),
-                        "state_events": total_state_events,
-                    },
-                )
-                self._simple_insert_txn(
-                    txn,
-                    "room_stats_earliest_token",
-                    {"room_id": room_id, "token": current_token},
-                )
-
-                # We've finished a room. Delete it from the table.
-                self._simple_delete_one_txn(
-                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
-                )
-
-            yield self.runInteraction("update_room_stats", _fetch_data)
-
-            # Update the remaining counter.
-            progress["remaining"] -= 1
-            yield self.runInteraction(
-                "populate_stats",
-                self._background_update_progress_txn,
-                "populate_stats_process_rooms",
-                progress,
-            )
-
-            processed_event_count += event_count
-
-            if processed_event_count > batch_size:
-                # Don't process any more rooms, we've hit our batch size.
-                return processed_event_count
-
-        return processed_event_count
-
-    def delete_all_stats(self):
+    def _update_stats_positions_txn(self, txn, positions, for_initial_processor=False):
         """
-        Delete all statistics records.
+        See L{update_stats_positions}
         """
-
-        def _delete_all_stats_txn(txn):
-            txn.execute("DELETE FROM room_state")
-            txn.execute("DELETE FROM room_stats")
-            txn.execute("DELETE FROM room_stats_earliest_token")
-            txn.execute("DELETE FROM user_stats")
-
-        return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
-
-    def get_stats_stream_pos(self):
-        return self._simple_select_one_onecol(
-            table="stats_stream_pos",
-            keyvalues={},
-            retcol="stream_id",
-            desc="stats_stream_pos",
-        )
-
-    def update_stats_stream_pos(self, stream_id):
-        return self._simple_update_one(
-            table="stats_stream_pos",
-            keyvalues={},
-            updatevalues={"stream_id": stream_id},
-            desc="update_stats_stream_pos",
+        if positions is None:
+            positions = {
+                "state_delta_stream_id": None,
+                "total_events_min_stream_ordering": None,
+                "total_events_max_stream_ordering": None,
+            }
+        return self._simple_update_one_txn(
+            txn,
+            table="stats_incremental_position",
+            keyvalues={"is_background_contract": for_initial_processor},
+            updatevalues=positions,
         )
 
     def update_room_state(self, room_id, fields):
@@ -361,42 +189,14 @@ class StatsStore(StateDeltasStore):
                 fields[col] = None
 
         return self._simple_upsert(
-            table="room_state",
+            table="room_stats_state",
             keyvalues={"room_id": room_id},
             values=fields,
             desc="update_room_state",
         )
 
-    def get_deltas_for_room(self, room_id, start, size=100):
-        """
-        Get statistics deltas for a given room.
-
-        Args:
-            room_id (str)
-            start (int): Pagination start. Number of entries, not timestamp.
-            size (int): How many entries to return.
-
-        Returns:
-            Deferred[list[dict]], where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS["room"] and "ts".
-        """
-        return self._simple_select_list_paginate(
-            "room_stats",
-            {"room_id": room_id},
-            "ts",
-            start,
-            size,
-            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
-            order_direction="DESC",
-        )
-
-    def get_all_room_state(self):
-        return self._simple_select_list(
-            "room_state", None, retcols=("name", "topic", "canonical_alias")
-        )
-
     @cached()
-    def get_earliest_token_for_room_stats(self, room_id):
+    def get_earliest_token_for_stats(self, stats_type, id):
         """
         Fetch the "earliest token". This is used by the room stats delta
         processor to ignore deltas that have been processed between the
@@ -406,79 +206,384 @@ class StatsStore(StateDeltasStore):
         Returns:
             Deferred[int]
         """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
         return self._simple_select_one_onecol(
-            "room_stats_earliest_token",
-            {"room_id": room_id},
-            retcol="token",
+            "%s_current" % (table,),
+            {id_col: id},
+            retcol="completed_delta_stream_id",
             allow_none=True,
         )
 
-    def update_stats(self, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert(
-            table=table,
-            keyvalues={id_col: stats_id, "ts": ts},
-            values=fields,
-            desc="update_stats",
-        )
+    def update_stats_delta(
+        self, ts, stats_type, stats_id, fields, complete_with_stream_id=None
+    ):
+        """
+        Updates the statistics for a subject, with a delta (difference/relative
+        change).
+
+        Args:
+            ts (int): timestamp of the change
+            stats_type (str): "room" or "user" – the kind of subject
+            stats_id (str): the subject's ID (room ID or user ID)
+            fields (dict[str, int]): Deltas of stats values.
+            complete_with_stream_id (int, optional):
+                If supplied, converts an incomplete row into a complete row,
+                with the supplied stream_id marked as the stream_id where the
+                row was completed.
+        """
 
-    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert_txn(
-            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
         )
 
-    def update_stats_delta(self, ts, stats_type, stats_id, field, value):
-        def _update_stats_delta(txn):
-            table, id_col = TYPE_TO_ROOM[stats_type]
-
-            sql = (
-                "SELECT * FROM %s"
-                " WHERE %s=? and ts=("
-                "  SELECT MAX(ts) FROM %s"
-                "  WHERE %s=?"
-                ")"
-            ) % (table, id_col, table, id_col)
-            txn.execute(sql, (stats_id, stats_id))
-            rows = self.cursor_to_dict(txn)
-            if len(rows) == 0:
-                # silently skip as we don't have anything to apply a delta to yet.
-                # this tries to minimise any race between the initial sync and
-                # subsequent deltas arriving.
-                return
-
-            current_ts = ts
-            latest_ts = rows[0]["ts"]
-            if current_ts < latest_ts:
-                # This one is in the past, but we're just encountering it now.
-                # Mark it as part of the current bucket.
-                current_ts = latest_ts
-            elif ts != latest_ts:
-                # we have to copy our absolute counters over to the new entry.
-                values = {
-                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
-                }
-                values[id_col] = stats_id
-                values["ts"] = ts
-                values["bucket_size"] = self.stats_bucket_size
-
-                self._simple_insert_txn(txn, table=table, values=values)
-
-            # actually update the new value
-            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
-                self._simple_update_txn(
-                    txn,
-                    table=table,
-                    keyvalues={id_col: stats_id, "ts": current_ts},
-                    updatevalues={field: value},
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id=None,
+        absolute_field_overrides=None,
+    ):
+        """
+        See L{update_stats_delta}
+        Additional Args:
+            absolute_field_overrides (dict[str, int]): Current stats values
+                (i.e. not deltas) of absolute fields.
+                Does not work with per-slice fields.
+        """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+        for field in chain(fields.keys(), absolute_field_overrides.keys()):
+            if field not in abs_field_names and field not in slice_field_names:
+                # guard against potential SQL injection dodginess
+                raise ValueError(
+                    "%s is not a recognised field"
+                    " for stats type %s" % (field, stats_type)
                 )
+
+        # only absolute stats fields are tracked in the `_current` stats tables,
+        # so those are the only ones that we process deltas for when
+        # we upsert against the `_current` table.
+
+        # This calculates the deltas (`field = field + ?` values)
+        # for absolute fields,
+        # * defaulting to 0 if not specified
+        #     (required for the INSERT part of upserting to work)
+        # * omitting overrides specified in `absolute_field_overrides`
+        deltas_of_absolute_fields = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_field_overrides
+        }
+
+        if absolute_field_overrides is None:
+            absolute_field_overrides = {}
+
+        if complete_with_stream_id is not None:
+            absolute_field_overrides = absolute_field_overrides.copy()
+            absolute_field_overrides[
+                "completed_delta_stream_id"
+            ] = complete_with_stream_id
+
+        # first upsert the `_current` table
+        self._upsert_with_additive_relatives_txn(
+            txn=txn,
+            table=table + "_current",
+            keyvalues={id_col: stats_id},
+            absolutes=absolute_field_overrides,
+            additive_relatives=deltas_of_absolute_fields,
+        )
+
+        if self.has_completed_background_updates():
+            # TODO want to check specifically for stats regenerator, not all
+            #   background updates…
+            # then upsert the `_historical` table.
+            # we don't support absolute_fields for per-slice fields as it makes
+            # no sense.
+            per_slice_additive_relatives = {
+                key: fields.get(key, 0) for key in slice_field_names
+            }
+            self._upsert_copy_from_table_with_additive_relatives_txn(
+                txn=txn,
+                into_table=table + "_historical",
+                keyvalues={id_col: stats_id},
+                extra_dst_keyvalues={
+                    "end_ts": end_ts,
+                    "bucket_size": self.stats_bucket_size,
+                },
+                additive_relatives=per_slice_additive_relatives,
+                src_table=table + "_current",
+                copy_columns=abs_field_names,
+                additional_where=" AND completed_delta_stream_id IS NOT NULL",
+            )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """Used to update values in the stats tables.
+
+        Args:
+            txn: Transaction
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = [table]
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            self.database_engine.lock_table(txn, table)
+            retcols = chain(absolutes.keys(), additive_relatives.keys())
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
             else:
-                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
-                    table,
-                    field,
-                    field,
-                    id_col,
-                )
-                txn.execute(sql, (value, stats_id, current_ts))
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                current_row.update(absolutes)
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self,
+        txn,
+        into_table,
+        keyvalues,
+        extra_dst_keyvalues,
+        additive_relatives,
+        src_table,
+        copy_columns,
+        additional_where="",
+    ):
+        """
+        Args:
+             txn: Transaction
+             into_table (str): The destination table to UPSERT the row into
+             keyvalues (dict[str, any]): Row-identifying key values
+             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+                for `into_table`.
+             additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+             src_table (str): The source table to copy from
+             copy_columns (iterable[str]): The list of columns to copy
+             additional_where (str): Additional SQL for where (prefix with AND
+                if using).
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(
+                keyvalues, copy_columns, additive_relatives, extra_dst_keyvalues
+            )
+            sel_exprs = chain(
+                keyvalues,
+                copy_columns,
+                ("?" for _ in chain(additive_relatives, extra_dst_keyvalues)),
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f) for f in copy_columns
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s %(additional_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": " AND ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(
+                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+                ),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+                "additional_where": additional_where,
+            }
+
+            qargs = chain(additive_relatives.values(), keyvalues.values())
+            txn.execute(sql, qargs)
+        else:
+            self.database_engine.lock_table(txn, into_table)
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                keyvalues,
+                chain(additive_relatives.keys(), copy_columns),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {**keyvalues, **src_row, **additive_relatives}
+                self._simple_insert_txn(txn, into_table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, keyvalues, src_row)
+
+    def incremental_update_room_total_events_and_bytes(self, in_positions):
+        """
+        Counts the number of events and total event bytes per-room and then adds
+        these to the respective total_events and total_event_bytes room counts.
+
+        Args:
+            in_positions (dict): Positions,
+                as retrieved from L{get_stats_positions}.
+
+        Returns (Deferred[tuple[dict, bool]]):
+            First element (dict):
+                The new positions. Note that this is for reference only –
+                the new positions WILL be committed by this function.
+            Second element (bool):
+                true iff there was a change to the positions, false otherwise
+        """
+
+        def incremental_update_total_events_and_bytes_txn(txn):
+            positions = in_positions.copy()
+
+            max_pos = self.get_room_max_stream_ordering()
+            min_pos = self.get_room_min_stream_ordering()
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=positions["total_events_max_stream_ordering"],
+                high_pos=max_pos,
+            )
+
+            self.update_total_event_and_bytes_count_between_txn(
+                txn,
+                low_pos=min_pos,
+                high_pos=positions["total_events_min_stream_ordering"],
+            )
+
+            if (
+                positions["total_events_max_stream_ordering"] != max_pos
+                or positions["total_events_min_stream_ordering"] != min_pos
+            ):
+                positions["total_events_max_stream_ordering"] = max_pos
+                positions["total_events_min_stream_ordering"] = min_pos
+
+                self._update_stats_positions_txn(txn, positions)
+
+                return positions, True
+            else:
+                return positions, False
+
+        return self.runInteraction(
+            "stats_incremental_total_events_and_bytes",
+            incremental_update_total_events_and_bytes_txn,
+        )
+
+    def update_total_event_and_bytes_count_between_txn(self, txn, low_pos, high_pos):
+        """
+        Updates the total_events and total_event_bytes counts for rooms,
+            in a range of stream_orderings.
+
+        Inclusivity of low_pos and high_pos is dependent upon their signs.
+        This makes it intuitive to use this function for both backfilled
+        and non-backfilled events.
 
-        return self.runInteraction("update_stats_delta", _update_stats_delta)
+        Examples:
+        (low, high) → (kind)
+        (3, 7) → 3 < … <= 7 (normal-filled; low already processed before)
+        (-4, -2) → -4 <= … < -2 (backfilled; high already processed before)
+        (-7, 7) → -7 <= … <= 7 (both)
+
+        Args:
+            txn: Database transaction.
+            low_pos: Low stream ordering
+            high_pos: High stream ordering
+        """
+
+        if low_pos >= high_pos:
+            # nothing to do here.
+            return
+
+        now = self.hs.clock.time_msec()
+
+        # we choose comparators based on the signs
+        low_comparator = "<=" if low_pos < 0 else "<"
+        high_comparator = "<" if high_pos < 0 else "<="
+
+        if isinstance(self.database_engine, PostgresEngine):
+            new_bytes_expression = "OCTET_LENGTH(json)"
+        else:
+            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+        sql = """
+            SELECT room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            FROM events INNER JOIN event_json USING (event_id)
+            WHERE ? %s stream_ordering AND stream_ordering %s ?
+            GROUP BY room_id
+        """ % (
+            low_comparator,
+            high_comparator,
+            new_bytes_expression,
+        )
+
+        txn.execute(sql, (low_pos, high_pos))
+
+        for room_id, new_events, new_bytes in txn.fetchall():
+            self._update_stats_delta_txn(
+                txn,
+                now,
+                "room",
+                room_id,
+                {"total_events": new_events, "total_event_bytes": new_bytes},
+            )
diff --git a/tests/handlers/test_stats.py b/tests/handlers/test_stats.py
deleted file mode 100644
index a8b858eb4f..0000000000
--- a/tests/handlers/test_stats.py
+++ /dev/null
@@ -1,304 +0,0 @@
-# -*- coding: utf-8 -*-
-# Copyright 2019 New Vector Ltd
-#
-# Licensed under the Apache License, Version 2.0 (the "License");
-# you may not use this file except in compliance with the License.
-# You may obtain a copy of the License at
-#
-#     http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-from mock import Mock
-
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, Membership
-from synapse.rest import admin
-from synapse.rest.client.v1 import login, room
-
-from tests import unittest
-
-
-class StatsRoomTests(unittest.HomeserverTestCase):
-
-    servlets = [
-        admin.register_servlets_for_client_rest_resource,
-        room.register_servlets,
-        login.register_servlets,
-    ]
-
-    def prepare(self, reactor, clock, hs):
-
-        self.store = hs.get_datastore()
-        self.handler = self.hs.get_stats_handler()
-
-    def _add_background_updates(self):
-        """
-        Add the background updates we need to run.
-        """
-        # Ugh, have to reset this flag
-        self.store._all_done = False
-
-        self.get_success(
-            self.store._simple_insert(
-                "background_updates",
-                {"update_name": "populate_stats_createtables", "progress_json": "{}"},
-            )
-        )
-        self.get_success(
-            self.store._simple_insert(
-                "background_updates",
-                {
-                    "update_name": "populate_stats_process_rooms",
-                    "progress_json": "{}",
-                    "depends_on": "populate_stats_createtables",
-                },
-            )
-        )
-        self.get_success(
-            self.store._simple_insert(
-                "background_updates",
-                {
-                    "update_name": "populate_stats_cleanup",
-                    "progress_json": "{}",
-                    "depends_on": "populate_stats_process_rooms",
-                },
-            )
-        )
-
-    def test_initial_room(self):
-        """
-        The background updates will build the table from scratch.
-        """
-        r = self.get_success(self.store.get_all_room_state())
-        self.assertEqual(len(r), 0)
-
-        # Disable stats
-        self.hs.config.stats_enabled = False
-        self.handler.stats_enabled = False
-
-        u1 = self.register_user("u1", "pass")
-        u1_token = self.login("u1", "pass")
-
-        room_1 = self.helper.create_room_as(u1, tok=u1_token)
-        self.helper.send_state(
-            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
-        )
-
-        # Stats disabled, shouldn't have done anything
-        r = self.get_success(self.store.get_all_room_state())
-        self.assertEqual(len(r), 0)
-
-        # Enable stats
-        self.hs.config.stats_enabled = True
-        self.handler.stats_enabled = True
-
-        # Do the initial population of the user directory via the background update
-        self._add_background_updates()
-
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
-
-        r = self.get_success(self.store.get_all_room_state())
-
-        self.assertEqual(len(r), 1)
-        self.assertEqual(r[0]["topic"], "foo")
-
-    def test_initial_earliest_token(self):
-        """
-        Ingestion via notify_new_event will ignore tokens that the background
-        update have already processed.
-        """
-        self.reactor.advance(86401)
-
-        self.hs.config.stats_enabled = False
-        self.handler.stats_enabled = False
-
-        u1 = self.register_user("u1", "pass")
-        u1_token = self.login("u1", "pass")
-
-        u2 = self.register_user("u2", "pass")
-        u2_token = self.login("u2", "pass")
-
-        u3 = self.register_user("u3", "pass")
-        u3_token = self.login("u3", "pass")
-
-        room_1 = self.helper.create_room_as(u1, tok=u1_token)
-        self.helper.send_state(
-            room_1, event_type="m.room.topic", body={"topic": "foo"}, tok=u1_token
-        )
-
-        # Begin the ingestion by creating the temp tables. This will also store
-        # the position that the deltas should begin at, once they take over.
-        self.hs.config.stats_enabled = True
-        self.handler.stats_enabled = True
-        self.store._all_done = False
-        self.get_success(self.store.update_stats_stream_pos(None))
-
-        self.get_success(
-            self.store._simple_insert(
-                "background_updates",
-                {"update_name": "populate_stats_createtables", "progress_json": "{}"},
-            )
-        )
-
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
-
-        # Now, before the table is actually ingested, add some more events.
-        self.helper.invite(room=room_1, src=u1, targ=u2, tok=u1_token)
-        self.helper.join(room=room_1, user=u2, tok=u2_token)
-
-        # Now do the initial ingestion.
-        self.get_success(
-            self.store._simple_insert(
-                "background_updates",
-                {"update_name": "populate_stats_process_rooms", "progress_json": "{}"},
-            )
-        )
-        self.get_success(
-            self.store._simple_insert(
-                "background_updates",
-                {
-                    "update_name": "populate_stats_cleanup",
-                    "progress_json": "{}",
-                    "depends_on": "populate_stats_process_rooms",
-                },
-            )
-        )
-
-        self.store._all_done = False
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
-
-        self.reactor.advance(86401)
-
-        # Now add some more events, triggering ingestion. Because of the stream
-        # position being set to before the events sent in the middle, a simpler
-        # implementation would reprocess those events, and say there were four
-        # users, not three.
-        self.helper.invite(room=room_1, src=u1, targ=u3, tok=u1_token)
-        self.helper.join(room=room_1, user=u3, tok=u3_token)
-
-        # Get the deltas! There should be two -- day 1, and day 2.
-        r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
-
-        # The oldest has 2 joined members
-        self.assertEqual(r[-1]["joined_members"], 2)
-
-        # The newest has 3
-        self.assertEqual(r[0]["joined_members"], 3)
-
-    def test_incorrect_state_transition(self):
-        """
-        If the state transition is not one of (JOIN, INVITE, LEAVE, BAN) to
-        (JOIN, INVITE, LEAVE, BAN), an error is raised.
-        """
-        events = {
-            "a1": {"membership": Membership.LEAVE},
-            "a2": {"membership": "not a real thing"},
-        }
-
-        def get_event(event_id, allow_none=True):
-            m = Mock()
-            m.content = events[event_id]
-            d = defer.Deferred()
-            self.reactor.callLater(0.0, d.callback, m)
-            return d
-
-        def get_received_ts(event_id):
-            return defer.succeed(1)
-
-        self.store.get_received_ts = get_received_ts
-        self.store.get_event = get_event
-
-        deltas = [
-            {
-                "type": EventTypes.Member,
-                "state_key": "some_user",
-                "room_id": "room",
-                "event_id": "a1",
-                "prev_event_id": "a2",
-                "stream_id": 60,
-            }
-        ]
-
-        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
-        self.assertEqual(
-            f.value.args[0], "'not a real thing' is not a valid prev_membership"
-        )
-
-        # And the other way...
-        deltas = [
-            {
-                "type": EventTypes.Member,
-                "state_key": "some_user",
-                "room_id": "room",
-                "event_id": "a2",
-                "prev_event_id": "a1",
-                "stream_id": 100,
-            }
-        ]
-
-        f = self.get_failure(self.handler._handle_deltas(deltas), ValueError)
-        self.assertEqual(
-            f.value.args[0], "'not a real thing' is not a valid membership"
-        )
-
-    def test_redacted_prev_event(self):
-        """
-        If the prev_event does not exist, then it is assumed to be a LEAVE.
-        """
-        u1 = self.register_user("u1", "pass")
-        u1_token = self.login("u1", "pass")
-
-        room_1 = self.helper.create_room_as(u1, tok=u1_token)
-
-        # Do the initial population of the user directory via the background update
-        self._add_background_updates()
-
-        while not self.get_success(self.store.has_completed_background_updates()):
-            self.get_success(self.store.do_next_background_update(100), by=0.1)
-
-        events = {"a1": None, "a2": {"membership": Membership.JOIN}}
-
-        def get_event(event_id, allow_none=True):
-            if events.get(event_id):
-                m = Mock()
-                m.content = events[event_id]
-            else:
-                m = None
-            d = defer.Deferred()
-            self.reactor.callLater(0.0, d.callback, m)
-            return d
-
-        def get_received_ts(event_id):
-            return defer.succeed(1)
-
-        self.store.get_received_ts = get_received_ts
-        self.store.get_event = get_event
-
-        deltas = [
-            {
-                "type": EventTypes.Member,
-                "state_key": "some_user:test",
-                "room_id": room_1,
-                "event_id": "a2",
-                "prev_event_id": "a1",
-                "stream_id": 100,
-            }
-        ]
-
-        # Handle our fake deltas, which has a user going from LEAVE -> JOIN.
-        self.get_success(self.handler._handle_deltas(deltas))
-
-        # One delta, with two joined members -- the room creator, and our fake
-        # user.
-        r = self.get_success(self.store.get_deltas_for_room(room_1, 0))
-        self.assertEqual(len(r), 1)
-        self.assertEqual(r[0]["joined_members"], 2)