summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
Diffstat (limited to 'synapse')
-rw-r--r--synapse/config/stats.py13
-rw-r--r--synapse/handlers/stats.py307
-rw-r--r--synapse/storage/events.py5
-rw-r--r--synapse/storage/registration.py12
-rw-r--r--synapse/storage/roommember.py44
-rw-r--r--synapse/storage/schema/delta/56/stats_separated.sql152
-rw-r--r--synapse/storage/stats.py1036
7 files changed, 1041 insertions, 528 deletions
diff --git a/synapse/config/stats.py b/synapse/config/stats.py
index b518a3ed9c..b18ddbd1fa 100644
--- a/synapse/config/stats.py
+++ b/synapse/config/stats.py
@@ -27,19 +27,16 @@ class StatsConfig(Config):
 
     def read_config(self, config, **kwargs):
         self.stats_enabled = True
-        self.stats_bucket_size = 86400
+        self.stats_bucket_size = 86400 * 1000
         self.stats_retention = sys.maxsize
         stats_config = config.get("stats", None)
         if stats_config:
             self.stats_enabled = stats_config.get("enabled", self.stats_enabled)
-            self.stats_bucket_size = (
-                self.parse_duration(stats_config.get("bucket_size", "1d")) / 1000
+            self.stats_bucket_size = self.parse_duration(
+                stats_config.get("bucket_size", "1d")
             )
-            self.stats_retention = (
-                self.parse_duration(
-                    stats_config.get("retention", "%ds" % (sys.maxsize,))
-                )
-                / 1000
+            self.stats_retention = self.parse_duration(
+                stats_config.get("retention", "%ds" % (sys.maxsize,))
             )
 
     def generate_config_section(self, config_dir_path, server_name, **kwargs):
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..921735edb3 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -14,15 +14,14 @@
 # limitations under the License.
 
 import logging
+from collections import Counter
 
 from twisted.internet import defer
 
-from synapse.api.constants import EventTypes, JoinRules, Membership
+from synapse.api.constants import EventTypes, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
 from synapse.metrics import event_processing_positions
 from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID
-from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -62,11 +61,10 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled:
+        if not self.hs.config.stats_enabled or self._is_processing:
             return
 
-        if self._is_processing:
-            return
+        self._is_processing = True
 
         @defer.inlineCallbacks
         def process():
@@ -75,39 +73,72 @@ class StatsHandler(StateDeltasHandler):
             finally:
                 self._is_processing = False
 
-        self._is_processing = True
         run_as_background_process("stats.notify_new_event", process)
 
     @defer.inlineCallbacks
     def _unsafe_process(self):
         # If self.pos is None then means we haven't fetched it from DB
         if self.pos is None:
-            self.pos = yield self.store.get_stats_stream_pos()
-
-        # If still None then the initial background update hasn't happened yet
-        if self.pos is None:
-            return None
+            self.pos = yield self.store.get_stats_positions()
 
         # Loop round handling deltas until we're up to date
+
         while True:
-            with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
-                    return
+            deltas = yield self.store.get_current_state_deltas(self.pos)
+
+            if deltas:
+                logger.debug("Handling %d state deltas", len(deltas))
+                room_deltas, user_deltas = yield self._handle_deltas(deltas)
+
+                max_pos = deltas[-1]["stream_id"]
+            else:
+                room_deltas = {}
+                user_deltas = {}
+                max_pos = yield self.store.get_room_max_stream_ordering()
 
-                logger.info("Handling %d state deltas", len(deltas))
-                yield self._handle_deltas(deltas)
+            # Then count deltas for total_events and total_event_bytes.
+            room_count, user_count = yield self.store.get_changes_room_total_events_and_bytes(
+                self.pos, max_pos
+            )
+
+            for room_id, fields in room_count.items():
+                room_deltas.setdefault(room_id, {}).update(fields)
+
+            for user_id, fields in user_count.items():
+                user_deltas.setdefault(user_id, {}).update(fields)
+
+            logger.debug("room_deltas: %s", room_deltas)
+            logger.debug("user_deltas: %s", user_deltas)
 
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
+            # Always call this so that we update the stats position.
+            yield self.store.bulk_update_stats_delta(
+                self.clock.time_msec(),
+                updates={"room": room_deltas, "user": user_deltas},
+                stream_id=max_pos,
+            )
+
+            event_processing_positions.labels("stats").set(max_pos)
 
-                event_processing_positions.labels("stats").set(self.pos)
+            if self.pos == max_pos:
+                break
+
+            self.pos = max_pos
 
     @defer.inlineCallbacks
     def _handle_deltas(self, deltas):
+        """Called with the state deltas to process
+
+        Returns:
+            Deferred[tuple[dict[str, Counter], dict[str, counter]]]
+            Resovles to two dicts, the room deltas and the user deltas,
+            mapping from room/user ID to changes in the various fields.
         """
-        Called with the state deltas to process
-        """
+
+        room_to_stats_deltas = {}
+        user_to_stats_deltas = {}
+
+        room_to_state_updates = {}
+
         for delta in deltas:
             typ = delta["type"]
             state_key = delta["state_key"]
@@ -115,11 +146,10 @@ class StatsHandler(StateDeltasHandler):
             event_id = delta["event_id"]
             stream_id = delta["stream_id"]
             prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]
 
-            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
+            logger.debug("Handling: %r, %r %r, %s", room_id, typ, state_key, event_id)
 
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
+            token = yield self.store.get_earliest_token_for_stats("room", room_id)
 
             # If the earliest token to begin from is larger than our current
             # stream ID, skip processing this delta.
@@ -131,203 +161,130 @@ class StatsHandler(StateDeltasHandler):
                 continue
 
             if event_id is None and prev_event_id is None:
-                # Errr...
+                logger.error(
+                    "event ID is None and so is the previous event ID. stream_id: %s",
+                    stream_id,
+                )
                 continue
 
             event_content = {}
 
+            sender = None
             if event_id is not None:
                 event = yield self.store.get_event(event_id, allow_none=True)
                 if event:
                     event_content = event.content or {}
+                    sender = event.sender
+
+            # All the values in this dict are deltas (RELATIVE changes)
+            room_stats_delta = room_to_stats_deltas.setdefault(room_id, Counter())
 
-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
+            room_state = room_to_state_updates.setdefault(room_id, {})
 
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
+            if prev_event_id is None:
+                # this state event doesn't overwrite another,
+                # so it is a new effective/current state event
+                room_stats_delta["current_state_events"] += 1
 
             if typ == EventTypes.Member:
                 # we could use _get_key_change here but it's a bit inefficient
                 # given we're not testing for a specific result; might as well
                 # just grab the prev_membership and membership strings and
                 # compare them.
-                prev_event_content = {}
+                # We take None rather than leave as a previous membership
+                # in the absence of a previous event because we do not want to
+                # reduce the leave count when a new-to-the-room user joins.
+                prev_membership = None
                 if prev_event_id is not None:
                     prev_event = yield self.store.get_event(
                         prev_event_id, allow_none=True
                     )
                     if prev_event:
                         prev_event_content = prev_event.content
+                        prev_membership = prev_event_content.get(
+                            "membership", Membership.LEAVE
+                        )
 
                 membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
-
-                if prev_membership == membership:
-                    continue
 
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
-                    )
+                if prev_membership is None:
+                    logger.debug("No previous membership for this user.")
+                elif membership == prev_membership:
+                    pass  # noop
+                elif prev_membership == Membership.JOIN:
+                    room_stats_delta["joined_members"] -= 1
                 elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
-                    )
+                    room_stats_delta["invited_members"] -= 1
                 elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
-                    )
+                    room_stats_delta["left_members"] -= 1
                 elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
-                    )
+                    room_stats_delta["banned_members"] -= 1
                 else:
-                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError(
+                        "%r is not a valid prev_membership" % (prev_membership,)
+                    )
 
+                if membership == prev_membership:
+                    pass  # noop
                 if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
-                    )
+                    room_stats_delta["joined_members"] += 1
                 elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
-                    )
+                    room_stats_delta["invited_members"] += 1
+
+                    if sender and self.is_mine_id(sender):
+                        user_to_stats_deltas.setdefault(sender, Counter())[
+                            "invites_sent"
+                        ] += 1
+
                 elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
-                    )
+                    room_stats_delta["left_members"] += 1
                 elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
-                    )
+                    room_stats_delta["banned_members"] += 1
                 else:
-                    err = "%s is not a valid membership" % (repr(membership),)
-                    logger.error(err)
-                    raise ValueError(err)
+                    raise ValueError("%r is not a valid membership" % (membership,))
 
                 user_id = state_key
                 if self.is_mine_id(user_id):
-                    # update user_stats as it's one of our users
-                    public = yield self._is_public_room(room_id)
-
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
-                        )
+                    # this accounts for transitions like leave → ban and so on.
+                    has_changed_joinedness = (prev_membership == Membership.JOIN) != (
+                        membership == Membership.JOIN
+                    )
 
-            elif typ == EventTypes.Create:
-                # Newly created room. Add it with all blank portions.
-                yield self.store.update_room_state(
-                    room_id,
-                    {
-                        "join_rules": None,
-                        "history_visibility": None,
-                        "encryption": None,
-                        "name": None,
-                        "topic": None,
-                        "avatar": None,
-                        "canonical_alias": None,
-                    },
-                )
+                    if has_changed_joinedness:
+                        delta = +1 if membership == Membership.JOIN else -1
 
-            elif typ == EventTypes.JoinRules:
-                yield self.store.update_room_state(
-                    room_id, {"join_rules": event_content.get("join_rule")}
-                )
+                        user_to_stats_deltas.setdefault(user_id, Counter())[
+                            "joined_rooms"
+                        ] += delta
 
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
-                )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
+                        room_stats_delta["local_users_in_room"] += delta
 
+            elif typ == EventTypes.Create:
+                room_state["is_federatable"] = event_content.get("m.federate", True)
+                if sender and self.is_mine_id(sender):
+                    user_to_stats_deltas.setdefault(sender, Counter())[
+                        "rooms_created"
+                    ] += 1
+            elif typ == EventTypes.JoinRules:
+                room_state["join_rules"] = event_content.get("join_rule")
             elif typ == EventTypes.RoomHistoryVisibility:
-                yield self.store.update_room_state(
-                    room_id,
-                    {"history_visibility": event_content.get("history_visibility")},
-                )
-
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
+                room_state["history_visibility"] = event_content.get(
+                    "history_visibility"
                 )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
-
             elif typ == EventTypes.Encryption:
-                yield self.store.update_room_state(
-                    room_id, {"encryption": event_content.get("algorithm")}
-                )
+                room_state["encryption"] = event_content.get("algorithm")
             elif typ == EventTypes.Name:
-                yield self.store.update_room_state(
-                    room_id, {"name": event_content.get("name")}
-                )
+                room_state["name"] = event_content.get("name")
             elif typ == EventTypes.Topic:
-                yield self.store.update_room_state(
-                    room_id, {"topic": event_content.get("topic")}
-                )
+                room_state["topic"] = event_content.get("topic")
             elif typ == EventTypes.RoomAvatar:
-                yield self.store.update_room_state(
-                    room_id, {"avatar": event_content.get("url")}
-                )
+                room_state["avatar"] = event_content.get("url")
             elif typ == EventTypes.CanonicalAlias:
-                yield self.store.update_room_state(
-                    room_id, {"canonical_alias": event_content.get("alias")}
-                )
+                room_state["canonical_alias"] = event_content.get("alias")
+            elif typ == EventTypes.GuestAccess:
+                room_state["guest_access"] = event_content.get("guest_access")
 
-    @defer.inlineCallbacks
-    def update_public_room_stats(self, ts, room_id, is_public):
-        """
-        Increment/decrement a user's number of public rooms when a room they are
-        in changes to/from public visibility.
+        for room_id, state in room_to_state_updates.items():
+            yield self.store.update_room_state(room_id, state)
 
-        Args:
-            ts (int): Timestamp in seconds
-            room_id (str)
-            is_public (bool)
-        """
-        # For now, blindly iterate over all local users in the room so that
-        # we can handle the whole problem of copying buckets over as needed
-        user_ids = yield self.store.get_users_in_room(room_id)
-
-        for user_id in user_ids:
-            if self.hs.is_mine(UserID.from_string(user_id)):
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
-                )
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
-                )
-
-    @defer.inlineCallbacks
-    def _is_public_room(self, room_id):
-        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
-        history_visibility = yield self.state.get_current_state(
-            room_id, EventTypes.RoomHistoryVisibility
-        )
-
-        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
-            (
-                history_visibility
-                and history_visibility.content.get("history_visibility")
-                == "world_readable"
-            )
-        ):
-            return True
-        else:
-            return False
+        return room_to_stats_deltas, user_to_stats_deltas
diff --git a/synapse/storage/events.py b/synapse/storage/events.py
index 32050868ff..1958afe1d7 100644
--- a/synapse/storage/events.py
+++ b/synapse/storage/events.py
@@ -2270,8 +2270,9 @@ class EventsStore(
             "room_aliases",
             "room_depth",
             "room_memberships",
-            "room_state",
-            "room_stats",
+            "room_stats_state",
+            "room_stats_current",
+            "room_stats_historical",
             "room_stats_earliest_token",
             "rooms",
             "stream_ordering_to_exterm",
diff --git a/synapse/storage/registration.py b/synapse/storage/registration.py
index 3f50324253..2d3c7e2dc9 100644
--- a/synapse/storage/registration.py
+++ b/synapse/storage/registration.py
@@ -869,6 +869,17 @@ class RegistrationStore(
                 (user_id_obj.localpart, create_profile_with_displayname),
             )
 
+        if self.hs.config.stats_enabled:
+            # we create a new completed user statistics row
+
+            # we don't strictly need current_token since this user really can't
+            # have any state deltas before now (as it is a new user), but still,
+            # we include it for completeness.
+            current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
+            self._update_stats_delta_txn(
+                txn, now, "user", user_id, {}, complete_with_stream_id=current_token
+            )
+
         self._invalidate_cache_and_stream(txn, self.get_user_by_id, (user_id,))
         txn.call_after(self.is_guest.invalidate, (user_id,))
 
@@ -1140,6 +1151,7 @@ class RegistrationStore(
             deferred str|None: A str representing a link to redirect the user
             to if there is one.
         """
+
         # Insert everything into a transaction in order to run atomically
         def validate_threepid_session_txn(txn):
             row = self._simple_select_one_txn(
diff --git a/synapse/storage/roommember.py b/synapse/storage/roommember.py
index eecb276465..f8b682ebd9 100644
--- a/synapse/storage/roommember.py
+++ b/synapse/storage/roommember.py
@@ -112,29 +112,31 @@ class RoomMemberWorkerStore(EventsWorkerStore):
 
     @cached(max_entries=100000, iterable=True)
     def get_users_in_room(self, room_id):
-        def f(txn):
-            # If we can assume current_state_events.membership is up to date
-            # then we can avoid a join, which is a Very Good Thing given how
-            # frequently this function gets called.
-            if self._current_state_events_membership_up_to_date:
-                sql = """
-                    SELECT state_key FROM current_state_events
-                    WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
-                """
-            else:
-                sql = """
-                    SELECT state_key FROM room_memberships as m
-                    INNER JOIN current_state_events as c
-                    ON m.event_id = c.event_id
-                    AND m.room_id = c.room_id
-                    AND m.user_id = c.state_key
-                    WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
-                """
+        return self.runInteraction(
+            "get_users_in_room", self.get_users_in_room_txn, room_id
+        )
 
-            txn.execute(sql, (room_id, Membership.JOIN))
-            return [to_ascii(r[0]) for r in txn]
+    def get_users_in_room_txn(self, txn, room_id):
+        # If we can assume current_state_events.membership is up to date
+        # then we can avoid a join, which is a Very Good Thing given how
+        # frequently this function gets called.
+        if self._current_state_events_membership_up_to_date:
+            sql = """
+                SELECT state_key FROM current_state_events
+                WHERE type = 'm.room.member' AND room_id = ? AND membership = ?
+            """
+        else:
+            sql = """
+                SELECT state_key FROM room_memberships as m
+                INNER JOIN current_state_events as c
+                ON m.event_id = c.event_id
+                AND m.room_id = c.room_id
+                AND m.user_id = c.state_key
+                WHERE c.type = 'm.room.member' AND c.room_id = ? AND m.membership = ?
+            """
 
-        return self.runInteraction("get_users_in_room", f)
+        txn.execute(sql, (room_id, Membership.JOIN))
+        return [to_ascii(r[0]) for r in txn]
 
     @cached(max_entries=100000)
     def get_room_summary(self, room_id):
diff --git a/synapse/storage/schema/delta/56/stats_separated.sql b/synapse/storage/schema/delta/56/stats_separated.sql
new file mode 100644
index 0000000000..163529c071
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated.sql
@@ -0,0 +1,152 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- First remove old stats stuff
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS room_state;
+DROP TABLE IF EXISTS room_stats_state;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_tokens;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+    'populate_stats_createtables',
+    'populate_stats_process_rooms',
+    'populate_stats_process_users',
+    'populate_stats_cleanup'
+);
+
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_rooms', '{}', '');
+
+INSERT INTO background_updates (update_name, progress_json, depends_on) VALUES
+    ('populate_stats_process_users', '{}', 'populate_stats_process_rooms');
+
+----- Create tables for our version of room stats.
+
+-- single-row table to track position of incremental updates
+DROP TABLE IF EXISTS stats_incremental_position;
+CREATE TABLE stats_incremental_position (
+    Lock CHAR(1) NOT NULL DEFAULT 'X' UNIQUE,  -- Makes sure this table only has one row.
+    stream_id  BIGINT NOT NULL,
+    CHECK (Lock='X')
+);
+
+-- insert a null row and make sure it is the only one.
+INSERT INTO stats_incremental_position (
+    stream_id
+) SELECT COALESCE(MAX(stream_ordering), 0) from events;
+
+-- represents PRESENT room statistics for a room
+-- only holds absolute fields
+DROP TABLE IF EXISTS room_stats_current;
+CREATE TABLE room_stats_current (
+    room_id TEXT NOT NULL PRIMARY KEY,
+
+    -- These are absolute counts
+    current_state_events INT NOT NULL,
+    joined_members INT NOT NULL,
+    invited_members INT NOT NULL,
+    left_members INT NOT NULL,
+    banned_members INT NOT NULL,
+
+    local_users_in_room INT NOT NULL,
+
+    -- The maximum delta stream position that this row takes into account.
+    completed_delta_stream_id BIGINT NOT NULL
+);
+
+
+-- represents HISTORICAL room statistics for a room
+DROP TABLE IF EXISTS room_stats_historical;
+CREATE TABLE room_stats_historical (
+    room_id TEXT NOT NULL,
+    -- These stats cover the time from (end_ts - bucket_size)...end_ts (in ms).
+    -- Note that end_ts is quantised.
+    end_ts BIGINT NOT NULL,
+    bucket_size BIGINT NOT NULL,
+
+    -- These stats are absolute counts
+    current_state_events BIGINT NOT NULL,
+    joined_members BIGINT NOT NULL,
+    invited_members BIGINT NOT NULL,
+    left_members BIGINT NOT NULL,
+    banned_members BIGINT NOT NULL,
+    local_users_in_room BIGINT NOT NULL,
+
+    -- These stats are per time slice
+    total_events BIGINT NOT NULL,
+    total_event_bytes BIGINT NOT NULL,
+
+    PRIMARY KEY (room_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient room stats.
+CREATE INDEX room_stats_historical_end_ts ON room_stats_historical (end_ts);
+
+-- represents PRESENT statistics for a user
+-- only holds absolute fields
+DROP TABLE IF EXISTS user_stats_current;
+CREATE TABLE user_stats_current (
+    user_id TEXT NOT NULL PRIMARY KEY,
+
+    joined_rooms BIGINT NOT NULL,
+
+    -- The maximum delta stream position that this row takes into account.
+    completed_delta_stream_id BIGINT NOT NULL
+);
+
+-- represents HISTORICAL statistics for a user
+DROP TABLE IF EXISTS user_stats_historical;
+CREATE TABLE user_stats_historical (
+    user_id TEXT NOT NULL,
+    end_ts BIGINT NOT NULL,
+    bucket_size BIGINT NOT NULL,
+
+    joined_rooms BIGINT NOT NULL,
+
+    invites_sent BIGINT NOT NULL,
+    rooms_created BIGINT NOT NULL,
+    total_events BIGINT NOT NULL,
+    total_event_bytes BIGINT NOT NULL,
+
+    PRIMARY KEY (user_id, end_ts)
+);
+
+-- We use this index to speed up deletion of ancient user stats.
+CREATE INDEX user_stats_historical_end_ts ON user_stats_historical (end_ts);
+
+
+CREATE TABLE room_stats_state (
+    room_id TEXT NOT NULL,
+    name TEXT,
+    canonical_alias TEXT,
+    join_rules TEXT,
+    history_visibility TEXT,
+    encryption TEXT,
+    avatar TEXT,
+    guest_access TEXT,
+    is_federatable BOOLEAN,
+    topic TEXT
+);
+
+CREATE UNIQUE INDEX room_stats_state_room ON room_stats_state(room_id);
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..6560173c08 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -1,5 +1,6 @@
 # -*- coding: utf-8 -*-
 # Copyright 2018, 2019 New Vector Ltd
+# Copyright 2019 The Matrix.org Foundation C.I.C.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -14,17 +15,22 @@
 # limitations under the License.
 
 import logging
+from itertools import chain
 
 from twisted.internet import defer
+from twisted.internet.defer import DeferredLock
 
 from synapse.api.constants import EventTypes, Membership
-from synapse.storage.prepare_database import get_statements
+from synapse.storage import PostgresEngine
 from synapse.storage.state_deltas import StateDeltasStore
 from synapse.util.caches.descriptors import cached
 
 logger = logging.getLogger(__name__)
 
 # these fields track absolutes (e.g. total number of rooms on the server)
+# You can think of these as Prometheus Gauges.
+# You can draw these stats on a line graph.
+# Example: number of users in a room
 ABSOLUTE_STATS_FIELDS = {
     "room": (
         "current_state_events",
@@ -32,14 +38,23 @@ ABSOLUTE_STATS_FIELDS = {
         "invited_members",
         "left_members",
         "banned_members",
-        "state_events",
+        "local_users_in_room",
     ),
-    "user": ("public_rooms", "private_rooms"),
+    "user": ("joined_rooms",),
 }
 
-TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
+# these fields are per-timeslice and so should be reset to 0 upon a new slice
+# You can draw these stats on a histogram.
+# Example: number of events sent locally during a time slice
+PER_SLICE_FIELDS = {
+    "room": ("total_events", "total_event_bytes"),
+    "user": ("invites_sent", "rooms_created", "total_events", "total_event_bytes"),
+}
+
+TYPE_TO_TABLE = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
-TEMP_TABLE = "_temp_populate_stats"
+# these are the tables (& ID columns) which contain our actual subjects
+TYPE_TO_ORIGIN_TABLE = {"room": ("rooms", "room_id"), "user": ("users", "name")}
 
 
 class StatsStore(StateDeltasStore):
@@ -51,136 +66,102 @@ class StatsStore(StateDeltasStore):
         self.stats_enabled = hs.config.stats_enabled
         self.stats_bucket_size = hs.config.stats_bucket_size
 
-        self.register_background_update_handler(
-            "populate_stats_createtables", self._populate_stats_createtables
-        )
+        self.stats_delta_processing_lock = DeferredLock()
+
         self.register_background_update_handler(
             "populate_stats_process_rooms", self._populate_stats_process_rooms
         )
         self.register_background_update_handler(
-            "populate_stats_cleanup", self._populate_stats_cleanup
+            "populate_stats_process_users", self._populate_stats_process_users
         )
+        # we no longer need to perform clean-up, but we will give ourselves
+        # the potential to reintroduce it in the future – so documentation
+        # will still encourage the use of this no-op handler.
+        self.register_noop_background_update("populate_stats_cleanup")
+        self.register_noop_background_update("populate_stats_prepare")
 
-    @defer.inlineCallbacks
-    def _populate_stats_createtables(self, progress, batch_size):
-
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_createtables")
-            return 1
-
-        # Get all the rooms that we want to process.
-        def _make_staging_area(txn):
-            # Create the temporary tables
-            stmts = get_statements(
-                """
-                -- We just recreate the table, we'll be reinserting the
-                -- correct entries again later anyway.
-                DROP TABLE IF EXISTS {temp}_rooms;
-
-                CREATE TABLE IF NOT EXISTS {temp}_rooms(
-                    room_id TEXT NOT NULL,
-                    events BIGINT NOT NULL
-                );
-
-                CREATE INDEX {temp}_rooms_events
-                    ON {temp}_rooms(events);
-                CREATE INDEX {temp}_rooms_id
-                    ON {temp}_rooms(room_id);
-            """.format(
-                    temp=TEMP_TABLE
-                ).splitlines()
-            )
-
-            for statement in stmts:
-                txn.execute(statement)
-
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_position(position TEXT NOT NULL)"
-            )
-            txn.execute(sql)
-
-            # Get rooms we want to process from the database, only adding
-            # those that we haven't (i.e. those not in room_stats_earliest_token)
-            sql = """
-                INSERT INTO %s_rooms (room_id, events)
-                SELECT c.room_id, count(*) FROM current_state_events AS c
-                LEFT JOIN room_stats_earliest_token AS t USING (room_id)
-                WHERE t.room_id IS NULL
-                GROUP BY c.room_id
-            """ % (
-                TEMP_TABLE,
-            )
-            txn.execute(sql)
+    def quantise_stats_time(self, ts):
+        """
+        Quantises a timestamp to be a multiple of the bucket size.
 
-        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
-        yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
-        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
-        self.get_earliest_token_for_room_stats.invalidate_all()
+        Args:
+            ts (int): the timestamp to quantise, in milliseconds since the Unix
+                Epoch
 
-        yield self._end_background_update("populate_stats_createtables")
-        return 1
+        Returns:
+            int: a timestamp which
+              - is divisible by the bucket size;
+              - is no later than `ts`; and
+              - is the largest such timestamp.
+        """
+        return (ts // self.stats_bucket_size) * self.stats_bucket_size
 
     @defer.inlineCallbacks
-    def _populate_stats_cleanup(self, progress, batch_size):
+    def _populate_stats_process_users(self, progress, batch_size):
         """
-        Update the user directory stream position, then clean up the old tables.
+        This is a background update which regenerates statistics for users.
         """
         if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_cleanup")
+            yield self._end_background_update("populate_stats_process_users")
             return 1
 
-        position = yield self._simple_select_one_onecol(
-            TEMP_TABLE + "_position", None, "position"
+        last_user_id = progress.get("last_user_id", "")
+
+        def _get_next_batch(txn):
+            sql = """
+                    SELECT DISTINCT name FROM users
+                    WHERE name > ?
+                    ORDER BY name ASC
+                    LIMIT ?
+                """
+            txn.execute(sql, (last_user_id, batch_size))
+            return [r for r, in txn]
+
+        users_to_work_on = yield self.runInteraction(
+            "_populate_stats_process_users", _get_next_batch
         )
-        yield self.update_stats_stream_pos(position)
 
-        def _delete_staging_area(txn):
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
+        # No more rooms -- complete the transaction.
+        if not users_to_work_on:
+            yield self._end_background_update("populate_stats_process_users")
+            return 1
 
-        yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
+        for user_id in users_to_work_on:
+            yield self._calculate_and_set_initial_state_for_user(user_id)
+            progress["last_user_id"] = user_id
 
-        yield self._end_background_update("populate_stats_cleanup")
-        return 1
+        yield self.runInteraction(
+            "populate_stats_process_users",
+            self._background_update_progress_txn,
+            "populate_stats_process_users",
+            progress,
+        )
+
+        return len(users_to_work_on)
 
     @defer.inlineCallbacks
     def _populate_stats_process_rooms(self, progress, batch_size):
-
+        """
+        This is a background update which regenerates statistics for rooms.
+        """
         if not self.stats_enabled:
             yield self._end_background_update("populate_stats_process_rooms")
             return 1
 
-        # If we don't have progress filed, delete everything.
-        if not progress:
-            yield self.delete_all_stats()
+        last_room_id = progress.get("last_room_id", "")
 
         def _get_next_batch(txn):
-            # Only fetch 250 rooms, so we don't fetch too many at once, even
-            # if those 250 rooms have less than batch_size state events.
             sql = """
-                SELECT room_id, events FROM %s_rooms
-                ORDER BY events DESC
-                LIMIT 250
-            """ % (
-                TEMP_TABLE,
-            )
-            txn.execute(sql)
-            rooms_to_work_on = txn.fetchall()
-
-            if not rooms_to_work_on:
-                return None
-
-            # Get how many are left to process, so we can give status on how
-            # far we are in processing
-            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
-            progress["remaining"] = txn.fetchone()[0]
-
-            return rooms_to_work_on
+                    SELECT DISTINCT room_id FROM current_state_events
+                    WHERE room_id > ?
+                    ORDER BY room_id ASC
+                    LIMIT ?
+                """
+            txn.execute(sql, (last_room_id, batch_size))
+            return [r for r, in txn]
 
         rooms_to_work_on = yield self.runInteraction(
-            "populate_stats_temp_read", _get_next_batch
+            "populate_stats_rooms_get_batch", _get_next_batch
         )
 
         # No more rooms -- complete the transaction.
@@ -188,154 +169,28 @@ class StatsStore(StateDeltasStore):
             yield self._end_background_update("populate_stats_process_rooms")
             return 1
 
-        logger.info(
-            "Processing the next %d rooms of %d remaining",
-            len(rooms_to_work_on),
-            progress["remaining"],
-        )
-
-        # Number of state events we've processed by going through each room
-        processed_event_count = 0
-
-        for room_id, event_count in rooms_to_work_on:
-
-            current_state_ids = yield self.get_current_state_ids(room_id)
-
-            join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
-            history_visibility_id = current_state_ids.get(
-                (EventTypes.RoomHistoryVisibility, "")
-            )
-            encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
-            name_id = current_state_ids.get((EventTypes.Name, ""))
-            topic_id = current_state_ids.get((EventTypes.Topic, ""))
-            avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
-            canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
-
-            event_ids = [
-                join_rules_id,
-                history_visibility_id,
-                encryption_id,
-                name_id,
-                topic_id,
-                avatar_id,
-                canonical_alias_id,
-            ]
-
-            state_events = yield self.get_events(
-                [ev for ev in event_ids if ev is not None]
-            )
-
-            def _get_or_none(event_id, arg):
-                event = state_events.get(event_id)
-                if event:
-                    return event.content.get(arg)
-                return None
-
-            yield self.update_room_state(
-                room_id,
-                {
-                    "join_rules": _get_or_none(join_rules_id, "join_rule"),
-                    "history_visibility": _get_or_none(
-                        history_visibility_id, "history_visibility"
-                    ),
-                    "encryption": _get_or_none(encryption_id, "algorithm"),
-                    "name": _get_or_none(name_id, "name"),
-                    "topic": _get_or_none(topic_id, "topic"),
-                    "avatar": _get_or_none(avatar_id, "url"),
-                    "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
-                },
-            )
+        for room_id in rooms_to_work_on:
+            yield self._calculate_and_set_initial_state_for_room(room_id)
+            progress["last_room_id"] = room_id
 
-            now = self.hs.get_reactor().seconds()
-
-            # quantise time to the nearest bucket
-            now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
-            def _fetch_data(txn):
-
-                # Get the current token of the room
-                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
-                current_state_events = len(current_state_ids)
-
-                membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
-
-                total_state_events = self._get_total_state_event_counts_txn(
-                    txn, room_id
-                )
-
-                self._update_stats_txn(
-                    txn,
-                    "room",
-                    room_id,
-                    now,
-                    {
-                        "bucket_size": self.stats_bucket_size,
-                        "current_state_events": current_state_events,
-                        "joined_members": membership_counts.get(Membership.JOIN, 0),
-                        "invited_members": membership_counts.get(Membership.INVITE, 0),
-                        "left_members": membership_counts.get(Membership.LEAVE, 0),
-                        "banned_members": membership_counts.get(Membership.BAN, 0),
-                        "state_events": total_state_events,
-                    },
-                )
-                self._simple_insert_txn(
-                    txn,
-                    "room_stats_earliest_token",
-                    {"room_id": room_id, "token": current_token},
-                )
-
-                # We've finished a room. Delete it from the table.
-                self._simple_delete_one_txn(
-                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
-                )
-
-            yield self.runInteraction("update_room_stats", _fetch_data)
-
-            # Update the remaining counter.
-            progress["remaining"] -= 1
-            yield self.runInteraction(
-                "populate_stats",
-                self._background_update_progress_txn,
-                "populate_stats_process_rooms",
-                progress,
-            )
-
-            processed_event_count += event_count
-
-            if processed_event_count > batch_size:
-                # Don't process any more rooms, we've hit our batch size.
-                return processed_event_count
+        yield self.runInteraction(
+            "_populate_stats_process_rooms",
+            self._background_update_progress_txn,
+            "populate_stats_process_rooms",
+            progress,
+        )
 
-        return processed_event_count
+        return len(rooms_to_work_on)
 
-    def delete_all_stats(self):
+    def get_stats_positions(self):
         """
-        Delete all statistics records.
+        Returns the stats processor positions.
         """
-
-        def _delete_all_stats_txn(txn):
-            txn.execute("DELETE FROM room_state")
-            txn.execute("DELETE FROM room_stats")
-            txn.execute("DELETE FROM room_stats_earliest_token")
-            txn.execute("DELETE FROM user_stats")
-
-        return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
-
-    def get_stats_stream_pos(self):
         return self._simple_select_one_onecol(
-            table="stats_stream_pos",
+            table="stats_incremental_position",
             keyvalues={},
             retcol="stream_id",
-            desc="stats_stream_pos",
-        )
-
-    def update_stats_stream_pos(self, stream_id):
-        return self._simple_update_one(
-            table="stats_stream_pos",
-            keyvalues={},
-            updatevalues={"stream_id": stream_id},
-            desc="update_stats_stream_pos",
+            desc="stats_incremental_position",
         )
 
     def update_room_state(self, room_id, fields):
@@ -361,42 +216,87 @@ class StatsStore(StateDeltasStore):
                 fields[col] = None
 
         return self._simple_upsert(
-            table="room_state",
+            table="room_stats_state",
             keyvalues={"room_id": room_id},
             values=fields,
             desc="update_room_state",
         )
 
-    def get_deltas_for_room(self, room_id, start, size=100):
+    def get_statistics_for_subject(self, stats_type, stats_id, start, size=100):
         """
-        Get statistics deltas for a given room.
+        Get statistics for a given subject.
 
         Args:
-            room_id (str)
+            stats_type (str): The type of subject
+            stats_id (str): The ID of the subject (e.g. room_id or user_id)
             start (int): Pagination start. Number of entries, not timestamp.
             size (int): How many entries to return.
 
         Returns:
             Deferred[list[dict]], where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS["room"] and "ts".
+            ABSOLUTE_STATS_FIELDS[stats_type],  and "bucket_size" and "end_ts".
         """
-        return self._simple_select_list_paginate(
-            "room_stats",
-            {"room_id": room_id},
-            "ts",
+        return self.runInteraction(
+            "get_statistics_for_subject",
+            self._get_statistics_for_subject_txn,
+            stats_type,
+            stats_id,
+            start,
+            size,
+        )
+
+    def _get_statistics_for_subject_txn(
+        self, txn, stats_type, stats_id, start, size=100
+    ):
+        """
+        Transaction-bound version of L{get_statistics_for_subject}.
+        """
+
+        table, id_col = TYPE_TO_TABLE[stats_type]
+        selected_columns = list(
+            ABSOLUTE_STATS_FIELDS[stats_type] + PER_SLICE_FIELDS[stats_type]
+        )
+
+        slice_list = self._simple_select_list_paginate_txn(
+            txn,
+            table + "_historical",
+            {id_col: stats_id},
+            "end_ts",
             start,
             size,
-            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
+            retcols=selected_columns + ["bucket_size", "end_ts"],
             order_direction="DESC",
         )
 
-    def get_all_room_state(self):
-        return self._simple_select_list(
-            "room_state", None, retcols=("name", "topic", "canonical_alias")
+        return slice_list
+
+    def get_room_stats_state(self, room_id):
+        """
+        Returns the current room_stats_state for a room.
+
+        Args:
+            room_id (str): The ID of the room to return state for.
+
+        Returns (dict):
+            Dictionary containing these keys:
+                "name", "topic", "canonical_alias", "avatar", "join_rules",
+                "history_visibility"
+        """
+        return self._simple_select_one(
+            "room_stats_state",
+            {"room_id": room_id},
+            retcols=(
+                "name",
+                "topic",
+                "canonical_alias",
+                "avatar",
+                "join_rules",
+                "history_visibility",
+            ),
         )
 
     @cached()
-    def get_earliest_token_for_room_stats(self, room_id):
+    def get_earliest_token_for_stats(self, stats_type, id):
         """
         Fetch the "earliest token". This is used by the room stats delta
         processor to ignore deltas that have been processed between the
@@ -406,79 +306,571 @@ class StatsStore(StateDeltasStore):
         Returns:
             Deferred[int]
         """
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
         return self._simple_select_one_onecol(
-            "room_stats_earliest_token",
-            {"room_id": room_id},
-            retcol="token",
+            "%s_current" % (table,),
+            keyvalues={id_col: id},
+            retcol="completed_delta_stream_id",
             allow_none=True,
         )
 
-    def update_stats(self, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert(
-            table=table,
-            keyvalues={id_col: stats_id, "ts": ts},
-            values=fields,
-            desc="update_stats",
+    def bulk_update_stats_delta(self, ts, updates, stream_id):
+        """Bulk update stats tables for a given stream_id and updates the stats
+        incremental position.
+
+        Args:
+            ts (int): Current timestamp in ms
+            updates(dict[str, dict[str, dict[str, Counter]]]): The updates to
+                commit as a mapping stats_type -> stats_id -> field -> delta.
+            stream_id (int): Current position.
+
+        Returns:
+            Deferred
+        """
+
+        def _bulk_update_stats_delta_txn(txn):
+            for stats_type, stats_updates in updates.items():
+                for stats_id, fields in stats_updates.items():
+                    self._update_stats_delta_txn(
+                        txn,
+                        ts=ts,
+                        stats_type=stats_type,
+                        stats_id=stats_id,
+                        fields=fields,
+                        complete_with_stream_id=stream_id,
+                    )
+
+            self._simple_update_one_txn(
+                txn,
+                table="stats_incremental_position",
+                keyvalues={},
+                updatevalues={"stream_id": stream_id},
+            )
+
+        return self.runInteraction(
+            "bulk_update_stats_delta", _bulk_update_stats_delta_txn
         )
 
-    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert_txn(
-            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
+    def update_stats_delta(
+        self,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id,
+        absolute_field_overrides=None,
+    ):
+        """
+        Updates the statistics for a subject, with a delta (difference/relative
+        change).
+
+        Args:
+            ts (int): timestamp of the change
+            stats_type (str): "room" or "user" – the kind of subject
+            stats_id (str): the subject's ID (room ID or user ID)
+            fields (dict[str, int]): Deltas of stats values.
+            complete_with_stream_id (int, optional):
+                If supplied, converts an incomplete row into a complete row,
+                with the supplied stream_id marked as the stream_id where the
+                row was completed.
+            absolute_field_overrides (dict[str, int]): Current stats values
+                (i.e. not deltas) of absolute fields.
+                Does not work with per-slice fields.
+        """
+
+        return self.runInteraction(
+            "update_stats_delta",
+            self._update_stats_delta_txn,
+            ts,
+            stats_type,
+            stats_id,
+            fields,
+            complete_with_stream_id=complete_with_stream_id,
+            absolute_field_overrides=absolute_field_overrides,
         )
 
-    def update_stats_delta(self, ts, stats_type, stats_id, field, value):
-        def _update_stats_delta(txn):
-            table, id_col = TYPE_TO_ROOM[stats_type]
-
-            sql = (
-                "SELECT * FROM %s"
-                " WHERE %s=? and ts=("
-                "  SELECT MAX(ts) FROM %s"
-                "  WHERE %s=?"
-                ")"
-            ) % (table, id_col, table, id_col)
-            txn.execute(sql, (stats_id, stats_id))
-            rows = self.cursor_to_dict(txn)
-            if len(rows) == 0:
-                # silently skip as we don't have anything to apply a delta to yet.
-                # this tries to minimise any race between the initial sync and
-                # subsequent deltas arriving.
-                return
-
-            current_ts = ts
-            latest_ts = rows[0]["ts"]
-            if current_ts < latest_ts:
-                # This one is in the past, but we're just encountering it now.
-                # Mark it as part of the current bucket.
-                current_ts = latest_ts
-            elif ts != latest_ts:
-                # we have to copy our absolute counters over to the new entry.
-                values = {
-                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
-                }
-                values[id_col] = stats_id
-                values["ts"] = ts
-                values["bucket_size"] = self.stats_bucket_size
-
-                self._simple_insert_txn(txn, table=table, values=values)
-
-            # actually update the new value
-            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
-                self._simple_update_txn(
-                    txn,
-                    table=table,
-                    keyvalues={id_col: stats_id, "ts": current_ts},
-                    updatevalues={field: value},
+    def _update_stats_delta_txn(
+        self,
+        txn,
+        ts,
+        stats_type,
+        stats_id,
+        fields,
+        complete_with_stream_id,
+        absolute_field_overrides=None,
+    ):
+        if absolute_field_overrides is None:
+            absolute_field_overrides = {}
+
+        table, id_col = TYPE_TO_TABLE[stats_type]
+
+        quantised_ts = self.quantise_stats_time(int(ts))
+        end_ts = quantised_ts + self.stats_bucket_size
+
+        # Lets be paranoid and check that all the given field names are known
+        abs_field_names = ABSOLUTE_STATS_FIELDS[stats_type]
+        slice_field_names = PER_SLICE_FIELDS[stats_type]
+        for field in chain(fields.keys(), absolute_field_overrides.keys()):
+            if field not in abs_field_names and field not in slice_field_names:
+                # guard against potential SQL injection dodginess
+                raise ValueError(
+                    "%s is not a recognised field"
+                    " for stats type %s" % (field, stats_type)
                 )
+
+        # Per slice fields do not get added to the _current table
+
+        # This calculates the deltas (`field = field + ?` values)
+        # for absolute fields,
+        # * defaulting to 0 if not specified
+        #     (required for the INSERT part of upserting to work)
+        # * omitting overrides specified in `absolute_field_overrides`
+        deltas_of_absolute_fields = {
+            key: fields.get(key, 0)
+            for key in abs_field_names
+            if key not in absolute_field_overrides
+        }
+
+        # Keep the delta stream ID field up to date
+        absolute_field_overrides = absolute_field_overrides.copy()
+        absolute_field_overrides["completed_delta_stream_id"] = complete_with_stream_id
+
+        # first upsert the `_current` table
+        self._upsert_with_additive_relatives_txn(
+            txn=txn,
+            table=table + "_current",
+            keyvalues={id_col: stats_id},
+            absolutes=absolute_field_overrides,
+            additive_relatives=deltas_of_absolute_fields,
+        )
+
+        per_slice_additive_relatives = {
+            key: fields.get(key, 0) for key in slice_field_names
+        }
+        self._upsert_copy_from_table_with_additive_relatives_txn(
+            txn=txn,
+            into_table=table + "_historical",
+            keyvalues={id_col: stats_id},
+            extra_dst_insvalues={"bucket_size": self.stats_bucket_size},
+            extra_dst_keyvalues={"end_ts": end_ts},
+            additive_relatives=per_slice_additive_relatives,
+            src_table=table + "_current",
+            copy_columns=abs_field_names,
+        )
+
+    def _upsert_with_additive_relatives_txn(
+        self, txn, table, keyvalues, absolutes, additive_relatives
+    ):
+        """Used to update values in the stats tables.
+
+        This is basically a slightly convoluted upsert that *adds* to any
+        existing rows.
+
+        Args:
+            txn
+            table (str): Table name
+            keyvalues (dict[str, any]): Row-identifying key values
+            absolutes (dict[str, any]): Absolute (set) fields
+            additive_relatives (dict[str, int]): Fields that will be added onto
+                if existing row present.
+        """
+        if self.database_engine.can_native_upsert:
+            absolute_updates = [
+                "%(field)s = EXCLUDED.%(field)s" % {"field": field}
+                for field in absolutes.keys()
+            ]
+
+            relative_updates = [
+                "%(field)s = EXCLUDED.%(field)s + %(table)s.%(field)s"
+                % {"table": table, "field": field}
+                for field in additive_relatives.keys()
+            ]
+
+            insert_cols = []
+            qargs = []
+
+            for (key, val) in chain(
+                keyvalues.items(), absolutes.items(), additive_relatives.items()
+            ):
+                insert_cols.append(key)
+                qargs.append(val)
+
+            sql = """
+                INSERT INTO %(table)s (%(insert_cols_cs)s)
+                VALUES (%(insert_vals_qs)s)
+                ON CONFLICT (%(key_columns)s) DO UPDATE SET %(updates)s
+            """ % {
+                "table": table,
+                "insert_cols_cs": ", ".join(insert_cols),
+                "insert_vals_qs": ", ".join(
+                    ["?"] * (len(keyvalues) + len(absolutes) + len(additive_relatives))
+                ),
+                "key_columns": ", ".join(keyvalues),
+                "updates": ", ".join(chain(absolute_updates, relative_updates)),
+            }
+
+            txn.execute(sql, qargs)
+        else:
+            self.database_engine.lock_table(txn, table)
+            retcols = list(chain(absolutes.keys(), additive_relatives.keys()))
+            current_row = self._simple_select_one_txn(
+                txn, table, keyvalues, retcols, allow_none=True
+            )
+            if current_row is None:
+                merged_dict = {**keyvalues, **absolutes, **additive_relatives}
+                self._simple_insert_txn(txn, table, merged_dict)
+            else:
+                for (key, val) in additive_relatives.items():
+                    current_row[key] += val
+                current_row.update(absolutes)
+                self._simple_update_one_txn(txn, table, keyvalues, current_row)
+
+    def _upsert_copy_from_table_with_additive_relatives_txn(
+        self,
+        txn,
+        into_table,
+        keyvalues,
+        extra_dst_keyvalues,
+        extra_dst_insvalues,
+        additive_relatives,
+        src_table,
+        copy_columns,
+    ):
+        """Updates the historic stats table with latest updates.
+
+        This involves copying "absolute" fields from the `_current` table, and
+        adding relative fields to any existing values.
+
+        Args:
+             txn: Transaction
+             into_table (str): The destination table to UPSERT the row into
+             keyvalues (dict[str, any]): Row-identifying key values
+             extra_dst_keyvalues (dict[str, any]): Additional keyvalues
+                for `into_table`.
+             extra_dst_insvalues (dict[str, any]): Additional values to insert
+                on new row creation for `into_table`.
+             additive_relatives (dict[str, any]): Fields that will be added onto
+                if existing row present. (Must be disjoint from copy_columns.)
+             src_table (str): The source table to copy from
+             copy_columns (iterable[str]): The list of columns to copy
+        """
+        if self.database_engine.can_native_upsert:
+            ins_columns = chain(
+                keyvalues,
+                copy_columns,
+                additive_relatives,
+                extra_dst_keyvalues,
+                extra_dst_insvalues,
+            )
+            sel_exprs = chain(
+                keyvalues,
+                copy_columns,
+                (
+                    "?"
+                    for _ in chain(
+                        additive_relatives, extra_dst_keyvalues, extra_dst_insvalues
+                    )
+                ),
+            )
+            keyvalues_where = ("%s = ?" % f for f in keyvalues)
+
+            sets_cc = ("%s = EXCLUDED.%s" % (f, f) for f in copy_columns)
+            sets_ar = (
+                "%s = EXCLUDED.%s + %s.%s" % (f, f, into_table, f)
+                for f in additive_relatives
+            )
+
+            sql = """
+                INSERT INTO %(into_table)s (%(ins_columns)s)
+                SELECT %(sel_exprs)s
+                FROM %(src_table)s
+                WHERE %(keyvalues_where)s
+                ON CONFLICT (%(keyvalues)s)
+                DO UPDATE SET %(sets)s
+            """ % {
+                "into_table": into_table,
+                "ins_columns": ", ".join(ins_columns),
+                "sel_exprs": ", ".join(sel_exprs),
+                "keyvalues_where": " AND ".join(keyvalues_where),
+                "src_table": src_table,
+                "keyvalues": ", ".join(
+                    chain(keyvalues.keys(), extra_dst_keyvalues.keys())
+                ),
+                "sets": ", ".join(chain(sets_cc, sets_ar)),
+            }
+
+            qargs = list(
+                chain(
+                    additive_relatives.values(),
+                    extra_dst_keyvalues.values(),
+                    extra_dst_insvalues.values(),
+                    keyvalues.values(),
+                )
+            )
+            txn.execute(sql, qargs)
+        else:
+            self.database_engine.lock_table(txn, into_table)
+            src_row = self._simple_select_one_txn(
+                txn, src_table, keyvalues, copy_columns
+            )
+            all_dest_keyvalues = {**keyvalues, **extra_dst_keyvalues}
+            dest_current_row = self._simple_select_one_txn(
+                txn,
+                into_table,
+                keyvalues=all_dest_keyvalues,
+                retcols=list(chain(additive_relatives.keys(), copy_columns)),
+                allow_none=True,
+            )
+
+            if dest_current_row is None:
+                merged_dict = {
+                    **keyvalues,
+                    **extra_dst_keyvalues,
+                    **extra_dst_insvalues,
+                    **src_row,
+                    **additive_relatives,
+                }
+                self._simple_insert_txn(txn, into_table, merged_dict)
             else:
-                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
-                    table,
-                    field,
-                    field,
-                    id_col,
+                for (key, val) in additive_relatives.items():
+                    src_row[key] = dest_current_row[key] + val
+                self._simple_update_txn(txn, into_table, all_dest_keyvalues, src_row)
+
+    def get_changes_room_total_events_and_bytes(self, min_pos, max_pos):
+        """Fetches the counts of events in the given range of stream IDs.
+
+        Args:
+            min_pos (int)
+            max_pos (int)
+
+        Returns:
+            Deferred[dict[str, dict[str, int]]]: Mapping of room ID to field
+            changes.
+        """
+
+        return self.runInteraction(
+            "stats_incremental_total_events_and_bytes",
+            self.get_changes_room_total_events_and_bytes_txn,
+            min_pos,
+            max_pos,
+        )
+
+    def get_changes_room_total_events_and_bytes_txn(self, txn, low_pos, high_pos):
+        """Gets the total_events and total_event_bytes counts for rooms and
+        senders, in a range of stream_orderings (including backfilled events).
+
+        Args:
+            txn
+            low_pos (int): Low stream ordering
+            high_pos (int): High stream ordering
+
+        Returns:
+            tuple[dict[str, dict[str, int]], dict[str, dict[str, int]]]: The
+            room and user deltas for total_events/total_event_bytes in the
+            format of `stats_id` -> fields
+        """
+
+        if low_pos >= high_pos:
+            # nothing to do here.
+            return {}, {}
+
+        if isinstance(self.database_engine, PostgresEngine):
+            new_bytes_expression = "OCTET_LENGTH(json)"
+        else:
+            new_bytes_expression = "LENGTH(CAST(json AS BLOB))"
+
+        sql = """
+            SELECT events.room_id, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            FROM events INNER JOIN event_json USING (event_id)
+            WHERE (? < stream_ordering AND stream_ordering <= ?)
+                OR (? <= stream_ordering AND stream_ordering <= ?)
+            GROUP BY events.room_id
+        """ % (
+            new_bytes_expression,
+        )
+
+        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
+
+        room_deltas = {
+            room_id: {"total_events": new_events, "total_event_bytes": new_bytes}
+            for room_id, new_events, new_bytes in txn
+        }
+
+        sql = """
+            SELECT events.sender, COUNT(*) AS new_events, SUM(%s) AS new_bytes
+            FROM events INNER JOIN event_json USING (event_id)
+            WHERE (? < stream_ordering AND stream_ordering <= ?)
+                OR (? <= stream_ordering AND stream_ordering <= ?)
+            GROUP BY events.sender
+        """ % (
+            new_bytes_expression,
+        )
+
+        txn.execute(sql, (low_pos, high_pos, -high_pos, -low_pos))
+
+        user_deltas = {
+            user_id: {"total_events": new_events, "total_event_bytes": new_bytes}
+            for user_id, new_events, new_bytes in txn
+            if self.hs.is_mine_id(user_id)
+        }
+
+        return room_deltas, user_deltas
+
+    @defer.inlineCallbacks
+    def _calculate_and_set_initial_state_for_room(self, room_id):
+        """Calculate and insert an entry into room_stats_current.
+
+        Args:
+            room_id (str)
+
+        Returns:
+            Deferred[tuple[dict, dict, int]]: A tuple of room state, membership
+            counts and stream position.
+        """
+
+        def _fetch_current_state_stats(txn):
+            pos = self.get_room_max_stream_ordering()
+
+            rows = self._simple_select_many_txn(
+                txn,
+                table="current_state_events",
+                column="type",
+                iterable=[
+                    EventTypes.Create,
+                    EventTypes.JoinRules,
+                    EventTypes.RoomHistoryVisibility,
+                    EventTypes.Encryption,
+                    EventTypes.Name,
+                    EventTypes.Topic,
+                    EventTypes.RoomAvatar,
+                    EventTypes.CanonicalAlias,
+                ],
+                keyvalues={"room_id": room_id, "state_key": ""},
+                retcols=["event_id"],
+            )
+
+            event_ids = [row["event_id"] for row in rows]
+
+            txn.execute(
+                """
+                    SELECT membership, count(*) FROM current_state_events
+                    WHERE room_id = ? AND type = 'm.room.member'
+                    GROUP BY membership
+                """,
+                (room_id,),
+            )
+            membership_counts = {membership: cnt for membership, cnt in txn}
+
+            txn.execute(
+                """
+                    SELECT COALESCE(count(*), 0) FROM current_state_events
+                    WHERE room_id = ?
+                """,
+                (room_id,),
+            )
+
+            current_state_events_count, = txn.fetchone()
+
+            users_in_room = self.get_users_in_room_txn(txn, room_id)
+
+            return (
+                event_ids,
+                membership_counts,
+                current_state_events_count,
+                users_in_room,
+                pos,
+            )
+
+        (
+            event_ids,
+            membership_counts,
+            current_state_events_count,
+            users_in_room,
+            pos,
+        ) = yield self.runInteraction(
+            "get_initial_state_for_room", _fetch_current_state_stats
+        )
+
+        state_event_map = yield self.get_events(event_ids, get_prev_content=False)
+
+        room_state = {
+            "join_rules": None,
+            "history_visibility": None,
+            "encryption": None,
+            "name": None,
+            "topic": None,
+            "avatar": None,
+            "canonical_alias": None,
+            "is_federatable": True,
+        }
+
+        for event in state_event_map.values():
+            if event.type == EventTypes.JoinRules:
+                room_state["join_rules"] = event.content.get("join_rule")
+            elif event.type == EventTypes.RoomHistoryVisibility:
+                room_state["history_visibility"] = event.content.get(
+                    "history_visibility"
                 )
-                txn.execute(sql, (value, stats_id, current_ts))
+            elif event.type == EventTypes.Encryption:
+                room_state["encryption"] = event.content.get("algorithm")
+            elif event.type == EventTypes.Name:
+                room_state["name"] = event.content.get("name")
+            elif event.type == EventTypes.Topic:
+                room_state["topic"] = event.content.get("topic")
+            elif event.type == EventTypes.RoomAvatar:
+                room_state["avatar"] = event.content.get("url")
+            elif event.type == EventTypes.CanonicalAlias:
+                room_state["canonical_alias"] = event.content.get("alias")
+            elif event.type == EventTypes.Create:
+                room_state["is_federatable"] = event.content.get("m.federate", True)
+
+        yield self.update_room_state(room_id, room_state)
+
+        local_users_in_room = [u for u in users_in_room if self.hs.is_mine_id(u)]
+
+        yield self.update_stats_delta(
+            ts=self.clock.time_msec(),
+            stats_type="room",
+            stats_id=room_id,
+            fields={},
+            complete_with_stream_id=pos,
+            absolute_field_overrides={
+                "current_state_events": current_state_events_count,
+                "joined_members": membership_counts.get(Membership.JOIN, 0),
+                "invited_members": membership_counts.get(Membership.INVITE, 0),
+                "left_members": membership_counts.get(Membership.LEAVE, 0),
+                "banned_members": membership_counts.get(Membership.BAN, 0),
+                "local_users_in_room": len(local_users_in_room),
+            },
+        )
+
+    @defer.inlineCallbacks
+    def _calculate_and_set_initial_state_for_user(self, user_id):
+        def _calculate_and_set_initial_state_for_user_txn(txn):
+            pos = self._get_max_stream_id_in_current_state_deltas_txn(txn)
 
-        return self.runInteraction("update_stats_delta", _update_stats_delta)
+            txn.execute(
+                """
+                SELECT COUNT(distinct room_id) FROM current_state_events
+                    WHERE type = 'm.room.member' AND state_key = ?
+                        AND membership = 'join'
+                """,
+                (user_id,),
+            )
+            count, = txn.fetchone()
+            return count, pos
+
+        joined_rooms, pos = yield self.runInteraction(
+            "calculate_and_set_initial_state_for_user",
+            _calculate_and_set_initial_state_for_user_txn,
+        )
+
+        yield self.update_stats_delta(
+            ts=self.clock.time_msec(),
+            stats_type="user",
+            stats_id=user_id,
+            fields={},
+            complete_with_stream_id=pos,
+            absolute_field_overrides={"joined_rooms": joined_rooms},
+        )