summary refs log tree commit diff
path: root/synapse
diff options
context:
space:
mode:
authorOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-19 15:18:46 +0100
committerOlivier Wilkinson (reivilibre) <olivier@librepush.net>2019-08-19 15:27:25 +0100
commit35098d5b041bcf699029488c9c12cf618ef68082 (patch)
tree3b09029cfc858835dd6a61ea560603ffa65cf153 /synapse
parentNewsfile (diff)
downloadsynapse-35098d5b041bcf699029488c9c12cf618ef68082.tar.xz
Tear out current room & user statistics.
Signed-off-by: Olivier Wilkinson (reivilibre) <olivier@librepush.net>
Diffstat (limited to 'synapse')
-rw-r--r--synapse/handlers/stats.py280
-rw-r--r--synapse/storage/schema/delta/56/stats_separated1.sql33
-rw-r--r--synapse/storage/stats.py412
3 files changed, 40 insertions, 685 deletions
diff --git a/synapse/handlers/stats.py b/synapse/handlers/stats.py
index 4449da6669..7b1d1b4203 100644
--- a/synapse/handlers/stats.py
+++ b/synapse/handlers/stats.py
@@ -15,14 +15,7 @@
 
 import logging
 
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, JoinRules, Membership
 from synapse.handlers.state_deltas import StateDeltasHandler
-from synapse.metrics import event_processing_positions
-from synapse.metrics.background_process_metrics import run_as_background_process
-from synapse.types import UserID
-from synapse.util.metrics import Measure
 
 logger = logging.getLogger(__name__)
 
@@ -49,9 +42,6 @@ class StatsHandler(StateDeltasHandler):
         # The current position in the current_state_delta stream
         self.pos = None
 
-        # Guard to ensure we only process deltas one at a time
-        self._is_processing = False
-
         if hs.config.stats_enabled:
             self.notifier.add_replication_callback(self.notify_new_event)
 
@@ -62,272 +52,4 @@ class StatsHandler(StateDeltasHandler):
     def notify_new_event(self):
         """Called when there may be more deltas to process
         """
-        if not self.hs.config.stats_enabled:
-            return
-
-        if self._is_processing:
-            return
-
-        @defer.inlineCallbacks
-        def process():
-            try:
-                yield self._unsafe_process()
-            finally:
-                self._is_processing = False
-
-        self._is_processing = True
-        run_as_background_process("stats.notify_new_event", process)
-
-    @defer.inlineCallbacks
-    def _unsafe_process(self):
-        # If self.pos is None then means we haven't fetched it from DB
-        if self.pos is None:
-            self.pos = yield self.store.get_stats_stream_pos()
-
-        # If still None then the initial background update hasn't happened yet
-        if self.pos is None:
-            return None
-
-        # Loop round handling deltas until we're up to date
-        while True:
-            with Measure(self.clock, "stats_delta"):
-                deltas = yield self.store.get_current_state_deltas(self.pos)
-                if not deltas:
-                    return
-
-                logger.info("Handling %d state deltas", len(deltas))
-                yield self._handle_deltas(deltas)
-
-                self.pos = deltas[-1]["stream_id"]
-                yield self.store.update_stats_stream_pos(self.pos)
-
-                event_processing_positions.labels("stats").set(self.pos)
-
-    @defer.inlineCallbacks
-    def _handle_deltas(self, deltas):
-        """
-        Called with the state deltas to process
-        """
-        for delta in deltas:
-            typ = delta["type"]
-            state_key = delta["state_key"]
-            room_id = delta["room_id"]
-            event_id = delta["event_id"]
-            stream_id = delta["stream_id"]
-            prev_event_id = delta["prev_event_id"]
-            stream_pos = delta["stream_id"]
-
-            logger.debug("Handling: %r %r, %s", typ, state_key, event_id)
-
-            token = yield self.store.get_earliest_token_for_room_stats(room_id)
-
-            # If the earliest token to begin from is larger than our current
-            # stream ID, skip processing this delta.
-            if token is not None and token >= stream_id:
-                logger.debug(
-                    "Ignoring: %s as earlier than this room's initial ingestion event",
-                    event_id,
-                )
-                continue
-
-            if event_id is None and prev_event_id is None:
-                # Errr...
-                continue
-
-            event_content = {}
-
-            if event_id is not None:
-                event = yield self.store.get_event(event_id, allow_none=True)
-                if event:
-                    event_content = event.content or {}
-
-            # We use stream_pos here rather than fetch by event_id as event_id
-            # may be None
-            now = yield self.store.get_received_ts_by_stream_pos(stream_pos)
-
-            # quantise time to the nearest bucket
-            now = (now // 1000 // self.stats_bucket_size) * self.stats_bucket_size
-
-            if typ == EventTypes.Member:
-                # we could use _get_key_change here but it's a bit inefficient
-                # given we're not testing for a specific result; might as well
-                # just grab the prev_membership and membership strings and
-                # compare them.
-                prev_event_content = {}
-                if prev_event_id is not None:
-                    prev_event = yield self.store.get_event(
-                        prev_event_id, allow_none=True
-                    )
-                    if prev_event:
-                        prev_event_content = prev_event.content
-
-                membership = event_content.get("membership", Membership.LEAVE)
-                prev_membership = prev_event_content.get("membership", Membership.LEAVE)
-
-                if prev_membership == membership:
-                    continue
-
-                if prev_membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", -1
-                    )
-                elif prev_membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", -1
-                    )
-                elif prev_membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", -1
-                    )
-                elif prev_membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", -1
-                    )
-                else:
-                    err = "%s is not a valid prev_membership" % (repr(prev_membership),)
-                    logger.error(err)
-                    raise ValueError(err)
-
-                if membership == Membership.JOIN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "joined_members", +1
-                    )
-                elif membership == Membership.INVITE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "invited_members", +1
-                    )
-                elif membership == Membership.LEAVE:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "left_members", +1
-                    )
-                elif membership == Membership.BAN:
-                    yield self.store.update_stats_delta(
-                        now, "room", room_id, "banned_members", +1
-                    )
-                else:
-                    err = "%s is not a valid membership" % (repr(membership),)
-                    logger.error(err)
-                    raise ValueError(err)
-
-                user_id = state_key
-                if self.is_mine_id(user_id):
-                    # update user_stats as it's one of our users
-                    public = yield self._is_public_room(room_id)
-
-                    if membership == Membership.LEAVE:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            -1,
-                        )
-                    elif membership == Membership.JOIN:
-                        yield self.store.update_stats_delta(
-                            now,
-                            "user",
-                            user_id,
-                            "public_rooms" if public else "private_rooms",
-                            +1,
-                        )
-
-            elif typ == EventTypes.Create:
-                # Newly created room. Add it with all blank portions.
-                yield self.store.update_room_state(
-                    room_id,
-                    {
-                        "join_rules": None,
-                        "history_visibility": None,
-                        "encryption": None,
-                        "name": None,
-                        "topic": None,
-                        "avatar": None,
-                        "canonical_alias": None,
-                    },
-                )
-
-            elif typ == EventTypes.JoinRules:
-                yield self.store.update_room_state(
-                    room_id, {"join_rules": event_content.get("join_rule")}
-                )
-
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "join_rule", JoinRules.PUBLIC
-                )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
-
-            elif typ == EventTypes.RoomHistoryVisibility:
-                yield self.store.update_room_state(
-                    room_id,
-                    {"history_visibility": event_content.get("history_visibility")},
-                )
-
-                is_public = yield self._get_key_change(
-                    prev_event_id, event_id, "history_visibility", "world_readable"
-                )
-                if is_public is not None:
-                    yield self.update_public_room_stats(now, room_id, is_public)
-
-            elif typ == EventTypes.Encryption:
-                yield self.store.update_room_state(
-                    room_id, {"encryption": event_content.get("algorithm")}
-                )
-            elif typ == EventTypes.Name:
-                yield self.store.update_room_state(
-                    room_id, {"name": event_content.get("name")}
-                )
-            elif typ == EventTypes.Topic:
-                yield self.store.update_room_state(
-                    room_id, {"topic": event_content.get("topic")}
-                )
-            elif typ == EventTypes.RoomAvatar:
-                yield self.store.update_room_state(
-                    room_id, {"avatar": event_content.get("url")}
-                )
-            elif typ == EventTypes.CanonicalAlias:
-                yield self.store.update_room_state(
-                    room_id, {"canonical_alias": event_content.get("alias")}
-                )
-
-    @defer.inlineCallbacks
-    def update_public_room_stats(self, ts, room_id, is_public):
-        """
-        Increment/decrement a user's number of public rooms when a room they are
-        in changes to/from public visibility.
-
-        Args:
-            ts (int): Timestamp in seconds
-            room_id (str)
-            is_public (bool)
-        """
-        # For now, blindly iterate over all local users in the room so that
-        # we can handle the whole problem of copying buckets over as needed
-        user_ids = yield self.store.get_users_in_room(room_id)
-
-        for user_id in user_ids:
-            if self.hs.is_mine(UserID.from_string(user_id)):
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "public_rooms", +1 if is_public else -1
-                )
-                yield self.store.update_stats_delta(
-                    ts, "user", user_id, "private_rooms", -1 if is_public else +1
-                )
-
-    @defer.inlineCallbacks
-    def _is_public_room(self, room_id):
-        join_rules = yield self.state.get_current_state(room_id, EventTypes.JoinRules)
-        history_visibility = yield self.state.get_current_state(
-            room_id, EventTypes.RoomHistoryVisibility
-        )
-
-        if (join_rules and join_rules.content.get("join_rule") == JoinRules.PUBLIC) or (
-            (
-                history_visibility
-                and history_visibility.content.get("history_visibility")
-                == "world_readable"
-            )
-        ):
-            return True
-        else:
-            return False
+        pass
diff --git a/synapse/storage/schema/delta/56/stats_separated1.sql b/synapse/storage/schema/delta/56/stats_separated1.sql
new file mode 100644
index 0000000000..5b125d17b0
--- /dev/null
+++ b/synapse/storage/schema/delta/56/stats_separated1.sql
@@ -0,0 +1,33 @@
+/* Copyright 2018 New Vector Ltd
+ * Copyright 2019 The Matrix.org Foundation C.I.C.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+----- First clean up from previous versions of room stats.
+
+-- First remove old stats stuff
+DROP TABLE IF EXISTS room_stats;
+DROP TABLE IF EXISTS user_stats;
+DROP TABLE IF EXISTS room_stats_earliest_tokens;
+DROP TABLE IF EXISTS _temp_populate_stats_position;
+DROP TABLE IF EXISTS _temp_populate_stats_rooms;
+DROP TABLE IF EXISTS stats_stream_pos;
+
+-- Unschedule old background updates if they're still scheduled
+DELETE FROM background_updates WHERE update_name IN (
+    'populate_stats_createtables',
+    'populate_stats_process_rooms',
+    'populate_stats_cleanup'
+);
diff --git a/synapse/storage/stats.py b/synapse/storage/stats.py
index e13efed417..38a7fa71d9 100644
--- a/synapse/storage/stats.py
+++ b/synapse/storage/stats.py
@@ -15,12 +15,7 @@
 
 import logging
 
-from twisted.internet import defer
-
-from synapse.api.constants import EventTypes, Membership
-from synapse.storage.prepare_database import get_statements
 from synapse.storage.state_deltas import StateDeltasStore
-from synapse.util.caches.descriptors import cached
 
 logger = logging.getLogger(__name__)
 
@@ -39,8 +34,6 @@ ABSOLUTE_STATS_FIELDS = {
 
 TYPE_TO_ROOM = {"room": ("room_stats", "room_id"), "user": ("user_stats", "user_id")}
 
-TEMP_TABLE = "_temp_populate_stats"
-
 
 class StatsStore(StateDeltasStore):
     def __init__(self, db_conn, hs):
@@ -51,291 +44,14 @@ class StatsStore(StateDeltasStore):
         self.stats_enabled = hs.config.stats_enabled
         self.stats_bucket_size = hs.config.stats_bucket_size
 
-        self.register_background_update_handler(
-            "populate_stats_createtables", self._populate_stats_createtables
-        )
-        self.register_background_update_handler(
-            "populate_stats_process_rooms", self._populate_stats_process_rooms
+        self.register_noop_background_update(
+            "populate_stats_createtables"
         )
-        self.register_background_update_handler(
-            "populate_stats_cleanup", self._populate_stats_cleanup
-        )
-
-    @defer.inlineCallbacks
-    def _populate_stats_createtables(self, progress, batch_size):
-
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_createtables")
-            return 1
-
-        # Get all the rooms that we want to process.
-        def _make_staging_area(txn):
-            # Create the temporary tables
-            stmts = get_statements(
-                """
-                -- We just recreate the table, we'll be reinserting the
-                -- correct entries again later anyway.
-                DROP TABLE IF EXISTS {temp}_rooms;
-
-                CREATE TABLE IF NOT EXISTS {temp}_rooms(
-                    room_id TEXT NOT NULL,
-                    events BIGINT NOT NULL
-                );
-
-                CREATE INDEX {temp}_rooms_events
-                    ON {temp}_rooms(events);
-                CREATE INDEX {temp}_rooms_id
-                    ON {temp}_rooms(room_id);
-            """.format(
-                    temp=TEMP_TABLE
-                ).splitlines()
-            )
-
-            for statement in stmts:
-                txn.execute(statement)
-
-            sql = (
-                "CREATE TABLE IF NOT EXISTS "
-                + TEMP_TABLE
-                + "_position(position TEXT NOT NULL)"
-            )
-            txn.execute(sql)
-
-            # Get rooms we want to process from the database, only adding
-            # those that we haven't (i.e. those not in room_stats_earliest_token)
-            sql = """
-                INSERT INTO %s_rooms (room_id, events)
-                SELECT c.room_id, count(*) FROM current_state_events AS c
-                LEFT JOIN room_stats_earliest_token AS t USING (room_id)
-                WHERE t.room_id IS NULL
-                GROUP BY c.room_id
-            """ % (
-                TEMP_TABLE,
-            )
-            txn.execute(sql)
-
-        new_pos = yield self.get_max_stream_id_in_current_state_deltas()
-        yield self.runInteraction("populate_stats_temp_build", _make_staging_area)
-        yield self._simple_insert(TEMP_TABLE + "_position", {"position": new_pos})
-        self.get_earliest_token_for_room_stats.invalidate_all()
-
-        yield self._end_background_update("populate_stats_createtables")
-        return 1
-
-    @defer.inlineCallbacks
-    def _populate_stats_cleanup(self, progress, batch_size):
-        """
-        Update the user directory stream position, then clean up the old tables.
-        """
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_cleanup")
-            return 1
-
-        position = yield self._simple_select_one_onecol(
-            TEMP_TABLE + "_position", None, "position"
-        )
-        yield self.update_stats_stream_pos(position)
-
-        def _delete_staging_area(txn):
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_rooms")
-            txn.execute("DROP TABLE IF EXISTS " + TEMP_TABLE + "_position")
-
-        yield self.runInteraction("populate_stats_cleanup", _delete_staging_area)
-
-        yield self._end_background_update("populate_stats_cleanup")
-        return 1
-
-    @defer.inlineCallbacks
-    def _populate_stats_process_rooms(self, progress, batch_size):
-
-        if not self.stats_enabled:
-            yield self._end_background_update("populate_stats_process_rooms")
-            return 1
-
-        # If we don't have progress filed, delete everything.
-        if not progress:
-            yield self.delete_all_stats()
-
-        def _get_next_batch(txn):
-            # Only fetch 250 rooms, so we don't fetch too many at once, even
-            # if those 250 rooms have less than batch_size state events.
-            sql = """
-                SELECT room_id, events FROM %s_rooms
-                ORDER BY events DESC
-                LIMIT 250
-            """ % (
-                TEMP_TABLE,
-            )
-            txn.execute(sql)
-            rooms_to_work_on = txn.fetchall()
-
-            if not rooms_to_work_on:
-                return None
-
-            # Get how many are left to process, so we can give status on how
-            # far we are in processing
-            txn.execute("SELECT COUNT(*) FROM " + TEMP_TABLE + "_rooms")
-            progress["remaining"] = txn.fetchone()[0]
-
-            return rooms_to_work_on
-
-        rooms_to_work_on = yield self.runInteraction(
-            "populate_stats_temp_read", _get_next_batch
-        )
-
-        # No more rooms -- complete the transaction.
-        if not rooms_to_work_on:
-            yield self._end_background_update("populate_stats_process_rooms")
-            return 1
-
-        logger.info(
-            "Processing the next %d rooms of %d remaining",
-            len(rooms_to_work_on),
-            progress["remaining"],
-        )
-
-        # Number of state events we've processed by going through each room
-        processed_event_count = 0
-
-        for room_id, event_count in rooms_to_work_on:
-
-            current_state_ids = yield self.get_current_state_ids(room_id)
-
-            join_rules_id = current_state_ids.get((EventTypes.JoinRules, ""))
-            history_visibility_id = current_state_ids.get(
-                (EventTypes.RoomHistoryVisibility, "")
-            )
-            encryption_id = current_state_ids.get((EventTypes.RoomEncryption, ""))
-            name_id = current_state_ids.get((EventTypes.Name, ""))
-            topic_id = current_state_ids.get((EventTypes.Topic, ""))
-            avatar_id = current_state_ids.get((EventTypes.RoomAvatar, ""))
-            canonical_alias_id = current_state_ids.get((EventTypes.CanonicalAlias, ""))
-
-            event_ids = [
-                join_rules_id,
-                history_visibility_id,
-                encryption_id,
-                name_id,
-                topic_id,
-                avatar_id,
-                canonical_alias_id,
-            ]
-
-            state_events = yield self.get_events(
-                [ev for ev in event_ids if ev is not None]
-            )
-
-            def _get_or_none(event_id, arg):
-                event = state_events.get(event_id)
-                if event:
-                    return event.content.get(arg)
-                return None
-
-            yield self.update_room_state(
-                room_id,
-                {
-                    "join_rules": _get_or_none(join_rules_id, "join_rule"),
-                    "history_visibility": _get_or_none(
-                        history_visibility_id, "history_visibility"
-                    ),
-                    "encryption": _get_or_none(encryption_id, "algorithm"),
-                    "name": _get_or_none(name_id, "name"),
-                    "topic": _get_or_none(topic_id, "topic"),
-                    "avatar": _get_or_none(avatar_id, "url"),
-                    "canonical_alias": _get_or_none(canonical_alias_id, "alias"),
-                },
-            )
-
-            now = self.hs.get_reactor().seconds()
-
-            # quantise time to the nearest bucket
-            now = (now // self.stats_bucket_size) * self.stats_bucket_size
-
-            def _fetch_data(txn):
-
-                # Get the current token of the room
-                current_token = self._get_max_stream_id_in_current_state_deltas_txn(txn)
-
-                current_state_events = len(current_state_ids)
-
-                membership_counts = self._get_user_counts_in_room_txn(txn, room_id)
-
-                total_state_events = self._get_total_state_event_counts_txn(
-                    txn, room_id
-                )
-
-                self._update_stats_txn(
-                    txn,
-                    "room",
-                    room_id,
-                    now,
-                    {
-                        "bucket_size": self.stats_bucket_size,
-                        "current_state_events": current_state_events,
-                        "joined_members": membership_counts.get(Membership.JOIN, 0),
-                        "invited_members": membership_counts.get(Membership.INVITE, 0),
-                        "left_members": membership_counts.get(Membership.LEAVE, 0),
-                        "banned_members": membership_counts.get(Membership.BAN, 0),
-                        "state_events": total_state_events,
-                    },
-                )
-                self._simple_insert_txn(
-                    txn,
-                    "room_stats_earliest_token",
-                    {"room_id": room_id, "token": current_token},
-                )
-
-                # We've finished a room. Delete it from the table.
-                self._simple_delete_one_txn(
-                    txn, TEMP_TABLE + "_rooms", {"room_id": room_id}
-                )
-
-            yield self.runInteraction("update_room_stats", _fetch_data)
-
-            # Update the remaining counter.
-            progress["remaining"] -= 1
-            yield self.runInteraction(
-                "populate_stats",
-                self._background_update_progress_txn,
-                "populate_stats_process_rooms",
-                progress,
-            )
-
-            processed_event_count += event_count
-
-            if processed_event_count > batch_size:
-                # Don't process any more rooms, we've hit our batch size.
-                return processed_event_count
-
-        return processed_event_count
-
-    def delete_all_stats(self):
-        """
-        Delete all statistics records.
-        """
-
-        def _delete_all_stats_txn(txn):
-            txn.execute("DELETE FROM room_state")
-            txn.execute("DELETE FROM room_stats")
-            txn.execute("DELETE FROM room_stats_earliest_token")
-            txn.execute("DELETE FROM user_stats")
-
-        return self.runInteraction("delete_all_stats", _delete_all_stats_txn)
-
-    def get_stats_stream_pos(self):
-        return self._simple_select_one_onecol(
-            table="stats_stream_pos",
-            keyvalues={},
-            retcol="stream_id",
-            desc="stats_stream_pos",
+        self.register_noop_background_update(
+            "populate_stats_process_rooms"
         )
-
-    def update_stats_stream_pos(self, stream_id):
-        return self._simple_update_one(
-            table="stats_stream_pos",
-            keyvalues={},
-            updatevalues={"stream_id": stream_id},
-            desc="update_stats_stream_pos",
+        self.register_noop_background_update(
+            "populate_stats_cleanup"
         )
 
     def update_room_state(self, room_id, fields):
@@ -366,119 +82,3 @@ class StatsStore(StateDeltasStore):
             values=fields,
             desc="update_room_state",
         )
-
-    def get_deltas_for_room(self, room_id, start, size=100):
-        """
-        Get statistics deltas for a given room.
-
-        Args:
-            room_id (str)
-            start (int): Pagination start. Number of entries, not timestamp.
-            size (int): How many entries to return.
-
-        Returns:
-            Deferred[list[dict]], where the dict has the keys of
-            ABSOLUTE_STATS_FIELDS["room"] and "ts".
-        """
-        return self._simple_select_list_paginate(
-            "room_stats",
-            {"room_id": room_id},
-            "ts",
-            start,
-            size,
-            retcols=(list(ABSOLUTE_STATS_FIELDS["room"]) + ["ts"]),
-            order_direction="DESC",
-        )
-
-    def get_all_room_state(self):
-        return self._simple_select_list(
-            "room_state", None, retcols=("name", "topic", "canonical_alias")
-        )
-
-    @cached()
-    def get_earliest_token_for_room_stats(self, room_id):
-        """
-        Fetch the "earliest token". This is used by the room stats delta
-        processor to ignore deltas that have been processed between the
-        start of the background task and any particular room's stats
-        being calculated.
-
-        Returns:
-            Deferred[int]
-        """
-        return self._simple_select_one_onecol(
-            "room_stats_earliest_token",
-            {"room_id": room_id},
-            retcol="token",
-            allow_none=True,
-        )
-
-    def update_stats(self, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert(
-            table=table,
-            keyvalues={id_col: stats_id, "ts": ts},
-            values=fields,
-            desc="update_stats",
-        )
-
-    def _update_stats_txn(self, txn, stats_type, stats_id, ts, fields):
-        table, id_col = TYPE_TO_ROOM[stats_type]
-        return self._simple_upsert_txn(
-            txn, table=table, keyvalues={id_col: stats_id, "ts": ts}, values=fields
-        )
-
-    def update_stats_delta(self, ts, stats_type, stats_id, field, value):
-        def _update_stats_delta(txn):
-            table, id_col = TYPE_TO_ROOM[stats_type]
-
-            sql = (
-                "SELECT * FROM %s"
-                " WHERE %s=? and ts=("
-                "  SELECT MAX(ts) FROM %s"
-                "  WHERE %s=?"
-                ")"
-            ) % (table, id_col, table, id_col)
-            txn.execute(sql, (stats_id, stats_id))
-            rows = self.cursor_to_dict(txn)
-            if len(rows) == 0:
-                # silently skip as we don't have anything to apply a delta to yet.
-                # this tries to minimise any race between the initial sync and
-                # subsequent deltas arriving.
-                return
-
-            current_ts = ts
-            latest_ts = rows[0]["ts"]
-            if current_ts < latest_ts:
-                # This one is in the past, but we're just encountering it now.
-                # Mark it as part of the current bucket.
-                current_ts = latest_ts
-            elif ts != latest_ts:
-                # we have to copy our absolute counters over to the new entry.
-                values = {
-                    key: rows[0][key] for key in ABSOLUTE_STATS_FIELDS[stats_type]
-                }
-                values[id_col] = stats_id
-                values["ts"] = ts
-                values["bucket_size"] = self.stats_bucket_size
-
-                self._simple_insert_txn(txn, table=table, values=values)
-
-            # actually update the new value
-            if stats_type in ABSOLUTE_STATS_FIELDS[stats_type]:
-                self._simple_update_txn(
-                    txn,
-                    table=table,
-                    keyvalues={id_col: stats_id, "ts": current_ts},
-                    updatevalues={field: value},
-                )
-            else:
-                sql = ("UPDATE %s SET %s=%s+? WHERE %s=? AND ts=?") % (
-                    table,
-                    field,
-                    field,
-                    id_col,
-                )
-                txn.execute(sql, (value, stats_id, current_ts))
-
-        return self.runInteraction("update_stats_delta", _update_stats_delta)